From aaa3dbdbc37d3ad189093cf029f36582550885d3 Mon Sep 17 00:00:00 2001 From: Jack Rubacha Date: Wed, 17 Apr 2024 20:56:16 -0400 Subject: [PATCH] initial with threads and classes --- .../odysseus_tree/package/calypso/calypso.mk | 2 +- .../tpu_telemetry/telemetry/__init__.py | 119 ++++++++++++--- .../telemetry/poll_data/__init__.py | 4 +- .../tpu_telemetry/telemetry/poll_data/can.py | 39 +++-- .../telemetry/poll_data/environment.py | 72 +++++---- .../telemetry/poll_data/example.py | 31 ++-- .../telemetry/poll_data/halow.py | 128 ++++++++-------- .../telemetry/poll_data/on_board.py | 142 +++++++++--------- .../tpu_telemetry/telemetry/publish.py | 49 ++++-- 9 files changed, 361 insertions(+), 225 deletions(-) diff --git a/odysseus/odysseus_tree/package/calypso/calypso.mk b/odysseus/odysseus_tree/package/calypso/calypso.mk index d771faa8..27ca19fc 100644 --- a/odysseus/odysseus_tree/package/calypso/calypso.mk +++ b/odysseus/odysseus_tree/package/calypso/calypso.mk @@ -1,4 +1,4 @@ -CALYPSO_VERSION = bd753c18c30375c8cb66c5d118b1f2caf574d571 +CALYPSO_VERSION = da5ee44e6e8bc15907289c050c10d36a54575e79 CALYPSO_SITE_METHOD = git CALYPSO_SITE = https://github.com/Northeastern-Electric-Racing/Calypso CALYPSO_GIT_SUBMODULES = YES diff --git a/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/__init__.py b/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/__init__.py index b4e0cd47..d8fbe6d8 100644 --- a/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/__init__.py +++ b/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/__init__.py @@ -1,39 +1,114 @@ +from abc import ABC, abstractmethod import asyncio +from subprocess import PIPE, Popen +import subprocess +import threading +from time import sleep -routines = {} -processes = [] +class MeasureTask(ABC): + def __init__(self, freq): + self.interval = freq -def task(): + @abstractmethod + def measurement(self) -> tuple[str, list[str], str]: + """ + returns (str, [str], str) in the format (topic, [values], unit) + """ + pass + + async def set_interval(self, stop: asyncio.Event): + """ + Behaves *like* JS' `setInterval`: + Run the function fn every interval milliseconds, and yield the result. + Stop when the stop event is set. + Uses the rest of the given args and kwargs for the function itself. + """ + + while not stop.is_set(): + measure = self.measurement() + if measure == None: + yield [] + else: + yield measure + await asyncio.sleep(self.interval / 1000) + + +class BufferedCommand: """ - A decorator that registers functions as tasks to be execute + Buffer a command's output into a list, that can be read + on demand. """ - def wrapper(fn): - processes.append(fn) - return fn # return the function unmodified so manual calls still work + def __init__(self, command: list[str], limit: int = 20) -> None: + """ + Construct a BufferedCommand. + """ + self.process = Popen(command, stdout=PIPE, text=True, bufsize=1) - return wrapper + self.buffer = ItemStore(limit=limit) -def measurement(freq: int): + def __deinit__(self): + self.process.terminate() + + def streamer(process, buffer): + if process.poll() is None: + for line in process.stdout: + buffer.add(line) + + def get_thread(self): + return threading.Thread(target=BufferedCommand.streamer, args=(self.process, self.buffer,), daemon=True) + + def read(self) -> list: + return self.buffer.getAll() + + +class OneshotCommand: """ - Marks a measurement, takes a frequency (in ms) to repeat the measurement. + Rerun oneshot commands on a frequency in a thread. + Use for any commands which have the potential to hang. """ + def __init__(self, command: list[str], runFreq: int, limit: int = 20, ) -> None: + + self.buffer: ItemStore = ItemStore(limit=limit) - def wrapper(fn): - routines[fn] = freq - return fn # return the function unmodified so manual calls still work + self.command = command - return wrapper + self.runFreq = runFreq + def streamer(command, buffer, runFreq): + while True: + data = subprocess.run(args=command, text=True).stdout + if not data: + buffer.add(data) + sleep(runFreq / 1000) -async def set_interval(fn, interval: int, stop: asyncio.Event, *args, **kwargs): + def get_thread(self): + return threading.Thread(target=OneshotCommand.streamer, args=(self.command, self.buffer, self.runFreq), daemon=True) + + def read(self) -> list: + return self.buffer.getAll() + + + + + +class ItemStore(): """ - Behaves *like* JS' `setInterval`: - Run the function fn every interval milliseconds, and yield the result. - Stop when the stop event is set. - Uses the rest of the given args and kwargs for the function itself. + Thread safe store, when limit hits list will clear """ + def __init__(self, limit: int): + self.lock = threading.Lock() + self.items = [] + self.limit = limit + + def add(self, item) -> None: + if len(self.items) >= self.limit: + self.items.clear() + + with self.lock: + self.items.append(item) - while not stop.is_set(): - yield fn(*args, **kwargs) - await asyncio.sleep(interval / 1000) + def getAll(self): + with self.lock: + items, self.items = self.items, [] + return items \ No newline at end of file diff --git a/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/poll_data/__init__.py b/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/poll_data/__init__.py index 3e2087c6..139597f9 100644 --- a/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/poll_data/__init__.py +++ b/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/poll_data/__init__.py @@ -1,4 +1,2 @@ -# This file looks like it does nothing, but only things -# that get imported here are going to get run as measurement routines. -from . import can as _, example as _, halow as _, on_board as _ + diff --git a/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/poll_data/can.py b/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/poll_data/can.py index f56a0c25..99e272aa 100644 --- a/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/poll_data/can.py +++ b/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/poll_data/can.py @@ -1,24 +1,33 @@ -from subprocess import check_output -from .. import measurement +from .. import BufferedCommand, MeasureTask -FETCH_CMD = "bmon -o format:quitafter=1 -p can0" -# example = "can0 0 11024 0 40" +# read in 1/10 a second increments +FETCH_CMD = ["bmon", "-r", "0.09", "-o", "format:fmt='$(attr:txrate:bytes) $(attr:rxrate:bytes)\n'", "-p", "can0" ] -@measurement(100) -def fetch_data(): - try: - out = check_output(FETCH_CMD.split(" "), shell=False).decode("utf-8") - split = out.split(" ") - data = [split[4].strip(), split[2].strip()] - return [("TPU/Can/DataRate", data, "kb/s")] - except Exception as e: - print(f"Failed to fetch data: {e}") - return [] + +class CanMT(MeasureTask, BufferedCommand): + def __init__(self): + MeasureTask.__init__(self, 100) + BufferedCommand.__init__(self, FETCH_CMD) + + def measurement(self): + try: + items = self.read() + send_data = [] + for item in items: + item = item.strip('\'').split(" ") + data = [item[0].strip(), item[1].strip()] + send_data.append(('TPU/Can/DataRate', data, 'kb/s')) + + return send_data + except Exception as e: + print(f"Failed to fetch data: {e}") + return [] def main(): - print(fetch_data()) + ex = CanMT() + print(ex.measurement()) if __name__ == "__main__": diff --git a/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/poll_data/environment.py b/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/poll_data/environment.py index bc5ec892..3ca8349b 100644 --- a/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/poll_data/environment.py +++ b/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/poll_data/environment.py @@ -1,34 +1,48 @@ # open file to read error -from .. import measurement -def read_sensor_data(sensor_path): - try: - with open(sensor_path, 'r') as f: - sensor_data = int(f.read().strip()) / 1000 - return sensor_data - except IOError: - print("Error: Unable to read data from", sensor_path) - return None - -# get data from given path -@measurement(1000) -def fetch_environmental_data(): - data = [] - - temp_sensor_path = '/sys/class/hwmon/hwmon2/temp1_input' - humidity_sensor_path = '/sys/class/hwmon/hwmon2/humidity1_input' - - temperature = read_sensor_data(temp_sensor_path) - if temperature is not None: - data.append(("TPU/Environment/Temperature", [temperature], "Celsius")) - - humidity = read_sensor_data(humidity_sensor_path) - if humidity is not None: - data.append(("TPU/Environment/Humidity", [humidity], "%")) - - return data +from .. import MeasureTask + + +class EnvironmentMT(MeasureTask): + def __init__(self): + MeasureTask.__init__(self, 1000) + + def read_sensor_data(sensor_path): + """ + Read the sensor path and send it + """ + try: + with open(sensor_path, 'r') as f: + sensor_data = int(f.read().strip()) / 1000 + return sensor_data + except IOError: + print("Error: Unable to read data from", sensor_path) + return None + + def measurement(self): + try: + data = [] + + temp_sensor_path = '/sys/class/hwmon/hwmon2/temp1_input' + humidity_sensor_path = '/sys/class/hwmon/hwmon2/humidity1_input' + + temperature = self.read_sensor_data(temp_sensor_path) + if temperature is not None: + data.append(("TPU/Environment/Temperature", [temperature], "Celsius")) + + humidity = self.read_sensor_data(humidity_sensor_path) + if humidity is not None: + data.append(("TPU/Environment/Humidity", [humidity], "%")) + + return data + except Exception as e: + print(f"Failed to fetch data: {e}") + return [] + def main(): - print(fetch_environmental_data()) + ex = EnvironmentMT() + print(ex.measurement()) + if __name__ == "__main__": - main() + main() \ No newline at end of file diff --git a/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/poll_data/example.py b/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/poll_data/example.py index 4fdf6191..4081825e 100644 --- a/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/poll_data/example.py +++ b/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/poll_data/example.py @@ -1,16 +1,23 @@ -from .. import measurement +from .. import MeasureTask +class ExampleMT(MeasureTask): + def __init__(self): + MeasureTask.__init__(self, 1000) -@measurement(500) -def fetch_data(): - return [ - ("TPU/Example/Data1", ["114"], "C"), - ("TPU/Example/Data1", ["1431242"], "D"), - ("TPU/Example/Data1", ["1431242"], "Q"), - ("TPU/Example/Data1", ["112343122"], "X"), - ("TPU/Example/Data1", ["112341232"], "M"), - ("TPU/Example/Data1", ["1413242"], "W"), - ] + def measurement(self): + return [ + ("TPU/Example/Data1", ["114"], "C"), + ("TPU/Example/Data2", ["1431242"], "D"), + ("TPU/Example/Data3", ["1431242"], "Q"), + ("TPU/Example/Data4", ["112343122"], "X"), + ("TPU/Example/Data5", ["112341232"], "M"), + ("TPU/Example/Data6", ["1413242"], "W"), + ] +def main(): + ex = ExampleMT() + print(ex.measurement()) -# fetch_data() # returns List[(str, [str], str)] in the format (topic, values, unit) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/poll_data/halow.py b/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/poll_data/halow.py index b38a0665..e670c9ce 100644 --- a/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/poll_data/halow.py +++ b/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/poll_data/halow.py @@ -1,5 +1,4 @@ -from subprocess import check_output -from .. import measurement +from .. import BufferedCommand, MeasureTask, OneshotCommand example_data = """BSSID BW TX bit rate RX bit rate ===================================================================== @@ -16,66 +15,77 @@ SNR : 0 --------------------------------------------------- OK""" -FETCH_RSSI_CMD = "cli_app show stats simple_rx" -FETCH_RATE_CMD = "cli_app show ap 0" -FETCH_THROUGHPUT_CMD = "bmon -o format:quitafter=1 -p wlan1" - -@measurement(100) -def fetch_data_Throughput(): - try: - out = check_output(FETCH_THROUGHPUT_CMD.split(" "), shell=False).decode("utf-8") - split = out.split(" ") - data = [split[4].strip(), split[2].strip()] - return [("TPU/HaLow/DataRate", data, "kb/s")] - except Exception as e: - print(f"Error fetching data: {e}") - return [] - -@measurement(500) -def fetch_data_ApMCS(): - try: - out = check_output(FETCH_RATE_CMD.split(" "), shell=False).decode("utf-8") - data_line = out.splitlines()[2] - parsed_data = data_line.split()[5][:-1].strip() - return [("TPU/HaLow/ApMCS", [parsed_data], "integer 0-10")] - except Exception as e: - print(f"Error fetching data: {e}") - return [] - - -@measurement(500) -def fetch_data_StaMCS(): - try: - out = check_output(FETCH_RATE_CMD.split(" "), shell=False).decode("utf-8") - data_line = out.splitlines()[2] - parsed_data = data_line.split()[3][:-1].strip() - return [("TPU/HaLow/StaMCS", [parsed_data], "integer 0-10")] - except Exception as e: - print(f"Error fetching data: {e}") - return [] - - -@measurement(500) -def fetch_data_RSSI(): - try: - out = check_output(FETCH_RSSI_CMD.split(" "), shell=False).decode("utf-8") - split = out.splitlines()[1] - data = split.split(":")[1].strip() - return [("TPU/HaLow/RSSI", [data], "dbm")] - except Exception as e: - print(f"Error fetching data: {e}") - return [] - - -def fetch_data(): - return fetch_data_Throughput() + fetch_data_ApMCS() + fetch_data_StaMCS() + fetch_data_RSSI() + + +FETCH_THROUGHPUT_CMD = ["bmon","-r", "0.09", "format:fmt='$(attr:txrate:bytes) $(attr:rxrate:bytes)\n'", "-p", "wlan1"] +FETCH_RATE_CMD = ["cli_app","show", "ap", "0"] +FETCH_RSSI_CMD = ["cli_app", "show", "stats", "simple_rx"] + +class HalowThroughputMT(MeasureTask, BufferedCommand): + def __init__(self): + MeasureTask.__init__(self, 100) + BufferedCommand.__init__(self, FETCH_THROUGHPUT_CMD) + + def measurement(self): + try: + items = self.read() + send_data = [] + for item in items: + item = item.strip('\'').split(" ") + data = [item[0].strip(), item[1].strip()] + send_data.append(('TPU/HaLow/DataRate', data, 'kb/s')) + + return send_data + except Exception as e: + print(f"Failed to fetch data: {e}") + return [] + + +class HalowMCSMT(MeasureTask, OneshotCommand): + def __init__(self): + MeasureTask.__init__(self, 500) + OneshotCommand.__init__(self, FETCH_RATE_CMD, 450) + + def measurement(self): + try: + out = self.read() + for line in out: + data_line = line.splitlines()[2] + parsed_data_ap = data_line.split()[5][:-1].strip() + parsed_data_sta = data_line.split()[3][:-1].strip() + return [("TPU/HaLow/ApMCS", [parsed_data_ap], "int"), + ("TPU/HaLow/StaMCS", [parsed_data_sta], "int")] + except Exception as e: + print(f"Error fetching data: {e}") + return [] + +class HalowRSSIMT(MeasureTask, OneshotCommand): + def __init__(self): + MeasureTask.__init__(self, 500) + OneshotCommand.__init__(self, FETCH_RSSI_CMD, 450) + + def measurement(self): + try: + out = self.read() + for line in out: + split = out.splitlines()[1] + data = split.split(":")[1].strip() + return [("TPU/HaLow/RSSI", [data], "int")] + except Exception as e: + print(f"Error fetching data: {e}") + return [] + def main(): - print(fetch_data_Throughput()) - print(fetch_data_ApMCS()) - print(fetch_data_StaMCS()) - print(fetch_data_RSSI()) + ex1 = HalowThroughputMT() + print(ex1.measurement()) + + ex2 = HalowMCSMT() + print(ex2.measurement()) + + ex3 = HalowRSSIMT() + print(ex3.measurement()) if __name__ == "__main__": diff --git a/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/poll_data/on_board.py b/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/poll_data/on_board.py index 6ae441a1..aab13e94 100644 --- a/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/poll_data/on_board.py +++ b/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/poll_data/on_board.py @@ -1,76 +1,82 @@ -from .. import measurement +from .. import MeasureTask import psutil -# fetch_data() -> List[(str, [str], str)] - - -def fetch_data(): - return ( - fetch_cpu_temperature() - + fetch_cpu_usage() - + fetch_broker_cpu_usage() - + fetch_available_memory() - ) - - -# CPU Temp -@measurement(2000) -def fetch_cpu_temperature(): - try: - temps = psutil.sensors_temperatures(fahrenheit=False) - for name, entries in temps.items(): - for entry in entries: - line = " %-20s %s °C (high = %s °C, critical = %s °C)" % ( - entry.label or name, - entry.current, - entry.high, - entry.critical, - ) - return [("TPU/OnBoard/CpuTemp", [str(entry.current)], "celsius")] - except Exception as e: - print(f"Error fetching system temperature: {e}") - return [] - - -# CPU Usage -@measurement(50) -def fetch_cpu_usage(): - try: - cpu_usage = psutil.cpu_percent() - return [("TPU/OnBoard/CpuUsage", [str(cpu_usage)], "percent")] - except Exception as e: - print(f"Error fetching CPU usage: {e}") - return [] - - -# CPU usage of nanomq process -@measurement(100) -def fetch_broker_cpu_usage(): - try: - with open("/var/run/mosquitto.pid", "r") as file: - pid = int(file.read()) - process = psutil.Process(pid) - broker_cpu_usage = process.cpu_percent() - return [("TPU/OnBoard/BrokerCpuUsage", [str(broker_cpu_usage)], "percent")] - except Exception as e: - print(f"Error fetching broker CPU usage: {e}") - return [] - - -# CPU available memory -@measurement(500) -def fetch_available_memory(): - try: - mem_info = psutil.virtual_memory() - mem_available = mem_info.available / (1024 * 1024) - return [("TPU/OnBoard/MemAvailable", [str(mem_available)], "MB")] - except Exception as e: - print(f"Error fetching available memory: {e}") - return [] +class CpuTempMT(MeasureTask): + def __init__(self): + MeasureTask.__init__(self, 2000) + + def measurement(self): + try: + temps = psutil.sensors_temperatures(fahrenheit=False) + for name, entries in temps.items(): + for entry in entries: + line = " %-20s %s °C (high = %s °C, critical = %s °C)" % ( + entry.label or name, + entry.current, + entry.high, + entry.critical, + ) + return [("TPU/OnBoard/CpuTemp", [str(entry.current)], "celsius")] + except Exception as e: + print(f"Error fetching system temperature: {e}") + return [] + + +class CpuUsageMT(MeasureTask): + def __init__(self): + MeasureTask.__init__(self, 50) + + def measurement(self): + try: + cpu_usage = psutil.cpu_percent() + return [("TPU/OnBoard/CpuUsage", [str(cpu_usage)], "percent")] + except Exception as e: + print(f"Error fetching CPU usage: {e}") + return [] + + +class BrokerCpuUsageMT(MeasureTask): + def __init__(self): + MeasureTask.__init__(self, 100) + + def measurement(self): + try: + with open("/var/run/mosquitto.pid", "r") as file: + pid = int(file.read()) + process = psutil.Process(pid) + broker_cpu_usage = process.cpu_percent() + return [("TPU/OnBoard/BrokerCpuUsage", [str(broker_cpu_usage)], "percent")] + except Exception as e: + print(f"Error fetching broker CPU usage: {e}") + return [] + + +class MemAvailMT(MeasureTask): + def __init__(self): + MeasureTask.__init__(self, 500) + + def measurement(self): + try: + mem_info = psutil.virtual_memory() + mem_available = mem_info.available / (1024 * 1024) + return [("TPU/OnBoard/MemAvailable", [str(mem_available)], "MB")] + except Exception as e: + print(f"Error fetching available memory: {e}") + return [] def main(): - print(fetch_data()) + ex1 = CpuTempMT() + print(ex1.measurement()) + + ex2 = CpuUsageMT() + print(ex2.measurement()) + + ex3 = BrokerCpuUsageMT() + print(ex3.measurement()) + + ex4 = MemAvailMT() + print(ex4.measurement()) if __name__ == "__main__": diff --git a/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/publish.py b/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/publish.py index 34fb99e5..e47a1c02 100644 --- a/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/publish.py +++ b/odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/publish.py @@ -1,14 +1,18 @@ import asyncio import signal + +from telemetry.poll_data import environment, halow, on_board +import telemetry.poll_data.can as can; +import telemetry.poll_data.example as example; from . import ( - server_data_pb2, - routines, - processes, - set_interval, - poll_data as _, # your editor lies, this is an important import. + BufferedCommand, + MeasureTask, + server_data_pb2 ) from gmqtt import Client as MQTTClient +TASKS = [] + # initialize connection client = MQTTClient("tpu-publisher") STOP = asyncio.Event() @@ -24,8 +28,9 @@ def on_disconnect(client, packet, exc=None): def ask_exit(*args): STOP.set() - for task in processes: - task.exit() + for task in TASKS: + if task.deinit != None: + task.deinit() def publish_data(topic, message_data): @@ -33,14 +38,11 @@ def publish_data(topic, message_data): client.publish(topic, message_data) -async def interval(fn, freq): - async for result in set_interval(fn, freq, STOP): +async def interval(task: MeasureTask): + async for result in task.set_interval(STOP): for packet in result: - print(packet) - data = server_data_pb2.ServerData() topic, values, data.unit = packet - for val in values: data.value.append(val) @@ -53,13 +55,28 @@ async def run(host): client.on_connect = on_connect client.on_disconnect = on_disconnect - stagger = 1 / len(routines) - for fn in routines: - freq = routines[fn] + TASKS = [ example.ExampleMT(), # uncomment so example data is sent + can.CanMT(), + # environment.EnvironmentMT() # commented out bc sensor is currently broken + halow.HalowThroughputMT(), + halow.HalowMCSMT(), + halow.HalowRSSIMT(), + on_board.CpuTempMT(), + on_board.CpuUsageMT(), + on_board.BrokerCpuUsageMT(), + on_board.MemAvailMT() + ] + + stagger = 1 / len(TASKS) + for task in TASKS: + + # if task is of type BufferedCommand, register it + if isinstance(task, BufferedCommand): + task.get_thread().start() # should not be awaited, this just gets run parallely along with other intervals. - asyncio.create_task(interval(fn, freq)) + asyncio.create_task(interval(task)) await asyncio.sleep(stagger) await STOP.wait()