Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support setting udp buffer size in run time #64

Merged
merged 1 commit into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 69 additions & 3 deletions dlt/dlt.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@
DLT_EMPTY_FILE_ERROR = "DLT TRACE FILE IS EMPTY"
cDLT_FILE_NOT_OPEN_ERROR = "Could not open DLT Trace file (libdlt)" # pylint: disable=invalid-name

DLT_UDP_MULTICAST_FD_BUFFER_SIZE = int(os.environ.get("PYDLT_UDP_MULTICAST_FD_BUFFER_SIZE", 2 * (2**20))) # 2 Mb
DLT_UDP_MULTICAST_BUFFER_SIZE = int(os.environ.get("PYDLT_UDP_MULTICAST_BUFFER_SIZE", 8 * (2**20))) # 8 Mb


class cached_property(object): # pylint: disable=invalid-name
"""
Expand Down Expand Up @@ -865,13 +868,23 @@ def __len__(self):


class DLTClient(cDltClient):
"""DLTClient class takes care about correct initialization and
cleanup
"""
"""DLTClient class takes care about correct initialization and cleanup"""

verbose = 0

def __init__(self, **kwords):
"""Initialize a DLTClient.

:param servIP: Optional[str] - dlt server IP.
:param hostIP: Optional[str] - Only available for udp multicast mode.
Set host interface address.
:param port: Optional[int] - dlt tcp daemon port.
:param verbose: Optional[bool] - Enable verbose output.
:param udp_fd_buffer_size_bytes: Optional[int] - Only available for udp
multicast mode. Set the UDP buffer size through setsockopt (unit: bytes).
:param udp_buffer_size_bytes: Optional[int] - Only available for udp
multicast mode. Set the DltReceiver's buffer size (unit: bytes).
"""
self.is_udp_multicast = False
self.verbose = kwords.pop("verbose", 0)
if dltlib.dlt_client_init(ctypes.byref(self), self.verbose) == DLT_RETURN_ERROR:
Expand Down Expand Up @@ -915,6 +928,9 @@ def __init__(self, **kwords):
# it ourselves elsewhere
self.port = kwords.get("port", DLT_DAEMON_TCP_PORT)

self._udp_fd_buffer_size_bytes = kwords.get("udp_fd_buffer_size_bytes", DLT_UDP_MULTICAST_FD_BUFFER_SIZE)
self._udp_buffer_size_bytes = kwords.get("udp_buffer_size_bytes", DLT_UDP_MULTICAST_BUFFER_SIZE)

def connect(self, timeout=None):
"""Connect to the server

Expand Down Expand Up @@ -973,7 +989,9 @@ def connect(self, timeout=None):
else:
if self.verbose:
logger.info("Connecting DLTClient using UDP Connection")

connected = dltlib.dlt_client_connect(ctypes.byref(self), self.verbose)
self._set_udp_multicast_buffer_size()

if self.verbose:
logger.info("DLT Connection return: %s", connected)
Expand Down Expand Up @@ -1051,6 +1069,54 @@ def client_loop(self):
dltlib.dlt_client_register_message_callback(self.msg_callback)
dltlib.dlt_client_main_loop(ctypes.byref(self), None, self.verbose)

def _set_udp_multicast_buffer_size(self, custom_fd_buffer_size_bytes=None, custom_buffer_size_bytes=None) -> None:
fd_buffer_size = int(self._udp_fd_buffer_size_bytes or custom_fd_buffer_size_bytes or 0)
buffer_size_bytes = int(self._udp_buffer_size_bytes or custom_buffer_size_bytes or 0)

if fd_buffer_size:
# Socket options are associated with an open file description. This
# means that file descriptors duplicated as a consequence of dup()
# (or similar) or fork() share the same set of socket options.
# -- Chapter 61.9 Socket Options.
# The Linux Programming Interface, p.1279
#
# The buffer size can be changed with a new fd which is created by
# dup system call (it's the internal implementation in
# `socket.fromfd`), so the code creates a socket instance first
# configures it and directly close it.
with socket.fromfd(self.sock, socket.AF_INET, socket.SOCK_DGRAM) as conf_socket:
logger.debug("Set UDP Multicast socket buffer size: %s kbytes", fd_buffer_size / 1024)
conf_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, fd_buffer_size)

real_buffer_size = int(conf_socket.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF) / 2)
if real_buffer_size != fd_buffer_size:
logger.warning(
(
"Failed to set UDP Multicast buffer size. set_size: %s, real_size: %s. "
"Bypass the error and continue"
),
fd_buffer_size / 1024,
real_buffer_size / 1024,
)
logger.warning(
(
"Please run command `sysctl -w net.core.rmem_max=%s` with root permission to "
"set the maximum size and restart dlt again."
),
fd_buffer_size,
)

if buffer_size_bytes:
logger.debug("Set UDP Multicast DltReceiver buffer size: %s kbytes", buffer_size_bytes / 1024)
ret = dltlib.dlt_receiver_init(
ctypes.byref(self.receiver), self.sock, self.receiver.type, buffer_size_bytes
)
if ret < 0:
raise RuntimeError(
f"Failed to set UDP Multicast DltReceiver buffer size. return code: {ret}, "
f"buffer_size_bytes: {buffer_size_bytes}"
)


def py_dlt_file_main_loop(dlt_reader, limit=None, callback=None):
"""Main loop to read dlt messages from dlt file."""
Expand Down
12 changes: 11 additions & 1 deletion dlt/dlt_broker_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from dlt.dlt import (
DLTClient,
DLT_DAEMON_TCP_PORT,
DLT_UDP_MULTICAST_BUFFER_SIZE,
DLT_UDP_MULTICAST_FD_BUFFER_SIZE,
cDLT_FILE_NOT_OPEN_ERROR,
load,
py_dlt_client_main_loop,
Expand Down Expand Up @@ -402,6 +404,8 @@ def __init__(
self.tracefile = None
self.last_connected = time.time()
self.last_message = time.time() - 120.0
self._udp_fd_buffer_size_bytes = client_cfg.get("udp_fd_buffer_size_bytes", DLT_UDP_MULTICAST_FD_BUFFER_SIZE)
self._udp_buffer_size_bytes = client_cfg.get("udp_buffer_size_bytes", DLT_UDP_MULTICAST_BUFFER_SIZE)

def is_valid_message(self, message):
return message and (message.apid != "" or message.ctid != "")
Expand All @@ -420,7 +424,13 @@ def _client_connect(self):
self._port,
self._filename,
)
self._client = DLTClient(servIP=self._ip_address, port=self._port, verbose=self.verbose)
self._client = DLTClient(
servIP=self._ip_address,
port=self._port,
verbose=self.verbose,
udp_fd_buffer_size_bytes=self._udp_fd_buffer_size_bytes,
udp_buffer_size_bytes=self._udp_buffer_size_bytes,
)
connected = self._client.connect(self.timeout)
if connected:
logger.info("DLTClient connected to %s", self._client.servIP)
Expand Down
22 changes: 21 additions & 1 deletion dlt/py_dlt_receive.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import time

from dlt.dlt import DLT_UDP_MULTICAST_FD_BUFFER_SIZE, DLT_UDP_MULTICAST_BUFFER_SIZE
from dlt.dlt_broker import DLTBroker

logging.basicConfig(format="%(asctime)s %(name)s %(levelname)-8s %(message)s")
Expand All @@ -18,13 +19,32 @@ def parse_args():
parser = argparse.ArgumentParser(description="Receive DLT messages")
parser.add_argument("--host", required=True, help="hostname or ip address to connect to")
parser.add_argument("--file", required=True, help="The file into which the messages will be written")
parser.add_argument(
"--udp-fd-buffer-size",
dest="udp_fd_buffer_size",
default=DLT_UDP_MULTICAST_FD_BUFFER_SIZE,
type=int,
help=f"Set the socket buffer size in udp multicast mode. default: {DLT_UDP_MULTICAST_FD_BUFFER_SIZE} bytes",
)
parser.add_argument(
"--udp-buffer-size",
dest="udp_buffer_size",
default=DLT_UDP_MULTICAST_BUFFER_SIZE,
type=int,
help=f"Set the DltReceiver buffer size in udp multicast mode. default: {DLT_UDP_MULTICAST_BUFFER_SIZE} bytes",
)
return parser.parse_args()


def dlt_receive(options):
"""Receive DLT messages via DLTBroker"""
logger.info("Creating DLTBroker instance")
broker = DLTBroker(ip_address=options.host, filename=options.file)
broker = DLTBroker(
ip_address=options.host,
filename=options.file,
udp_fd_buffer_size_bytes=options.udp_buffer_size,
udp_buffer_size_bytes=options.udp_fd_buffer_size,
)

logger.info("Starting DLTBroker")
broker.start() # start the loop
Expand Down
Loading