Skip to content

Commit

Permalink
docs: add typing to asyncio http server examples (#260)
Browse files Browse the repository at this point in the history
  • Loading branch information
ttu authored Nov 22, 2024
1 parent 5f049ec commit 2ff65c2
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 21 deletions.
23 changes: 12 additions & 11 deletions examples/http_server_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,30 @@
import asyncio
from datetime import datetime
from multiprocessing import Manager
from multiprocessing.managers import DictProxy
from queue import Queue

from aiohttp import web

from ruuvitag_sensor.ruuvi import RuuviTagSensor


tags: dict[str, str] = {
# "F4:A5:74:89:16:57": "kitchen",
# "CC:2C:6A:1E:59:3D": "bedroom",
# "BB:2C:6A:1E:59:3D": "livingroom",
}


async def run_get_data_background(macs_to_fetch, queue):
async def run_get_data_background(macs_to_fetch: list[str], queue: Queue):
"""
Background process from RuuviTag Sensors
"""
async for sensor_data in RuuviTagSensor.get_data_async(macs_to_fetch):
sensor_data[1]["time"] = str(datetime.now())
sensor_data[1]["time"] = str(datetime.now()) # type: ignore
queue.put(sensor_data)


async def data_update(queue, shared_data):
async def data_update(queue: Queue, shared_data: DictProxy):
"""
Update data sent by the background process to global all_data variable
"""
Expand All @@ -48,34 +49,34 @@ async def data_update(queue, shared_data):
await asyncio.sleep(0.5)


async def get_all_data(_, shared_data):
async def get_all_data(_: web.Request, shared_data: DictProxy):
return web.json_response(dict(shared_data))


async def get_data(request, shared_data):
async def get_data(request: web.Request, shared_data: DictProxy):
mac = request.match_info.get("mac")
if mac not in shared_data:
return web.json_response(status=404)
return web.json_response(dict(shared_data[mac]))


def setup_routes(application, shared_data):
def setup_routes(application: web.Application, shared_data: DictProxy):
application.router.add_get("/data", lambda request: get_all_data(request, shared_data))
application.router.add_get("/data/{mac}", lambda request: get_data(request, shared_data))


if __name__ == "__main__":
m = Manager()
data = m.dict()
q = m.Queue()
data: DictProxy = m.dict()
q: Queue = m.Queue()

macs = list(tags.keys())

async def start_background_tasks(application):
async def start_background_tasks(application: web.Application):
application["run_get_data"] = asyncio.create_task(run_get_data_background(macs, q))
application["data_updater"] = asyncio.create_task(data_update(q, data))

async def cleanup_background_tasks(application):
async def cleanup_background_tasks(application: web.Application):
application["run_get_data"].cancel()
application["data_updater"].cancel()
await asyncio.gather(app["run_get_data"], app["data_updater"], return_exceptions=True)
Expand Down
22 changes: 12 additions & 10 deletions examples/http_server_asyncio_rx.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@

import asyncio
from multiprocessing import Manager
from multiprocessing.managers import DictProxy

from aiohttp import web
from ruuvitag_sensor.ruuvi_rx import RuuviTagReactive

from ruuvitag_sensor.ruuvi_rx import RuuviTagReactive
from ruuvitag_sensor.ruuvi_types import MacAndSensorData

tags: dict[str, str] = {
# "F4:A5:74:89:16:57": "kitchen",
Expand All @@ -25,45 +27,45 @@
}


async def get_all_data(_, shared_data):
async def get_all_data(_: web.Request, shared_data: DictProxy):
return web.json_response(dict(shared_data))


async def get_data(request, shared_data):
async def get_data(request: web.Request, shared_data: DictProxy):
mac = request.match_info.get("mac")
if mac not in shared_data:
return web.json_response(status=404)
return web.json_response(dict(shared_data[mac]))


async def run_get_data_background(known_tags, shared_data):
async def run_get_data_background(known_tags: dict[str, str], shared_data: DictProxy):
"""
Background process from RuuviTag Sensors
"""

def handle_new_data(data):
def handle_new_data(data: MacAndSensorData):
mac, sensor_data = data
sensor_data["name"] = known_tags.get(mac, "unknown")
sensor_data["name"] = known_tags.get(mac, "unknown") # type: ignore
shared_data[data[0]] = sensor_data

ruuvi_rx = RuuviTagReactive(list(known_tags.keys()))
data_stream = ruuvi_rx.get_subject()
data_stream.subscribe(handle_new_data)


def setup_routes(application, shared_data):
def setup_routes(application: web.Application, shared_data: DictProxy):
application.router.add_get("/data", lambda request: get_all_data(request, shared_data))
application.router.add_get("/data/{mac}", lambda request: get_data(request, shared_data))


if __name__ == "__main__":
m = Manager()
data = m.dict()
data: DictProxy = m.dict()

async def start_background_tasks(application):
async def start_background_tasks(application: web.Application):
application["run_get_data"] = asyncio.create_task(run_get_data_background(tags, data))

async def cleanup_background_tasks(application):
async def cleanup_background_tasks(application: web.Application):
application["run_get_data"].cancel()
await asyncio.gather(app["run_get_data"], return_exceptions=True)
print("Background tasks shut down.")
Expand Down

0 comments on commit 2ff65c2

Please sign in to comment.