Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fetch history measurement data from internal memory #264

Merged
merged 22 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
## Changelog

### [Unreleased]

* ADD: Support for fetching history data from RuuviTags

## [3.0.0] - 2025-01-13
* ADD: Install Bleak automatically on all platforms
Expand Down
68 changes: 67 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ RuuviTag Sensor Python Package
* Bleak supports
* [Async-methods](#usage)
* [Observable streams](#usage)
* [Fetch history data](#usage)
* [Install guide](#Bleak)
* Bluez (Linux-only)
* Bluez supports
Expand Down Expand Up @@ -61,7 +62,9 @@ Full installation guide for [Raspberry PI & Raspbian](https://github.com/ttu/ruu

## Usage

The package provides 3 ways to fetch data from sensors:
### Fetch broadcast data from RuuviTags

The package provides 3 ways to fetch broadcasted data from sensors:

1. Asynchronously with async/await
2. Synchronously with callback
Expand All @@ -73,6 +76,21 @@ RuuviTag sensors can be identified using MAC addresses. Methods return a tuple c
('D2:A3:6E:C8:E0:25', {'data_format': 5, 'humidity': 47.62, 'temperature': 23.58, 'pressure': 1023.68, 'acceleration': 993.2331045630729, 'acceleration_x': -48, 'acceleration_y': -12, 'acceleration_z': 992, 'tx_power': 4, 'battery': 2197, 'movement_counter': 0, 'measurement_sequence_number': 88, 'mac': 'd2a36ec8e025', 'rssi': -80})
```

### Fetch stored history data from RuuviTags internal memory

4. Fetch history data with async/await

Each history entry contains one measurement type (temperature, humidity, or pressure) with a Unix timestamp (integer). RuuviTag sends each measurement type as a separate entry.

```py
[
{'temperature': 22.22, 'humidity': None, 'pressure': None, 'timestamp': 1738476581}
{'temperature': None, 'humidity': 38.8, 'pressure': None, 'timestamp': 1738476581},
{'temperature': None, 'humidity': None, 'pressure': 35755.0, 'timestamp': 1738476581},
]
```


### 1. Get sensor data asynchronously with async/await

__NOTE:__ Asynchronous functionality works only with `Bleak`-adapter.
Expand Down Expand Up @@ -202,6 +220,54 @@ More [samples](https://github.com/ttu/ruuvitag-sensor/blob/master/examples/react

Check the official documentation of [ReactiveX](https://rxpy.readthedocs.io/en/latest/index.html) and the [list of operators](https://rxpy.readthedocs.io/en/latest/operators.html).

### 4. Fetch history data

__NOTE:__ History data functionality works only with `Bleak`-adapter.

RuuviTags with firmware version 3.30.0 or newer support retrieving historical measurements. The package provides two methods to access this data:

1. `get_history_async`: Stream history entries as they arrive
2. `download_history`: Download all history entries at once

Each history entry contains one measurement type (temperature, humidity, or pressure) with a Unix timestamp (integer). RuuviTag sends each measurement type as a separate entry.

Example history entry:
```py
{
'temperature': 22.22, # Only one measurement type per entry
'humidity': None,
'pressure': None,
'timestamp': 1738476581 # Unix timestamp (integer)
}
```

```py
import asyncio
from datetime import datetime, timedelta

from ruuvitag_sensor.ruuvi import RuuviTagSensor


async def main():
# Get history from the last 10 minutes
start_time = datetime.now() - timedelta(minutes=10)

# Stream entries as they arrive
async for entry in RuuviTagSensor.get_history_async(mac="AA:BB:CC:DD:EE:FF", start_time=start_time):
print(f"Time: {entry['timestamp']} - {entry}")

# Or download all entries at once
history = await RuuviTagSensor.download_history(mac="AA:BB:CC:DD:EE:FF", start_time=start_time)
for entry in history:
print(f"Time: {entry['timestamp']} - {entry}")


if __name__ == "__main__":
asyncio.run(main())
```

__NOTE:__ Due to the way macOS handles Bluetooth, methods uses UUIDs to identify RuuviTags instead of MAC addresses.

### Other helper methods

#### Get data for specified sensors for a specific duration
Expand Down
21 changes: 21 additions & 0 deletions examples/download_history.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import asyncio
from datetime import datetime, timedelta

import ruuvitag_sensor.log
from ruuvitag_sensor.ruuvi import RuuviTagSensor

# Enable debug logging to see the raw data
ruuvitag_sensor.log.enable_console(10)


async def main():
mac = "CA:F7:44:DE:EB:E1"
# On macOS, the device address is not a MAC address, but a system specific ID
# mac = "873A13F5-ED14-AEE1-E446-6ACF31649A1D"
start_time = datetime.now() - timedelta(minutes=10)
data = await RuuviTagSensor.download_history(mac, start_time=start_time)
print(data)


if __name__ == "__main__":
asyncio.run(main())
21 changes: 21 additions & 0 deletions examples/get_history_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import asyncio
from datetime import datetime, timedelta

import ruuvitag_sensor.log
from ruuvitag_sensor.ruuvi import RuuviTagSensor

# Enable debug logging to see the raw data
ruuvitag_sensor.log.enable_console(10)


async def main():
mac = "CA:F7:44:DE:EB:E1"
# On macOS, the device address is not a MAC address, but a system specific ID
# mac = "873A13F5-ED14-AEE1-E446-6ACF31649A1D"
start_time = datetime.now() - timedelta(minutes=60)
async for data in RuuviTagSensor.get_history_async(mac, start_time=start_time):
print(data)


if __name__ == "__main__":
asyncio.run(main())
144 changes: 142 additions & 2 deletions ruuvitag_sensor/adapters/bleak_ble.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@
import os
import re
import sys
from typing import AsyncGenerator, List, Tuple
from datetime import datetime
from typing import AsyncGenerator, List, Optional, Tuple

from bleak import BleakScanner
from bleak import BleakClient, BleakGATTCharacteristic, BleakScanner
from bleak.backends.scanner import AdvertisementData, AdvertisementDataCallback, BLEDevice

from ruuvitag_sensor.adapters import BleCommunicationAsync
from ruuvitag_sensor.adapters.utils import rssi_to_hex
from ruuvitag_sensor.ruuvi_types import MacAndRawData, RawData

MAC_REGEX = "[0-9a-f]{2}([:])[0-9a-f]{2}(\\1[0-9a-f]{2}){4}$"
RUUVI_HISTORY_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E"
RUUVI_HISTORY_RX_CHAR_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E" # Write
RUUVI_HISTORY_TX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E" # Read and notify


def _get_scanner(detection_callback: AdvertisementDataCallback, bt_device: str = ""):
Expand Down Expand Up @@ -120,3 +124,139 @@ async def get_first_data(mac: str, bt_device: str = "") -> RawData:
await data_iter.aclose()

return data or ""

async def get_history_data(
self, mac: str, start_time: Optional[datetime] = None, max_items: Optional[int] = None
) -> AsyncGenerator[bytearray, None]:
"""
Get history data from a RuuviTag using GATT connection.

Args:
mac (str): MAC address of the RuuviTag
start_time (datetime, optional): Start time for history data
max_items (int, optional): Maximum number of history entries to fetch

Yields:
bytearray: Raw history data entries

Raises:
RuntimeError: If connection fails or required services not found
"""
client = None
try:
log.debug("Connecting to device %s", mac)
client = await self._connect_gatt(mac)
log.debug("Connected to device %s", mac)

tx_char, rx_char = self._get_history_service_characteristics(client)

data_queue: asyncio.Queue[Optional[bytearray]] = asyncio.Queue()

def notification_handler(_, data: bytearray):
# Ignore heartbeat data that starts with 0x05
if data and data[0] == 0x05:
log.debug("Ignoring heartbeat data")
return
log.debug("Received data: %s", data)
# Check for end-of-logs marker (0x3A 0x3A 0x10 0xFF ...)
if len(data) >= 3 and all(b == 0xFF for b in data[3:]):
log.debug("Received end-of-logs marker")
data_queue.put_nowait(data)
data_queue.put_nowait(None)
return
# Check for error message. Header is 0xF0 (0x30 30 F0 FF FF FF FF FF FF FF FF)
if len(data) >= 11 and data[2] == 0xF0:
log.debug("Device reported error in log reading")
data_queue.put_nowait(data)
data_queue.put_nowait(None)
return
data_queue.put_nowait(data)

await client.start_notify(tx_char, notification_handler)

command = self._create_send_history_command(start_time)

log.debug("Sending command: %s", command)
await client.write_gatt_char(rx_char, command)
log.debug("Sent history command to device")

items_received = 0
while True:
try:
data = await asyncio.wait_for(data_queue.get(), timeout=10.0)
if data is None:
break
yield data
items_received += 1
if max_items and items_received >= max_items:
break
except asyncio.TimeoutError:
log.error("Timeout waiting for history data")
break

except Exception as e:
log.error("Failed to get history data from device %s: %r", mac, e)
raise
finally:
if client:
await client.disconnect()
log.debug("Disconnected from device %s", mac)

async def _connect_gatt(self, mac: str, max_retries: int = 3) -> BleakClient:
# Connect to a BLE device using GATT.
# NOTE: On macOS, the device address is not a MAC address, but a system specific ID
client = BleakClient(mac)

for attempt in range(max_retries):
try:
await client.connect()
return client
except Exception as e: # noqa: PERF203
if attempt == max_retries - 1:
raise
log.debug("Connection attempt %s failed: %s - Retrying...", attempt + 1, str(e))
await asyncio.sleep(1)

return client # Satisfy linter - this line will never be reached

def _get_history_service_characteristics(
self, client: BleakClient
) -> Tuple[BleakGATTCharacteristic, BleakGATTCharacteristic]:
# Get the history service
# https://docs.ruuvi.com/communication/bluetooth-connection/nordic-uart-service-nus
history_service = next(
(service for service in client.services if service.uuid.lower() == RUUVI_HISTORY_SERVICE_UUID.lower()),
None,
)
if not history_service:
raise RuntimeError("History service not found - device may not support history")

tx_char = history_service.get_characteristic(RUUVI_HISTORY_TX_CHAR_UUID)
rx_char = history_service.get_characteristic(RUUVI_HISTORY_RX_CHAR_UUID)

if not tx_char or not rx_char:
raise RuntimeError("Required characteristics not found")

return tx_char, rx_char

def _create_send_history_command(self, start_time):
end_time = int(datetime.now().timestamp())
start_time_to_use = int(start_time.timestamp()) if start_time else 0

command = bytearray(
[
0x3A,
0x3A,
0x11, # Header for temperature query
(end_time >> 24) & 0xFF, # End timestamp byte 1 (most significant)
(end_time >> 16) & 0xFF, # End timestamp byte 2
(end_time >> 8) & 0xFF, # End timestamp byte 3
end_time & 0xFF, # End timestamp byte 4
(start_time_to_use >> 24) & 0xFF, # Start timestamp byte 1 (most significant)
(start_time_to_use >> 16) & 0xFF, # Start timestamp byte 2
(start_time_to_use >> 8) & 0xFF, # Start timestamp byte 3
start_time_to_use & 0xFF, # Start timestamp byte 4
]
)

return command
Loading