diff --git a/host/nxdt_host.py b/host/nxdt_host.py index 579602a..96a9640 100644 --- a/host/nxdt_host.py +++ b/host/nxdt_host.py @@ -55,13 +55,22 @@ from tkinter import filedialog, messagebox, font, scrolledtext from tqdm import tqdm -from argparse import ArgumentParser +from argparse import ArgumentParser, RawTextHelpFormatter, ArgumentDefaultsHelpFormatter from io import BufferedWriter from typing import List, Tuple, Any, Callable, Optional from datetime import datetime +# Terminal colors using ANSI escape sequences. +# ref: https://gist.github.com/fnky/458719343aabd01cfb17a3a4f7296797 +COLOR_BACKGROUND = "\033[40m\033[0K" # black +COLOR_DEBUG = "\033[39m" # vanilla +COLOR_INFO = "\033[38;5;255m" # bright white +COLOR_WARNING = "\033[38;5;202m" # orange +COLOR_ERROR = "\033[1;31m" # red (intense) +COLOR_CRITICAL = "\033[4;31m" # underlined red +COLOR_RESET = "\033[0m\033[0K"; # resets all colors; blanks line from cursor pos # Scaling factors. WINDOWS_SCALING_FACTOR = 96.0 @@ -305,8 +314,12 @@ TASKBAR_LIB = b'TVNGVAIAAQAAAAAACQQAAAAAAABBAAAAAQAAAAAAAAAOAAAA/////wAAAAAAAAAA # Global variables used throughout the code. g_cliMode: bool = False +g_logToFile: bool = False +g_logVerbose: bool = False +g_terminalColors: bool = False g_outputDir: str = '' g_logPath: str = '' +g_pathSep: str = '' g_logLevelIntVar: Optional[tk.IntVar] = None g_logToFileBoolVar: Optional[tk.BooleanVar] = None @@ -316,6 +329,7 @@ g_osVersion: str = '' g_isWindows: bool = False g_isWindowsVista: bool = False g_isWindows7: bool = False +g_isWindows10: bool = False g_tkRoot: Optional[tk.Tk] = None g_tkCanvas: Optional[tk.Canvas] = None @@ -336,6 +350,7 @@ g_taskbar: Any = None g_usbEpIn: Any = None g_usbEpOut: Any = None g_usbEpMaxPacketSize: int = 0 +g_usbVer: str = "" g_nxdtVersionMajor: int = 0 g_nxdtVersionMinor: int = 0 @@ -351,6 +366,15 @@ g_nspRemainingSize: int = 0 g_nspFile: Optional[BufferedWriter] = None g_nspFilePath: str = '' +g_extractedFsDumpMode: bool = False + +g_formattedFileSize: float = 0 +g_fileSizeMiB: float = 0 +g_formattedFileUnit: str = 'B' +g_startTime: float = 0 + + + # Reference: https://beenje.github.io/blog/posts/logging-to-a-tkinter-scrolledtext-widget. class LogQueueHandler(logging.Handler): def __init__(self, log_queue: queue.Queue): @@ -359,15 +383,53 @@ class LogQueueHandler(logging.Handler): def emit(self, record: logging.LogRecord) -> None: if g_cliMode: - msg = self.format(record) - print(msg) + msg = self.format_message(record) + self.log_to_stdout(record, msg) + if g_logToFile: + self.log_to_file(msg) else: self.log_queue.put(record) - if g_logToFileBoolVar.get(): - logToFileMsg=datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]+'\t['+ \ - record.levelname+']\t'+self.format(record).strip()+'\n' - with open (g_logPath, 'a', encoding="utf-8") as f: - f.write(logToFileMsg) + + def format_message(self, record:logging.LogRecord) -> str: + msg = "" + prepend = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]+'\t['+record.levelname+']\t' + content = self.format(record) + if content[0] == '\n': + msg = prepend + '\n' + prepend + content = content[1:] + else: + msg = prepend + msg = msg + content + if content[-1:] == '\n': + msg = msg + prepend + '\n' + else: + msg = msg + '\n' + return msg + + def log_to_stdout(self, record:logging.LogRecord, msg: str) -> None: + if g_terminalColors: + match record.levelname: + case "DEBUG": + print(COLOR_DEBUG + msg, end="") + case "INFO": + print(COLOR_INFO + msg, end="") + case "WARNING": + print(COLOR_WARNING + msg, end="") + case "ERROR": + print(COLOR_ERROR + msg, end="") + case "CRITICAL": + print(COLOR_CRITICAL + msg, end="") + case _: + print(COLOR_DEBUG + msg, end="") + if g_isWindows10: + print(COLOR_BACKGROUND, end="") + else: + print(msg, end="") + + def log_to_file (self, msg: str) -> None: + with open (g_logPath, 'a', encoding="utf-8") as f: + f.write(msg) + # Reference: https://beenje.github.io/blog/posts/logging-to-a-tkinter-scrolledtext-widget. class LogConsole: @@ -408,6 +470,8 @@ class LogConsole: break else: self.display(record) + if g_logToFile: + self.queue_handler.log_to_file(self.queue_handler.format_message(record)) if self.frame: self.frame.after(100, self.poll_log_queue) @@ -466,6 +530,7 @@ class ProgressBarWindow: self.tk_parent.after(0, self.tk_window.destroy) def start(self, total: int, n: int = 0, divider: int = 1, prefix: str = '', unit: str = 'B') -> None: + if (total <= 0) or (n < 0) or (divider < 1): raise Exception('Invalid arguments!') @@ -477,6 +542,7 @@ class ProgressBarWindow: self.unit = unit if self.tk_pbar: + #print() self.tk_pbar.configure(maximum=self.total_div, mode='determinate') self.start_time = time.time() else: @@ -548,7 +614,7 @@ class ProgressBarWindow: #assert self.pbar is not None self.pbar.close() self.pbar = None - print() + #print() def set_prefix(self, prefix) -> None: self.prefix = prefix @@ -585,36 +651,73 @@ def utilsGetPath(path_arg: str, fallback_path: str, is_file: bool, create: bool def utilsGetWinFullPath(path_arg: str) -> str: return '\\\\?\\' + path_arg.replace("/", "\\") - -def utilsResetOutputDir() -> None: - global g_outputDir - global g_logPath - #assert g_tkDirText is not None - g_outputDir = g_tkDirText.get('1.0', tk.END).strip() - if not g_outputDir: - # We should never reach this, honestly. - messagebox.showerror('Error', 'You must provide an output directory!', parent=g_tkRoot) - return +def utilsUpdateLogPath() -> None: + global g_logPath - if g_logToFileBoolVar.get(): - g_logPath = os.path.abspath(g_outputDir + os.path.sep + \ + g_logPath = os.path.abspath(g_outputDir + os.path.sep + \ "nxdt_host_" + datetime.now().strftime('%Y-%m-%d_%H%M%S') + '.log') - if g_isWindows: + if g_isWindows: g_logPath = utilsGetWinFullPath(g_logPath) - print(g_logPath) - else: - print("Not logging to file.") - - # Make sure the full directory tree exists. - try: - os.makedirs(g_outputDir, exist_ok=True) - except: - utilsLogException(traceback.format_exc()) - messagebox.showerror('Error', 'Unable to create full output directory tree!', parent=g_tkRoot) - return return + +# Enable terminal colors on *nix and supported Windows (10.0.10586+) + +def utilsSetupTerminal() -> None: + global g_terminalColors + + # ref0: https://stackoverflow.com/a/36760881 + # ref1: https://learn.microsoft.com/en-us/windows/console/setconsolemode + + if g_isWindows10: + try: + import ctypes + ctypes.windll.kernel32.GetStdHandle((-11), 7) + g_terminalColors = True + except: + utilsLogException(tracebackSer.format_exc()) + g_terminalColors = False + else: + if not g_isWindows: + g_terminalColors = True + else: + g_terminalColors = False + + # If colors supported, unconditionally set background color to black + if g_terminalColors: + print(COLOR_BACKGROUND) + +# Log basic info about the script and settings. +def utilsLogBasicInfo() -> None: + global g_logToFile, g_logVerbose, g_pathSep + g_logger.info('\n' + SCRIPT_TITLE + '. ' + COPYRIGHT_TEXT + '.') + g_logger.info('\nServer started...\n') + g_logger.info('Sys:\tPython ' + platform.python_version() + " on "+ g_osType+" "+g_osVersion) + g_logger.info('Dst:\t' + g_outputDir) + + if g_logToFile: + g_logger.info('Log:\t' + g_logPath.rsplit(g_pathSep, 1)[-1]) + else: + g_logger.info('Logging to file is disabled.') + + if g_logVerbose: + g_logger.info('Verbose logging is enabled.\n') + else: + g_logger.info('Verbose logging is disabled.\n') + + return + +# On successful transfer, log elapsed time and (within reason) average transfer speed +def utilsLogTransferStats(elapsed_time: float) -> None: + if g_formattedFileUnit == "GiB" or g_formattedFileUnit == "MiB": + formatted_time = f'{elapsed_time:.2f}s' if round(elapsed_time < 60) else tqdm.format_interval(elapsed_time) + g_logger.info(f'{g_formattedFileSize:.2f}{g_formattedFileUnit} transferred in {formatted_time}.') + if elapsed_time > float(1): + g_logger.info(f'Avg speed: {g_fileSizeMiB/elapsed_time:.2f}MiB/s\n') + else: + g_logger.info(" ") + def utilsIsValueAlignedToEndpointPacketSize(value: int) -> bool: return bool((value & (g_usbEpMaxPacketSize - 1)) == 0) @@ -648,8 +751,16 @@ def utilsGetSizeUnitAndDivisor(size: int) -> Tuple[str, int]: return ret +def utilsInitTransferVars(file_size: int) -> None: + global g_formattedFileSize, g_formattedFileUnit, g_fileSizeMiB, g_startTime + (g_formattedFileUnit,divisor) = utilsGetSizeUnitAndDivisor(file_size) + g_formattedFileSize = file_size / divisor + g_fileSizeMiB = file_size / 1048576 + g_startTime = time.time() + + def usbGetDeviceEndpoints() -> bool: - global g_usbEpIn, g_usbEpOut, g_usbEpMaxPacketSize + global g_usbEpIn, g_usbEpOut, g_usbEpMaxPacketSize, g_usbVer #assert g_logger is not None #assert g_stopEvent is not None @@ -660,7 +771,7 @@ def usbGetDeviceEndpoints() -> bool: usb_version = 0 if g_cliMode: - g_logger.info(f'Please connect a Nintendo Switch console running {USB_DEV_PRODUCT}.') + g_logger.info(f'Please connect a Nintendo Switch console running {USB_DEV_PRODUCT}.\n') while True: # Check if the user decided to stop the server. @@ -710,13 +821,10 @@ def usbGetDeviceEndpoints() -> bool: usb_version = cur_dev.bcdUSB break - + g_usbVer = f'USB {usb_version >> 8}.{(usb_version & 0xFF) >> 4}' g_logger.debug(f'Successfully retrieved USB endpoints! (bus {cur_dev.bus}, address {cur_dev.address}).') - g_logger.debug(f'Max packet size: 0x{g_usbEpMaxPacketSize:X} (USB {usb_version >> 8}.{(usb_version & 0xFF) >> 4}).\n') - - if g_cliMode: - g_logger.info(f'Exit {USB_DEV_PRODUCT} or disconnect your console at any time to close this script.') - + g_logger.debug(f'Max packet size: 0x{g_usbEpMaxPacketSize:X} ({g_usbVer}).') + return True def usbRead(size: int, timeout: int = -1) -> bytes: @@ -730,7 +838,7 @@ def usbRead(size: int, timeout: int = -1) -> bytes: except usb.core.USBError: if not g_cliMode: utilsLogException(traceback.format_exc()) - g_logger.error('\nUSB timeout triggered or console disconnected.') + g_logger.error('\nUSB timeout triggered or console disconnected.\n') return rd @@ -744,7 +852,7 @@ def usbWrite(data: bytes, timeout: int = -1) -> int: except usb.core.USBError: if not g_cliMode: utilsLogException(traceback.format_exc()) - g_logger.error('\nUSB timeout triggered or console disconnected.') + g_logger.error('\nUSB timeout triggered or console disconnected.\n') return wr @@ -757,10 +865,7 @@ def usbHandleStartSession(cmd_block: bytes) -> int: #assert g_logger is not None - if g_cliMode: - print() - - g_logger.debug(f'Received StartSession ({USB_CMD_START_SESSION:02X}) command.') + g_logger.debug(f'\nReceived StartSession ({USB_CMD_START_SESSION:02X}) command.') # Parse command block. (g_nxdtVersionMajor, g_nxdtVersionMinor, g_nxdtVersionMicro, abi_version, git_commit) = struct.unpack_from(' int: g_nxdtAbiVersionMinor = (abi_version & 0x0F) # Print client info. - g_logger.info(f'Client info: {USB_DEV_PRODUCT} v{g_nxdtVersionMajor}.{g_nxdtVersionMinor}.{g_nxdtVersionMicro}, USB ABI v{g_nxdtAbiVersionMajor}.{g_nxdtAbiVersionMinor} (commit {g_nxdtGitCommit}).\n') + g_logger.info(f'Client: {USB_DEV_PRODUCT} v{g_nxdtVersionMajor}.{g_nxdtVersionMinor}.{g_nxdtVersionMicro}, USB ABI v{g_nxdtAbiVersionMajor}.{g_nxdtAbiVersionMinor} (commit {g_nxdtGitCommit}).') + if not g_logVerbose: + g_logger.info(f'Connection: {g_usbVer}.\n') + + if g_cliMode: + g_logger.info(f'Exit {USB_DEV_PRODUCT} or disconnect your console at any time to close this script.\n') + # Check if we support this ABI version. if (g_nxdtAbiVersionMajor != USB_ABI_VERSION_MAJOR) or (g_nxdtAbiVersionMinor != USB_ABI_VERSION_MINOR): - g_logger.error('Unsupported ABI version!') + g_logger.error('\nUnsupported ABI version!\n') return USB_STATUS_UNSUPPORTED_ABI_VERSION # Return status code. @@ -783,41 +894,61 @@ def usbHandleStartSession(cmd_block: bytes) -> int: def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: global g_nspTransferMode, g_nspSize, g_nspHeaderSize, g_nspRemainingSize, g_nspFile, g_nspFilePath, g_outputDir, g_tkRoot, g_progressBarWindow + global g_formattedFileSize, g_formattedFileUnit, g_fileSizeMiB, g_startTime #assert g_logger is not None #assert g_progressBarWindow is not None - if g_cliMode and not g_nspTransferMode: - print() - - g_logger.debug(f'Received SendFileProperties ({USB_CMD_SEND_FILE_PROPERTIES:02X}) command.') + g_logger.debug(f'\nReceived SendFileProperties ({USB_CMD_SEND_FILE_PROPERTIES:02X}) command.') # Parse command block. (file_size, filename_length, nsp_header_size, raw_filename) = struct.unpack_from(f' 0: dbg_str += f' | NSP header size: 0x{nsp_header_size:X}' g_logger.debug(dbg_str + '.') - file_type_str = ('file' if (not g_nspTransferMode) else 'NSP file entry') - - if not g_cliMode or (g_cliMode and not g_nspTransferMode): - g_logger.info(f'Receiving {file_type_str}: "{filename}".') - + # Log basic file info + if not g_nspTransferMode and not g_extractedFsDumpMode: + ext = filename[-3:] + match ext: + case "xci": g_logger.info("\tXCI transfer started!") + case "bin": g_logger.info("\tGamecard extra data transfer started!") + case "nsp": g_logger.info("\tNSP transfer started!") + case "nca": g_logger.info("\tRaw NCA transfer started!") + case "tik": g_logger.info("\tTicket transfer started!") + case _: g_logger.info("\tTransfer of unknown data type started!") # uh-oh? + utilsInitTransferVars(file_size) + g_logger.info(f'\nFile:\t{filename}') + g_logger.info(f'Size:\t{g_formattedFileSize:.2f}{g_formattedFileUnit}') + if(ext == "nsp"): + g_logger.info(f'Contents:') + else: + (unit,div) = utilsGetSizeUnitAndDivisor(file_size) + fs = file_size / div + if g_extractedFsDumpMode: + path_array = filename.split("/") + fn = '/'.join(path_array[7:]) + elif g_nspTransferMode: + fn = filename + g_logger.info(f'\t{fn} ({fs:.2f} {unit})') + # Perform validity checks. if (not g_nspTransferMode) and file_size and (nsp_header_size >= file_size): - g_logger.error('NSP header size must be smaller than the full NSP size!\n') + g_logger.error('\nNSP header size must be smaller than the full NSP size!\n') return USB_STATUS_MALFORMED_CMD if g_nspTransferMode and nsp_header_size: - g_logger.error('Received non-zero NSP header size during NSP transfer mode!\n') + g_logger.error('\nReceived non-zero NSP header size during NSP transfer mode!\n') return USB_STATUS_MALFORMED_CMD if (not filename_length) or (filename_length > USB_FILE_PROPERTIES_MAX_NAME_LENGTH): - g_logger.error('Invalid filename length!\n') + g_logger.error('\nInvalid filename length!\n') return USB_STATUS_MALFORMED_CMD # Enable NSP transfer mode (if needed). @@ -828,7 +959,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: g_nspRemainingSize = (file_size - nsp_header_size) g_nspFile = None g_nspFilePath = '' - g_logger.debug('NSP transfer mode enabled!\n') + g_logger.debug('\nNSP transfer mode enabled!') # Perform additional validity checks and get a file object to work with. if (not g_nspTransferMode) or (g_nspFile is None): @@ -848,14 +979,14 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: # Make sure the output filepath doesn't point to an existing directory. if os.path.exists(fullpath) and (not os.path.isfile(fullpath)): utilsResetNspInfo() - g_logger.error(f'Output filepath points to an existing directory! ("{fullpath}").\n') + g_logger.error(f'\nOutput filepath points to an existing directory! ("{fullpath}").\n') return USB_STATUS_HOST_IO_ERROR # Make sure we have enough free space. (total_space, used_space, free_space) = shutil.disk_usage(dirpath) if free_space <= file_size: utilsResetNspInfo() - g_logger.error('Not enough free space available in output volume!\n') + g_logger.error('\nNot enough free space available in output volume!\n') return USB_STATUS_HOST_IO_ERROR # Get file object. @@ -882,13 +1013,12 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: # Close file (if needed). if not g_nspTransferMode: file.close() - # Let the command handler take care of sending the status response for us. return USB_STATUS_SUCCESS # Send status response before entering the data transfer stage. usbSendStatus(USB_STATUS_SUCCESS) - + # Start data transfer stage. g_logger.debug(f'Data transfer started. Saving {file_type_str} to: "{fullpath}".') @@ -935,6 +1065,9 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: # Start transfer process. start_time = time.time() + # if not g_nspTransferMode: + # g_startTime = start_time + # print(f'1: {g_startTime}') while offset < file_size: # Update block size (if needed). @@ -949,7 +1082,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: # Read current chunk. chunk = usbRead(rd_size, USB_TRANSFER_TIMEOUT) if not chunk: - g_logger.error(f'Failed to read 0x{rd_size:X}-byte long data chunk!') + g_logger.error(f'\nFailed to read 0x{rd_size:X}-byte long data chunk!\n') # Cancel file transfer. cancelTransfer() @@ -966,7 +1099,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: # Cancel file transfer. cancelTransfer() - g_logger.debug(f'Received CancelFileTransfer ({USB_CMD_CANCEL_FILE_TRANSFER:02X}) command.') + g_logger.debug(f'\nReceived CancelFileTransfer ({USB_CMD_CANCEL_FILE_TRANSFER:02X}) command.') g_logger.warning('Transfer cancelled.') # Let the command handler take care of sending the status response for us. @@ -987,30 +1120,34 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None: if use_pbar: g_progressBarWindow.update(chunk_size) - elapsed_time = round(time.time() - start_time) - g_logger.debug(f'File transfer successfully completed in {tqdm.format_interval(elapsed_time)}!\n') - - # Close file handle (if needed). - if not g_nspTransferMode: - file.close() + elapsed_time = time.time() - start_time + g_logger.debug(f'\nFile transfer successfully completed in {elapsed_time:.2f}s!') # Hide progress bar window (if needed). if use_pbar and ((not g_nspTransferMode) or (not g_nspRemainingSize)): g_progressBarWindow.end() + # Close file handle (if needed); log successful non-constitutent transfer. + if not g_nspTransferMode: + file.close() + if not g_extractedFsDumpMode: + if not g_logVerbose: + g_logger.info('\n\tTransfer complete!\n') + utilsLogTransferStats(elapsed_time); + return USB_STATUS_SUCCESS def usbHandleCancelFileTransfer(cmd_block: bytes) -> int: #assert g_logger is not None - g_logger.debug(f'Received CancelFileTransfer ({USB_CMD_START_SESSION:02X}) command.') + g_logger.debug(f'\nReceived CancelFileTransfer ({USB_CMD_START_SESSION:02X}) command.') if g_nspTransferMode: utilsResetNspInfo(True) g_logger.warning('Transfer cancelled.') return USB_STATUS_SUCCESS else: - g_logger.error('Unexpected transfer cancellation.') + g_logger.error('\nUnexpected transfer cancellation.\n') return USB_STATUS_MALFORMED_CMD def usbHandleSendNspHeader(cmd_block: bytes) -> int: @@ -1021,27 +1158,33 @@ def usbHandleSendNspHeader(cmd_block: bytes) -> int: nsp_header_size = len(cmd_block) - g_logger.debug(f'Received SendNspHeader ({USB_CMD_SEND_NSP_HEADER:02X}) command.') + g_logger.debug(f'\nReceived SendNspHeader ({USB_CMD_SEND_NSP_HEADER:02X}) command.') # Validity checks. if not g_nspTransferMode: - g_logger.error('Received NSP header out of NSP transfer mode!\n') + g_logger.error('\nReceived NSP header out of NSP transfer mode!\n') return USB_STATUS_MALFORMED_CMD if g_nspRemainingSize: - g_logger.error(f'Received NSP header before receiving all NSP data! (missing 0x{g_nspRemainingSize:X} byte[s]).\n') + g_logger.error(f'\nReceived NSP header before receiving all NSP data! (missing 0x{g_nspRemainingSize:X} byte[s]).\n') return USB_STATUS_MALFORMED_CMD if nsp_header_size != g_nspHeaderSize: - g_logger.error(f'NSP header size mismatch! (0x{nsp_header_size:X} != 0x{g_nspHeaderSize:X}).\n') + g_logger.error(f'\nNSP header size mismatch! (0x{nsp_header_size:X} != 0x{g_nspHeaderSize:X}).\n') return USB_STATUS_MALFORMED_CMD # Write NSP header. g_nspFile.seek(0) g_nspFile.write(cmd_block) + + # Log successful NSP transfer (header distinguishes it from constituent NCA transfers) - g_logger.debug(f'Successfully wrote 0x{nsp_header_size:X}-byte long NSP header to "{g_nspFilePath}".\n') + g_logger.debug(f'Successfully wrote 0x{nsp_header_size:X}-byte long NSP header to "{g_nspFilePath}".') + if not g_logVerbose: + g_logger.info('\n\tTransfer complete!\n') + utilsLogTransferStats(time.time() - g_startTime) + # Disable NSP transfer mode. utilsResetNspInfo() @@ -1049,31 +1192,49 @@ def usbHandleSendNspHeader(cmd_block: bytes) -> int: def usbHandleEndSession(cmd_block: bytes) -> int: #assert g_logger is not None - g_logger.debug(f'Received EndSession ({USB_CMD_END_SESSION:02X}) command.') + g_logger.debug(f'\nReceived EndSession ({USB_CMD_END_SESSION:02X}) command.') return USB_STATUS_SUCCESS def usbHandleStartExtractedFsDump(cmd_block: bytes) -> int: #assert g_logger is not None - - g_logger.debug(f'Received StartExtractedFsDump ({USB_CMD_START_EXTRACTED_FS_DUMP:02X}) command.') + global g_extractedFsDumpMode, g_formattedFileSize, g_formattedFileUnit, g_fileSizeMiB, g_startTime + + g_logger.debug(f'\nReceived StartExtractedFsDump ({USB_CMD_START_EXTRACTED_FS_DUMP:02X}) command.') if g_nspTransferMode: - g_logger.error('StartExtractedFsDump received mid NSP transfer.') + g_logger.error('\nStartExtractedFsDump received mid NSP transfer.\n') return USB_STATUS_MALFORMED_CMD # Parse command block. (extracted_fs_size, extracted_fs_root_path) = struct.unpack_from(f' int: + global g_extractedFsDumpMode #assert g_logger is not None - g_logger.debug(f'Received EndExtractedFsDump ({USB_CMD_END_EXTRACTED_FS_DUMP:02X}) command.') - g_logger.info(f'Finished extracted FS dump.') + g_extractedFsDumpMode = False + g_logger.debug(f'\nReceived EndExtractedFsDump ({USB_CMD_END_EXTRACTED_FS_DUMP:02X}) command.') + if not g_logVerbose: + g_logger.info(f'\n\tExtracted FS dump complete!\n') + utilsLogTransferStats(time.time() - g_startTime) return USB_STATUS_SUCCESS def usbCommandHandler() -> None: @@ -1110,7 +1271,7 @@ def usbCommandHandler() -> None: # Read command header. cmd_header = usbRead(USB_CMD_HEADER_SIZE) if (not cmd_header) or (len(cmd_header) != USB_CMD_HEADER_SIZE): - g_logger.error(f'Failed to read 0x{USB_CMD_HEADER_SIZE:X}-byte long command header!') + g_logger.error(f'\nFailed to read 0x{USB_CMD_HEADER_SIZE:X}-byte long command header!\n') break # Parse command header. @@ -1128,19 +1289,19 @@ def usbCommandHandler() -> None: cmd_block = usbRead(rd_size, USB_TRANSFER_TIMEOUT) if (not cmd_block) or (len(cmd_block) != cmd_block_size): - g_logger.error(f'Failed to read 0x{cmd_block_size:X}-byte long command block for command ID {cmd_id:02X}!') + g_logger.error(f'\nFailed to read 0x{cmd_block_size:X}-byte long command block for command ID {cmd_id:02X}!\n') break # Verify magic word. if magic != USB_MAGIC_WORD: - g_logger.error('Received command header with invalid magic word!\n') + g_logger.error('\nReceived command header with invalid magic word!\n') usbSendStatus(USB_STATUS_INVALID_MAGIC_WORD) continue # Get command handler function. cmd_func = cmd_dict.get(cmd_id, None) if cmd_func is None: - g_logger.error(f'Received command header with unsupported ID {cmd_id:02X}.\n') + g_logger.error(f'\nReceived command header with unsupported ID {cmd_id:02X}.\n') usbSendStatus(USB_STATUS_UNSUPPORTED_CMD) continue @@ -1149,7 +1310,7 @@ def usbCommandHandler() -> None: (cmd_id == USB_CMD_SEND_FILE_PROPERTIES and cmd_block_size != USB_CMD_BLOCK_SIZE_SEND_FILE_PROPERTIES) or \ (cmd_id == USB_CMD_SEND_NSP_HEADER and not cmd_block_size) or \ (cmd_id == USB_CMD_START_EXTRACTED_FS_DUMP and cmd_block_size != USB_CMD_BLOCK_SIZE_START_EXTRACTED_FS_DUMP): - g_logger.error(f'Invalid command block size for command ID {cmd_id:02X}! (0x{cmd_block_size:X}).\n') + g_logger.error(f'\nInvalid command block size for command ID {cmd_id:02X}! (0x{cmd_block_size:X}).\n') usbSendStatus(USB_STATUS_MALFORMED_CMD) continue @@ -1159,7 +1320,7 @@ def usbCommandHandler() -> None: if (status is None) or (not usbSendStatus(status)) or (cmd_id == USB_CMD_END_SESSION) or (status == USB_STATUS_UNSUPPORTED_ABI_VERSION): break - g_logger.info('\nStopping server.') + g_logger.info('Stopping server...\n') if not g_cliMode: # Update UI. @@ -1170,25 +1331,21 @@ def uiStopServer() -> None: #assert g_stopEvent is not None g_stopEvent.set() # Log the end of the session. - g_logger.info("\nServer stopped.\n") + g_logger.info("Server stopped.\n") def uiStartServer() -> None: - # Reset output path variable based on possible user input unconditionally. - utilsResetOutputDir() - + # Set new log path for this session if logging to file is turned on. + if g_logToFile: + utilsUpdateLogPath() + # Update UI. uiToggleElements(False) - # Log basic session status and info. - g_logger.info('\nServer started.') - g_logger.info('Output Dir:\t' + g_outputDir) - if g_logToFileBoolVar.get(): - g_logger.info('Logging to:\t' + g_logPath.rsplit('\\', 1)[-1] + '\n') - else: - g_logger.info('Logging to file disabled.\n') - + # Log basic info about the script and settings. + utilsLogBasicInfo() + # Create background server thread. server_thread = threading.Thread(target=usbCommandHandler, daemon=True) server_thread.start() @@ -1225,9 +1382,16 @@ def uiToggleElements(flag: bool) -> None: g_tkVerboseCheckbox.configure(state='disabled') def uiChooseDirectory() -> None: - dir = filedialog.askdirectory(parent=g_tkRoot, title='Select an output directory', initialdir=INITIAL_DIR, mustexist=True) + #assert g_tkDirText is not None + dirtext = g_tkDirText.get('1.0', tk.END).strip() + initdir = dirtext if os.path.exists(dirtext) else INITIAL_DIR + + dir = filedialog.askdirectory(parent=g_tkRoot, title='Select an output directory', initialdir=initdir, mustexist=True) + if dir: uiUpdateDirectoryField(os.path.abspath(dir)) + uiUpdateOutputDir() + def uiUpdateDirectoryField(path: str) -> None: #assert g_tkDirText is not None @@ -1236,6 +1400,27 @@ def uiUpdateDirectoryField(path: str) -> None: g_tkDirText.insert('1.0', path) g_tkDirText.configure(state='disabled') + +def uiUpdateOutputDir() -> None: + global g_outputDir + #assert g_tkDirText is not None + + g_outputDir = g_tkDirText.get('1.0', tk.END).strip() + if not g_outputDir: + # We should never reach this, honestly. + messagebox.showerror('Error', 'You must provide an output directory!', parent=g_tkRoot) + return + + # Make sure the full directory tree exists. + try: + os.makedirs(g_outputDir, exist_ok=True) + except: + utilsLogException(traceback.format_exc()) + messagebox.showerror('Error', 'Unable to create full output directory tree!', parent=g_tkRoot) + return + + return + def uiHandleExitProtocol() -> None: #assert g_tkRoot is not None g_tkRoot.destroy() @@ -1246,13 +1431,24 @@ def uiHandleExitProtocolStub() -> None: def uiScaleMeasure(measure: int) -> int: return round(float(measure) * SCALE) +def uiHandleLogToFileCheckbox() -> None: + #assert g_logToFile is not None + #assert g_logToFileBoolVar is not None + global g_logToFile + g_logToFile = g_logToFileBoolVar.get() + return + def uiHandleVerboseCheckbox() -> None: #assert g_logger is not None #assert g_logLevelIntVar is not None - g_logger.setLevel(g_logLevelIntVar.get()) + global g_logVerbose + logLevel=g_logLevelIntVar.get() + g_logger.setLevel(logLevel) + g_logVerbose = True if(logLevel == logging.DEBUG) else False + return def uiInitialize() -> None: - global SCALE, g_logLevelIntVar, g_logToFileBoolVar + global SCALE, g_logLevelIntVar, g_logToFileBoolVar, g_logToFile, g_logVerbose global g_tkRoot, g_tkCanvas, g_tkDirText, g_tkChooseDirButton, g_tkServerButton, g_tkTipMessage, g_tkScrolledTextLog, g_tkLogToFileCheckbox, g_tkVerboseCheckbox global g_stopEvent, g_tlb, g_taskbar, g_progressBarWindow @@ -1325,6 +1521,7 @@ def uiInitialize() -> None: default_font = font.nametofont('TkDefaultFont') default_font_family = ('Segoe UI' if g_isWindows else 'sans-serif') default_font_size = (-12 if g_isWindows else -10) # Measured in pixels. Reference: https://docs.python.org/3/library/tkinter.font.html + padding_bottom = WINDOW_HEIGHT + default_font_size - 1 default_font.configure(family=default_font_family, size=uiScaleMeasure(default_font_size), weight=font.NORMAL) """print(screen_width_px, screen_height_px) @@ -1341,10 +1538,11 @@ def uiInitialize() -> None: g_tkCanvas = tk.Canvas(g_tkRoot, width=window_width_px, height=window_height_px) g_tkCanvas.pack() - g_tkCanvas.create_text(uiScaleMeasure(60), uiScaleMeasure(30), text='Output Dir:', anchor=tk.CENTER) + g_tkCanvas.create_text(uiScaleMeasure(60), uiScaleMeasure(30), text='Output directory:', anchor=tk.CENTER) g_tkDirText = tk.Text(g_tkRoot, height=1, width=45, font=default_font, wrap='none', state='disabled', bg='#F0F0F0') uiUpdateDirectoryField(g_outputDir) + g_tkCanvas.create_window(uiScaleMeasure(260), uiScaleMeasure(30), window=g_tkDirText, anchor=tk.CENTER) g_tkChooseDirButton = tk.Button(g_tkRoot, text='Choose', width=10, command=uiChooseDirectory) @@ -1364,18 +1562,19 @@ def uiInitialize() -> None: g_tkScrolledTextLog.tag_config('CRITICAL', foreground='red', underline=True) g_tkCanvas.create_window(uiScaleMeasure(int(WINDOW_WIDTH / 2)), uiScaleMeasure(280), window=g_tkScrolledTextLog, anchor=tk.CENTER) - g_tkCanvas.create_text(uiScaleMeasure(5), uiScaleMeasure(WINDOW_HEIGHT - 13), text=COPYRIGHT_TEXT, anchor=tk.W) + g_tkCanvas.create_text(uiScaleMeasure(5), uiScaleMeasure(padding_bottom), text=COPYRIGHT_TEXT, anchor=tk.W) g_logToFileBoolVar = tk.BooleanVar() - g_tkLogToFileCheckbox = tk.Checkbutton(g_tkRoot, text='Log to file', variable=g_logToFileBoolVar,onvalue=True, offvalue=False) + g_tkLogToFileCheckbox = tk.Checkbutton(g_tkRoot, text='Log to file', variable=g_logToFileBoolVar,onvalue=True, offvalue=False, command=uiHandleLogToFileCheckbox) g_tkLogToFileCheckbox.select() + g_logToFile = g_logToFileBoolVar.get() - g_tkCanvas.create_window(uiScaleMeasure(WINDOW_WIDTH - 165), uiScaleMeasure(WINDOW_HEIGHT - 13), window=g_tkLogToFileCheckbox, anchor=tk.CENTER) + g_tkCanvas.create_window(uiScaleMeasure(WINDOW_WIDTH - 165), uiScaleMeasure(padding_bottom), window=g_tkLogToFileCheckbox, anchor=tk.CENTER) g_logLevelIntVar = tk.IntVar() g_tkVerboseCheckbox = tk.Checkbutton(g_tkRoot, text='Verbose output', variable=g_logLevelIntVar, onvalue=logging.DEBUG, offvalue=logging.INFO, command=uiHandleVerboseCheckbox) - g_tkCanvas.create_window(uiScaleMeasure(WINDOW_WIDTH - 55), uiScaleMeasure(WINDOW_HEIGHT - 13), window=g_tkVerboseCheckbox, anchor=tk.CENTER) + g_tkCanvas.create_window(uiScaleMeasure(WINDOW_WIDTH - 55), uiScaleMeasure(padding_bottom), window=g_tkVerboseCheckbox, anchor=tk.CENTER) # Initialize console logger. console = LogConsole(g_tkScrolledTextLog) @@ -1392,37 +1591,47 @@ def cliInitialize() -> None: global g_progressBarWindow #assert g_logger is not None - + + # determines whether to use colors in terminal and sets up accordingly + utilsSetupTerminal() + + # Set log path if logging to file specified at cmd line + if g_logToFile: + utilsUpdateLogPath() + # Initialize console logger. console = LogConsole() # Initialize progress bar window object. bar_format = '{percentage:.2f}% |{bar}| {n:.2f}/{total:.2f} [{elapsed}<{remaining}, {rate_fmt}]' g_progressBarWindow = ProgressBarWindow(bar_format) - - # Print info. - g_logger.info('\n' + SCRIPT_TITLE + '. ' + COPYRIGHT_TEXT + '.') - g_logger.info('Output Dir: "' + g_outputDir + '".\n') + + # Log basic info about the script and settings. + utilsLogBasicInfo() # Start USB command handler directly. usbCommandHandler() def main() -> int: - global g_cliMode, g_outputDir, g_osType, g_osVersion, g_isWindows, g_isWindowsVista, g_isWindows7, g_logger - + global g_cliMode, g_outputDir, g_logToFile, g_logVerbose, g_osType, g_osVersion, g_isWindows, g_isWindowsVista, g_isWindows7, g_isWindows10, g_pathSep, g_logger + # Disable warnings. warnings.filterwarnings("ignore") # Parse command line arguments. - parser = ArgumentParser(description=SCRIPT_TITLE + '. ' + COPYRIGHT_TEXT + '.') + parser = ArgumentParser(formatter_class=RawTextHelpFormatter, description=SCRIPT_TITLE + '. ' + COPYRIGHT_TEXT + '.') parser.add_argument('-c', '--cli', required=False, action='store_true', default=False, help='Start the script in CLI mode.') - parser.add_argument('-o', '--outdir', required=False, type=str, metavar='DIR', help='Path to output directory. Defaults to "' + DEFAULT_DIR + '".') + parser.add_argument('-o', '--outdir', required=False, type=str, metavar='DIR', help='Path to output directory; will attempt to create if non-existent.'+\ + '\nDefaults to "' + DEFAULT_DIR + '".') + parser.add_argument('-l', '--log', required=False, action='store_true', default=False, help='Enables logging to file in output directory in CLI mode.') parser.add_argument('-v', '--verbose', required=False, action='store_true', default=False, help='Enable verbose output.') args = parser.parse_args() # Update global flags. g_cliMode = args.cli g_outputDir = utilsGetPath(args.outdir, DEFAULT_DIR, False, True) + g_logToFile = args.log + g_logVerbose = args.verbose # Get OS information. g_osType = platform.system() @@ -1430,23 +1639,27 @@ def main() -> int: # Get Windows information. g_isWindows = (g_osType == 'Windows') - g_isWindowsVista = g_isWindows7 = False + g_isWindowsVista = g_isWindows7 = g_isWindows10 = False if g_isWindows: win_ver = g_osVersion.split('.') win_ver_major = int(win_ver[0]) win_ver_minor = int(win_ver[1]) - + win_build = int(win_ver[2]) g_isWindowsVista = (win_ver_major >= 6) g_isWindows7 = (True if (win_ver_major > 6) else (win_ver_major == 6 and win_ver_minor > 0)) + # ANSI colors in cmd.exe min. build + # ref: https://github.com/dart-lang/sdk/issues/28614#issuecomment-287282970 + g_isWindows10 = (win_ver_major >= 10 and win_build >= 10586) + g_pathSep = os.path.sep if not g_isWindows else '\\' # Setup logging mechanism. - logging.basicConfig(level=(logging.DEBUG if args.verbose else logging.INFO)) + logging.basicConfig(level=(logging.DEBUG if g_logVerbose else logging.INFO)) g_logger = logging.getLogger() if len(g_logger.handlers): # Remove stderr output handler from logger. We'll control standard output on our own. log_stderr = g_logger.handlers[0] g_logger.removeHandler(log_stderr) - + if g_cliMode: # Initialize CLI. cliInitialize() @@ -1463,7 +1676,8 @@ if __name__ == "__main__": ret = main() except KeyboardInterrupt: time.sleep(0.2) - print("\nHost script interrupted!") + g_logger.info("Host script exited!") + if g_isWindows10: print(COLOR_RESET) except Exception as e: utilsLogException(traceback.format_exc())