Skip to content

Commit

Permalink
Merge pull request #54 from bmwcarit/add_dlt_file_spinner
Browse files Browse the repository at this point in the history
Add DltFileSpinner to fetch message for DltBroker
  • Loading branch information
yen3 authored Jun 13, 2023
2 parents cfd6ccd + d233bdc commit b4128ac
Show file tree
Hide file tree
Showing 9 changed files with 920 additions and 69 deletions.
38 changes: 36 additions & 2 deletions dlt/dlt.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import struct
import time
import threading
import multiprocessing

from dlt.core import (
cDLTFilter,
Expand Down Expand Up @@ -613,7 +614,10 @@ def __init__(self, **kwords):
self.indexed = False
self.end = False
self.live_run = kwords.pop("is_live", False)
# Stop event for threading usage in caller
self.stop_reading = threading.Event()
# Stop event for process usage in caller
self.stop_reading_proc = multiprocessing.Event()

def __repr__(self):
# pylint: disable=bad-continuation
Expand Down Expand Up @@ -764,7 +768,7 @@ def __getitem__(self, index):
def _open_file(self):
"""Open the configured file for processing"""
file_opened = False
while not self.stop_reading.is_set():
while not self.stop_reading.is_set() and not self.stop_reading_proc.is_set():
if dltlib.dlt_file_open(ctypes.byref(self), self.filename, self.verbose) >= DLT_RETURN_OK:
file_opened = True
break
Expand Down Expand Up @@ -801,7 +805,9 @@ def __iter__(self): # pylint: disable=too-many-branches
self._open_file()

found_data = False
while not self.stop_reading.is_set() or corruption_check_try: # pylint: disable=too-many-nested-blocks
while (
not self.stop_reading.is_set() and not self.stop_reading_proc.is_set()
) or corruption_check_try: # pylint: disable=too-many-nested-blocks
os_stat = os.stat(self.filename)
mtime = os_stat.st_mtime

Expand Down Expand Up @@ -1039,6 +1045,34 @@ def client_loop(self):
dltlib.dlt_client_main_loop(ctypes.byref(self), None, self.verbose)


def py_dlt_file_main_loop(dlt_reader, limit=None, callback=None):
"""Main loop to read dlt messages from dlt file."""
try:
for msg in dlt_reader:
logger.debug(
"Message from position %d and counter %d: %s", dlt_reader.file_position, dlt_reader.counter, msg
)

# send the message to the callback and check whether we
# need to continue
if callback and not callback(msg):
logger.debug("callback returned 'False'. Stopping main loop")
return False

if limit:
limit -= 1
if limit == 0:
break
except IOError as err:
# If the dlt file is empty, main_loop should not break, so it returns True
if str(err) == DLT_EMPTY_FILE_ERROR:
logger.debug("Dlt file is empty now. Wait until content is written")
return True
raise err

return True


# pylint: disable=too-many-arguments,too-many-return-statements,too-many-branches
def py_dlt_client_main_loop(client, limit=None, verbose=0, dumpfile=None, callback=None):
"""Reimplementation of dlt_client.c:dlt_client_main_loop() in order to handle callback
Expand Down
93 changes: 68 additions & 25 deletions dlt/dlt_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
DLT_DAEMON_TCP_PORT,
DLTContextHandler,
DLTFilterAckMessageHandler,
DLTMessageDispatcherBase,
DLTMessageHandler,
DLTFileSpinner,
DLTTimeValue,
)

Expand Down Expand Up @@ -39,7 +41,7 @@ class DLTBroker(object):

def __init__(
self,
ip_address,
ip_address=None,
port=DLT_DAEMON_TCP_PORT,
use_proxy=False,
enable_dlt_time=False,
Expand All @@ -50,8 +52,10 @@ def __init__(
):
"""Initialize the DLT Broker
:param str ip_address: IP address of the DLT Daemon. Defaults to TCP connection, unless a multicast address is
used. In that case an UDP multicast connection will be used
:param str | None ip_address: IP address of the DLT Daemon.
If None, then dlt does not come with any ip listening, in other words, it comes from dlt log directly;
Else, dlt comes from listening to some ip address. Defaults to TCP connection,
unless a multicast address is used. In that case an UDP multicast connection will be used
:param str post: Port of the DLT Daemon
:param bool use_proxy: Ignored - compatibility option
:param bool enable_dlt_time: Record the latest dlt message timestamp if enabled.
Expand Down Expand Up @@ -82,33 +86,59 @@ def __init__(
self.filter_ack_queue = None
self.filter_ack_msg_handler = None

kwargs["ip_address"] = ip_address
kwargs["port"] = port
kwargs["timeout"] = kwargs.get("timeout", DLT_CLIENT_TIMEOUT)
self.msg_handler = DLTMessageHandler(
self.msg_handler = self.create_dlt_message_dispather(ip_address, port, kwargs)

self.context_handler = DLTContextHandler(self.filter_queue, self.message_queue)

self._ip_address = ip_address
self._port = port
self._filename = kwargs.get("filename")

def create_dlt_message_dispather(self, ip_address, port, client_cfg) -> DLTMessageDispatcherBase:
if ip_address:
# If ip_address is given, then messages are retrieved from dlt client at run-time
return self._create_dlt_message_handler(ip_address, port, client_cfg)
else:
# If not ip_address is given, then messages are retrieved from the given filename
# The logs are written to the given filename from another process
return self._create_dlt_file_spinner(client_cfg.get("filename"))

def _create_dlt_message_handler(self, ip_address, port, client_cfg):
client_cfg["ip_address"] = ip_address
client_cfg["port"] = port
client_cfg["timeout"] = client_cfg.get("timeout", DLT_CLIENT_TIMEOUT)
return DLTMessageHandler(
self.filter_queue,
self.message_queue,
self.mp_stop_flag,
kwargs,
client_cfg,
dlt_time_value=self._dlt_time_value,
filter_ack_queue=self.filter_ack_queue,
)
self.context_handler = DLTContextHandler(self.filter_queue, self.message_queue)

self._ip_address = ip_address
self._port = port
self._filename = kwargs.get("filename")
def _create_dlt_file_spinner(self, file_name):
return DLTFileSpinner(
self.filter_queue,
self.message_queue,
self.mp_stop_flag,
file_name,
dlt_time_value=self._dlt_time_value,
filter_ack_queue=self.filter_ack_queue,
)

def start(self):
"""DLTBroker main worker method"""
logger.debug(
"Starting DLTBroker with parameters: use_proxy=%s, ip_address=%s, port=%s, filename=%s, multicast=%s",
False,
self._ip_address,
self._port,
self._filename,
ip.ip_address(self._ip_address).is_multicast,
)
if isinstance(self.msg_handler, DLTMessageHandler):
logger.debug(
"Starting DLTBroker with parameters: use_proxy=%s, ip_address=%s, port=%s, filename=%s, multicast=%s",
False,
self._ip_address,
self._port,
self._filename,
ip.ip_address(self._ip_address).is_multicast,
)
else:
logger.debug("Starting DLTBroker by reading %s", self._filename)

if self._dlt_time_value:
logger.debug("Enable dlt time for DLTBroker.")
Expand Down Expand Up @@ -166,12 +196,23 @@ def add_context(self, context_queue, filters=None):
)

if not self._recv_filter_set_ack(context_filter_ack_queue, True):
failure_reason = ""
if isinstance(self.msg_handler, DLTMessageHandler):
failure_reason = (
"It's possible that DLTClient client does not start."
" If it's a test case, it might be an error"
)
elif isinstance(self.msg_handler, DLTFileSpinner):
failure_reason = (
f"It's possible that dlt file {self._filename} is empty now. No big issue, "
f"filters would be added once after new message is available in dlt file"
)
logger.warning(
(
"Could not receive filter-setting messge ack. It's possible that DLTClient client does "
"not start. If it's a test case. It might be an error. For now, Run it anyway. "
"Could not receive filter-setting message ack. %s. For now, Run it anyway. "
"filters: %s, queue_id: %s"
),
failure_reason,
filters,
id(context_queue),
)
Expand All @@ -187,9 +228,11 @@ def remove_context(self, context_queue):

def stop(self):
"""Stop the broker"""
logger.info("Stopping DLTContextHandler and DLTMessageHandler")
logger.info("Stopping DLTContextHandler and %s", type(self.msg_handler).__name__)

self.msg_handler.break_blocking_main_loop()

logger.debug("Stop DLTMessageHandler")
logger.debug("Stop %s", type(self.msg_handler).__name__)
self.mp_stop_flag.set()

logger.debug("Stop DLTContextHandler")
Expand All @@ -205,7 +248,7 @@ def stop(self):
logger.debug("Waiting on DLTFilterAckMessageHandler ending")
self.filter_ack_msg_handler.join()

logger.debug("Waiting on DLTMessageHandler ending")
logger.debug("Waiting on %s ending", type(self.msg_handler).__name__)
if self.msg_handler.is_alive():
try:
self.msg_handler.terminate()
Expand Down
Loading

0 comments on commit b4128ac

Please sign in to comment.