Skip to content

Commit

Permalink
Merge pull request #942 from sisamiwe/dbaddon_dev1
Browse files Browse the repository at this point in the history
DB_ADDON: Bugfix
  • Loading branch information
Morg42 authored Jul 6, 2024
2 parents fe63d0f + 5d28b1a commit 75f3a0f
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 257 deletions.
132 changes: 41 additions & 91 deletions db_addon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
import time
import re
import queue
import threading
import logging
import pickle
import operator
from dateutil.relativedelta import relativedelta
Expand Down Expand Up @@ -62,7 +60,7 @@ class DatabaseAddOn(SmartPlugin):
Main class of the Plugin. Does all plugin specific stuff and provides the update functions for the items
"""

PLUGIN_VERSION = '1.2.8'
PLUGIN_VERSION = '1.2.9'

def __init__(self, sh):
"""
Expand All @@ -80,17 +78,15 @@ def __init__(self, sh):
self.plugins = Plugins.get_instance()

# define cache dicts
self.pickle_data_validity_time = 600 # seconds after which the data saved in pickle are not valid anymore
self.current_values = {} # Dict to hold min and max value of current day / week / month / year for items
self.previous_values = {} # Dict to hold value of end of last day / week / month / year for items
self.item_cache = {} # Dict to hold item_id, oldest_log_ts and oldest_entry for items
self.value_list_raw_data = {}
self.pickle_data_validity_time = 600 # seconds after which the data saved in pickle are not valid anymore
self.current_values = {} # Dict to hold min and max value of current day / week / month / year for items
self.previous_values = {} # Dict to hold value of end of last day / week / month / year for items
self.item_cache = {} # Dict to hold item_id, oldest_log_ts and oldest_entry for items
self.value_list_raw_data = {} # List to hold raw data

# define variables for database, database connection, working queue and status
self.item_queue = queue.Queue() # Queue containing all to be executed items
self.update_item_delay_deque = deque() # Deque for delay working of updated item values
# ToDo: Check if still needed
self.queue_consumer_thread = None # Queue consumer thread
self._db_plugin = None # object if database plugin
self._db = None # object of database
self.connection_data = None # connection data list of database
Expand All @@ -100,7 +96,8 @@ def __init__(self, sh):
self.last_connect_time = 0 # mechanism for limiting db connection requests
self.alive = None # Is plugin alive?
self.active_queue_item: str = '-' # String holding item path of currently executed item
self.onchange_delay_time = 30
self.onchange_delay_time = 30 # delay time in seconds between change of database item start of reevaluation of db_addon item
self.database_item_list = [] # list of needed database items

# define default mysql settings
self.default_connect_timeout = 60
Expand Down Expand Up @@ -168,25 +165,15 @@ def run(self):
# update database_items in item config, where path was given
self._update_database_items()

# create list if all relevant database items
self._create_list_of_relevant_database_items()

# set plugin to alive
self.alive = True

# work item queue
self.work_item_queue()

# ToDo: Check if still needed
"""
try:
self._queue_consumer_thread_startup()
except Exception as e:
self.logger.warning(f"During working item queue Exception '{e}' occurred.")
self.logger.debug(e, exc_info=True)
# self.logger.error("Thread for working item queue died. De-init plugin.")
# self.deinit()
self.logger.error("Suspend Plugin and clear Item-Queue.")
self.suspend(True)
"""

def stop(self):
"""
Stop method for the plugin
Expand All @@ -200,9 +187,6 @@ def stop(self):
self._db.close()
self.save_cache_data()

# ToDo: Check if still needed
# self._queue_consumer_thread_shutdown()

def parse_item(self, item: Item):
"""
Default plugin parse_item method. Is called when the plugin is initialized.
Expand Down Expand Up @@ -525,23 +509,6 @@ def get_database_item() -> Item:

return None, None

def has_db_addon_item() -> bool:
"""Returns item from shNG config which is item with db_addon attribut valid for database item"""

for child in item.return_children():
if check_db_addon_fct(child):
return True

for child_child in child.return_children():
if check_db_addon_fct(child_child):
return True

for child_child_child in child_child.return_children():
if check_db_addon_fct(child_child_child):
return True

return False

def check_db_addon_fct(check_item) -> bool:
"""
Check if item has db_addon_fct and is onchange
Expand Down Expand Up @@ -625,7 +592,6 @@ def format_db_addon_ignore_value_list(optimize: bool = self.optimize_value_filte

# read item_attribute_dict aus item_attributes_master
item_attribute_dict = ITEM_ATTRIBUTES['db_addon_fct'].get(db_addon_fct)
self.logger.debug(f"{db_addon_fct}: {item_attribute_dict=}")

# get query parameters from db_addon_fct or db_addon_params
if item_attribute_dict['params']:
Expand Down Expand Up @@ -682,11 +648,9 @@ def format_db_addon_ignore_value_list(optimize: bool = self.optimize_value_filte
if self.debug_log.parse:
self.logger.debug(f"Item={item.property.path} added with db_addon_fct={db_addon_fct} and database_item={database_item}")

# add type (onchange or ondemand) to item dict
item_config_data_dict.update({'on': item_attribute_dict['on']})

# add cycle for item groups
cycle = item_attribute_dict['calc']
cycle = item_attribute_dict['cycle']
on = 'demand'
if cycle == 'group':
cycle = item_config_data_dict['query_params'].get('group')
if not cycle:
Expand All @@ -695,13 +659,19 @@ def format_db_addon_ignore_value_list(optimize: bool = self.optimize_value_filte
elif cycle == 'timeframe':
cycle = item_config_data_dict['query_params'].get('timeframe')
cycle = f"{timeframe_to_updatecyle(cycle)}"
elif cycle == 'None':
cycle = None
item_config_data_dict.update({'cycle': cycle})
elif not cycle:
on = 'change'
item_config_data_dict.update({'cycle': cycle, 'on': on})

# do logging
if self.debug_log.parse:
self.logger.debug(f"Item '{item.property.path}' added to be run {item_config_data_dict['cycle']}.")
if cycle:
self.logger.debug(f"Item '{item.property.path}' added to be run {item_config_data_dict['cycle']}.")
else:
self.logger.debug(f"Item '{item.property.path}' added but will not be run cyclic.")

if on == 'change':
self.logger.debug(f"Item '{item.property.path}' added and will be run on-change of {database_item}.")

# create item config for item to be run on startup
if db_addon_startup or item_attribute_dict['cat'] == 'gen':
Expand All @@ -710,6 +680,8 @@ def format_db_addon_ignore_value_list(optimize: bool = self.optimize_value_filte
item_config_data_dict.update({'startup': False})

# add item to plugin item dict
if self.debug_log.parse:
self.logger.debug(f"Item '{item.property.path}' completely parsed: {item_config_data_dict=}.")
self.add_item(item, config_data_dict=item_config_data_dict)

# handle all items with db_addon_info
Expand All @@ -725,11 +697,8 @@ def format_db_addon_ignore_value_list(optimize: bool = self.optimize_value_filte
self.add_item(item, config_data_dict={'db_addon': 'admin', 'db_addon_fct': f"admin_{self.get_iattr_value(item.conf, 'db_addon_admin').lower()}", 'database_item': None})
return self.update_item

# Reference to 'update_item' für alle Items mit Attribut 'database', um die on_change Items zu berechnen
elif self.has_iattr(item.conf, self.item_attribute_search_str) and has_db_addon_item():
if self.debug_log.parse:
self.logger.debug(f"reference to update_item for item={item.property.path} will be set due to onchange")
self.add_item(item, config_data_dict={'db_addon': 'database'})
# Reference to 'update_item' for all database items to trigger calculation of on-change items
elif self.has_iattr(item.conf, self.item_attribute_search_str):
return self.update_item

def update_item(self, item, caller=None, source=None, dest=None):
Expand All @@ -747,8 +716,6 @@ def update_item(self, item, caller=None, source=None, dest=None):
if self.alive and caller != self.get_shortname():
# handle database items
if item in self._database_items():
# if not self.startup_finished:
# self.logger.info(f"Handling of 'onchange' is paused for startup. No updated will be processed.")
self.logger.debug(f" Updated Item {item.property.path} with value {item()} will be put to queue in approx. {self.onchange_delay_time}s resp. after startup.")
self.update_item_delay_deque.append([item, item(), int(time.time() + self.onchange_delay_time)])

Expand Down Expand Up @@ -1274,7 +1241,7 @@ def _update_database_items(self) -> None:
item_config.update({'startup': True})

def _suspend_item_calculation(self, item: Union[str, Item], suspended: bool = False) -> Union[bool, None]:
"""suspend calculation od decicated item"""
"""suspend calculation od dedicated item"""
if isinstance(item, str):
item = self.items.return_item(item)

Expand All @@ -1285,6 +1252,16 @@ def _suspend_item_calculation(self, item: Union[str, Item], suspended: bool = Fa
item_config['suspended'] = suspended
return suspended

def _create_list_of_relevant_database_items(self):
"""creates list of all relevant database items for further reference"""
_database_items = set()
for item in self.get_item_list('database_item'):
item_config = self.get_item_config(item)
database_item = item_config.get('database_item')
if database_item is not None:
_database_items.add(database_item)
self.database_item_list = list(_database_items)

@property
def log_level(self) -> int:
return self.logger.getEffectiveLevel()
Expand Down Expand Up @@ -1359,7 +1336,7 @@ def _info_items(self) -> list:
return self.get_item_list('db_addon', 'info')

def _database_items(self) -> list:
return self.get_item_list('db_addon', 'database')
return self.database_item_list

def _database_item_path_items(self) -> list:
return self.get_item_list('database_item_path', True)
Expand Down Expand Up @@ -2429,33 +2406,6 @@ def _clear_queue(self) -> None:
self.logger.info(f"Working queue will be cleared. Calculation run will end.")
self.item_queue.queue.clear()

# ToDo: Check if still needed
def _queue_consumer_thread_startup(self):
"""Start a thread to work item queue"""

self.logger = logging.getLogger(__name__)
_name = 'plugins.' + self.get_fullname() + '.work_item_queue'

try:
self.queue_consumer_thread = threading.Thread(target=self.work_item_queue, name=_name, daemon=False)
self.queue_consumer_thread.start()
self.logger.debug("Thread for 'queue_consumer_thread' has been started")
except threading.ThreadError:
self.logger.error("Unable to launch thread for 'queue_consumer_thread'.")
self.queue_consumer_thread = None

# ToDo: Check if still needed
def _queue_consumer_thread_shutdown(self):
"""Shut down the thread to work item queue"""

if self.queue_consumer_thread:
self.queue_consumer_thread.join()
if self.queue_consumer_thread.is_alive():
self.logger.error("Unable to shut down 'queue_consumer_thread' thread")
else:
self.logger.info("Thread 'queue_consumer_thread' has been shut down.")
self.queue_consumer_thread = None

def _get_start_end_as_timestamp(self, timeframe: str, start: Union[int, str, None], end: Union[int, str, None]) -> tuple:
"""
Provides start and end as timestamp in microseconds from timeframe with start and end
Expand Down Expand Up @@ -2605,7 +2555,7 @@ def _query_log_timestamp(self, func: str, item_id: int, ts_start: int, ts_end: i
'last': 'LIMIT 1 ',
}

_where = "item_id = :item_id AND time < :ts_end " if func == "next" else "item_id = :item_id AND time BETWEEN :ts_start AND :ts_end "
_where = "item_id = :item_id AND time <= :ts_start " if func == "next" else "item_id = :item_id AND time BETWEEN :ts_start AND :ts_end "

_db_table = 'log '

Expand Down Expand Up @@ -2657,7 +2607,7 @@ def _query_log_timestamp(self, func: str, item_id: int, ts_start: int, ts_end: i
# set params
params = {'item_id': item_id, 'ts_start': ts_start, 'ts_end': ts_end}
if func == "next":
params.pop('ts_start', None)
params.pop('ts_end', None)

# assemble query
query = f"SELECT {_select[func]}FROM {_db_table}WHERE {_where}{_group_by.get(group, '')}{_order.get(func, '')}{_limit.get(func, '')}{_table_alias.get(func, '')}{_group_by.get(group2, '')}".strip()
Expand Down
Loading

0 comments on commit 75f3a0f

Please sign in to comment.