-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlogic.py
71 lines (62 loc) · 2.49 KB
/
logic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from services import hydroponics_metrics_collector
class MetricsCollector:
def __init__(self, sensor_id, api_key, sensors=[]):
self.sensor_id = sensor_id
self.api_key = api_key
self.sensors = sensors
async def collect_metrics(self):
try:
responses = {}
for sensor in self.sensors:
responses.update(sensor.collect_metric())
metrics = parse_responses_to_request(responses, self.sensor_id)
hydroponics_metrics_collector.send_metrics(metrics, self.api_key)
except Exception as ex:
print("failed to acquire data", ex)
def parse_responses_to_request(responses, serial_id):
return hydroponics_metrics_collector.MetricsRequest(
responses['ph_value'],
responses['tds_value'],
responses['temperature'],
responses['humidity'],
responses['water_temperature'],
serial_id)
class Calibration:
def __init__(self, ph_sensor, tds_sensor):
self.ph_buffer_voltage = {}
self.tds_buffer_voltage = {}
self.ph_sensor = ph_sensor
self.tds_sensor = tds_sensor
def read_ph(self, buffer):
ph_voltages = self.ph_sensor.collect_ph_voltages(1)
avg_ph_voltage = sum(ph_voltages) / len(ph_voltages)
ph_voltage = (avg_ph_voltage * 5 / 65535)
self.ph_buffer_voltage[float(buffer)] = ph_voltage
return self.ph_buffer_voltage[float(buffer)]
def calibrate_ph(self):
buffers = list(self.ph_buffer_voltage.keys())
buffers.sort()
buffers.reverse()
sorted_ph_buffer_voltage = {i: self.ph_buffer_voltage[i] for i in buffers}
numerator = 0
denominator = 0
first_buffer = 0
for buffer in sorted_ph_buffer_voltage:
voltage = self.ph_buffer_voltage[buffer]
if first_buffer == 0:
voltage = self.ph_buffer_voltage[buffer]
numerator = buffer
denominator = voltage
first_buffer = buffer
numerator -= buffer
denominator -= voltage
m = numerator/denominator
b = ((m*sorted_ph_buffer_voltage[first_buffer])-first_buffer)*-1
self.ph_sensor.m = m
self.ph_sensor.b = b
return b, m
def calibrate_tds(self, buffer):
self.tds_sensor.transistor.on()
self.tds_sensor.collect_metric()
self.tds_sensor.sensor.calibrate(float(buffer))
return self.tds_sensor.sensor.k_value