docs: add typing to asyncio http server examples #260

Merged · 1 commit · Nov 22, 2024
Changes from all commits
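In short, the diff annotates the examples' handlers and background tasks with the types that multiprocessing.Manager actually hands back: DictProxy for Manager().dict() and the standard Queue type for Manager().Queue(). A minimal standalone sketch of that pattern, separate from the PR and using only the standard library (the names here are illustrative, not taken from the examples):

from multiprocessing import Manager
from multiprocessing.managers import DictProxy
from queue import Queue


def record_reading(shared_data: DictProxy, readings: Queue) -> None:
    # Manager().dict() returns a DictProxy and Manager().Queue() a proxied
    # queue.Queue, so these plain annotations satisfy the type checker.
    while not readings.empty():
        mac, payload = readings.get()
        shared_data[mac] = payload


if __name__ == "__main__":
    manager = Manager()
    shared: DictProxy = manager.dict()
    q: Queue = manager.Queue()
    q.put(("AA:BB:CC:DD:EE:FF", {"temperature": 21.5}))
    record_reading(shared, q)
    print(dict(shared))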
23 changes: 12 additions & 11 deletions examples/http_server_asyncio.py
@@ -14,29 +14,30 @@
import asyncio
from datetime import datetime
from multiprocessing import Manager
+from multiprocessing.managers import DictProxy
+from queue import Queue

from aiohttp import web

from ruuvitag_sensor.ruuvi import RuuviTagSensor


tags: dict[str, str] = {
    # "F4:A5:74:89:16:57": "kitchen",
    # "CC:2C:6A:1E:59:3D": "bedroom",
    # "BB:2C:6A:1E:59:3D": "livingroom",
}


-async def run_get_data_background(macs_to_fetch, queue):
+async def run_get_data_background(macs_to_fetch: list[str], queue: Queue):
    """
    Background process from RuuviTag Sensors
    """
    async for sensor_data in RuuviTagSensor.get_data_async(macs_to_fetch):
-        sensor_data[1]["time"] = str(datetime.now())
+        sensor_data[1]["time"] = str(datetime.now())  # type: ignore
        queue.put(sensor_data)


-async def data_update(queue, shared_data):
+async def data_update(queue: Queue, shared_data: DictProxy):
    """
    Update data sent by the background process to global all_data variable
    """
@@ -48,34 +49,34 @@ async def data_update(queue, shared_data):
        await asyncio.sleep(0.5)


-async def get_all_data(_, shared_data):
+async def get_all_data(_: web.Request, shared_data: DictProxy):
    return web.json_response(dict(shared_data))


-async def get_data(request, shared_data):
+async def get_data(request: web.Request, shared_data: DictProxy):
    mac = request.match_info.get("mac")
    if mac not in shared_data:
        return web.json_response(status=404)
    return web.json_response(dict(shared_data[mac]))


-def setup_routes(application, shared_data):
+def setup_routes(application: web.Application, shared_data: DictProxy):
    application.router.add_get("/data", lambda request: get_all_data(request, shared_data))
    application.router.add_get("/data/{mac}", lambda request: get_data(request, shared_data))


if __name__ == "__main__":
    m = Manager()
-    data = m.dict()
-    q = m.Queue()
+    data: DictProxy = m.dict()
+    q: Queue = m.Queue()

    macs = list(tags.keys())

-    async def start_background_tasks(application):
+    async def start_background_tasks(application: web.Application):
        application["run_get_data"] = asyncio.create_task(run_get_data_background(macs, q))
        application["data_updater"] = asyncio.create_task(data_update(q, data))

-    async def cleanup_background_tasks(application):
+    async def cleanup_background_tasks(application: web.Application):
        application["run_get_data"].cancel()
        application["data_updater"].cancel()
        await asyncio.gather(app["run_get_data"], app["data_updater"], return_exceptions=True)
(rest of file unchanged)
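The diff cuts off before the aiohttp application itself is created. Purely for orientation, here is a hedged sketch of how the typed pieces above are commonly wired together; it follows the standard aiohttp startup/cleanup pattern and is not necessarily identical to the unchanged remainder of the example (the host and port values are illustrative):

# Assumes the imports and definitions from examples/http_server_asyncio.py above;
# a block like this would sit at the end of the __main__ section.
app = web.Application()
setup_routes(app, data)
app.on_startup.append(start_background_tasks)
app.on_cleanup.append(cleanup_background_tasks)
web.run_app(app, host="0.0.0.0", port=5000)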
22 changes: 12 additions & 10 deletions examples/http_server_asyncio_rx.py
@@ -13,10 +13,12 @@
import asyncio
from multiprocessing import Manager
+from multiprocessing.managers import DictProxy

from aiohttp import web
-from ruuvitag_sensor.ruuvi_rx import RuuviTagReactive

+from ruuvitag_sensor.ruuvi_rx import RuuviTagReactive
+from ruuvitag_sensor.ruuvi_types import MacAndSensorData

tags: dict[str, str] = {
    # "F4:A5:74:89:16:57": "kitchen",
@@ -25,45 +27,45 @@
}


-async def get_all_data(_, shared_data):
+async def get_all_data(_: web.Request, shared_data: DictProxy):
    return web.json_response(dict(shared_data))


-async def get_data(request, shared_data):
+async def get_data(request: web.Request, shared_data: DictProxy):
    mac = request.match_info.get("mac")
    if mac not in shared_data:
        return web.json_response(status=404)
    return web.json_response(dict(shared_data[mac]))


-async def run_get_data_background(known_tags, shared_data):
+async def run_get_data_background(known_tags: dict[str, str], shared_data: DictProxy):
    """
    Background process from RuuviTag Sensors
    """

-    def handle_new_data(data):
+    def handle_new_data(data: MacAndSensorData):
        mac, sensor_data = data
-        sensor_data["name"] = known_tags.get(mac, "unknown")
+        sensor_data["name"] = known_tags.get(mac, "unknown")  # type: ignore
        shared_data[data[0]] = sensor_data

    ruuvi_rx = RuuviTagReactive(list(known_tags.keys()))
    data_stream = ruuvi_rx.get_subject()
    data_stream.subscribe(handle_new_data)


-def setup_routes(application, shared_data):
+def setup_routes(application: web.Application, shared_data: DictProxy):
    application.router.add_get("/data", lambda request: get_all_data(request, shared_data))
    application.router.add_get("/data/{mac}", lambda request: get_data(request, shared_data))


if __name__ == "__main__":
    m = Manager()
-    data = m.dict()
+    data: DictProxy = m.dict()

-    async def start_background_tasks(application):
+    async def start_background_tasks(application: web.Application):
        application["run_get_data"] = asyncio.create_task(run_get_data_background(tags, data))

-    async def cleanup_background_tasks(application):
+    async def cleanup_background_tasks(application: web.Application):
        application["run_get_data"].cancel()
        await asyncio.gather(app["run_get_data"], return_exceptions=True)
        print("Background tasks shut down.")
(rest of file unchanged)
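Both "# type: ignore" comments in the diff sit where an extra key ("time" or "name") is written into a sensor payload; presumably the payload is one of the library's TypedDicts, which does not declare those keys. As an aside, one way to avoid the suppression in similar code is to copy the payload into a plain dict before enriching it. The TypedDict below is a hypothetical stand-in, not the library's real type:

from datetime import datetime
from typing import Any, TypedDict


class SensorPayload(TypedDict):
    # Hypothetical stand-in for the library's sensor TypedDict.
    temperature: float
    humidity: float


def with_timestamp(payload: SensorPayload) -> dict[str, Any]:
    # Copying into a plain dict allows keys the TypedDict does not declare.
    enriched: dict[str, Any] = dict(payload)
    enriched["time"] = str(datetime.now())
    return enriched


print(with_timestamp({"temperature": 21.5, "humidity": 45.0}))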