-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.py
125 lines (100 loc) · 3.71 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
from bluetooth.ble import DiscoveryService
import bluetooth
import time
import argon2
import requests
import base64
import os
import multiprocessing
salt = os.environ['BLUE_COLLECTOR_HASH_SALT']
location = os.environ['BLUE_COLLECTOR_LOCATION_NAME']
if 'BLUE_COLLECTOR_GEO_POINT' in os.environ:
geo_point = [float(a) for a in os.environ[
'BLUE_COLLECTOR_GEO_POINT'].split(',')]
class DeviceScanner(multiprocessing.Process):
def __init__(self, observation_queue):
multiprocessing.Process.__init__(self)
self.observation_queue = observation_queue
self.timeout = 5
def run(self):
while True:
devices = self.scan()
self.process_devices(devices)
def set_timeout(self, timeout):
self.timeout = timeout
def process_devices(self, devices):
now = time.time()
for device in devices:
self.observation_queue.put((device, now))
class BTScanner(DeviceScanner):
def __init__(self, observation_queue):
DeviceScanner.__init__(self, observation_queue)
def scan(self):
return {i[0]: i[1] for i in bluetooth.discover_devices(
duration=self.timeout, lookup_names=True, flush_cache=True,
lookup_class=False)}
class BLEScanner(DeviceScanner):
def __init__(self, observation_queue):
DeviceScanner.__init__(self, observation_queue)
self.service = DiscoveryService()
def scan(self):
return self.service.discover(self.timeout)
class DevicePublisher(multiprocessing.Process):
def __init__(self, observation_queue):
multiprocessing.Process.__init__(self)
self.observations = observation_queue
self.seen_devices = {}
self.cleanup_counter = 0
def run(self):
while True:
observation = self.observations.get()
self.process_observation(observation)
print(observation)
def process_observation(self, observation):
(mac, obs_time) = observation
if mac in self.seen_devices:
if self.seen_devices[mac] + 300 < obs_time:
self.seen_devices[mac] = obs_time
self.publish_observation(mac, obs_time)
else:
self.seen_devices[mac] = obs_time
else:
self.seen_devices[mac] = obs_time
self.publish_observation(mac, obs_time)
self.cleanup_counter += 1
if self.cleanup_counter > 100:
self.cleanup_devices()
def cleanup_devices(self):
now = time.time()
remove_devices = []
for mac, seen_time in self.seen_devices.items():
if now - 600 > seen_time:
remove_devices.append(mac)
for mac in remove_devices:
del self.seen_devices[mac]
self.cleanup_counter = 0
def publish_observation(self, mac, obs_time):
observation = {}
observation['hash'] = base64.urlsafe_b64encode(
argon2.argon2_hash(mac, salt)).decode()
observation['location'] = location
observation['geo_point'] = geo_point
observation['timestamp'] = obs_time
try:
r = requests.post("http://localhost:5000/observations",
json=observation)
except Exception as e:
print('publish failed for observation: ' + observation['hash'] +
'with exception: ' + str(e) + " http code: " + r.code)
pass
def main():
print(geo_point)
observations = multiprocessing.Queue()
ble_scanner = BLEScanner(observations)
bt_scanner = BTScanner(observations)
publisher = DevicePublisher(observations)
ble_scanner.start()
bt_scanner.start()
publisher.start()
if __name__ == "__main__":
main()