Skip to content

Commit

Permalink
Merge pull request #2 from ChristianTremblay/master
Browse files Browse the repository at this point in the history
Added support for InfluxDB
  • Loading branch information
jpjodoin authored Feb 26, 2022
2 parents 059faaf + 5224f43 commit 7448335
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 39 deletions.
10 changes: 10 additions & 0 deletions .env~
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# InfluxDB Params
INFLUXDB_V2_URL=http://127.0.0.1:8086
INFLUXDB_V2_ORG=your-org
INFLUXDB_V2_TOKEN=your token
# INFLUXDB_V2_TIMEOUT=
# INFLUXDB_V2_VERIFY_SSL=
# INFLUXDB_V2_SSL_CA_CERT=
# INFLUXDB_V2_CONNECTION_POOL_MAXSIZE=
# INFLUXDB_V2_AUTH_BASIC=
# INFLUXDB_V2_PROFILERS=
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ dist/
htmlcov/
.pytest_cache/
.tox/
.coverage*
.coverage*
.env
2 changes: 1 addition & 1 deletion pyhydroquebec/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def main():
elif args.dump_data:
pprint(results[0].__dict__)
elif args.influxdb:
output_influx(results[0])
output_influx(results[0], args.hourly)
elif args.json or args.detailled_energy:
output_json(results[0], args.hourly)
else:
Expand Down
158 changes: 158 additions & 0 deletions pyhydroquebec/influxdb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
try:
from influxdb_client import InfluxDBClient, Point, WriteOptions

except ImportError:
raise ImportError("Install influxdb to use this feature")

import pytz
import os, time
from datetime import datetime
from .consts import DAILY_MAP, CURRENT_MAP, ANNUAL_MAP

try:
# Load parameters from environment variables
if os.path.isfile("{}/.env".format(os.getcwd())):
from dotenv import load_dotenv
load_dotenv(os.path.join(os.getcwd(), ".env"))
except ImportError:
print("You need to pip install python-dotenv to use your .env file")

class InfluxDB:
"""
Connection to InfluxDB to write to DB
"""

url = None
port = 8086
token = None
org = None
bucket = None
tags_file = None
username = None
password = None
client = None

def __init__(self, params):
# params should be a dict with name=InfluxDB and bucket=valid_bucket_to_use.
# optional params : url, port, token and org may be provided
# if they are not provided, we will try to get them from environment
# variables that shoudl have been provided in a .env file.
for k, v in params.items():
setattr(self, k, v)
if self.bucket is None:
raise ValueError("Missing bucket name, please provide one in db_params")
self.connect_to_db()

self.write_api = self.client.write_api(
write_options=WriteOptions(
batch_size=getattr(self, "batch_size", 25),
flush_interval=getattr(self, "flush_interval", 10_000),
jitter_interval=getattr(self, "jitter_interval", 2_000),
retry_interval=getattr(self, "retry_interval", 5_000),
max_retries=getattr(self, "max_retries", 5),
max_retry_delay=getattr(self, "max_retry_delay", 30_000),
exponential_base=getattr(self, "exponential_base", 2),
)
)
self.query_api = self.client.query_api()
print(self.health)

def connect_to_db(self):
if self.url is None:
# Will try environment variables
self.client = InfluxDBClient.from_env_properties()
else:
_url = "{}:{}".format(self.url, self.port)
if self.token:
self.client = InfluxDBClient(url=_url, token=self.token)
else:
self.client = InfluxDBClient(
url=_url,
token="{}:{}".format(self.username, self.password),
bucket=self.bucket,
org="-",
)
try:
self.health
except:
raise ConnectionError("Error connecting to InfluxDB")

@property
def health(self):
return self.client.health()

def write_data_to_db(self, customer, show_hourly=False):
_data = []

# account
account_id = customer.account_id
customer_id = customer.customer_id
contract_id = customer.contract_id
id_prefix = "{}|{}|{}".format(account_id, customer_id, contract_id)
balance = customer.balance

yesterday_date = list(customer.current_daily_data.keys())[0]
data = {'date': yesterday_date}
data.update(customer.current_daily_data[yesterday_date])

# yesterday
_points = []
for k, v in CURRENT_MAP.items():
_id = "{}|{}".format(id_prefix, k)
_point = (
Point(_id)
.field("value", customer.current_period[k])
.tag("category", "current_period")
.tag("account_id", account_id)
.tag("customer_id", customer_id)
.tag("contract_id", contract_id)
.tag("unit", v['unit'])
.tag('name', v['raw_name'])
.time(datetime.fromisoformat(data['date']).astimezone(pytz.UTC))
)
_points.append(_point)

# annual
_id = "{}|{}".format(id_prefix, 'annual_stats')
_point = (
Point(_id)
.field("annual_kwh_price_cent", customer.current_annual_data['annual_kwh_price_cent'])
.field("annual_mean_daily_bill", customer.current_annual_data['annual_mean_daily_bill'])
.field("annual_total_consumption", customer.current_annual_data['annual_mean_daily_bill'])
.field("annual_mean_daily_consumption", customer.current_annual_data['annual_mean_daily_consumption'])
.tag("category", "annual")
.tag("unit", "")
.tag("date_start", customer.current_annual_data['annual_date_start'])
.tag("date_end", customer.current_annual_data['annual_date_end'])
.tag('name', 'Annual_Stats')
.tag("account_id", account_id)
.tag("customer_id", customer_id)
.tag("contract_id", contract_id)
.time(datetime.fromisoformat(data['date']).astimezone(pytz.UTC))
)
_points.append(_point)

if show_hourly:
# hourly
for hour, _data in customer.hourly_data[yesterday_date]["hours"].items():
for k, v in DAILY_MAP.items():
_id = "{}|{}".format(id_prefix, k)
_point = (
Point(_id)
.field("value", _data[k])
.tag("category", "hourly")
.tag("unit", v['unit'])
.tag('name', v['raw_name'])
.tag("account_id", account_id)
.tag("customer_id", customer_id)
.tag("contract_id", contract_id)
.time(datetime.fromisoformat("{}T{:02d}:00".format(data['date'], hour)).astimezone(pytz.UTC))
)
_points.append(_point)
self.write_api.write(self.bucket, self.org, _points)

# Give time to the scheduler so it can write to InfluxDB
for each in range(0,100):
time.sleep(0.1)
class ConnectionError(Exception):
pass
47 changes: 10 additions & 37 deletions pyhydroquebec/outputter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from pyhydroquebec.consts import (OVERVIEW_TPL, COMMON_TPL,
CONSUMPTION_PROFILE_TPL,
YESTERDAY_TPL, ANNUAL_TPL, HOURLY_HEADER, HOURLY_TPL)

from pyhydroquebec.influxdb import InfluxDB

def output_text(customer, show_hourly=False):
"""Format data to get a readable output."""
Expand All @@ -30,43 +30,16 @@ def output_text(customer, show_hourly=False):
print(HOURLY_TPL.format(d=data, hour=hour))


def output_influx(contract):
def output_influx(customer, show_hourly=False):
"""Print data using influxDB format."""
raise Exception("FIXME")
# # Pop yesterdays data
# yesterday_data = contract]['yesterday_hourly_consumption']
# del data[contract]['yesterday_hourly_consumption']
#
# # Print general data
# out = "pyhydroquebec,contract=" + contract + " "
#
# for index, key in enumerate(data[contract]):
# if index != 0:
# out = out + ","
# if key in ("annual_date_start", "annual_date_end"):
# out += key + "=\"" + str(data[contract][key]) + "\""
# else:
# out += key + "=" + str(data[contract][key])
#
# out += " " + str(int(datetime.datetime.now(HQ_TIMEZONE).timestamp() * 1000000000))
# print(out)
#
# # Print yesterday values
# yesterday = datetime.datetime.now(HQ_TIMEZONE) - datetime.timedelta(days=1)
# yesterday = yesterday.replace(minute=0, hour=0, second=0, microsecond=0)
#
# for hour in yesterday_data:
# msg = "pyhydroquebec,contract={} {} {}"
#
# data = ",".join(["{}={}".format(key, value) for key, value in hour.items()
# if key != 'hour'])
#
# datatime = datetime.datetime.strptime(hour['hour'], '%H:%M:%S')
# yesterday = yesterday.replace(hour=datatime.hour)
# yesterday_str = str(int(yesterday.timestamp() * 1000000000))
#
# print(msg.format(contract, data, yesterday_str))

_params = {"name": "InfluxDB",
"bucket" : "HydroQuebec",
#"batch_size" : 100,
}
db = InfluxDB(_params)
db.write_data_to_db(customer, show_hourly=show_hourly)
print("Sent this to InfluxDB")
output_text(customer, show_hourly=show_hourly)

def output_json(customer, show_hourly=False):
"""Print data as a json."""
Expand Down

0 comments on commit 7448335

Please sign in to comment.