Skip to content

Commit

Permalink
chore: ruff PLR0915 (midea-lan#145)
Browse files Browse the repository at this point in the history
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Enhanced device control with new methods for setting oscillation
modes, angles, and tilting angles.
  
- **Refactor**
- Improved code organization by refactoring refresh, heartbeat, and
discovery logic into separate methods for better maintainability and
readability.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
  • Loading branch information
rokam authored Jun 15, 2024
1 parent bdd769c commit 9fc1c99
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 195 deletions.
27 changes: 17 additions & 10 deletions midealocal/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ def __init__(
self._refresh_interval = 30
self._heartbeat_interval = 10
self._default_refresh_interval = 30
self._previous_refresh = 0.0
self._previous_heartbeat = 0.0
self.name = self._device_name

@property
Expand Down Expand Up @@ -479,6 +481,16 @@ def set_refresh_interval(self, refresh_interval: int) -> None:
"""Set refresh interval."""
self._refresh_interval = refresh_interval

def _check_refresh(self, now: float) -> None:
if 0 < self._refresh_interval <= now - self._previous_refresh:
self.refresh_status()
self._previous_refresh = now

def _check_heartbeat(self, now: float) -> None:
if now - self._previous_heartbeat >= self._heartbeat_interval:
self.send_heartbeat()
self._previous_heartbeat = now

def run(self) -> None:
"""Run loop."""
while self._is_run:
Expand All @@ -488,21 +500,16 @@ def run(self) -> None:
time.sleep(5)
timeout_counter = 0
start = time.time()
previous_refresh = start
previous_heartbeat = start
self._previous_refresh = start
self._previous_heartbeat = start
self._socket.settimeout(1)
while True:
try:
now = time.time()
if 0 < self._refresh_interval <= now - previous_refresh:
self.refresh_status()
previous_refresh = now
if now - previous_heartbeat >= self._heartbeat_interval:
self.send_heartbeat()
previous_heartbeat = now
self._check_refresh(now)
self._check_heartbeat(now)
msg = self._socket.recv(512)
msg_len = len(msg)
if msg_len == 0:
if len(msg) == 0:
if self._is_run:
_LOGGER.error(
"[%s] Socket error - Connection closed by peer",
Expand Down
180 changes: 83 additions & 97 deletions midealocal/devices/fa/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,86 @@ def process_message(self, msg: bytes) -> dict[str, Any]:
new_status[str(status)] = self._attributes[status]
return new_status

def _set_oscillation_mode(self, message: MessageSet, value: Any) -> None:
if value == "Off" or not value:
message.oscillate = False
else:
message.oscillate = True
message.oscillation_mode = MideaFADevice._oscillation_modes.index(
value,
)
if value == "Oscillation":
if self._attributes[DeviceAttributes.oscillation_angle] == "Off":
message.oscillation_angle = 3 # 90
else:
message.oscillation_angle = MideaFADevice._oscillation_angles.index(
self._attributes[DeviceAttributes.oscillation_angle],
)
elif value == "Tilting":
if self._attributes[DeviceAttributes.tilting_angle] == "Off":
message.tilting_angle = 3 # 90
else:
message.tilting_angle = MideaFADevice._tilting_angles.index(
self._attributes[DeviceAttributes.tilting_angle],
)
else:
if self._attributes[DeviceAttributes.oscillation_angle] == "Off":
message.oscillation_angle = 3 # 90
else:
message.oscillation_angle = MideaFADevice._oscillation_angles.index(
self._attributes[DeviceAttributes.oscillation_angle],
)
if self._attributes[DeviceAttributes.tilting_angle] == "Off":
message.tilting_angle = 3 # 90
else:
message.tilting_angle = MideaFADevice._tilting_angles.index(
self._attributes[DeviceAttributes.tilting_angle],
)

def _set_oscillation_angle(self, message: MessageSet, value: Any) -> None:
if value == "Off" or not value:
if self._attributes[DeviceAttributes.tilting_angle] == "Off":
message.oscillate = False
else:
message.oscillate = True
message.oscillation_mode = 2
message.tilting_angle = MideaFADevice._tilting_angles.index(
self._attributes[DeviceAttributes.tilting_angle],
)
else:
message.oscillation_angle = MideaFADevice._oscillation_angles.index(
value,
)
message.oscillate = True
if self._attributes[DeviceAttributes.tilting_angle] == "Off":
message.oscillation_mode = 1
elif self._attributes[DeviceAttributes.oscillation_mode] == "Tilting":
message.oscillation_mode = 6
message.tilting_angle = MideaFADevice._tilting_angles.index(
self._attributes[DeviceAttributes.tilting_angle],
)

def _set_tilting_angle(self, message: MessageSet, value: Any) -> None:
if value == "Off" or not value:
if self._attributes[DeviceAttributes.oscillation_angle] == "Off":
message.oscillate = False
else:
message.oscillate = True
message.oscillation_mode = 1
message.oscillation_angle = MideaFADevice._oscillation_angles.index(
self._attributes[DeviceAttributes.oscillation_angle],
)
else:
message.tilting_angle = MideaFADevice._tilting_angles.index(value)
message.oscillate = True
if self._attributes[DeviceAttributes.oscillation_angle] == "Off":
message.oscillation_mode = 2
elif self._attributes[DeviceAttributes.oscillation_mode] == "Oscillation":
message.oscillation_mode = 6
message.oscillation_angle = MideaFADevice._oscillation_angles.index(
self._attributes[DeviceAttributes.oscillation_angle],
)

def set_oscillation(self, attr: str, value: Any) -> MessageSet | None:
"""Set oscillation mode."""
message: MessageSet | None = None
Expand All @@ -208,111 +288,17 @@ def set_oscillation(self, attr: str, value: Any) -> MessageSet | None:
value in MideaFADevice._oscillation_modes or not value
):
message = MessageSet(self._protocol_version, self.subtype)
if value == "Off" or not value:
message.oscillate = False
else:
message.oscillate = True
message.oscillation_mode = MideaFADevice._oscillation_modes.index(
value,
)
if value == "Oscillation":
if (
self._attributes[DeviceAttributes.oscillation_angle]
== "Off"
):
message.oscillation_angle = 3 # 90
else:
message.oscillation_angle = (
MideaFADevice._oscillation_angles.index(
self._attributes[
DeviceAttributes.oscillation_angle
],
)
)
elif value == "Tilting":
if self._attributes[DeviceAttributes.tilting_angle] == "Off":
message.tilting_angle = 3 # 90
else:
message.tilting_angle = MideaFADevice._tilting_angles.index(
self._attributes[DeviceAttributes.tilting_angle],
)
else:
if (
self._attributes[DeviceAttributes.oscillation_angle]
== "Off"
):
message.oscillation_angle = 3 # 90
else:
message.oscillation_angle = (
MideaFADevice._oscillation_angles.index(
self._attributes[
DeviceAttributes.oscillation_angle
],
)
)
if self._attributes[DeviceAttributes.tilting_angle] == "Off":
message.tilting_angle = 3 # 90
else:
message.tilting_angle = MideaFADevice._tilting_angles.index(
self._attributes[DeviceAttributes.tilting_angle],
)
self._set_oscillation_mode(message, value)
elif attr == DeviceAttributes.oscillation_angle and (
value in MideaFADevice._oscillation_angles or not value
):
message = MessageSet(self._protocol_version, self.subtype)
if value == "Off" or not value:
if self._attributes[DeviceAttributes.tilting_angle] == "Off":
message.oscillate = False
else:
message.oscillate = True
message.oscillation_mode = 2
message.tilting_angle = MideaFADevice._tilting_angles.index(
self._attributes[DeviceAttributes.tilting_angle],
)
else:
message.oscillation_angle = MideaFADevice._oscillation_angles.index(
value,
)
message.oscillate = True
if self._attributes[DeviceAttributes.tilting_angle] == "Off":
message.oscillation_mode = 1
elif (
self._attributes[DeviceAttributes.oscillation_mode] == "Tilting"
):
message.oscillation_mode = 6
message.tilting_angle = MideaFADevice._tilting_angles.index(
self._attributes[DeviceAttributes.tilting_angle],
)
self._set_oscillation_angle(message, value)
elif attr == DeviceAttributes.tilting_angle and (
value in MideaFADevice._tilting_angles or not value
):
message = MessageSet(self._protocol_version, self.subtype)
if value == "Off" or not value:
if self._attributes[DeviceAttributes.oscillation_angle] == "Off":
message.oscillate = False
else:
message.oscillate = True
message.oscillation_mode = 1
message.oscillation_angle = (
MideaFADevice._oscillation_angles.index(
self._attributes[DeviceAttributes.oscillation_angle],
)
)
else:
message.tilting_angle = MideaFADevice._tilting_angles.index(value)
message.oscillate = True
if self._attributes[DeviceAttributes.oscillation_angle] == "Off":
message.oscillation_mode = 2
elif (
self._attributes[DeviceAttributes.oscillation_mode]
== "Oscillation"
):
message.oscillation_mode = 6
message.oscillation_angle = (
MideaFADevice._oscillation_angles.index(
self._attributes[DeviceAttributes.oscillation_angle],
)
)
self._set_tilting_angle(message, value)
return message

def set_attribute(self, attr: str, value: Any) -> None:
Expand Down
59 changes: 32 additions & 27 deletions midealocal/devices/x26/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,52 +139,26 @@ class Message26Body(MessageBody):
def __init__(self, body: bytearray) -> None:
"""Initialize X26 message body."""
super().__init__(body)
self.fields = {}
self.fields = self._gen_fields(body)
self.main_light = self.read_byte(body, 1) > 0
self.fields["MAIN_LIGHT_BRIGHTNESS"] = self.read_byte(body, 2)
self.night_light = self.read_byte(body, 3) > 0
self.fields["NIGHT_LIGHT_BRIGHTNESS"] = self.read_byte(body, 4)
self.fields["RADAR_INDUCTION_ENABLE"] = self.read_byte(body, 5)
self.fields["RADAR_INDUCTION_CLOSING_TIME"] = self.read_byte(body, 6)
self.fields["LIGHT_INTENSITY_THRESHOLD"] = self.read_byte(body, 7)
self.fields["RADAR_SENSITIVITY"] = self.read_byte(body, 8)
heat_mode = self.read_byte(body, 9) > 0
heat_temperature = self.read_byte(body, 10)
self.fields["HEATING_SPEED"] = self.read_byte(body, 11)
heat_direction = self.read_byte(body, 12)
bath_mode = self.read_byte(body, 13) > 0
self.fields["BATH_HEATING_TIME"] = self.read_byte(body, 14)
self.fields["BATH_TEMPERATURE"] = self.read_byte(body, 15)
self.fields["BATH_SPEED"] = self.read_byte(body, 16)
bath_direction = self.read_byte(body, 17)
ventilation_mode = self.read_byte(body, 18) > 0
self.fields["VENTILATION_SPEED"] = self.read_byte(body, 19)
ventilation_direction = self.read_byte(body, 20)
dry_mode = self.read_byte(body, 21) > 0
self.fields["DRYING_TIME"] = self.read_byte(body, 22)
self.fields["DRYING_TEMPERATURE"] = self.read_byte(body, 23)
self.fields["DRYING_SPEED"] = self.read_byte(body, 24)
dry_direction = self.read_byte(body, 25)
blow_mode = self.read_byte(body, 26) > 0
self.fields["BLOWING_SPEED"] = self.read_byte(body, 27)
blow_direction = self.read_byte(body, 28)
self.fields["DELAY_ENABLE"] = self.read_byte(body, 29)
self.fields["DELAY_TIME"] = self.read_byte(body, 30)
if self.read_byte(body, 31) != MAX_BYTE_VALUE:
self.current_humidity = self.read_byte(body, 31)
if self.read_byte(body, 32) != MAX_BYTE_VALUE:
self.current_radar = self.read_byte(body, 32)
if self.read_byte(body, 33) != MAX_BYTE_VALUE:
self.current_temperature = self.read_byte(body, 33)
self.fields["SOFT_WIND_ENABLE"] = self.read_byte(body, 38)
self.fields["SOFT_WIND_TIME"] = self.read_byte(body, 39)
self.fields["SOFT_WIND_TEMPERATURE"] = self.read_byte(body, 40)
self.fields["SOFT_WIND_SPEED"] = self.read_byte(body, 41)
self.fields["SOFT_WIND_DIRECTION"] = self.read_byte(body, 42)
self.fields["WINDLESS_ENABLE"] = self.read_byte(body, 43)
self.fields["ANION_ENABLE"] = self.read_byte(body, 44)
self.fields["SMELLY_ENABLE"] = self.read_byte(body, 45)
self.fields["SMELLY_THRESHOLD"] = self.read_byte(body, 46)
self.mode = 0
self.direction = 0xFD
if heat_mode:
Expand All @@ -206,6 +180,37 @@ def __init__(self, body: bytearray) -> None:
self.mode = 6
self.direction = dry_direction

def _gen_fields(self, body: bytearray) -> dict[str, int]:
fields: dict[str, int] = {}
fields["MAIN_LIGHT_BRIGHTNESS"] = self.read_byte(body, 2)
fields["NIGHT_LIGHT_BRIGHTNESS"] = self.read_byte(body, 4)
fields["RADAR_INDUCTION_ENABLE"] = self.read_byte(body, 5)
fields["RADAR_INDUCTION_CLOSING_TIME"] = self.read_byte(body, 6)
fields["LIGHT_INTENSITY_THRESHOLD"] = self.read_byte(body, 7)
fields["RADAR_SENSITIVITY"] = self.read_byte(body, 8)
fields["HEATING_SPEED"] = self.read_byte(body, 11)
fields["BATH_HEATING_TIME"] = self.read_byte(body, 14)
fields["BATH_TEMPERATURE"] = self.read_byte(body, 15)
fields["BATH_SPEED"] = self.read_byte(body, 16)
fields["VENTILATION_SPEED"] = self.read_byte(body, 19)
fields["DRYING_TIME"] = self.read_byte(body, 22)
fields["DRYING_TEMPERATURE"] = self.read_byte(body, 23)
fields["DRYING_SPEED"] = self.read_byte(body, 24)
fields["BLOWING_SPEED"] = self.read_byte(body, 27)
fields["DELAY_ENABLE"] = self.read_byte(body, 29)
fields["DELAY_TIME"] = self.read_byte(body, 30)
fields["SOFT_WIND_ENABLE"] = self.read_byte(body, 38)
fields["SOFT_WIND_TIME"] = self.read_byte(body, 39)
fields["SOFT_WIND_TEMPERATURE"] = self.read_byte(body, 40)
fields["SOFT_WIND_SPEED"] = self.read_byte(body, 41)
fields["SOFT_WIND_DIRECTION"] = self.read_byte(body, 42)
fields["WINDLESS_ENABLE"] = self.read_byte(body, 43)
fields["ANION_ENABLE"] = self.read_byte(body, 44)
fields["SMELLY_ENABLE"] = self.read_byte(body, 45)
fields["SMELLY_THRESHOLD"] = self.read_byte(body, 46)

return fields


class Message26Response(MessageResponse):
"""X26 message response."""
Expand Down
Loading

0 comments on commit 9fc1c99

Please sign in to comment.