-
Notifications
You must be signed in to change notification settings - Fork 0
/
__init__.py
446 lines (340 loc) · 14 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
import time
import asyncio
import traceback
# When True, OccupancyManager.log_id() appends the current asyncio task name
# to every log prefix.
SHOW_TASK_NAME = False
# When True, OccupancyManager.log_debug() emits at INFO level (tagged "DEBUG")
# instead of at DEBUG level.
DEBUG_AS_INFO = False
# Every trigger function created by OccupancyManager is appended here.
# NOTE(review): presumably this keeps a reference alive so pyscript does not
# drop the trigger when the defining scope exits -- confirm.
registered_triggers = []
class OccupancyManager:
    """Drive an ``input_boolean`` occupancy entity from pyscript triggers.

    The manager tracks an internal state of ``'on'``, ``'off'`` or
    ``'pending_off'`` for ``state_entity`` and calls
    ``input_boolean.turn_on`` / ``input_boolean.turn_off`` as that state
    changes.

    Recognised config keys (see ``parse_config``):

    * ``state_entity`` (required) -- must start with ``input_boolean.``
    * ``occupied_conditions`` -- expressions that must ALL be true
    * ``occupied_states`` -- expressions of which ANY keeps the entity on
    * ``armed_conditions`` -- expressions that must ALL be true to arm
    * ``held_states`` -- expressions of which ANY holds an "on" entity on
    * ``timeout`` -- seconds to wait in ``pending_off`` (0 = immediate off)
    * ``state_triggers`` -- state-trigger expressions that call ``trigger``
    * ``event_triggers`` -- dicts with ``event`` and optional ``expression``

    NOTE: the expression strings are passed to ``eval``; the config must
    come from a trusted source.
    """

    def __init__(self, config):
        """Validate ``config`` and schedule ``startup`` for pyscript startup.

        If the config is invalid an error is logged and the instance is left
        without any registered triggers.
        """
        if not self.parse_config(config):
            log.error(f'INVALID CONFIG {config}')
            return

        self.log_debug('STARTING')

        self.delay_off_end = 0
        # The rest of the class reads/writes ``delay_off_start``; initialise
        # it here too so the attribute always exists.
        self.delay_off_start = 0
        self.ready = False

        @time_trigger('startup')
        def startup_trigger():
            self.log_debug('startup trigger')
            self.clear_delayed_off()
            self.startup()

        registered_triggers.append(startup_trigger)

    def log_id(self):
        """Return the log prefix: the state entity, plus task name if enabled."""
        if SHOW_TASK_NAME:
            task_name = asyncio.current_task().get_name()
            log_id = f'{self.state_entity} ({task_name})'
        else:
            log_id = f'{self.state_entity}'
        return log_id

    def log_info(self, message):
        """Log ``message`` at INFO level with this manager's prefix."""
        log.info(f'{self.log_id()}: {message}')

    def log_error(self, message):
        """Log ``message`` at ERROR level with this manager's prefix."""
        log.error(f'{self.log_id()}: {message}')

    def log_warning(self, message):
        """Log ``message`` at WARNING level with this manager's prefix."""
        log.warning(f'{self.log_id()}: {message}')

    def log_debug(self, message):
        """Log ``message`` at DEBUG level, or at INFO when DEBUG_AS_INFO is set."""
        if DEBUG_AS_INFO:
            log.info(f'{self.log_id()} DEBUG: {message}')
        else:
            log.debug(f'{self.log_id()}: {message}')

    def startup(self):
        """Read the entity's current state and register all runtime triggers."""
        self.state = state.get(self.state_entity)
        self.log_info(f'Initial State {self.state}')

        # Follow manual changes of the entity so the internal state stays in
        # sync with it.
        @state_trigger('True or {}'.format(self.state_entity))
        def state_entity_change(**data):
            new_state = state.get(self.state_entity)
            if new_state == self.state:
                return
            self.log_info(f'state entity manually changed from {self.state} to {new_state}')
            self.state = new_state

        registered_triggers.append(state_entity_change)

        if len(self.occupied_conditions) > 0:
            # Register Occupied Conditions
            @state_trigger('True or {}'.format(" or ".join(self.occupied_conditions)))
            def inner_occupied_condition(**params):
                self.log_info(f'condition change with {params}')
                if params['value'] == params['old_value']:
                    self.log_info('no change. doing nothing.')
                    return
                self.update()

            registered_triggers.append(inner_occupied_condition)

        if len(self.armed_conditions) > 0:
            # Register Armed Conditions
            @state_trigger('True or {}'.format(" or ".join(self.armed_conditions)))
            def inner_armed_condition(**params):
                self.log_info(f'armed change with {params}')
                if params['value'] == params['old_value']:
                    self.log_info('no change. doing nothing.')
                    return
                self.update()

            registered_triggers.append(inner_armed_condition)

        if len(self.occupied_states) > 0:
            # Register Occupied States
            @state_trigger('True or {}'.format(" or ".join(self.occupied_states)))
            def inner_occupied_state(**params):
                self.log_info(f'state change with {params}')
                if params['value'] == params['old_value']:
                    self.log_info('no change. doing nothing.')
                    return
                self.update()

            registered_triggers.append(inner_occupied_state)

        if len(self.held_states) > 0:
            # Register Held States
            @state_trigger('True or {}'.format(" or ".join(self.held_states)))
            def inner_held_state(**params):
                self.log_info(f'held change with {params}')
                if params['value'] == params['old_value']:
                    self.log_info('no change. doing nothing.')
                    return
                self.update()

            registered_triggers.append(inner_held_state)

        # Register State Triggers
        for one_state_trigger in self.state_triggers:
            self.log_debug(f'registering trigger {one_state_trigger}')

            @state_trigger(one_state_trigger)
            def inner_state_trigger(**params):
                self.log_info(f'state trigger with {params}')
                if params['value'] == params['old_value']:
                    self.log_info('no change. doing nothing.')
                    return
                self.trigger()

            registered_triggers.append(inner_state_trigger)

        # Register Event Trigger
        for one_event_trigger in self.event_triggers:
            if 'expression' in one_event_trigger:
                @event_trigger(one_event_trigger['event'], one_event_trigger['expression'])
                def inner_event_trigger(**params):
                    self.log_info(f'event trigger with {params}')
                    self.trigger()

                registered_triggers.append(inner_event_trigger)
            else:
                @event_trigger(one_event_trigger['event'])
                def inner_event_trigger(**params):
                    self.log_info(f'event trigger with {params}')
                    self.trigger()

                registered_triggers.append(inner_event_trigger)

        # First evaluation; this is the call that marks the manager ready.
        @time_trigger('startup')
        def first_update():
            self.log_debug('first update')
            self.update(make_ready=True)

        registered_triggers.append(first_update)

    def parse_config(self, data):
        """Copy settings from ``data`` onto ``self`` and validate their types.

        Returns True when the config is usable. On the first invalid value an
        error is logged and False is returned; attributes parsed before that
        point remain set.
        """
        # TODO: use voluptuous
        self.state_entity = data.get('state_entity')
        if self.state_entity is None:
            log.error('state_entity is required')
            return False

        if not self.state_entity.startswith('input_boolean.'):
            log.error('state_entity must be an input_boolean')
            return False

        self.occupied_conditions = data.get('occupied_conditions', [])
        if not isinstance(self.occupied_conditions, list):
            log.error(f'{self.state_entity}: occupied_conditions must be a list')
            return False

        self.occupied_states = data.get('occupied_states', [])
        if not isinstance(self.occupied_states, list):
            log.error(f'{self.state_entity}: occupied_states must be a list')
            return False

        self.armed_conditions = data.get('armed_conditions', [])
        if not isinstance(self.armed_conditions, list):
            log.error(f'{self.state_entity}: armed_conditions must be a list')
            return False

        self.held_states = data.get('held_states', [])
        if not isinstance(self.held_states, list):
            log.error(f'{self.state_entity}: held_states must be a list')
            return False

        self.timeout = data.get('timeout', 0)
        if not isinstance(self.timeout, (float, int)):
            log.error(f'{self.state_entity}: timeout must be a float or int')
            return False

        self.state_triggers = data.get('state_triggers', [])
        if not isinstance(self.state_triggers, list):
            log.error(f'{self.state_entity}: state_triggers must be a list')
            return False

        self.event_triggers = data.get('event_triggers', [])
        if not isinstance(self.event_triggers, list):
            log.error(f'{self.state_entity}: event_triggers must be a list')
            return False

        return True

    def get_unique_id(self, task=''):
        """Return the ``task.unique`` name for this manager.

        A non-empty ``task`` is appended as a ``_<task>`` suffix.
        """
        if task != '':
            task = f"_{task}"
        return f'occupancy_manager_{self.state_entity}{task}'

    def trigger(self):
        """Handle a state/event trigger: turn on if allowed, then re-evaluate.

        Does nothing (beyond logging) when no positive timeout is configured,
        the state is already ``'on'``, or the occupied/armed conditions fail.
        """
        self.log_info('TRIGGERED')

        if self.timeout <= 0:
            self.log_error('triggers require a timeout')
            return

        if self.state == 'on':
            self.log_info('already on')
            return

        if not self.check_conditions():
            self.log_info('conditions not met')
            return

        if not self.check_armed():
            self.log_info('not armed')
            return

        self.turn_on()

        # Update after Turn On, because our occupied states/holds are likely not "on"
        # If they aren't "on", this will lead to a "pending_off" state
        self.update()

    def update(self, make_ready=False):
        """Re-evaluate all expressions and turn the entity on or off.

        ``make_ready=True`` marks the manager ready first; until that has
        happened once, calls only log a warning and return.
        """
        if make_ready:
            self.ready = True

        if not self.ready:
            # Fix: this was a bare ``log_warning(...)`` call, which raised
            # NameError instead of logging.
            self.log_warning('Not Ready')
            return

        self.log_debug('updating')

        if not self.check_conditions():
            self.log_info('conditions not met')
            self.turn_off(immediate=True)
            return

        # Named ``occupied`` rather than ``state`` to avoid shadowing the
        # pyscript ``state`` global used elsewhere in this class.
        occupied = self.check_state()
        armed = self.check_armed()
        held = self.check_held()

        if self.state != 'off' and occupied:
            self.log_info('not off and state')
            self.turn_on()
            return

        if armed and occupied:
            self.log_info('armed and state')
            self.turn_on()
            return

        if self.state == 'off' and not armed:
            self.log_info('off and not armed')
            self.turn_off()
            return

        if self.state != 'off' and held:
            self.log_info('not off and held')
            self.turn_on()
            return

        self.turn_off()

    def check_state(self):
        """Return True if ANY ``occupied_states`` expression evaluates truthy.

        An expression that raises NameError is logged and skipped.
        """
        self.log_debug('CHECKING STATE')
        for occupied_state in self.occupied_states:
            try:
                # NOTE: eval of a config-supplied expression -- config must be trusted.
                if eval(occupied_state):
                    self.log_debug(f'STATE TRUE {occupied_state}')
                    return True
                self.log_debug(f'STATE FALSE {occupied_state}')
            except NameError:
                self.log_error(f'STATE UNKNOWN {occupied_state}')

        self.log_debug('NO STATE TRUE')
        return False

    def check_held(self):
        """Return True if ANY ``held_states`` expression evaluates truthy.

        An expression that raises NameError is logged and skipped.
        """
        self.log_debug('CHECKING HELD')
        for held_state in self.held_states:
            try:
                # NOTE: eval of a config-supplied expression -- config must be trusted.
                if eval(held_state):
                    self.log_debug(f'HELD TRUE {held_state}')
                    return True
                self.log_debug(f'HELD FALSE {held_state}')
            except NameError:
                self.log_error(f'HELD UNKNOWN {held_state}')

        self.log_debug('NO HELD TRUE')
        return False

    def check_conditions(self):
        """Return False if ANY ``occupied_conditions`` expression is falsy.

        Returns True when every expression is truthy or the list is empty.
        An expression that raises NameError is logged and does not count as
        false.
        """
        self.log_debug('CHECKING CONDITIONS')
        for occupied_condition in self.occupied_conditions:
            try:
                # NOTE: eval of a config-supplied expression -- config must be trusted.
                if not eval(occupied_condition):
                    self.log_debug(f'CONDITION FALSE {occupied_condition}')
                    return False
                else:
                    self.log_debug(f'CONDITION TRUE {occupied_condition}')
            except NameError:
                self.log_error(f'CONDITION UNKNOWN {occupied_condition}')

        self.log_debug('ALL CONDITIONS TRUE (or no conditions)')
        return True

    def check_armed(self):
        """Return False if ANY ``armed_conditions`` expression is falsy.

        Returns True when every expression is truthy or the list is empty.
        An expression that raises NameError is logged and does not count as
        false.
        """
        self.log_debug('CHECKING ARMED')
        for armed_condition in self.armed_conditions:
            try:
                # NOTE: eval of a config-supplied expression -- config must be trusted.
                if not eval(armed_condition):
                    self.log_debug(f'ARMED FALSE {armed_condition}')
                    return False
                else:
                    self.log_debug(f'ARMED TRUE {armed_condition}')
            except NameError:
                self.log_error(f'ARMED UNKNOWN {armed_condition}')

        self.log_debug('ALL ARMED TRUE (or no armed)')
        return True

    def turn_on(self):
        """Cancel any pending delayed-off and move the internal state to ``'on'``.

        The ``input_boolean.turn_on`` service is called only when coming from
        a state other than ``'on'`` / ``'pending_off'``.
        """
        self.clear_delayed_off()

        if self.state == 'on':
            self.log_info('still on')
            return

        if self.state == 'pending_off':
            self.log_info('clearing delayed off. remaining on.')
            self.state = 'on'
        else:
            self.log_info('on')
            self.state = 'on'
            input_boolean.turn_on(entity_id=self.state_entity)

    def turn_off(self, immediate=False):
        """Turn the entity off, immediately or after the configured timeout.

        A timeout of 0 forces an immediate off. Otherwise, unless
        ``immediate`` is set, the state becomes ``'pending_off'`` via
        ``set_delayed_off``.
        """
        if self.state == 'off':
            self.log_info('still off')
            return

        if self.timeout == 0:
            immediate = True

        if immediate:
            self.log_info('immediate off')
            self.clear_delayed_off()
            self.state = 'off'
            input_boolean.turn_off(entity_id=self.state_entity)
            return

        self.set_delayed_off()

    def set_delayed_off(self, seconds=None, force=False):
        """Enter ``'pending_off'``, sleep, then turn off immediately.

        ``seconds`` defaults to the configured timeout. ``force=True`` first
        clears any existing delayed-off task.
        """
        if force:
            self.clear_delayed_off()

        if seconds is None:
            seconds = self.timeout

        self.log_info('pending off')
        self.state = 'pending_off'

        # Continue Existing Delay

        # Method #1
        self.log_debug('killing myself if there are others')
        task.unique(self.get_unique_id(), kill_me=True)
        self.log_debug('survived')
        # End Method #1

        # Method #2
        # self.log_info('killing others')
        # task.unique(self.get_unique_id())
        # if (self.delay_off_start + seconds) > time.time():
        #     seconds = round(self.delay_off_start + seconds - time.time() + 1)
        #     self.log_info(f'existing delayed off. setting seconds to {seconds}')
        # End Method #2

        self.delay_off_start = time.time()
        self.log_info(f'delay off in {seconds} seconds')
        task.sleep(seconds)
        elapsed = round(time.time() - self.delay_off_start, 2)
        self.log_info(f'delay elapsed in {elapsed}/{seconds} seconds')
        self.turn_off(immediate=True)

    def clear_delayed_off(self):
        """Claim this manager's unique task name and reset the delay start time."""
        self.log_debug('clearing delayed off by killing others')
        task.unique(self.get_unique_id())
        self.delay_off_start = 0
##########
# Helpers
##########
# Instances created by load_apps() / load_apps_list() are collected here.
factory_apps = []


def load_apps(app_name, factory):
    """Instantiate ``factory`` for each entry under ``apps[app_name]``.

    Reads ``pyscript.config['apps'][app_name]`` and appends ``factory(entry)``
    to ``factory_apps`` for every entry. Does nothing when either the
    ``apps`` key or ``app_name`` is missing.
    """
    if "apps" in pyscript.config and app_name in pyscript.config['apps']:
        for app_config in pyscript.config['apps'][app_name]:
            instance = factory(app_config)
            factory_apps.append(instance)
def load_apps_list(app_name, factory):
    """Instantiate ``factory`` for matching entries of ``apps_list``.

    Walks ``pyscript.config['apps_list']`` and, for every entry whose ``app``
    key equals ``app_name``, appends ``factory(entry)`` to ``factory_apps``.
    Does nothing when the ``apps_list`` key is missing.
    """
    if "apps_list" in pyscript.config:
        for entry in pyscript.config['apps_list']:
            # Entries without an 'app' key, or naming another app, are skipped.
            if 'app' not in entry:
                continue
            if entry['app'] != app_name:
                continue
            factory_apps.append(factory(entry))
##########
# Startup
##########
@time_trigger('startup')
def load():
    """Create an OccupancyManager for every configured occupancy_manager app.

    Runs on the pyscript 'startup' time trigger and consults both config
    layouts, ``apps`` first and then ``apps_list``.
    """
    for loader in (load_apps, load_apps_list):
        loader('occupancy_manager', OccupancyManager)