diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e77c69b --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +venv +build +dbibackend.egg-info +*__pycache__* \ No newline at end of file diff --git a/README.md b/README.md index ac31484..99e4c3e 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,8 @@ -DBI backend -=========== +# DBI backend PC-side server for games installation into Nintendo Switch -Requirements ------------- +## Requirements Host: - libusb @@ -14,8 +12,12 @@ Host: Nintendo Switch: - DBI v202+ -Usage ------ -* Run server `python3 dbibackend.py tites_dir_path` -* Run DBI and then Installation titles from USB -* Install files +## Usage + +1. Git clone this repository `git clone git@github.com:lunixoid/dbibackend.git` +1. `cd dbibackend` +1. Run `sudo bash` to start a bash shell in case you are using zsh or fish. + You need root privileges to access the USB device. +1. Run `. ./env.sh` to set up the environment. + This will set up the python environment and its dependencies. +1. Run `dbi -d ` diff --git a/dbibackend/dbi.py b/dbibackend/dbi.py new file mode 100644 index 0000000..3fb095c --- /dev/null +++ b/dbibackend/dbi.py @@ -0,0 +1,210 @@ +import usb.core +import usb.util +import struct +import sys +import time +import argparse +import logging +import os +from enum import IntEnum +from collections import OrderedDict +from pathlib import Path + + +log = logging.getLogger(__name__) +log.addHandler(logging.StreamHandler(sys.stdout)) +log.setLevel(logging.INFO) + +BUFFER_SEGMENT_DATA_SIZE = 0x100000 + + +class CommandID(IntEnum): + EXIT = 0 + LIST_DEPRECATED = 1 + FILE_RANGE = 2 + LIST = 3 + + +class CommandType(IntEnum): + REQUEST = 0 + RESPONSE = 1 + ACK = 2 + + +class UsbContext: + def __init__(self, vid: hex, pid: hex): + dev = usb.core.find(idVendor=vid, idProduct=pid) + if dev is None: + raise ConnectionError(f'Device {vid}:{pid} not found') + + dev.reset() + dev.set_configuration() + cfg = dev.get_active_configuration() + + self._out = usb.util.find_descriptor( + cfg[(0, 0)], + custom_match=lambda ep: usb.util.endpoint_direction(ep.bEndpointAddress) == usb.util.ENDPOINT_OUT + ) + self._in = usb.util.find_descriptor( + cfg[(0, 0)], + custom_match=lambda ep: usb.util.endpoint_direction(ep.bEndpointAddress) == usb.util.ENDPOINT_IN + ) + + if self._out is None: + raise LookupError(f'Device {vid}:{pid} output endpoint not found') + if self._in is None: + raise LookupError(f'Device {vid}:{pid} input endpoint not found') + + def read(self, data_size, timeout=0): + return self._in.read(data_size, timeout=timeout) + + def write(self, data, timeout=0): + self._out.write(data, timeout=timeout) + + +def process_file_range_command(data_size, context, cache=None): + log.info('File range') + context.write(struct.pack('<4sIII', b'DBI0', CommandType.ACK, CommandID.FILE_RANGE, data_size)) + file_range_header = context.read(data_size) + range_size = struct.unpack(' 0: + if nsp_name in cache: + nsp_name = cache[nsp_name] + + log.info(f'Range Size: {range_size}, Range Offset: {range_offset}, Name len: {nsp_name_len}, Name: {nsp_name}') + + response_bytes = struct.pack('<4sIII', b'DBI0', CommandType.RESPONSE, CommandID.FILE_RANGE, range_size) + context.write(response_bytes) + + ack = bytes(context.read(16, timeout=0)) + cmd_type = struct.unpack(' 0: + + chunk_size = min(BUFFER_SEGMENT_DATA_SIZE, remaining_size) + buf = f.read(chunk_size) + + if not buf: + log.warning("Unexpected EOF reached before reading complete range.") + break + + context.write(data=buf, timeout=0) + curr_off += chunk_size + remaining_size -= chunk_size + + +def process_exit_command(context): + log.info('Exit') + context.write(struct.pack('<4sIII', b'DBI0', CommandType.RESPONSE, CommandID.EXIT, 0)) + sys.exit(0) + + +def process_list_command(context, work_dir_path): + log.info('Get list') + + cached_roms = OrderedDict() + for dirName, subdirList, fileList in os.walk(work_dir_path): + log.debug(f'Found directory: {dirName}') + for filename in fileList: + if filename.lower().endswith('.nsp') or filename.lower().endswith('nsz') or filename.lower().endswith('.xci'): + log.debug(f'\t{filename}') + cached_roms[f'{filename}'] = str(Path(dirName).joinpath(filename)) + + nsp_path_list = '' + for rom in cached_roms.keys(): + nsp_path_list += f'{rom}\n' + nsp_path_list_bytes = nsp_path_list.encode('utf-8') + nsp_path_list_len = len(nsp_path_list_bytes) + + context.write(struct.pack('<4sIII', b'DBI0', CommandType.RESPONSE, CommandID.LIST, nsp_path_list_len)) + + ack = bytes(context.read(16, timeout=0)) + cmd_type = struct.unpack('