Skip to content

Commit

Permalink
WIP: Add USB device support
Browse files Browse the repository at this point in the history
TODO
  • Loading branch information
nkarstens committed Nov 15, 2023
1 parent 8c85931 commit 99bae21
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 7 deletions.
12 changes: 9 additions & 3 deletions pybricksdev/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,11 @@ def add_parser(self, subparsers: argparse._SubParsersAction):
)

async def run(self, args: argparse.Namespace):
from ..ble import find_device
from ..ble import find_device as find_ble
from ..connections.ev3dev import EV3Connection
from ..connections.lego import REPLHub
from ..connections.pybricks import PybricksHub
from usb.core import find as find_usb

# Pick the right connection
if args.conntype == "ssh":
Expand All @@ -188,11 +189,16 @@ async def run(self, args: argparse.Namespace):
elif args.conntype == "ble":
# It is a Pybricks Hub with BLE. Device name or address is given.
print(f"Searching for {args.name or 'any hub with Pybricks service'}...")
device_or_address = await find_device(args.name)
device_or_address = await find_ble(args.name)
hub = PybricksHub(device_or_address)

elif args.conntype == "usb":
hub = REPLHub()
device_or_address = find_usb(idVendor=0x0483, idProduct=0x5740)

if device_or_address is not None and device_or_address.product == "Pybricks Hub":
hub = PybricksHub(device_or_address)
else:
hub = REPLHub()
else:
raise ValueError(f"Unknown connection type: {args.conntype}")

Expand Down
91 changes: 87 additions & 4 deletions pybricksdev/connections/pybricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
from reactivex.subject import BehaviorSubject, Subject
from tqdm.auto import tqdm
from tqdm.contrib.logging import logging_redirect_tqdm
from typing import Callable
from typing import Callable, Union
from uuid import UUID
from usb.core import Device as USBDevice
from usb.control import get_descriptor
from usb.util import *

from ..ble.lwp3.bytecodes import HubKind
from ..ble.nus import NUS_RX_UUID, NUS_TX_UUID
Expand Down Expand Up @@ -75,6 +79,79 @@ async def write_gatt_char(self, uuid: str, data, response: bool) -> None:
async def start_notify(self, uuid: str, callback: Callable) -> None:
return await self._client.start_notify(uuid, callback)

class PybricksHubUSBClient:
_device: USBDevice
_disconnected_callback: Callable = None
_fw_version: str
_protocol_version: str
_hub_capabilities: bytearray

def __init__(self, device: USBDevice, disconnected_callback = None):
self._device = device
self._disconnected_callback = disconnected_callback

async def connect(self) -> bool:
self._device.set_configuration()

# Get length of BOS descriptor
bos_descriptor = get_descriptor(self._device, 5, 0x0F, 0)
(ofst, _, bos_len, _) = struct.unpack("<BBHB", bos_descriptor)

# Get full BOS descriptor
bos_descriptor = get_descriptor(self._device, bos_len, 0x0F, 0)

while ofst < bos_len:
(len, desc_type, cap_type) = struct.unpack_from("<BBB", bos_descriptor, offset=ofst)

if desc_type != 0x10:
logger.error("Expected Device Capability descriptor")
exit(1)

# Look for platform descriptors
if cap_type == 0x05:
uuid_bytes = bos_descriptor[ofst + 4 : ofst + 4 + 16]
uuid_str = str(UUID(bytes_le = bytes(uuid_bytes)))

if uuid_str == FW_REV_UUID:
self._fw_version = bytearray(bos_descriptor[ofst + 20 : ofst + len - 1]) # Remove null-terminator
elif uuid_str == SW_REV_UUID:
self._protocol_version = bytearray(bos_descriptor[ofst + 20 : ofst + len - 1]) # Remove null-terminator
elif uuid_str == PYBRICKS_HUB_CAPABILITIES_UUID:
self._hub_capabilities = bytearray(bos_descriptor[ofst + 20 : ofst + len])

ofst += len

return True

async def disconnect(self) -> bool:
self._disconnected_callback()
return True

async def read_gatt_char(self, uuid: str) -> bytearray:
if uuid == FW_REV_UUID:
return self._fw_version
elif uuid == SW_REV_UUID:
return self._protocol_version
elif uuid == PYBRICKS_HUB_CAPABILITIES_UUID:
return self._hub_capabilities
elif uuid == PNP_ID_UUID:
return None

async def write_gatt_char(self, uuid: str, data, response: bool) -> None:
# Get output endpoint
cfg = self._device.get_active_configuration()
intf = cfg[(0,0)]
ep = find_descriptor(intf, custom_match = lambda e: endpoint_direction(e.bEndpointAddress) == ENDPOINT_OUT)

uuid_bytes = UUID(uuid).bytes_le
ep.write(uuid_bytes + data)

# TODO: Handle response

async def start_notify(self, uuid: str, callback: Callable) -> None:
# TODO
return

class PybricksHub:
EOL = b"\r\n" # MicroPython EOL

Expand Down Expand Up @@ -109,7 +186,7 @@ class PybricksHub:
has not been connected yet or the connected hub has Pybricks profile < v1.2.0.
"""

def __init__(self, device: BLEDevice):
def __init__(self, device: Union[BLEDevice, USBDevice]):
self.connection_state_observable = BehaviorSubject(ConnectionState.DISCONNECTED)
self.status_observable = BehaviorSubject(StatusFlag(0))
self._stdout_subject = Subject()
Expand Down Expand Up @@ -155,7 +232,12 @@ def handle_disconnect():
logger.info("Disconnected!")
self.connection_state_observable.on_next(ConnectionState.DISCONNECTED)

self.client = PybricksHubBLEClient(device, disconnected_callback=handle_disconnect)
if isinstance(device, BLEDevice):
self.client = PybricksHubBLEClient(device, disconnected_callback=handle_disconnect)
elif isinstance(device, USBDevice):
self.client = PybricksHubUSBClient(device, disconnected_callback=handle_disconnect)
else:
raise TypeError

@property
def stdout_observable(self) -> Observable[bytes]:
Expand Down Expand Up @@ -308,7 +390,8 @@ async def connect(self):
)

pnp_id = await self.client.read_gatt_char(PNP_ID_UUID)
_, _, self.hub_kind, self.hub_variant = unpack_pnp_id(pnp_id)
if pnp_id:
_, _, self.hub_kind, self.hub_variant = unpack_pnp_id(pnp_id)

if protocol_version >= "1.2.0":
caps = await self.client.read_gatt_char(PYBRICKS_HUB_CAPABILITIES_UUID)
Expand Down

0 comments on commit 99bae21

Please sign in to comment.