From a291408b9dc3dbe10e25ebe219a1aa966297b496 Mon Sep 17 00:00:00 2001 From: Pablo Curiel Date: Wed, 17 Mar 2021 13:25:30 -0400 Subject: [PATCH] Host script changes. * Use Python keywords in conditional blocks. * Use one-liners whenever possible. * Removed unused code. * Only use a progress bar if the overall file size is greater than the hardcoded threshold value. --- nxdt_host.pyw | 175 +++++++++++++++++++++++--------------------------- 1 file changed, 81 insertions(+), 94 deletions(-) diff --git a/nxdt_host.pyw b/nxdt_host.pyw index 6d8e2dc..4d06c40 100644 --- a/nxdt_host.pyw +++ b/nxdt_host.pyw @@ -84,6 +84,9 @@ USB_TRANSFER_TIMEOUT = 5000 # USB transfer block size. USB_TRANSFER_BLOCK_SIZE = 0x800000 +# USB transfer threshold. Used to determine whether a progress bar should be displayed or not. +USB_TRANSFER_THRESHOLD = round(float(USB_TRANSFER_BLOCK_SIZE) * 2.5) + # USB command header/status magic word. USB_MAGIC_WORD = b'NXDT' @@ -254,8 +257,7 @@ class LogConsole: # Loosely based on tk.py from tqdm. class ProgressBar: def __init__(self, bar_format=None, tk_parent=None, window_title='', window_resize=False, window_protocol=None): - if tk_parent is None: - raise Exception('`tk_parent` must be provided!') + if tk_parent is None: raise Exception('`tk_parent` must be provided!') self.n = 0 self.total = 0 @@ -272,13 +274,11 @@ class ProgressBar: self.tk_window.withdraw() self.withdrawn = True - if window_title: - self.tk_window.title(window_title) + if window_title: self.tk_window.title(window_title) self.tk_window.resizable(window_resize, window_resize) - if window_protocol: - self.tk_window.protocol('WM_DELETE_WINDOW', window_protocol) + if window_protocol: self.tk_window.protocol('WM_DELETE_WINDOW', window_protocol) pbar_frame = ttk.Frame(self.tk_window, padding=5) pbar_frame.pack() @@ -296,8 +296,7 @@ class ProgressBar: self.tk_parent.after(0, self.tk_window.destroy) def start(self, n, total, prefix='', unit='B'): - if (n < 0) or (total <= 0): - raise Exception('Invalid arguments!') + if (n < 0) or (total <= 0): raise Exception('Invalid arguments!') self.total = total self.prefix = prefix @@ -309,8 +308,7 @@ class ProgressBar: def update(self, n): cur_n = (self.n + n) - if cur_n > self.total: - return + if cur_n > self.total: return self.elapsed_time = (time.time() - self.start_time) @@ -492,28 +490,27 @@ def usbHandleSendFileProperties(cmd_block): # Print info. info_str = ('File size: 0x%X | Filename length: 0x%X' % (file_size, filename_length)) - if nsp_header_size > 0: - info_str += (' | NSP header size: 0x%X' % (nsp_header_size)) + if nsp_header_size > 0: info_str += (' | NSP header size: 0x%X' % (nsp_header_size)) info_str += '.' g_Logger.info(info_str) g_Logger.info('Filename: "%s".' % (filename)) # Perform integrity checks - if (g_nspTransferMode == False) and (file_size > 0) and (nsp_header_size >= file_size): + if (not g_nspTransferMode) and file_size and (nsp_header_size >= file_size): g_Logger.error('NSP header size must be smaller than the full NSP size!\n') return USB_STATUS_MALFORMED_CMD - if (g_nspTransferMode == True) and (nsp_header_size > 0): + if g_nspTransferMode and nsp_header_size: g_Logger.error('Received non-zero NSP header size during NSP transfer mode!\n') return USB_STATUS_MALFORMED_CMD - if (filename_length <= 0) or (filename_length > USB_FILE_PROPERTIES_MAX_NAME_LENGTH): + if (not filename_length) or (filename_length > USB_FILE_PROPERTIES_MAX_NAME_LENGTH): g_Logger.error('Invalid filename length!\n') return USB_STATUS_MALFORMED_CMD # Enable NSP transfer mode (if needed). - if (g_nspTransferMode == False) and (file_size > 0) and (nsp_header_size > 0): + if (not g_nspTransferMode) and file_size and nsp_header_size: g_nspTransferMode = True g_nspSize = file_size g_nspHeaderSize = nsp_header_size @@ -523,14 +520,13 @@ def usbHandleSendFileProperties(cmd_block): g_Logger.debug('NSP transfer mode enabled!\n') # Perform additional integrity checks and get a file object to work with. - if (g_nspTransferMode == False) or ((g_nspTransferMode == True) and (g_nspFile is None)): + if (not g_nspTransferMode) or (g_nspFile is None): # Check if we're dealing with an absolute path. if filename[0] == '/': filename = filename[1:] # Replace all slashes with backslashes if we're running under Windows. - if os.name == 'nt': - filename = filename.replace('/', '\\') + if os.name == 'nt': filename = filename.replace('/', '\\') # Generate full, absolute path to the destination file. fullpath = os.path.abspath(g_outputDir + os.path.sep + filename) @@ -542,7 +538,7 @@ def usbHandleSendFileProperties(cmd_block): os.makedirs(dirpath, exist_ok=True) # Make sure the output filepath doesn't point to an existing directory. - if (os.path.exists(fullpath) == True) and (os.path.isfile(fullpath) == False): + if os.path.exists(fullpath) and (not os.path.isfile(fullpath)): utilsResetNspInfo() g_Logger.error('Output filepath points to an existing directory! ("%s").\n' % (fullpath)) return USB_STATUS_HOST_IO_ERROR @@ -557,7 +553,7 @@ def usbHandleSendFileProperties(cmd_block): # Get file object. file = open(fullpath, "wb") - if g_nspTransferMode == True: + if g_nspTransferMode: # Update NSP file object. g_nspFile = file @@ -573,10 +569,9 @@ def usbHandleSendFileProperties(cmd_block): dirpath = os.path.dirname(fullpath) # Check if we're dealing with an empty file or with the first SendFileProperties command from a NSP. - if (file_size == 0) or ((g_nspTransferMode == True) and (file_size == g_nspSize)): + if (not file_size) or (g_nspTransferMode and file_size == g_nspSize): # Close file (if needed). - if g_nspTransferMode == False: - file.close() + if not g_nspTransferMode: file.close() # Let the command handler take care of sending the status response for us. return USB_STATUS_SUCCESS @@ -585,61 +580,67 @@ def usbHandleSendFileProperties(cmd_block): usbSendStatus(USB_STATUS_SUCCESS) # Start data transfer stage. - file_type_str = ('file' if (g_nspTransferMode == False) else 'NSP file entry') + file_type_str = ('file' if (not g_nspTransferMode) else 'NSP file entry') g_Logger.info('Data transfer started. Saving %s to: "%s".' % (file_type_str, fullpath)) offset = 0 blksize = USB_TRANSFER_BLOCK_SIZE # Initialize progress bar. - idx = filename.rfind(os.path.sep) - prefix_filename = (filename[idx+1:] if (idx >= 0) else filename) - - prefix = ('Current %s: "%s".\n' % (file_type_str, prefix_filename)) - prefix += 'Use your console to cancel the file transfer if you wish to do so.\n\n' - - if (g_nspTransferMode == False) or ((g_nspTransferMode == True) and (g_nspRemainingSize == (g_nspSize - g_nspHeaderSize))): - if g_nspTransferMode == False: - pbar_n = 0 - pbar_file_size = file_size + use_pbar = (((not g_nspTransferMode) and (file_size > USB_TRANSFER_THRESHOLD)) or (g_nspTransferMode and (g_nspSize > USB_TRANSFER_THRESHOLD))) + if use_pbar: + idx = filename.rfind(os.path.sep) + prefix_filename = (filename[idx+1:] if (idx >= 0) else filename) + + prefix = ('Current %s: "%s".\n' % (file_type_str, prefix_filename)) + prefix += 'Use your console to cancel the file transfer if you wish to do so.\n\n' + + if (not g_nspTransferMode) or g_nspRemainingSize == (g_nspSize - g_nspHeaderSize): + if not g_nspTransferMode: + # Set current progress to zero and the maximum value to the provided file size. + pbar_n = 0 + pbar_file_size = file_size + else: + # Set current progress to the NSP header size and the maximum value to the provided NSP size. + pbar_n = g_nspHeaderSize + pbar_file_size = g_nspSize + + # Get progress bar unit and unit divider. These will be used to display and calculate size values using a specific size unit (B, KiB, MiB, GiB). + (unit, unit_divider) = utilsGetSizeUnitAndDivisor(pbar_file_size) + total = (float(pbar_file_size) / unit_divider) + + # Start the progress bar. + g_progressBar.start(pbar_n, total, prefix, unit) + + # Update the NSP unit divider (if needed). + if g_nspTransferMode: g_nspSizeUnitDivider = unit_divider else: - pbar_n = g_nspHeaderSize - pbar_file_size = g_nspSize - - (unit, unit_divider) = utilsGetSizeUnitAndDivisor(pbar_file_size) - total = (float(pbar_file_size) / unit_divider) - - g_progressBar.start(pbar_n, total, prefix, unit) - - if g_nspTransferMode == True: - g_nspSizeUnitDivider = unit_divider - else: - unit_divider = g_nspSizeUnitDivider - g_progressBar.set_prefix(prefix) + # Set unit divider for the whole NSP size and the current prefix (which holds the filename for the current NSP file entry). + unit_divider = g_nspSizeUnitDivider + g_progressBar.set_prefix(prefix) def cancelTransfer(): # Cancel file transfer. file.close() os.remove(fullpath) utilsResetNspInfo() - g_progressBar.end() + if use_pbar: g_progressBar.end() # Start transfer process. + start_time = time.time() + while offset < file_size: # Update block size (if needed). diff = (file_size - offset) - if blksize > diff: - blksize = diff + if blksize > diff: blksize = diff - # Handle Zero-Length Termination packet (if needed). - if ((offset + blksize) >= file_size) and (utilsIsValueAlignedToEndpointPacketSize(blksize) == True): - rd_size = (blksize + 1) - else: - rd_size = blksize + # Set block size and handle Zero-Length Termination packet (if needed). + rd_size = blksize + if ((offset + blksize) >= file_size) and utilsIsValueAlignedToEndpointPacketSize(blksize): rd_size += 1 # Read current chunk. chunk = usbRead(rd_size, USB_TRANSFER_TIMEOUT) - if chunk == None: + if chunk is None: g_Logger.error('Failed to read 0x%X-byte long data chunk!' % (rd_size)) # Cancel file transfer. @@ -670,22 +671,19 @@ def usbHandleSendFileProperties(cmd_block): offset = (offset + chunk_size) # Update remaining NSP data size. - if g_nspTransferMode == True: - g_nspRemainingSize -= chunk_size + if g_nspTransferMode: g_nspRemainingSize -= chunk_size - # Update progress bar once per second. - g_progressBar.update(float(chunk_size) / unit_divider) + # Update progress bar (if needed). + if use_pbar: g_progressBar.update(float(chunk_size) / unit_divider) - elapsed_time = round(g_progressBar.elapsed_time) + elapsed_time = round(time.time() - start_time) g_Logger.info('File transfer successfully completed in %s!\n' % (tqdm.format_interval(elapsed_time))) # Close file handle (if needed). - if g_nspTransferMode == False: - file.close() + if not g_nspTransferMode: file.close() # Hide progress bar (if needed). - if (g_nspTransferMode == False) or ((g_nspTransferMode == True) and (g_nspRemainingSize == 0)): - g_progressBar.end() + if use_pbar and ((not g_nspTransferMode) or (not g_nspRemainingSize)): g_progressBar.end() return USB_STATUS_SUCCESS @@ -697,11 +695,11 @@ def usbHandleSendNspHeader(cmd_block): g_Logger.debug('Received SendNspHeader (%02X) command.' % (USB_CMD_SEND_NSP_HEADER)) # Integrity checks. - if g_nspTransferMode == False: + if not g_nspTransferMode: g_Logger.error('Received NSP header out of NSP transfer mode!\n') return USB_STATUS_MALFORMED_CMD - if g_nspRemainingSize > 0: + if g_nspRemainingSize: g_Logger.error('Received NSP header before receiving all NSP data! (missing 0x%X byte[s]).\n' % (g_nspRemainingSize)) return USB_STATUS_MALFORMED_CMD @@ -735,7 +733,7 @@ def usbCommandHandler(): } # Get device endpoints. - if usbGetDeviceEndpoints() == False: + if not usbGetDeviceEndpoints(): # Update UI and return. uiToggleElements(True) return @@ -759,9 +757,9 @@ def usbCommandHandler(): # Read command block right away (if needed). # nxdumptool expects us to read it right after sending the command header. cmd_block = None - if cmd_block_size > 0: + if cmd_block_size: # Handle Zero-Length Termination packet (if needed). - if utilsIsValueAlignedToEndpointPacketSize(cmd_block_size) == True: + if utilsIsValueAlignedToEndpointPacketSize(cmd_block_size): rd_size = (cmd_block_size + 1) else: rd_size = cmd_block_size @@ -785,9 +783,9 @@ def usbCommandHandler(): continue # Verify command block size. - if ((cmd_id == USB_CMD_START_SESSION) and (cmd_block_size != USB_CMD_BLOCK_SIZE_START_SESSION)) or \ - ((cmd_id == USB_CMD_SEND_FILE_PROPERTIES) and (cmd_block_size != USB_CMD_BLOCK_SIZE_SEND_FILE_PROPERTIES)) or \ - ((cmd_id == USB_CMD_SEND_NSP_HEADER) and (cmd_block_size == 0)): + if (cmd_id == USB_CMD_START_SESSION and cmd_block_size != USB_CMD_BLOCK_SIZE_START_SESSION) or \ + (cmd_id == USB_CMD_SEND_FILE_PROPERTIES and cmd_block_size != USB_CMD_BLOCK_SIZE_SEND_FILE_PROPERTIES) or \ + (cmd_id == USB_CMD_SEND_NSP_HEADER and not cmd_block_size): g_Logger.error('Invalid command block size for command ID %02X! (0x%X).\n' % (cmd_id, cmd_block_size)) usbSendStatus(USB_STATUS_MALFORMED_COMMAND) continue @@ -795,7 +793,7 @@ def usbCommandHandler(): # Run command handler function. # Send status response afterwards. Bail out if requested. status = cmd_func(cmd_block) - if (status == None) or (not usbSendStatus(status)) or (cmd_id == USB_CMD_END_SESSION) or (status == USB_STATUS_UNSUPPORTED_ABI_VERSION): + if (status is None) or (not usbSendStatus(status)) or (cmd_id == USB_CMD_END_SESSION) or (status == USB_STATUS_UNSUPPORTED_ABI_VERSION): break g_Logger.info('Stopping server.') @@ -832,7 +830,7 @@ def uiStartServer(): server_thread.start() def uiToggleElements(enable): - if enable == True: + if enable: g_tkRoot.protocol('WM_DELETE_WINDOW', uiHandleExitProtocol) g_tkChooseDirButton.configure(state='normal') @@ -851,9 +849,7 @@ def uiToggleElements(enable): def uiChooseDirectory(): dir = filedialog.askdirectory(parent=g_tkRoot, title='Select an output directory', initialdir=INITIAL_DIR, mustexist=True) - if dir: - dir = os.path.abspath(dir) - uiUpdateDirectoryField(dir) + if dir: uiUpdateDirectoryField(os.path.abspath(dir)) def uiUpdateDirectoryField(dir): g_tkDirText.configure(state='normal') @@ -867,14 +863,6 @@ def uiHandleExitProtocol(): def uiHandleExitProtocolStub(): pass -def uiHandleTabInput(event): - event.widget.tk_focusNext().focus() - return 'break' - -def uiHandleShiftTabInput(event): - event.widget.tk_focusPrev().focus() - return 'break' - def uiScaleMeasure(measure): return round(float(measure) * SCALE) @@ -891,12 +879,11 @@ def main(): # Check if we're running under Windows Vista or later. dpi_aware = False win_vista = ((os_type == 'Windows') and (int(os_version[:os_version.find('.')]) >= 6)) - if win_vista == True: + if win_vista: try: # Enable high DPI scaling. dpi_aware = (ctypes.windll.user32.SetProcessDPIAware() == 1) - if dpi_aware == False: - dpi_aware = (ctypes.windll.shcore.SetProcessDpiAwareness(1) == 0) + if not dpi_aware: dpi_aware = (ctypes.windll.shcore.SetProcessDpiAwareness(1) == 0) except: traceback.print_exc() @@ -911,8 +898,7 @@ def main(): screen_dpi = round(g_tkRoot.winfo_fpixels('1i')) # Update scaling factor (if needed). - if (win_vista == True) and (dpi_aware == True): - SCALE = (float(screen_dpi) / WINDOWS_SCALING_FACTOR) + if win_vista and dpi_aware: SCALE = (float(screen_dpi) / WINDOWS_SCALING_FACTOR) # Decode embedded icon and set it. try: @@ -976,11 +962,12 @@ def main(): # Configure logging mechanism. logging.basicConfig(level=logging.DEBUG) - if len(g_Logger.handlers) > 0: + if len(g_Logger.handlers): + # Remove stderr output handler from logger. log_stderr = g_Logger.handlers[0] g_Logger.removeHandler(log_stderr) - # Initialize console g_Logger. + # Initialize console logger. console = LogConsole(g_tkScrolledTextLog) # Create hidden progress bar child window.