Skip to content

Commit

Permalink
refactor: Optimizations of repetitive code
Browse files Browse the repository at this point in the history
  • Loading branch information
davidrapan committed Dec 3, 2024
1 parent 9d4a0e5 commit e53b3a3
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 60 deletions.
8 changes: 4 additions & 4 deletions custom_components/solarman/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from .api import Inverter
from .discovery import InverterDiscovery
from .coordinator import InverterCoordinator
from .entity import async_migrate_unique_ids
from .entity import migrate_unique_ids
from .config_flow import async_update_listener
from .services import *

Expand Down Expand Up @@ -51,10 +51,10 @@ async def async_setup_entry(hass: HomeAssistant, config: ConfigEntry) -> bool:
except AddressValueError:
ipaddr = IPv4Address(socket.gethostbyname(inverter_host))
if ipaddr.is_private and (discover := await InverterDiscovery(hass, inverter_host, serial).discover()):
if device := get_or_default(discover, serial):
if (device := discover.get(serial)) is not None:
inverter_host = device["ip"]
inverter_mac = device["mac"]
elif device := get_or_default(discover, (s := next(iter([k for k, v in discover.items() if v["ip"] == inverter_host]), None))):
elif (device := discover.get((s := next(iter([k for k, v in discover.items() if v["ip"] == inverter_host]), None)))):
raise vol.Invalid(f"Host {inverter_host} has serial number {s} but is configured with {serial}.")

inverter = Inverter(inverter_host, serial, inverter_port, mb_slave_id)
Expand All @@ -78,7 +78,7 @@ async def async_setup_entry(hass: HomeAssistant, config: ConfigEntry) -> bool:
#
_LOGGER.debug(f"async_setup: async_migrate_entries")

await async_migrate_entries(hass, config.entry_id, partial(async_migrate_unique_ids, name, serial))
await async_migrate_entries(hass, config.entry_id, partial(migrate_unique_ids, name, serial))

# Forward setup
#
Expand Down
27 changes: 17 additions & 10 deletions custom_components/solarman/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ def execute_async(x):
def ensure_list(value):
return value if isinstance(value, list) else [value]

def get_or_default(dict, key, default = None):
return dict[key] if dict and key in dict else default

def set_request(code, start, end):
return { REQUEST_CODE: code, REQUEST_START: start, REQUEST_END: end }

Expand Down Expand Up @@ -97,13 +94,20 @@ def format_exception(e):
return re.sub(r"\s+", " ", f"{type(e).__name__}{f': {e}' if f'{e}' else ''}")

def process_descriptions(item, group, table, code):
if not REQUEST_UPDATE_INTERVAL in item and REQUEST_UPDATE_INTERVAL in group:
item[REQUEST_UPDATE_INTERVAL] = group[REQUEST_UPDATE_INTERVAL]
if not REQUEST_CODE in item and "registers" in item:
if REQUEST_CODE in group:
item[REQUEST_CODE] = group[REQUEST_CODE]
elif (addr := min(item["registers"])) is not None:
item[REQUEST_CODE] = table[addr] if addr in table else code
def inherit(target, source, *properties):
for p in properties:
if not p in target and (v := source.get(p)) is not None:
target[p] = v
return target

inherit(item, group, *(REQUEST_UPDATE_INTERVAL, REQUEST_CODE) if "registers" in item else REQUEST_UPDATE_INTERVAL)
if not REQUEST_CODE in item and (r := item.get("registers")) is not None and (addr := min(r)) is not None:
item[REQUEST_CODE] = table.get(addr, code)
if (sensors := item.get("sensors")) is not None:
for s in sensors:
inherit(s, item, REQUEST_CODE, "scale")
if (m := item.get("multiply")) is not None:
inherit(m, s, REQUEST_CODE, "scale")
return item

def get_code(item, type, default = None):
Expand All @@ -129,6 +133,9 @@ def get_addr_value(data, code, addr):
def ilen(object):
return len(object) if not isinstance(object, int) else 1

def get_or_def(o, k, d):
return o.get(k, d) or d

def lookup_value(value, dictionary):
default = dictionary[0]["value"]

Expand Down
2 changes: 1 addition & 1 deletion custom_components/solarman/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
_LOGGER = logging.getLogger(__name__)

@callback
def async_migrate_unique_ids(name: str, serial: int, entity_entry: RegistryEntry) -> dict[str, Any] | None:
def migrate_unique_ids(name: str, serial: int, entity_entry: RegistryEntry) -> dict[str, Any] | None:

entity_name = entity_entry.original_name if entity_entry.has_entity_name or not entity_entry.original_name else entity_entry.original_name.replace(name, '').strip()
old_unique_id = '_'.join(filter(None, (name, str(serial), entity_name)))
Expand Down
78 changes: 33 additions & 45 deletions custom_components/solarman/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,9 @@ def schedule_requests(self, runtime = 0):
return [set_request(self._code if self._is_single_code else r[0][0], r[0][1], r[-1][1]) for r in groups]

def in_range(self, value, definition):
if "range" in definition:
range = definition["range"]
if "min" in range and "max" in range:
if value < range["min"] or value > range["max"]:
if (range := definition.get("range")) is not None:
if (min := range.get("min")) is not None and (max := range.get("max")) is not None:
if value < min or value > max:
_LOGGER.debug(f"Value: {value} of {definition["registers"]} is out of range: {range}")
return False

Expand Down Expand Up @@ -180,30 +179,30 @@ def _read_registers(self, data, definition):
if not self.in_range(value, definition):
return None

if "mask" in definition:
value &= definition["mask"]
if (mask := definition.get("mask")) is not None:
value &= mask

if "bit" in definition:
value = (value >> definition["bit"]) & 1
if (bit := definition.get("bit")) is not None:
value = (value >> bit) & 1

if "bitmask" in definition and (bitmask := definition["bitmask"]):
if (bitmask := definition.get("bitmask")) is not None:
value = int((value & bitmask) / bitmask)

if "lookup" not in definition:
if "offset" in definition:
value -= definition["offset"]
if (offset := definition.get("offset")) is not None:
value -= offset

if "scale" in definition and (scale := definition["scale"]):
if (scale := definition.get("scale")) is not None:
value *= scale

if "divide" in definition and (divide := definition["divide"]):
if (divide := definition.get("divide")) is not None:
value //= divide

return value

def _read_registers_signed(self, data, definition):
code = get_code(definition, "read")
magnitude = definition["magnitude"] if "magnitude" in definition else False
magnitude = definition.get("magnitude", False)
maxint = 0
value = 0
shift = 0
Expand All @@ -220,16 +219,16 @@ def _read_registers_signed(self, data, definition):
if not self.in_range(value, definition):
return None

if "offset" in definition:
value -= definition["offset"]
if (offset := definition.get("offset")) is not None:
value -= offset

if value > (maxint >> 1):
value = (value - maxint) if not magnitude else -(value & (maxint >> 1))

if "scale" in definition and (scale := definition["scale"]):
if (scale := definition.get("scale")) is not None:
value *= scale

if "divide" in definition and (divide := definition["divide"]):
if (divide := definition.get("divide")) is not None:
value //= divide

return value
Expand All @@ -238,27 +237,16 @@ def _read_registers_custom(self, data, definition):
value = 0

for s in definition["sensors"]:
if not REQUEST_CODE in s and REQUEST_CODE in definition and (code := definition[REQUEST_CODE]):
s[REQUEST_CODE] = code
if not "scale" in s and "scale" in definition and (scale := definition["scale"]):
s["scale"] = scale

if (n := (self._read_registers(data, s) if not "signed" in s else self._read_registers_signed(data, s))) is None:
if (n := self._read_registers(data, s) if not "signed" in s else self._read_registers_signed(data, s)) is None:
return None

if (validation := get_or_default(s, "validation")) and not self.do_validate(s["registers"], n, validation):
if (validation := s.get("validation")) is not None and not self.do_validate(s["registers"], n, validation):
if not "default" in validation:
continue
n = validation["default"]

if "multiply" in s and (s_multiply := s["multiply"]):
if not REQUEST_CODE in s_multiply and REQUEST_CODE in s and (s_code := s[REQUEST_CODE]):
s_multiply[REQUEST_CODE] = s_code
if not "scale" in s_multiply and "scale" in s and (s_scale := s["scale"]):
s_multiply["scale"] = s_scale

if (c := self._read_registers(data, s_multiply)) is not None:
n *= c
if (m := s.get("multiply")) and (c := self._read_registers(data, m)) is not None:
n *= c

if not "operator" in s:
value += n
Expand All @@ -276,7 +264,7 @@ def _read_registers_custom(self, data, definition):
return value

def try_parse_unsigned(self, data, definition):
if (value := (self._read_registers(data, definition) if not "sensors" in definition else self._read_registers_custom(data, definition))) is None:
if (value := self._read_registers(data, definition) if not "sensors" in definition else self._read_registers_custom(data, definition)) is None:
return

if "uint" in definition and value < 0:
Expand All @@ -288,31 +276,31 @@ def try_parse_unsigned(self, data, definition):
self.set_state(key, lookup_value(value, definition["lookup"]), int(value))
return

if (validation := get_or_default(definition, "validation")) and not self.do_validate(key, value, validation):
if (validation := definition.get("validation")) is not None and not self.do_validate(key, value, validation):
if not "default" in validation:
return
value = validation["default"]

self.set_state(key, get_number(value, definition["digits"] if "digits" in definition else self._digits))
self.set_state(key, get_number(value, get_or_def(definition, "digits", self._digits)))

if "attributes" in definition and "value" in definition["attributes"]:
if (a := definition.get("attributes")) is not None and "value" in a:
self.set_state(key, self._result[key][0], int(value))

def try_parse_signed(self, data, definition):
if (value := (self._read_registers_signed(data, definition) if not "sensors" in definition else self._read_registers_custom(data, definition))) is None:
if (value := self._read_registers_signed(data, definition) if not "sensors" in definition else self._read_registers_custom(data, definition)) is None:
return

if "inverted" in definition and definition["inverted"]:
if definition.get("inverted"):
value = -value

key = definition["name"]

if (validation := get_or_default(definition, "validation")) and not self.do_validate(key, value, validation):
if (validation := definition.get("validation")) is not None and not self.do_validate(key, value, validation):
if not "default" in validation:
return
value = validation["default"]

self.set_state(key, get_number(value, definition["digits"] if "digits" in definition else self._digits))
self.set_state(key, get_number(value, get_or_def(definition, "digits", self._digits)))

def try_parse_ascii(self, data, definition):
code = get_code(definition, "read")
Expand Down Expand Up @@ -348,8 +336,8 @@ def try_parse_version(self, data, definition):

value += str(temp >> 12) + "." + str(temp >> 8 & 0x0F) + "." + str(temp >> 4 & 0x0F) + "." + str(temp & 0x0F)

if "remove" in definition:
value = value.replace(definition["remove"], "")
if (remove := definition.get("remove")) is not None:
value = value.replace(remove, "")

self.set_state(definition["name"], value)

Expand Down Expand Up @@ -394,8 +382,8 @@ def try_parse_datetime(self, data, definition):

def try_parse_time(self, data, definition):
code = get_code(definition, "read")
f, d = ("{:02d}", 100 if not "dec" in definition else definition["dec"]) if not "hex" in definition else ("{:02x}", 0x100 if definition["hex"] is None else definition["hex"])
offset = definition["offset"] if "offset" in definition else None
f, d = ("{:02d}", get_or_def(definition, "dec", 100)) if not "hex" in definition else ("{:02x}", get_or_def(definition, "hex", 0x100))
offset = definition.get("offset")
value = ""

registers_count = len(definition["registers"])
Expand Down

0 comments on commit e53b3a3

Please sign in to comment.