Skip to content

Commit

Permalink
Fix operation status calculation for binaty register values
Browse files Browse the repository at this point in the history
  • Loading branch information
klejejs committed Jan 6, 2023
1 parent 52dbfc8 commit 2f0ec4e
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 35 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ To execute the example file, first run `pip install -r requirements.txt` to inst
| `cooling_supply_line_temperature` | Cooling supply line temperature in Celsius |
| --- | --- |
| Operational status | |
| `operational_status` | Operational status of the Heat Pump |
| `operational_status` | Operational status of the Heat Pump or list of operational statuses (if multiple) |
| `available_operational_statuses` | List of available operational statuses |
| `available_operational_statuses_map` | Dictionary mapping operational status names to their values |
| `operational_status_auxiliary_heater_3kw` | Auxiliary heater status for 3kw (returns `None` if unavailable) |
Expand Down
90 changes: 56 additions & 34 deletions ThermiaOnlineAPI/model/HeatPump.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import sys
from ..utils.utils import pretty_print_except

from typing import TYPE_CHECKING, Dict, Optional
from typing import TYPE_CHECKING, Dict, List, Optional, Union

from ThermiaOnlineAPI.const import (
REG_BRINE_IN,
Expand Down Expand Up @@ -56,10 +56,14 @@ def __init__(self, device_data: dict, api_interface: "ThermiaAPI"):
self.__status = None
self.__device_data = None

self.__device_config: Dict[str, Optional[str]] = {
"operational_status_register": None,
"operational_status_valueNamePrefix": None,
}

# GROUPS
self.__group_temperatures = None
self.__group_operational_status = None
self.__operational_status_register = None
self.__group_operational_time = None
self.__group_operational_operation = None
self.__group_hot_water: Dict[str, Optional[int]] = {
Expand Down Expand Up @@ -345,39 +349,43 @@ def __get_value_by_key_and_register_name_from_operational_status(
return None

def __get_operational_statuses_from_operational_status(self) -> Optional[Dict]:
if self.__operational_status_register is not None:
return self.__get_register_from_operational_status(
self.__operational_status_register
if self.__device_config["operational_status_register"] is not None:
data = self.__get_register_from_operational_status(
self.__device_config["operational_status_register"]
)
if data is not None:
return data.get("valueNames", [])

# Try to get the data from the REG_OPERATIONAL_STATUS_PRIO1 register
data = self.__get_register_from_operational_status(REG_OPERATIONAL_STATUS_PRIO1)
if data is not None:
self.__operational_status_register = REG_OPERATIONAL_STATUS_PRIO1
return {
"registerValues": data.get("valueNames", []),
"valueNamePrefix": "REG_VALUE_STATUS_",
}
self.__device_config[
"operational_status_register"
] = REG_OPERATIONAL_STATUS_PRIO1
self.__device_config[
"operational_status_valueNamePrefix"
] = "REG_VALUE_STATUS_"
return data.get("valueNames", [])

# Try to get the data from the COMP_STATUS_ITEC register
data = self.__get_register_from_operational_status(COMP_STATUS_ITEC)
if data is not None:
self.__operational_status_register = COMP_STATUS_ITEC
return {
"registerValues": data.get("valueNames", []),
"valueNamePrefix": "COMP_VALUE_",
}
self.__device_config["operational_status_register"] = COMP_STATUS_ITEC
self.__device_config["operational_status_valueNamePrefix"] = "COMP_VALUE_"
return data.get("valueNames", [])

# Try to get the data from the REG_OPERATIONAL_STATUS_PRIORITY_BITMASK register
data = self.__get_register_from_operational_status(
REG_OPERATIONAL_STATUS_PRIORITY_BITMASK
)
if data is not None:
self.__operational_status_register = REG_OPERATIONAL_STATUS_PRIORITY_BITMASK
return {
"registerValues": data.get("valueNames", []),
"valueNamePrefix": "REG_VALUE_STATUS_",
}
self.__device_config[
"operational_status_register"
] = REG_OPERATIONAL_STATUS_PRIORITY_BITMASK
self.__device_config[
"operational_status_valueNamePrefix"
] = "REG_VALUE_"
return data.get("valueNames", [])

return None

Expand All @@ -389,17 +397,13 @@ def __get_all_operational_statuses_from_operational_status(
if data is None:
return None

filtered_register_values = list(
filter(lambda value: value.get("visible"), data["valueNames"])
)

operation_modes_map = map(
lambda values: {
values.get("value"): values.get("name").split(data["valueNamePrefix"])[
1
],
values.get("value"): values.get("name").split(
self.__device_config["operational_status_valueNamePrefix"]
)[1],
},
filtered_register_values,
data,
)

operation_modes_list = list(operation_modes_map)
Expand Down Expand Up @@ -554,15 +558,15 @@ def cooling_supply_line_temperature(self):
###########################################################################

@property
def operational_status(self):
if self.__operational_status_register is None:
def operational_status(self) -> Optional[Union[str, List[str]]]:
if self.__device_config["operational_status_register"] is None:
# Attempt to get the register from the status data
self.__get_operational_statuses_from_operational_status()
if self.__operational_status_register is None:
if self.__device_config["operational_status_register"] is None:
return None

data = self.__get_register_from_operational_status(
self.__operational_status_register
self.__device_config["operational_status_register"]
)

if data is None:
Expand All @@ -575,11 +579,29 @@ def operational_status(self):
if data is None:
return None

data_items_list = list(data.items())

current_operation_mode = [
name for value, name in data.items() if value == current_register_value
name for value, name in data_items_list if value == current_register_value
]

if len(current_operation_mode) != 1:
if (
len(current_operation_mode) != 1
and current_register_value > 0
and len(data_items_list) > 1
):
# Attempt to get multiple statuses by binary sum of the values
data_items_list.sort(key=lambda x: x[0], reverse=True)
list_of_current_operation_modes = []

for value, name in data_items_list:
if value <= current_register_value:
current_register_value -= value
list_of_current_operation_modes.append(name)

if current_register_value == 0:
return list_of_current_operation_modes

return None

return current_operation_mode[0]
Expand Down

0 comments on commit 2f0ec4e

Please sign in to comment.