diff --git a/pygatt/backends/gatttool/gatttool.py b/pygatt/backends/gatttool/gatttool.py index 4d038fae..bc8b11dd 100644 --- a/pygatt/backends/gatttool/gatttool.py +++ b/pygatt/backends/gatttool/gatttool.py @@ -20,14 +20,9 @@ log = logging.getLogger(__name__) -if hasattr(bytes, 'fromhex'): - # Python 3. - def _hex_value_parser(x): - return bytearray.fromhex(x.decode('utf8')) -else: - # Python 2.7 - def _hex_value_parser(x): - return bytearray.fromhex(x) + +def _hex_value_parser(x): + return bytearray.fromhex(x) def is_windows(): @@ -490,14 +485,14 @@ def _handle_notification_string(self, event): log.warn("Blank message received in notification, ignored") return - split_msg = msg.strip().split(None, 5) - if len(split_msg) < 6: + match_obj = re.match(r'Notification handle = (0x[0-9a-f]+) value:(.*)', + msg.decode('utf-8')) + if match_obj is None: log.warn("Unable to parse notification string, ignoring: %s", msg) return - hex_handle, _, hex_values = split_msg[3:] - handle = int(hex_handle, 16) - values = _hex_value_parser(hex_values) + handle = int(match_obj.group(1), 16) + values = _hex_value_parser(match_obj.group(2).strip()) if self._connected_device is not None: self._connected_device.receive_notification(handle, values) diff --git a/tests/gatttool/test_backend.py b/tests/gatttool/test_backend.py index 00f63d53..56c5bcf1 100644 --- a/tests/gatttool/test_backend.py +++ b/tests/gatttool/test_backend.py @@ -75,7 +75,7 @@ def test_multi_byte_notification(self): eq_(bytearray([0x64, 0x46, 0x72]), device.receive_notification.call_args[0][1]) - def test_malformed_notification(self): + def test_empty_notification(self): event = { 'after': "Notification handle = 0x0024 value: ".encode("utf8") } @@ -83,4 +83,14 @@ def test_malformed_notification(self): device = self.backend.connect(address) device.receive_notification = MagicMock() device._backend._handle_notification_string(event) + ok_(device.receive_notification.called) + + def test_malformed_notification(self): + event = { + 'after': "Notification handle = 0x0024vlue: ".encode("utf8") + } + address = "11:22:33:44:55:66" + device = self.backend.connect(address) + device.receive_notification = MagicMock() + device._backend._handle_notification_string(event) ok_(not device.receive_notification.called)