diff --git a/README.mkd b/README.mkd index ff659499..d48cfd96 100644 --- a/README.mkd +++ b/README.mkd @@ -6,70 +6,23 @@ behavior. pygatt provides a Pythonic API by wrapping two different backends: -* BlueZ's `gatttool` command-line utility. -* Bluegiga's BGAPI, compatble with dongles like the BLED112. - -Requires Python 2.7. +* BlueZ (requires Linux), using the `gatttool` command-line utility. +* Bluegiga's BGAPI, compatible with USB adapters like the BLED112. ## Motivation -Despite the popularilty of BLE, we have yet to find a good programming interface +Despite the popularity of BLE, we have yet to find a good programming interface for it on desktop computers. Since most peripherals are designed to work with smartphones, this space is neglected. One interactive interface, BlueZ's `gatttool`, is functional but difficult to use programatically. BlueZ itself obviously works, but the interface leaves something to be desired only works in Linux. -pygatt consists of a front end that provies an API and two interchangable -backends that implement the Bluetooth communication using differently. The -backend can either use gatttool/BlueZ or a Bluegiga BGAPI compatible inteface. -gatttool/BlueZ is Linux only whereas the BGAPI is cross platform. - -### Front end - -The front end class pygatt.pygatt.BluetoothLEDevice represents a remote BLE -device. The API provides the methods -- connect, char_read, char_write, -subscribe -- that you need to interact with the device. When a BluetoothLEDevice -object is created, there is an optional argument "backend" that allows for -selection of the backend used. The default is gattool but the BGAPI can be -used by setting the optional bled112 arguement to an instance of a -BGAPIBackend. Note that there are optional arguments for some of the methods -of BluetoothLEDevice that must be specified when using one backend and not the -other. - -### BGAPI Backend - -This backend uses a minimalist implementation of Bluegiga's BGAPI to execute the -Bluetooth communication. In this case, the class used by BluetoothLEDevice is -pygatt.backends.BGAPIBackend. - -BGAPIBackend in turn uses pygatt.backends.bgapi.bglib to communicate with the -BLED112 dongle. BGLib's job is to construct comands for the dongle and parse the -bytes received from the dongle into packets. BGAPIBackend's job is to manage -the timing of commands, handling of the data, and keep track of the state of the -dongle and connection to the remote device. - -#### Dependencies - -* The BGAPI backend should work on Linux, Windows, and Mac OS and has no other - external dependencies. +## Requirements -### GATTTool Backend - -This backend uses gatttool/BlueZ on Linux to execute the Bluetooth -communication. In this case, the class used by BluetoothLEDevice is -pygatt.gatttool_classes.GATTToolBackend. GATTToolBackend uses the python module -pexpect to execute gatttool commannds as if a user were entering them on the -comand line. - -#### Dependencies - -* Currently the gatttool backend is currently only tested under Linux as it - requires `gatttool` which is included with BlueZ which is a Linux library. -* BlueZ >= 5.5 - * Tested on 5.18 and 5.21 - * Ubuntu is stuck on BlueZ 4.x and does not work - you need to build BlueZ - from source. +* Python 2.7 +* BlueZ 5.5 or greater (with gatttool) - required for the gatttool backend only. + * Tested on 5.18, 5.21 and 5.35. ## Installation @@ -83,6 +36,27 @@ backend, install the optional dependencies with: $ pip install pygatt[GATTTOOL] +## Example Use + +The primary API for users of this library is provided by +`pygatt.backends.BLEBackend` and `pygatt.BLEDevice`. After initializing an +instance of the preferred backend (available implementations are found in +`pygatt.backends`, use the `BLEBackend.connect` method to connect to a device +and get an instance of `BLEDevice.` + +```python +import pygatt.backends + +# The BGAPI backend will attemt to auto-discover the serial device name of the +# attached BGAPI-compatible USB adapter. +adapter = pygatt.backends.BGAPIBackend() +device = adapter.connect('01:23:45:67:89:ab') +value = device.char_read("a1e8f5b1-696b-4e4c-87c6-69dfe0b0093b") +``` + +Note that not all backends support connecting to more than 1 device at at time, +so calling `BLEBackend.connect` again may terminate existing connections. + ## Authors - Jeff Rowberg @jrowberg https://github.com/jrowberg/bglib diff --git a/pygatt/__init__.py b/pygatt/__init__.py index ae591a37..82c285f3 100644 --- a/pygatt/__init__.py +++ b/pygatt/__init__.py @@ -1,2 +1,3 @@ -from .exceptions import BluetoothLEError # noqa -from .classes import BluetoothLEDevice # noqa +from .exceptions import BLEError # noqa +from .device import BLEDevice # noqa +from .backends import BGAPIBackend, GATTToolBackend # noqa diff --git a/pygatt/backends/__init__.py b/pygatt/backends/__init__.py index b196b5f6..3898d4ff 100644 --- a/pygatt/backends/__init__.py +++ b/pygatt/backends/__init__.py @@ -1,2 +1,3 @@ +from .backend import BLEBackend, Characteristic # noqa from .bgapi.bgapi import BGAPIBackend # noqa from .gatttool.gatttool import GATTToolBackend # noqa diff --git a/pygatt/backends/backend.py b/pygatt/backends/backend.py index 2733d95b..0d4e6284 100644 --- a/pygatt/backends/backend.py +++ b/pygatt/backends/backend.py @@ -1,18 +1,25 @@ -import threading import logging -from collections import defaultdict -from binascii import hexlify - log = logging.getLogger(__name__) +DEFAULT_CONNECT_TIMEOUT_S = 5.0 + class BLEBackend(object): - """Abstract base class representing a Bluetooth adapter backend. """ + """Abstract base class representing a Bluetooth adapter backend. See the + `pygatt.backends` module for available implementations. + """ + + def start(self): + """Initialize and resource required to run the backend, e.g. background + threads, USB device connections, etc. + """ + raise NotImplementedError() - def __init__(self): - self._callbacks = defaultdict(set) - self._subscribed_handlers = {} + def stop(self): + """Stop and free any resources required while the backend is running. + """ + raise NotImplementedError() def supports_unbonded(self): """Return True if the backend supports unbonded communication - this is @@ -21,103 +28,18 @@ def supports_unbonded(self): """ return True - def bond(self): - raise NotImplementedError() - - def connect(self, address, **kwargs): - raise NotImplementedError() - - def char_read_uuid(self, uuid): - raise NotImplementedError() - - def char_write(self, handle, value, wait_for_response=False): - raise NotImplementedError() - - def char_write_uuid(self, uuid, value, wait_for_response=False): - log.info("char_write %s", uuid) - handle = self.get_handle(uuid) - self.char_write(handle, value, wait_for_response=wait_for_response) - - def encrypt(self): - raise NotImplementedError() - - def get_rssi(self): - raise NotImplementedError() - - def start(self): - self._lock = threading.Lock() - - def stop(self): - raise NotImplementedError() - - def disconnect(self): + def connect(self, address, timeout=DEFAULT_CONNECT_TIMEOUT_S, **kwargs): + """Return a BLEDevice for the connection if connected, otherwise raise + an exception. + """ raise NotImplementedError() - def subscribe(self, uuid, callback=None, indication=False): + def scan(self, *args, **kwargs): """ - Enables subscription to a Characteristic with ability to call callback. + Performs a BLE scan. - uuid -- UUID as a string of the characteristic to subscribe. - callback -- function to be called when a notification/indication is - received on this characteristic. - indication -- use indications (requires application ACK) rather than - notifications (does not requrie application ACK). - """ - log.info( - 'Subscribing to uuid=%s with callback=%s and indication=%s', - uuid, callback, indication) - # Expect notifications on the value handle... - value_handle = self.get_handle(uuid) - - # but write to the characteristic config to enable notifications - # TODO with the BGAPI backend we can be smarter and fetch the actual - # characteristic config handle - we can also do that with gattool if we - # use the 'desc' command, so we'll need to change the "get_handle" API - # to be able to get the value or characteristic config handle. - characteristic_config_handle = value_handle + 1 - - properties = bytearray([ - 0x2 if indication else 0x1, - 0x0 - ]) - - try: - self._lock.acquire() - - if callback is not None: - self._callbacks[value_handle].add(callback) - - if self._subscribed_handlers.get(value_handle, None) != properties: - self.char_write( - characteristic_config_handle, - properties, - wait_for_response=False - ) - log.debug("Subscribed to uuid=%s", uuid) - self._subscribed_handlers[value_handle] = properties - else: - log.debug("Already subscribed to uuid=%s", uuid) - finally: - self._lock.release() - - def _handle_notification(self, handle, value): - """ - Receive a notification from the connected device and propagate the value - to all registered callbacks. + Returns a list of BLE devices found. """ - - log.info('Received notification on handle=0x%x, value=0x%s', - handle, hexlify(value)) - try: - self._lock.acquire() - - if handle in self._callbacks: - for callback in self._callbacks[handle]: - callback(handle, value) - finally: - self._lock.release() - - def get_handle(self, characteristic_uuid, descriptor_uuid=None): raise NotImplementedError() def filtered_scan(self, name_filter="", *args, **kwargs): @@ -131,10 +53,30 @@ def filtered_scan(self, name_filter="", *args, **kwargs): return [device for device in devices if name_filter in (device['name'] or '')] - def scan(self, *args, **kwargs): + def clear_bond(self, address=None): + raise NotImplementedError() + + +class Characteristic(object): + """ + A GATT characteristic, including it handle value and associated descriptors. + Only valid for the lifespan of a BLE connection, since the handle values are + dynamic. + """ + def __init__(self, uuid, handle): """ - Performs a BLE scan. + Sets the characteritic uuid and handle. - Returns a list of BLE devices found. + handle - a bytearray """ - raise NotImplementedError() + self.uuid = uuid + self.handle = handle + self.descriptors = { + # uuid_string: handle + } + + def add_descriptor(self, uuid, handle): + """ + Add a characteristic descriptor to the dictionary of descriptors. + """ + self.descriptors[uuid] = handle diff --git a/pygatt/backends/bgapi/bgapi.py b/pygatt/backends/bgapi/bgapi.py index a672ccb2..c76ca88c 100644 --- a/pygatt/backends/bgapi/bgapi.py +++ b/pygatt/backends/bgapi/bgapi.py @@ -6,16 +6,20 @@ import time import threading from binascii import hexlify +from uuid import UUID from enum import Enum +from collections import defaultdict -from pygatt.exceptions import BluetoothLEError, NotConnectedError -from pygatt.backends.backend import BLEBackend +from pygatt.exceptions import NotConnectedError +from pygatt.backends import BLEBackend, Characteristic +from pygatt.util import uuid16_to_uuid from . import bglib, constants -from .util import uuid_to_bytearray +from .exceptions import BGAPIError, ExpectedResponseTimeout +from .device import BGAPIBLEDevice from .bglib import EventPacketType, ResponsePacketType from .packets import BGAPICommandPacketBuilder as CommandBuilder -from .error_codes import get_return_message, ErrorCode +from .error_codes import get_return_message from .util import find_usb_serial_devices log = logging.getLogger(__name__) @@ -28,39 +32,6 @@ 'descriptor', 'characteristic']) -class BGAPIError(BluetoothLEError): - pass - - -class ExpectedResponseTimeout(BGAPIError): - def __init__(self, expected_packets, timeout): - super(ExpectedResponseTimeout, self).__init__( - "Timed out after %fs waiting for %s" % ( - timeout or 0, expected_packets)) - - -class Characteristic(object): - """ - GATT characteristic. For internal use within BGAPIBackend. - """ - def __init__(self, name, handle): - """ - Sets the characteritic name and handle. - - handle - a bytearray - """ - self.handle = handle - self.descriptors = { - # uuid_string: handle - } - - def add_descriptor(self, uuid, handle): - """ - Add a characteristic descriptor to the dictionary of descriptors. - """ - self.descriptors[uuid] = handle - - class AdvertisingAndScanInfo(object): """ Holds the advertising and scan response packet data from a device at a given @@ -77,23 +48,16 @@ def __init__(self): class BGAPIBackend(BLEBackend): """ - Pygatt BLE device backend using a Bluegiga BGAPI compatible dongle. - - Only supports 1 device connection at a time. - - This object is NOT threadsafe. + A BLE backend for a BGAPI compatible USB adapter. """ def __init__(self, serial_port=None): """ - Initialize the BGAPI device to be ready for use with a BLE device, i.e., - stop ongoing procedures, disconnect any connections, optionally start - the receiver thread, and optionally delete any stored bonds. + Initialize the backend, but don't start the USB connection yet. Must + call .start(). serial_port -- The name of the serial port for the BGAPI-compatible - USB interface. + USB interface. If not provided, will attempt to auto-detect. """ - super(BGAPIBackend, self).__init__() - self._lib = bglib.BGLib() if serial_port is None: log.info("Auto-discovering serial port for BLED112") @@ -108,29 +72,27 @@ def __init__(self, serial_port=None): self._ser = None self._receiver = None - self._running = threading.Event() + self._running = None + self._lock = threading.Lock() # buffer for packets received self._receiver_queue = Queue.Queue() + self._connected_devices = { + # handle: BLEDevice + } + # State - self._expected_attribute_handle = None # expected handle after a read - self._num_bonds = 0 # number of bonds stored on the dongle - self._stored_bonds = [] # bond handles stored on the dongle - self._connection_handle = 0x00 # handle for the device connection + self._num_bonds = 0 # number of bonds stored on the adapter + self._stored_bonds = [] # bond handles stored on the adapter self._devices_discovered = { # 'address': AdvertisingAndScanInfo, # Note: address formatted like "01:23:45:67:89:AB" } - self._characteristics = {} - self._current_characteristic = None # used in char/descriptor discovery - - # Flags - self._bonded = False # device is bonded - self._connected = False # device is connected - self._encrypted = False # connection is encrypted - self._bond_expected = False # tell bond_status handler to set _bonded + self._characteristics = defaultdict(dict) + self._connections = {} + self._current_characteristic = None # used in char/descriptor discovery self._packet_handlers = { ResponsePacketType.sm_get_bonds: self._ble_rsp_sm_get_bonds, EventPacketType.attclient_attribute_value: ( @@ -146,41 +108,83 @@ def __init__(self, serial_port=None): log.info("Initialized new BGAPI backend on %s", serial_port) - def bond(self): + def start(self): """ - Create a bond and encrypted connection with the device. - - This requires that a connection is already extablished with the device. + Connect to the USB adapter, reset it's state and start a backgroud + receiver thread. """ - self._assert_connected() + if self._running and self._running.is_set(): + self.stop() + + self._ser = serial.Serial(self._serial_port, baudrate=256000, + timeout=0.25) + self._receiver = threading.Thread(target=self._receive) + self._receiver.daemon = True + + self._running = threading.Event() + self._running.set() + self._receiver.start() + + self.disable_advertising() + + self.set_bondable(False) + + # TODO should disconnect from anything so we are in a clean slate + + # Stop any ongoing procedure + log.info("Stopping any outstanding GAP procedure") + self.send_command(CommandBuilder.gap_end_procedure()) + try: + self.expect(ResponsePacketType.gap_end_procedure) + except BGAPIError: + # Ignore any errors if there was no GAP procedure running + pass + + def stop(self): + for device in self._connections.values(): + try: + device.disconnect() + except NotConnectedError: + pass + if self._running.is_set(): + log.info('Stopping') + self._running.clear() + + if self._receiver: + self._receiver.join() + self._receiver = None - # Set to bondable mode - self._bond_expected = True - log.info("Bonding to device") - self._lib.send_command( - self._ser, - CommandBuilder.sm_set_bondable_mode(constants.bondable['yes'])) + if self._ser: + self._ser.close() + self._ser = None + def set_bondable(self, bondable): + self.send_command( + CommandBuilder.sm_set_bondable_mode( + constants.bondable['yes' if bondable else 'no'])) self.expect(ResponsePacketType.sm_set_bondable_mode) - log.debug("Enabling encryption") - self._lib.send_command( - self._ser, - CommandBuilder.sm_encrypt_start( - self._connection_handle, constants.bonding['create_bonding'])) - - self.expect(ResponsePacketType.sm_encrypt_start) - while self._connected and not self._bonded and not self._encrypted: - matched_packet_type, response = self.expect_any( - [EventPacketType.connection_status, - EventPacketType.sm_bonding_fail]) - if matched_packet_type == EventPacketType.sm_bonding_fail: - raise BGAPIError("Bonding failed") - # TODO how many times shoulud we try to bond? when does this loop - # exit? + + def disable_advertising(self): + log.info("Disabling advertising") + self.send_command( + CommandBuilder.gap_set_mode( + constants.gap_discoverable_mode['non_discoverable'], + constants.gap_connectable_mode['non_connectable'])) + self.expect(ResponsePacketType.gap_set_mode) + + def send_command(self, *args, **kwargs): + with self._lock: + if self._ser is None: + log.warn("Unexpectedly not connected to USB device") + raise NotConnectedError() + return self._lib.send_command(self._ser, *args, **kwargs) def clear_bond(self, address=None): """ - Delete the bonds stored on the dongle. + Delete the bonds stored on the adapter. + + address - the address of the device to unbond. If not provided, will + erase all bonds. Note: this does not delete the corresponding bond stored on the remote device. @@ -188,7 +192,7 @@ def clear_bond(self, address=None): # Find bonds log.debug("Fetching existing bonds for devices") self._stored_bonds = [] - self._lib.send_command(self._ser, CommandBuilder.sm_get_bonds()) + self.send_command(CommandBuilder.sm_get_bonds()) try: self.expect(ResponsePacketType.sm_get_bonds) @@ -204,341 +208,33 @@ def clear_bond(self, address=None): for b in reversed(self._stored_bonds): log.info("Deleting bond %s", b) - self._lib.send_command(self._ser, - CommandBuilder.sm_delete_bonding(b)) + self.send_command(CommandBuilder.sm_delete_bonding(b)) self.expect(ResponsePacketType.sm_delete_bonding) - def char_write(self, handle, value, wait_for_response=False): - """ - Write a value to a characteristic on the device. - - This requires that a connection is already extablished with the device. - - handle -- the characteristic/descriptor handle (integer) to write to. - value -- a bytearray holding the value to write. - - Raises BGAPIError on failure. - """ - if wait_for_response: - raise NotImplementedError("bgapi subscribe wait for response") - - while True: - self._assert_connected() - - value_list = [b for b in value] - log.info("attribute_write") - self._lib.send_command( - self._ser, - CommandBuilder.attclient_attribute_write( - self._connection_handle, handle, value_list)) - - self.expect(ResponsePacketType.attclient_attribute_write) - packet_type, response = self.expect( - EventPacketType.attclient_procedure_completed) - if (response['result'] != - ErrorCode.insufficient_authentication.value): - # Continue to retry until we are bonded - break - - def char_read_uuid(self, uuid): - handle = self.get_handle(uuid) - return self._char_read(handle) - - def _char_read(self, handle): - """ - Read a value from a characteristic on the device. - - This requires that a connection is already established with the device. - - handle -- the characteristic handle (integer) to read from. - - Returns a bytearray containing the value read, on success. - Raised BGAPIError on failure. - """ - self._assert_connected() - - log.info("Reading characteristic at handle %d", handle) - self._expected_attribute_handle = handle - self._lib.send_command( - self._ser, - CommandBuilder.attclient_read_by_handle( - self._connection_handle, handle)) - - self.expect(ResponsePacketType.attclient_read_by_handle) - matched_packet_type, response = self.expect_any( - [EventPacketType.attclient_attribute_value, - EventPacketType.attclient_procedure_completed]) - # TODO why not just expect *only* the attribute value response, then it - # would time out and raise an exception if allwe got was the 'procedure - # completed' response? - if matched_packet_type != EventPacketType.attclient_attribute_value: - raise BGAPIError("Unable to read characteristic") - return bytearray(response['value']) - - def connect(self, address, timeout=5, - addr_type=constants.ble_address_type[ - 'gap_address_type_public']): - """ - Connnect directly to a device given the ble address then discovers and - stores the characteristic and characteristic descriptor handles. - - Requires that the dongle is not connected to a device already. - - address -- a bytearray containing the device mac address. - timeout -- number of seconds to wait before returning if not connected. - addr_type -- one of the ble_address_type constants. - - Raises BGAPIError or NotConnectedError on failure. - """ - if self._connected: - raise BGAPIError("Already connected") - - address_bytes = [int(b, 16) for b in address.split(":")] - interval_min = 60 - interval_max = 76 - supervision_timeout = 100 - latency = 0 # intervals that can be skipped - log.info("Connecting to device at address %s (timeout %ds)", - address, timeout) - self._lib.send_command( - self._ser, - CommandBuilder.gap_connect_direct( - address_bytes, addr_type, interval_min, interval_max, - supervision_timeout, latency)) - - self.expect(ResponsePacketType.gap_connect_direct) - try: - self.expect(EventPacketType.connection_status, timeout=timeout) - except ExpectedResponseTimeout: - raise NotConnectedError() - - def disconnect(self, fail_quietly=False): - """ - Disconnect from the device if connected. - - fail_quietly -- do not raise an exception on failure. - """ - - if self._ser is None: - return - - log.debug("Disconnecting") - self._lib.send_command( - self._ser, - CommandBuilder.connection_disconnect(self._connection_handle)) - - try: - self.expect(ResponsePacketType.connection_disconnect) - except (BGAPIError, NotConnectedError): - if not fail_quietly: - raise - log.info("Disconnected") - - def encrypt(self): - """ - Begin encryption on the connection with the device. - - This requires that a connection is already established with the device. - - Raises BGAPIError on failure. - """ - self._assert_connected() - - self._lib.send_command( - self._ser, - CommandBuilder.sm_set_bondable_mode(constants.bondable['no'])) - - # TODO expecting the matching response for a command is a repeated - # pattern - the send_command function should have an option to wait for - # the response for the command and return it. - self.expect(ResponsePacketType.sm_set_bondable_mode) - - log.info("Starting encryption") - self._lib.send_command( - self._ser, - CommandBuilder.sm_encrypt_start( - self._connection_handle, - constants.bonding['do_not_create_bonding'])) - - self.expect(ResponsePacketType.sm_encrypt_start) - self.expect(EventPacketType.connection_status) - if not self._encrypted: - msg = "Expected to be encrypted, but wasn't" - log.warn(msg) - raise BGAPIError(msg) - - def _cache_characteristics(self): - if len(self._characteristics) == 0: - att_handle_start = 0x0001 # first valid handle - att_handle_end = 0xFFFF # last valid handle - log.info("Fetching characteristics") - self._lib.send_command( - self._ser, - CommandBuilder.attclient_find_information( - self._connection_handle, att_handle_start, att_handle_end)) - - self.expect(ResponsePacketType.attclient_find_information) - self.expect(EventPacketType.attclient_procedure_completed, - timeout=10) - - for char_uuid_str, char_obj in self._characteristics.iteritems(): - log.debug("Characteristic 0x%s is handle 0x%x", - char_uuid_str, char_obj.handle) - for desc_uuid_str, desc_handle in ( - char_obj.descriptors.iteritems()): - log.debug("Characteristic descriptor 0x%s is handle %x", - desc_uuid_str, desc_handle) - - def get_handle(self, characteristic_uuid, descriptor_uuid=None): - """ - Get the handle (integer) for a characteristic or descriptor. - - This requires that a connection is already established with the device. - - characteristic_uuid -- bytearray containing the characteristic UUID. - descriptor_uuid -- optional bytearray containg the GATT descriptor UUID - for the given characteristic. Note: use the - gatt_characteristic_descriptor_uuid constant. - - Returns an integer containing the handle on success. - Raises BGAPIError on failure. - """ - self._assert_connected() - self._cache_characteristics() - # Return the handle if it exists - char = None - - uuid_bytes = uuid_to_bytearray(characteristic_uuid) - char = self._characteristics.get(hexlify(uuid_bytes)) - if char is None: - warning = ( - "No characteristic found matching %s" % characteristic_uuid) - log.warn(warning) - raise BGAPIError(warning) - - if descriptor_uuid is None: - return char.handle - - desc_uuid_str = hexlify(descriptor_uuid) - if not (desc_uuid_str in char.descriptors): - warning = "No descriptor found matching %s" % desc_uuid_str - log.warn(warning) - raise BGAPIError(warning) - desc_handle = char.descriptors[desc_uuid_str] - return desc_handle - - def get_rssi(self): - # The BGAPI has some strange behavior where it will return 25 for - # the RSSI value sometimes... Try a maximum of 3 times. - for i in range(0, 3): - rssi = self._get_rssi_once() - if rssi != 25: - return rssi - time.sleep(0.1) - raise BGAPIError("get rssi failed") - - def _get_rssi_once(self): - """ - Get the receiver signal strength indicator (RSSI) value from the device. - - This requires that a connection is already established with the device. - - Returns the RSSI as in integer in dBm. - """ - self._assert_connected() - - log.info("Fetching RSSI one time") - self._lib.send_command( - self._ser, - CommandBuilder.connection_get_rssi(self._connection_handle)) - - _, response = self.expect(ResponsePacketType.connection_get_rssi) - return response['rssi'] - - def start(self): - """ - Put the interface into a known state to start. And start the receiver - thread. - """ - super(BGAPIBackend, self).start() - if self._running.is_set(): - self.stop() - self._ser = serial.Serial(self._serial_port, baudrate=256000, - timeout=0.25) - - self._receiver = threading.Thread(target=self._receive) - self._receiver.daemon = True - - self._running.set() - self._receiver.start() - - # Disconnect any connections - self.disconnect(fail_quietly=True) - - # Stop advertising - log.info("Disabling advertising") - self._lib.send_command( - self._ser, - CommandBuilder.gap_set_mode( - constants.gap_discoverable_mode['non_discoverable'], - constants.gap_connectable_mode['non_connectable'])) - - try: - self.expect(ResponsePacketType.gap_set_mode) - except BGAPIError: - # TODO should we do something about this error? is it fatal? - pass - - # Stop any ongoing procedure - log.info("Stopping any outstanding GAP procedure") - self._lib.send_command(self._ser, CommandBuilder.gap_end_procedure()) - - try: - self.expect(ResponsePacketType.gap_end_procedure) - except BGAPIError: - # TODO should we do something about this error? is it fatal? - pass - - self._lib.send_command( - self._ser, - CommandBuilder.sm_set_bondable_mode(constants.bondable['no'])) - - self.expect(ResponsePacketType.sm_set_bondable_mode) - - def reset(self): - self.disconnect(fail_quietly=True) - self.clear_bond() - def scan(self, timeout=10, scan_interval=75, scan_window=50, active=True, discover_mode=constants.gap_discover_mode['observation']): """ Perform a scan to discover BLE devices. + timeout -- the number of seconds this scan should last. scan_interval -- the number of miliseconds until scanning is restarted. scan_window -- the number of miliseconds the scanner will listen on one frequency for advertisement packets. active -- True --> ask sender for scan response data. False --> don't. - timeout -- the number of seconds this scan should last. discover_mode -- one of the gap_discover_mode constants. """ - # Set scan parameters - if active: - active = 0x01 - else: - active = 0x00 + parameters = 1 if active else 0 # NOTE: the documentation seems to say that the times are in units of # 625us but the ranges it gives correspond to units of 1ms.... - self._lib.send_command( - self._ser, + self.send_command( CommandBuilder.gap_set_scan_parameters( - scan_interval, scan_window, active + scan_interval, scan_window, parameters )) self.expect(ResponsePacketType.gap_set_scan_parameters) - log.info("Starting an %s scan", "active" if active == 1 else "passive") - self._lib.send_command(self._ser, - CommandBuilder.gap_discover(discover_mode)) + log.info("Starting an %s scan", "active" if active else "passive") + self.send_command(CommandBuilder.gap_discover(discover_mode)) self.expect(ResponsePacketType.gap_discover) @@ -546,8 +242,7 @@ def scan(self, timeout=10, scan_interval=75, scan_window=50, active=True, time.sleep(timeout) log.info("Stopping scan") - self._lib.send_command(self._ser, CommandBuilder.gap_end_procedure()) - + self.send_command(CommandBuilder.gap_end_procedure()) self.expect(ResponsePacketType.gap_end_procedure) devices = [] @@ -557,30 +252,90 @@ def scan(self, timeout=10, scan_interval=75, scan_window=50, active=True, 'name': info.name, 'rssi': info.rssi }) + self._devices_discovered = {} return devices - def stop(self): - self.disconnect(fail_quietly=True) - self._running.clear() - if self._receiver: - self._receiver.join() - self._receiver = None + def connect(self, address, timeout=5, + addr_type=constants.ble_address_type[ + 'gap_address_type_public'], + interval_min=60, interval_max=76, supervision_timeout=100, + latency=0): + """ + Connnect directly to a device given the ble address then discovers and + stores the characteristic and characteristic descriptor handles. - if self._ser: - self._ser.close() - self._ser = None + Requires that the adapter is not connected to a device already. - def _assert_connected(self): - """ - Checks if there is/isn't a connection already established with a device. + address -- a bytearray containing the device mac address. + timeout -- number of seconds to wait before returning if not connected. + addr_type -- one of the ble_address_type constants. - Raises NotConnectedError on failure if check_if_connected == True. + Raises BGAPIError or NotConnectedError on failure. """ - if self._ser is None or not self._connected: - log.warn("Unexpectedly not connected") + + address_bytes = [int(b, 16) for b in address.split(":")] + for device in self._connections.values(): + if device._address == address_bytes: + return device + + log.debug("Connecting to device at address %s (timeout %ds)", + address, timeout) + self.set_bondable(False) + self.send_command( + CommandBuilder.gap_connect_direct( + address_bytes, addr_type, interval_min, interval_max, + supervision_timeout, latency)) + + self.expect(ResponsePacketType.gap_connect_direct) + try: + _, packet = self.expect(EventPacketType.connection_status, + timeout=timeout) + # TODO what do we do if the status isn't 'connected'? Retry? Raise + # an exception? Should also check the address matches the expected + # TODO i'm finding that when reconnecting to the same MAC, we geta + # conneciotn status of "disconnected" but that is picked up here as + # "connected", then we don't get anything else. + if self._connection_status_flag( + packet['flags'], + constants.connection_status_flag['connected']): + device = BGAPIBLEDevice(packet['address'], + packet['connection_handle'], + self) + if self._connection_status_flag( + packet['flags'], + constants.connection_status_flag['encrypted']): + device.encrypted = True + self._connections[packet['connection_handle']] = device + log.debug("Connected to %s", address) + return device + except ExpectedResponseTimeout: raise NotConnectedError() - def _connection_status_flag(self, flags, flag_to_find): + def discover_characteristics(self, connection_handle): + att_handle_start = 0x0001 # first valid handle + att_handle_end = 0xFFFF # last valid handle + log.info("Fetching characteristics for connection %d", + connection_handle) + self.send_command( + CommandBuilder.attclient_find_information( + connection_handle, att_handle_start, att_handle_end)) + + self.expect(ResponsePacketType.attclient_find_information) + self.expect(EventPacketType.attclient_procedure_completed, + timeout=10) + + for char_uuid_str, char_obj in ( + self._characteristics[connection_handle].iteritems()): + log.info("Characteristic 0x%s is handle 0x%x", + char_uuid_str, char_obj.handle) + for desc_uuid_str, desc_handle in ( + char_obj.descriptors.iteritems()): + log.info("Characteristic descriptor 0x%s is handle %x", + desc_uuid_str, desc_handle) + return self._characteristics[connection_handle] + + @staticmethod + def _connection_status_flag(flags, flag_to_find): """ Is the given flag in the connection status flags? @@ -591,7 +346,8 @@ def _connection_status_flag(self, flags, flag_to_find): """ return (flags & flag_to_find) == flag_to_find - def _get_uuid_type(self, uuid): + @staticmethod + def _get_uuid_type(uuid): """ Checks if the UUID is a custom 128-bit UUID or a GATT characteristic descriptor UUID. @@ -728,8 +484,9 @@ def _receive(self): if packet is not None: packet_type, args = self._lib.decode_packet(packet) if packet_type == EventPacketType.attclient_attribute_value: - self._handle_notification(args['atthandle'], - bytearray(args['value'])) + device = self._connections[args['connection_handle']] + device.receive_notification(args['atthandle'], + bytearray(args['value'])) self._receiver_queue.put(packet) log.info("Stopping receiver") @@ -761,32 +518,35 @@ def _ble_evt_attclient_find_information_found(self, args): args -- dictionary containing the characteristic handle ('chrhandle'), and characteristic UUID ('uuid') """ - uuid = bytearray(list(reversed(args['uuid']))) - # Add uuid to characteristics as characteristic or descriptor - uuid_type = self._get_uuid_type(uuid) - uuid_str = hexlify(uuid) + raw_uuid = bytearray(reversed(args['uuid'])) + uuid_type = self._get_uuid_type(raw_uuid) + if uuid_type != UUIDType.custom: + uuid = uuid16_to_uuid( + int(hexlify(bytearray(reversed(args['uuid']))), 16)) + else: + uuid = UUID(hexlify(raw_uuid)) + + # TODO is there a way to get the characteristic from the packet instead + # of having to track the "current" characteristic? if (uuid_type == UUIDType.descriptor and self._current_characteristic is not None): - self._current_characteristic.add_descriptor( - uuid_str, args['chrhandle']) + self._current_characteristic.add_descriptor(uuid, args['chrhandle']) elif uuid_type == UUIDType.custom: - log.debug("Found custom characteristic %s" % uuid_str) + log.info("Found custom characteristic %s" % uuid) new_char = Characteristic(uuid, args['chrhandle']) self._current_characteristic = new_char - self._characteristics[uuid_str] = new_char + self._characteristics[ + args['connection_handle']][uuid] = new_char def _ble_evt_connection_disconnected(self, args): """ Handles the event for the termination of a connection. """ - self._connected = False - self._encrypted = False - self._bonded = False - raise NotConnectedError() + self._connections.pop(args['connection_handle'], None) def _ble_evt_connection_status(self, args): """ - Handles the event for reporting connection parameters. + Handles the event for reporting connection status. args -- dictionary containing the connection status flags ('flags'), device address ('address'), device address type ('address_type'), @@ -794,38 +554,19 @@ def _ble_evt_connection_status(self, args): (timeout'), device latency ('latency'), device bond handle ('bonding') """ - self._connection_handle = args['connection_handle'] - flags = "" - if self._connection_status_flag( - args['flags'], constants.connection_status_flag['connected']): - self._connected = True - flags += 'connected, ' - if self._connection_status_flag( - args['flags'], constants.connection_status_flag['encrypted']): - self._encrypted = True - flags += 'encrypted, ' - if self._connection_status_flag( - args['flags'], constants.connection_status_flag['completed']): - flags += 'completed, ' - if self._connection_status_flag( + connection_handle = args['connection_handle'] + if not self._connection_status_flag( args['flags'], - constants.connection_status_flag['parameters_change']): - flags += 'parameters_change, ' - - address_type = "unknown" - if (args['address_type'] == - constants.ble_address_type['gap_address_type_public']): - address_type = "public" - elif (args['address_type'] == - constants.ble_address_type['gap_address_type_random']): - address_type = "random" - log.info("Connection status: handle=%s, flags=%s, address=0x%s, " - "address_type=%s, connection interval=%fms, timeout=%d, " + constants.connection_status_flag['connected']): + # Disconnected + self._connections.pop(connection_handle, None) + + log.info("Connection status: handle=0x%x, flags=%s, address=0x%s, " + "connection interval=%fms, timeout=%d, " "latency=%d intervals, bonding=0x%x", - hex(args['connection_handle']), - flags, + connection_handle, + args['address'], hexlify(bytearray(args['address'])), - address_type, args['conn_interval'] * 1.25, args['timeout'] * 10, args['latency'], @@ -868,24 +609,14 @@ def _ble_evt_sm_bond_status(self, args): """ Handles the event for reporting a stored bond. - Adds the stored bond to the list of bond handles if no _bond_expected. - Sets _bonded True if _bond_expected. + Adds the stored bond to the list of bond handles. args -- dictionary containing the bond handle ('bond'), encryption key size used in the long-term key ('keysize'), was man in the middle used ('mitm'), keys stored for bonding ('keys') """ # Add to list of stored bonds found or set flag - if self._bond_expected: - self._bond_expected = False - self._bonded = True - else: - self._stored_bonds.append(args['bond']) - - log.debug("bond handle = %s", hex(args['bond'])) - log.debug("keysize = %d", args['keysize']) - log.debug("man in the middle = %d", args['mitm']) - log.debug("keys = %s", hex(args['keys'])) + self._stored_bonds.append(args['bond']) def _ble_rsp_sm_delete_bonding(self, args): """ @@ -903,7 +634,7 @@ def _ble_rsp_sm_get_bonds(self, args): Handles the response for the start of stored bond enumeration. Sets self._num_bonds to the number of stored bonds. - args -- dictionary containing the number of stored bonds ('bonds), + args -- dictionary containing the number of stored bonds ('bonds'), """ self._num_bonds = args['bonds'] log.info("num bonds = %d", args['bonds']) diff --git a/pygatt/backends/bgapi/device.py b/pygatt/backends/bgapi/device.py new file mode 100644 index 00000000..6e4ea092 --- /dev/null +++ b/pygatt/backends/bgapi/device.py @@ -0,0 +1,124 @@ +import logging +import time + +from pygatt import BLEDevice, exceptions +from . import constants +from .bgapi import BGAPIError +from .error_codes import ErrorCode +from .packets import BGAPICommandPacketBuilder as CommandBuilder +from .bglib import EventPacketType, ResponsePacketType + +log = logging.getLogger(__name__) + + +def connection_required(func): + """Raise an exception if the device is not connected before calling the + actual function. + """ + def wrapper(self, *args, **kwargs): + if self._handle is None: + raise exceptions.NotConnectedError() + return func(self, *args, **kwargs) + return wrapper + + +class BGAPIBLEDevice(BLEDevice): + def __init__(self, address, handle, backend): + super(BGAPIBLEDevice, self).__init__(address) + self._handle = handle + self._backend = backend + self._characteristics = {} + + @connection_required + def bond(self, permanent=False): + """ + Create a bond and encrypted connection with the device. + """ + + # Set to bondable mode so bonds are store permanently + if permanent: + self._backend.set_bondable(True) + log.info("Bonding to %s", self._address) + self._backend.send_command( + CommandBuilder.sm_encrypt_start( + self._handle, constants.bonding['create_bonding'])) + self._backend.expect(ResponsePacketType.sm_encrypt_start) + + packet_type, response = self._backend.expect_any( + [EventPacketType.connection_status, + EventPacketType.sm_bonding_fail]) + if packet_type == EventPacketType.sm_bonding_fail: + raise BGAPIError("Bonding failed") + + @connection_required + def get_rssi(self): + """ + Get the receiver signal strength indicator (RSSI) value from the device. + + Returns the RSSI as in integer in dBm. + """ + # The BGAPI has some strange behavior where it will return 25 for + # the RSSI value sometimes... Try a maximum of 3 times. + for i in range(0, 3): + self._backend.send_command( + CommandBuilder.connection_get_rssi(self._handle)) + _, response = self._backend.expect( + ResponsePacketType.connection_get_rssi) + rssi = response['rssi'] + if rssi != 25: + return rssi + time.sleep(0.1) + raise BGAPIError("get rssi failed") + + @connection_required + def char_read(self, uuid): + handle = self.get_handle(uuid) + log.info("Reading characteristic at handle %d", handle) + self._backend.send_command( + CommandBuilder.attclient_read_by_handle( + self._handle, handle)) + + self._backend.expect(ResponsePacketType.attclient_read_by_handle) + matched_packet_type, response = self._backend.expect_any( + [EventPacketType.attclient_attribute_value, + EventPacketType.attclient_procedure_completed]) + # TODO why not just expect *only* the attribute value response, then it + # would time out and raise an exception if allwe got was the 'procedure + # completed' response? + if matched_packet_type != EventPacketType.attclient_attribute_value: + raise BGAPIError("Unable to read characteristic") + return bytearray(response['value']) + + @connection_required + def char_write_handle(self, char_handle, value, wait_for_response=False): + if wait_for_response: + raise NotImplementedError() + + while True: + value_list = [b for b in value] + log.info("attribute_write") + self._backend.send_command( + CommandBuilder.attclient_attribute_write( + self._handle, char_handle, value_list)) + + self._backend.expect(ResponsePacketType.attclient_attribute_write) + packet_type, response = self._backend.expect( + EventPacketType.attclient_procedure_completed) + if (response['result'] != + ErrorCode.insufficient_authentication.value): + # Continue to retry until we are bonded + break + + @connection_required + def disconnect(self): + log.debug("Disconnecting from %s", self._address) + self._backend.send_command( + CommandBuilder.connection_disconnect(self._handle)) + + self._backend.expect(ResponsePacketType.connection_disconnect) + log.info("Disconnected from %s", self._address) + self._handle = None + + @connection_required + def discover_characteristics(self): + return self._backend.discover_characteristics(self._handle) diff --git a/pygatt/backends/bgapi/error_codes.py b/pygatt/backends/bgapi/error_codes.py index a5e05bd8..89fbaab6 100644 --- a/pygatt/backends/bgapi/error_codes.py +++ b/pygatt/backends/bgapi/error_codes.py @@ -7,7 +7,8 @@ class ErrorCode(Enum): insufficient_authentication = 0x0405 -error_codes = { +return_codes = { + 0: "Success", # BGAPI errors 0x0180: "Invalid parameter", 0x0181: "Device in wrong state", @@ -72,9 +73,7 @@ class ErrorCode(Enum): def get_return_message(return_code): - if return_code == 0: - return "Success" - elif return_code in error_codes: - return error_codes[return_code] - else: - return str("Unknown return code %04x" % return_code) + try: + return return_codes[return_code] + except KeyError: + return "Unknown return code %04x" % return_code diff --git a/pygatt/backends/bgapi/exceptions.py b/pygatt/backends/bgapi/exceptions.py new file mode 100644 index 00000000..558e8a18 --- /dev/null +++ b/pygatt/backends/bgapi/exceptions.py @@ -0,0 +1,12 @@ +from pygatt.exceptions import BLEError + + +class BGAPIError(BLEError): + pass + + +class ExpectedResponseTimeout(BGAPIError): + def __init__(self, expected_packets, timeout): + super(ExpectedResponseTimeout, self).__init__( + "Timed out after %fs waiting for %s" % ( + timeout or 0, expected_packets)) diff --git a/pygatt/backends/bgapi/util.py b/pygatt/backends/bgapi/util.py index 8ab58001..bca30f80 100644 --- a/pygatt/backends/bgapi/util.py +++ b/pygatt/backends/bgapi/util.py @@ -1,7 +1,6 @@ import re import logging import serial.tools.list_ports -from binascii import unhexlify log = logging.getLogger(__name__) @@ -90,15 +89,3 @@ def find_usb_serial_devices(vendor_id=None, product_id=None): devices.append(dev) log.info("USB device: %s", dev) return devices - - -def uuid_to_bytearray(uuid_str): - """ - Turns a UUID string in the format "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX" - to a bytearray. - - uuid -- the UUID to convert. - - Returns a bytearray containing the UUID. - """ - return unhexlify(uuid_str.replace('-', '')) diff --git a/pygatt/backends/gatttool/device.py b/pygatt/backends/gatttool/device.py new file mode 100644 index 00000000..59e2789a --- /dev/null +++ b/pygatt/backends/gatttool/device.py @@ -0,0 +1,51 @@ +import logging + +from pygatt import BLEDevice, exceptions + +log = logging.getLogger(__name__) + + +def connection_required(func): + """Raise an exception before calling the actual function if the device is + not connection. + """ + def wrapper(self, *args, **kwargs): + if not self._connected: + raise exceptions.NotConnectedError() + return func(self, *args, **kwargs) + return wrapper + + +class GATTToolBLEDevice(BLEDevice): + """A BLE device connection initiated by the GATTToolBackend. + + Since the GATTToolBackend can only support 1 device connection at at time, + the device implementation defers to the backend for all functionality - + every command has to synchronize around a the same interactive gatttool + session, using the same connection. + """ + def __init__(self, address, backend): + super(GATTToolBLEDevice, self).__init__(address) + self._backend = backend + self._connected = True + + @connection_required + def bond(self, *args, **kwargs): + self._backend.bond(self, *args, **kwargs) + + @connection_required + def char_read(self, uuid, *args, **kwargs): + return self._backend.char_read(self, uuid, *args, **kwargs) + + @connection_required + def char_write_handle(self, handle, *args, **kwargs): + self._backend.char_write_handle(self, handle, *args, **kwargs) + + @connection_required + def disconnect(self): + self._backend.disconnect(self) + self._connected = False + + @connection_required + def discover_characteristics(self): + return self._backend.discover_characteristics(self) diff --git a/pygatt/backends/gatttool/gatttool.py b/pygatt/backends/gatttool/gatttool.py index 008cd50f..2b440738 100644 --- a/pygatt/backends/gatttool/gatttool.py +++ b/pygatt/backends/gatttool/gatttool.py @@ -8,22 +8,40 @@ import time import threading import subprocess +from uuid import UUID try: import pexpect except Exception as e: if platform.system() != 'Windows': print("WARNING:", e, file=sys.stderr) -from pygatt import constants -from pygatt import exceptions -from pygatt.backends.backend import BLEBackend +from pygatt.exceptions import (NotConnectedError, NotificationTimeout, BLEError, + NoResponseError) +from pygatt.backends import BLEBackend, Characteristic +from pygatt.backends.backend import DEFAULT_CONNECT_TIMEOUT_S +from .device import GATTToolBLEDevice log = logging.getLogger(__name__) +DEFAULT_TIMEOUT_S = 0.5 + + +def at_most_one_device(func): + """Every connection-specific function on the backend takes an instance of + GATTToolBLEDevice as the first argument - this decorator will raise an + exception if that device is not what the backend thinks is the currently + connected device. + """ + def wrapper(self, connected_device, *args, **kwargs): + if connected_device != self._connected_device: + raise NotConnectedError() + return func(self, *args, **kwargs) + return wrapper + class GATTToolBackend(BLEBackend): """ - Backend to pygatt that uses gatttool/bluez on the linux command line. + Backend to pygatt that uses BlueZ's interactive gatttool CLI prompt. """ _GATTTOOL_PROMPT = r".*> " @@ -32,34 +50,129 @@ def __init__(self, hci_device='hci0', gatttool_logfile=None): Initialize. hci_device -- the hci_device to use with GATTTool. - loghandler -- logging.handler object to use for the logger. - loglevel -- log level for this module's logger. + gatttool_logfile -- an optional filename to store raw gatttool + input and output. """ - super(GATTToolBackend, self).__init__() - - # Internal state self._hci_device = hci_device + self._connected_device = None self._gatttool_logfile = gatttool_logfile - self._handles = {} - self._thread = None # background notification receiving thread + self._receiver = None # background notification receiving thread self._con = None # gatttool interactive session def supports_unbonded(self): return False - def bond(self): - """Securely Bonds to the BLE device.""" - log.info('Bonding') - self._con.sendline('sec-level medium') - self._con.expect(self._GATTTOOL_PROMPT, timeout=1) - - def connect(self, address, timeout=constants.DEFAULT_CONNECT_TIMEOUT_S): - """Connect to the device.""" - if self._con and self._running: + def start(self): + if self._con and self._running.is_set(): self.stop() - self.start() + self._running = threading.Event() + self._running.set() + self._connection_lock = threading.RLock() + + # Without restarting, sometimes when trying to bond with the GATTTool + # backend, the entire computer will lock up. + self.reset() + + # Start gatttool interactive session for device + gatttool_cmd = ' '.join([ + 'gatttool', + '-i', + self._hci_device, + '-I' + ]) + log.debug('gatttool_cmd=%s', gatttool_cmd) + self._con = pexpect.spawn(gatttool_cmd, logfile=self._gatttool_logfile) + # Wait for response + self._con.expect(r'\[LE\]>', timeout=1) + + # Start the notification receiving thread + self._receiver = threading.Thread(target=self._receive) + self._receiver.daemon = True + self._receiver.start() + + def stop(self): + """ + Disconnects any connected device, stops the backgroud receiving thread + and closes the spawned gatttool process. + disconnect. + """ + self.disconnect(self._connected_device) + if self._running.is_set(): + log.info('Stopping') + self._running.clear() + + if self._receiver: + self._receiver.join() + self._receiver = None + + if self._con and self._con.isalive(): + self._con.sendline('exit') + while True: + if not self._con.isalive(): + break + time.sleep(0.1) + self._con.close() + self._con = None + + def scan(self, timeout=10, run_as_root=False): + """ + By default, scanning with gatttool requires root privileges. + If you don't want to require root, you must add a few + 'capabilities' to your system. If you have libcap installed, run this to + enable normal users to perform LE scanning: + setcap 'cap_net_raw,cap_net_admin+eip' `which hcitool` + + If you do use root, the hcitool subprocess becomes more difficult to + terminate cleanly, and may leave your Bluetooth adapter in a bad state. + """ + + cmd = 'hcitool lescan' + if run_as_root: + cmd = 'sudo %s' % cmd + + log.info("Starting BLE scan") + scan = pexpect.spawn(cmd) + # "lescan" doesn't exit, so we're forcing a timeout here: + try: + scan.expect('foooooo', timeout=timeout) + except pexpect.EOF: + message = "Unexpected error when scanning" + if "No such device" in scan.before: + message = "No BLE adapter found" + log.error(message) + raise BLEError(message) + except pexpect.TIMEOUT: + devices = {} + for line in scan.before.split('\r\n'): + match = re.match( + r'(([0-9A-Fa-f][0-9A-Fa-f]:?){6}) (\(?[\w]+\)?)', line) + + if match is not None: + address = match.group(1) + name = match.group(3) + if name == "(unknown)": + name = None + + if address in devices: + if (devices[address]['name'] is None) and (name is not + None): + log.info("Discovered name of %s as %s", + address, name) + devices[address]['name'] = name + else: + log.info("Discovered %s (%s)", address, name) + devices[address] = { + 'address': address, + 'name': name + } + log.info("Found %d BLE devices", len(devices)) + return [device for device in devices.values()] + return [] + + def connect(self, address, timeout=DEFAULT_CONNECT_TIMEOUT_S): log.info('Connecting with timeout=%s', timeout) + self._con.sendline('sec-level low') self._address = address try: with self._connection_lock: @@ -69,70 +182,86 @@ def connect(self, address, timeout=constants.DEFAULT_CONNECT_TIMEOUT_S): message = ("Timed out connecting to %s after %s seconds." % (self._address, timeout)) log.error(message) - raise exceptions.NotConnectedError(message) + raise NotConnectedError(message) + + self._connected_device = GATTToolBLEDevice(address, self) + return self._connected_device + + def clear_bond(self, address=None): + """Use the 'bluetoothctl' program to erase a stored BLE bond. + """ + con = pexpect.spawn('sudo bluetoothctl') + con.expect("bluetooth", timeout=1) + + log.info("Clearing bond for %s", address) + con.sendline("remove " + address.upper()) + try: + con.expect(["Device has been removed", + "# " + ], + timeout=.5) + except pexpect.TIMEOUT: + log.error("Unable to remove bonds for %s: %s", + address, con.before) + log.info("Removed bonds for %s", address) + @at_most_one_device def disconnect(self): with self._connection_lock: - self._con.sendline('disconnect') + # TODO with gattool from bluez 5.35, gatttol consumes 100% CPU after + # sending "disconnect". If you let the remote device do the + # disconnect, it doesn't. Leaving it commented out for now. + # self._con.sendline('disconnect') + pass + self._connected_device = None + # TODO make call a disconnected callback on the device, so the device + # knows if it was async disconnected? + + @at_most_one_device + def bond(self, *args, **kwargs): + log.info('Bonding') + self._con.sendline('sec-level medium') + self._con.expect(self._GATTTOOL_PROMPT, timeout=1) - def get_handle(self, uuid, descriptor_uuid=None): - """ - Look up and return the handle for an attribute by its UUID. - :param uuid: The UUID of the characteristic. - :type uuid: str - :return: None if the UUID was not found. - """ - if uuid not in self._handles: - log.debug("Looking up handle for characteristic %s", uuid) - with self._connection_lock: - self._con.sendline('characteristics') + @at_most_one_device + def discover_characteristics(self): + characteristics = {} + with self._connection_lock: + self._con.sendline('characteristics') - timeout = 2 - while True: + timeout = 2 + while True: + try: + self._con.expect( + r"handle: 0x([a-fA-F0-9]{4}), " + "char properties: 0x[a-fA-F0-9]{2}, " + "char value handle: 0x([a-fA-F0-9]{4}), " + "uuid: ([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})\r\n", # noqa + timeout=timeout) + except pexpect.TIMEOUT: + break + except pexpect.EOF: + break + else: try: - self._con.expect( - r"handle: 0x([a-fA-F0-9]{4}), " - "char properties: 0x[a-fA-F0-9]{2}, " - "char value handle: 0x([a-fA-F0-9]{4}), " - "uuid: ([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})\r\n", # noqa - timeout=timeout) - except pexpect.TIMEOUT: - break - except pexpect.EOF: - break - else: - try: - value_handle = int(self._con.match.group(2), 16) - char_uuid = self._con.match.group(3).strip() - self._handles[char_uuid] = value_handle - log.debug( - "Found characteristic %s, value handle: 0x%x", - char_uuid, - value_handle) - - # The characteristics all print at once, so after - # waiting 1-2 seconds for them to all fetch, you can - # load the rest without much delay at all. - timeout = .01 - except AttributeError: - pass - - if len(self._handles) == 0: - raise exceptions.BluetoothLEError( - "No characteristics found - disconnected unexpectedly?") - - handle = self._handles.get(uuid) - if handle is None: - message = "No characteristic found matching %s" % uuid - log.warn(message) - raise exceptions.BluetoothLEError(message) - - log.debug( - "Characteristic %s, handle: 0x%x", uuid, handle) - return handle - - def _expect(self, expected, - timeout=constants.DEFAULT_TIMEOUT_S): + value_handle = int(self._con.match.group(2), 16) + char_uuid = self._con.match.group(3).strip() + characteristics[UUID(char_uuid)] = Characteristic( + char_uuid, value_handle) + log.debug( + "Found characteristic %s, value handle: 0x%x", + char_uuid, + value_handle) + + # The characteristics all print at once, so after + # waiting 1-2 seconds for them to all fetch, you can + # load the rest without much delay at all. + timeout = .01 + except AttributeError: + pass + return characteristics + + def _expect(self, expected, timeout=DEFAULT_TIMEOUT_S): """ We may (and often do) get an indication/notification before a write completes, and so it can be lost if we "expect()"'d something @@ -159,25 +288,22 @@ def _expect(self, expected, break elif matched_pattern_index in [1, 2]: self._handle_notification_string(self._con.after) - elif matched_pattern_index in [3, 4]: - message = "" - if self._running: - message = ("Unexpectedly disconnected - do you " - "need to clear bonds?") - log.error(message) - self._running = False - raise exceptions.NotConnectedError(message) + elif matched_pattern_index == 3: + if self._running.is_set(): + log.info("Disconnected") except pexpect.TIMEOUT: - raise exceptions.NotificationTimeout( + raise NotificationTimeout( "Timed out waiting for a notification") def _handle_notification_string(self, msg): hex_handle, _, hex_value = string.split(msg.strip(), maxsplit=5)[3:] handle = int(hex_handle, 16) value = bytearray.fromhex(hex_value) - self._handle_notification(handle, value) + if self._connected_device is not None: + self._connected_device.receive_notification(handle, value) - def char_write(self, handle, value, wait_for_response=False): + @at_most_one_device + def char_write_handle(self, handle, value, wait_for_response=False): """ Writes a value to a given characteristic handle. :param handle: @@ -200,15 +326,16 @@ def char_write(self, handle, value, wait_for_response=False): if wait_for_response: try: self._expect('Characteristic value written successfully') - except exceptions.NoResponseError: + except NoResponseError: log.error("No response received", exc_info=True) raise log.info('Sent cmd=%s', cmd) - def char_read_uuid(self, uuid): + @at_most_one_device + def char_read(self, uuid): """ - Reads a Characteristic by UUID. + Reads a Characteristic by uuid. :param uuid: UUID of Characteristic to read. :type uuid: str :return: bytearray of result. @@ -222,144 +349,24 @@ def char_read_uuid(self, uuid): return bytearray([int(x, 16) for x in rval]) - def char_read_hnd(self, handle): - """ - Reads a Characteristic by Handle. - :param handle: Handle of Characteristic to read. - :type handle: str - :return: bytearray of result - :rtype: bytearray - """ - with self._connection_lock: - self._con.sendline('char-read-hnd 0x%02x' % handle) - self._expect('descriptor: .*?\r') - - rval = self._con.after.split()[1:] - - return bytearray([int(n, 16) for n in rval]) - - def start(self): - super(GATTToolBackend, self).start() - self._running = True - self._connection_lock = threading.RLock() - - # Without restarting, sometimes when trying to bond with the GATTTool - # backend, the entire computer will lock up. - self.reset() - - # Start gatttool interactive session for device - gatttool_cmd = ' '.join([ - 'gatttool', - '-i', - self._hci_device, - '-I' - ]) - log.debug('gatttool_cmd=%s', gatttool_cmd) - self._con = pexpect.spawn(gatttool_cmd, logfile=self._gatttool_logfile) - # Wait for response - self._con.expect(r'\[LE\]>', timeout=1) - - # Start the notification receiving thread - self._thread = threading.Thread(target=self._receive) - self._thread.daemon = True - self._thread.start() - - def stop(self): - """ - Stop the backgroud notification handler in preparation for a - disconnect. - """ - if self._running: - log.info('Stopping') - self._running = False - - if self._con and self._con.isalive(): - self._con.sendline('exit') - while True: - if not self._con.isalive(): - break - time.sleep(0.1) - self._con.close() - self._con = None - - if self._thread: - self._thread.join() - self._thread = None - def _receive(self): """ Run a background thread to listen for notifications. """ log.info('Running...') - while self._running: - with self._connection_lock: - try: - self._expect("fooooooo", timeout=.1) - except exceptions.NotificationTimeout: - pass - except (exceptions.NotConnectedError, pexpect.EOF): - break + while self._running.is_set(): + try: + self._expect("fooooooo", timeout=.1) + except NotificationTimeout: + pass + except (NotConnectedError, pexpect.EOF): + break # TODO need some delay to avoid aggresively grabbing the lock, # blocking out the others. worst case is 1 second delay for async # not received as a part of another request time.sleep(.01) log.info("Listener thread finished") - def scan(self, timeout=10, run_as_root=False): - """ - By default, scanning with gatttool requires root privileges. - If you don't want to require root, you must add a few - 'capabilities' to your system. If you have libcap installed, run this to - enable normal users to perform LE scanning: - setcap 'cap_net_raw,cap_net_admin+eip' `which hcitool` - - If you do use root, the hcitool subprocess becomes more difficult to - terminate cleanly, and may leave your Bluetooth adapter in a bad state. - """ - - cmd = 'hcitool lescan' - if run_as_root: - cmd = 'sudo %s' % cmd - - log.info("Starting BLE scan") - scan = pexpect.spawn(cmd) - # "lescan" doesn't exit, so we're forcing a timeout here: - try: - scan.expect('foooooo', timeout=timeout) - except pexpect.EOF: - message = "Unexpected error when scanning" - if "No such device" in scan.before: - message = "No BLE adapter found" - log.error(message) - raise exceptions.BluetoothLEError(message) - except pexpect.TIMEOUT: - devices = {} - for line in scan.before.split('\r\n'): - match = re.match( - r'(([0-9A-Fa-f][0-9A-Fa-f]:?){6}) (\(?[\w]+\)?)', line) - - if match is not None: - address = match.group(1) - name = match.group(3) - if name == "(unknown)": - name = None - - if address in devices: - if (devices[address]['name'] is None) and (name is not - None): - log.info("Discovered name of %s as %s", - address, name) - devices[address]['name'] = name - else: - log.info("Discovered %s (%s)", address, name) - devices[address] = { - 'address': address, - 'name': name - } - log.info("Found %d BLE devices", len(devices)) - return [device for device in devices.values()] - return [] - def reset(self): subprocess.Popen(["sudo", "systemctl", "restart", "bluetooth"]).wait() subprocess.Popen([ diff --git a/pygatt/classes.py b/pygatt/classes.py deleted file mode 100644 index 51c8fc33..00000000 --- a/pygatt/classes.py +++ /dev/null @@ -1,113 +0,0 @@ -from __future__ import print_function - -import logging - -from constants import DEFAULT_CONNECT_TIMEOUT_S - -log = logging.getLogger(__name__) - - -class BluetoothLEDevice(object): - """ - Interface for a Bluetooth Low Energy device that can use either the Bluegiga - BGAPI (cross platform) or GATTTOOL (Linux only) as the backend. - """ - def __init__(self, mac_address, backend): - """ - Initialize. - - mac_address -- a string containing the mac address of the BLE device in - the following format: "XX:XX:XX:XX:XX:XX" - backend -- an instantiated instance of a BLEBacked. - - Example: - - dongle = pygatt.backends.BGAPIBackend('/dev/ttyAMC0') - my_ble_device = pygatt.classes.BluetoothLEDevice( - '01:23:45:67:89:ab', bgapi=dongle) - """ - self._backend = backend - self._mac_address = mac_address - - def bond(self): - """ - Create a new bond or use an existing bond with the device and make the - current connection bonded and encrypted. - """ - self._backend.bond() - - def connect(self, timeout=DEFAULT_CONNECT_TIMEOUT_S): - """ - Connect to the BLE device. - - timeout -- the length of time to try to establish a connection before - returning. - - Example: - - my_ble_device.connect(timeout=5) - - """ - self._backend.connect(self._mac_address, timeout=timeout) - - def char_read(self, uuid): - """ - Reads a Characteristic by UUID. - - uuid -- UUID of Characteristic to read as a string. - - Returns a bytearray containing the characteristic value on success. - Returns None on failure. - - Example: - my_ble_device.char_read('a1e8f5b1-696b-4e4c-87c6-69dfe0b0093b') - """ - return self._backend.char_read_uuid(uuid) - - def char_write(self, *args, **kwargs): - """ - Writes a value to a given characteristic handle. - - uuid -- the UUID of the characteristic to write to. - value -- the value as a bytearray to write to the characteristic. - wait_for_response -- wait for response after writing (GATTTOOL only). - - Example: - my_ble_device.char_write('a1e8f5b1-696b-4e4c-87c6-69dfe0b0093b', - bytearray([0x00, 0xFF])) - """ - self._backend.char_write_uuid(*args, **kwargs) - - def encrypt(self): - """ - Form an encrypted, but not bonded, connection. - """ - self._backend.encrypt() - - def get_rssi(self): - """ - Get the receiver signal strength indicator (RSSI) value from the BLE - device. - - Returns the RSSI value in dBm on success. - Returns None on failure. - """ - return self._backend.get_rssi() - - def run(self): - """ - Start a background thread to listen for notifications. - """ - self._backend.start() - - def stop(self): - """ - Stop the any background threads and disconnect. - """ - self._backend.stop() - - def disconnect(self): - self._backend.disconnect() - - def subscribe(self, *args, **kwargs): - self._backend.subscribe(*args, **kwargs) diff --git a/pygatt/constants.py b/pygatt/constants.py deleted file mode 100644 index 082eecb4..00000000 --- a/pygatt/constants.py +++ /dev/null @@ -1,14 +0,0 @@ -""" -Constants for pygatt Module. -""" - -# Connection -DEFAULT_TIMEOUT_S = 0.5 -DEFAULT_ASYNC_TIMEOUT_S = 0.5 -DEFAULT_CONNECT_TIMEOUT_S = 5.0 - -# Backends -BACKEND = { - 'GATTTOOL': 0, - 'BGAPI': 1, -} diff --git a/pygatt/device.py b/pygatt/device.py new file mode 100644 index 00000000..507fda43 --- /dev/null +++ b/pygatt/device.py @@ -0,0 +1,180 @@ +from __future__ import print_function + +import threading +import logging +from collections import defaultdict +from binascii import hexlify +from uuid import UUID + +from . import exceptions + +log = logging.getLogger(__name__) + + +class BLEDevice(object): + """ + An BLE device connection instance, returned by one of the BLEBackend + implementations. This class is not meant to be instantiated directly - use + BLEBackend.connect() to create one. + """ + def __init__(self, address): + """ + Initialize. + + address -- the BLE address (aka MAC address) of the device as a string. + """ + self._address = address + self._characteristics = {} + self._callbacks = defaultdict(set) + self._subscribed_handlers = {} + self._lock = threading.Lock() + + def bond(self, permanent=False): + """ + Create a new bond or use an existing bond with the device and make the + current connection bonded and encrypted. + """ + raise NotImplementedError() + + def get_rssi(self): + """ + Get the receiver signal strength indicator (RSSI) value from the BLE + device. + + Returns the RSSI value in dBm on success. + Returns None on failure. + """ + raise NotImplementedError() + + def char_read(self, uuid): + """ + Reads a Characteristic by UUID. + + uuid -- UUID of Characteristic to read as a string. + + Returns a bytearray containing the characteristic value on success. + + Example: + my_ble_device.char_read('a1e8f5b1-696b-4e4c-87c6-69dfe0b0093b') + """ + raise NotImplementedError() + + def char_write(self, uuid, value, wait_for_response=False): + """ + Writes a value to a given characteristic UUID. + + uuid -- the UUID of the characteristic to write to. + value -- a bytearray to write to the characteristic. + wait_for_response -- wait for response after writing. + + Example: + my_ble_device.char_write('a1e8f5b1-696b-4e4c-87c6-69dfe0b0093b', + bytearray([0x00, 0xFF])) + """ + return self.char_write_handle(self.get_handle(uuid), value, + wait_for_response=wait_for_response) + + def char_write_handle(self, handle, value, wait_for_response=False): + """ + Writes a value to a given characteristic handle. This can be used to + write to the characteristic config handle for a primary characteristic. + + hande -- the handle to write to. + value -- a bytearray to write to the characteristic. + wait_for_response -- wait for response after writing. + + Example: + my_ble_device.char_write(42, + bytearray([0x00, 0xFF])) + """ + raise NotImplementedError() + + def disconnect(self): + """ + Disconnect from the device. This instance of BLEDevice cannot be used + after calling this method, you must call BLEBackend.connect() again to + get a fresh connection. + """ + raise NotImplementedError() + + def subscribe(self, uuid, callback=None, indication=False): + """ + Enable notifications or indications for a characteristic and register a + callback function to be called whenever a new value arrives. + + uuid -- UUID as a string of the characteristic to subscribe. + callback -- function to be called when a notification/indication is + received on this characteristic. + indication -- use indications (where each notificaiton is ACKd). This is + more reliable, but slower. + """ + log.info( + 'Subscribing to uuid=%s with callback=%s and indication=%s', + uuid, callback, indication) + # Expect notifications on the value handle... + value_handle = self.get_handle(uuid) + + # but write to the characteristic config to enable notifications + # TODO with the BGAPI backend we can be smarter and fetch the actual + # characteristic config handle - we can also do that with gattool if we + # use the 'desc' command, so we'll need to change the "get_handle" API + # to be able to get the value or characteristic config handle. + characteristic_config_handle = value_handle + 1 + + properties = bytearray([ + 0x2 if indication else 0x1, + 0x0 + ]) + + with self._lock: + if callback is not None: + self._callbacks[value_handle].add(callback) + + if self._subscribed_handlers.get(value_handle, None) != properties: + self.char_write_handle( + characteristic_config_handle, + properties, + wait_for_response=False + ) + log.debug("Subscribed to uuid=%s", uuid) + self._subscribed_handlers[value_handle] = properties + else: + log.debug("Already subscribed to uuid=%s", uuid) + + def get_handle(self, char_uuid): + """ + Look up and return the handle for an attribute by its UUID. + :param char_uuid: The UUID of the characteristic. + :type uuid: str + :return: None if the UUID was not found. + """ + if isinstance(char_uuid, str): + char_uuid = UUID(char_uuid) + log.debug("Looking up handle for characteristic %s", char_uuid) + if char_uuid not in self._characteristics: + # TODO need to expose discovering characterstics via BLEDevice + self._characteristics = self.discover_characteristics() + + characteristic = self._characteristics.get(char_uuid) + if characteristic is None: + message = "No characteristic found matching %s" % char_uuid + log.warn(message) + raise exceptions.BLEError(message) + + # TODO support filtering by descriptor UUID, or maybe return the whole + # Characteristic object + log.debug("Found %s" % characteristic) + return characteristic.handle + + def receive_notification(self, handle, value): + """ + Receive a notification from the connected device and propagate the value + to all registered callbacks. + """ + + log.info('Received notification on handle=0x%x, value=0x%s', + handle, hexlify(value)) + with self._lock: + if handle in self._callbacks: + for callback in self._callbacks[handle]: + callback(handle, value) diff --git a/pygatt/exceptions.py b/pygatt/exceptions.py index 294a6376..9876b4f3 100644 --- a/pygatt/exceptions.py +++ b/pygatt/exceptions.py @@ -3,19 +3,22 @@ """ -class BluetoothLEError(Exception): +class BLEError(Exception): """Exception class for pygatt.""" def __repr__(self): return "<%s %s>" % (self.__class__.__name__, self.message) + def __str__(self): + return repr(self) -class NotConnectedError(BluetoothLEError): + +class NotConnectedError(BLEError): pass -class NotificationTimeout(BluetoothLEError): +class NotificationTimeout(BLEError): pass -class NoResponseError(BluetoothLEError): +class NoResponseError(BLEError): pass diff --git a/pygatt/util.py b/pygatt/util.py new file mode 100644 index 00000000..9246df8d --- /dev/null +++ b/pygatt/util.py @@ -0,0 +1,5 @@ +from uuid import UUID + + +def uuid16_to_uuid(uuid16): + return UUID("0000%04x-0000-1000-8000-00805F9B34FB" % uuid16) diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 00000000..6e2d07f9 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,4 @@ +[nosetests] +cover-tests = 0 +cover-package = pygatt +verbosity = 2 diff --git a/tests/bgapi/mocker.py b/tests/bgapi/mocker.py index 5d968db3..434b3620 100644 --- a/tests/bgapi/mocker.py +++ b/tests/bgapi/mocker.py @@ -1,9 +1,21 @@ from mock import patch +from binascii import unhexlify from tests.serial_mock import SerialMock from .packets import BGAPIPacketBuilder -from pygatt.backends.bgapi.util import uuid_to_bytearray + + +def uuid_to_bytearray(uuid_str): + """ + Turns a UUID string in the format "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX" + to a bytearray. + + uuid -- the UUID to convert. + + Returns a bytearray containing the UUID. + """ + return unhexlify(uuid_str.replace('-', '')) class MockBGAPISerialDevice(object): @@ -68,6 +80,8 @@ def stage_run_packets(self, connection_handle=0x00): BGAPIPacketBuilder.sm_set_bondable_mode()) def stage_connect_packets(self, addr, flags, connection_handle=0x00): + self.mocked_serial.stage_output( + BGAPIPacketBuilder.sm_set_bondable_mode()) # Stage ble_rsp_gap_connect_direct (success) self.mocked_serial.stage_output( BGAPIPacketBuilder.gap_connect_direct(connection_handle, 0x0000)) @@ -83,20 +97,6 @@ def stage_get_rssi_packets(self, connection_handle=0x00, self.mocked_serial.stage_output( BGAPIPacketBuilder.connection_get_rssi(connection_handle, rssi)) - def stage_encrypt_packets(self, addr, flags, - connection_handle=0x00): - # Stage ble_rsp_sm_set_bondable_mode (always success) - self.mocked_serial.stage_output( - BGAPIPacketBuilder.sm_set_bondable_mode()) - # Stage ble_rsp_sm_encrypt_start (success) - self.mocked_serial.stage_output(BGAPIPacketBuilder.sm_encrypt_start( - connection_handle, 0x0000)) - # Stage ble_evt_connection_status - flags_byte = self._get_connection_status_flags_byte(flags) - self.mocked_serial.stage_output(BGAPIPacketBuilder.connection_status( - addr, flags_byte, connection_handle, 0, - 0x0014, 0x0006, 0x0000, 0xFF)) - def stage_bond_packets(self, addr, flags, connection_handle=0x00, bond_handle=0x01): # Stage ble_rsp_sm_set_bondable_mode (always success) @@ -153,7 +153,7 @@ def stage_scan_packets(self, scan_responses=[]): self.mocked_serial.stage_output( BGAPIPacketBuilder.gap_end_procedure(0x0000)) - def stage_get_handle_packets( + def stage_discover_characteristics_packets( self, uuid_handle_list, connection_handle=0x00): # Stage ble_rsp_attclient_find_information (success) self.mocked_serial.stage_output( diff --git a/tests/bgapi/test_bgapi.py b/tests/bgapi/test_bgapi.py index cc58f350..64d2dcf5 100644 --- a/tests/bgapi/test_bgapi.py +++ b/tests/bgapi/test_bgapi.py @@ -1,21 +1,16 @@ from __future__ import print_function -from nose.tools import eq_ +from nose.tools import eq_, ok_ import unittest -import threading -import time from pygatt.backends import BGAPIBackend from pygatt.backends.bgapi.util import extract_vid_pid +from pygatt.backends.bgapi.error_codes import get_return_message from .mocker import MockBGAPISerialDevice -from pygatt.backends.bgapi.util import uuid_to_bytearray class BGAPIBackendTests(unittest.TestCase): - """ - Test the functionality of the BGAPIBackend class. - """ def setUp(self): self.mock_device = MockBGAPISerialDevice() self.backend = BGAPIBackend( @@ -24,6 +19,9 @@ def setUp(self): self.address = [0x01, 0x23, 0x45, 0x67, 0x89, 0xAB] self.address_string = ":".join("%02x" % b for b in self.address) + self.mock_device.stage_run_packets() + self.backend.start() + def tearDown(self): self.mock_device.stop() # TODO if we call stop without staging another disconnect packet, the @@ -35,107 +33,17 @@ def tearDown(self): def _connect(self): self.mock_device.stage_connect_packets( self.address, ['connected', 'completed']) - self.backend.connect(self.address_string) - - def test_run_backend(self): - self.mock_device.stage_run_packets() - self.backend.start() + return self.backend.connect(self.address_string) def test_connect(self): - self.mock_device.stage_run_packets() - self.backend.start() - self._connect() - - def test_disconnect_when_connected(self): - self.mock_device.stage_run_packets() - self.backend.start() - self._connect() - # test disconnect (connected, not fail) - self.mock_device.stage_disconnect_packets(True, False) - self.backend.disconnect() - - def test_char_read(self): - self.mock_device.stage_run_packets() - self.backend.start() - self._connect() - uuid_char = '01234567-0123-0123-0123-0123456789AB' - handle_char = 0x1234 - uuid_desc = '2902' - handle_desc = 0x5678 - self.mock_device.stage_get_handle_packets([ - uuid_char, handle_char, - uuid_desc, handle_desc]) - handle = self.backend.get_handle(uuid_char) - # Test char_read - expected_value = [0xBE, 0xEF, 0x15, 0xF0, 0x0D] - self.mock_device.stage_char_read_packets( - handle, 0x00, expected_value) - value = self.backend._char_read(handle) - assert(value == bytearray(expected_value)) - - def test_char_write(self): - self.mock_device.stage_run_packets() - self.backend.start() - self._connect() - uuid_char = '01234567-0123-0123-0123-0123456789AB' - handle_char = 0x1234 - uuid_desc = '2902' - handle_desc = 0x5678 - self.mock_device.stage_get_handle_packets([ - uuid_char, handle_char, - uuid_desc, handle_desc]) - handle = self.backend.get_handle(uuid_char) - # Test char_write - value = [0xF0, 0x0F, 0x00] - self.mock_device.stage_char_write_packets(handle, value) - self.backend.char_write(handle, bytearray(value)) - - def test_encrypt(self): - self.mock_device.stage_run_packets() - self.backend.start() - self._connect() - # Test encrypt - self.mock_device.stage_encrypt_packets( - self.address, ['connected', 'encrypted']) - self.backend.encrypt() - - def test_bond(self): - self.mock_device.stage_run_packets() - self.backend.start() - self._connect() - self.mock_device.stage_bond_packets( - self.address, ['connected', 'encrypted', 'parameters_change']) - self.backend.bond() - - def test_get_rssi(self): - self.mock_device.stage_run_packets() - self.backend.start() self._connect() - # Test get_rssi - self.mock_device.stage_get_rssi_packets() - assert(self.backend.get_rssi() == -80) - def test_get_handle(self): - self.mock_device.stage_run_packets() - self.backend.start() - self._connect() - # Test get_handle - uuid_char = '01234567-0123-0123-0123-0123456789AB' - handle_char = 0x1234 - uuid_desc = '2902' - handle_desc = 0x5678 - self.mock_device.stage_get_handle_packets([ - uuid_char, handle_char, - uuid_desc, handle_desc]) - handle = self.backend.get_handle(uuid_char) - assert(handle == handle_char) - handle = self.backend.get_handle( - uuid_char, uuid_to_bytearray(uuid_desc)) - assert(handle == handle_desc) + def test_connect_already_connected(self): + device = self._connect() + another_device = self.backend.connect(self.address_string) + eq_(device, another_device) def test_scan_and_get_devices_discovered(self): - self.mock_device.stage_run_packets() - self.backend.start() # Test scan scan_responses = [] addr_0 = [0x01, 0x23, 0x45, 0x67, 0x89, 0xAB] @@ -153,64 +61,7 @@ def test_scan_and_get_devices_discovered(self): eq_('Hello!', devs[0]['name']) eq_(-80, devs[0]['rssi']) - def stage_subscribe_packets(self, uuid_char, handle_char, - indications=False, connection_handle=0x00): - # TODO this is a candidate to move to the BGAPIBackendSpy, but why does - # it need to call get_handle on the backend? otherwise it would just - # generate its own fake ouput for the serial device. - - # Stage get_handle packets - uuid_desc = '2902' - handle_desc = 0x5678 - self.mock_device.stage_get_handle_packets([ - uuid_char, handle_char, uuid_desc, handle_desc]) - handle = self.backend.get_handle( - uuid_char, uuid_to_bytearray(uuid_desc)) - # Stage char_write packets - if indications: - value = [0x02, 0x00] - else: - value = [0x01, 0x00] - self.mock_device.stage_char_write_packets( - handle, value, connection_handle=connection_handle) - - def test_subscribe_with_notify(self): - class NotificationHandler(object): - def __init__(self, expected_value_bytearray): - self.expected_value_bytearray = expected_value_bytearray - self.received_value_bytearray = None - self.called = threading.Event() - - def handle(self, handle, received_value_bytearray): - self.received_value_bytearray = received_value_bytearray - self.called.set() - - self.mock_device.stage_run_packets() - self.backend.start() - self._connect() - # Test subscribe with indications - packet_values = [bytearray([0xF0, 0x0D, 0xBE, 0xEF])] - my_handler = NotificationHandler(packet_values[0]) - handle = 0x1234 - uuid = '01234567-0123-0123-0123-0123456789AB' - self.stage_subscribe_packets(uuid, handle) - self.backend.subscribe(uuid, callback=my_handler.handle, - indication=True) - start_time = time.time() - self.mock_device.stage_indication_packets(handle, packet_values) - while not my_handler.called.is_set(): - elapsed_time = start_time - time.time() - if elapsed_time >= 5: - raise Exception("Callback wasn't called after {0} seconds." - .format(elapsed_time)) - print([b for b in my_handler.expected_value_bytearray]) - print([b for b in my_handler.received_value_bytearray]) - assert(my_handler.expected_value_bytearray == - my_handler.received_value_bytearray) - def test_clear_bonds(self): - self.mock_device.stage_run_packets() - self.backend.start() # Test delete stored bonds self.mock_device.stage_clear_bonds_packets( [0x00, 0x01, 0x02, 0x03, 0x04]) @@ -218,8 +69,6 @@ def test_clear_bonds(self): def test_clear_bonds_disconnect(self): """clear_bonds shouldn't abort if disconnected.""" - self.mock_device.stage_run_packets() - self.backend.start() # Test delete stored bonds self.mock_device.stage_clear_bonds_packets( [0x00, 0x01, 0x02, 0x03, 0x04], disconnects=True) @@ -245,3 +94,9 @@ def test_mac(self): def test_invalid(self): eq_(None, extract_vid_pid("2458:1")) + + +class ReturnCodeTests(unittest.TestCase): + + def test_unrecognized_return_code(self): + ok_(get_return_message(123123123123123) is not None) diff --git a/tests/bgapi/test_device.py b/tests/bgapi/test_device.py new file mode 100644 index 00000000..12270616 --- /dev/null +++ b/tests/bgapi/test_device.py @@ -0,0 +1,100 @@ +from __future__ import print_function + +from nose.tools import eq_ +import unittest +from uuid import UUID + +from pygatt.util import uuid16_to_uuid +from pygatt.backends import BGAPIBackend + +from .mocker import MockBGAPISerialDevice + + +class BGAPIDeviceTests(unittest.TestCase): + def setUp(self): + self.mock_device = MockBGAPISerialDevice() + self.backend = BGAPIBackend( + serial_port=self.mock_device.serial_port_name) + + self.address = [0x01, 0x23, 0x45, 0x67, 0x89, 0xAB] + self.address_string = ":".join("%02x" % b for b in self.address) + + self.mock_device.stage_run_packets() + self.backend.start() + + def tearDown(self): + self.mock_device.stop() + # TODO if we call stop without staging another disconnect packet, the + # bglib explodes because of a packet == None and you get a runaway + # process. what we can do about that? + self.mock_device.stage_disconnect_packets(True, False) + self.backend.stop() + + def _connect(self): + self.mock_device.stage_connect_packets( + self.address, ['connected', 'completed']) + return self.backend.connect(self.address_string) + + def test_disconnect_when_connected(self): + device = self._connect() + # test disconnect (connected, not fail) + self.mock_device.stage_disconnect_packets(True, False) + device.disconnect() + + def test_char_read(self): + device = self._connect() + uuid_char = '01234567-0123-0123-0123-0123456789AB' + handle_char = 0x1234 + uuid_desc = '2902' + handle_desc = 0x5678 + self.mock_device.stage_discover_characteristics_packets([ + uuid_char, handle_char, + uuid_desc, handle_desc]) + # Test char_read + expected_value = [0xBE, 0xEF, 0x15, 0xF0, 0x0D] + self.mock_device.stage_char_read_packets( + 0, 0x00, expected_value) + value = device.char_read(UUID(uuid_char)) + eq_(bytearray(expected_value), value) + + def test_char_write(self): + device = self._connect() + uuid_char = '01234567-0123-0123-0123-0123456789AB' + handle_char = 0x1234 + uuid_desc = '2902' + handle_desc = 0x5678 + self.mock_device.stage_discover_characteristics_packets([ + uuid_char, handle_char, + uuid_desc, handle_desc]) + # Test char_write + value = [0xF0, 0x0F, 0x00] + self.mock_device.stage_char_write_packets(0, value) + device.char_write(UUID(uuid_char), bytearray(value)) + + def test_bond(self): + device = self._connect() + self.mock_device.stage_bond_packets( + self.address, ['connected', 'encrypted', 'parameters_change']) + device.bond() + + def test_get_rssi(self): + device = self._connect() + # Test get_rssi + self.mock_device.stage_get_rssi_packets() + eq_(-80, device.get_rssi()) + + def test_discover_characteristics(self): + device = self._connect() + + uuid_char = UUID('01234567-0123-0123-0123-0123456789AB') + handle_char = 0x1234 + uuid_desc = '2902' + handle_desc = 0x5678 + + self.mock_device.stage_discover_characteristics_packets([ + str(uuid_char), handle_char, + uuid_desc, handle_desc]) + characteristics = device.discover_characteristics() + eq_(characteristics[uuid_char].handle, handle_char) + eq_(characteristics[uuid_char].descriptors[uuid16_to_uuid(0x2902)], + handle_desc) diff --git a/tests/gatttool/__init__.py b/tests/gatttool/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/gatttool/test_backend.py b/tests/gatttool/test_backend.py new file mode 100644 index 00000000..729b5009 --- /dev/null +++ b/tests/gatttool/test_backend.py @@ -0,0 +1,40 @@ +from __future__ import print_function + +from nose.tools import eq_, ok_ +from nose import SkipTest +from mock import patch +import unittest + +from pygatt.backends import GATTToolBackend + + +class GATTToolBackendTests(unittest.TestCase): + def setUp(self): + raise SkipTest() + self.patchers = [] + self.patchers.append( + patch('pygatt.backends.gatttool.gatttool.pexpect.spawn')) + self.spawn = self.patchers[0].start() + self.spawn.return_value.isalive.return_value = False + self.patchers.append( + patch('pygatt.backends.gatttool.gatttool.subprocess')) + self.patchers[1].start() + self.backend = GATTToolBackend() + self.mock_expect = patch.object(self.backend, '_expect').start() + self.backend.start() + + def tearDown(self): + self.backend.stop() + for patcher in self.patchers: + patcher.stop() + + def test_scan(self): + # TODO mock a successful scan + devices = self.backend.scan() + ok_(devices is not None) + eq_(0, len(devices)) + + def test_connect(self): + address = "11:22:33:44:55:66" + device = self.backend.connect(address) + ok_(device is not None) diff --git a/tests/gatttool/test_device.py b/tests/gatttool/test_device.py new file mode 100644 index 00000000..13474489 --- /dev/null +++ b/tests/gatttool/test_device.py @@ -0,0 +1,76 @@ +import unittest +import uuid +from mock import MagicMock, patch +from nose.tools import ok_, eq_, raises + +from pygatt.exceptions import NotConnectedError +from pygatt.backends import Characteristic +from pygatt.backends.gatttool.device import GATTToolBLEDevice + + +class GATTToolBLEDeviceTests(unittest.TestCase): + def setUp(self): + super(GATTToolBLEDeviceTests, self).setUp() + self.address = "11:22:33:44:55:66" + self.backend = MagicMock() + self.device = GATTToolBLEDevice(self.address, self.backend) + + self.expected_handle = 99 + self.char_uuid = uuid.uuid4() + self.backend.discover_characteristics.return_value = { + self.char_uuid: Characteristic(self.char_uuid, self.expected_handle) + } + + def test_bond(self): + self.device.bond() + ok_(self.backend.bond.called) + eq_(self.device, self.backend.bond.call_args[0][0]) + + def test_char_read(self): + expected_value = bytearray(range(4)) + self.backend.char_read.return_value = expected_value + with patch.object(self.backend, 'get_handle', return_value=24 + ) as get_handle: + char_uuid = uuid.uuid4() + value = self.device.char_read(char_uuid) + ok_(not get_handle.called) + ok_(self.backend.char_read.called) + eq_(self.device, self.backend.char_read.call_args[0][0]) + eq_(char_uuid, self.backend.char_read.call_args[0][1]) + eq_(expected_value, value) + + def test_char_write(self): + with patch.object(self.device, 'get_handle', return_value=24 + ) as get_handle: + char_uuid = uuid.uuid4() + value = bytearray(range(4)) + self.device.char_write(char_uuid, value) + ok_(get_handle.called) + eq_(char_uuid, get_handle.call_args[0][0]) + ok_(self.backend.char_write_handle.called) + eq_(self.device, self.backend.char_write_handle.call_args[0][0]) + eq_(24, self.backend.char_write_handle.call_args[0][1]) + eq_(value, self.backend.char_write_handle.call_args[0][2]) + + def test_disconnect(self): + self.device.disconnect() + ok_(self.backend.disconnect.called) + eq_(self.device, self.backend.disconnect.call_args[0][0]) + + @raises(NotConnectedError) + def test_write_after_disconnect(self): + self.device.disconnect() + self.device.char_read(uuid.uuid4()) + + def test_get_handle(self): + handle = self.device.get_handle(self.char_uuid) + ok_(self.backend.discover_characteristics.called) + eq_(self.device, self.backend.discover_characteristics.call_args[0][0]) + eq_(self.expected_handle, handle) + + def test_get_cached_handle(self): + handle = self.device.get_handle(self.char_uuid) + with patch.object(self.backend, 'discover_characteristics') as discover: + next_handle = self.device.get_handle(self.char_uuid) + eq_(handle, next_handle) + ok_(not discover.called) diff --git a/tests/test_device.py b/tests/test_device.py new file mode 100644 index 00000000..3faaf204 --- /dev/null +++ b/tests/test_device.py @@ -0,0 +1,60 @@ +import unittest +import uuid +from mock import MagicMock, patch +from nose.tools import ok_, eq_ + +from pygatt import BLEDevice +from pygatt.backends import Characteristic + + +class TestBLEDevice(BLEDevice): + CHAR_UUID = uuid.uuid4() + EXPECTED_HANDLE = 99 + + def discover_characteristics(self): + return { + self.CHAR_UUID: Characteristic(self.CHAR_UUID, self.EXPECTED_HANDLE) + } + + +class BLEDeviceTest(unittest.TestCase): + def setUp(self): + super(BLEDeviceTest, self).setUp() + self.address = "11:22:33:44:55:66" + self.backend = MagicMock() + self.device = TestBLEDevice(self.address) + + def _subscribe(self): + callback = MagicMock() + with patch.object(self.device, 'char_write_handle') as char_write: + self.device.subscribe(self.device.CHAR_UUID, callback=callback) + ok_(char_write.called) + eq_(self.device.EXPECTED_HANDLE + 1, char_write.call_args[0][0]) + eq_(bytearray([1, 0]), char_write.call_args[0][1]) + return callback + + def test_subscribe(self): + self._subscribe() + + def test_subscribe_another_callback(self): + self._subscribe() + another_callback = MagicMock() + with patch.object(self.device, 'char_write_handle') as char_write: + self.device.subscribe(self.device.CHAR_UUID, + callback=another_callback) + ok_(not char_write.called) + + def test_receive_notification(self): + callback = self._subscribe() + value = bytearray([24]) + self.device.receive_notification(TestBLEDevice.EXPECTED_HANDLE, value) + ok_(callback.called) + eq_(TestBLEDevice.EXPECTED_HANDLE, callback.call_args[0][0]) + eq_(value, callback.call_args[0][1]) + + def test_ignore_notification_for_another_handle(self): + callback = self._subscribe() + value = bytearray([24]) + self.device.receive_notification( + TestBLEDevice.EXPECTED_HANDLE + 1, value) + ok_(not callback.called)