diff --git a/examples/http_server_asyncio.py b/examples/http_server_asyncio.py index a62899b..b253044 100644 --- a/examples/http_server_asyncio.py +++ b/examples/http_server_asyncio.py @@ -14,12 +14,13 @@ import asyncio from datetime import datetime from multiprocessing import Manager +from multiprocessing.managers import DictProxy +from queue import Queue from aiohttp import web from ruuvitag_sensor.ruuvi import RuuviTagSensor - tags: dict[str, str] = { # "F4:A5:74:89:16:57": "kitchen", # "CC:2C:6A:1E:59:3D": "bedroom", @@ -27,16 +28,16 @@ } -async def run_get_data_background(macs_to_fetch, queue): +async def run_get_data_background(macs_to_fetch: list[str], queue: Queue): """ Background process from RuuviTag Sensors """ async for sensor_data in RuuviTagSensor.get_data_async(macs_to_fetch): - sensor_data[1]["time"] = str(datetime.now()) + sensor_data[1]["time"] = str(datetime.now()) # type: ignore queue.put(sensor_data) -async def data_update(queue, shared_data): +async def data_update(queue: Queue, shared_data: DictProxy): """ Update data sent by the background process to global all_data variable """ @@ -48,34 +49,34 @@ async def data_update(queue, shared_data): await asyncio.sleep(0.5) -async def get_all_data(_, shared_data): +async def get_all_data(_: web.Request, shared_data: DictProxy): return web.json_response(dict(shared_data)) -async def get_data(request, shared_data): +async def get_data(request: web.Request, shared_data: DictProxy): mac = request.match_info.get("mac") if mac not in shared_data: return web.json_response(status=404) return web.json_response(dict(shared_data[mac])) -def setup_routes(application, shared_data): +def setup_routes(application: web.Application, shared_data: DictProxy): application.router.add_get("/data", lambda request: get_all_data(request, shared_data)) application.router.add_get("/data/{mac}", lambda request: get_data(request, shared_data)) if __name__ == "__main__": m = Manager() - data = m.dict() - q = m.Queue() + data: DictProxy = m.dict() + q: Queue = m.Queue() macs = list(tags.keys()) - async def start_background_tasks(application): + async def start_background_tasks(application: web.Application): application["run_get_data"] = asyncio.create_task(run_get_data_background(macs, q)) application["data_updater"] = asyncio.create_task(data_update(q, data)) - async def cleanup_background_tasks(application): + async def cleanup_background_tasks(application: web.Application): application["run_get_data"].cancel() application["data_updater"].cancel() await asyncio.gather(app["run_get_data"], app["data_updater"], return_exceptions=True) diff --git a/examples/http_server_asyncio_rx.py b/examples/http_server_asyncio_rx.py index fe8ec97..241eea9 100644 --- a/examples/http_server_asyncio_rx.py +++ b/examples/http_server_asyncio_rx.py @@ -13,10 +13,12 @@ import asyncio from multiprocessing import Manager +from multiprocessing.managers import DictProxy from aiohttp import web -from ruuvitag_sensor.ruuvi_rx import RuuviTagReactive +from ruuvitag_sensor.ruuvi_rx import RuuviTagReactive +from ruuvitag_sensor.ruuvi_types import MacAndSensorData tags: dict[str, str] = { # "F4:A5:74:89:16:57": "kitchen", @@ -25,25 +27,25 @@ } -async def get_all_data(_, shared_data): +async def get_all_data(_: web.Request, shared_data: DictProxy): return web.json_response(dict(shared_data)) -async def get_data(request, shared_data): +async def get_data(request: web.Request, shared_data: DictProxy): mac = request.match_info.get("mac") if mac not in shared_data: return web.json_response(status=404) return web.json_response(dict(shared_data[mac])) -async def run_get_data_background(known_tags, shared_data): +async def run_get_data_background(known_tags: dict[str, str], shared_data: DictProxy): """ Background process from RuuviTag Sensors """ - def handle_new_data(data): + def handle_new_data(data: MacAndSensorData): mac, sensor_data = data - sensor_data["name"] = known_tags.get(mac, "unknown") + sensor_data["name"] = known_tags.get(mac, "unknown") # type: ignore shared_data[data[0]] = sensor_data ruuvi_rx = RuuviTagReactive(list(known_tags.keys())) @@ -51,19 +53,19 @@ def handle_new_data(data): data_stream.subscribe(handle_new_data) -def setup_routes(application, shared_data): +def setup_routes(application: web.Application, shared_data: DictProxy): application.router.add_get("/data", lambda request: get_all_data(request, shared_data)) application.router.add_get("/data/{mac}", lambda request: get_data(request, shared_data)) if __name__ == "__main__": m = Manager() - data = m.dict() + data: DictProxy = m.dict() - async def start_background_tasks(application): + async def start_background_tasks(application: web.Application): application["run_get_data"] = asyncio.create_task(run_get_data_background(tags, data)) - async def cleanup_background_tasks(application): + async def cleanup_background_tasks(application: web.Application): application["run_get_data"].cancel() await asyncio.gather(app["run_get_data"], return_exceptions=True) print("Background tasks shut down.")