diff --git a/custom_components/solarman/__init__.py b/custom_components/solarman/__init__.py index b106250e..bfc609b2 100644 --- a/custom_components/solarman/__init__.py +++ b/custom_components/solarman/__init__.py @@ -16,7 +16,7 @@ from .api import Inverter from .discovery import InverterDiscovery from .coordinator import InverterCoordinator -from .entity import async_migrate_unique_ids +from .entity import migrate_unique_ids from .config_flow import async_update_listener from .services import * @@ -51,10 +51,10 @@ async def async_setup_entry(hass: HomeAssistant, config: ConfigEntry) -> bool: except AddressValueError: ipaddr = IPv4Address(socket.gethostbyname(inverter_host)) if ipaddr.is_private and (discover := await InverterDiscovery(hass, inverter_host, serial).discover()): - if device := get_or_default(discover, serial): + if (device := discover.get(serial)) is not None: inverter_host = device["ip"] inverter_mac = device["mac"] - elif device := get_or_default(discover, (s := next(iter([k for k, v in discover.items() if v["ip"] == inverter_host]), None))): + elif (device := discover.get((s := next(iter([k for k, v in discover.items() if v["ip"] == inverter_host]), None)))): raise vol.Invalid(f"Host {inverter_host} has serial number {s} but is configured with {serial}.") inverter = Inverter(inverter_host, serial, inverter_port, mb_slave_id) @@ -78,7 +78,7 @@ async def async_setup_entry(hass: HomeAssistant, config: ConfigEntry) -> bool: # _LOGGER.debug(f"async_setup: async_migrate_entries") - await async_migrate_entries(hass, config.entry_id, partial(async_migrate_unique_ids, name, serial)) + await async_migrate_entries(hass, config.entry_id, partial(migrate_unique_ids, name, serial)) # Forward setup # diff --git a/custom_components/solarman/common.py b/custom_components/solarman/common.py index 6b525c9e..a9f0df12 100644 --- a/custom_components/solarman/common.py +++ b/custom_components/solarman/common.py @@ -30,9 +30,6 @@ def execute_async(x): def ensure_list(value): return value if isinstance(value, list) else [value] -def get_or_default(dict, key, default = None): - return dict[key] if dict and key in dict else default - def set_request(code, start, end): return { REQUEST_CODE: code, REQUEST_START: start, REQUEST_END: end } @@ -97,13 +94,20 @@ def format_exception(e): return re.sub(r"\s+", " ", f"{type(e).__name__}{f': {e}' if f'{e}' else ''}") def process_descriptions(item, group, table, code): - if not REQUEST_UPDATE_INTERVAL in item and REQUEST_UPDATE_INTERVAL in group: - item[REQUEST_UPDATE_INTERVAL] = group[REQUEST_UPDATE_INTERVAL] - if not REQUEST_CODE in item and "registers" in item: - if REQUEST_CODE in group: - item[REQUEST_CODE] = group[REQUEST_CODE] - elif (addr := min(item["registers"])) is not None: - item[REQUEST_CODE] = table[addr] if addr in table else code + def inherit(target, source, *properties): + for p in properties: + if not p in target and (v := source.get(p)) is not None: + target[p] = v + return target + + inherit(item, group, *(REQUEST_UPDATE_INTERVAL, REQUEST_CODE) if "registers" in item else REQUEST_UPDATE_INTERVAL) + if not REQUEST_CODE in item and (r := item.get("registers")) is not None and (addr := min(r)) is not None: + item[REQUEST_CODE] = table.get(addr, code) + if (sensors := item.get("sensors")) is not None: + for s in sensors: + inherit(s, item, REQUEST_CODE, "scale") + if (m := item.get("multiply")) is not None: + inherit(m, s, REQUEST_CODE, "scale") return item def get_code(item, type, default = None): @@ -129,6 +133,9 @@ def get_addr_value(data, code, addr): def ilen(object): return len(object) if not isinstance(object, int) else 1 +def get_or_def(o, k, d): + return o.get(k, d) or d + def lookup_value(value, dictionary): default = dictionary[0]["value"] diff --git a/custom_components/solarman/entity.py b/custom_components/solarman/entity.py index 59dc77c7..df722ce5 100644 --- a/custom_components/solarman/entity.py +++ b/custom_components/solarman/entity.py @@ -22,7 +22,7 @@ _LOGGER = logging.getLogger(__name__) @callback -def async_migrate_unique_ids(name: str, serial: int, entity_entry: RegistryEntry) -> dict[str, Any] | None: +def migrate_unique_ids(name: str, serial: int, entity_entry: RegistryEntry) -> dict[str, Any] | None: entity_name = entity_entry.original_name if entity_entry.has_entity_name or not entity_entry.original_name else entity_entry.original_name.replace(name, '').strip() old_unique_id = '_'.join(filter(None, (name, str(serial), entity_name))) diff --git a/custom_components/solarman/parser.py b/custom_components/solarman/parser.py index 5300fd1a..bd6c4559 100644 --- a/custom_components/solarman/parser.py +++ b/custom_components/solarman/parser.py @@ -99,10 +99,9 @@ def schedule_requests(self, runtime = 0): return [set_request(self._code if self._is_single_code else r[0][0], r[0][1], r[-1][1]) for r in groups] def in_range(self, value, definition): - if "range" in definition: - range = definition["range"] - if "min" in range and "max" in range: - if value < range["min"] or value > range["max"]: + if (range := definition.get("range")) is not None: + if (min := range.get("min")) is not None and (max := range.get("max")) is not None: + if value < min or value > max: _LOGGER.debug(f"Value: {value} of {definition["registers"]} is out of range: {range}") return False @@ -180,30 +179,30 @@ def _read_registers(self, data, definition): if not self.in_range(value, definition): return None - if "mask" in definition: - value &= definition["mask"] + if (mask := definition.get("mask")) is not None: + value &= mask - if "bit" in definition: - value = (value >> definition["bit"]) & 1 + if (bit := definition.get("bit")) is not None: + value = (value >> bit) & 1 - if "bitmask" in definition and (bitmask := definition["bitmask"]): + if (bitmask := definition.get("bitmask")) is not None: value = int((value & bitmask) / bitmask) if "lookup" not in definition: - if "offset" in definition: - value -= definition["offset"] + if (offset := definition.get("offset")) is not None: + value -= offset - if "scale" in definition and (scale := definition["scale"]): + if (scale := definition.get("scale")) is not None: value *= scale - if "divide" in definition and (divide := definition["divide"]): + if (divide := definition.get("divide")) is not None: value //= divide return value def _read_registers_signed(self, data, definition): code = get_code(definition, "read") - magnitude = definition["magnitude"] if "magnitude" in definition else False + magnitude = definition.get("magnitude", False) maxint = 0 value = 0 shift = 0 @@ -220,16 +219,16 @@ def _read_registers_signed(self, data, definition): if not self.in_range(value, definition): return None - if "offset" in definition: - value -= definition["offset"] + if (offset := definition.get("offset")) is not None: + value -= offset if value > (maxint >> 1): value = (value - maxint) if not magnitude else -(value & (maxint >> 1)) - if "scale" in definition and (scale := definition["scale"]): + if (scale := definition.get("scale")) is not None: value *= scale - if "divide" in definition and (divide := definition["divide"]): + if (divide := definition.get("divide")) is not None: value //= divide return value @@ -238,27 +237,16 @@ def _read_registers_custom(self, data, definition): value = 0 for s in definition["sensors"]: - if not REQUEST_CODE in s and REQUEST_CODE in definition and (code := definition[REQUEST_CODE]): - s[REQUEST_CODE] = code - if not "scale" in s and "scale" in definition and (scale := definition["scale"]): - s["scale"] = scale - - if (n := (self._read_registers(data, s) if not "signed" in s else self._read_registers_signed(data, s))) is None: + if (n := self._read_registers(data, s) if not "signed" in s else self._read_registers_signed(data, s)) is None: return None - if (validation := get_or_default(s, "validation")) and not self.do_validate(s["registers"], n, validation): + if (validation := s.get("validation")) is not None and not self.do_validate(s["registers"], n, validation): if not "default" in validation: continue n = validation["default"] - if "multiply" in s and (s_multiply := s["multiply"]): - if not REQUEST_CODE in s_multiply and REQUEST_CODE in s and (s_code := s[REQUEST_CODE]): - s_multiply[REQUEST_CODE] = s_code - if not "scale" in s_multiply and "scale" in s and (s_scale := s["scale"]): - s_multiply["scale"] = s_scale - - if (c := self._read_registers(data, s_multiply)) is not None: - n *= c + if (m := s.get("multiply")) and (c := self._read_registers(data, m)) is not None: + n *= c if not "operator" in s: value += n @@ -276,7 +264,7 @@ def _read_registers_custom(self, data, definition): return value def try_parse_unsigned(self, data, definition): - if (value := (self._read_registers(data, definition) if not "sensors" in definition else self._read_registers_custom(data, definition))) is None: + if (value := self._read_registers(data, definition) if not "sensors" in definition else self._read_registers_custom(data, definition)) is None: return if "uint" in definition and value < 0: @@ -288,31 +276,31 @@ def try_parse_unsigned(self, data, definition): self.set_state(key, lookup_value(value, definition["lookup"]), int(value)) return - if (validation := get_or_default(definition, "validation")) and not self.do_validate(key, value, validation): + if (validation := definition.get("validation")) is not None and not self.do_validate(key, value, validation): if not "default" in validation: return value = validation["default"] - self.set_state(key, get_number(value, definition["digits"] if "digits" in definition else self._digits)) + self.set_state(key, get_number(value, get_or_def(definition, "digits", self._digits))) - if "attributes" in definition and "value" in definition["attributes"]: + if (a := definition.get("attributes")) is not None and "value" in a: self.set_state(key, self._result[key][0], int(value)) def try_parse_signed(self, data, definition): - if (value := (self._read_registers_signed(data, definition) if not "sensors" in definition else self._read_registers_custom(data, definition))) is None: + if (value := self._read_registers_signed(data, definition) if not "sensors" in definition else self._read_registers_custom(data, definition)) is None: return - if "inverted" in definition and definition["inverted"]: + if definition.get("inverted"): value = -value key = definition["name"] - if (validation := get_or_default(definition, "validation")) and not self.do_validate(key, value, validation): + if (validation := definition.get("validation")) is not None and not self.do_validate(key, value, validation): if not "default" in validation: return value = validation["default"] - self.set_state(key, get_number(value, definition["digits"] if "digits" in definition else self._digits)) + self.set_state(key, get_number(value, get_or_def(definition, "digits", self._digits))) def try_parse_ascii(self, data, definition): code = get_code(definition, "read") @@ -348,8 +336,8 @@ def try_parse_version(self, data, definition): value += str(temp >> 12) + "." + str(temp >> 8 & 0x0F) + "." + str(temp >> 4 & 0x0F) + "." + str(temp & 0x0F) - if "remove" in definition: - value = value.replace(definition["remove"], "") + if (remove := definition.get("remove")) is not None: + value = value.replace(remove, "") self.set_state(definition["name"], value) @@ -394,8 +382,8 @@ def try_parse_datetime(self, data, definition): def try_parse_time(self, data, definition): code = get_code(definition, "read") - f, d = ("{:02d}", 100 if not "dec" in definition else definition["dec"]) if not "hex" in definition else ("{:02x}", 0x100 if definition["hex"] is None else definition["hex"]) - offset = definition["offset"] if "offset" in definition else None + f, d = ("{:02d}", get_or_def(definition, "dec", 100)) if not "hex" in definition else ("{:02x}", get_or_def(definition, "hex", 0x100)) + offset = definition.get("offset") value = "" registers_count = len(definition["registers"])