forked from rtyle/lc7001
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cli.py
224 lines (191 loc) · 7.63 KB
/
cli.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
"""Command Line Interpreter to interact with LC7001 HOSTs
Run with --help to see command line usage.
This code serves as a demonstration of expected lc7001.aio usage:
There will be some type(s) of Hub(s) to add behavior beyond that of a
Connector (which is a message Authenticator, Emitter, Receiver and Sender).
Here, we use Hub which does nothing more than be an Authenticator
and then an Emitter for each connection/session.
With DEBUG turned on, the messages passed in (>) to us
and out (<) from us are logged.
This is a great way to demonstrate the LC7001 behavior.
There will be some type(s) of Interpreter(s) that will need to interact
with LC7001 HOSTs.
Here, our _Interpreter takes lines from STDIN, composes messages from them
and sends them to (through the current session with)
the currently targeted HOST.
The STDIN commands are:
-- a blank line sends a REPORT_SYSTEM_PROPERTIES message
a # -- send a SET_SYSTEM_PROPERTIES message with ADD_A_LIGHT True|False
d # -- send a DELETE_ZONE message for ZID #
h -- target the next HOST in rotation (start with first HOST)
q -- quit
s -- send a LIST_SCENES message
s * -- send a LIST_SCENES then a REPORT_SCENE_PROPERTIES for each
s SID -- send a REPORT_SCENE_PROPERTIES message for SID (0-99)
z -- send a LIST_ZONES message
z * -- send a LIST_ZONES then a REPORT_ZONE_PROPERTIES for each
z ZID -- send a REPORT_ZONE_PROPERTIES message for ZID (0-99)
z ZID # -- send a SET_ZONE_PROPERTIES message with POWER as False|True
z ZID #% -- send a SET_ZONE_PROPERTIES message with POWER_LEVEL as #
z ZID #/ -- send a SET_ZONE_PROPERTIES message with RAMP_RATE as #
"""
import argparse
import asyncio
import logging
import os
import re
import sys
from typing import Final, Sequence
import lc7001.aio
# Log at DEBUG level for the whole process.
logging.basicConfig(level=logging.DEBUG)

# Logger named after this module.
_logger = logging.getLogger(__name__)

# Handle on this module object itself.
_module = sys.modules[__name__]
class _Interpreter:  # pylint: disable=too-few-public-methods
    """Turn lines read from STDIN into messages for the targeted hub.

    ``hosts`` and ``hubs`` are indexed together: ``self._host`` selects
    the host/hub pair that commands are sent to and is advanced (with
    wrap-around) by the ``h`` command.
    """

    # A zone argument such as "50%" (power level) or "3/" (ramp rate).
    _LEVEL_OR_RATE = re.compile(r"^(\d+)([%/])$")

    def __init__(self, hosts: Sequence[str], hubs: Sequence):
        self._hubs = hubs
        self._hosts = hosts
        self._host = 0

    @staticmethod
    async def _stdio():
        """Return a ``(reader, writer)`` stream pair.

        The reader is connected to ``sys.stdin`` and the writer to file
        descriptor 1, both through the running event loop.
        """
        reader = asyncio.StreamReader()
        reader_protocol = asyncio.StreamReaderProtocol(reader)
        # We are always called from a coroutine, so ask for the running
        # loop explicitly rather than relying on get_event_loop().
        loop = asyncio.get_running_loop()
        writer_transport, writer_protocol = await loop.connect_write_pipe(
            asyncio.streams.FlowControlMixin, os.fdopen(1, "wb")
        )
        writer = asyncio.StreamWriter(
            writer_transport, writer_protocol, None, loop
        )
        await loop.connect_read_pipe(lambda: reader_protocol, sys.stdin)
        return reader, writer

    async def _command_scene(self, hub, token):
        """Handle the arguments of an ``s`` command.

        No argument sends LIST_SCENES. ``*`` sends LIST_SCENES and then a
        REPORT_SCENE_PROPERTIES for each scene in the response. Anything
        else is converted with ``int`` and used as the SID of a single
        REPORT_SCENE_PROPERTIES (``ValueError`` if it is not an integer).
        """
        sid = next(token, None)
        if sid is None:
            await hub.send(hub.compose_list_scenes())
        elif sid == "*":

            async def handle(message):
                lc7001.aio.Receiver.StatusError(message).raise_if()
                for item in message[hub.SCENE_LIST]:
                    await hub.send(
                        hub.compose_report_scene_properties(item[hub.SID])
                    )

            await hub.handle_send(handle, hub.compose_list_scenes())
        else:
            await hub.send(hub.compose_report_scene_properties(int(sid)))

    async def _command_zone(self, hub, token):
        """Handle the arguments of a ``z`` command.

        No argument sends LIST_ZONES. ``*`` sends LIST_ZONES and then a
        REPORT_ZONE_PROPERTIES for each zone in the response. Otherwise
        the first argument is the ZID; with no further arguments a
        REPORT_ZONE_PROPERTIES is sent, else a SET_ZONE_PROPERTIES built
        from them: ``#`` sets power (zero is off), ``#%`` sets power
        level and ``#/`` sets ramp rate. Arguments in none of these
        forms are skipped.
        """
        zid = next(token, None)
        if zid is None:
            await hub.send(hub.compose_list_zones())
            return
        if zid == "*":

            async def handle(message):
                lc7001.aio.Receiver.StatusError(message).raise_if()
                for item in message[hub.ZONE_LIST]:
                    await hub.send(
                        hub.compose_report_zone_properties(item[hub.ZID])
                    )

            await hub.handle_send(handle, hub.compose_list_zones())
            return

        kwargs = {}
        for value in token:
            try:
                number = int(value)
            except ValueError:
                match = self._LEVEL_OR_RATE.search(value)
                if match:
                    name = (
                        "power_level" if match.group(2) == "%" else "ramp_rate"
                    )
                    kwargs[name] = int(match.group(1))
            else:
                kwargs["power"] = bool(number)
        if kwargs:
            await hub.send(hub.compose_set_zone_properties(int(zid), **kwargs))
        else:
            await hub.send(hub.compose_report_zone_properties(int(zid)))

    async def _command(self, hub, line: bytes):
        """Interpret one line read from STDIN and act on it through hub.

        A blank line sends REPORT_SYSTEM_PROPERTIES; otherwise the first
        letter of the first word selects the command (a, d, h, s or z)
        and the remaining words are its arguments. Unknown commands are
        ignored. A ``ValueError`` raised while handling the line (for
        example a non-numeric ZID, or bytes that cannot be decoded) is
        logged and the line is dropped so the session keeps running.
        """
        try:
            token = iter(line.decode().split())
            command = next(token, None)
            if command is None:
                await hub.send(hub.compose_report_system_properties())
            elif command.startswith("a"):
                argument = next(token, None)
                # Go through int(): bool() of any non-empty string is
                # True, so "a 0" would otherwise still enable ADD_A_LIGHT.
                enable = True if argument is None else bool(int(argument))
                await hub.send(
                    hub.compose_set_system_properties(add_a_light=enable)
                )
            elif command.startswith("d"):
                argument = next(token, None)
                # Without a ZID there is nothing to delete.
                if argument is not None:
                    await hub.send(hub.compose_delete_zone(int(argument)))
            elif command.startswith("h"):
                self._host = (self._host + 1) % len(self._hosts)
                _logger.info("host %s", self._hosts[self._host])
            elif command.startswith("s"):
                await self._command_scene(hub, token)
            elif command.startswith("z"):
                await self._command_zone(hub, token)
        except ValueError as error:
            _logger.warning("ignored %r: %s", line, error)

    async def main(self):
        """Interpret commands from STDIN to hub of selected host.

        On end of input, or a line starting with ``q``, every hub is
        cancelled and ``asyncio.CancelledError`` is raised.
        """
        reader, _ = await self._stdio()
        while True:
            line = await reader.readline()
            if not line or line.startswith(b"q"):
                for hub in self._hubs:
                    await hub.cancel()
                raise asyncio.CancelledError
            await self._command(self._hubs[self._host], line)
class _Main:  # pylint: disable=too-few-public-methods
    """Run one Hub per host together with an _Interpreter.

    Constructing an instance runs the whole session inside
    ``asyncio.run``; cancellation and Ctrl-C both end it quietly.
    """

    def __init__(self, key, *hosts):
        self._key = key
        self._hosts = hosts
        try:
            asyncio.run(self._main())
        except (asyncio.CancelledError, KeyboardInterrupt):
            pass

    async def _main(self):
        """Create the hubs and await the interpreter and every hub loop."""
        hubs = []
        for host in self._hosts:
            hubs.append(lc7001.aio.Hub(host, key=self._key))
        interpreter = _Interpreter(self._hosts, hubs)
        # Interpreter first, then one loop per hub, all awaited together.
        awaitables = [interpreter.main()]
        awaitables.extend(hub.loop() for hub in hubs)
        await asyncio.gather(*awaitables)
# Hosts targeted when none are named on the command line.
HOSTS: Final = [lc7001.aio.Connector.HOST]

parser = argparse.ArgumentParser(
    description="Command Line Interpreter to interact with LC7001 HOSTs"
)
parser.add_argument(
    "--password",
    metavar="PASSWORD",
    type=str,
    help=(
        "password for each HOST\n"
        "(minimum 8 characters for Legrand Lighting Control App"
        " compatibility)"
    ),
)
parser.add_argument(
    "hosts",
    metavar="HOST",
    type=str,
    nargs="*",
    default=HOSTS,
    help=f"resolves to LC7001 IP address (default {HOSTS[0]})",
)
args = parser.parse_args()

# Without a password no key is passed on; otherwise the key is the
# hash of the encoded password.
KEY = (
    None
    if args.password is None
    else lc7001.aio.hash_password(args.password.encode())
)
_Main(KEY, *args.hosts)