-
-
Notifications
You must be signed in to change notification settings - Fork 102
/
Copy pathterrariumCloud.py
257 lines (200 loc) · 9.07 KB
/
terrariumCloud.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# -*- coding: utf-8 -*-
import terrariumLogging
logger = terrariumLogging.logging.getLogger(__name__)
import asyncio
import contextlib
import threading
import socket
from pathlib import Path
from time import sleep, time
from terrariumUtils import classproperty, terrariumCache, terrariumSingleton, terrariumAsync
import os
# pip install meross-iot
# https://github.com/albertogeniola/MerossIot
from meross_iot.http_api import MerossHttpClient
from meross_iot.manager import MerossManager
from meross_iot.controller.mixins.toggle import ToggleXMixin
from meross_iot.controller.subdevice import Ms100Sensor
from meross_iot.model.http.exception import BadLoginException, TooManyTokensException
from meross_iot.model.exception import CommandTimeoutError
from meross_iot.model.enums import Namespace
class TerrariumMerossCloud(terrariumSingleton):
@classproperty
def is_enabled(__cls__):
EMAIL = os.environ.get("MEROSS_EMAIL", "")
PASSWORD = os.environ.get("MEROSS_PASSWORD", "")
return EMAIL != "" and PASSWORD != ""
def __init__(self, username, password):
self.__asyncio = terrariumAsync()
self.__engine = {
"cache": terrariumCache(),
"running": False,
"reconnecting": False,
"restart_counter": 0,
"error": False,
"event": None,
}
self._data = {}
self._username = username
self._password = password
self.start()
def start(self, reconnecting=False):
def _run():
try:
self.__asyncio.run(self._main_process())
except Exception as ex:
logger.exception(f"Error in cloud run: {ex}")
self.reconnect()
start_time = time()
self.__engine["error"] = False
self.__engine["event"] = asyncio.Event()
self.__engine["thread"] = threading.Thread(target=_run)
self.__engine["thread"].start()
if reconnecting:
logger.info("Reconnecting to the Meross cloud")
counter = 0
while not self.__engine["running"] and not self.__engine["error"] and counter < 30:
logger.info("Waiting for Meross cloud connection ... ")
counter += 1
sleep(1)
if counter >= 30:
logger.error("Could not login to Meross cloud within 30 seconds. Restarting...")
self.reconnect()
if not self.__engine["error"]:
logger.info(
f'Meross cloud is {"re-" if reconnecting else ""}connected! Found {len(self._data)} devices in {time()-start_time:.2f} seconds.'
)
def _store_data(self):
for key in self._data:
self.__engine["cache"].set_data(key, self._data[key], 90)
def scan_hardware(self, device_type):
async def _scan_hardware(device_type):
await self.manager.async_device_discovery()
meross_devices = []
if "sensors" == device_type:
meross_devices = self.manager.find_devices(device_class=Ms100Sensor)
elif "relays" == device_type:
meross_devices = self.manager.find_devices(device_class=ToggleXMixin)
return meross_devices
if not self.__engine["running"]:
return []
devices = self.__asyncio.run(_scan_hardware(device_type))
return devices
def toggle_relay(self, device, switch, state):
TIMEOUT = 5
async def _toggle_relay(device, switch, state):
device = self.manager.find_devices(device_uuids=[device])
if len(device) == 1:
device = device[0]
if state != 0.0:
await device.async_turn_on(channel=switch, timeout=TIMEOUT + 1)
else:
await device.async_turn_off(channel=switch, timeout=TIMEOUT + 1)
return True
return None
if not self.__engine["running"]:
return None
# Create a timer for offline detection...
offline = threading.Timer(TIMEOUT, self.reconnect)
offline.start()
# Start the toggle action
result = self.__asyncio.run(_toggle_relay(device, switch, state))
# Stop the offline detection
offline.cancel()
return result
def stop(self):
logger.info("Stopping Meross cloud ... ")
self.__engine["running"] = False
self.__engine["event"].set()
self.__engine["thread"].join()
def reconnect(self):
if self.__engine["reconnecting"]:
return
self.__engine["reconnecting"] = True
logger.warning("Reconnecting to Meross cloud. Somehow the connection was lost ...")
self.stop()
self.start(True)
async def _main_process(self):
# https://stackoverflow.com/a/49632779
async def event_wait(evt, timeout):
# suppress TimeoutError because we'll return False in case of timeout
with contextlib.suppress(asyncio.TimeoutError):
await asyncio.wait_for(evt.wait(), timeout)
return evt.is_set()
async def _notification(namespace: Namespace, data: dict, device_internal_id: str, *args, **kwargs):
for device in data:
if hasattr(device, "is_on"):
self._data[f"{device.uuid}"] = []
for channel in device.channels:
self._data[f"{device.uuid}"].append(device.is_on(channel=channel.index))
logger.info(
f"Got an update from the Meross Cloud. Relay state {device.uuid} {self._data[device.uuid]}"
)
if hasattr(device, "last_sampled_temperature"):
self._data[f"{device.subdevice_id}"] = {
"temperature": device.last_sampled_temperature,
"humidity": device.last_sampled_humidity,
}
logger.info(
f"Got an update from the Meross Cloud. Setting temperature to {device.last_sampled_temperature} and humidity to {device.last_sampled_humidity}"
)
self._store_data()
try:
# Setup the HTTP client API from user-password
# We need to know where in the world we are....
continent = "eu" if Path("/etc/timezone").read_text().startswith("Europe/") else "us"
meross_url = f"https://iotx-{continent}.meross.com"
http_api_client = await MerossHttpClient.async_from_user_password(
api_base_url=meross_url, email=self._username, password=self._password
)
# Setup and start the device manager
self.manager = MerossManager(http_client=http_api_client)
await self.manager.async_init()
# Discover devices.
await self.manager.async_device_discovery()
meross_devices = self.manager.find_devices()
for dev in meross_devices:
# Is a relay
if hasattr(dev, "is_on"):
await dev.async_update()
self._data[f"{dev.uuid}"] = []
for channel in dev.channels:
self._data[f"{dev.uuid}"].append(dev.is_on(channel=channel.index))
# Is a sensor
if hasattr(dev, "last_sampled_temperature"):
await dev.async_update()
self._data[f"{dev.subdevice_id}"] = {
"temperature": dev.last_sampled_temperature,
"humidity": dev.last_sampled_humidity,
}
self._store_data()
self.__engine["running"] = True
self.__engine["reconnecting"] = False
self.__engine["restart_counter"] = 0
self.manager.register_push_notification_handler_coroutine(_notification)
while not await event_wait(self.__engine["event"], 30):
self._store_data()
except CommandTimeoutError:
logger.error("Meross communication timed out connecting with the server.")
except BadLoginException:
logger.error("Wrong login credentials for Meross. Please check your settings!")
except TooManyTokensException as ex:
logger.error(f"{ex}")
self.stop()
except socket.timeout:
self.__engine["error"] = True
if self.__engine["restart_counter"] < 10:
self.__engine["restart_counter"] += 1
logger.error(
f'Timeout logging into Meross Cloud. Reconnecting in 5 seconds attempt {self.__engine["restart_counter"]} ...'
)
threading.Timer(5, self.start).start()
else:
logger.error(
"Failed to connect to the Meross Cloud after 10 times. Please check your network configuration."
)
finally:
# Close the manager and logout from http_api
self.manager.close()
await http_api_client.async_logout()
logger.info("Closed Meross cloud connection")