Skip to content

Commit

Permalink
Merge pull request #211 from peplin/pr/202
Browse files Browse the repository at this point in the history
Add unit test for #202
  • Loading branch information
peplin authored Mar 30, 2019
2 parents 378871d + 32cd7a9 commit 56a9038
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 14 deletions.
21 changes: 8 additions & 13 deletions pygatt/backends/gatttool/gatttool.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,9 @@

log = logging.getLogger(__name__)

if hasattr(bytes, 'fromhex'):
# Python 3.
def _hex_value_parser(x):
return bytearray.fromhex(x.decode('utf8'))
else:
# Python 2.7
def _hex_value_parser(x):
return bytearray.fromhex(x)

def _hex_value_parser(x):
return bytearray.fromhex(x)


def is_windows():
Expand Down Expand Up @@ -490,14 +485,14 @@ def _handle_notification_string(self, event):
log.warn("Blank message received in notification, ignored")
return

split_msg = msg.strip().split(None, 5)
if len(split_msg) < 6:
match_obj = re.match(r'Notification handle = (0x[0-9a-f]+) value:(.*)',
msg.decode('utf-8'))
if match_obj is None:
log.warn("Unable to parse notification string, ignoring: %s", msg)
return

hex_handle, _, hex_values = split_msg[3:]
handle = int(hex_handle, 16)
values = _hex_value_parser(hex_values)
handle = int(match_obj.group(1), 16)
values = _hex_value_parser(match_obj.group(2).strip())
if self._connected_device is not None:
self._connected_device.receive_notification(handle, values)

Expand Down
12 changes: 11 additions & 1 deletion tests/gatttool/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,22 @@ def test_multi_byte_notification(self):
eq_(bytearray([0x64, 0x46, 0x72]),
device.receive_notification.call_args[0][1])

def test_malformed_notification(self):
def test_empty_notification(self):
event = {
'after': "Notification handle = 0x0024 value: ".encode("utf8")
}
address = "11:22:33:44:55:66"
device = self.backend.connect(address)
device.receive_notification = MagicMock()
device._backend._handle_notification_string(event)
ok_(device.receive_notification.called)

def test_malformed_notification(self):
event = {
'after': "Notification handle = 0x0024vlue: ".encode("utf8")
}
address = "11:22:33:44:55:66"
device = self.backend.connect(address)
device.receive_notification = MagicMock()
device._backend._handle_notification_string(event)
ok_(not device.receive_notification.called)

0 comments on commit 56a9038

Please sign in to comment.