Skip to content

Commit

Permalink
refactor: Read/Write w/ asyncio.Semaphore
Browse files Browse the repository at this point in the history
  • Loading branch information
davidrapan committed Dec 10, 2024
1 parent de68d54 commit c547509
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 125 deletions.
51 changes: 11 additions & 40 deletions custom_components/solarman/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ async def write_multiple_holding_registers(self, register_addr, values):

class Inverter():
def __init__(self, config: ConfigurationProvider):
self._is_busy = 0
self._semaphore = asyncio.Semaphore(1)
self._write_lock = True

self.state = -1
Expand Down Expand Up @@ -153,23 +153,6 @@ async def shutdown(self) -> None:
self.state = -1
await self.modbus.disconnect()

async def raise_when_busy(self, attempts_left = ACTION_ATTEMPTS, message = "Semaphore timeout."):
async def wait_for_done(attempts_left):
try:
try:
while self._is_busy == 1 and attempts_left > 0:
attempts_left -= 1
await asyncio.sleep(TIMINGS_WAIT_FOR_SLEEP)
except Exception as e:
_LOGGER.debug(f"wait_for_done: {format_exception(e)}")
return self._is_busy == 1
finally:
self._is_busy = 1

if await wait_for_done(attempts_left):
_LOGGER.debug(f"[{self.config.serial}] R/W Timeout.")
raise TimeoutError(f"[{self.config.serial}] {message}")

async def read_write(self, code, start, arg):
if await self.modbus.connect():
self.state_updated = datetime.now()
Expand Down Expand Up @@ -234,10 +217,8 @@ async def get(self, runtime = 0, requests = None):
_LOGGER.debug(f"[{self.config.serial}] Scheduling {scheduled_count} query request{'' if scheduled_count == 1 else 's'}. #{runtime}")

try:
await self.raise_when_busy(message = "Busy: Currently writing to the device!")

try:
async with asyncio.timeout(TIMINGS_UPDATE_TIMEOUT):
async with asyncio.timeout(TIMINGS_UPDATE_TIMEOUT):
async with self._semaphore:
for request in scheduled:
code, start, end = get_request_code(request), get_request_start(request), get_request_end(request)
quantity = end - start + 1
Expand All @@ -251,30 +232,20 @@ async def get(self, runtime = 0, requests = None):
self.state_updated = now
self.state = 1

except TimeoutError:
raise
except Exception as e:
if await self.get_failed():
raise Exception(f"[{self.config.serial}] {format_exception(e)}") from e
_LOGGER.debug(f"[{self.config.serial}] Error fetching {self.config.name} data: {e}")
finally:
self._is_busy = 0

except TimeoutError:
if await self.get_failed():
raise
_LOGGER.debug(f"[{self.config.serial}] Timeout fetching {self.config.name} data")
except Exception as e:
if await self.get_failed():
raise Exception(f"[{self.config.serial}] {format_exception(e)}") from e
_LOGGER.debug(f"[{self.config.serial}] Error fetching {self.config.name} data: {e}")

return result

async def call(self, code, start, arg, wait_for_attempts = ACTION_ATTEMPTS):
async def call(self, code, start, arg):
_LOGGER.debug(f"[{self.config.serial}] Scheduling call request.")

await self.raise_when_busy(wait_for_attempts, "Busy: The coordinator is currently reading data from the device!")

try:
return await self.try_read_write(code, start, arg, f"Call {code:02X} ~ {start} | 0x{start:04X}: {arg}", False)
except:
raise
finally:
self._is_busy = 0
async with asyncio.timeout(TIMINGS_UPDATE_TIMEOUT):
async with self._semaphore:
return await self.try_read_write(code, start, arg, f"Call {code:02X} ~ {start} | 0x{start:04X}: {arg}", False)
2 changes: 1 addition & 1 deletion custom_components/solarman/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ async def write(self, value, state = None) -> None:
if isinstance(value, list):
while len(self.registers) > len(value):
value.insert(0, 0)
if await self.coordinator.inverter.call(self.code, self.register, value, ACTION_ATTEMPTS_MAX) > 0 and state:
if await self.coordinator.inverter.call(self.code, self.register, value) > 0 and state:
self.set_state(state, value)
self.async_write_ha_state()
#await self.entity_description.update_fn(self.coordinator., int(value))
Expand Down
56 changes: 12 additions & 44 deletions custom_components/solarman/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@
vol.Required(SERVICES_PARAM_VALUES): vol.All(cv.ensure_list, [vol.All(vol.Coerce(int), vol.Range(min = 0, max = 65535))])
}

WAIT_SCHEMA = {
vol.Required(SERVICES_PARAM_WAIT_FOR_ATTEMPTS): vol.All(vol.Coerce(int), vol.Range(min = 0, max = 30))
}

def async_register(hass: HomeAssistant) -> None:
_LOGGER.debug(f"register")

Expand Down Expand Up @@ -61,15 +57,9 @@ async def read_holding_registers(call: ServiceCall) -> int:
quantity = call.data.get(SERVICES_PARAM_QUANTITY)

try:
response = await inverter.call(
CODE.READ_HOLDING_REGISTERS, register, quantity,
call.data.get(SERVICES_PARAM_WAIT_FOR_ATTEMPTS))
response = await inverter.call(CODE.READ_HOLDING_REGISTERS, register, quantity)
except Exception as e:
raise ServiceValidationError(
e,
translation_domain = DOMAIN,
translation_key = "call_failed"
)
raise ServiceValidationError(e, translation_domain = DOMAIN, translation_key = "call_failed")

result = {}

Expand All @@ -93,15 +83,9 @@ async def read_input_registers(call: ServiceCall) -> int:
quantity = call.data.get(SERVICES_PARAM_QUANTITY)

try:
response = await inverter.call(
CODE.READ_INPUT, register, quantity,
call.data.get(SERVICES_PARAM_WAIT_FOR_ATTEMPTS))
response = await inverter.call(CODE.READ_INPUT, register, quantity)
except Exception as e:
raise ServiceValidationError(
e,
translation_domain = DOMAIN,
translation_key = "call_failed"
)
raise ServiceValidationError(e, translation_domain = DOMAIN, translation_key = "call_failed")

result = {}

Expand All @@ -122,17 +106,9 @@ async def write_holding_register(call: ServiceCall) -> None:
)

try:
await inverter.call(
CODE.WRITE_HOLDING_REGISTER,
call.data.get(SERVICES_PARAM_REGISTER),
call.data.get(SERVICES_PARAM_VALUE),
call.data.get(SERVICES_PARAM_WAIT_FOR_ATTEMPTS))
await inverter.call(CODE.WRITE_HOLDING_REGISTER, call.data.get(SERVICES_PARAM_REGISTER), call.data.get(SERVICES_PARAM_VALUE))
except Exception as e:
raise ServiceValidationError(
e,
translation_domain = DOMAIN,
translation_key = "call_failed"
)
raise ServiceValidationError(e, translation_domain = DOMAIN, translation_key = "call_failed")

return

Expand All @@ -147,32 +123,24 @@ async def write_multiple_holding_registers(call: ServiceCall) -> None:
)

try:
await inverter.call(
CODE.WRITE_MULTIPLE_HOLDING_REGISTERS,
call.data.get(SERVICES_PARAM_REGISTER),
call.data.get(SERVICES_PARAM_VALUES),
call.data.get(SERVICES_PARAM_WAIT_FOR_ATTEMPTS))
await inverter.call(CODE.WRITE_MULTIPLE_HOLDING_REGISTERS, call.data.get(SERVICES_PARAM_REGISTER), call.data.get(SERVICES_PARAM_VALUES))
except Exception as e:
raise ServiceValidationError(
e,
translation_domain = DOMAIN,
translation_key = "call_failed"
)
raise ServiceValidationError(e, translation_domain = DOMAIN, translation_key = "call_failed")

return

hass.services.async_register(
DOMAIN, SERVICE_READ_HOLDING_REGISTERS, read_holding_registers, schema = vol.Schema(HEADER_SCHEMA | QUANTITY_SCHEMA | WAIT_SCHEMA), supports_response = SupportsResponse.OPTIONAL
DOMAIN, SERVICE_READ_HOLDING_REGISTERS, read_holding_registers, schema = vol.Schema(HEADER_SCHEMA | QUANTITY_SCHEMA), supports_response = SupportsResponse.OPTIONAL
)

hass.services.async_register(
DOMAIN, SERVICE_READ_INPUT_REGISTERS, read_input_registers, schema = vol.Schema(HEADER_SCHEMA | QUANTITY_SCHEMA | WAIT_SCHEMA), supports_response = SupportsResponse.OPTIONAL
DOMAIN, SERVICE_READ_INPUT_REGISTERS, read_input_registers, schema = vol.Schema(HEADER_SCHEMA | QUANTITY_SCHEMA), supports_response = SupportsResponse.OPTIONAL
)

hass.services.async_register(
DOMAIN, SERVICE_WRITE_HOLDING_REGISTER, write_holding_register, schema = vol.Schema(HEADER_SCHEMA | VALUE_SCHEMA | WAIT_SCHEMA)
DOMAIN, SERVICE_WRITE_HOLDING_REGISTER, write_holding_register, schema = vol.Schema(HEADER_SCHEMA | VALUE_SCHEMA)
)

hass.services.async_register(
DOMAIN, SERVICE_WRITE_MULTIPLE_HOLDING_REGISTERS, write_multiple_holding_registers, schema = vol.Schema(HEADER_SCHEMA | VALUES_SCHEMA | WAIT_SCHEMA)
DOMAIN, SERVICE_WRITE_MULTIPLE_HOLDING_REGISTERS, write_multiple_holding_registers, schema = vol.Schema(HEADER_SCHEMA | VALUES_SCHEMA)
)
40 changes: 0 additions & 40 deletions custom_components/solarman/services.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,6 @@ read_holding_registers:
min: 1
max: 125
mode: box
wait_for_attempts:
name: Wait for attempts
description: Wait for coordinator attempts
default: 5
required: true
selector:
number:
min: 0
max: 30
mode: box

read_input_registers:
name: Read Input Registers (Modbus Function Code 4)
Expand Down Expand Up @@ -75,16 +65,6 @@ read_input_registers:
min: 1
max: 125
mode: box
wait_for_attempts:
name: Wait for attempts
description: Wait for coordinator attempts
default: 5
required: true
selector:
number:
min: 0
max: 30
mode: box

write_holding_register:
name: Write Holding Register (Modbus Function Code 6)
Expand Down Expand Up @@ -119,16 +99,6 @@ write_holding_register:
min: 0
max: 65535
mode: box
wait_for_attempts:
name: Wait for attempts
description: Wait for coordinator attempts
default: 5
required: true
selector:
number:
min: 0
max: 30
mode: box

write_multiple_holding_registers:
name: Write Multiple Holding Registers (Modbus Function Code 16)
Expand Down Expand Up @@ -166,13 +136,3 @@ write_multiple_holding_registers:
min: 0
max: 65535
mode: box
wait_for_attempts:
name: Wait for attempts
description: Wait for coordinator attempts
default: 5
required: true
selector:
number:
min: 0
max: 30
mode: box

0 comments on commit c547509

Please sign in to comment.