Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for InfluxDB #2

Merged
merged 3 commits into from
Feb 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .env~
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# InfluxDB Params
INFLUXDB_V2_URL=http://127.0.0.1:8086
INFLUXDB_V2_ORG=your-org
INFLUXDB_V2_TOKEN=your token
# INFLUXDB_V2_TIMEOUT=
# INFLUXDB_V2_VERIFY_SSL=
# INFLUXDB_V2_SSL_CA_CERT=
# INFLUXDB_V2_CONNECTION_POOL_MAXSIZE=
# INFLUXDB_V2_AUTH_BASIC=
# INFLUXDB_V2_PROFILERS=
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ dist/
htmlcov/
.pytest_cache/
.tox/
.coverage*
.coverage*
.env
2 changes: 1 addition & 1 deletion pyhydroquebec/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def main():
elif args.dump_data:
pprint(results[0].__dict__)
elif args.influxdb:
output_influx(results[0])
output_influx(results[0], args.hourly)
elif args.json or args.detailled_energy:
output_json(results[0], args.hourly)
else:
Expand Down
158 changes: 158 additions & 0 deletions pyhydroquebec/influxdb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
try:
from influxdb_client import InfluxDBClient, Point, WriteOptions

except ImportError:
raise ImportError("Install influxdb to use this feature")

import pytz
import os, time
from datetime import datetime
from .consts import DAILY_MAP, CURRENT_MAP, ANNUAL_MAP

try:
# Load parameters from environment variables
if os.path.isfile("{}/.env".format(os.getcwd())):
from dotenv import load_dotenv
load_dotenv(os.path.join(os.getcwd(), ".env"))
except ImportError:
print("You need to pip install python-dotenv to use your .env file")

class InfluxDB:
"""
Connection to InfluxDB to write to DB
"""

url = None
port = 8086
token = None
org = None
bucket = None
tags_file = None
username = None
password = None
client = None

def __init__(self, params):
# params should be a dict with name=InfluxDB and bucket=valid_bucket_to_use.
# optional params : url, port, token and org may be provided
# if they are not provided, we will try to get them from environment
# variables that shoudl have been provided in a .env file.
for k, v in params.items():
setattr(self, k, v)
if self.bucket is None:
raise ValueError("Missing bucket name, please provide one in db_params")
self.connect_to_db()

self.write_api = self.client.write_api(
write_options=WriteOptions(
batch_size=getattr(self, "batch_size", 25),
flush_interval=getattr(self, "flush_interval", 10_000),
jitter_interval=getattr(self, "jitter_interval", 2_000),
retry_interval=getattr(self, "retry_interval", 5_000),
max_retries=getattr(self, "max_retries", 5),
max_retry_delay=getattr(self, "max_retry_delay", 30_000),
exponential_base=getattr(self, "exponential_base", 2),
)
)
self.query_api = self.client.query_api()
print(self.health)

def connect_to_db(self):
if self.url is None:
# Will try environment variables
self.client = InfluxDBClient.from_env_properties()
else:
_url = "{}:{}".format(self.url, self.port)
if self.token:
self.client = InfluxDBClient(url=_url, token=self.token)
else:
self.client = InfluxDBClient(
url=_url,
token="{}:{}".format(self.username, self.password),
bucket=self.bucket,
org="-",
)
try:
self.health
except:
raise ConnectionError("Error connecting to InfluxDB")

@property
def health(self):
return self.client.health()

def write_data_to_db(self, customer, show_hourly=False):
_data = []

# account
account_id = customer.account_id
customer_id = customer.customer_id
contract_id = customer.contract_id
id_prefix = "{}|{}|{}".format(account_id, customer_id, contract_id)
balance = customer.balance

yesterday_date = list(customer.current_daily_data.keys())[0]
data = {'date': yesterday_date}
data.update(customer.current_daily_data[yesterday_date])

# yesterday
_points = []
for k, v in CURRENT_MAP.items():
_id = "{}|{}".format(id_prefix, k)
_point = (
Point(_id)
.field("value", customer.current_period[k])
.tag("category", "current_period")
.tag("account_id", account_id)
.tag("customer_id", customer_id)
.tag("contract_id", contract_id)
.tag("unit", v['unit'])
.tag('name', v['raw_name'])
.time(datetime.fromisoformat(data['date']).astimezone(pytz.UTC))
)
_points.append(_point)

# annual
_id = "{}|{}".format(id_prefix, 'annual_stats')
_point = (
Point(_id)
.field("annual_kwh_price_cent", customer.current_annual_data['annual_kwh_price_cent'])
.field("annual_mean_daily_bill", customer.current_annual_data['annual_mean_daily_bill'])
.field("annual_total_consumption", customer.current_annual_data['annual_mean_daily_bill'])
.field("annual_mean_daily_consumption", customer.current_annual_data['annual_mean_daily_consumption'])
.tag("category", "annual")
.tag("unit", "")
.tag("date_start", customer.current_annual_data['annual_date_start'])
.tag("date_end", customer.current_annual_data['annual_date_end'])
.tag('name', 'Annual_Stats')
.tag("account_id", account_id)
.tag("customer_id", customer_id)
.tag("contract_id", contract_id)
.time(datetime.fromisoformat(data['date']).astimezone(pytz.UTC))
)
_points.append(_point)

if show_hourly:
# hourly
for hour, _data in customer.hourly_data[yesterday_date]["hours"].items():
for k, v in DAILY_MAP.items():
_id = "{}|{}".format(id_prefix, k)
_point = (
Point(_id)
.field("value", _data[k])
.tag("category", "hourly")
.tag("unit", v['unit'])
.tag('name', v['raw_name'])
.tag("account_id", account_id)
.tag("customer_id", customer_id)
.tag("contract_id", contract_id)
.time(datetime.fromisoformat("{}T{:02d}:00".format(data['date'], hour)).astimezone(pytz.UTC))
)
_points.append(_point)
self.write_api.write(self.bucket, self.org, _points)

# Give time to the scheduler so it can write to InfluxDB
for each in range(0,100):
time.sleep(0.1)
class ConnectionError(Exception):
pass
47 changes: 10 additions & 37 deletions pyhydroquebec/outputter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from pyhydroquebec.consts import (OVERVIEW_TPL,
CONSUMPTION_PROFILE_TPL,
YESTERDAY_TPL, ANNUAL_TPL, HOURLY_HEADER, HOURLY_TPL)

from pyhydroquebec.influxdb import InfluxDB

def output_text(customer, show_hourly=False):
"""Format data to get a readable output."""
Expand All @@ -29,43 +29,16 @@ def output_text(customer, show_hourly=False):
print(HOURLY_TPL.format(d=data, hour=hour))


def output_influx(contract):
def output_influx(customer, show_hourly=False):
"""Print data using influxDB format."""
raise Exception("FIXME")
# # Pop yesterdays data
# yesterday_data = contract]['yesterday_hourly_consumption']
# del data[contract]['yesterday_hourly_consumption']
#
# # Print general data
# out = "pyhydroquebec,contract=" + contract + " "
#
# for index, key in enumerate(data[contract]):
# if index != 0:
# out = out + ","
# if key in ("annual_date_start", "annual_date_end"):
# out += key + "=\"" + str(data[contract][key]) + "\""
# else:
# out += key + "=" + str(data[contract][key])
#
# out += " " + str(int(datetime.datetime.now(HQ_TIMEZONE).timestamp() * 1000000000))
# print(out)
#
# # Print yesterday values
# yesterday = datetime.datetime.now(HQ_TIMEZONE) - datetime.timedelta(days=1)
# yesterday = yesterday.replace(minute=0, hour=0, second=0, microsecond=0)
#
# for hour in yesterday_data:
# msg = "pyhydroquebec,contract={} {} {}"
#
# data = ",".join(["{}={}".format(key, value) for key, value in hour.items()
# if key != 'hour'])
#
# datatime = datetime.datetime.strptime(hour['hour'], '%H:%M:%S')
# yesterday = yesterday.replace(hour=datatime.hour)
# yesterday_str = str(int(yesterday.timestamp() * 1000000000))
#
# print(msg.format(contract, data, yesterday_str))

_params = {"name": "InfluxDB",
"bucket" : "HydroQuebec",
#"batch_size" : 100,
}
db = InfluxDB(_params)
db.write_data_to_db(customer, show_hourly=show_hourly)
print("Sent this to InfluxDB")
output_text(customer, show_hourly=show_hourly)

def output_json(customer, show_hourly=False):
"""Print data as a json."""
Expand Down