From 1e115586136acb937853aa0384c7288abd82455b Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel <41558831+felipediel@users.noreply.github.com> Date: Wed, 17 Apr 2024 02:06:36 -0300 Subject: [PATCH] Add support for Tornado 16X SQ air conditioner (0x4E2A) (#520) * Add support for Tornado 16X SQ air conditioner * Make Tornado a generic HVAC class * Better names * Clean up IntEnums * Clean up encoders * Fix indexes * Improve set_state() interface * Enumerate presets * Rename state to power in get_ac_info() * Paint it black * Use CRC16 helper class * Remove log messages * Fix bugs * Return state in set_state() --- broadlink/__init__.py | 13 ++- broadlink/climate.py | 262 +++++++++++++++++++++++++++++++++++++++--- 2 files changed, 258 insertions(+), 17 deletions(-) diff --git a/broadlink/__init__.py b/broadlink/__init__.py index 1d4ffb2a..8af3e4c6 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -6,7 +6,7 @@ from . import exceptions as e from .const import DEFAULT_BCAST_ADDR, DEFAULT_PORT, DEFAULT_TIMEOUT from .alarm import S1C -from .climate import hysen +from .climate import hvac, hysen from .cover import dooya, dooya2, wser from .device import Device, ping, scan from .hub import s3 @@ -177,6 +177,9 @@ 0xA59C: ("S3", "Broadlink"), 0xA64D: ("S3", "Broadlink"), }, + hvac: { + 0x4E2A: ("HVAC", "Licensed manufacturer"), + }, hysen: { 0x4EAD: ("HY02/HY03", "Hysen"), }, @@ -258,7 +261,9 @@ def discover( discover_ip_port: int = DEFAULT_PORT, ) -> t.List[Device]: """Discover devices connected to the local network.""" - responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port) + responses = scan( + timeout, local_ip_address, discover_ip_address, discover_ip_port + ) return [gendevice(*resp) for resp in responses] @@ -272,7 +277,9 @@ def xdiscover( This function returns a generator that yields devices instantly. """ - responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port) + responses = scan( + timeout, local_ip_address, discover_ip_address, discover_ip_port + ) for resp in responses: yield gendevice(*resp) diff --git a/broadlink/climate.py b/broadlink/climate.py index c04e65c0..1a0c6006 100755 --- a/broadlink/climate.py +++ b/broadlink/climate.py @@ -1,5 +1,7 @@ -"""Support for HVAC units.""" -import typing as t +"""Support for climate control.""" +import enum +import struct +from typing import List, Sequence from . import exceptions as e from .device import Device @@ -19,7 +21,7 @@ class hysen(Device): TYPE = "HYS" - def send_request(self, request: t.Sequence[int]) -> bytes: + def send_request(self, request: Sequence[int]) -> bytes: """Send a request to the device.""" packet = bytearray() packet.extend((len(request) + 2).to_bytes(2, "little")) @@ -31,15 +33,15 @@ def send_request(self, request: t.Sequence[int]) -> bytes: payload = self.decrypt(response[0x38:]) p_len = int.from_bytes(payload[:0x02], "little") - if p_len + 2 > len(payload): - raise ValueError( - "hysen_response_error", "first byte of response is not length" - ) - - nom_crc = int.from_bytes(payload[p_len : p_len + 2], "little") + nom_crc = int.from_bytes(payload[p_len:p_len+2], "little") real_crc = CRC16.calculate(payload[0x02:p_len]) + if nom_crc != real_crc: - raise ValueError("hysen_response_error", "CRC check on response failed") + raise e.DataValidationError( + -4008, + "Received data packet check error", + f"Expected a checksum of {nom_crc} and received {real_crc}", + ) return payload[0x02:p_len] @@ -74,7 +76,7 @@ def get_full_status(self) -> dict: data["heating_cooling"] = (payload[4] >> 7) & 1 data["room_temp"] = self._decode_temp(payload, 5) data["thermostat_temp"] = payload[6] / 2.0 - data["auto_mode"] = payload[7] & 0xF + data["auto_mode"] = payload[7] & 0x0F data["loop_mode"] = payload[7] >> 4 data["sensor"] = payload[8] data["osv"] = payload[9] @@ -125,7 +127,9 @@ def get_full_status(self) -> dict: # E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday (weekend schedule) # loop_mode = 2 ("1234567") means every day, including Saturday and Sunday (weekday schedule) # The sensor command is currently experimental - def set_mode(self, auto_mode: int, loop_mode: int, sensor: int = 0) -> None: + def set_mode( + self, auto_mode: int, loop_mode: int, sensor: int = 0 + ) -> None: """Set the mode of the device.""" mode_byte = ((loop_mode + 1) << 4) + auto_mode self.send_request([0x01, 0x06, 0x00, 0x02, mode_byte, sensor]) @@ -206,7 +210,19 @@ def set_power( def set_time(self, hour: int, minute: int, second: int, day: int) -> None: """Set the time.""" self.send_request( - [0x01, 0x10, 0x00, 0x08, 0x00, 0x02, 0x04, hour, minute, second, day] + [ + 0x01, + 0x10, + 0x00, + 0x08, + 0x00, + 0x02, + 0x04, + hour, + minute, + second, + day + ] ) # Set timer schedule @@ -215,7 +231,7 @@ def set_time(self, hour: int, minute: int, second: int, day: int) -> None: # {'start_hour':17, 'start_minute':30, 'temp': 22 } # Each one specifies the thermostat temp that will become effective at start_hour:start_minute # weekend is similar but only has 2 (e.g. switch on in morning and off in afternoon) - def set_schedule(self, weekday: t.List[dict], weekend: t.List[dict]) -> None: + def set_schedule(self, weekday: List[dict], weekend: List[dict]) -> None: """Set timer schedule.""" request = [0x01, 0x10, 0x00, 0x0A, 0x00, 0x0C, 0x18] @@ -238,3 +254,221 @@ def set_schedule(self, weekday: t.List[dict], weekend: t.List[dict]) -> None: request.append(int(weekend[i]["temp"] * 2)) self.send_request(request) + + +class hvac(Device): + """Controls a HVAC. + + Supported models: + - Tornado SMART X SQ series + - Aux ASW-H12U3/JIR1DI-US + - Aux ASW-H36U2/LFR1DI-US + """ + + TYPE = "HVAC" + + @enum.unique + class Mode(enum.IntEnum): + """Enumerates modes.""" + + AUTO = 0 + COOL = 1 + DRY = 2 + HEAT = 3 + FAN = 4 + + @enum.unique + class Speed(enum.IntEnum): + """Enumerates fan speed.""" + + HIGH = 1 + MID = 2 + LOW = 3 + AUTO = 5 + + @enum.unique + class Preset(enum.IntEnum): + """Enumerates presets.""" + + NORMAL = 0 + TURBO = 1 + MUTE = 2 + + @enum.unique + class SwHoriz(enum.IntEnum): + """Enumerates horizontal swing.""" + + ON = 0 + OFF = 7 + + @enum.unique + class SwVert(enum.IntEnum): + """Enumerates vertical swing.""" + + ON = 0 + POS1 = 1 + POS2 = 2 + POS3 = 3 + POS4 = 4 + POS5 = 5 + OFF = 7 + + def _encode(self, data: bytes) -> bytes: + """Encode data for transport.""" + packet = bytearray(10) + p_len = 10 + len(data) + struct.pack_into( + " bytes: + """Decode data from transport.""" + # payload[0x2:0x8] == bytes([0xbb, 0x00, 0x07, 0x00, 0x00, 0x00]) + payload = self.decrypt(response[0x38:]) + p_len = int.from_bytes(payload[:0x02], "little") + nom_crc = int.from_bytes(payload[p_len:p_len+2], "little") + real_crc = CRC16.calculate(payload[0x02:p_len], polynomial=0x9BE4) + + if nom_crc != real_crc: + raise e.DataValidationError( + -4008, + "Received data packet check error", + f"Expected a checksum of {nom_crc} and received {real_crc}", + ) + + d_len = int.from_bytes(payload[0x08:0x0A], "little") + return payload[0x0A:0x0A+d_len] + + def _send(self, command: int, data: bytes = b"") -> bytes: + """Send a command to the unit.""" + prefix = bytes([((command << 4) | 1), 1]) + packet = self._encode(prefix + data) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response)[0x02:] + + def _parse_state(self, data: bytes) -> dict: + """Parse state.""" + state = {} + state["power"] = bool(data[0x08] & 1 << 5) + state["target_temp"] = 8 + (data[0x00] >> 3) + (data[0x04] >> 7) * 0.5 + state["swing_v"] = self.SwVert(data[0x00] & 0b111) + state["swing_h"] = self.SwHoriz(data[0x01] >> 5) + state["mode"] = self.Mode(data[0x05] >> 5) + state["speed"] = self.Speed(data[0x03] >> 5) + state["preset"] = self.Preset(data[0x04] >> 6) + state["sleep"] = bool(data[0x05] & 1 << 2) + state["ifeel"] = bool(data[0x05] & 1 << 3) + state["health"] = bool(data[0x08] & 1 << 1) + state["clean"] = bool(data[0x08] & 1 << 2) + state["display"] = bool(data[0x0A] & 1 << 4) + state["mildew"] = bool(data[0x0A] & 1 << 3) + return state + + def set_state( + self, + power: bool, + target_temp: float, # 16<=target_temp<=32 + mode: Mode, + speed: Speed, + preset: Preset, + swing_h: SwHoriz, + swing_v: SwVert, + sleep: bool, + ifeel: bool, + display: bool, + health: bool, + clean: bool, + mildew: bool, + ) -> dict: + """Set the state of the device.""" + # TODO: decode unknown bits + UNK0 = 0b100 + UNK1 = 0b1101 + UNK2 = 0b101 + + target_temp = round(target_temp * 2) / 2 + + if preset == self.Preset.MUTE: + if mode != self.Mode.FAN: + raise ValueError("mute is only available in fan mode") + speed = self.Speed.LOW + + elif preset == self.Preset.TURBO: + if mode not in {self.Mode.COOL, self.Mode.HEAT}: + raise ValueError("turbo is only available in cooling/heating") + speed = self.Speed.HIGH + + data = bytearray(0x0D) + data[0x00] = (int(target_temp) - 8 << 3) | swing_v + data[0x01] = (swing_h << 5) | UNK0 + data[0x02] = ((target_temp % 1 == 0.5) << 7) | UNK1 + data[0x03] = speed << 5 + data[0x04] = preset << 6 + data[0x05] = mode << 5 | sleep << 2 | ifeel << 3 + data[0x08] = power << 5 | clean << 2 | (health and 0b11) + data[0x0A] = display << 4 | mildew << 3 + data[0x0C] = UNK2 + + resp = self._send(0, data) + return self._parse_state(resp) + + def get_state(self) -> dict: + """Returns a dictionary with the unit's parameters. + + Returns: + dict: + power (bool): + target_temp (float): temperature set point 16 dict: + """Returns dictionary with AC info. + + Returns: + dict: + power (bool): power + ambient_temp (float): ambient temperature + """ + resp = self._send(2) + + if len(resp) < 22: + raise e.DataValidationError( + -4007, + "Received data packet length error", + f"Expected at least 24 bytes and received {len(resp) + 2}", + ) + + ac_info = {} + ac_info["power"] = resp[0x1] & 1 + + ambient_temp = resp[0x05] & 0b11111, resp[0x15] & 0b11111 + if any(ambient_temp): + ac_info["ambient_temp"] = ambient_temp[0] + ambient_temp[1] / 10.0 + + return ac_info