From 954e250151d33a740f96495558a4ebfe896e59ea Mon Sep 17 00:00:00 2001 From: Pablo Curiel Date: Thu, 18 Feb 2021 22:49:01 -0400 Subject: [PATCH] Minor improvements to the companion script. * The KeyboardInterrupt exception is now being caught. * The USBError exception is now only being caught while reading a command header, which has a maxed out timeout value. This is used as a way to detect if the console has been disconnected. * An output path can now be passed as the first argument for the script. User home directory and environment variables are expanded, if needed. * If no path is provided, the script will proceed to create a 'nxdumptool' directory inside the directory where the script is located, and use it to store all the data sent by the console. --- host.py | 129 +++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 85 insertions(+), 44 deletions(-) diff --git a/host.py b/host.py index 0b09766..d44acc2 100644 --- a/host.py +++ b/host.py @@ -11,6 +11,9 @@ import threading import shutil from tqdm import tqdm +# Script version. +SCRIPT_VERSION = '0.1' + # USB VID/PID pair. USB_DEV_VID = 0x057E USB_DEV_PID = 0x3000 @@ -73,8 +76,7 @@ g_nspRemainingSize = 0 g_nspFile = None g_nspFilePath = None -# TO DO: change this. -g_outputDir = '.' +g_outputDir = None def utilsIsValueAlignedToEndpointPacketSize(value): global g_usbEpMaxPacketSize @@ -91,7 +93,7 @@ def utilsResetNspInfo(): g_nspFile = None g_nspFilePath = None -def utilsGetSizeUnit(size): +def utilsGetSizeUnitAndDivisor(size): size_suffixes = [ 'B', 'KiB', 'MiB', 'GiB' ] size_suffixes_count = len(size_suffixes) @@ -99,19 +101,21 @@ def utilsGetSizeUnit(size): ret = None for i in range(size_suffixes_count): - if (float_size >= pow(1024, i + 1)) and ((i + 1) < size_suffixes_count): - continue - - return (size_suffixes[i], pow(1024, i)) + if (float_size < pow(1024, i + 1)) or ((i + 1) >= size_suffixes_count): + ret = (size_suffixes[i], pow(1024, i)) + break + + return ret def usbGetDeviceEndpoints(): global g_usbEpIn, g_usbEpOut, g_usbEpMaxPacketSize prev_dev = cur_dev = None - g_usbEpIn_lambda = lambda ep: usb.util.endpoint_direction(ep.bEndpointAddress) == usb.util.ENDPOINT_IN - g_usbEpOut_lambda = lambda ep: usb.util.endpoint_direction(ep.bEndpointAddress) == usb.util.ENDPOINT_OUT + usb_ep_in_lambda = lambda ep: usb.util.endpoint_direction(ep.bEndpointAddress) == usb.util.ENDPOINT_IN + usb_ep_out_lambda = lambda ep: usb.util.endpoint_direction(ep.bEndpointAddress) == usb.util.ENDPOINT_OUT + usb_version = None - print('Please connect a Nintendo Switch console running nxdumptool.') + print('Please connect a Nintendo Switch console running nxdumptool. Use Ctrl+C to abort.\n') while True: # Find a connected USB device with a matching VID/PID pair. @@ -141,21 +145,23 @@ def usbGetDeviceEndpoints(): intf = cfg[(0,0)] # Retrieve endpoints. - g_usbEpIn = usb.util.find_descriptor(intf, custom_match=g_usbEpIn_lambda) - g_usbEpOut = usb.util.find_descriptor(intf, custom_match=g_usbEpOut_lambda) + g_usbEpIn = usb.util.find_descriptor(intf, custom_match=usb_ep_in_lambda) + g_usbEpOut = usb.util.find_descriptor(intf, custom_match=usb_ep_out_lambda) if (g_usbEpIn is None) or (g_usbEpOut is None): print('Invalid endpoint addresses! (bus %u, address %u).' % (cur_dev.bus, cur_dev.address)) time.sleep(0.1) continue - # Save endpoint max packet size. + # Save endpoint max packet size and USB version. g_usbEpMaxPacketSize = g_usbEpIn.wMaxPacketSize + usb_version = cur_dev.bcdUSB break print('Successfully retrieved USB endpoints! (bus %u, address %u).' % (cur_dev.bus, cur_dev.address)) - print('Exit nxdumptool at any time to close this script.\n') + print('Max packet size: 0x%X (USB %u.%u).\n' % (g_usbEpMaxPacketSize, usb_version >> 8, (usb_version & 0xFF) >> 4)) + print('Exit nxdumptool or disconnect your console at any time to close this script.\n') def usbRead(size, timeout=-1): global g_usbEpIn @@ -242,7 +248,7 @@ def usbHandleSendFileProperties(cmd_block): filename = filename.replace('/', '\\') # Generate full, absolute path to the destination file. - fullpath = os.path.abspath(os.path.expanduser(os.path.expandvars(g_outputDir)) + os.path.sep + filename) + fullpath = os.path.abspath(g_outputDir + os.path.sep + filename) # Get parent directory path. dirpath = os.path.dirname(fullpath) @@ -250,7 +256,7 @@ def usbHandleSendFileProperties(cmd_block): # Create full directory tree. os.makedirs(dirpath, exist_ok=True) - # Make sure the output file doesn't already exist as a directory. + # Make sure the output filepath doesn't point to an existing directory. if (os.path.exists(fullpath) == True) and (os.path.isfile(fullpath) == False): utilsResetNspInfo() print('Output filepath points to an existing directory! ("%s").' % (fullpath)) @@ -304,8 +310,8 @@ def usbHandleSendFileProperties(cmd_block): # Initialize progress bar. ascii = (False if (os.name != 'nt') else True) - (unit, unit_divisor) = utilsGetSizeUnit(file_size) - bar_format = '{percentage:3.0f}% |{bar}| {n:.2f}/{total:.2f} [{elapsed}<{remaining}, {rate_noinv_fmt}]' + (unit, unit_divisor) = utilsGetSizeUnitAndDivisor(file_size) + bar_format = '{percentage:3.0f}% |{bar}| {n:.2f}/{total:.2f} [{elapsed}<{remaining}, {rate_fmt}]' pbar = tqdm(total=(float(file_size) / unit_divisor), ascii=ascii, unit=unit, dynamic_ncols=True, bar_format=bar_format) @@ -403,7 +409,7 @@ def usbHandleEndSession(cmd_block): def usbCommandHandler(): # CancelFileTransfer is handled in usbHandleSendFileProperties(). - cmd_switcher = { + cmd_dict = { USB_CMD_START_SESSION: usbHandleStartSession, USB_CMD_SEND_FILE_PROPERTIES: usbHandleSendFileProperties, USB_CMD_SEND_NSP_HEADER: usbHandleSendNspHeader, @@ -414,36 +420,21 @@ def usbCommandHandler(): usbGetDeviceEndpoints() while True: - # Read command header. - cmd_header = usbRead(USB_CMD_HEADER_SIZE) + try: + # Read command header. + cmd_header = usbRead(USB_CMD_HEADER_SIZE) + except usb.core.USBError: + print('Nintendo Switch disconnected. Exiting.') + return + if (cmd_header is None) or (len(cmd_header) != USB_CMD_HEADER_SIZE): continue # Parse command header. (magic, cmd_id, cmd_block_size, padding) = struct.unpack('<4sII4p', cmd_header) - # Verify magic word. - if magic != USB_MAGIC_WORD: - print('Received command header with invalid magic word!\n') - usbSendStatus(USB_STATUS_INVALID_MAGIC_WORD) - continue - - # Get command handler function. - cmd_func = cmd_switcher.get(cmd_id, None) - if cmd_func is None: - print('Received command header with unsupported ID %02X.\n' % (cmd_id)) - usbSendStatus(USB_STATUS_UNSUPPORTED_CMD) - continue - - # Verify command block size. - if ((cmd_id == USB_CMD_START_SESSION) and (cmd_block_size != USB_CMD_BLOCK_SIZE_START_SESSION)) or \ - ((cmd_id == USB_CMD_SEND_FILE_PROPERTIES) and (cmd_block_size != USB_CMD_BLOCK_SIZE_SEND_FILE_PROPERTIES)) or \ - ((cmd_id == USB_CMD_SEND_NSP_HEADER) and (cmd_block_size == 0)): - print('Invalid command block size for command ID %02X! (0x%X).\n' % (cmd_id, cmd_block_size)) - usbSendStatus(USB_STATUS_MALFORMED_COMMAND) - continue - - # Read command block (if needed). + # Read command block right away (if needed). + # nxdumptool expects us to read it right after sending the command header. cmd_block = None if cmd_block_size > 0: # Handle Zero-Length Termination packet (if needed). @@ -457,6 +448,27 @@ def usbCommandHandler(): print('Failed to read 0x%X byte(s) long command block for command ID %02X!\n' % (cmd_block_size, cmd_id)) continue + # Verify magic word. + if magic != USB_MAGIC_WORD: + print('Received command header with invalid magic word!\n') + usbSendStatus(USB_STATUS_INVALID_MAGIC_WORD) + continue + + # Get command handler function. + cmd_func = cmd_dict.get(cmd_id, None) + if cmd_func is None: + print('Received command header with unsupported ID %02X.\n' % (cmd_id)) + usbSendStatus(USB_STATUS_UNSUPPORTED_CMD) + continue + + # Verify command block size. + if ((cmd_id == USB_CMD_START_SESSION) and (cmd_block_size != USB_CMD_BLOCK_SIZE_START_SESSION)) or \ + ((cmd_id == USB_CMD_SEND_FILE_PROPERTIES) and (cmd_block_size != USB_CMD_BLOCK_SIZE_SEND_FILE_PROPERTIES)) or \ + ((cmd_id == USB_CMD_SEND_NSP_HEADER) and (cmd_block_size == 0)): + print('Invalid command block size for command ID %02X! (0x%X).\n' % (cmd_id, cmd_block_size)) + usbSendStatus(USB_STATUS_MALFORMED_COMMAND) + continue + # Run command handler function. status = cmd_func(cmd_block) print('') @@ -469,7 +481,36 @@ def usbCommandHandler(): break def main(): + global g_outputDir + + print('nxdumptool companion script v%s.' % (SCRIPT_VERSION)) + + # Check if the user provided an output directory. + if len(sys.argv) >= 2: + # Expand environment variables and user's home directory. + g_outputDir = os.path.abspath(os.path.expanduser(os.path.expandvars(sys.argv[1]))) + + # Make sure we're dealing with an existing directory. + if (os.path.exists(g_outputDir) == False) or (os.path.isdir(g_outputDir) == False): + print('The provided path doesn\'t point to an existing directory!') + return + else: + # Create 'nxdumptool' subdirectory in the directory where the script is located. + g_outputDir = (sys.path[0] + os.path.sep + 'nxdumptool') + if os.path.exists(g_outputDir) == False: + os.mkdir(g_outputDir) + + print('Output directory set to "%s".\n' % (g_outputDir)) + + # Start USB command handler. usbCommandHandler() if __name__ == "__main__": - main() + try: + main() + except KeyboardInterrupt: + print('\nScript interrupted.') + try: + sys.exit(0) + except SystemExit: + os._exit(0)