Skip to content

Commit

Permalink
initial with threads and classes
Browse files Browse the repository at this point in the history
  • Loading branch information
jr1221 committed Apr 18, 2024
1 parent 8ea8cd0 commit aaa3dbd
Show file tree
Hide file tree
Showing 9 changed files with 361 additions and 225 deletions.
2 changes: 1 addition & 1 deletion odysseus/odysseus_tree/package/calypso/calypso.mk
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CALYPSO_VERSION = bd753c18c30375c8cb66c5d118b1f2caf574d571
CALYPSO_VERSION = da5ee44e6e8bc15907289c050c10d36a54575e79
CALYPSO_SITE_METHOD = git
CALYPSO_SITE = https://github.com/Northeastern-Electric-Racing/Calypso
CALYPSO_GIT_SUBMODULES = YES
Expand Down
119 changes: 97 additions & 22 deletions odysseus/odysseus_tree/sources/tpu_telemetry/telemetry/__init__.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,114 @@
from abc import ABC, abstractmethod
import asyncio
from subprocess import PIPE, Popen
import subprocess
import threading
from time import sleep

routines = {}
processes = []
class MeasureTask(ABC):
def __init__(self, freq):
self.interval = freq

def task():
@abstractmethod
def measurement(self) -> tuple[str, list[str], str]:
"""
returns (str, [str], str) in the format (topic, [values], unit)
"""
pass

async def set_interval(self, stop: asyncio.Event):
"""
Behaves *like* JS' `setInterval`:
Run the function fn every interval milliseconds, and yield the result.
Stop when the stop event is set.
Uses the rest of the given args and kwargs for the function itself.
"""

while not stop.is_set():
measure = self.measurement()
if measure == None:
yield []
else:
yield measure
await asyncio.sleep(self.interval / 1000)


class BufferedCommand:
"""
A decorator that registers functions as tasks to be execute
Buffer a command's output into a list, that can be read
on demand.
"""

def wrapper(fn):
processes.append(fn)
return fn # return the function unmodified so manual calls still work
def __init__(self, command: list[str], limit: int = 20) -> None:
"""
Construct a BufferedCommand.
"""
self.process = Popen(command, stdout=PIPE, text=True, bufsize=1)

return wrapper
self.buffer = ItemStore(limit=limit)

def measurement(freq: int):
def __deinit__(self):
self.process.terminate()

def streamer(process, buffer):
if process.poll() is None:
for line in process.stdout:
buffer.add(line)

def get_thread(self):
return threading.Thread(target=BufferedCommand.streamer, args=(self.process, self.buffer,), daemon=True)

def read(self) -> list:
return self.buffer.getAll()


class OneshotCommand:
"""
Marks a measurement, takes a frequency (in ms) to repeat the measurement.
Rerun oneshot commands on a frequency in a thread.
Use for any commands which have the potential to hang.
"""
def __init__(self, command: list[str], runFreq: int, limit: int = 20, ) -> None:

self.buffer: ItemStore = ItemStore(limit=limit)

def wrapper(fn):
routines[fn] = freq
return fn # return the function unmodified so manual calls still work
self.command = command

return wrapper
self.runFreq = runFreq

def streamer(command, buffer, runFreq):
while True:
data = subprocess.run(args=command, text=True).stdout
if not data:
buffer.add(data)
sleep(runFreq / 1000)

async def set_interval(fn, interval: int, stop: asyncio.Event, *args, **kwargs):
def get_thread(self):
return threading.Thread(target=OneshotCommand.streamer, args=(self.command, self.buffer, self.runFreq), daemon=True)

def read(self) -> list:
return self.buffer.getAll()





class ItemStore():
"""
Behaves *like* JS' `setInterval`:
Run the function fn every interval milliseconds, and yield the result.
Stop when the stop event is set.
Uses the rest of the given args and kwargs for the function itself.
Thread safe store, when limit hits list will clear
"""
def __init__(self, limit: int):
self.lock = threading.Lock()
self.items = []
self.limit = limit

def add(self, item) -> None:
if len(self.items) >= self.limit:
self.items.clear()

with self.lock:
self.items.append(item)

while not stop.is_set():
yield fn(*args, **kwargs)
await asyncio.sleep(interval / 1000)
def getAll(self):
with self.lock:
items, self.items = self.items, []
return items
Original file line number Diff line number Diff line change
@@ -1,4 +1,2 @@
# This file looks like it does nothing, but only things
# that get imported here are going to get run as measurement routines.

from . import can as _, example as _, halow as _, on_board as _

Original file line number Diff line number Diff line change
@@ -1,24 +1,33 @@
from subprocess import check_output
from .. import measurement
from .. import BufferedCommand, MeasureTask

FETCH_CMD = "bmon -o format:quitafter=1 -p can0"
# example = "can0 0 11024 0 40"

# read in 1/10 a second increments
FETCH_CMD = ["bmon", "-r", "0.09", "-o", "format:fmt='$(attr:txrate:bytes) $(attr:rxrate:bytes)\n'", "-p", "can0" ]

@measurement(100)
def fetch_data():
try:
out = check_output(FETCH_CMD.split(" "), shell=False).decode("utf-8")
split = out.split(" ")
data = [split[4].strip(), split[2].strip()]
return [("TPU/Can/DataRate", data, "kb/s")]
except Exception as e:
print(f"Failed to fetch data: {e}")
return []

class CanMT(MeasureTask, BufferedCommand):
def __init__(self):
MeasureTask.__init__(self, 100)
BufferedCommand.__init__(self, FETCH_CMD)

def measurement(self):
try:
items = self.read()
send_data = []
for item in items:
item = item.strip('\'').split(" ")
data = [item[0].strip(), item[1].strip()]
send_data.append(('TPU/Can/DataRate', data, 'kb/s'))

return send_data
except Exception as e:
print(f"Failed to fetch data: {e}")
return []


def main():
print(fetch_data())
ex = CanMT()
print(ex.measurement())


if __name__ == "__main__":
Expand Down
Original file line number Diff line number Diff line change
@@ -1,34 +1,48 @@
# open file to read error
from .. import measurement
def read_sensor_data(sensor_path):
try:
with open(sensor_path, 'r') as f:
sensor_data = int(f.read().strip()) / 1000
return sensor_data
except IOError:
print("Error: Unable to read data from", sensor_path)
return None

# get data from given path
@measurement(1000)
def fetch_environmental_data():
data = []

temp_sensor_path = '/sys/class/hwmon/hwmon2/temp1_input'
humidity_sensor_path = '/sys/class/hwmon/hwmon2/humidity1_input'

temperature = read_sensor_data(temp_sensor_path)
if temperature is not None:
data.append(("TPU/Environment/Temperature", [temperature], "Celsius"))

humidity = read_sensor_data(humidity_sensor_path)
if humidity is not None:
data.append(("TPU/Environment/Humidity", [humidity], "%"))

return data
from .. import MeasureTask


class EnvironmentMT(MeasureTask):
def __init__(self):
MeasureTask.__init__(self, 1000)

def read_sensor_data(sensor_path):
"""
Read the sensor path and send it
"""
try:
with open(sensor_path, 'r') as f:
sensor_data = int(f.read().strip()) / 1000
return sensor_data
except IOError:
print("Error: Unable to read data from", sensor_path)
return None

def measurement(self):
try:
data = []

temp_sensor_path = '/sys/class/hwmon/hwmon2/temp1_input'
humidity_sensor_path = '/sys/class/hwmon/hwmon2/humidity1_input'

temperature = self.read_sensor_data(temp_sensor_path)
if temperature is not None:
data.append(("TPU/Environment/Temperature", [temperature], "Celsius"))

humidity = self.read_sensor_data(humidity_sensor_path)
if humidity is not None:
data.append(("TPU/Environment/Humidity", [humidity], "%"))

return data
except Exception as e:
print(f"Failed to fetch data: {e}")
return []


def main():
print(fetch_environmental_data())
ex = EnvironmentMT()
print(ex.measurement())


if __name__ == "__main__":
main()
main()
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
from .. import measurement
from .. import MeasureTask

class ExampleMT(MeasureTask):
def __init__(self):
MeasureTask.__init__(self, 1000)

@measurement(500)
def fetch_data():
return [
("TPU/Example/Data1", ["114"], "C"),
("TPU/Example/Data1", ["1431242"], "D"),
("TPU/Example/Data1", ["1431242"], "Q"),
("TPU/Example/Data1", ["112343122"], "X"),
("TPU/Example/Data1", ["112341232"], "M"),
("TPU/Example/Data1", ["1413242"], "W"),
]
def measurement(self):
return [
("TPU/Example/Data1", ["114"], "C"),
("TPU/Example/Data2", ["1431242"], "D"),
("TPU/Example/Data3", ["1431242"], "Q"),
("TPU/Example/Data4", ["112343122"], "X"),
("TPU/Example/Data5", ["112341232"], "M"),
("TPU/Example/Data6", ["1413242"], "W"),
]

def main():
ex = ExampleMT()
print(ex.measurement())

# fetch_data() # returns List[(str, [str], str)] in the format (topic, values, unit)

if __name__ == "__main__":
main()
Loading

0 comments on commit aaa3dbd

Please sign in to comment.