-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsonoff_manager.py
138 lines (111 loc) · 4.96 KB
/
sonoff_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import asyncio
import logging
from typing import Optional
from aiohttp import ClientSession
# Make sure cloud.py and base.py are in your project and importable
from cloud import XRegistryCloud, XDevice
_LOGGER = logging.getLogger(__name__)
class SonoffManager:
    """
    Convenience wrapper around XRegistryCloud for simple switch control.

    What it does:
      1. Logs into eWeLink Cloud via XRegistryCloud.
      2. Starts the registry's background WebSocket task (needed for send()).
      3. Discovers switch-type devices ('switch' or 'switches' in params).
      4. Turns them ON/OFF using registry.send().
      5. Cleans up the WS task and HTTP session upon close().

    The manager can also be used as an async context manager, in which case
    close() is awaited automatically on exit:

        async with SonoffManager() as manager:
            await manager.login(username, password)
            await manager.discover_switches()
    """

    def __init__(self):
        self.session: Optional[ClientSession] = None
        self.registry: Optional[XRegistryCloud] = None
        self.devices: list[dict] = []  # switch-like devices found by discover_switches()

    async def __aenter__(self) -> "SonoffManager":
        """Return the manager itself; login() must still be called explicitly."""
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Release the WS task and HTTP session when leaving the `async with` block."""
        await self.close()

    async def login(
        self,
        username: str,
        password: str,
        country_code: str = "+1",
        *,
        connect_delay: float = 2.0,
    ) -> None:
        """
        Create the HTTP session and registry, log in, and start the WebSocket.

        If you have a token instead of a password, the original author's note
        suggests calling: registry.login("token", "us:YOUR_TOKEN_HERE").

        Args:
            username: eWeLink account name passed to registry.login().
            password: eWeLink account password passed to registry.login().
            country_code: Country dialing code passed to registry.login().
            connect_delay: Seconds to wait after starting the WebSocket task so
                it has a chance to connect before the first send(). Values
                <= 0 skip the wait.

        Raises:
            Whatever XRegistryCloud() or registry.login() raises. In that case
            the freshly created session is closed and the manager is left
            without a session/registry.
        """
        # A repeated login must not orphan the previous session / WS task.
        await self.close()

        _LOGGER.debug("Creating ClientSession...")
        session = ClientSession()
        try:
            _LOGGER.debug("Initializing XRegistryCloud...")
            registry = XRegistryCloud(session=session)

            _LOGGER.debug("Logging in to eWeLink Cloud...")
            await registry.login(
                username=username,
                password=password,
                country_code=country_code,
            )
        except BaseException:
            # Includes cancellation: never leave an unclosed session behind.
            await session.close()
            raise

        # Publish the state only once login succeeded.
        self.session = session
        self.registry = registry

        # Start the registry's background task that opens and maintains the
        # WebSocket to the eWeLink Cloud. This is critical for .send() calls.
        _LOGGER.debug("Starting eWeLink WebSocket connection (registry.run_forever).")
        registry.start()

        # Optionally wait a short time to let the WS fully connect.
        if connect_delay > 0:
            await asyncio.sleep(connect_delay)

    async def discover_switches(self) -> list[dict]:
        """
        Fetch all homes and their devices and keep the switch-like ones.

        The result is also stored in self.devices so that turn_on()/turn_off()
        can look devices up by id.

        Returns:
            The devices that have "switch" or "switches" in their params, or
            an empty list when login() has not been called yet.
        """
        if not self.registry:
            _LOGGER.error("Cannot discover devices; registry not initialized!")
            return []

        _LOGGER.debug("Fetching homes from eWeLink Cloud...")
        homes = await self.registry.get_homes()
        _LOGGER.debug("Homes found: %s", homes)

        home_ids = list(homes)  # the mapping's keys are the home (family) IDs
        _LOGGER.debug("Fetching devices for discovered homes...")
        all_devices = await self.registry.get_devices(home_ids)
        _LOGGER.debug("All devices: %s", all_devices)

        # Keep only devices that appear to be switches.
        self.devices = [dev for dev in all_devices if self._is_switch(dev)]
        return self.devices

    async def turn_on(self, device_id: str) -> str:
        """
        Turn ON a discovered device by its deviceid.

        Returns:
            The short status from registry.send(), e.g. "online", "offline",
            "timeout" or "E#??"; "E#NoRegistry" before login();
            "E#DeviceNotFound:<id>" when the id was not discovered.
        """
        return await self._set_switch(device_id, "on")

    async def turn_off(self, device_id: str) -> str:
        """
        Turn OFF a discovered device by its deviceid.

        Returns:
            The short status from registry.send(); "E#NoRegistry" before
            login(); "E#DeviceNotFound:<id>" when the id was not discovered.
        """
        return await self._set_switch(device_id, "off")

    async def close(self) -> None:
        """
        Stop the registry's background task and close the HTTP session.

        Safe to call repeatedly and before login(). The session is closed even
        when there is no registry or when stopping the registry fails.
        """
        registry, session = self.registry, self.session
        # Drop the references first so a second close() is a no-op and later
        # turn_on()/turn_off() calls report "E#NoRegistry" instead of using a
        # closed session.
        self.registry = None
        self.session = None

        try:
            if registry is not None:
                _LOGGER.debug("Stopping eWeLink Cloud WS task...")
                await registry.stop()
        finally:
            if session is not None and not session.closed:
                _LOGGER.debug("Closing HTTP session...")
                await session.close()

    # ---------------------------------------------------------------------
    # Helpers
    # ---------------------------------------------------------------------
    @staticmethod
    def _is_switch(dev: dict) -> bool:
        """Return True when the device's params contain 'switch' or 'switches'."""
        params = dev.get("params")
        # params may be missing or None; only a mapping can qualify.
        return isinstance(params, dict) and (
            "switch" in params or "switches" in params
        )

    @staticmethod
    def _switch_payload(device: dict, state: str) -> dict:
        """
        Build the params payload that sets every channel of `device` to `state`.

        Devices exposing "switch" get {"switch": state}. Devices that only
        expose a "switches" list get one {"outlet": n, "switch": state} entry
        per listed outlet.
        NOTE(review): the multi-channel shape follows the eWeLink convention;
        confirm against the device models you actually use.
        """
        params = device.get("params")
        if isinstance(params, dict) and "switch" not in params:
            channels = params.get("switches")
            if isinstance(channels, list):
                outlets = [
                    {"outlet": channel["outlet"], "switch": state}
                    for channel in channels
                    if isinstance(channel, dict) and "outlet" in channel
                ]
                if outlets:
                    return {"switches": outlets}
        return {"switch": state}

    async def _set_switch(self, device_id: str, state: str) -> str:
        """Shared implementation of turn_on()/turn_off(); `state` is "on" or "off"."""
        if not self.registry:
            return "E#NoRegistry"

        device = self._find_device(device_id)
        if not device:
            return f"E#DeviceNotFound:{device_id}"

        _LOGGER.debug(
            "Turning %s: %s (deviceid=%s)",
            state.upper(),
            device.get("name"),
            device_id,
        )
        return await self.registry.send(device, self._switch_payload(device, state))

    def _find_device(self, device_id: str) -> Optional["XDevice"]:
        """Return the stored device whose 'deviceid' equals device_id, else None."""
        # .get() so an entry without a 'deviceid' key is skipped, not fatal.
        return next(
            (dev for dev in self.devices if dev.get("deviceid") == device_id),
            None,
        )