From 980e23308c8abe8f6dfbc03ce010e0b266b203bf Mon Sep 17 00:00:00 2001 From: rocky14683 Date: Fri, 26 Jul 2024 18:39:35 +0800 Subject: [PATCH] debug --- pros/serial/ports/ble_port.py | 39 +++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/pros/serial/ports/ble_port.py b/pros/serial/ports/ble_port.py index 04093410..0ab3a7ce 100644 --- a/pros/serial/ports/ble_port.py +++ b/pros/serial/ports/ble_port.py @@ -31,7 +31,7 @@ def find_device(scan_time: time, max_devices_count: int) -> list[ble.Peripheral] if len(adapters) == 0: print("No adapters found") - return None + pass adapter = adapters[0] def device_found_or_updated_callback(peripheral: ble.Peripheral): @@ -64,14 +64,20 @@ def device_found_or_updated_callback(peripheral: ble.Peripheral): class BlePort(BasePort): + @property + def name(self): + return self._name + def __init__(self, name: str, peripheral: ble.Peripheral): self.name = name self.peripheral = peripheral - - self.pairing() self.buffer: bytearray = bytearray() + self.pairing() - self.peripheral.notify(V5_SERVICE, CHARACTERISTIC_SYSTEM_TX, self.on_notify) + try: + self.peripheral.notify(V5_SERVICE, CHARACTERISTIC_SYSTEM_TX, self.on_notify) + except Exception as e: + pass @staticmethod def create_ble_port(peripheral: ble.Peripheral): @@ -83,8 +89,6 @@ def create_ble_port(peripheral: ble.Peripheral): if not peripheral.is_connected(): peripheral.connect() - else: - raise ConnectionRefusedException(peripheral.identifier(), Exception("Peripheral is already connected")) print(f"{len(peripheral.services())}") for service in peripheral.services(): @@ -118,25 +122,30 @@ def pairing(self): magic = self.peripheral.read(V5_SERVICE, CHARACTERISTIC_PAIRING) if int.from_bytes(magic, "big") != UNPAIRED_MAGIC: - logger(__name__).warning("Unable to pair with peripheral") - exit() + logger(__name__).info("Already paired") + print("Already paired") + pass else: + # request showing pairing code on the brain screen self.peripheral.write_request(V5_SERVICE, CHARACTERISTIC_PAIRING, bytes([0xff, 0xff, 0xff, 0xff])) # Request pairing code from user - code: int = click.prompt("Enter the pairing code(4 digits)", type=int) - print(code) - pairing_code: bytes = code.to_bytes(4, "big") + code: str = click.prompt("Enter the pairing code(4 digits)", type=str) + # print(code) + pairing_code: bytes = bytes(int(c) for c in code) + # print(f"pairing bytes: {pairing_code.__str__()}") self.peripheral.write_request(V5_SERVICE, CHARACTERISTIC_PAIRING, pairing_code) # Wait for peripheral to send acknowledgement read_ack = bytes([]) while read_ack != pairing_code: read_ack = self.peripheral.read(V5_SERVICE, CHARACTERISTIC_PAIRING) + # print(read_ack.__str__()) # print("Pairing failed") # logger(__name__).warning("Pairing failed") # exit() + print("Pairing successful") logger(__name__).info("Pairing successful") @@ -172,10 +181,6 @@ def destroy(self): self.peripheral.disconnect() pass - @property - def name(self) -> str: - return self.name - def __str__(self): return str("Bluetooth Port") @@ -187,5 +192,7 @@ def name(self, value): if __name__ == "__main__": # test this first p_list = find_device(5000, 2) - BlePort.create_ble_port(p_list[0]) + ble = BlePort.create_ble_port(p_list[0]) + time.sleep(5) + ble.destroy() pass