-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtemp_server.py
70 lines (52 loc) · 1.87 KB
/
temp_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import sys
sys.path.append("")
from micropython import const
from esp32 import raw_temperature, hall_sensor
import uasyncio as asyncio
import aioble
import bluetooth
import random
import struct
# org.bluetooth.service.environmental_sensing
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
# org.bluetooth.characteristic.temperature
_ENV_SENSE_TEMP_UUID = bluetooth.UUID(0x2A6E)
# org.bluetooth.characteristic.gap.appearance.xml
_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)
# How frequently to send advertising beacons.
_ADV_INTERVAL_MS = 250_000
# Register GATT server.
temp_service = aioble.Service(_ENV_SENSE_UUID)
temp_characteristic = aioble.Characteristic(
temp_service, _ENV_SENSE_TEMP_UUID, read=True, notify=True
)
aioble.register_services(temp_service)
# Helper to encode the temperature characteristic encoding (sint16, hundredths of a degree).
def _encode_temperature(temp_deg_c):
return struct.pack("<h", int(temp_deg_c * 100))
# This would be periodically polling a hardware sensor.
async def sensor_task():
t = (raw_temperature()-32.0)/1.8
while True:
temp_characteristic.write(_encode_temperature(t))
t = (raw_temperature()-32.0)/1.8
#t += random.uniform(-0.5, 0.5)
await asyncio.sleep_ms(1000)
# Serially wait for connections. Don't advertise while a central is
# connected.
async def peripheral_task():
while True:
async with await aioble.advertise(
_ADV_INTERVAL_MS,
name="ESP32_BLE_TEMP",
services=[_ENV_SENSE_UUID],
appearance=_ADV_APPEARANCE_GENERIC_THERMOMETER,
) as connection:
print("Connection from", connection.device)
await connection.disconnected()
# Run both tasks.
async def main():
t1 = asyncio.create_task(sensor_task())
t2 = asyncio.create_task(peripheral_task())
await asyncio.gather(t1, t2)
asyncio.run(main())