Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

plugins: adjust all plugins using suspend_item to new pause_item #945

Merged
merged 12 commits into from
Jul 3, 2024
40 changes: 5 additions & 35 deletions db_addon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ def __init__(self, sh):
self.item_attribute_search_str = 'database' # attribute, on which an item configured for database can be identified
self.last_connect_time = 0 # mechanism for limiting db connection requests
self.alive = None # Is plugin alive?
self.suspended = False # Is plugin activity suspended
self.active_queue_item: str = '-' # String holding item path of currently executed item
self.onchange_delay_time = 30

Expand Down Expand Up @@ -139,19 +138,19 @@ def run(self):
# check existence of db-plugin, get parameters, and init connection to db
if not self._check_db_existence():
self.logger.error(f"Check of existence of database plugin incl connection check failed. Plugin not loaded")
return self.deinit()
return

# create db object
self._db = lib.db.Database("DatabaseAddOn", self.db_driver, self.connection_data)
if not self._db.api_initialized:
self.logger.error("Initialization of database API failed")
return self.deinit()
return
self.logger.debug("Initialization of database API successful")

# check initialization of db
if not self._initialize_db():
self.logger.error("Connection to database failed")
return self.deinit()
return
self._db.close()

# check db connection settings
Expand Down Expand Up @@ -750,11 +749,8 @@ def update_item(self, item, caller=None, source=None, dest=None):
if item in self._database_items():
# if not self.startup_finished:
# self.logger.info(f"Handling of 'onchange' is paused for startup. No updated will be processed.")
if self.suspended:
self.logger.info(f"Plugin is suspended. No updated will be processed.")
else:
self.logger.debug(f" Updated Item {item.property.path} with value {item()} will be put to queue in approx. {self.onchange_delay_time}s resp. after startup.")
self.update_item_delay_deque.append([item, item(), int(time.time() + self.onchange_delay_time)])
self.logger.debug(f" Updated Item {item.property.path} with value {item()} will be put to queue in approx. {self.onchange_delay_time}s resp. after startup.")
self.update_item_delay_deque.append([item, item(), int(time.time() + self.onchange_delay_time)])

# handle admin items
elif self.has_iattr(item.conf, 'db_addon_admin'):
Expand Down Expand Up @@ -956,10 +952,6 @@ def _create_due_items() -> list:
if self.debug_log.execute:
self.logger.debug(f"execute_items called with {option=}")

if self.suspended:
self.logger.info(f"Plugin is suspended. No items will be calculated.")
return

suspended_items = self._suspended_items()
if len(suspended_items) > 0:
self.logger.info(f"{len(suspended_items)} are suspended and will not be calculated.")
Expand Down Expand Up @@ -1537,28 +1529,6 @@ def fetch_raw(self, query: str, params: dict = None) -> Union[list, None]:

return self._fetchall(query, params)

def suspend(self, state: bool = False) -> bool:
"""
Will pause value evaluation of plugin

"""

if state:
self.logger.info("Plugin is set to 'suspended'. Queries to database will not be made until suspension is cleared.")
self.suspended = True
self._clear_queue()
else:
self.logger.info("Plugin suspension cleared. Queries to database will be resumed.")
self.suspended = False

# write back value to item, if one exists
for item in self.get_item_list('db_addon', 'admin'):
item_config = self.get_item_config(item)
if item_config['db_addon_fct'] == 'suspend':
item(self.suspended, self.get_shortname())

return self.suspended

##############################################
# Calculation methods / Using Item Object
##############################################
Expand Down
124 changes: 33 additions & 91 deletions modbus_tcp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@
AttrObjectType = 'modBusObjectType'
AttrDirection = 'modBusDirection'


class modbus_tcp(SmartPlugin):
"""
This class provides a Plugin for SmarthomeNG to read and or write to modbus
devices.
"""

PLUGIN_VERSION = '1.0.12'
PLUGIN_VERSION = '1.0.13'

def __init__(self, sh, *args, **kwargs):
"""
Expand Down Expand Up @@ -85,9 +86,11 @@ def __init__(self, sh, *args, **kwargs):
if not (self._cycle or self._crontab):
self.logger.error(f"{self.get_fullname()}: no update cycle or crontab set. Modbus will not be queried automatically")

self._slaveUnit = int(self.get_parameter_value('slaveUnit'))
self._slaveUnit = self.get_parameter_value('slaveUnit')
self._slaveUnitRegisterDependend = False

self._pause_item_path = self.get_parameter_value('pause_item')

self._sh = sh
self._regToRead = {}
self._regToWrite = {}
Expand All @@ -99,76 +102,32 @@ def __init__(self, sh, *args, **kwargs):

self.init_webinterface(WebInterface)

return

def run(self):
"""
Run method for the plugin
"""
self.logger.debug(f"Plugin '{self.get_fullname()}': run method called")
if self.alive:
return

self.alive = True
self.set_suspend(by='run()')


if self._cycle or self._crontab:
self.error_count = 0 # Initialize error count
if not self.suspended:
self._create_cyclic_scheduler()
self.error_count = 0 # Initialize error count
self.scheduler_add('poll_device_' + self._host, self.poll_device, cycle=self._cycle, cron=self._crontab, prio=5)
self.logger.debug(f"Plugin '{self.get_fullname()}': run method finished ")

def _create_cyclic_scheduler(self):
self.scheduler_add('poll_device_' + self._host, self.poll_device, cycle=self._cycle, cron=self._crontab, prio=5)

def _remove_cyclic_scheduler(self):
self.scheduler_remove('poll_device_' + self._host)

def stop(self):
"""
Stop method for the plugin
"""
self.alive = False
self.logger.debug(f"Plugin '{self.get_fullname()}': stop method called")
self._remove_cyclic_scheduler()
self.scheduler_remove('poll_device_' + self._host)
self._Mclient.close()
self.connected = False
self.logger.debug(f"Plugin '{self.get_fullname()}': stop method finished")

# sh.plugins.return_plugin('pluginName').suspend()
def set_suspend(self, suspend_active=None, by=None):
"""
enable / disable suspend mode: open/close connections, schedulers
"""

if suspend_active is None:
if self._suspend_item is not None:
# if no parameter set, try to use item setting
suspend_active = bool(self._suspend_item())
else:
# if not available, default to "resume" (non-breaking default)
suspend_active = False

# print debug logging
if suspend_active:
msg = 'Suspend mode enabled'
else:
msg = 'Suspend mode disabled'
if by:
msg += f' (set by {by})'
self.logger.debug(msg)

# activate selected mode, use smartplugin methods
if suspend_active:
self.suspend(by)
else:
self.resume(by)

if suspend_active:
self._remove_cyclic_scheduler()
else:
self._create_cyclic_scheduler()


def parse_item(self, item):
"""
Default plugin parse_item method. Is called when the plugin is initialized.
Expand All @@ -178,10 +137,10 @@ def parse_item(self, item):
:param item: The item to process.
"""

# check for suspend item
if item.property.path == self._suspend_item_path:
self.logger.debug(f'suspend item {item.property.path} registered')
self._suspend_item = item
# check for pause item
if item.property.path == self._pause_item_path:
self.logger.debug(f'pause item {item.property.path} registered')
self._pause_item = item
self.add_item(item, updating=True)
return self.update_item

Expand All @@ -207,7 +166,7 @@ def parse_item(self, item):
if self.has_iattr(item.conf, AttrObjectType):
objectType = self.get_iattr_value(item.conf, AttrObjectType)

reg = str(objectType) # dictionary key: objectType.regAddr.slaveUnit // HoldingRegister.528.1
reg = str(objectType) # dictionary key: objectType.regAddr.slaveUnit // HoldingRegister.528.1
reg += '.'
reg += str(regAddr)
reg += '.'
Expand Down Expand Up @@ -274,7 +233,7 @@ def poll_device(self):
changes on it's own, but has to be polled to get the actual status.
It is called by the scheduler which is set within run() method.
"""
if self.suspended:
if not self.alive:
return

with self.lock:
Expand Down Expand Up @@ -302,7 +261,6 @@ def poll_device(self):
try:
for reg, regPara in self._regToRead.items():
with self.lock:
regAddr = regPara['regAddr']
value = self.__read_Registers(regPara)
# self.logger.debug(f"value read: {value} type: {type(value)}")
if value is not None:
Expand Down Expand Up @@ -330,8 +288,6 @@ def poll_device(self):
except Exception as e:
self.logger.error(f"something went wrong in the poll_device function: {e}")


# called each time an item changes.
def update_item(self, item, caller=None, source=None, dest=None):
"""
Item has been updated
Expand All @@ -349,21 +305,16 @@ def update_item(self, item, caller=None, source=None, dest=None):
slaveUnit = self._slaveUnit
dataDirection = 'read'

# check for suspend item
if item is self._suspend_item:
# check for pause item
if item is self._pause_item:
if caller != self.get_shortname():
self.logger.debug(f'Suspend item changed to {item()}')
self.set_suspend(item(), by=f'suspend item {item.property.path}')
self.logger.debug(f'pause item changed to {item()}')
if item() and self.alive:
self.stop()
elif not item() and not self.alive:
self.run()
return

if self.suspended:
if self.suspend_log_update is None or self.suspend_log_update is False: # debug - Nachricht nur 1x ausgeben
self.logger.info('Plugin is suspended, data will not be written')
self.suspend_log_update = True
return
else:
self.suspend_log_update = False

if caller == self.get_fullname():
# self.logger.debug(f'item was changed by the plugin itself - caller:{caller} source:{source} dest:{dest}')
return
Expand All @@ -389,7 +340,7 @@ def update_item(self, item, caller=None, source=None, dest=None):
# else:
# self.logger.debug(f'update_item:{item} default modBusObjectTyp: {objectType}')

reg = str(objectType) # Dict-key: HoldingRegister.528.1 *** objectType.regAddr.slaveUnit ***
reg = str(objectType) # Dict-key: HoldingRegister.528.1 *** objectType.regAddr.slaveUnit ***
reg += '.'
reg += str(regAddr)
reg += '.'
Expand Down Expand Up @@ -417,8 +368,6 @@ def update_item(self, item, caller=None, source=None, dest=None):
self.connected = False
return

startTime = datetime.now()
regCount = 0
try:
self.__write_Registers(regPara, item())
except Exception as e:
Expand All @@ -431,19 +380,13 @@ def __write_Registers(self, regPara, value):
bo = regPara['byteOrder']
wo = regPara['wordOrder']
dataTypeStr = regPara['dataType']
dataType = ''.join(filter(str.isalpha, dataTypeStr)) # vom dataType die Ziffen entfernen z.B. uint16 = uint
registerCount = 0 # Anzahl der zu schreibenden Register (Words)
dataType = ''.join(filter(str.isalpha, dataTypeStr)) # vom dataType die Ziffen entfernen z.B. uint16 = uint

try:
bits = int(''.join(filter(str.isdigit, dataTypeStr))) # bit-Zahl aus aus dataType z.B. uint16 = 16
bits = int(''.join(filter(str.isdigit, dataTypeStr))) # bit-Zahl aus aus dataType z.B. uint16 = 16
except:
bits = 16

if dataType.lower() == 'string':
registerCount = int(bits / 2) # bei string: bits = bytes !! string16 -> 16Byte - 8 registerCount
else:
registerCount = int(bits / 16)

if regPara['factor'] != 1:
# self.logger.debug(f"value {value} divided by: {regPara['factor']}")
value = value * (1 / regPara['factor'])
Expand Down Expand Up @@ -480,11 +423,11 @@ def __write_Registers(self, regPara, value):
builder.add_string(value)
elif dataType.lower() == 'bit':
if objectType == 'Coil' or objectType == 'DiscreteInput':
if not isinstance(value, bool): # test is boolean
if not isinstance(value, bool): # test is boolean
self.logger.error(f"Value is not boolean: {value}")
return
else:
if set(value).issubset({'0', '1'}) and bool(value): # test is bit-string '00110101'
if set(value).issubset({'0', '1'}) and bool(value): # test is bit-string '00110101'
builder.add_bits(value)
else:
self.logger.error(f"Value is not a bitstring: {value}")
Expand Down Expand Up @@ -541,13 +484,13 @@ def __read_Registers(self, regPara):
bits = 16

if dataType.lower() == 'string':
registerCount = int(bits / 2) # bei string: bits = bytes !! string16 -> 16Byte - 8 registerCount
registerCount = int(bits / 2) # bei string: bits = bytes !! string16 -> 16Byte - 8 registerCount
else:
registerCount = int(bits / 16)

if self.connected == False:
if not self.connected:
self.logger.error(f"not connected to {self._host}:{self._port}")
return None
return

# self.logger.debug(f"read {objectType}.{address}.{slaveUnit} (address.slaveUnit) regCount:{registerCount}")
if objectType == 'Coil':
Expand All @@ -560,11 +503,11 @@ def __read_Registers(self, regPara):
result = self._Mclient.read_holding_registers(address, registerCount, slave=slaveUnit)
else:
self.logger.error(f"{AttrObjectType} not supported: {objectType}")
return None
return

if result.isError():
self.logger.error(f"read error: {result} {objectType}.{address}.{slaveUnit} (address.slaveUnit) regCount:{registerCount}")
return None
return

if objectType == 'Coil':
value = result.bits[0]
Expand Down Expand Up @@ -615,4 +558,3 @@ def __read_Registers(self, regPara):
return decoder.decode_bits()
else:
self.logger.error(f"Number of bits or datatype not supported : {dataTypeStr}")
return None
Loading
Loading