1
0
Fork 0
mirror of https://github.com/DarkMatterCore/nxdumptool.git synced 2025-01-24 00:23:53 +00:00

cli logging w/time+color, refactoring, remember last dir

This commit is contained in:
bilditup1 2023-11-26 15:09:29 -05:00
parent 13a9f86fe9
commit 4e6f91eaa5

View file

@ -55,13 +55,22 @@ from tkinter import filedialog, messagebox, font, scrolledtext
from tqdm import tqdm
from argparse import ArgumentParser
from argparse import ArgumentParser, RawTextHelpFormatter, ArgumentDefaultsHelpFormatter
from io import BufferedWriter
from typing import List, Tuple, Any, Callable, Optional
from datetime import datetime
# Terminal colors using ANSI escape sequences.
# ref: https://gist.github.com/fnky/458719343aabd01cfb17a3a4f7296797
COLOR_BACKGROUND = "\033[40m\033[0K" # black
COLOR_DEBUG = "\033[39m" # vanilla
COLOR_INFO = "\033[38;5;255m" # bright white
COLOR_WARNING = "\033[38;5;202m" # orange
COLOR_ERROR = "\033[1;31m" # red (intense)
COLOR_CRITICAL = "\033[4;31m" # underlined red
COLOR_RESET = "\033[0m\033[0K"; # resets all colors; blanks line from cursor pos
# Scaling factors.
WINDOWS_SCALING_FACTOR = 96.0
@ -305,8 +314,12 @@ TASKBAR_LIB = b'TVNGVAIAAQAAAAAACQQAAAAAAABBAAAAAQAAAAAAAAAOAAAA/////wAAAAAAAAAA
# Global variables used throughout the code.
g_cliMode: bool = False
g_logToFile: bool = False
g_logVerbose: bool = False
g_terminalColors: bool = False
g_outputDir: str = ''
g_logPath: str = ''
g_pathSep: str = ''
g_logLevelIntVar: Optional[tk.IntVar] = None
g_logToFileBoolVar: Optional[tk.BooleanVar] = None
@ -316,6 +329,7 @@ g_osVersion: str = ''
g_isWindows: bool = False
g_isWindowsVista: bool = False
g_isWindows7: bool = False
g_isWindows10: bool = False
g_tkRoot: Optional[tk.Tk] = None
g_tkCanvas: Optional[tk.Canvas] = None
@ -336,6 +350,7 @@ g_taskbar: Any = None
g_usbEpIn: Any = None
g_usbEpOut: Any = None
g_usbEpMaxPacketSize: int = 0
g_usbVer: str = ""
g_nxdtVersionMajor: int = 0
g_nxdtVersionMinor: int = 0
@ -351,6 +366,15 @@ g_nspRemainingSize: int = 0
g_nspFile: Optional[BufferedWriter] = None
g_nspFilePath: str = ''
g_extractedFsDumpMode: bool = False
g_formattedFileSize: float = 0
g_fileSizeMiB: float = 0
g_formattedFileUnit: str = 'B'
g_startTime: float = 0
# Reference: https://beenje.github.io/blog/posts/logging-to-a-tkinter-scrolledtext-widget.
class LogQueueHandler(logging.Handler):
def __init__(self, log_queue: queue.Queue):
@ -359,15 +383,53 @@ class LogQueueHandler(logging.Handler):
def emit(self, record: logging.LogRecord) -> None:
if g_cliMode:
msg = self.format(record)
print(msg)
msg = self.format_message(record)
self.log_to_stdout(record, msg)
if g_logToFile:
self.log_to_file(msg)
else:
self.log_queue.put(record)
if g_logToFileBoolVar.get():
logToFileMsg=datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]+'\t['+ \
record.levelname+']\t'+self.format(record).strip()+'\n'
with open (g_logPath, 'a', encoding="utf-8") as f:
f.write(logToFileMsg)
def format_message(self, record:logging.LogRecord) -> str:
msg = ""
prepend = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]+'\t['+record.levelname+']\t'
content = self.format(record)
if content[0] == '\n':
msg = prepend + '\n' + prepend
content = content[1:]
else:
msg = prepend
msg = msg + content
if content[-1:] == '\n':
msg = msg + prepend + '\n'
else:
msg = msg + '\n'
return msg
def log_to_stdout(self, record:logging.LogRecord, msg: str) -> None:
if g_terminalColors:
match record.levelname:
case "DEBUG":
print(COLOR_DEBUG + msg, end="")
case "INFO":
print(COLOR_INFO + msg, end="")
case "WARNING":
print(COLOR_WARNING + msg, end="")
case "ERROR":
print(COLOR_ERROR + msg, end="")
case "CRITICAL":
print(COLOR_CRITICAL + msg, end="")
case _:
print(COLOR_DEBUG + msg, end="")
if g_isWindows10:
print(COLOR_BACKGROUND, end="")
else:
print(msg, end="")
def log_to_file (self, msg: str) -> None:
with open (g_logPath, 'a', encoding="utf-8") as f:
f.write(msg)
# Reference: https://beenje.github.io/blog/posts/logging-to-a-tkinter-scrolledtext-widget.
class LogConsole:
@ -408,6 +470,8 @@ class LogConsole:
break
else:
self.display(record)
if g_logToFile:
self.queue_handler.log_to_file(self.queue_handler.format_message(record))
if self.frame:
self.frame.after(100, self.poll_log_queue)
@ -466,6 +530,7 @@ class ProgressBarWindow:
self.tk_parent.after(0, self.tk_window.destroy)
def start(self, total: int, n: int = 0, divider: int = 1, prefix: str = '', unit: str = 'B') -> None:
if (total <= 0) or (n < 0) or (divider < 1):
raise Exception('Invalid arguments!')
@ -477,6 +542,7 @@ class ProgressBarWindow:
self.unit = unit
if self.tk_pbar:
#print()
self.tk_pbar.configure(maximum=self.total_div, mode='determinate')
self.start_time = time.time()
else:
@ -548,7 +614,7 @@ class ProgressBarWindow:
#assert self.pbar is not None
self.pbar.close()
self.pbar = None
print()
#print()
def set_prefix(self, prefix) -> None:
self.prefix = prefix
@ -585,36 +651,73 @@ def utilsGetPath(path_arg: str, fallback_path: str, is_file: bool, create: bool
def utilsGetWinFullPath(path_arg: str) -> str:
return '\\\\?\\' + path_arg.replace("/", "\\")
def utilsResetOutputDir() -> None:
global g_outputDir
global g_logPath
#assert g_tkDirText is not None
g_outputDir = g_tkDirText.get('1.0', tk.END).strip()
if not g_outputDir:
# We should never reach this, honestly.
messagebox.showerror('Error', 'You must provide an output directory!', parent=g_tkRoot)
return
def utilsUpdateLogPath() -> None:
global g_logPath
if g_logToFileBoolVar.get():
g_logPath = os.path.abspath(g_outputDir + os.path.sep + \
g_logPath = os.path.abspath(g_outputDir + os.path.sep + \
"nxdt_host_" + datetime.now().strftime('%Y-%m-%d_%H%M%S') + '.log')
if g_isWindows:
if g_isWindows:
g_logPath = utilsGetWinFullPath(g_logPath)
print(g_logPath)
else:
print("Not logging to file.")
# Make sure the full directory tree exists.
try:
os.makedirs(g_outputDir, exist_ok=True)
except:
utilsLogException(traceback.format_exc())
messagebox.showerror('Error', 'Unable to create full output directory tree!', parent=g_tkRoot)
return
return
# Enable terminal colors on *nix and supported Windows (10.0.10586+)
def utilsSetupTerminal() -> None:
global g_terminalColors
# ref0: https://stackoverflow.com/a/36760881
# ref1: https://learn.microsoft.com/en-us/windows/console/setconsolemode
if g_isWindows10:
try:
import ctypes
ctypes.windll.kernel32.GetStdHandle((-11), 7)
g_terminalColors = True
except:
utilsLogException(tracebackSer.format_exc())
g_terminalColors = False
else:
if not g_isWindows:
g_terminalColors = True
else:
g_terminalColors = False
# If colors supported, unconditionally set background color to black
if g_terminalColors:
print(COLOR_BACKGROUND)
# Log basic info about the script and settings.
def utilsLogBasicInfo() -> None:
global g_logToFile, g_logVerbose, g_pathSep
g_logger.info('\n' + SCRIPT_TITLE + '. ' + COPYRIGHT_TEXT + '.')
g_logger.info('\nServer started...\n')
g_logger.info('Sys:\tPython ' + platform.python_version() + " on "+ g_osType+" "+g_osVersion)
g_logger.info('Dst:\t' + g_outputDir)
if g_logToFile:
g_logger.info('Log:\t' + g_logPath.rsplit(g_pathSep, 1)[-1])
else:
g_logger.info('Logging to file is disabled.')
if g_logVerbose:
g_logger.info('Verbose logging is enabled.\n')
else:
g_logger.info('Verbose logging is disabled.\n')
return
# On successful transfer, log elapsed time and (within reason) average transfer speed
def utilsLogTransferStats(elapsed_time: float) -> None:
if g_formattedFileUnit == "GiB" or g_formattedFileUnit == "MiB":
formatted_time = f'{elapsed_time:.2f}s' if round(elapsed_time < 60) else tqdm.format_interval(elapsed_time)
g_logger.info(f'{g_formattedFileSize:.2f}{g_formattedFileUnit} transferred in {formatted_time}.')
if elapsed_time > float(1):
g_logger.info(f'Avg speed: {g_fileSizeMiB/elapsed_time:.2f}MiB/s\n')
else:
g_logger.info(" ")
def utilsIsValueAlignedToEndpointPacketSize(value: int) -> bool:
return bool((value & (g_usbEpMaxPacketSize - 1)) == 0)
@ -648,8 +751,16 @@ def utilsGetSizeUnitAndDivisor(size: int) -> Tuple[str, int]:
return ret
def utilsInitTransferVars(file_size: int) -> None:
global g_formattedFileSize, g_formattedFileUnit, g_fileSizeMiB, g_startTime
(g_formattedFileUnit,divisor) = utilsGetSizeUnitAndDivisor(file_size)
g_formattedFileSize = file_size / divisor
g_fileSizeMiB = file_size / 1048576
g_startTime = time.time()
def usbGetDeviceEndpoints() -> bool:
global g_usbEpIn, g_usbEpOut, g_usbEpMaxPacketSize
global g_usbEpIn, g_usbEpOut, g_usbEpMaxPacketSize, g_usbVer
#assert g_logger is not None
#assert g_stopEvent is not None
@ -660,7 +771,7 @@ def usbGetDeviceEndpoints() -> bool:
usb_version = 0
if g_cliMode:
g_logger.info(f'Please connect a Nintendo Switch console running {USB_DEV_PRODUCT}.')
g_logger.info(f'Please connect a Nintendo Switch console running {USB_DEV_PRODUCT}.\n')
while True:
# Check if the user decided to stop the server.
@ -710,13 +821,10 @@ def usbGetDeviceEndpoints() -> bool:
usb_version = cur_dev.bcdUSB
break
g_usbVer = f'USB {usb_version >> 8}.{(usb_version & 0xFF) >> 4}'
g_logger.debug(f'Successfully retrieved USB endpoints! (bus {cur_dev.bus}, address {cur_dev.address}).')
g_logger.debug(f'Max packet size: 0x{g_usbEpMaxPacketSize:X} (USB {usb_version >> 8}.{(usb_version & 0xFF) >> 4}).\n')
if g_cliMode:
g_logger.info(f'Exit {USB_DEV_PRODUCT} or disconnect your console at any time to close this script.')
g_logger.debug(f'Max packet size: 0x{g_usbEpMaxPacketSize:X} ({g_usbVer}).')
return True
def usbRead(size: int, timeout: int = -1) -> bytes:
@ -730,7 +838,7 @@ def usbRead(size: int, timeout: int = -1) -> bytes:
except usb.core.USBError:
if not g_cliMode:
utilsLogException(traceback.format_exc())
g_logger.error('\nUSB timeout triggered or console disconnected.')
g_logger.error('\nUSB timeout triggered or console disconnected.\n')
return rd
@ -744,7 +852,7 @@ def usbWrite(data: bytes, timeout: int = -1) -> int:
except usb.core.USBError:
if not g_cliMode:
utilsLogException(traceback.format_exc())
g_logger.error('\nUSB timeout triggered or console disconnected.')
g_logger.error('\nUSB timeout triggered or console disconnected.\n')
return wr
@ -757,10 +865,7 @@ def usbHandleStartSession(cmd_block: bytes) -> int:
#assert g_logger is not None
if g_cliMode:
print()
g_logger.debug(f'Received StartSession ({USB_CMD_START_SESSION:02X}) command.')
g_logger.debug(f'\nReceived StartSession ({USB_CMD_START_SESSION:02X}) command.')
# Parse command block.
(g_nxdtVersionMajor, g_nxdtVersionMinor, g_nxdtVersionMicro, abi_version, git_commit) = struct.unpack_from('<BBBB8s', cmd_block, 0)
@ -771,11 +876,17 @@ def usbHandleStartSession(cmd_block: bytes) -> int:
g_nxdtAbiVersionMinor = (abi_version & 0x0F)
# Print client info.
g_logger.info(f'Client info: {USB_DEV_PRODUCT} v{g_nxdtVersionMajor}.{g_nxdtVersionMinor}.{g_nxdtVersionMicro}, USB ABI v{g_nxdtAbiVersionMajor}.{g_nxdtAbiVersionMinor} (commit {g_nxdtGitCommit}).\n')
g_logger.info(f'Client: {USB_DEV_PRODUCT} v{g_nxdtVersionMajor}.{g_nxdtVersionMinor}.{g_nxdtVersionMicro}, USB ABI v{g_nxdtAbiVersionMajor}.{g_nxdtAbiVersionMinor} (commit {g_nxdtGitCommit}).')
if not g_logVerbose:
g_logger.info(f'Connection: {g_usbVer}.\n')
if g_cliMode:
g_logger.info(f'Exit {USB_DEV_PRODUCT} or disconnect your console at any time to close this script.\n')
# Check if we support this ABI version.
if (g_nxdtAbiVersionMajor != USB_ABI_VERSION_MAJOR) or (g_nxdtAbiVersionMinor != USB_ABI_VERSION_MINOR):
g_logger.error('Unsupported ABI version!')
g_logger.error('\nUnsupported ABI version!\n')
return USB_STATUS_UNSUPPORTED_ABI_VERSION
# Return status code.
@ -783,41 +894,61 @@ def usbHandleStartSession(cmd_block: bytes) -> int:
def usbHandleSendFileProperties(cmd_block: bytes) -> int | None:
global g_nspTransferMode, g_nspSize, g_nspHeaderSize, g_nspRemainingSize, g_nspFile, g_nspFilePath, g_outputDir, g_tkRoot, g_progressBarWindow
global g_formattedFileSize, g_formattedFileUnit, g_fileSizeMiB, g_startTime
#assert g_logger is not None
#assert g_progressBarWindow is not None
if g_cliMode and not g_nspTransferMode:
print()
g_logger.debug(f'Received SendFileProperties ({USB_CMD_SEND_FILE_PROPERTIES:02X}) command.')
g_logger.debug(f'\nReceived SendFileProperties ({USB_CMD_SEND_FILE_PROPERTIES:02X}) command.')
# Parse command block.
(file_size, filename_length, nsp_header_size, raw_filename) = struct.unpack_from(f'<QII{USB_FILE_PROPERTIES_MAX_NAME_LENGTH}s', cmd_block, 0)
filename = raw_filename.decode('utf-8').strip('\x00')
file_type_str = ('file' if (not g_nspTransferMode) else 'NSP file entry')
# Print info.
# Print info (debug / verbose).
dbg_str = f'File size: 0x{file_size:X} | Filename length: 0x{filename_length:X}'
if nsp_header_size > 0:
dbg_str += f' | NSP header size: 0x{nsp_header_size:X}'
g_logger.debug(dbg_str + '.')
file_type_str = ('file' if (not g_nspTransferMode) else 'NSP file entry')
if not g_cliMode or (g_cliMode and not g_nspTransferMode):
g_logger.info(f'Receiving {file_type_str}: "{filename}".')
# Log basic file info
if not g_nspTransferMode and not g_extractedFsDumpMode:
ext = filename[-3:]
match ext:
case "xci": g_logger.info("\tXCI transfer started!")
case "bin": g_logger.info("\tGamecard extra data transfer started!")
case "nsp": g_logger.info("\tNSP transfer started!")
case "nca": g_logger.info("\tRaw NCA transfer started!")
case "tik": g_logger.info("\tTicket transfer started!")
case _: g_logger.info("\tTransfer of unknown data type started!") # uh-oh?
utilsInitTransferVars(file_size)
g_logger.info(f'\nFile:\t{filename}')
g_logger.info(f'Size:\t{g_formattedFileSize:.2f}{g_formattedFileUnit}')
if(ext == "nsp"):
g_logger.info(f'Contents:')
else:
(unit,div) = utilsGetSizeUnitAndDivisor(file_size)
fs = file_size / div
if g_extractedFsDumpMode:
path_array = filename.split("/")
fn = '/'.join(path_array[7:])
elif g_nspTransferMode:
fn = filename
g_logger.info(f'\t{fn} ({fs:.2f} {unit})')
# Perform validity checks.
if (not g_nspTransferMode) and file_size and (nsp_header_size >= file_size):
g_logger.error('NSP header size must be smaller than the full NSP size!\n')
g_logger.error('\nNSP header size must be smaller than the full NSP size!\n')
return USB_STATUS_MALFORMED_CMD
if g_nspTransferMode and nsp_header_size:
g_logger.error('Received non-zero NSP header size during NSP transfer mode!\n')
g_logger.error('\nReceived non-zero NSP header size during NSP transfer mode!\n')
return USB_STATUS_MALFORMED_CMD
if (not filename_length) or (filename_length > USB_FILE_PROPERTIES_MAX_NAME_LENGTH):
g_logger.error('Invalid filename length!\n')
g_logger.error('\nInvalid filename length!\n')
return USB_STATUS_MALFORMED_CMD
# Enable NSP transfer mode (if needed).
@ -828,7 +959,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None:
g_nspRemainingSize = (file_size - nsp_header_size)
g_nspFile = None
g_nspFilePath = ''
g_logger.debug('NSP transfer mode enabled!\n')
g_logger.debug('\nNSP transfer mode enabled!')
# Perform additional validity checks and get a file object to work with.
if (not g_nspTransferMode) or (g_nspFile is None):
@ -848,14 +979,14 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None:
# Make sure the output filepath doesn't point to an existing directory.
if os.path.exists(fullpath) and (not os.path.isfile(fullpath)):
utilsResetNspInfo()
g_logger.error(f'Output filepath points to an existing directory! ("{fullpath}").\n')
g_logger.error(f'\nOutput filepath points to an existing directory! ("{fullpath}").\n')
return USB_STATUS_HOST_IO_ERROR
# Make sure we have enough free space.
(total_space, used_space, free_space) = shutil.disk_usage(dirpath)
if free_space <= file_size:
utilsResetNspInfo()
g_logger.error('Not enough free space available in output volume!\n')
g_logger.error('\nNot enough free space available in output volume!\n')
return USB_STATUS_HOST_IO_ERROR
# Get file object.
@ -882,13 +1013,12 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None:
# Close file (if needed).
if not g_nspTransferMode:
file.close()
# Let the command handler take care of sending the status response for us.
return USB_STATUS_SUCCESS
# Send status response before entering the data transfer stage.
usbSendStatus(USB_STATUS_SUCCESS)
# Start data transfer stage.
g_logger.debug(f'Data transfer started. Saving {file_type_str} to: "{fullpath}".')
@ -935,6 +1065,9 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None:
# Start transfer process.
start_time = time.time()
# if not g_nspTransferMode:
# g_startTime = start_time
# print(f'1: {g_startTime}')
while offset < file_size:
# Update block size (if needed).
@ -949,7 +1082,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None:
# Read current chunk.
chunk = usbRead(rd_size, USB_TRANSFER_TIMEOUT)
if not chunk:
g_logger.error(f'Failed to read 0x{rd_size:X}-byte long data chunk!')
g_logger.error(f'\nFailed to read 0x{rd_size:X}-byte long data chunk!\n')
# Cancel file transfer.
cancelTransfer()
@ -966,7 +1099,7 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None:
# Cancel file transfer.
cancelTransfer()
g_logger.debug(f'Received CancelFileTransfer ({USB_CMD_CANCEL_FILE_TRANSFER:02X}) command.')
g_logger.debug(f'\nReceived CancelFileTransfer ({USB_CMD_CANCEL_FILE_TRANSFER:02X}) command.')
g_logger.warning('Transfer cancelled.')
# Let the command handler take care of sending the status response for us.
@ -987,30 +1120,34 @@ def usbHandleSendFileProperties(cmd_block: bytes) -> int | None:
if use_pbar:
g_progressBarWindow.update(chunk_size)
elapsed_time = round(time.time() - start_time)
g_logger.debug(f'File transfer successfully completed in {tqdm.format_interval(elapsed_time)}!\n')
# Close file handle (if needed).
if not g_nspTransferMode:
file.close()
elapsed_time = time.time() - start_time
g_logger.debug(f'\nFile transfer successfully completed in {elapsed_time:.2f}s!')
# Hide progress bar window (if needed).
if use_pbar and ((not g_nspTransferMode) or (not g_nspRemainingSize)):
g_progressBarWindow.end()
# Close file handle (if needed); log successful non-constitutent transfer.
if not g_nspTransferMode:
file.close()
if not g_extractedFsDumpMode:
if not g_logVerbose:
g_logger.info('\n\tTransfer complete!\n')
utilsLogTransferStats(elapsed_time);
return USB_STATUS_SUCCESS
def usbHandleCancelFileTransfer(cmd_block: bytes) -> int:
#assert g_logger is not None
g_logger.debug(f'Received CancelFileTransfer ({USB_CMD_START_SESSION:02X}) command.')
g_logger.debug(f'\nReceived CancelFileTransfer ({USB_CMD_START_SESSION:02X}) command.')
if g_nspTransferMode:
utilsResetNspInfo(True)
g_logger.warning('Transfer cancelled.')
return USB_STATUS_SUCCESS
else:
g_logger.error('Unexpected transfer cancellation.')
g_logger.error('\nUnexpected transfer cancellation.\n')
return USB_STATUS_MALFORMED_CMD
def usbHandleSendNspHeader(cmd_block: bytes) -> int:
@ -1021,27 +1158,33 @@ def usbHandleSendNspHeader(cmd_block: bytes) -> int:
nsp_header_size = len(cmd_block)
g_logger.debug(f'Received SendNspHeader ({USB_CMD_SEND_NSP_HEADER:02X}) command.')
g_logger.debug(f'\nReceived SendNspHeader ({USB_CMD_SEND_NSP_HEADER:02X}) command.')
# Validity checks.
if not g_nspTransferMode:
g_logger.error('Received NSP header out of NSP transfer mode!\n')
g_logger.error('\nReceived NSP header out of NSP transfer mode!\n')
return USB_STATUS_MALFORMED_CMD
if g_nspRemainingSize:
g_logger.error(f'Received NSP header before receiving all NSP data! (missing 0x{g_nspRemainingSize:X} byte[s]).\n')
g_logger.error(f'\nReceived NSP header before receiving all NSP data! (missing 0x{g_nspRemainingSize:X} byte[s]).\n')
return USB_STATUS_MALFORMED_CMD
if nsp_header_size != g_nspHeaderSize:
g_logger.error(f'NSP header size mismatch! (0x{nsp_header_size:X} != 0x{g_nspHeaderSize:X}).\n')
g_logger.error(f'\nNSP header size mismatch! (0x{nsp_header_size:X} != 0x{g_nspHeaderSize:X}).\n')
return USB_STATUS_MALFORMED_CMD
# Write NSP header.
g_nspFile.seek(0)
g_nspFile.write(cmd_block)
# Log successful NSP transfer (header distinguishes it from constituent NCA transfers)
g_logger.debug(f'Successfully wrote 0x{nsp_header_size:X}-byte long NSP header to "{g_nspFilePath}".\n')
g_logger.debug(f'Successfully wrote 0x{nsp_header_size:X}-byte long NSP header to "{g_nspFilePath}".')
if not g_logVerbose:
g_logger.info('\n\tTransfer complete!\n')
utilsLogTransferStats(time.time() - g_startTime)
# Disable NSP transfer mode.
utilsResetNspInfo()
@ -1049,31 +1192,49 @@ def usbHandleSendNspHeader(cmd_block: bytes) -> int:
def usbHandleEndSession(cmd_block: bytes) -> int:
#assert g_logger is not None
g_logger.debug(f'Received EndSession ({USB_CMD_END_SESSION:02X}) command.')
g_logger.debug(f'\nReceived EndSession ({USB_CMD_END_SESSION:02X}) command.')
return USB_STATUS_SUCCESS
def usbHandleStartExtractedFsDump(cmd_block: bytes) -> int:
#assert g_logger is not None
g_logger.debug(f'Received StartExtractedFsDump ({USB_CMD_START_EXTRACTED_FS_DUMP:02X}) command.')
global g_extractedFsDumpMode, g_formattedFileSize, g_formattedFileUnit, g_fileSizeMiB, g_startTime
g_logger.debug(f'\nReceived StartExtractedFsDump ({USB_CMD_START_EXTRACTED_FS_DUMP:02X}) command.')
if g_nspTransferMode:
g_logger.error('StartExtractedFsDump received mid NSP transfer.')
g_logger.error('\nStartExtractedFsDump received mid NSP transfer.\n')
return USB_STATUS_MALFORMED_CMD
# Parse command block.
(extracted_fs_size, extracted_fs_root_path) = struct.unpack_from(f'<Q{USB_FILE_PROPERTIES_MAX_NAME_LENGTH}s', cmd_block, 0)
extracted_fs_root_path = extracted_fs_root_path.decode('utf-8').strip('\x00')
g_logger.info(f'Starting extracted FS dump (size 0x{extracted_fs_size:X}, output relative path "{extracted_fs_root_path}").')
utilsInitTransferVars(extracted_fs_size)
path_array = extracted_fs_root_path.split('/')
if not g_logVerbose:
g_logger.info(f'\tExtracted FS dump started!')
g_logger.info(f'\nSrc:\t{path_array[4]}')
g_logger.info(f'\t{path_array[5]}, FS section #{path_array[6]}')
g_logger.info(f'Size:\t{g_formattedFileSize:.2f} {g_formattedFileUnit}')
g_logger.info(f'Files:')
else:
g_logger.debug(f'Starting extracted FS dump (size 0x{extracted_fs_size:X}, output relative path "{extracted_fs_root_path}").')
g_extractedFsDumpMode = True
g_startTime = time.time()
# Return status code.
return USB_STATUS_SUCCESS
def usbHandleEndExtractedFsDump(cmd_block: bytes) -> int:
global g_extractedFsDumpMode
#assert g_logger is not None
g_logger.debug(f'Received EndExtractedFsDump ({USB_CMD_END_EXTRACTED_FS_DUMP:02X}) command.')
g_logger.info(f'Finished extracted FS dump.')
g_extractedFsDumpMode = False
g_logger.debug(f'\nReceived EndExtractedFsDump ({USB_CMD_END_EXTRACTED_FS_DUMP:02X}) command.')
if not g_logVerbose:
g_logger.info(f'\n\tExtracted FS dump complete!\n')
utilsLogTransferStats(time.time() - g_startTime)
return USB_STATUS_SUCCESS
def usbCommandHandler() -> None:
@ -1110,7 +1271,7 @@ def usbCommandHandler() -> None:
# Read command header.
cmd_header = usbRead(USB_CMD_HEADER_SIZE)
if (not cmd_header) or (len(cmd_header) != USB_CMD_HEADER_SIZE):
g_logger.error(f'Failed to read 0x{USB_CMD_HEADER_SIZE:X}-byte long command header!')
g_logger.error(f'\nFailed to read 0x{USB_CMD_HEADER_SIZE:X}-byte long command header!\n')
break
# Parse command header.
@ -1128,19 +1289,19 @@ def usbCommandHandler() -> None:
cmd_block = usbRead(rd_size, USB_TRANSFER_TIMEOUT)
if (not cmd_block) or (len(cmd_block) != cmd_block_size):
g_logger.error(f'Failed to read 0x{cmd_block_size:X}-byte long command block for command ID {cmd_id:02X}!')
g_logger.error(f'\nFailed to read 0x{cmd_block_size:X}-byte long command block for command ID {cmd_id:02X}!\n')
break
# Verify magic word.
if magic != USB_MAGIC_WORD:
g_logger.error('Received command header with invalid magic word!\n')
g_logger.error('\nReceived command header with invalid magic word!\n')
usbSendStatus(USB_STATUS_INVALID_MAGIC_WORD)
continue
# Get command handler function.
cmd_func = cmd_dict.get(cmd_id, None)
if cmd_func is None:
g_logger.error(f'Received command header with unsupported ID {cmd_id:02X}.\n')
g_logger.error(f'\nReceived command header with unsupported ID {cmd_id:02X}.\n')
usbSendStatus(USB_STATUS_UNSUPPORTED_CMD)
continue
@ -1149,7 +1310,7 @@ def usbCommandHandler() -> None:
(cmd_id == USB_CMD_SEND_FILE_PROPERTIES and cmd_block_size != USB_CMD_BLOCK_SIZE_SEND_FILE_PROPERTIES) or \
(cmd_id == USB_CMD_SEND_NSP_HEADER and not cmd_block_size) or \
(cmd_id == USB_CMD_START_EXTRACTED_FS_DUMP and cmd_block_size != USB_CMD_BLOCK_SIZE_START_EXTRACTED_FS_DUMP):
g_logger.error(f'Invalid command block size for command ID {cmd_id:02X}! (0x{cmd_block_size:X}).\n')
g_logger.error(f'\nInvalid command block size for command ID {cmd_id:02X}! (0x{cmd_block_size:X}).\n')
usbSendStatus(USB_STATUS_MALFORMED_CMD)
continue
@ -1159,7 +1320,7 @@ def usbCommandHandler() -> None:
if (status is None) or (not usbSendStatus(status)) or (cmd_id == USB_CMD_END_SESSION) or (status == USB_STATUS_UNSUPPORTED_ABI_VERSION):
break
g_logger.info('\nStopping server.')
g_logger.info('Stopping server...\n')
if not g_cliMode:
# Update UI.
@ -1170,25 +1331,21 @@ def uiStopServer() -> None:
#assert g_stopEvent is not None
g_stopEvent.set()
# Log the end of the session.
g_logger.info("\nServer stopped.\n")
g_logger.info("Server stopped.\n")
def uiStartServer() -> None:
# Reset output path variable based on possible user input unconditionally.
utilsResetOutputDir()
# Set new log path for this session if logging to file is turned on.
if g_logToFile:
utilsUpdateLogPath()
# Update UI.
uiToggleElements(False)
# Log basic session status and info.
g_logger.info('\nServer started.')
g_logger.info('Output Dir:\t' + g_outputDir)
if g_logToFileBoolVar.get():
g_logger.info('Logging to:\t' + g_logPath.rsplit('\\', 1)[-1] + '\n')
else:
g_logger.info('Logging to file disabled.\n')
# Log basic info about the script and settings.
utilsLogBasicInfo()
# Create background server thread.
server_thread = threading.Thread(target=usbCommandHandler, daemon=True)
server_thread.start()
@ -1225,9 +1382,16 @@ def uiToggleElements(flag: bool) -> None:
g_tkVerboseCheckbox.configure(state='disabled')
def uiChooseDirectory() -> None:
dir = filedialog.askdirectory(parent=g_tkRoot, title='Select an output directory', initialdir=INITIAL_DIR, mustexist=True)
#assert g_tkDirText is not None
dirtext = g_tkDirText.get('1.0', tk.END).strip()
initdir = dirtext if os.path.exists(dirtext) else INITIAL_DIR
dir = filedialog.askdirectory(parent=g_tkRoot, title='Select an output directory', initialdir=initdir, mustexist=True)
if dir:
uiUpdateDirectoryField(os.path.abspath(dir))
uiUpdateOutputDir()
def uiUpdateDirectoryField(path: str) -> None:
#assert g_tkDirText is not None
@ -1236,6 +1400,27 @@ def uiUpdateDirectoryField(path: str) -> None:
g_tkDirText.insert('1.0', path)
g_tkDirText.configure(state='disabled')
def uiUpdateOutputDir() -> None:
global g_outputDir
#assert g_tkDirText is not None
g_outputDir = g_tkDirText.get('1.0', tk.END).strip()
if not g_outputDir:
# We should never reach this, honestly.
messagebox.showerror('Error', 'You must provide an output directory!', parent=g_tkRoot)
return
# Make sure the full directory tree exists.
try:
os.makedirs(g_outputDir, exist_ok=True)
except:
utilsLogException(traceback.format_exc())
messagebox.showerror('Error', 'Unable to create full output directory tree!', parent=g_tkRoot)
return
return
def uiHandleExitProtocol() -> None:
#assert g_tkRoot is not None
g_tkRoot.destroy()
@ -1246,13 +1431,24 @@ def uiHandleExitProtocolStub() -> None:
def uiScaleMeasure(measure: int) -> int:
return round(float(measure) * SCALE)
def uiHandleLogToFileCheckbox() -> None:
#assert g_logToFile is not None
#assert g_logToFileBoolVar is not None
global g_logToFile
g_logToFile = g_logToFileBoolVar.get()
return
def uiHandleVerboseCheckbox() -> None:
#assert g_logger is not None
#assert g_logLevelIntVar is not None
g_logger.setLevel(g_logLevelIntVar.get())
global g_logVerbose
logLevel=g_logLevelIntVar.get()
g_logger.setLevel(logLevel)
g_logVerbose = True if(logLevel == logging.DEBUG) else False
return
def uiInitialize() -> None:
global SCALE, g_logLevelIntVar, g_logToFileBoolVar
global SCALE, g_logLevelIntVar, g_logToFileBoolVar, g_logToFile, g_logVerbose
global g_tkRoot, g_tkCanvas, g_tkDirText, g_tkChooseDirButton, g_tkServerButton, g_tkTipMessage, g_tkScrolledTextLog, g_tkLogToFileCheckbox, g_tkVerboseCheckbox
global g_stopEvent, g_tlb, g_taskbar, g_progressBarWindow
@ -1325,6 +1521,7 @@ def uiInitialize() -> None:
default_font = font.nametofont('TkDefaultFont')
default_font_family = ('Segoe UI' if g_isWindows else 'sans-serif')
default_font_size = (-12 if g_isWindows else -10) # Measured in pixels. Reference: https://docs.python.org/3/library/tkinter.font.html
padding_bottom = WINDOW_HEIGHT + default_font_size - 1
default_font.configure(family=default_font_family, size=uiScaleMeasure(default_font_size), weight=font.NORMAL)
"""print(screen_width_px, screen_height_px)
@ -1341,10 +1538,11 @@ def uiInitialize() -> None:
g_tkCanvas = tk.Canvas(g_tkRoot, width=window_width_px, height=window_height_px)
g_tkCanvas.pack()
g_tkCanvas.create_text(uiScaleMeasure(60), uiScaleMeasure(30), text='Output Dir:', anchor=tk.CENTER)
g_tkCanvas.create_text(uiScaleMeasure(60), uiScaleMeasure(30), text='Output directory:', anchor=tk.CENTER)
g_tkDirText = tk.Text(g_tkRoot, height=1, width=45, font=default_font, wrap='none', state='disabled', bg='#F0F0F0')
uiUpdateDirectoryField(g_outputDir)
g_tkCanvas.create_window(uiScaleMeasure(260), uiScaleMeasure(30), window=g_tkDirText, anchor=tk.CENTER)
g_tkChooseDirButton = tk.Button(g_tkRoot, text='Choose', width=10, command=uiChooseDirectory)
@ -1364,18 +1562,19 @@ def uiInitialize() -> None:
g_tkScrolledTextLog.tag_config('CRITICAL', foreground='red', underline=True)
g_tkCanvas.create_window(uiScaleMeasure(int(WINDOW_WIDTH / 2)), uiScaleMeasure(280), window=g_tkScrolledTextLog, anchor=tk.CENTER)
g_tkCanvas.create_text(uiScaleMeasure(5), uiScaleMeasure(WINDOW_HEIGHT - 13), text=COPYRIGHT_TEXT, anchor=tk.W)
g_tkCanvas.create_text(uiScaleMeasure(5), uiScaleMeasure(padding_bottom), text=COPYRIGHT_TEXT, anchor=tk.W)
g_logToFileBoolVar = tk.BooleanVar()
g_tkLogToFileCheckbox = tk.Checkbutton(g_tkRoot, text='Log to file', variable=g_logToFileBoolVar,onvalue=True, offvalue=False)
g_tkLogToFileCheckbox = tk.Checkbutton(g_tkRoot, text='Log to file', variable=g_logToFileBoolVar,onvalue=True, offvalue=False, command=uiHandleLogToFileCheckbox)
g_tkLogToFileCheckbox.select()
g_logToFile = g_logToFileBoolVar.get()
g_tkCanvas.create_window(uiScaleMeasure(WINDOW_WIDTH - 165), uiScaleMeasure(WINDOW_HEIGHT - 13), window=g_tkLogToFileCheckbox, anchor=tk.CENTER)
g_tkCanvas.create_window(uiScaleMeasure(WINDOW_WIDTH - 165), uiScaleMeasure(padding_bottom), window=g_tkLogToFileCheckbox, anchor=tk.CENTER)
g_logLevelIntVar = tk.IntVar()
g_tkVerboseCheckbox = tk.Checkbutton(g_tkRoot, text='Verbose output', variable=g_logLevelIntVar, onvalue=logging.DEBUG, offvalue=logging.INFO, command=uiHandleVerboseCheckbox)
g_tkCanvas.create_window(uiScaleMeasure(WINDOW_WIDTH - 55), uiScaleMeasure(WINDOW_HEIGHT - 13), window=g_tkVerboseCheckbox, anchor=tk.CENTER)
g_tkCanvas.create_window(uiScaleMeasure(WINDOW_WIDTH - 55), uiScaleMeasure(padding_bottom), window=g_tkVerboseCheckbox, anchor=tk.CENTER)
# Initialize console logger.
console = LogConsole(g_tkScrolledTextLog)
@ -1392,37 +1591,47 @@ def cliInitialize() -> None:
global g_progressBarWindow
#assert g_logger is not None
# determines whether to use colors in terminal and sets up accordingly
utilsSetupTerminal()
# Set log path if logging to file specified at cmd line
if g_logToFile:
utilsUpdateLogPath()
# Initialize console logger.
console = LogConsole()
# Initialize progress bar window object.
bar_format = '{percentage:.2f}% |{bar}| {n:.2f}/{total:.2f} [{elapsed}<{remaining}, {rate_fmt}]'
g_progressBarWindow = ProgressBarWindow(bar_format)
# Print info.
g_logger.info('\n' + SCRIPT_TITLE + '. ' + COPYRIGHT_TEXT + '.')
g_logger.info('Output Dir: "' + g_outputDir + '".\n')
# Log basic info about the script and settings.
utilsLogBasicInfo()
# Start USB command handler directly.
usbCommandHandler()
def main() -> int:
global g_cliMode, g_outputDir, g_osType, g_osVersion, g_isWindows, g_isWindowsVista, g_isWindows7, g_logger
global g_cliMode, g_outputDir, g_logToFile, g_logVerbose, g_osType, g_osVersion, g_isWindows, g_isWindowsVista, g_isWindows7, g_isWindows10, g_pathSep, g_logger
# Disable warnings.
warnings.filterwarnings("ignore")
# Parse command line arguments.
parser = ArgumentParser(description=SCRIPT_TITLE + '. ' + COPYRIGHT_TEXT + '.')
parser = ArgumentParser(formatter_class=RawTextHelpFormatter, description=SCRIPT_TITLE + '. ' + COPYRIGHT_TEXT + '.')
parser.add_argument('-c', '--cli', required=False, action='store_true', default=False, help='Start the script in CLI mode.')
parser.add_argument('-o', '--outdir', required=False, type=str, metavar='DIR', help='Path to output directory. Defaults to "' + DEFAULT_DIR + '".')
parser.add_argument('-o', '--outdir', required=False, type=str, metavar='DIR', help='Path to output directory; will attempt to create if non-existent.'+\
'\nDefaults to "' + DEFAULT_DIR + '".')
parser.add_argument('-l', '--log', required=False, action='store_true', default=False, help='Enables logging to file in output directory in CLI mode.')
parser.add_argument('-v', '--verbose', required=False, action='store_true', default=False, help='Enable verbose output.')
args = parser.parse_args()
# Update global flags.
g_cliMode = args.cli
g_outputDir = utilsGetPath(args.outdir, DEFAULT_DIR, False, True)
g_logToFile = args.log
g_logVerbose = args.verbose
# Get OS information.
g_osType = platform.system()
@ -1430,23 +1639,27 @@ def main() -> int:
# Get Windows information.
g_isWindows = (g_osType == 'Windows')
g_isWindowsVista = g_isWindows7 = False
g_isWindowsVista = g_isWindows7 = g_isWindows10 = False
if g_isWindows:
win_ver = g_osVersion.split('.')
win_ver_major = int(win_ver[0])
win_ver_minor = int(win_ver[1])
win_build = int(win_ver[2])
g_isWindowsVista = (win_ver_major >= 6)
g_isWindows7 = (True if (win_ver_major > 6) else (win_ver_major == 6 and win_ver_minor > 0))
# ANSI colors in cmd.exe min. build
# ref: https://github.com/dart-lang/sdk/issues/28614#issuecomment-287282970
g_isWindows10 = (win_ver_major >= 10 and win_build >= 10586)
g_pathSep = os.path.sep if not g_isWindows else '\\'
# Setup logging mechanism.
logging.basicConfig(level=(logging.DEBUG if args.verbose else logging.INFO))
logging.basicConfig(level=(logging.DEBUG if g_logVerbose else logging.INFO))
g_logger = logging.getLogger()
if len(g_logger.handlers):
# Remove stderr output handler from logger. We'll control standard output on our own.
log_stderr = g_logger.handlers[0]
g_logger.removeHandler(log_stderr)
if g_cliMode:
# Initialize CLI.
cliInitialize()
@ -1463,7 +1676,8 @@ if __name__ == "__main__":
ret = main()
except KeyboardInterrupt:
time.sleep(0.2)
print("\nHost script interrupted!")
g_logger.info("Host script exited!")
if g_isWindows10: print(COLOR_RESET)
except Exception as e:
utilsLogException(traceback.format_exc())