-
Notifications
You must be signed in to change notification settings - Fork 43
/
example.py
42 lines (37 loc) · 1.69 KB
/
example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import logging
import configparser
import os
import sys
from renogybt import InverterClient, RoverClient, RoverHistoryClient, BatteryClient, DataLogger, Utils
logging.basicConfig(level=logging.INFO)
config_file = sys.argv[1] if len(sys.argv) > 1 else 'config.ini'
config_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), config_file)
config = configparser.ConfigParser(inline_comment_prefixes=('#'))
config.read(config_path)
data_logger: DataLogger = DataLogger(config)
# the callback func when you receive data
def on_data_received(client, data):
filtered_data = Utils.filter_fields(data, config['data']['fields'])
logging.info(f"{client.ble_manager.device.name} => {filtered_data}")
if config['remote_logging'].getboolean('enabled'):
data_logger.log_remote(json_data=filtered_data)
if config['mqtt'].getboolean('enabled'):
data_logger.log_mqtt(json_data=filtered_data)
if config['pvoutput'].getboolean('enabled') and config['device']['type'] == 'RNG_CTRL':
data_logger.log_pvoutput(json_data=filtered_data)
if not config['data'].getboolean('enable_polling'):
client.stop()
# error callback
def on_error(client, error):
logging.error(f"on_error: {error}")
# start client
if config['device']['type'] == 'RNG_CTRL':
RoverClient(config, on_data_received, on_error).start()
elif config['device']['type'] == 'RNG_CTRL_HIST':
RoverHistoryClient(config, on_data_received, on_error).start()
elif config['device']['type'] == 'RNG_BATT':
BatteryClient(config, on_data_received, on_error).start()
elif config['device']['type'] == 'RNG_INVT':
InverterClient(config, on_data_received, on_error).start()
else:
logging.error("unknown device type")