forked from Energenie/pyenergenie
-
Notifications
You must be signed in to change notification settings - Fork 0
/
switch.py
121 lines (91 loc) · 4.29 KB
/
switch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# switch.py 17/03/2016 D.J.Whale
#
# Control Energenie switches.
# Note, at the moment, this only works with MiHome Adaptor Plus devices
# because the 'sensorid' is discovered via the monitor message.
# You could probably fake it by preloading the directory with your sensorid
# if you know what it is.
# Note, this is *only* a test program, to exercise the lower level code.
# Don't expect this to be a good starting point for an application.
# Consider waiting for me to finish developing the device object interface first.
import time
from Timer import Timer
from energenie import Devices, Messages, Registry, OpenThings, radio
# Increase this if you have lots of switches, so that the receiver has enough
# time to receive update messages, otherwise your devices won't make it into
# the device directory.
TX_RATE = 2 # seconds between each switch change cycle
def warning(msg):
print("warning:%s" % str(msg))
def trace(msg):
print("monitor:%s" % str(msg))
#----- TEST APPLICATION -------------------------------------------------------
def switch_sniff_loop():
"""Listen to sensor messages and add them to the Registry"""
# See if there is a payload, and if there is, process it
if radio.is_receive_waiting():
##trace("receiving payload")
payload = radio.receive()
try:
decoded = OpenThings.decode(payload)
now = time.time()
except OpenThings.OpenThingsException as e:
warning("Can't decode payload:" + str(e))
return
OpenThings.showMessage(decoded, timestamp=now)
# Any device that reports will be added to the non-persistent directory
Registry.update(decoded)
##trace(decoded)
# Process any JOIN messages by sending back a JOIN-ACK to turn the LED off
if len(decoded["recs"]) == 0:
# handle messages with zero recs in them silently
print("Empty record:%s" % decoded)
else:
# assume only 1 rec in a join, for now
if decoded["recs"][0]["paramid"] == OpenThings.PARAM_JOIN:
mfrid = OpenThings.getFromMessage(decoded, "header_mfrid")
productid = OpenThings.getFromMessage(decoded, "header_productid")
sensorid = OpenThings.getFromMessage(decoded, "header_sensorid")
Messages.send_join_ack(radio, mfrid, productid, sensorid)
def switch_toggle_loop():
"""Toggle the switch on all devices in the directory"""
global switch_state
if Registry.size() > 0 and sendSwitchTimer.check():
print("transmit")
radio.transmitter()
for sensorid in Registry.get_sensorids():
# Only try to toggle the switch for devices that actually have a switch
header = Registry.get_info(sensorid)["header"]
mfrid = header["mfrid"]
productid = header["productid"]
if Devices.hasSwitch(mfrid, productid):
request = OpenThings.alterMessage(Messages.SWITCH,
header_sensorid=sensorid,
recs_0_value=switch_state)
p = OpenThings.encode(request)
print("Sending switch message to %s %s" % (hex(productid), hex(sensorid)))
# Transmit multiple times, hope one of them gets through
radio.transmit(p, inner_times=2)
radio.receiver()
switch_state = (switch_state+1) % 2 # toggle
if __name__ == "__main__":
trace("starting switch tester")
radio.init()
OpenThings.init(Devices.CRYPT_PID)
# Seed the registry with a known device, to simplify tx-only testing
SENSOR_ID = 0x68B # captured from a real device
device_header = OpenThings.alterMessage(Messages.REGISTERED_SENSOR,
header_mfrid = Devices.MFRID,
header_productid = Devices.PRODUCTID_MIHO005, # adaptor plus
header_sensorid = SENSOR_ID)
Registry.update(device_header)
sendSwitchTimer = Timer(TX_RATE, 1) # every n seconds offset by initial 1
switch_state = 0 # OFF
radio.receiver()
try:
while True:
switch_sniff_loop()
switch_toggle_loop()
finally:
radio.finished()
# END