Skip to content

Commit

Permalink
debug
Browse files Browse the repository at this point in the history
  • Loading branch information
Rocky14683 committed Jul 26, 2024
1 parent 0eb6f90 commit 980e233
Showing 1 changed file with 23 additions and 16 deletions.
39 changes: 23 additions & 16 deletions pros/serial/ports/ble_port.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def find_device(scan_time: time, max_devices_count: int) -> list[ble.Peripheral]

if len(adapters) == 0:
print("No adapters found")
return None
pass
adapter = adapters[0]

def device_found_or_updated_callback(peripheral: ble.Peripheral):
Expand Down Expand Up @@ -64,14 +64,20 @@ def device_found_or_updated_callback(peripheral: ble.Peripheral):


class BlePort(BasePort):
@property
def name(self):
return self._name

def __init__(self, name: str, peripheral: ble.Peripheral):
self.name = name
self.peripheral = peripheral

self.pairing()
self.buffer: bytearray = bytearray()
self.pairing()

self.peripheral.notify(V5_SERVICE, CHARACTERISTIC_SYSTEM_TX, self.on_notify)
try:
self.peripheral.notify(V5_SERVICE, CHARACTERISTIC_SYSTEM_TX, self.on_notify)
except Exception as e:
pass

@staticmethod
def create_ble_port(peripheral: ble.Peripheral):
Expand All @@ -83,8 +89,6 @@ def create_ble_port(peripheral: ble.Peripheral):

if not peripheral.is_connected():
peripheral.connect()
else:
raise ConnectionRefusedException(peripheral.identifier(), Exception("Peripheral is already connected"))

print(f"{len(peripheral.services())}")
for service in peripheral.services():
Expand Down Expand Up @@ -118,25 +122,30 @@ def pairing(self):

magic = self.peripheral.read(V5_SERVICE, CHARACTERISTIC_PAIRING)
if int.from_bytes(magic, "big") != UNPAIRED_MAGIC:
logger(__name__).warning("Unable to pair with peripheral")
exit()
logger(__name__).info("Already paired")
print("Already paired")
pass
else:
# request showing pairing code on the brain screen
self.peripheral.write_request(V5_SERVICE, CHARACTERISTIC_PAIRING,
bytes([0xff, 0xff, 0xff, 0xff]))

# Request pairing code from user
code: int = click.prompt("Enter the pairing code(4 digits)", type=int)
print(code)
pairing_code: bytes = code.to_bytes(4, "big")
code: str = click.prompt("Enter the pairing code(4 digits)", type=str)
# print(code)
pairing_code: bytes = bytes(int(c) for c in code)
# print(f"pairing bytes: {pairing_code.__str__()}")
self.peripheral.write_request(V5_SERVICE, CHARACTERISTIC_PAIRING, pairing_code)

# Wait for peripheral to send acknowledgement
read_ack = bytes([])
while read_ack != pairing_code:
read_ack = self.peripheral.read(V5_SERVICE, CHARACTERISTIC_PAIRING)
# print(read_ack.__str__())
# print("Pairing failed")
# logger(__name__).warning("Pairing failed")
# exit()

print("Pairing successful")
logger(__name__).info("Pairing successful")

Expand Down Expand Up @@ -172,10 +181,6 @@ def destroy(self):
self.peripheral.disconnect()
pass

@property
def name(self) -> str:
return self.name

def __str__(self):
return str("Bluetooth Port")

Expand All @@ -187,5 +192,7 @@ def name(self, value):
if __name__ == "__main__":
# test this first
p_list = find_device(5000, 2)
BlePort.create_ble_port(p_list[0])
ble = BlePort.create_ble_port(p_list[0])
time.sleep(5)
ble.destroy()
pass

0 comments on commit 980e233

Please sign in to comment.