Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multi-button presses on Philips remotes #3592

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 78 additions & 52 deletions tests/test_philips.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,15 +317,10 @@ def reset(self):

self._click_counter = 0
self._callback = None
self._button = None

def press(self, callback, button):
def press(self, callback):
"""Process a button press."""
if button != self._button:
self._click_counter = 1
else:
self._click_counter += 1
self._button = button
self._click_counter += 1
self._callback = callback


Expand Down Expand Up @@ -374,11 +369,14 @@ def test_PhilipsRemoteCluster_short_press(
cluster = device.endpoints[ep].philips_remote_cluster
listener = mock.MagicMock()
cluster.add_listener(listener)
cluster.button_press_queue = ManuallyFiredButtonPressQueue()
cluster.button_press_queue = {
k: ManuallyFiredButtonPressQueue() for k in cluster.BUTTONS
}

cluster.handle_cluster_request(ZCLHeader(), [1, 0, 0, 0, 0])
cluster.handle_cluster_request(ZCLHeader(), [1, 0, 2, 0, 0])
cluster.button_press_queue.fire()
for q in cluster.button_press_queue.values():
q.fire()

assert listener.zha_send_event.call_count == 2

Expand Down Expand Up @@ -462,14 +460,17 @@ def test_PhilipsRemoteCluster_multi_press(
cluster = device.endpoints[ep].philips_remote_cluster
listener = mock.MagicMock()
cluster.add_listener(listener)
cluster.button_press_queue = ManuallyFiredButtonPressQueue()
cluster.button_press_queue = {
k: ManuallyFiredButtonPressQueue() for k in cluster.BUTTONS
}

for _ in range(0, count):
# btn1 short press
cluster.handle_cluster_request(ZCLHeader(), [1, 0, 0, 0, 0])
# btn1 short release
cluster.handle_cluster_request(ZCLHeader(), [1, 0, 2, 0, 0])
cluster.button_press_queue.fire()
for q in cluster.button_press_queue.values():
q.fire()

assert listener.zha_send_event.call_count == 1
args_button_id = count + 2
Expand Down Expand Up @@ -620,30 +621,21 @@ def test_PhilipsRemoteCluster_long_press(


@pytest.mark.parametrize(
"button_presses, result_count",
"button_presses",
(
(
[1],
1,
),
(
[1, 1],
2,
),
(
[1, 1, 3, 3, 3, 2, 2, 2, 2],
4,
),
(1),
(2),
(4),
),
)
def test_ButtonPressQueue_presses_without_pause(button_presses, result_count):
def test_ButtonPressQueue_presses_without_pause(button_presses):
"""Test ButtonPressQueue presses without pause in between presses."""

q = ButtonPressQueue()
q._ms_threshold = 50
cb = mock.MagicMock()
for btn in button_presses:
q.press(cb, btn)
for _ in range(button_presses):
q.press(cb)

# await cluster.button_press_queue._task
# Instead of awaiting the job, significantly extending the time
Expand All @@ -653,48 +645,32 @@ def test_ButtonPressQueue_presses_without_pause(button_presses, result_count):
q._task.cancel()
q._ms_last_click = 0
q._callback(q._click_counter)
cb.assert_called_once_with(result_count)
cb.assert_called_once_with(button_presses)


@pytest.mark.parametrize(
"press_sequence, results",
"press_sequence",
(
(
# switch buttons within a sequence,
# new sequence start with different button
(
[1, 1, 3, 3],
[2, 2, 2],
),
(2, 3),
),
(
# no button switch within a sequence,
# new sequence with same button
(
[1, 1, 1],
[1],
),
(3, 1),
),
((2, 3)),
((3, 1)),
),
)
async def test_ButtonPressQueue_presses_with_pause(press_sequence, results):
async def test_ButtonPressQueue_presses_with_pause(press_sequence):
"""Test ButtonPressQueue with pauses in between button press sequences."""

q = ButtonPressQueue()
q._ms_threshold = 50
cb = mock.MagicMock()

for seq in press_sequence:
for btn in seq:
q.press(cb, btn)
for _ in range(seq):
q.press(cb)
await q._task

assert cb.call_count == len(results)
assert cb.call_count == len(press_sequence)

calls = []
for res in results:
for res in press_sequence:
calls.append(mock.call(res))

cb.assert_has_calls(calls)
Expand Down Expand Up @@ -768,3 +744,53 @@ def test_contact_sensor(zigpy_device_from_v2_quirk):
# update again with the same value and except no new update
hue_cluster.update_attribute(hue_cluster.AttributeDefs.contact.id, 1)
assert len(on_off_listener.attribute_updates) == 2


@pytest.mark.parametrize(
"dev, ep, button_events, expected_actions",
(
(
PhilipsWallSwitch,
1,
(
[
b"\x1d\x0b\x106\x00\x01\x00\x000\x00!\x00\x00",
b"\x1d\x0b\x107\x00\x01\x00\x000\x02!\x01\x00",
],
[
b"\x1d\x0b\x108\x00\x02\x00\x000\x00!\x00\x00",
b"\x1d\x0b\x109\x00\x02\x00\x000\x02!\x01\x00",
],
),
["left_press", "left_short_release", "right_press", "right_short_release"],
),
),
)
def test_PhilipsRemoteCluster_multi_button_press(
zigpy_device_from_quirk, dev, ep, button_events, expected_actions
):
"""Test PhilipsRemoteCluster short button press logic."""

device = zigpy_device_from_quirk(dev)

remote_cluster = device.endpoints[ep].philips_remote_cluster
remote_cluster.button_press_queue = {
k: ManuallyFiredButtonPressQueue() for k in remote_cluster.BUTTONS
}
remote_listener = mock.MagicMock()
remote_cluster.add_listener(remote_listener)

expected_event_count = 0
for button in button_events:
for eventData in button:
hdr, args = remote_cluster.deserialize(eventData)
remote_cluster.handle_message(hdr, args)
expected_event_count += 1

for q in remote_cluster.button_press_queue.values():
q.fire()

assert remote_listener.zha_send_event.call_count == expected_event_count

for i, expected_action in enumerate(expected_actions):
assert remote_listener.zha_send_event.call_args_list[i][0][0] == expected_action
46 changes: 29 additions & 17 deletions zhaquirks/philips/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,27 +82,18 @@ def __init__(self):
self._ms_threshold = 300
self._ms_last_click = 0
self._click_counter = 1
self._button = None
self._callback = lambda x: None
self._task = None

async def _job(self):
await asyncio.sleep(self._ms_threshold / 1000)
self._callback(self._click_counter)

def _reset(self, button):
if self._task:
self._task.cancel()
self._click_counter = 1
self._button = button

def press(self, callback, button):
def press(self, callback):
"""Process a button press."""
self._callback = callback
now_ms = time.time() * 1000
if self._button != button:
self._reset(button)
elif now_ms - self._ms_last_click > self._ms_threshold:
if now_ms - self._ms_last_click > self._ms_threshold:
self._click_counter = 1
else:
self._task.cancel()
Expand Down Expand Up @@ -188,7 +179,12 @@ class PhilipsRemoteCluster(CustomCluster):
PressType(SHORT_RELEASE, COMMAND_M_SHORT_RELEASE),
]

button_press_queue = ButtonPressQueue()
def __init__(self, endpoint, is_server=True):
"""Initialize button press queue for each button."""

super().__init__(endpoint, is_server)

self.button_press_queue = {k: ButtonPressQueue() for k in self.BUTTONS}

def handle_cluster_request(
self,
Expand All @@ -209,15 +205,20 @@ def handle_cluster_request(
)

button = self.BUTTONS.get(args[0])
# Bail on unknown buttons. (This gets rid of dial button "presses")
if button is None:
_LOGGER.debug(
"%s - handle_cluster_request unknown button id [%s]",
self.__class__.__name__,
args[0],
)
return
_LOGGER.debug(
"%s - handle_cluster_request button id: [%s], button name: [%s]",
self.__class__.__name__,
args[0],
button,
)
# Bail on unknown buttons. (This gets rid of dial button "presses")
if button is None:
return

press_type = self.PRESS_TYPES.get(args[2])
if (
Expand All @@ -227,6 +228,11 @@ def handle_cluster_request(
):
press_type = self.SIMULATE_SHORT_EVENTS[1]
if press_type is None:
_LOGGER.debug(
"%s - handle_cluster_request unknown button press type: [%s]",
self.__class__.__name__,
press_type,
)
return

duration = args[4]
Expand Down Expand Up @@ -288,15 +294,21 @@ def send_press_event(click_count):
sim_event_args[ARGS][2] = 2
action = f"{button.action}_{press_type.action}"
_LOGGER.debug(
"%s - send_press_event emitting simulated action: [%s]",
"%s - send_press_event emitting simulated action: [%s] event_args: %s",
self.__class__.__name__,
action,
sim_event_args,
)
self.listener_event(ZHA_SEND_EVENT, action, sim_event_args)

# Derive Multiple Presses
if press_type.name == SHORT_RELEASE:
self.button_press_queue.press(send_press_event, button.id)
_LOGGER.debug(
"%s - handle_cluster_request handling short release. Push to button press queue for button %s",
self.__class__.__name__,
args[0],
)
self.button_press_queue[args[0]].press(send_press_event)
else:
action = f"{button.action}_{press_type.action}"
self.listener_event(ZHA_SEND_EVENT, action, event_args)
Expand Down
16 changes: 14 additions & 2 deletions zhaquirks/philips/wall_switch.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@
)


class SwitchMode(t.enum8):
"""Wall switch modes. See https://zigbee.blakadder.com/Philips_RDM001.html."""

SingleRocker = 0x00
SinglePush = 0x01
DoubleRocker = 0x02
DoublePush = 0x03


class PhilipsWallSwitchBasicCluster(PhilipsBasicCluster):
"""Philips wall switch Basic cluster."""

Expand All @@ -49,11 +58,14 @@ class AttributeDefs(PhilipsBasicCluster.AttributeDefs):

mode: Final = ZCLAttributeDef(
id=0x0034,
type=t.enum8,
type=SwitchMode,
is_manufacturer_specific=True,
)

attr_config = {**PhilipsBasicCluster.attr_config, AttributeDefs.mode.id: 0x02}
attr_config = {
**PhilipsBasicCluster.attr_config,
AttributeDefs.mode.id: SwitchMode.DoublePush,
}


class PhilipsWallSwitchRemoteCluster(PhilipsRemoteCluster):
Expand Down
Loading