-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathnovelan.py
executable file
·155 lines (124 loc) · 5.66 KB
/
novelan.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#!/usr/bin/python3
import socket
import struct
from collections import namedtuple
class novelan:
__host = "0.0.0.0"
__port = 0
__sock = None
PARAM_HEATING_TEMPERATURE = 1
PARAM_WARMWATER_TEMPERATURE = 2
PARAM_HEATING_OPERATION_MODE = 3
PARAM_WARMWATER_OPERATION_MODE = 4
PARAM_COOLING_OPERATION_MODE = 108
PARAM_COOLING_RELEASE_TEMP = 110
PARAM_COOLING_INLET_TEMP = 132
PARAM_COOLING_START = 850
PARAM_COOLING_STOP = 851
OPERATING_MODE_AUTOMATIC = 0
OPERATING_MODE_AUXILIARY_HEATER = 1
OPERATING_MODE_PARTY = 2
OPERATING_MODE_HOLIDAY = 3
OPERATING_MODE_OFF = 4
def __init__(self, host, port=8888):
self.__host = host
self.__port = port
def __connect(self):
self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.__sock.connect((self.__host, self.__port))
def __read(self, command, fmt, tnames, containsstatus):
recv_msg = bytearray()
try:
self.__connect()
#send
msg = struct.pack("!II", command, 0 )
totalsent = 0
while totalsent < len(msg):
sent = self.__sock.send(msg[totalsent:])
if sent == 0:
raise RuntimeError("socket connection broken")
totalsent = totalsent + sent
#receive
data = self.__sock.recv(4)
recv_command = struct.unpack("!I", data)[0]
if recv_command != command:
raise("received wrong command! ", command, recv_command)
if containsstatus:
data = self.__sock.recv(4)
stat = struct.unpack("!I", data)[0]
data = self.__sock.recv(4)
remaining = struct.unpack("!I", data)[0] * 4 # datacount * len(int) = bytelength
while remaining > 0:
chunk = self.__sock.recv(remaining)
if chunk == b'':
raise RuntimeError("socket connection broken")
recv_msg.extend(chunk)
remaining -= len(chunk)
finally:
self.__sock.close()
if struct.calcsize(fmt) != len(recv_msg):
raise("format size does not fit received bytes")
return tnames._make(struct.unpack(fmt, recv_msg))
def __write(self, param, value):
try:
self.__connect()
#send
command = 3002
msg = struct.pack("!III", command, param, value)
totalsent = 0
while totalsent < len(msg):
sent = self.__sock.send(msg[totalsent:])
if sent == 0:
raise RuntimeError("socket connection broken")
totalsent = totalsent + sent
#receive
data = self.__sock.recv(4)
recv_command = struct.unpack("!I", data)[0]
if recv_command != command:
raise("received wrong command! ", command, recv_command)
data = self.__sock.recv(4)
response = struct.unpack("!I", data)[0]
finally:
self.__sock.close()
def __formatTemperatures(self, dct):
#temperatures are integers and have to devide by 10 to get the right value
for key in dct:
if "temp" in key:
dct[key] = dct[key]/10
return dct
def readStatus(self):
fmt = '!40xiiiiiiiiiiiii4xiiiii108xiiiiiiiiiii336xiiiii108x'
statustupel = namedtuple('status', 'temperature_supply temperature_return temperature_reference_return temperature_out_external temperature_hot_gas temperature_outside temperature_outside_avg temperature_servicewater temperature_servicewater_reference temperature_probe_in temperature_probe_out temperature_mk1 temperature_mk1_reference temperature_mk2 temperature_mk2_reference heatpump_solar_collector heatpump_solar_storage temperature_external_source hours_compressor1 starts_compressor1 hours_compressor2 starts_compressor2 hours_zwe1 hours_zwe2 hours_zwe3 hours_heatpump hours_heating hours_warmwater hours_cooling thermalenergy_heating thermalenergy_warmwater thermalenergy_pool thermalenergy_total massflow')
status = self.__read(3004, fmt, statustupel, True)
statusdict = status._asdict()
return self.__formatTemperatures(statusdict)
def readStatusValue(self, name):
statusdict = self.readStatus()
if name in statusdict:
return statusdict[name]
else:
return "-1"
def readParameter(self):
fmt = '!4xiiii412xi4xi84xi2868xii676x'
parametertupel = namedtuple('parameter', 'heating_temperature warmwater_temperature heating_operation_mode warmwater_operation_mode cooling_operation_mode cooling_release_temperature cooling_inlet_temp cooling_start_after_hours cooling_stop_after_hours')
parameter = self.__read(3003, fmt, parametertupel, False)
parameterdict = parameter._asdict()
return self.__formatTemperatures(parameterdict)
def readParameterValue(self, name):
parameterdict = self.readParameter()
if name in parameterdict:
return parameterdict[name]
else:
return "-1"
def writeHeatingMode(self, value):
param = self.PARAM_HEATING_OPERATION_MODE
self.__write(param, value)
def writeWarmwaterMode(self, value):
param = self.PARAM_WARMWATER_OPERATION_MODE
self.__write(param, value)
def writeHeatingTemperature(self, value):
param = self.PARAM_HEATING_TEMPERATURE
self.__write(param, value)
def writeWarmwaterTemperature(self, value):
param = self.PARAM_WARMWATER_TEMPERATURE
self.__write(param, value)