diff --git a/README.md b/README.md index 30b8e8b..3bf6ccf 100644 --- a/README.md +++ b/README.md @@ -66,11 +66,10 @@ BlueZ also provides an interactive commandline tool to interact with Bluetooth d ### Installing GATT SDK for Python -To install this GATT module and the Python3 D-Bus dependency globally, run: +To install this GATT module globally, run: ``` sudo pip3 install gatt -sudo apt-get install python3-dbus ``` #### Running the GATT control script diff --git a/gatt/gatt_linux.py b/gatt/gatt_linux.py index df91d51..da560dc 100644 --- a/gatt/gatt_linux.py +++ b/gatt/gatt_linux.py @@ -1,22 +1,10 @@ -try: - import dbus - import dbus.mainloop.glib -except ImportError: - import sys - print("Module 'dbus' not found") - print("Please run: sudo apt-get install python3-dbus") - print("See also: https://github.com/getsenic/gatt-python#installing-gatt-sdk-for-python") - sys.exit(1) - import re -from gi.repository import GObject +from gi.repository import GObject, Gio, GLib from . import errors - -dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) -dbus.mainloop.glib.threads_init() +GObject.threads_init() class DeviceManager: @@ -30,15 +18,26 @@ def __init__(self, adapter_name): self.listener = None self.adapter_name = adapter_name - self._bus = dbus.SystemBus() - try: - adapter_object = self._bus.get_object('org.bluez', '/org/bluez/' + adapter_name) - except dbus.exceptions.DBusException as e: - raise _error_from_dbus_error(e) - object_manager_object = self._bus.get_object("org.bluez", "/") - self._adapter = dbus.Interface(adapter_object, 'org.bluez.Adapter1') - self._adapter_properties = dbus.Interface(self._adapter, 'org.freedesktop.DBus.Properties') - self._object_manager = dbus.Interface(object_manager_object, "org.freedesktop.DBus.ObjectManager") + self._bus = Gio.bus_get_sync(Gio.BusType.SYSTEM, None) + object_manager = Gio.DBusObjectManagerClient.new_sync( + self._bus, Gio.DBusObjectManagerClientFlags.NONE, + name="org.bluez", object_path="/") + if object_manager.get_object('/org/bluez/' + adapter_name) is None: + raise errors.Failed('No object at path "/org/bluez/' + adapter_name + '"') + + self._adapter = Gio.DBusProxy.new_sync(self._bus, Gio.DBusProxyFlags.NONE, None, + 'org.bluez', + '/org/bluez/' + adapter_name, + 'org.bluez.Adapter1', None) + + self._adapter_properties = Gio.DBusProxy.new_sync(self._bus, Gio.DBusProxyFlags.NONE, None, + 'org.bluez', + '/org/bluez/' + adapter_name, + 'org.freedesktop.DBus.Properties', None) + self._object_manager = Gio.DBusProxy.new_sync(self._bus, Gio.DBusProxyFlags.NONE, None, + 'org.bluez', + '/', + 'org.freedesktop.DBus.ObjectManager', None) self._device_path_regex = re.compile('^/org/bluez/' + adapter_name + '/dev((_[A-Z0-9]{2}){6})$') self._devices = {} self._discovered_devices = {} @@ -50,11 +49,11 @@ def __init__(self, adapter_name): @property def is_adapter_powered(self): - return self._adapter_properties.Get('org.bluez.Adapter1', 'Powered') == 1 + return self._adapter_properties.Get('(ss)', 'org.bluez.Adapter1', 'Powered') == 1 @is_adapter_powered.setter def is_adapter_powered(self, powered): - return self._adapter_properties.Set('org.bluez.Adapter1', 'Powered', dbus.Boolean(powered)) + return self._adapter_properties.Set('(ssv)', 'org.bluez.Adapter1', 'Powered', GLib.Variant('b', powered)) def run(self): """ @@ -66,31 +65,36 @@ def run(self): if self._main_loop: return - self._interface_added_signal = self._bus.add_signal_receiver( - self._interfaces_added, - dbus_interface='org.freedesktop.DBus.ObjectManager', - signal_name='InterfacesAdded') + self._interface_added_signal = self._object_manager.connect( + 'g-signal', + lambda sender, connection_name, signal, parameters: + self._interfaces_added(parameters.unpack()[0], parameters.unpack()[1]) if signal == 'InterfacesAdded' else None) # TODO: Also listen to 'interfaces removed' events? - self._properties_changed_signal = self._bus.add_signal_receiver( - self._properties_changed, - dbus_interface=dbus.PROPERTIES_IFACE, - signal_name='PropertiesChanged', - arg0='org.bluez.Device1', - path_keyword='path') + # TODO: Does this signal even make sense given that every device + # connects its own PropertiesChanged and the handler only calls + # _device_discovered? + self._properties_changed_signal = self._bus.signal_subscribe( + None, + 'org.freedesktop.DBus.Properties', + 'PropertiesChanged', + None, + 'org.bluez.Device1', + Gio.DBusSignalFlags.NONE, + lambda conn, sender, path, interface, signal, parameters: self._properties_changed(*parameters.unpack(), path) if signal == "PropertiesChanged" else None) def disconnect_signals(): for device in self._devices.values(): device.invalidate() - self._properties_changed_signal.remove() - self._interface_added_signal.remove() + self._bus.signal_unsubscribe(self._properties_changed_signal) + self._object_manager.disconnect(self._interface_added_signal) self._main_loop = GObject.MainLoop() try: self._main_loop.run() disconnect_signals() - except Exception: + except: disconnect_signals() raise @@ -129,19 +133,19 @@ def start_discovery(self, service_uuids=[]): :param service_uuids: Filters the search to only return devices with given UUIDs. """ - discovery_filter = {'Transport': 'le'} + discovery_filter = {'Transport': GLib.Variant('s', 'le')} if service_uuids: # D-Bus doesn't like empty lists, it needs to guess the type discovery_filter['UUIDs'] = service_uuids try: - self._adapter.SetDiscoveryFilter(discovery_filter) + self._adapter.SetDiscoveryFilter('(a{sv})', discovery_filter) self._adapter.StartDiscovery() - except dbus.exceptions.DBusException as e: - if e.get_dbus_name() == 'org.bluez.Error.NotReady': + except GLib.GError as e: + if Gio.DBusError.get_remote_error(e) == 'org.bluez.Error.NotReady': raise errors.NotReady( "Bluetooth adapter not ready. " "Set `is_adapter_powered` to `True` or run 'echo \"power on\" | sudo bluetoothctl'.") - if e.get_dbus_name() == 'org.bluez.Error.InProgress': + if Gio.DBusError.get_remote_error(e) == 'org.bluez.Error.InProgress': # Discovery was already started - ignore exception pass else: @@ -153,8 +157,8 @@ def stop_discovery(self): """ try: self._adapter.StopDiscovery() - except dbus.exceptions.DBusException as e: - if (e.get_dbus_name() == 'org.bluez.Error.Failed') and (e.get_dbus_message() == 'No discovery started'): + except GLib.GError as e: + if (Gio.DBusError.get_remote_error(e) == 'org.bluez.Error.Failed') and (e.message.endswith('No discovery started')): pass else: raise _error_from_dbus_error(e) @@ -234,9 +238,14 @@ def __init__(self, mac_address, manager, managed=True): # TODO: Device needs to be created if it's not yet known to bluetoothd, see "test-device" in bluez-5.43/test/ self._device_path = '/org/bluez/%s/dev_%s' % (manager.adapter_name, mac_address.replace(':', '_').upper()) - device_object = self._bus.get_object('org.bluez', self._device_path) - self._object = dbus.Interface(device_object, 'org.bluez.Device1') - self._properties = dbus.Interface(self._object, 'org.freedesktop.DBus.Properties') + self._object = Gio.DBusProxy.new_sync(self._bus, Gio.DBusProxyFlags.NONE, None, + 'org.bluez', + self._device_path, + 'org.bluez.Device1', None) + self._properties = Gio.DBusProxy.new_sync(self._bus, Gio.DBusProxyFlags.NONE, None, + 'org.bluez', + self._device_path, + 'org.freedesktop.DBus.Properties', None) self._properties_signal = None self._connect_retry_attempt = None @@ -275,17 +284,17 @@ def _connect(self): if not self.services and self.is_services_resolved(): self.services_resolved() - except dbus.exceptions.DBusException as e: - if (e.get_dbus_name() == 'org.freedesktop.DBus.Error.UnknownObject'): + except GLib.GError as e: + if (Gio.DBusError.get_remote_error(e) == 'org.freedesktop.DBus.Error.UnknownObject'): self.connect_failed(errors.Failed("Device does not exist, check adapter name and MAC address.")) - elif ((e.get_dbus_name() == 'org.bluez.Error.Failed') and - (e.get_dbus_message() == "Operation already in progress")): + elif ((Gio.DBusError.get_remote_error(e) == 'org.bluez.Error.Failed') and + (e.message.endswith("Operation already in progress"))): pass elif ((self._connect_retry_attempt < 5) and - (e.get_dbus_name() == 'org.bluez.Error.Failed') and - (e.get_dbus_message() == "Software caused connection abort")): + (Gio.DBusError.get_remote_error(e) == 'org.bluez.Error.Failed') and + (e.message.endswith("Software caused connection abort"))): self._connect() - elif (e.get_dbus_name() == 'org.freedesktop.DBus.Error.NoReply'): + elif (Gio.DBusError.get_remote_error(e) == 'org.freedesktop.DBus.Error.NoReply'): # TODO: How to handle properly? # Reproducable when we repeatedly shut off Nuimo immediately after its flashing Bluetooth icon appears self.connect_failed(_error_from_dbus_error(e)) @@ -294,7 +303,11 @@ def _connect(self): def _connect_signals(self): if self._properties_signal is None: - self._properties_signal = self._properties.connect_to_signal('PropertiesChanged', self.properties_changed) + self._properties_signal = self._properties.connect( + 'g-signal', + lambda sender, connection_name, signal, parameters: + self.properties_changed(*parameters.unpack()) if signal == 'PropertiesChanged' else None) + self._connect_service_signals() def _connect_service_signals(self): @@ -329,7 +342,7 @@ def disconnect_succeeded(self): def _disconnect_signals(self): if self._properties_signal is not None: - self._properties_signal.remove() + self._properties.disconnect(self._properties_signal) self._properties_signal = None self._disconnect_service_signals() @@ -341,22 +354,22 @@ def is_connected(self): """ Returns `True` if the device is connected, otherwise `False`. """ - return self._properties.Get('org.bluez.Device1', 'Connected') == 1 + return self._properties.Get('(ss)', 'org.bluez.Device1', 'Connected') == 1 def is_services_resolved(self): """ Returns `True` is services are discovered, otherwise `False`. """ - return self._properties.Get('org.bluez.Device1', 'ServicesResolved') == 1 + return self._properties.Get('(ss)', 'org.bluez.Device1', 'ServicesResolved') == 1 def alias(self): """ Returns the device's alias (name). """ try: - return self._properties.Get('org.bluez.Device1', 'Alias') - except dbus.exceptions.DBusException as e: - if e.get_dbus_name() == 'org.freedesktop.DBus.Error.UnknownObject': + return self._properties.Get('(ss)', 'org.bluez.Device1', 'Alias') + except GLib.GError as e: + if Gio.DBusError.get_remote_error(e) == 'org.freedesktop.DBus.Error.UnknownObject': # BlueZ sometimes doesn't provide an alias, we then simply return `None`. # Might occur when device was deleted as the following issue points out: # https://github.com/blueman-project/blueman/issues/460 @@ -450,7 +463,6 @@ def __init__(self, device, path, uuid): self._path = path self._bus = device._bus self._object_manager = device._object_manager - self._object = self._bus.get_object('org.bluez', self._path) self.characteristics = [] self.characteristics_resolved() @@ -498,17 +510,27 @@ def __init__(self, service, path, uuid): self._path = path self._bus = service._bus self._object_manager = service._object_manager - self._object = self._bus.get_object('org.bluez', self._path) - self._properties = dbus.Interface(self._object, "org.freedesktop.DBus.Properties") + self._properties = Gio.DBusProxy.new_sync(self._bus, Gio.DBusProxyFlags.NONE, None, + 'org.bluez', + self._path, + 'org.freedesktop.DBus.Properties', None) + self._characteristic = Gio.DBusProxy.new_sync(self._bus, Gio.DBusProxyFlags.NONE, None, + 'org.bluez', + self._path, + 'org.bluez.GattCharacteristic1', None) self._properties_signal = None def _connect_signals(self): if self._properties_signal is None: - self._properties_signal = self._properties.connect_to_signal('PropertiesChanged', self.properties_changed) + self._properties_signal = self._properties.connect( + 'g-signal', + lambda sender, connection_name, signal, parameters: + self.properties_changed(*parameters.unpack()) if signal == 'PropertiesChanged' else None) + def _disconnect_signals(self): if self._properties_signal is not None: - self._properties_signal.remove() + self._properties.disconnect(self._properties_signal) self._properties_signal = None def properties_changed(self, properties, changed_properties, invalidated_properties): @@ -527,10 +549,9 @@ def read_value(self, offset=0): otherwise `characteristic_read_value_failed()` is invoked. """ try: - return self._object.ReadValue( - {'offset': dbus.UInt16(offset, variant_level=1)}, - dbus_interface='org.bluez.GattCharacteristic1') - except dbus.exceptions.DBusException as e: + return self._characteristic.ReadValue('(a{sv})', + {'offset': GLib.Variant("q", offset)}) + except GLib.GError as e: error = _error_from_dbus_error(e) self.service.device.characteristic_read_value_failed(self, error=error) @@ -543,29 +564,27 @@ def write_value(self, value, offset=0): :param value: array of bytes to be written :param offset: offset from where to start writing the bytes (defaults to 0) """ - bytes = [dbus.Byte(b) for b in value] - + bytes = value try: - self._object.WriteValue( + self._characteristic.WriteValue('(aya{sv})', bytes, - {'offset': dbus.UInt16(offset, variant_level=1)}, - reply_handler=self._write_value_succeeded, - error_handler=self._write_value_failed, - dbus_interface='org.bluez.GattCharacteristic1') - except dbus.exceptions.DBusException as e: - self._write_value_failed(self, error=e) + {'offset': GLib.Variant("i", offset)}, + result_handler=self._write_value_succeeded, + error_handler=self._write_value_failed) + except GLib.GError as e: + self._write_value_failed(self, gerror=e) - def _write_value_succeeded(self): + def _write_value_succeeded(self, proxy, result, user_data): """ Called when the write request has succeeded. """ self.service.device.characteristic_write_value_succeeded(characteristic=self) - def _write_value_failed(self, dbus_error): + def _write_value_failed(self, proxy, gerror, user_data): """ Called when the write request has failed. """ - error = _error_from_dbus_error(dbus_error) + error = _error_from_dbus_error(gerror) self.service.device.characteristic_write_value_failed(characteristic=self, error=error) def enable_notifications(self, enabled=True): @@ -580,44 +599,44 @@ def enable_notifications(self, enabled=True): """ try: if enabled: - self._object.StartNotify( - reply_handler=self._enable_notifications_succeeded, + self._characteristic.StartNotify( + result_handler=self._enable_notifications_succeeded, error_handler=self._enable_notifications_failed, dbus_interface='org.bluez.GattCharacteristic1') else: - self._object.StopNotify( - reply_handler=self._enable_notifications_succeeded, + self._characteristic.StopNotify( + result_handler=self._enable_notifications_succeeded, error_handler=self._enable_notifications_failed, dbus_interface='org.bluez.GattCharacteristic1') - except dbus.exceptions.DBusException as e: - self._enable_notifications_failed(error=e) + except GLib.GError as e: + self._enable_notifications_failed(gerror=e) - def _enable_notifications_succeeded(self): + def _enable_notifications_succeeded(self, proxy, result, user_data): """ Called when notification enabling has succeeded. """ self.service.device.characteristic_enable_notifications_succeeded(characteristic=self) - def _enable_notifications_failed(self, dbus_error): + def _enable_notifications_failed(self, proxy, gerror, user_data): """ Called when notification enabling has failed. """ - if ((dbus_error.get_dbus_name() == 'org.bluez.Error.Failed') and - ((dbus_error.get_dbus_message() == "Already notifying") or - (dbus_error.get_dbus_message() == "No notify session started"))): + if ((gerror.code == Gio.DBusError.FAILED) and + (("Already notifying" in gerror.message) or + ("No notify session started" in gerror.message))): # Ignore cases where notifications where already enabled or already disabled return - error = _error_from_dbus_error(dbus_error) + error = _error_from_dbus_error(gerror) self.service.device.characteristic_enable_notifications_failed(characteristic=self, error=error) def _error_from_dbus_error(e): return { - 'org.bluez.Error.Failed': errors.Failed(e.get_dbus_message()), - 'org.bluez.Error.InProgress': errors.InProgress(e.get_dbus_message()), - 'org.bluez.Error.InvalidValueLength': errors.InvalidValueLength(e.get_dbus_message()), - 'org.bluez.Error.NotAuthorized': errors.NotAuthorized(e.get_dbus_message()), - 'org.bluez.Error.NotPermitted': errors.NotPermitted(e.get_dbus_message()), - 'org.bluez.Error.NotSupported': errors.NotSupported(e.get_dbus_message()), + 'org.bluez.Error.Failed': errors.Failed(e.message), + 'org.bluez.Error.InProgress': errors.InProgress(e.message), + 'org.bluez.Error.InvalidValueLength': errors.InvalidValueLength(e.message), + 'org.bluez.Error.NotAuthorized': errors.NotAuthorized(e.message), + 'org.bluez.Error.NotPermitted': errors.NotPermitted(e.message), + 'org.bluez.Error.NotSupported': errors.NotSupported(e.message), 'org.freedesktop.DBus.Error.AccessDenied': errors.AccessDenied("Root permissions required") - }.get(e.get_dbus_name(), errors.Failed(e.get_dbus_message())) + }.get(Gio.DBusError.get_remote_error(e), errors.Failed(e.message))