diff --git a/midealocal/device.py b/midealocal/device.py index ff695cbc..c2c9adc4 100644 --- a/midealocal/device.py +++ b/midealocal/device.py @@ -144,6 +144,8 @@ def __init__( self._refresh_interval = 30 self._heartbeat_interval = 10 self._default_refresh_interval = 30 + self._previous_refresh = 0.0 + self._previous_heartbeat = 0.0 self.name = self._device_name @property @@ -479,6 +481,16 @@ def set_refresh_interval(self, refresh_interval: int) -> None: """Set refresh interval.""" self._refresh_interval = refresh_interval + def _check_refresh(self, now: float) -> None: + if 0 < self._refresh_interval <= now - self._previous_refresh: + self.refresh_status() + self._previous_refresh = now + + def _check_heartbeat(self, now: float) -> None: + if now - self._previous_heartbeat >= self._heartbeat_interval: + self.send_heartbeat() + self._previous_heartbeat = now + def run(self) -> None: """Run loop.""" while self._is_run: @@ -488,21 +500,16 @@ def run(self) -> None: time.sleep(5) timeout_counter = 0 start = time.time() - previous_refresh = start - previous_heartbeat = start + self._previous_refresh = start + self._previous_heartbeat = start self._socket.settimeout(1) while True: try: now = time.time() - if 0 < self._refresh_interval <= now - previous_refresh: - self.refresh_status() - previous_refresh = now - if now - previous_heartbeat >= self._heartbeat_interval: - self.send_heartbeat() - previous_heartbeat = now + self._check_refresh(now) + self._check_heartbeat(now) msg = self._socket.recv(512) - msg_len = len(msg) - if msg_len == 0: + if len(msg) == 0: if self._is_run: _LOGGER.error( "[%s] Socket error - Connection closed by peer", diff --git a/midealocal/devices/fa/__init__.py b/midealocal/devices/fa/__init__.py index 684d79d2..f5cbcb9c 100644 --- a/midealocal/devices/fa/__init__.py +++ b/midealocal/devices/fa/__init__.py @@ -194,6 +194,86 @@ def process_message(self, msg: bytes) -> dict[str, Any]: new_status[str(status)] = self._attributes[status] return new_status + def _set_oscillation_mode(self, message: MessageSet, value: Any) -> None: + if value == "Off" or not value: + message.oscillate = False + else: + message.oscillate = True + message.oscillation_mode = MideaFADevice._oscillation_modes.index( + value, + ) + if value == "Oscillation": + if self._attributes[DeviceAttributes.oscillation_angle] == "Off": + message.oscillation_angle = 3 # 90 + else: + message.oscillation_angle = MideaFADevice._oscillation_angles.index( + self._attributes[DeviceAttributes.oscillation_angle], + ) + elif value == "Tilting": + if self._attributes[DeviceAttributes.tilting_angle] == "Off": + message.tilting_angle = 3 # 90 + else: + message.tilting_angle = MideaFADevice._tilting_angles.index( + self._attributes[DeviceAttributes.tilting_angle], + ) + else: + if self._attributes[DeviceAttributes.oscillation_angle] == "Off": + message.oscillation_angle = 3 # 90 + else: + message.oscillation_angle = MideaFADevice._oscillation_angles.index( + self._attributes[DeviceAttributes.oscillation_angle], + ) + if self._attributes[DeviceAttributes.tilting_angle] == "Off": + message.tilting_angle = 3 # 90 + else: + message.tilting_angle = MideaFADevice._tilting_angles.index( + self._attributes[DeviceAttributes.tilting_angle], + ) + + def _set_oscillation_angle(self, message: MessageSet, value: Any) -> None: + if value == "Off" or not value: + if self._attributes[DeviceAttributes.tilting_angle] == "Off": + message.oscillate = False + else: + message.oscillate = True + message.oscillation_mode = 2 + message.tilting_angle = MideaFADevice._tilting_angles.index( + self._attributes[DeviceAttributes.tilting_angle], + ) + else: + message.oscillation_angle = MideaFADevice._oscillation_angles.index( + value, + ) + message.oscillate = True + if self._attributes[DeviceAttributes.tilting_angle] == "Off": + message.oscillation_mode = 1 + elif self._attributes[DeviceAttributes.oscillation_mode] == "Tilting": + message.oscillation_mode = 6 + message.tilting_angle = MideaFADevice._tilting_angles.index( + self._attributes[DeviceAttributes.tilting_angle], + ) + + def _set_tilting_angle(self, message: MessageSet, value: Any) -> None: + if value == "Off" or not value: + if self._attributes[DeviceAttributes.oscillation_angle] == "Off": + message.oscillate = False + else: + message.oscillate = True + message.oscillation_mode = 1 + message.oscillation_angle = MideaFADevice._oscillation_angles.index( + self._attributes[DeviceAttributes.oscillation_angle], + ) + else: + message.tilting_angle = MideaFADevice._tilting_angles.index(value) + message.oscillate = True + if self._attributes[DeviceAttributes.oscillation_angle] == "Off": + message.oscillation_mode = 2 + elif self._attributes[DeviceAttributes.oscillation_mode] == "Oscillation": + message.oscillation_mode = 6 + message.oscillation_angle = MideaFADevice._oscillation_angles.index( + self._attributes[DeviceAttributes.oscillation_angle], + ) + def set_oscillation(self, attr: str, value: Any) -> MessageSet | None: """Set oscillation mode.""" message: MessageSet | None = None @@ -208,111 +288,17 @@ def set_oscillation(self, attr: str, value: Any) -> MessageSet | None: value in MideaFADevice._oscillation_modes or not value ): message = MessageSet(self._protocol_version, self.subtype) - if value == "Off" or not value: - message.oscillate = False - else: - message.oscillate = True - message.oscillation_mode = MideaFADevice._oscillation_modes.index( - value, - ) - if value == "Oscillation": - if ( - self._attributes[DeviceAttributes.oscillation_angle] - == "Off" - ): - message.oscillation_angle = 3 # 90 - else: - message.oscillation_angle = ( - MideaFADevice._oscillation_angles.index( - self._attributes[ - DeviceAttributes.oscillation_angle - ], - ) - ) - elif value == "Tilting": - if self._attributes[DeviceAttributes.tilting_angle] == "Off": - message.tilting_angle = 3 # 90 - else: - message.tilting_angle = MideaFADevice._tilting_angles.index( - self._attributes[DeviceAttributes.tilting_angle], - ) - else: - if ( - self._attributes[DeviceAttributes.oscillation_angle] - == "Off" - ): - message.oscillation_angle = 3 # 90 - else: - message.oscillation_angle = ( - MideaFADevice._oscillation_angles.index( - self._attributes[ - DeviceAttributes.oscillation_angle - ], - ) - ) - if self._attributes[DeviceAttributes.tilting_angle] == "Off": - message.tilting_angle = 3 # 90 - else: - message.tilting_angle = MideaFADevice._tilting_angles.index( - self._attributes[DeviceAttributes.tilting_angle], - ) + self._set_oscillation_mode(message, value) elif attr == DeviceAttributes.oscillation_angle and ( value in MideaFADevice._oscillation_angles or not value ): message = MessageSet(self._protocol_version, self.subtype) - if value == "Off" or not value: - if self._attributes[DeviceAttributes.tilting_angle] == "Off": - message.oscillate = False - else: - message.oscillate = True - message.oscillation_mode = 2 - message.tilting_angle = MideaFADevice._tilting_angles.index( - self._attributes[DeviceAttributes.tilting_angle], - ) - else: - message.oscillation_angle = MideaFADevice._oscillation_angles.index( - value, - ) - message.oscillate = True - if self._attributes[DeviceAttributes.tilting_angle] == "Off": - message.oscillation_mode = 1 - elif ( - self._attributes[DeviceAttributes.oscillation_mode] == "Tilting" - ): - message.oscillation_mode = 6 - message.tilting_angle = MideaFADevice._tilting_angles.index( - self._attributes[DeviceAttributes.tilting_angle], - ) + self._set_oscillation_angle(message, value) elif attr == DeviceAttributes.tilting_angle and ( value in MideaFADevice._tilting_angles or not value ): message = MessageSet(self._protocol_version, self.subtype) - if value == "Off" or not value: - if self._attributes[DeviceAttributes.oscillation_angle] == "Off": - message.oscillate = False - else: - message.oscillate = True - message.oscillation_mode = 1 - message.oscillation_angle = ( - MideaFADevice._oscillation_angles.index( - self._attributes[DeviceAttributes.oscillation_angle], - ) - ) - else: - message.tilting_angle = MideaFADevice._tilting_angles.index(value) - message.oscillate = True - if self._attributes[DeviceAttributes.oscillation_angle] == "Off": - message.oscillation_mode = 2 - elif ( - self._attributes[DeviceAttributes.oscillation_mode] - == "Oscillation" - ): - message.oscillation_mode = 6 - message.oscillation_angle = ( - MideaFADevice._oscillation_angles.index( - self._attributes[DeviceAttributes.oscillation_angle], - ) - ) + self._set_tilting_angle(message, value) return message def set_attribute(self, attr: str, value: Any) -> None: diff --git a/midealocal/devices/x26/message.py b/midealocal/devices/x26/message.py index 7cd59233..51f675ba 100644 --- a/midealocal/devices/x26/message.py +++ b/midealocal/devices/x26/message.py @@ -139,52 +139,26 @@ class Message26Body(MessageBody): def __init__(self, body: bytearray) -> None: """Initialize X26 message body.""" super().__init__(body) - self.fields = {} + self.fields = self._gen_fields(body) self.main_light = self.read_byte(body, 1) > 0 - self.fields["MAIN_LIGHT_BRIGHTNESS"] = self.read_byte(body, 2) self.night_light = self.read_byte(body, 3) > 0 - self.fields["NIGHT_LIGHT_BRIGHTNESS"] = self.read_byte(body, 4) - self.fields["RADAR_INDUCTION_ENABLE"] = self.read_byte(body, 5) - self.fields["RADAR_INDUCTION_CLOSING_TIME"] = self.read_byte(body, 6) - self.fields["LIGHT_INTENSITY_THRESHOLD"] = self.read_byte(body, 7) - self.fields["RADAR_SENSITIVITY"] = self.read_byte(body, 8) heat_mode = self.read_byte(body, 9) > 0 heat_temperature = self.read_byte(body, 10) - self.fields["HEATING_SPEED"] = self.read_byte(body, 11) heat_direction = self.read_byte(body, 12) bath_mode = self.read_byte(body, 13) > 0 - self.fields["BATH_HEATING_TIME"] = self.read_byte(body, 14) - self.fields["BATH_TEMPERATURE"] = self.read_byte(body, 15) - self.fields["BATH_SPEED"] = self.read_byte(body, 16) bath_direction = self.read_byte(body, 17) ventilation_mode = self.read_byte(body, 18) > 0 - self.fields["VENTILATION_SPEED"] = self.read_byte(body, 19) ventilation_direction = self.read_byte(body, 20) dry_mode = self.read_byte(body, 21) > 0 - self.fields["DRYING_TIME"] = self.read_byte(body, 22) - self.fields["DRYING_TEMPERATURE"] = self.read_byte(body, 23) - self.fields["DRYING_SPEED"] = self.read_byte(body, 24) dry_direction = self.read_byte(body, 25) blow_mode = self.read_byte(body, 26) > 0 - self.fields["BLOWING_SPEED"] = self.read_byte(body, 27) blow_direction = self.read_byte(body, 28) - self.fields["DELAY_ENABLE"] = self.read_byte(body, 29) - self.fields["DELAY_TIME"] = self.read_byte(body, 30) if self.read_byte(body, 31) != MAX_BYTE_VALUE: self.current_humidity = self.read_byte(body, 31) if self.read_byte(body, 32) != MAX_BYTE_VALUE: self.current_radar = self.read_byte(body, 32) if self.read_byte(body, 33) != MAX_BYTE_VALUE: self.current_temperature = self.read_byte(body, 33) - self.fields["SOFT_WIND_ENABLE"] = self.read_byte(body, 38) - self.fields["SOFT_WIND_TIME"] = self.read_byte(body, 39) - self.fields["SOFT_WIND_TEMPERATURE"] = self.read_byte(body, 40) - self.fields["SOFT_WIND_SPEED"] = self.read_byte(body, 41) - self.fields["SOFT_WIND_DIRECTION"] = self.read_byte(body, 42) - self.fields["WINDLESS_ENABLE"] = self.read_byte(body, 43) - self.fields["ANION_ENABLE"] = self.read_byte(body, 44) - self.fields["SMELLY_ENABLE"] = self.read_byte(body, 45) - self.fields["SMELLY_THRESHOLD"] = self.read_byte(body, 46) self.mode = 0 self.direction = 0xFD if heat_mode: @@ -206,6 +180,37 @@ def __init__(self, body: bytearray) -> None: self.mode = 6 self.direction = dry_direction + def _gen_fields(self, body: bytearray) -> dict[str, int]: + fields: dict[str, int] = {} + fields["MAIN_LIGHT_BRIGHTNESS"] = self.read_byte(body, 2) + fields["NIGHT_LIGHT_BRIGHTNESS"] = self.read_byte(body, 4) + fields["RADAR_INDUCTION_ENABLE"] = self.read_byte(body, 5) + fields["RADAR_INDUCTION_CLOSING_TIME"] = self.read_byte(body, 6) + fields["LIGHT_INTENSITY_THRESHOLD"] = self.read_byte(body, 7) + fields["RADAR_SENSITIVITY"] = self.read_byte(body, 8) + fields["HEATING_SPEED"] = self.read_byte(body, 11) + fields["BATH_HEATING_TIME"] = self.read_byte(body, 14) + fields["BATH_TEMPERATURE"] = self.read_byte(body, 15) + fields["BATH_SPEED"] = self.read_byte(body, 16) + fields["VENTILATION_SPEED"] = self.read_byte(body, 19) + fields["DRYING_TIME"] = self.read_byte(body, 22) + fields["DRYING_TEMPERATURE"] = self.read_byte(body, 23) + fields["DRYING_SPEED"] = self.read_byte(body, 24) + fields["BLOWING_SPEED"] = self.read_byte(body, 27) + fields["DELAY_ENABLE"] = self.read_byte(body, 29) + fields["DELAY_TIME"] = self.read_byte(body, 30) + fields["SOFT_WIND_ENABLE"] = self.read_byte(body, 38) + fields["SOFT_WIND_TIME"] = self.read_byte(body, 39) + fields["SOFT_WIND_TEMPERATURE"] = self.read_byte(body, 40) + fields["SOFT_WIND_SPEED"] = self.read_byte(body, 41) + fields["SOFT_WIND_DIRECTION"] = self.read_byte(body, 42) + fields["WINDLESS_ENABLE"] = self.read_byte(body, 43) + fields["ANION_ENABLE"] = self.read_byte(body, 44) + fields["SMELLY_ENABLE"] = self.read_byte(body, 45) + fields["SMELLY_THRESHOLD"] = self.read_byte(body, 46) + + return fields + class Message26Response(MessageResponse): """X26 message response.""" diff --git a/midealocal/discover.py b/midealocal/discover.py index d867d6d7..91651f61 100644 --- a/midealocal/discover.py +++ b/midealocal/discover.py @@ -157,6 +157,72 @@ SERIAL_TYPE2_LENGTH = 22 +def _parse_discover_response( + sock: socket.socket, found_devices: dict[int, dict[str, Any]] +) -> tuple[int, dict[str, Any] | None]: + security = LocalSecurity() + data, addr = sock.recvfrom(512) + ip = addr[0] + _LOGGER.debug("Received response from %s: %s", addr, data.hex()) + if len(data) >= DISCOVERY_MIN_RESPONSE_LENGTH and ( + data[:2].hex() == "5a5a" or data[8:10].hex() == "5a5a" + ): + if data[:2].hex() == "5a5a": + protocol = 2 + elif data[:2].hex() == "8370": + protocol = 3 + if data[8:10].hex() == "5a5a": + data = data[8:-16] + else: + return 0, None + device_id = int.from_bytes( + bytearray.fromhex(data[20:26].hex()), + "little", + ) + if device_id in found_devices: + return 0, None + encrypt_data = data[40:-16] + reply = security.aes_decrypt(encrypt_data) + _LOGGER.debug("Declassified reply: %s", reply.hex()) + ssid = reply[41 : 41 + reply[40]].decode("utf-8") + device_type = ssid.split("_")[1] + port = bytes2port(reply[4:8]) + model = reply[17:25].decode("utf-8") + sn = reply[8:40].decode("utf-8") + elif data[:6].hex() == "3c3f786d6c20": + protocol = 1 + root = ElementTree.fromstring( + data.decode(encoding="utf-8", errors="replace"), + ) + child = root.find("body/device") + assert child + m = child.attrib + port, sn, device_type = ( + int(m["port"]), + m["apc_sn"], + str(hex(int(m["apc_type"])))[2:], + ) + response = get_device_info(ip, int(port)) + device_id = get_id_from_response(response) + if len(sn) == SERIAL_TYPE1_LENGTH: + model = sn[9:17] + elif len(sn) == SERIAL_TYPE2_LENGTH: + model = sn[3:11] + else: + model = "" + else: + return 0, None + return device_id, { + "device_id": device_id, + "type": int(device_type, 16), + "ip_address": ip, + "port": port, + "model": model, + "sn": sn, + "protocol": protocol, + } + + def discover( discover_type: list | None = None, ip_address: list | None = None, @@ -164,11 +230,11 @@ def discover( """Discover devices.""" if discover_type is None: discover_type = [] - security = LocalSecurity() + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) sock.settimeout(5) - found_devices = {} + found_devices: dict[int, dict[str, Any]] = {} addrs = enum_all_broadcast() if ip_address is None else [ip_address] _LOGGER.debug("All addresses for broadcast: %s", addrs) @@ -180,66 +246,9 @@ def discover( _LOGGER.warning("Can't access network %s", addrs) while True: try: - data, addr = sock.recvfrom(512) - ip = addr[0] - _LOGGER.debug("Received response from %s: %s", addr, data.hex()) - if len(data) >= DISCOVERY_MIN_RESPONSE_LENGTH and ( - data[:2].hex() == "5a5a" or data[8:10].hex() == "5a5a" - ): - if data[:2].hex() == "5a5a": - protocol = 2 - elif data[:2].hex() == "8370": - protocol = 3 - if data[8:10].hex() == "5a5a": - data = data[8:-16] - else: - continue - device_id = int.from_bytes( - bytearray.fromhex(data[20:26].hex()), - "little", - ) - if device_id in found_devices: - continue - encrypt_data = data[40:-16] - reply = security.aes_decrypt(encrypt_data) - _LOGGER.debug("Declassified reply: %s", reply.hex()) - ssid = reply[41 : 41 + reply[40]].decode("utf-8") - device_type = ssid.split("_")[1] - port = bytes2port(reply[4:8]) - model = reply[17:25].decode("utf-8") - sn = reply[8:40].decode("utf-8") - elif data[:6].hex() == "3c3f786d6c20": - protocol = 1 - root = ElementTree.fromstring( - data.decode(encoding="utf-8", errors="replace"), - ) - child = root.find("body/device") - assert child - m = child.attrib - port, sn, device_type = ( - int(m["port"]), - m["apc_sn"], - str(hex(int(m["apc_type"])))[2:], - ) - response = get_device_info(ip, int(port)) - device_id = get_id_from_response(response) - if len(sn) == SERIAL_TYPE1_LENGTH: - model = sn[9:17] - elif len(sn) == SERIAL_TYPE2_LENGTH: - model = sn[3:11] - else: - model = "" - else: + device_id, device = _parse_discover_response(sock, found_devices) + if device is None: continue - device = { - "device_id": device_id, - "type": int(device_type, 16), - "ip_address": ip, - "port": port, - "model": model, - "sn": sn, - "protocol": protocol, - } if len(discover_type) == 0 or device.get("type") in discover_type: found_devices[device_id] = device _LOGGER.debug("Found a supported device: %s", device)