diff --git a/examples/http_server_asyncio.py b/examples/http_server_asyncio.py index 7315944..a62899b 100644 --- a/examples/http_server_asyncio.py +++ b/examples/http_server_asyncio.py @@ -11,10 +11,7 @@ aiohttp - pip install aiohttp """ -# pylint: disable=duplicate-code - import asyncio -from concurrent.futures import ProcessPoolExecutor from datetime import datetime from multiprocessing import Manager @@ -22,69 +19,71 @@ from ruuvitag_sensor.ruuvi import RuuviTagSensor -all_data = {} + +tags: dict[str, str] = { + # "F4:A5:74:89:16:57": "kitchen", + # "CC:2C:6A:1E:59:3D": "bedroom", + # "BB:2C:6A:1E:59:3D": "livingroom", +} -def run_get_data_background(macs, queue): +async def run_get_data_background(macs_to_fetch, queue): """ Background process from RuuviTag Sensors """ + async for sensor_data in RuuviTagSensor.get_data_async(macs_to_fetch): + sensor_data[1]["time"] = str(datetime.now()) + queue.put(sensor_data) - def callback(data): - data[1]["time"] = str(datetime.now()) - queue.put(data) - - RuuviTagSensor.get_data(callback, macs) - -async def data_update(queue): +async def data_update(queue, shared_data): """ Update data sent by the background process to global all_data variable """ - global all_data # pylint: disable=global-variable-not-assigned while True: while not queue.empty(): - data = queue.get() - all_data[data[0]] = data[1] - for key, value in tags.items(): - if key in all_data: - all_data[key]["name"] = value + mac, sensor_data = queue.get() + shared_data["name"] = tags.get(mac, "unknown") + shared_data[mac] = sensor_data await asyncio.sleep(0.5) -async def get_all_data(_): - return web.json_response(all_data) +async def get_all_data(_, shared_data): + return web.json_response(dict(shared_data)) -async def get_data(request): +async def get_data(request, shared_data): mac = request.match_info.get("mac") - if mac not in all_data: + if mac not in shared_data: return web.json_response(status=404) - return web.json_response(all_data[mac]) + return web.json_response(dict(shared_data[mac])) -# pylint: disable=redefined-outer-name -def setup_routes(app): - app.router.add_get("/data", get_all_data) - app.router.add_get("/data/{mac}", get_data) +def setup_routes(application, shared_data): + application.router.add_get("/data", lambda request: get_all_data(request, shared_data)) + application.router.add_get("/data/{mac}", lambda request: get_data(request, shared_data)) if __name__ == "__main__": - tags = {"F4:A5:74:89:16:57": "kitchen", "CC:2C:6A:1E:59:3D": "bedroom", "BB:2C:6A:1E:59:3D": "livingroom"} - m = Manager() + data = m.dict() q = m.Queue() - # Start background process - executor = ProcessPoolExecutor(1) - executor.submit(run_get_data_background, list(tags.keys()), q) + macs = list(tags.keys()) - loop = asyncio.get_event_loop() + async def start_background_tasks(application): + application["run_get_data"] = asyncio.create_task(run_get_data_background(macs, q)) + application["data_updater"] = asyncio.create_task(data_update(q, data)) - # Start data updater - loop.create_task(data_update(q)) + async def cleanup_background_tasks(application): + application["run_get_data"].cancel() + application["data_updater"].cancel() + await asyncio.gather(app["run_get_data"], app["data_updater"], return_exceptions=True) + print("Background tasks shut down.") # Setup and start web application - app = web.Application(loop=loop) - setup_routes(app) - web.run_app(app, host="0.0.0.0", port=5000) + app = web.Application() + setup_routes(app, data) + app.on_startup.append(start_background_tasks) + app.on_shutdown.append(cleanup_background_tasks) + web.run_app(app, host="0.0.0.0", port=5500) diff --git a/examples/http_server_asyncio_rx.py b/examples/http_server_asyncio_rx.py index 1e14ed3..fe8ec97 100644 --- a/examples/http_server_asyncio_rx.py +++ b/examples/http_server_asyncio_rx.py @@ -11,45 +11,66 @@ aiohttp - pip install aiohttp """ -# pylint: disable=duplicate-code +import asyncio +from multiprocessing import Manager from aiohttp import web - from ruuvitag_sensor.ruuvi_rx import RuuviTagReactive -all_data = {} + +tags: dict[str, str] = { + # "F4:A5:74:89:16:57": "kitchen", + # "CC:2C:6A:1E:59:3D": "bedroom", + # "BB:2C:6A:1E:59:3D": "livingroom", +} -async def get_all_data(_): - return web.json_response(all_data) +async def get_all_data(_, shared_data): + return web.json_response(dict(shared_data)) -async def get_data(request): +async def get_data(request, shared_data): mac = request.match_info.get("mac") - if mac not in all_data: + if mac not in shared_data: return web.json_response(status=404) - return web.json_response(all_data[mac]) - + return web.json_response(dict(shared_data[mac])) -# pylint: disable=redefined-outer-name -def setup_routes(app): - app.router.add_get("/data", get_all_data) - app.router.add_get("/data/{mac}", get_data) - -if __name__ == "__main__": - tags = {"F4:A5:74:89:16:57": "kitchen", "CC:2C:6A:1E:59:3D": "bedroom", "BB:2C:6A:1E:59:3D": "livingroom"} +async def run_get_data_background(known_tags, shared_data): + """ + Background process from RuuviTag Sensors + """ def handle_new_data(data): - global all_data # pylint: disable=global-variable-not-assigned - data[1]["name"] = tags[data[0]] - all_data[data[0]] = data[1] + mac, sensor_data = data + sensor_data["name"] = known_tags.get(mac, "unknown") + shared_data[data[0]] = sensor_data - ruuvi_rx = RuuviTagReactive(list(tags.keys())) + ruuvi_rx = RuuviTagReactive(list(known_tags.keys())) data_stream = ruuvi_rx.get_subject() data_stream.subscribe(handle_new_data) + +def setup_routes(application, shared_data): + application.router.add_get("/data", lambda request: get_all_data(request, shared_data)) + application.router.add_get("/data/{mac}", lambda request: get_data(request, shared_data)) + + +if __name__ == "__main__": + m = Manager() + data = m.dict() + + async def start_background_tasks(application): + application["run_get_data"] = asyncio.create_task(run_get_data_background(tags, data)) + + async def cleanup_background_tasks(application): + application["run_get_data"].cancel() + await asyncio.gather(app["run_get_data"], return_exceptions=True) + print("Background tasks shut down.") + # Setup and start web application app = web.Application() - setup_routes(app) - web.run_app(app, host="0.0.0.0", port=5000) + setup_routes(app, data) + app.on_startup.append(start_background_tasks) + app.on_shutdown.append(cleanup_background_tasks) + web.run_app(app, host="0.0.0.0", port=5500)