From d233bdc99ef00f694a375e6709766cb485aa7ac1 Mon Sep 17 00:00:00 2001 From: "hongjin.zhou" Date: Mon, 22 May 2023 11:23:06 +0200 Subject: [PATCH] Extend DLTBroker to handle messages from dlt file - Add main loop to dispatch newly arrived dlt message from dlt log at runtime - Add DLTFileSpinner to handle the main loop of dispatching messages - Because of high commonality between DLTFileSpinner & DLTMessageHandler, a common base class is derived - Extend DLTBroker to use DLTFileSpinner for dispatching messages from dlt log, instead of using DLTMessageHandler for dlt daemon - Add unit tests for the newly added DLTFileSpinner, main loop to iterate dlt file, and new DLTBroker usage with DLTFileSpinner --- dlt/dlt.py | 38 ++- dlt/dlt_broker.py | 93 ++++-- dlt/dlt_broker_handlers.py | 196 +++++++++--- tests/dlt_broker_from_file_spinner_test.py | 197 ++++++++++++ tests/dlt_broker_time_test.py | 2 +- tests/dlt_file_spinner_unit_test.py | 300 ++++++++++++++++++ ...main_loop_by_reading_dlt_file_unit_test.py | 148 +++++++++ ...lt_main_loop_with_dlt_client_unit_test.py} | 2 +- tests/utils.py | 13 + 9 files changed, 920 insertions(+), 69 deletions(-) create mode 100644 tests/dlt_broker_from_file_spinner_test.py create mode 100644 tests/dlt_file_spinner_unit_test.py create mode 100644 tests/dlt_main_loop_by_reading_dlt_file_unit_test.py rename tests/{dlt_main_loop_unit_test.py => dlt_main_loop_with_dlt_client_unit_test.py} (98%) diff --git a/dlt/dlt.py b/dlt/dlt.py index 2bc0a9d..a15a070 100644 --- a/dlt/dlt.py +++ b/dlt/dlt.py @@ -9,6 +9,7 @@ import struct import time import threading +import multiprocessing from dlt.core import ( cDLTFilter, @@ -613,7 +614,10 @@ def __init__(self, **kwords): self.indexed = False self.end = False self.live_run = kwords.pop("is_live", False) + # Stop event for threading usage in caller self.stop_reading = threading.Event() + # Stop event for process usage in caller + self.stop_reading_proc = multiprocessing.Event() def __repr__(self): # pylint: 
disable=bad-continuation @@ -764,7 +768,7 @@ def __getitem__(self, index): def _open_file(self): """Open the configured file for processing""" file_opened = False - while not self.stop_reading.is_set(): + while not self.stop_reading.is_set() and not self.stop_reading_proc.is_set(): if dltlib.dlt_file_open(ctypes.byref(self), self.filename, self.verbose) >= DLT_RETURN_OK: file_opened = True break @@ -801,7 +805,9 @@ def __iter__(self): # pylint: disable=too-many-branches self._open_file() found_data = False - while not self.stop_reading.is_set() or corruption_check_try: # pylint: disable=too-many-nested-blocks + while ( + not self.stop_reading.is_set() and not self.stop_reading_proc.is_set() + ) or corruption_check_try: # pylint: disable=too-many-nested-blocks os_stat = os.stat(self.filename) mtime = os_stat.st_mtime @@ -1039,6 +1045,34 @@ def client_loop(self): dltlib.dlt_client_main_loop(ctypes.byref(self), None, self.verbose) +def py_dlt_file_main_loop(dlt_reader, limit=None, callback=None): + """Main loop to read dlt messages from dlt file.""" + try: + for msg in dlt_reader: + logger.debug( + "Message from position %d and counter %d: %s", dlt_reader.file_position, dlt_reader.counter, msg + ) + + # send the message to the callback and check whether we + # need to continue + if callback and not callback(msg): + logger.debug("callback returned 'False'. Stopping main loop") + return False + + if limit: + limit -= 1 + if limit == 0: + break + except IOError as err: + # If the dlt file is empty, main_loop should not break, so it returns True + if str(err) == DLT_EMPTY_FILE_ERROR: + logger.debug("Dlt file is empty now. 
Wait until content is written") + return True + raise err + + return True + + # pylint: disable=too-many-arguments,too-many-return-statements,too-many-branches def py_dlt_client_main_loop(client, limit=None, verbose=0, dumpfile=None, callback=None): """Reimplementation of dlt_client.c:dlt_client_main_loop() in order to handle callback diff --git a/dlt/dlt_broker.py b/dlt/dlt_broker.py index 13b2300..c7b7669 100644 --- a/dlt/dlt_broker.py +++ b/dlt/dlt_broker.py @@ -11,7 +11,9 @@ DLT_DAEMON_TCP_PORT, DLTContextHandler, DLTFilterAckMessageHandler, + DLTMessageDispatcherBase, DLTMessageHandler, + DLTFileSpinner, DLTTimeValue, ) @@ -39,7 +41,7 @@ class DLTBroker(object): def __init__( self, - ip_address, + ip_address=None, port=DLT_DAEMON_TCP_PORT, use_proxy=False, enable_dlt_time=False, @@ -50,8 +52,10 @@ def __init__( ): """Initialize the DLT Broker - :param str ip_address: IP address of the DLT Daemon. Defaults to TCP connection, unless a multicast address is - used. In that case an UDP multicast connection will be used + :param str | None ip_address: IP address of the DLT Daemon. + If None, then dlt does not come with any ip listening, in other words, it comes from dlt log directly; + Else, dlt comes from listening to some ip address. Defaults to TCP connection, + unless a multicast address is used. In that case an UDP multicast connection will be used :param str post: Port of the DLT Daemon :param bool use_proxy: Ignored - compatibility option :param bool enable_dlt_time: Record the latest dlt message timestamp if enabled. 
@@ -82,33 +86,59 @@ def __init__( self.filter_ack_queue = None self.filter_ack_msg_handler = None - kwargs["ip_address"] = ip_address - kwargs["port"] = port - kwargs["timeout"] = kwargs.get("timeout", DLT_CLIENT_TIMEOUT) - self.msg_handler = DLTMessageHandler( + self.msg_handler = self.create_dlt_message_dispather(ip_address, port, kwargs) + + self.context_handler = DLTContextHandler(self.filter_queue, self.message_queue) + + self._ip_address = ip_address + self._port = port + self._filename = kwargs.get("filename") + + def create_dlt_message_dispather(self, ip_address, port, client_cfg) -> DLTMessageDispatcherBase: + if ip_address: + # If ip_address is given, then messages are retrieved from dlt client at run-time + return self._create_dlt_message_handler(ip_address, port, client_cfg) + else: + # If not ip_address is given, then messages are retrieved from the given filename + # The logs are written to the given filename from another process + return self._create_dlt_file_spinner(client_cfg.get("filename")) + + def _create_dlt_message_handler(self, ip_address, port, client_cfg): + client_cfg["ip_address"] = ip_address + client_cfg["port"] = port + client_cfg["timeout"] = client_cfg.get("timeout", DLT_CLIENT_TIMEOUT) + return DLTMessageHandler( self.filter_queue, self.message_queue, self.mp_stop_flag, - kwargs, + client_cfg, dlt_time_value=self._dlt_time_value, filter_ack_queue=self.filter_ack_queue, ) - self.context_handler = DLTContextHandler(self.filter_queue, self.message_queue) - self._ip_address = ip_address - self._port = port - self._filename = kwargs.get("filename") + def _create_dlt_file_spinner(self, file_name): + return DLTFileSpinner( + self.filter_queue, + self.message_queue, + self.mp_stop_flag, + file_name, + dlt_time_value=self._dlt_time_value, + filter_ack_queue=self.filter_ack_queue, + ) def start(self): """DLTBroker main worker method""" - logger.debug( - "Starting DLTBroker with parameters: use_proxy=%s, ip_address=%s, port=%s, filename=%s, 
multicast=%s", - False, - self._ip_address, - self._port, - self._filename, - ip.ip_address(self._ip_address).is_multicast, - ) + if isinstance(self.msg_handler, DLTMessageHandler): + logger.debug( + "Starting DLTBroker with parameters: use_proxy=%s, ip_address=%s, port=%s, filename=%s, multicast=%s", + False, + self._ip_address, + self._port, + self._filename, + ip.ip_address(self._ip_address).is_multicast, + ) + else: + logger.debug("Starting DLTBroker by reading %s", self._filename) if self._dlt_time_value: logger.debug("Enable dlt time for DLTBroker.") @@ -166,12 +196,23 @@ def add_context(self, context_queue, filters=None): ) if not self._recv_filter_set_ack(context_filter_ack_queue, True): + failure_reason = "" + if isinstance(self.msg_handler, DLTMessageHandler): + failure_reason = ( + "It's possible that DLTClient client does not start." + " If it's a test case, it might be an error" + ) + elif isinstance(self.msg_handler, DLTFileSpinner): + failure_reason = ( + f"It's possible that dlt file {self._filename} is empty now. No big issue, " + f"filters would be added once after new message is available in dlt file" + ) logger.warning( ( - "Could not receive filter-setting messge ack. It's possible that DLTClient client does " - "not start. If it's a test case. It might be an error. For now, Run it anyway. " + "Could not receive filter-setting message ack. %s. For now, Run it anyway. 
" "filters: %s, queue_id: %s" ), + failure_reason, filters, id(context_queue), ) @@ -187,9 +228,11 @@ def remove_context(self, context_queue): def stop(self): """Stop the broker""" - logger.info("Stopping DLTContextHandler and DLTMessageHandler") + logger.info("Stopping DLTContextHandler and %s", type(self.msg_handler).__name__) + + self.msg_handler.break_blocking_main_loop() - logger.debug("Stop DLTMessageHandler") + logger.debug("Stop %s", type(self.msg_handler).__name__) self.mp_stop_flag.set() logger.debug("Stop DLTContextHandler") @@ -205,7 +248,7 @@ def stop(self): logger.debug("Waiting on DLTFilterAckMessageHandler ending") self.filter_ack_msg_handler.join() - logger.debug("Waiting on DLTMessageHandler ending") + logger.debug("Waiting on %s ending", type(self.msg_handler).__name__) if self.msg_handler.is_alive(): try: self.msg_handler.terminate() diff --git a/dlt/dlt_broker_handlers.py b/dlt/dlt_broker_handlers.py index 43e6a6e..4e0b59c 100644 --- a/dlt/dlt_broker_handlers.py +++ b/dlt/dlt_broker_handlers.py @@ -2,6 +2,7 @@ """Handlers are classes that assist dlt_broker in receiving and filtering DLT messages """ +from abc import ABC, abstractmethod from collections import defaultdict import ctypes import logging @@ -11,7 +12,14 @@ import time from threading import Thread, Event -from dlt.dlt import DLTClient, DLT_DAEMON_TCP_PORT, py_dlt_client_main_loop +from dlt.dlt import ( + DLTClient, + DLT_DAEMON_TCP_PORT, + cDLT_FILE_NOT_OPEN_ERROR, + load, + py_dlt_client_main_loop, + py_dlt_file_main_loop, +) DLT_CLIENT_TIMEOUT = 5 @@ -199,55 +207,31 @@ def stop(self): self.join() -class DLTMessageHandler(Process): - """Process receiving the DLT messages and handing them to DLTContextHandler +class DLTMessageDispatcherBase(ABC, Process): + """Base class for different dlt message dispatchers - This process instance is responsible for collecting messages from - the DLT daemon, tagging them with the correct queue id and placing - them on the messages queue. 
+ The derived class could dispatch dlt messages from dlt-daemon, or from at-runtime written file. """ - def __init__( - self, filter_queue, message_queue, mp_stop_event, client_cfg, dlt_time_value=None, filter_ack_queue=None - ): + def __init__(self, filter_queue, message_queue, mp_stop_event, dlt_time_value=None, filter_ack_queue=None): + """ + Common members needed for common dispatching behaviours + + :param Queue filter_queue: contexts for filtering received dlt message + :param Queue message_queue: received dlt messages after filtering against context + :param multiprocessing.Event mp_stop_event: stop signal for this process + :param bool enable_dlt_time: Record the latest dlt message timestamp if enabled. + :param bool filter_ack_queue: acks for accepting contexts + """ + super().__init__() self.filter_queue = filter_queue self.filter_ack_queue = filter_ack_queue self.message_queue = message_queue self.mp_stop_flag = mp_stop_event - super(DLTMessageHandler, self).__init__() - # - dict mapping filters to queue ids self.context_map = defaultdict(list) - - self._ip_address = client_cfg["ip_address"] - self._port = client_cfg.get("port", DLT_DAEMON_TCP_PORT) - self._filename = client_cfg.get("filename") - self.verbose = client_cfg.get("verbose", 0) - self.timeout = client_cfg.get("timeout", DLT_CLIENT_TIMEOUT) - self._client = None - self.tracefile = None - self._dlt_time_value = dlt_time_value - def _client_connect(self): - """Create a new DLTClient - - :param int timeout: Time in seconds to wait for connection. 
- :returns: True if connected, False otherwise - :rtype: bool - """ - logger.debug( - "Creating DLTClient (ip_address='%s', Port='%s', logfile='%s')", - self._ip_address, - self._port, - self._filename, - ) - self._client = DLTClient(servIP=self._ip_address, port=self._port, verbose=self.verbose) - connected = self._client.connect(self.timeout) - if connected: - logger.info("DLTClient connected to %s", self._client.servIP) - return connected - def _process_filter_queue(self): """Check if filters have been added or need to be removed""" while not self.filter_queue.empty(): @@ -277,6 +261,11 @@ def _process_filter_queue(self): logger.debug("Send filter ack message: queue_ack_id: %s, add: %s", queue_ack_id, add) self.filter_ack_queue.put((queue_ack_id, add)) + @abstractmethod + def is_valid_message(self, message): + """Validate if the received message is a valid message according to AUTOSAR doc""" + return True + def handle(self, message): """Function to be called for every message received @@ -286,7 +275,7 @@ def handle(self, message): """ self._process_filter_queue() - if message and (message.apid != "" or message.ctid != ""): + if self.is_valid_message(message): # Dispatch the message msg_ctx = ((message.apid, message.ctid), (None, None), (message.apid, None), (None, message.ctid)) qids = ( @@ -306,6 +295,133 @@ def handle(self, message): return not self.mp_stop_flag.is_set() + @abstractmethod + def run(self) -> None: + pass + + def break_blocking_main_loop(self): + """All message dispatchers need a main loop to fetch dlt messages from source. + If it could constantly dispatch messages, then the main loop will not get into blocking state. + Only when no more messages can be dispatched does the main loop get into blocking state. 
+ + Not all message dispatchers need to implement this method""" + pass + + +class DLTFileSpinner(DLTMessageDispatcherBase): + """Process receiving the DLT messages and handing them to DLTContextHandler + + This process instance is responsible for collecting messages from + the at-runtime written dlt log, tagging them with the correct queue id and placing + them on the messages queue. + """ + + def __init__( + self, filter_queue, message_queue, mp_stop_event, file_name, dlt_time_value=None, filter_ack_queue=None + ): + super().__init__(filter_queue, message_queue, mp_stop_event, dlt_time_value, filter_ack_queue) + self.file_name = file_name + self.dlt_reader = load(filename=self.file_name, live_run=True) + + def is_valid_message(self, message): + """According to AUTOSAR doc, message with empty apid and empty ctid is still valid""" + return message is not None + + def run(self): + """DLTFileSpinner worker method""" + logger.info("Start to process dlt file %s", self.file_name) + # Even though dlt connector for ioc should only be instantiated after successful SerialConsole with fibex, + # the corner case of not-existing dlt file will still be handled here with max 5 retries + retries_for_non_existing_file = 5 + + while not self.mp_stop_flag.is_set(): + try: + logger.debug("py_dlt_file_main_loop") + res = py_dlt_file_main_loop(self.dlt_reader, callback=self.handle) + if res is False and not self.mp_stop_flag.is_set(): # main loop returned False + logger.error("Too many bad messages read from %s", self.file_name) + self.mp_stop_flag.set() + break + except KeyboardInterrupt: + logger.debug("main loop manually interrupted") + break + except IOError as err: + if str(err) == cDLT_FILE_NOT_OPEN_ERROR: + # Not every time of non-existing file, cDLTFile will report error + # Sometimes, it simply works through without issue. 
+ # So, no unittest could be done for this error handling + if retries_for_non_existing_file == 0: + logger.error("After retries, dlt file %s still does not exist", self.file_name) + raise err + logger.warning( + "DLT file %s does not exist, will try %d times again", + self.file_name, + retries_for_non_existing_file, + ) + retries_for_non_existing_file = retries_for_non_existing_file - 1 + time.sleep(1) + else: + raise err + except Exception: # pylint: disable=broad-except + logger.exception("Exception during the DLT message receive") + + logger.debug("DLTFileSpinner starts to quit...") + if not self.dlt_reader.stop_reading_proc.is_set(): + self.dlt_reader.stop_reading_proc.set() + self.message_queue.close() + logger.info("DLTFileSpinner worker execution complete") + + def break_blocking_main_loop(self): + """A big user for DLTFileSpinner is IOC dlt, which does not have so many dlt messages as HU, + so it is quite easy for the main loop to get into blocking state, + at the moment that no more dlt messages could be dispatched. + """ + logger.debug("Stop iterating to file %s", self.file_name) + self.dlt_reader.stop_reading_proc.set() + + +class DLTMessageHandler(DLTMessageDispatcherBase): + """Process receiving the DLT messages and handing them to DLTContextHandler + + This process instance is responsible for collecting messages from + the DLT daemon, tagging them with the correct queue id and placing + them on the messages queue. 
+ """ + + def __init__( + self, filter_queue, message_queue, mp_stop_event, client_cfg, dlt_time_value=None, filter_ack_queue=None + ): + super().__init__(filter_queue, message_queue, mp_stop_event, dlt_time_value, filter_ack_queue) + self._ip_address = client_cfg["ip_address"] + self._port = client_cfg.get("port", DLT_DAEMON_TCP_PORT) + self._filename = client_cfg.get("filename") + self.verbose = client_cfg.get("verbose", 0) + self.timeout = client_cfg.get("timeout", DLT_CLIENT_TIMEOUT) + self._client = None + self.tracefile = None + + def is_valid_message(self, message): + return message and (message.apid != "" or message.ctid != "") + + def _client_connect(self): + """Create a new DLTClient + + :param int timeout: Time in seconds to wait for connection. + :returns: True if connected, False otherwise + :rtype: bool + """ + logger.debug( + "Creating DLTClient (ip_address='%s', Port='%s', logfile='%s')", + self._ip_address, + self._port, + self._filename, + ) + self._client = DLTClient(servIP=self._ip_address, port=self._port, verbose=self.verbose) + connected = self._client.connect(self.timeout) + if connected: + logger.info("DLTClient connected to %s", self._client.servIP) + return connected + def run(self): """DLTMessageHandler worker method""" if self._filename is not None: diff --git a/tests/dlt_broker_from_file_spinner_test.py b/tests/dlt_broker_from_file_spinner_test.py new file mode 100644 index 0000000..7daa20c --- /dev/null +++ b/tests/dlt_broker_from_file_spinner_test.py @@ -0,0 +1,197 @@ +# Copyright (C) 2023. BMW Car IT GmbH. All rights reserved. 
+"""Test DLTBroker with message handler DLTFileSpinner""" +import os +import pytest +import tempfile +import time +import unittest +from unittest.mock import ANY, patch +from queue import Queue, Empty + +from dlt.dlt_broker import DLTBroker, logger +from tests.utils import ( + stream_multiple, + stream_with_params, + append_stream_to_file, + create_messages, + append_message_to_file, +) + + +class TestDLTBrokerFromDLTFileSpinnerWithNotExistingDLT(unittest.TestCase): + def setUp(self) -> None: + self.broker = None + _, self.dlt_file_name = tempfile.mkstemp(suffix=b".dlt") + + def tearDown(self) -> None: + if self.broker: + self.broker.stop() + if os.path.exists(self.dlt_file_name): + os.remove(self.dlt_file_name) + + def test_broker_with_not_existing_dlt_file(self): + """ + Test DLTBroker could work with not existing dlt file + + 1. prepare a file name which does not exist + 2. start dlt broker to dispatch messages from this not-existing file --> no error + 3. dlt broker could not add context successfully, but encounter a warning message + 4. no message could be dispatched from not existing file and throws out Queue.Empty exception + 5. 
dlt_time is 0.0, because it could not be reset according to the latest timestamp of messages + """ + # Remove the dlt file + os.remove(self.dlt_file_name) + # Start broker with non-existing dlt file + self.broker = DLTBroker( + filename=self.dlt_file_name, + enable_dlt_time=True, + enable_filter_set_ack=True, + ignore_filter_set_ack_timeout=True, + ) + self.broker.start() + # Add context should report warning message + queue = Queue(maxsize=0) + with patch.object(logger, "warning") as logger_mock: + self.broker.add_context(queue, filters=None) + logger_mock.assert_called_with(ANY, ANY, [(None, None)], id(queue)) + # Not existing dlt file should not throw any exception out + for _ in range(5): + with pytest.raises(Empty): + queue.get_nowait() + # dlt_time is not None, even though it is not reset with latest timestamp from messages + self.assertEqual(self.broker.dlt_time(), 0.0) + + def test_broker_with_later_created_dlt_file(self): + """ + Simulate a scenario: first dlt file does not exist, then dlt file is created and written with messages. + + 1. delete the dlt file + 2. start broker + 3. create the dlt file and write 1 sample message + Expectation: 1 message could be dispatched from broker + """ + # 1. delete the dlt file + os.remove(self.dlt_file_name) + # 2. Start broker with non-existing dlt file + self.broker = DLTBroker( + filename=self.dlt_file_name, + enable_dlt_time=True, + enable_filter_set_ack=True, + ignore_filter_set_ack_timeout=True, + ) + self.broker.start() + # Add context should report warning message + queue = Queue(maxsize=0) + self.broker.add_context(queue, filters=None) + # 3. 
Write 1 sample message to the dlt file + append_stream_to_file(stream_with_params, self.dlt_file_name) + # Expectation: 1 message could be dispatched from broker + time.sleep(0.5) + self.assertIsNotNone(queue.get_nowait()) + # If we try to dispatch for another time, exception Queue.Empty is thrown, + # because there is no new log from dlt file + with pytest.raises(Empty): + queue.get_nowait() + + +class TestDLTBrokerFromDLTFileSpinner(unittest.TestCase): + def setUp(self): + # Dlt file is created with empty content + _, self.dlt_file_name = tempfile.mkstemp(suffix=b".dlt") + self.dispatched_message_queue = Queue(maxsize=0) + # Instantiate DLTBroker that ignores filter ack timeout + self.broker = DLTBroker( + filename=self.dlt_file_name, + enable_dlt_time=True, + enable_filter_set_ack=True, + ignore_filter_set_ack_timeout=True, + ) + self.broker.start() + self.broker.add_context(self.dispatched_message_queue, filters=None) + + def tearDown(self): + self.broker.stop() + os.remove(self.dlt_file_name) + + def test_001_dispatch_from_empty_dlt_file(self): + """ + From empty file, no message could be dispatched from queue and raise Queue.Empty. + dlt_time is 0.0, because it could not be reset according to the latest timestamp of messages + """ + for _ in range(5): + with pytest.raises(Empty): + self.dispatched_message_queue.get_nowait() + self.assertEqual(self.broker.dlt_time(), 0.0) + + def test_002_dispatch_from_real_dlt_file(self): + """ + Test DltBroker dispatches from a run-time written dlt file + + With a running dlt broker: + 1. Write 2 sample messages to dlt file + 2. These two messages could be dispatched with the running dlt broker + With another try to dispatch, Queue.Empty is thrown, because no more logs could be read from dlt log; + dlt_time from dlt_broker is equal to the timestamp of 2nd message + 3. Append another 1 message to the same dlt file + 4. 
Total 3 messages could be dispatched with the dlt broker + With another try to dispatch, Queue.Empty is thrown, because no more logs could be read from dlt log; + dlt_time from dlt_broker is equal to the timestamp of 3rd message + """ + # 1. Write 2 sample messages to dlt file + append_stream_to_file(stream_multiple, self.dlt_file_name) + # 2. Dispatch 2 messages from dlt broker + time.sleep(0.1) + message_1 = self.dispatched_message_queue.get_nowait() + time.sleep(0.1) + message_2 = self.dispatched_message_queue.get_nowait() + self.assertNotEqual(message_1, message_2) + # If we try to dispatch for another time, exception Queue.Empty is thrown, + # because there is no new log from dlt file + with pytest.raises(Empty): + self.dispatched_message_queue.get_nowait() + # Validate dlt time from broker + self.assertEqual(self.broker.dlt_time(), message_2.storage_timestamp) + # 3. Append another 1 message to the same dlt file + append_stream_to_file(stream_with_params, self.dlt_file_name) + # 4. Total 3 messages could be dispatched with the dlt broker + time.sleep(0.1) + message_3 = self.dispatched_message_queue.get_nowait() + self.assertNotEqual(message_1, message_3) + self.assertNotEqual(message_2, message_3) + # If try to dispatch for another time, exception Queue.Empty is thrown, + # because there is no new log from dlt file + with pytest.raises(Empty): + self.dispatched_message_queue.get_nowait() + # Validate dlt time from broker + self.assertEqual(self.broker.dlt_time(), message_3.storage_timestamp) + + def test_003_dispatch_from_real_dlt_file(self): + """ + Test DltBroker dispatches apid==b"" and ctid==b"" message from a run-time written dlt file + + With a running dlt broker: + 1. Write apid==b"" and ctid==b"" message to dlt file + 2. This message could be dispatched with the running dlt broker + a. With another try to dispatch, Queue.Empty is thrown, because no more logs could be read from dlt log; + b. 
dlt_time from dlt_broker is equal to the timestamp of this message + c. the received message should have apid==b"" and ctid==b"" + """ + # 1. Write apid==b"" and ctid==b"" message to dlt file + # Construct a message with apid==b"" and ctid==b"" + message = create_messages(stream_with_params, from_file=True)[0] + message.extendedheader.apid = b"" + message.extendedheader.ctid = b"" + # Write this message into dlt file + append_message_to_file(message, self.dlt_file_name) + # 2. Dispatch from dlt broker + time.sleep(0.5) + message = self.dispatched_message_queue.get_nowait() + # If we try to dispatch for another time, exception Queue.Empty is thrown, + # because there is no new log from dlt file + with pytest.raises(Empty): + self.dispatched_message_queue.get_nowait() + # Validate dlt time from broker + self.assertEqual(self.broker.dlt_time(), message.storage_timestamp) + # Expectation: the received message should have apid==b"" and ctid==b"" + self.assertEqual("", message.apid) + self.assertEqual("", message.ctid) diff --git a/tests/dlt_broker_time_test.py b/tests/dlt_broker_time_test.py index 892ff09..c36cb72 100644 --- a/tests/dlt_broker_time_test.py +++ b/tests/dlt_broker_time_test.py @@ -217,7 +217,7 @@ def test_add_context_with_ack_warning(): broker.context_handler.register.assert_called() ack_mock.assert_called() - logger_mock.assert_called_with(ANY, [("APID", "CTID")], id(queue)) + logger_mock.assert_called_with(ANY, ANY, [("APID", "CTID")], id(queue)) finally: broker.context_handler = ori_context_handler diff --git a/tests/dlt_file_spinner_unit_test.py b/tests/dlt_file_spinner_unit_test.py new file mode 100644 index 0000000..0016426 --- /dev/null +++ b/tests/dlt_file_spinner_unit_test.py @@ -0,0 +1,300 @@ +# Copyright (C) 2023. BMW Car IT GmbH. All rights reserved. 
+import logging +from multiprocessing import Event, Queue +import os +import time +import tempfile +import unittest +from queue import Empty + +from dlt.dlt_broker_handlers import DLTFileSpinner +from tests.utils import ( + create_messages, + stream_multiple, + stream_with_params, + append_stream_to_file, + append_message_to_file, +) + +logger = logging.getLogger(__name__) + + +class TestDLTFileSpinner(unittest.TestCase): + def setUp(self): + self.filter_queue = Queue() + self.message_queue = Queue() + self.stop_event = Event() + # Dlt file is created with empty content + _, self.dlt_file_name = tempfile.mkstemp(suffix=b".dlt") + self.dlt_file_spinner = DLTFileSpinner( + self.filter_queue, self.message_queue, self.stop_event, self.dlt_file_name + ) + # dispatched_messages from DLTFileSpinner.message_queue + self.dispatched_messages = [] + + def tearDown(self): + if self.dlt_file_spinner.is_alive(): + self.dlt_file_spinner.break_blocking_main_loop() + self.stop_event.set() + self.dlt_file_spinner.join() + if os.path.exists(self.dlt_file_name): + os.remove(self.dlt_file_name) + + def test_init(self): + self.assertFalse(self.dlt_file_spinner.mp_stop_flag.is_set()) + self.assertFalse(self.dlt_file_spinner.is_alive()) + self.assertTrue(self.dlt_file_spinner.filter_queue.empty()) + self.assertTrue(self.dlt_file_spinner.message_queue.empty()) + + def test_run_basic_without_dlt_file(self): + # Delete the created dlt file + os.remove(self.dlt_file_name) + + self.assertFalse(self.dlt_file_spinner.is_alive()) + self.dlt_file_spinner.start() + self.assertTrue(self.dlt_file_spinner.is_alive()) + self.assertNotEqual(self.dlt_file_spinner.pid, os.getpid()) + # DLT file does NOT exist + self.assertFalse(os.path.exists(self.dlt_file_spinner.file_name)) + + self.dlt_file_spinner.break_blocking_main_loop() + self.stop_event.set() + self.dlt_file_spinner.join() + self.assertFalse(self.dlt_file_spinner.is_alive()) + + def test_run_basic_with_empty_dlt_file(self): + 
self.assertFalse(self.dlt_file_spinner.is_alive()) + self.dlt_file_spinner.start() + self.assertTrue(self.dlt_file_spinner.is_alive()) + self.assertNotEqual(self.dlt_file_spinner.pid, os.getpid()) + # dlt_reader is instantiated and keeps alive + self.assertTrue(os.path.exists(self.dlt_file_spinner.file_name)) + # Expect no dlt log is dispatched + time.sleep(2) + self.assertTrue(self.dlt_file_spinner.message_queue.empty()) + # First stop dlt reader, then stop DLTFileSpinner + self.dlt_file_spinner.break_blocking_main_loop() + self.stop_event.set() + self.dlt_file_spinner.join() + self.assertFalse(self.dlt_file_spinner.is_alive()) + + def test_handle_add_new_filter(self): + self.dlt_file_spinner.filter_queue.put(("queue_id", [("SYS", "JOUR")], True)) + time.sleep(0.01) + self.dlt_file_spinner.handle(None) + self.assertIn(("SYS", "JOUR"), self.dlt_file_spinner.context_map) + self.assertEqual(self.dlt_file_spinner.context_map[("SYS", "JOUR")], ["queue_id"]) + + def test_handle_remove_filter_single_entry(self): + self.dlt_file_spinner.filter_queue.put(("queue_id", [("SYS", "JOUR")], True)) + time.sleep(0.01) + self.dlt_file_spinner.handle(None) + self.assertIn(("SYS", "JOUR"), self.dlt_file_spinner.context_map) + self.assertEqual(self.dlt_file_spinner.context_map[("SYS", "JOUR")], ["queue_id"]) + + self.dlt_file_spinner.filter_queue.put(("queue_id", [("SYS", "JOUR")], False)) + time.sleep(0.01) + self.dlt_file_spinner.handle(None) + self.assertNotIn(("SYS", "JOUR"), self.dlt_file_spinner.context_map) + + def test_handle_remove_filter_multiple_entries(self): + self.dlt_file_spinner.filter_queue.put(("queue_id1", [("SYS", "JOUR")], True)) + self.dlt_file_spinner.filter_queue.put(("queue_id2", [("SYS", "JOUR")], True)) + time.sleep(0.01) + self.dlt_file_spinner.handle(None) + self.assertIn(("SYS", "JOUR"), self.dlt_file_spinner.context_map) + self.assertEqual(self.dlt_file_spinner.context_map[("SYS", "JOUR")], ["queue_id1", "queue_id2"]) + + 
self.dlt_file_spinner.filter_queue.put(("queue_id1", [("SYS", "JOUR")], False)) + time.sleep(0.01) + self.dlt_file_spinner.handle(None) + self.assertIn(("SYS", "JOUR"), self.dlt_file_spinner.context_map) + self.assertEqual(self.dlt_file_spinner.context_map[("SYS", "JOUR")], ["queue_id2"]) + + def test_handle_multiple_similar_filters(self): + self.dlt_file_spinner.filter_queue.put(("queue_id0", [("SYS", "JOUR")], True)) + self.dlt_file_spinner.filter_queue.put(("queue_id1", [("SYS", "JOUR")], True)) + time.sleep(0.01) + self.dlt_file_spinner.handle(None) + self.assertIn(("SYS", "JOUR"), self.dlt_file_spinner.context_map) + self.assertEqual(self.dlt_file_spinner.context_map[("SYS", "JOUR")], ["queue_id0", "queue_id1"]) + + def test_handle_multiple_different_filters(self): + self.filter_queue.put(("queue_id0", [("SYS", "JOUR")], True)) + self.filter_queue.put(("queue_id1", [("DA1", "DC1")], True)) + time.sleep(0.01) + self.dlt_file_spinner.handle(None) + self.assertIn(("SYS", "JOUR"), self.dlt_file_spinner.context_map) + self.assertIn(("DA1", "DC1"), self.dlt_file_spinner.context_map) + self.assertEqual(self.dlt_file_spinner.context_map[("SYS", "JOUR")], ["queue_id0"]) + self.assertEqual(self.dlt_file_spinner.context_map[("DA1", "DC1")], ["queue_id1"]) + + def test_handle_message_tag_and_distribute(self): + self.filter_queue.put(("queue_id0", [("SYS", "JOUR")], True)) + self.filter_queue.put(("queue_id1", [("DA1", "DC1")], True)) + self.filter_queue.put(("queue_id2", [("SYS", None)], True)) + self.filter_queue.put(("queue_id3", [(None, "DC1")], True)) + self.filter_queue.put(("queue_id4", [(None, None)], True)) + time.sleep(0.01) + + # - simulate receiving of messages + for _ in range(10): + for message in create_messages(stream_multiple, from_file=True): + self.dlt_file_spinner.handle(message) + + self.assertIn(("SYS", "JOUR"), self.dlt_file_spinner.context_map) + self.assertIn(("DA1", "DC1"), self.dlt_file_spinner.context_map) + self.assertIn((None, None), 
self.dlt_file_spinner.context_map) + self.assertIn(("SYS", None), self.dlt_file_spinner.context_map) + self.assertIn((None, "DC1"), self.dlt_file_spinner.context_map) + try: + # 60 == 10 messages for each of queue_id0..queue_id3 (40 in total) + + # 20 for the catch-all (None, None) filter of queue_id4 + messages = [self.message_queue.get(timeout=0.01) for _ in range(60)] + + # these queues should not get any messages from other queues + self.assertEqual(len([msg for qid, msg in messages if qid == "queue_id0"]), 10) + self.assertEqual(len([msg for qid, msg in messages if qid == "queue_id1"]), 10) + self.assertEqual(len([msg for qid, msg in messages if qid == "queue_id2"]), 10) + self.assertEqual(len([msg for qid, msg in messages if qid == "queue_id3"]), 10) + # this queue should get all messages + self.assertEqual(len([msg for qid, msg in messages if qid == "queue_id4"]), 20) + except Empty: + # - we should not get an Empty before all 60 expected messages were read + self.fail("message_queue ran empty before 60 messages were received") + + def _update_dispatch_messages_from_dlt_file_spinner(self): + for _ in range(60): # poll message_queue up to 60 times, waiting at most 0.01 s per poll + try: + message = self.dlt_file_spinner.message_queue.get(timeout=0.01) + except Empty: + continue # nothing arrived during this poll; any other error is no longer swallowed + if not self.dispatched_messages or message[1] != self.dispatched_messages[-1][1]: + self.dispatched_messages.append(message) # consecutive entries carrying the same message are kept only once + + def test_run_with_writing_to_file(self): + """ + Test with real dlt file, which is written at runtime + + 1. set filter_queue properly, so that the handled messages could be added to message_queue later + 2. start DLTFileSpinner + At this moment, no messages are written to dlt file, so no messages in DLTFileSpinner.message_queue + 3. write 2 sample messages to dlt file + Expectation: we could dispatch 2 messages from DLTFileSpinner.message_queue + 4. stop DLTFileSpinner + """ + # 1.
set filter_queue properly, so that the handled messages could be added to message_queue later + self.filter_queue.put(("queue_id0", [("SYS", "JOUR")], True)) + self.filter_queue.put(("queue_id1", [("DA1", "DC1")], True)) + self.filter_queue.put(("queue_id2", [("SYS", None)], True)) + self.filter_queue.put(("queue_id3", [(None, "DC1")], True)) + self.filter_queue.put(("queue_id4", [(None, None)], True)) + time.sleep(0.01) + # 2. start DLTFileSpinner + self.assertFalse(self.dlt_file_spinner.is_alive()) + self.dlt_file_spinner.start() + self.assertTrue(self.dlt_file_spinner.is_alive()) + self.assertNotEqual(self.dlt_file_spinner.pid, os.getpid()) + # dlt_reader is instantiated and keeps alive; what is checked here is that the dlt file it reads exists + self.assertTrue(os.path.exists(self.dlt_file_spinner.file_name)) + # With empty file content, no messages are dispatched to message_queue + time.sleep(2) + self.assertTrue(self.dlt_file_spinner.message_queue.empty()) + # 3. write 2 sample messages to dlt file + append_stream_to_file(stream_multiple, self.dlt_file_name) + # Expect the written dlt logs are dispatched to message_queue; the helper keeps consecutive entries carrying the same message only once, hence 2 + self._update_dispatch_messages_from_dlt_file_spinner() + self.assertEqual(2, len(self.dispatched_messages)) + # 4. stop DLTFileSpinner + self.dlt_file_spinner.break_blocking_main_loop() + self.stop_event.set() + self.dlt_file_spinner.join() + self.assertFalse(self.dlt_file_spinner.is_alive()) + + def test_run_with_writing_to_file_twice(self): + """ + Test with real dlt file, which is written at runtime 2 times + + 1. set filter_queue properly, so that the handled messages could be added to message_queue later + 2. start DLTFileSpinner + 3. write 2 sample messages to dlt file + Expectation: we could dispatch 2 messages from DLTFileSpinner.message_queue + 4. append 1 sample message to dlt file + Expectation: we could dispatch 3 messages from DLTFileSpinner.message_queue + 5. stop DLTFileSpinner + """ + # 1.
set filter_queue properly, so that the handled messages could be added to message_queue later + self.filter_queue.put(("queue_id0", [("SYS", "JOUR")], True)) + self.filter_queue.put(("queue_id1", [("DA1", "DC1")], True)) + self.filter_queue.put(("queue_id2", [("SYS", None)], True)) + self.filter_queue.put(("queue_id3", [(None, "DC1")], True)) + self.filter_queue.put(("queue_id4", [(None, None)], True)) + time.sleep(0.01) + # 2. start DLTFileSpinner + self.assertFalse(self.dlt_file_spinner.is_alive()) + self.dlt_file_spinner.start() + self.assertTrue(self.dlt_file_spinner.is_alive()) + self.assertNotEqual(self.dlt_file_spinner.pid, os.getpid()) + # dlt_reader is instantiated and keeps alive; what is checked here is that the dlt file it reads exists + self.assertTrue(os.path.exists(self.dlt_file_spinner.file_name)) + # With empty file content, no messages are dispatched to message_queue + time.sleep(2) + self.assertTrue(self.dlt_file_spinner.message_queue.empty()) + # 3. write 2 sample messages to dlt file + append_stream_to_file(stream_multiple, self.dlt_file_name) + # Expect the written dlt logs are dispatched to message_queue; the helper keeps consecutive entries carrying the same message only once, hence 2 + self._update_dispatch_messages_from_dlt_file_spinner() + self.assertEqual(2, len(self.dispatched_messages)) + # 4. append 1 sample message to dlt file; dispatched_messages keeps growing across helper calls, so 3 are expected in total + append_stream_to_file(stream_with_params, self.dlt_file_name) + self._update_dispatch_messages_from_dlt_file_spinner() + self.assertEqual(3, len(self.dispatched_messages)) + # 5. stop DLTFileSpinner + self.dlt_file_spinner.break_blocking_main_loop() + self.stop_event.set() + self.dlt_file_spinner.join() + self.assertFalse(self.dlt_file_spinner.is_alive()) + + def test_run_with_writing_empty_apid_ctid_to_file(self): + """ + Test with real dlt file, which contains message with apid=b"" and ctid=b"" + + 1. set filter_queue properly, so that the handled messages could be added to message_queue later + 2. start DLTFileSpinner + At this moment, no messages are written to dlt file, so no messages in DLTFileSpinner.message_queue + 3.
write message with apid=b"" and ctid=b"" to dlt file + Expectation: we could dispatch 1 message from DLTFileSpinner.message_queue + and, apid==b"" and ctid==b"" + 4. stop DLTFileSpinner + """ + # 1. set filter_queue properly, so that the handled messages could be added to message_queue later + self.filter_queue.put(("queue_id0", [(None, None)], True)) + time.sleep(0.01) + # 2. start DLTFileSpinner + self.assertFalse(self.dlt_file_spinner.is_alive()) + self.dlt_file_spinner.start() + self.assertTrue(self.dlt_file_spinner.is_alive()) + self.assertNotEqual(self.dlt_file_spinner.pid, os.getpid()) + # dlt_reader is instantiated and keeps alive; what is checked here is that the dlt file it reads exists + self.assertTrue(os.path.exists(self.dlt_file_spinner.file_name)) + # With empty file content, no messages are dispatched to message_queue + time.sleep(2) + self.assertTrue(self.dlt_file_spinner.message_queue.empty()) + # 3. write a message to dlt file + # Construct a message with apid==b"" and ctid==b"" + message = create_messages(stream_with_params, from_file=True)[0] + message.extendedheader.apid = b"" + message.extendedheader.ctid = b"" + # Write this message into dlt file + append_message_to_file(message, self.dlt_file_name) + # Expect the written dlt log is dispatched to message_queue + self._update_dispatch_messages_from_dlt_file_spinner() + self.assertEqual(1, len(self.dispatched_messages)) + # Expectation: the received message exposes empty apid and ctid (written as b"", compared against the str "" below) + self.assertEqual("", self.dispatched_messages[0][1].apid) + self.assertEqual("", self.dispatched_messages[0][1].ctid) + # 4.
stop DLTFileSpinner + self.dlt_file_spinner.break_blocking_main_loop() + self.stop_event.set() + self.dlt_file_spinner.join() + self.assertFalse(self.dlt_file_spinner.is_alive()) diff --git a/tests/dlt_main_loop_by_reading_dlt_file_unit_test.py b/tests/dlt_main_loop_by_reading_dlt_file_unit_test.py new file mode 100644 index 0000000..43dd736 --- /dev/null +++ b/tests/dlt_main_loop_by_reading_dlt_file_unit_test.py @@ -0,0 +1,148 @@ +# Copyright (C) 2023. BMW Car IT GmbH. All rights reserved. +"""Basic unittests for the py_dlt_file_main_loop function""" +import os +import unittest +import tempfile +from threading import Thread +import time + +from dlt.dlt import cDLTFile, py_dlt_file_main_loop +from tests.utils import ( + append_stream_to_file, + stream_multiple, + stream_with_params, + create_messages, + append_message_to_file, +) + + +class TestMainLoopByReadingDltFile(unittest.TestCase): + def setUp(self): + # Create an empty dlt file; mkstemp also hands back an open descriptor, which is closed right away so it does not leak + fd, self.dlt_file_name = tempfile.mkstemp(suffix=b".dlt") + os.close(fd) + self.dlt_reader = cDLTFile(filename=self.dlt_file_name, is_live=True, iterate_unblock_mode=False) + # message_queue to store the dispatched messages from main loop + self.message_queue = [] + # When callback() is called, then it is set to True + self.callback_is_called = False + # Value returned by callback(): True keeps the main loop going, otherwise the main loop breaks + self.callback_return_value = True + # Thread for main loop, which is instantiated in test case + self.main_loop = None + + def _callback_for_message(self, message): + """Record the call, keep a truthy message in message_queue and return callback_return_value""" + self.callback_is_called = True + if message: + self.message_queue.append(message) + return self.callback_return_value + + def _start_main_loop(self): + self.main_loop = Thread( + target=py_dlt_file_main_loop, + kwargs={"dlt_reader": self.dlt_reader, "callback": self._callback_for_message}, + ) + # self.main_loop.daemon = True +
self.main_loop.start() + time.sleep(1) + + def tearDown(self): + try: + if not self.dlt_reader.stop_reading_proc.is_set(): + self.dlt_reader.stop_reading_proc.set() + # After the stop of dlt_reader, main loop should be stopped automatically + if self.main_loop: + # Give the main loop thread up to 0.5 s to finish + self.main_loop.join(timeout=0.5) + self.assertFalse(self.main_loop.is_alive()) + finally: # always delete the temporary dlt file, even when the assertion above fails + os.remove(self.dlt_file_name) + + def test_001_empty_dlt_file(self): + """When dlt file has empty content, then no message could be dispatched, and no return value from main loop""" + self._start_main_loop() + time.sleep(0.1) + # When file has empty content, callback() will not be called by any message + self.assertFalse(self.callback_is_called) + self.assertEqual(0, len(self.message_queue)) + + def test_002_first_write_then_read_dlt_file(self): + """ + Simulate a real dlt file case: first write to it, and then use main loop to read it + """ + # First write to dlt file without opening main loop + append_stream_to_file(stream_multiple, self.dlt_file_name) + time.sleep(0.1) + # Expectation: py_dlt_file_main_loop reads out the first batch of messages to message_queue + self._start_main_loop() + time.sleep(0.1) + self.assertTrue(self.callback_is_called) + self.assertEqual(2, len(self.message_queue)) + + def test_003_first_read_then_write_dlt_file(self): + """ + Simulate a real dlt file case: first open main loop to read, then write to the file while main loop is running + """ + # First start only the main loop to read the dlt file + self._start_main_loop() + # Then write to dlt file + append_stream_to_file(stream_multiple, self.dlt_file_name) + time.sleep(0.1) + # Expect the written logs could be dispatched by main loop + self.assertTrue(self.callback_is_called) + self.assertEqual(2, len(self.message_queue)) + + def test_004_read_2_writes(self): + """ + Test main loop reads from 2 consecutive writes to dlt file + """ + # First start only the main loop to read the dlt file + self._start_main_loop() + # First write to dlt file +
append_stream_to_file(stream_multiple, self.dlt_file_name) + time.sleep(0.1) + # Expect main loop could dispatch 2 logs + self.assertTrue(self.callback_is_called) + self.assertEqual(2, len(self.message_queue)) + # Second write to dlt file, and expect to dispatch 3 logs + append_stream_to_file(stream_with_params, self.dlt_file_name) + time.sleep(0.1) + self.assertEqual(3, len(self.message_queue)) + + def test_005_callback_return_false(self): + """ + If callback returns false, then main loop should exit + """ + # Set callback return value to False + self.callback_return_value = False + # Write to file + append_stream_to_file(stream_multiple, self.dlt_file_name) + time.sleep(0.1) + # Open main loop to dispatch logs + self._start_main_loop() + # Expect the callback to have been invoked for the first log + self.assertTrue(self.callback_is_called) + # Callback returns False after it handles the first message, which terminates main loop + # So, main loop won't be able to process the second message + self.assertEqual(1, len(self.message_queue)) + self.assertFalse(self.main_loop.is_alive()) + + def test_006_read_empty_apid_ctid_message(self): + """ + Simulate a case to read an apid==b"" and ctid==b"" message + """ + # Construct a message with apid==b"" and ctid==b"" + message = create_messages(stream_with_params, from_file=True)[0] + message.extendedheader.apid = b"" + message.extendedheader.ctid = b"" + # Write this message into dlt file + append_message_to_file(message, self.dlt_file_name) + # Expectation: py_dlt_file_main_loop reads out the written message to message_queue + self._start_main_loop() + time.sleep(0.1) + self.assertTrue(self.callback_is_called) + self.assertEqual(1, len(self.message_queue)) + # Expectation: the received message exposes empty apid and ctid (written as b"", compared against the str "" below) + self.assertEqual("", self.message_queue[0].apid) + self.assertEqual("", self.message_queue[0].ctid) diff --git a/tests/dlt_main_loop_unit_test.py b/tests/dlt_main_loop_with_dlt_client_unit_test.py similarity
index 98% rename from tests/dlt_main_loop_unit_test.py rename to tests/dlt_main_loop_with_dlt_client_unit_test.py index 857a721..f3196fa 100644 --- a/tests/dlt_main_loop_unit_test.py +++ b/tests/dlt_main_loop_with_dlt_client_unit_test.py @@ -25,7 +25,7 @@ def mock_dlt_receiver_receive_socket(client_receiver, partial=False, Fail=False) return len(buf) -class TestMainLoop(unittest.TestCase): +class TestMainLoopWithDltClient(unittest.TestCase): def setUp(self): self.client = DLTClient() self.client._connected_socket = Mock() diff --git a/tests/utils.py b/tests/utils.py index 9f6661a..56734e6 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -134,6 +134,19 @@ ) +def append_stream_to_file(stream, file_name): + msgs = create_messages(stream, from_file=True) + for msg in msgs: + append_message_to_file(msg, file_name) + + +def append_message_to_file(message, file_name): + # Open with 'ab' rather than 'wb' so that the message is appended to the file instead of overwriting it + with open(file_name, "ab") as file: + file.write(message.to_bytes()) + file.flush() + + def create_messages(stream, from_file=False): if from_file is False: stream.seek(0)