1
0
Fork 0
mirror of https://github.com/DarkMatterCore/nxdumptool.git synced 2024-11-22 18:26:39 +00:00

Minor improvements to the companion script.

* The KeyboardInterrupt exception is now being caught.

* The USBError exception is now only being caught while reading a command header, which has a maxed out timeout value. This is used as a way to detect if the console has been disconnected.

* An output path can now be passed as the first argument for the script. User home directory and environment variables are expanded, if needed.

* If no path is provided, the script will proceed to create a 'nxdumptool' directory inside the directory where the script is located, and use it to store all the data sent by the console.
This commit is contained in:
Pablo Curiel 2021-02-18 22:49:01 -04:00
parent 85a045ba38
commit 954e250151

129
host.py
View file

@ -11,6 +11,9 @@ import threading
import shutil import shutil
from tqdm import tqdm from tqdm import tqdm
# Script version.
SCRIPT_VERSION = '0.1'
# USB VID/PID pair. # USB VID/PID pair.
USB_DEV_VID = 0x057E USB_DEV_VID = 0x057E
USB_DEV_PID = 0x3000 USB_DEV_PID = 0x3000
@ -73,8 +76,7 @@ g_nspRemainingSize = 0
g_nspFile = None g_nspFile = None
g_nspFilePath = None g_nspFilePath = None
# TO DO: change this. g_outputDir = None
g_outputDir = '.'
def utilsIsValueAlignedToEndpointPacketSize(value): def utilsIsValueAlignedToEndpointPacketSize(value):
global g_usbEpMaxPacketSize global g_usbEpMaxPacketSize
@ -91,7 +93,7 @@ def utilsResetNspInfo():
g_nspFile = None g_nspFile = None
g_nspFilePath = None g_nspFilePath = None
def utilsGetSizeUnit(size): def utilsGetSizeUnitAndDivisor(size):
size_suffixes = [ 'B', 'KiB', 'MiB', 'GiB' ] size_suffixes = [ 'B', 'KiB', 'MiB', 'GiB' ]
size_suffixes_count = len(size_suffixes) size_suffixes_count = len(size_suffixes)
@ -99,19 +101,21 @@ def utilsGetSizeUnit(size):
ret = None ret = None
for i in range(size_suffixes_count): for i in range(size_suffixes_count):
if (float_size >= pow(1024, i + 1)) and ((i + 1) < size_suffixes_count): if (float_size < pow(1024, i + 1)) or ((i + 1) >= size_suffixes_count):
continue ret = (size_suffixes[i], pow(1024, i))
break
return (size_suffixes[i], pow(1024, i))
return ret
def usbGetDeviceEndpoints(): def usbGetDeviceEndpoints():
global g_usbEpIn, g_usbEpOut, g_usbEpMaxPacketSize global g_usbEpIn, g_usbEpOut, g_usbEpMaxPacketSize
prev_dev = cur_dev = None prev_dev = cur_dev = None
g_usbEpIn_lambda = lambda ep: usb.util.endpoint_direction(ep.bEndpointAddress) == usb.util.ENDPOINT_IN usb_ep_in_lambda = lambda ep: usb.util.endpoint_direction(ep.bEndpointAddress) == usb.util.ENDPOINT_IN
g_usbEpOut_lambda = lambda ep: usb.util.endpoint_direction(ep.bEndpointAddress) == usb.util.ENDPOINT_OUT usb_ep_out_lambda = lambda ep: usb.util.endpoint_direction(ep.bEndpointAddress) == usb.util.ENDPOINT_OUT
usb_version = None
print('Please connect a Nintendo Switch console running nxdumptool.') print('Please connect a Nintendo Switch console running nxdumptool. Use Ctrl+C to abort.\n')
while True: while True:
# Find a connected USB device with a matching VID/PID pair. # Find a connected USB device with a matching VID/PID pair.
@ -141,21 +145,23 @@ def usbGetDeviceEndpoints():
intf = cfg[(0,0)] intf = cfg[(0,0)]
# Retrieve endpoints. # Retrieve endpoints.
g_usbEpIn = usb.util.find_descriptor(intf, custom_match=g_usbEpIn_lambda) g_usbEpIn = usb.util.find_descriptor(intf, custom_match=usb_ep_in_lambda)
g_usbEpOut = usb.util.find_descriptor(intf, custom_match=g_usbEpOut_lambda) g_usbEpOut = usb.util.find_descriptor(intf, custom_match=usb_ep_out_lambda)
if (g_usbEpIn is None) or (g_usbEpOut is None): if (g_usbEpIn is None) or (g_usbEpOut is None):
print('Invalid endpoint addresses! (bus %u, address %u).' % (cur_dev.bus, cur_dev.address)) print('Invalid endpoint addresses! (bus %u, address %u).' % (cur_dev.bus, cur_dev.address))
time.sleep(0.1) time.sleep(0.1)
continue continue
# Save endpoint max packet size. # Save endpoint max packet size and USB version.
g_usbEpMaxPacketSize = g_usbEpIn.wMaxPacketSize g_usbEpMaxPacketSize = g_usbEpIn.wMaxPacketSize
usb_version = cur_dev.bcdUSB
break break
print('Successfully retrieved USB endpoints! (bus %u, address %u).' % (cur_dev.bus, cur_dev.address)) print('Successfully retrieved USB endpoints! (bus %u, address %u).' % (cur_dev.bus, cur_dev.address))
print('Exit nxdumptool at any time to close this script.\n') print('Max packet size: 0x%X (USB %u.%u).\n' % (g_usbEpMaxPacketSize, usb_version >> 8, (usb_version & 0xFF) >> 4))
print('Exit nxdumptool or disconnect your console at any time to close this script.\n')
def usbRead(size, timeout=-1): def usbRead(size, timeout=-1):
global g_usbEpIn global g_usbEpIn
@ -242,7 +248,7 @@ def usbHandleSendFileProperties(cmd_block):
filename = filename.replace('/', '\\') filename = filename.replace('/', '\\')
# Generate full, absolute path to the destination file. # Generate full, absolute path to the destination file.
fullpath = os.path.abspath(os.path.expanduser(os.path.expandvars(g_outputDir)) + os.path.sep + filename) fullpath = os.path.abspath(g_outputDir + os.path.sep + filename)
# Get parent directory path. # Get parent directory path.
dirpath = os.path.dirname(fullpath) dirpath = os.path.dirname(fullpath)
@ -250,7 +256,7 @@ def usbHandleSendFileProperties(cmd_block):
# Create full directory tree. # Create full directory tree.
os.makedirs(dirpath, exist_ok=True) os.makedirs(dirpath, exist_ok=True)
# Make sure the output file doesn't already exist as a directory. # Make sure the output filepath doesn't point to an existing directory.
if (os.path.exists(fullpath) == True) and (os.path.isfile(fullpath) == False): if (os.path.exists(fullpath) == True) and (os.path.isfile(fullpath) == False):
utilsResetNspInfo() utilsResetNspInfo()
print('Output filepath points to an existing directory! ("%s").' % (fullpath)) print('Output filepath points to an existing directory! ("%s").' % (fullpath))
@ -304,8 +310,8 @@ def usbHandleSendFileProperties(cmd_block):
# Initialize progress bar. # Initialize progress bar.
ascii = (False if (os.name != 'nt') else True) ascii = (False if (os.name != 'nt') else True)
(unit, unit_divisor) = utilsGetSizeUnit(file_size) (unit, unit_divisor) = utilsGetSizeUnitAndDivisor(file_size)
bar_format = '{percentage:3.0f}% |{bar}| {n:.2f}/{total:.2f} [{elapsed}<{remaining}, {rate_noinv_fmt}]' bar_format = '{percentage:3.0f}% |{bar}| {n:.2f}/{total:.2f} [{elapsed}<{remaining}, {rate_fmt}]'
pbar = tqdm(total=(float(file_size) / unit_divisor), ascii=ascii, unit=unit, dynamic_ncols=True, bar_format=bar_format) pbar = tqdm(total=(float(file_size) / unit_divisor), ascii=ascii, unit=unit, dynamic_ncols=True, bar_format=bar_format)
@ -403,7 +409,7 @@ def usbHandleEndSession(cmd_block):
def usbCommandHandler(): def usbCommandHandler():
# CancelFileTransfer is handled in usbHandleSendFileProperties(). # CancelFileTransfer is handled in usbHandleSendFileProperties().
cmd_switcher = { cmd_dict = {
USB_CMD_START_SESSION: usbHandleStartSession, USB_CMD_START_SESSION: usbHandleStartSession,
USB_CMD_SEND_FILE_PROPERTIES: usbHandleSendFileProperties, USB_CMD_SEND_FILE_PROPERTIES: usbHandleSendFileProperties,
USB_CMD_SEND_NSP_HEADER: usbHandleSendNspHeader, USB_CMD_SEND_NSP_HEADER: usbHandleSendNspHeader,
@ -414,36 +420,21 @@ def usbCommandHandler():
usbGetDeviceEndpoints() usbGetDeviceEndpoints()
while True: while True:
# Read command header. try:
cmd_header = usbRead(USB_CMD_HEADER_SIZE) # Read command header.
cmd_header = usbRead(USB_CMD_HEADER_SIZE)
except usb.core.USBError:
print('Nintendo Switch disconnected. Exiting.')
return
if (cmd_header is None) or (len(cmd_header) != USB_CMD_HEADER_SIZE): if (cmd_header is None) or (len(cmd_header) != USB_CMD_HEADER_SIZE):
continue continue
# Parse command header. # Parse command header.
(magic, cmd_id, cmd_block_size, padding) = struct.unpack('<4sII4p', cmd_header) (magic, cmd_id, cmd_block_size, padding) = struct.unpack('<4sII4p', cmd_header)
# Verify magic word. # Read command block right away (if needed).
if magic != USB_MAGIC_WORD: # nxdumptool expects us to read it right after sending the command header.
print('Received command header with invalid magic word!\n')
usbSendStatus(USB_STATUS_INVALID_MAGIC_WORD)
continue
# Get command handler function.
cmd_func = cmd_switcher.get(cmd_id, None)
if cmd_func is None:
print('Received command header with unsupported ID %02X.\n' % (cmd_id))
usbSendStatus(USB_STATUS_UNSUPPORTED_CMD)
continue
# Verify command block size.
if ((cmd_id == USB_CMD_START_SESSION) and (cmd_block_size != USB_CMD_BLOCK_SIZE_START_SESSION)) or \
((cmd_id == USB_CMD_SEND_FILE_PROPERTIES) and (cmd_block_size != USB_CMD_BLOCK_SIZE_SEND_FILE_PROPERTIES)) or \
((cmd_id == USB_CMD_SEND_NSP_HEADER) and (cmd_block_size == 0)):
print('Invalid command block size for command ID %02X! (0x%X).\n' % (cmd_id, cmd_block_size))
usbSendStatus(USB_STATUS_MALFORMED_COMMAND)
continue
# Read command block (if needed).
cmd_block = None cmd_block = None
if cmd_block_size > 0: if cmd_block_size > 0:
# Handle Zero-Length Termination packet (if needed). # Handle Zero-Length Termination packet (if needed).
@ -457,6 +448,27 @@ def usbCommandHandler():
print('Failed to read 0x%X byte(s) long command block for command ID %02X!\n' % (cmd_block_size, cmd_id)) print('Failed to read 0x%X byte(s) long command block for command ID %02X!\n' % (cmd_block_size, cmd_id))
continue continue
# Verify magic word.
if magic != USB_MAGIC_WORD:
print('Received command header with invalid magic word!\n')
usbSendStatus(USB_STATUS_INVALID_MAGIC_WORD)
continue
# Get command handler function.
cmd_func = cmd_dict.get(cmd_id, None)
if cmd_func is None:
print('Received command header with unsupported ID %02X.\n' % (cmd_id))
usbSendStatus(USB_STATUS_UNSUPPORTED_CMD)
continue
# Verify command block size.
if ((cmd_id == USB_CMD_START_SESSION) and (cmd_block_size != USB_CMD_BLOCK_SIZE_START_SESSION)) or \
((cmd_id == USB_CMD_SEND_FILE_PROPERTIES) and (cmd_block_size != USB_CMD_BLOCK_SIZE_SEND_FILE_PROPERTIES)) or \
((cmd_id == USB_CMD_SEND_NSP_HEADER) and (cmd_block_size == 0)):
print('Invalid command block size for command ID %02X! (0x%X).\n' % (cmd_id, cmd_block_size))
usbSendStatus(USB_STATUS_MALFORMED_COMMAND)
continue
# Run command handler function. # Run command handler function.
status = cmd_func(cmd_block) status = cmd_func(cmd_block)
print('') print('')
@ -469,7 +481,36 @@ def usbCommandHandler():
break break
def main(): def main():
global g_outputDir
print('nxdumptool companion script v%s.' % (SCRIPT_VERSION))
# Check if the user provided an output directory.
if len(sys.argv) >= 2:
# Expand environment variables and user's home directory.
g_outputDir = os.path.abspath(os.path.expanduser(os.path.expandvars(sys.argv[1])))
# Make sure we're dealing with an existing directory.
if (os.path.exists(g_outputDir) == False) or (os.path.isdir(g_outputDir) == False):
print('The provided path doesn\'t point to an existing directory!')
return
else:
# Create 'nxdumptool' subdirectory in the directory where the script is located.
g_outputDir = (sys.path[0] + os.path.sep + 'nxdumptool')
if os.path.exists(g_outputDir) == False:
os.mkdir(g_outputDir)
print('Output directory set to "%s".\n' % (g_outputDir))
# Start USB command handler.
usbCommandHandler() usbCommandHandler()
if __name__ == "__main__": if __name__ == "__main__":
main() try:
main()
except KeyboardInterrupt:
print('\nScript interrupted.')
try:
sys.exit(0)
except SystemExit:
os._exit(0)