-
Notifications
You must be signed in to change notification settings - Fork 0
/
scrubcam.py
executable file
·120 lines (93 loc) · 3.74 KB
/
scrubcam.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#!/usr/bin/env python
"""Code for ScrubCam field camera device
The code that runs on the Scrubcam field camera device. Gets
configuration information from a YAML file provided as a command line
argument. There is an example configuration file at:
cfgs/config.yaml.example
To run:
./scrubcam.py cfgs/YOUR_CONFIGURATION_FILE.yaml
"""
import logging
import io
import argparse
from datetime import datetime
import yaml
import picamera
from dencam.gui import State
from scrubcam.vision import ObjectDetectionSystem
from scrubcam.networking import ClientSocketHandler
from scrubcam.display import Display
from scrubcam.lora import LoRaSender
# Root logger: INFO and above, each record tagged with its level and the
# name of the logger that emitted it.
logging.basicConfig(
    format='[%(levelname)s] %(message)s (%(name)s)',
    level=logging.INFO,
)
log = logging.getLogger('main')

# Command line: a required path to the YAML configuration file and an
# optional -c/--continue switch (stored as ``args.cont``).
parser = argparse.ArgumentParser()
parser.add_argument('config_filename')
parser.add_argument('-c', '--continue', action='store_true', dest='cont')
args = parser.parse_args()

CONFIG_FILE = args.config_filename
CONTINUE_RUN = args.cont

# Parse the configuration with PyYAML's safe loader.
with open(CONFIG_FILE, encoding="utf-8") as f:
    configs = yaml.safe_load(f)

# Settings read from the configuration; a missing key raises KeyError here,
# before any hardware is touched.
RECORD = configs['RECORD']
RECORD_CONF_THRESHOLD = configs['RECORD_CONF_THRESHOLD']
CAMERA_RESOLUTION = configs['CAMERA_RESOLUTION']
CAMERA_ROTATION = configs['CAMERA_ROTATION']
FILTER_CLASSES = configs['FILTER_CLASSES']
HEADLESS = configs['HEADLESS']
CONNECT_REMOTE_SERVER = configs['CONNECT_REMOTE_SERVER']
LORA_ON = configs['LORA_ON']
def _should_record(lboxes):
    """Return True when a non-empty set of labeled boxes qualifies for recording.

    Recording requires that ``RECORD`` is enabled, that the first box's
    confidence exceeds ``RECORD_CONF_THRESHOLD``, and that at least one
    detected class name appears in ``FILTER_CLASSES``.
    """
    if not (RECORD and lboxes[0]['confidence'] > RECORD_CONF_THRESHOLD):
        return False
    return any(lbox['class_name'] in FILTER_CLASSES for lbox in lboxes)


def _log_sighting(lboxes):
    """Append ``<timestamp> | <class_name>`` for the first box to what_was_seen.log."""
    with open('what_was_seen.log', 'a+', encoding="utf-8") as seen_file:
        tstamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        top_class = lboxes[0]['class_name']
        seen_file.write(f'{tstamp} | {top_class}\n')


def main():
    """Main routine of ScrubCam

    Sets up the optional LoRa sender, the object detection system, the
    camera, the optional remote server connection and the optional display,
    then processes JPEG frames captured continuously from the camera.  For
    each frame the detector is run and its report printed; frames whose
    detections qualify for recording are sent to the server (when
    connected), saved through the detector, announced over LoRa (when
    enabled) and noted in ``what_was_seen.log``.

    A KeyboardInterrupt ends the run with a warning.  However the run ends,
    the server connection (if one was opened) and the camera are closed.
    """
    lora_sender = None
    if LORA_ON:
        lora_sender = LoRaSender()
    else:
        log.info('LoRa is ***DISABLED***\n\n')

    detector = ObjectDetectionSystem(configs)
    stream = io.BytesIO()

    camera = picamera.PiCamera()
    socket_handler = None
    display = None
    try:
        camera.rotation = CAMERA_ROTATION
        camera.resolution = CAMERA_RESOLUTION

        if CONNECT_REMOTE_SERVER:
            log.info('Connecting to server enabled')
            socket_handler = ClientSocketHandler(configs)
            socket_handler.send_host_configs(FILTER_CLASSES, CONTINUE_RUN)
        else:
            log.info('Connecting to ScrubDash server is ***DISABLED***\n\n')

        if not HEADLESS:
            # NOTE(review): the meaning of the argument 4 is defined by
            # dencam.gui.State, which is not visible here -- confirm.
            state = State(4)
            display = Display(configs, camera, state)

        for _ in camera.capture_continuous(stream, format='jpeg'):
            if socket_handler is not None:
                socket_handler.send_heartbeat_every_15s()
            detector.infer(stream)
            detector.print_report()

            lboxes = detector.labeled_boxes
            if display is not None:
                display.update(lboxes)

            # NOTE(review): the nesting of the LoRa send and the sighting
            # log was reconstructed from a source whose indentation was
            # lost; both are treated as part of the "recorded detection"
            # branch -- confirm against the repository copy.
            # len() is used rather than truthiness because the concrete
            # type of labeled_boxes is not visible in this file.
            if len(lboxes) > 0 and _should_record(lboxes):
                if socket_handler is not None:
                    socket_handler.send_image_and_boxes(stream, lboxes)
                    log.debug('Image sent')
                detector.save_current_frame(None, lboxes=lboxes)

                if lora_sender is not None:
                    to_send = f"Top-1: {lboxes[0]['class_name']}"
                    lora_sender.send(to_send)

                _log_sighting(lboxes)

            # Rewind and empty the buffer so the next capture starts at
            # offset zero instead of appending to the previous frame.
            stream.seek(0)
            stream.truncate()
    except KeyboardInterrupt:
        log.warning('KeyboardInterrupt')
    finally:
        # Release resources on every exit path, not only on Ctrl-C; the
        # nested finally guarantees the camera is closed even if closing
        # the socket fails.
        try:
            if socket_handler is not None:
                socket_handler.close()
        finally:
            camera.close()
# Start the camera loop only when this file is executed as a script.
if __name__ == "__main__":
    main()