-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcomcon.py
executable file
·94 lines (66 loc) · 2.52 KB
/
comcon.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#!/usr/bin/env python3
import pyfiglet
import sys
import zmq
import json
from jsonrpcclient.requests import Request
# Address of the server's ZeroMQ endpoint that main() connects its REQ socket to.
ZMQ_SOCKET_ADDR = "tcp://127.0.0.1:4444"
def help():
    """Print the summary of commands understood by the comcon prompt.

    Note: this function intentionally keeps its existing name, which shadows
    the ``help`` builtin inside this module.
    """
    help_lines = (
        "",
        "##### Help Text ######",
        "list -> List of all devices connected to server",
        "get_data_packet <device_addr_index> <timestamp> -> Fetch data packet with the given timestamp from device",
        "exit -> Exit comcon, go back to your world, lie down, have a cup of coffee, go to a spa.",
        "help -> Print this help text for your immediate rescue.",
        "",
    )
    # The empty first/last entries reproduce the blank line before and after the text.
    print("\n".join(help_lines))
def list_devices(socket):
    """Request the device list from the server and print it with indices.

    Sends a ``list`` request as a string over ``socket``, blocks until a reply
    arrives, decodes the reply as UTF-8 JSON, and prints every item of the
    decoded value as ``<index> -> <item>``.

    Args:
        socket: Connected socket object providing ``send_string`` and ``recv``.
    """
    print("List of devices connected to server.")

    list_request = Request("list")
    socket.send_string(str(list_request))

    raw_reply = socket.recv()
    entries = json.loads(raw_reply.decode('utf-8'))

    for position, entry in enumerate(entries):
        print(f"{position} -> {entry}")
def fetch_data_packet(socket, device_addr, timestamp):
    """Ask the server for one data packet and print the raw reply.

    Builds a ``get_data_packet`` request carrying ``timestamp`` and
    ``device_addr_index``, echoes it to stdout, sends it over ``socket``,
    then prints the UTF-8 decoded reply.

    Args:
        socket: Connected socket object providing ``send_string`` and ``recv``.
        device_addr: Value sent as the ``device_addr_index`` parameter.
        timestamp: Value sent as the ``timestamp`` parameter.
    """
    packet_request = Request(
        "get_data_packet",
        timestamp=timestamp,
        device_addr_index=device_addr,
    )
    print(f"{device_addr} -> {packet_request}")

    socket.send_string(str(packet_request))
    reply_text = socket.recv().decode("utf-8")
    print(f"Response -> {reply_text}")
def send_to_device(socket, command):
    """Placeholder for the ``send`` command; currently only prints a notice.

    Neither ``socket`` nor ``command`` is used yet, so nothing is transmitted.
    """
    notice = "Sending a message to device."
    print(notice)
def _parse_data_packet_args(user_command):
    """Return ``(device_addr, timestamp)`` from a get_data_packet line, or None if malformed."""
    # split() without an argument collapses repeated whitespace, so
    # "get_data_packet  1   2" no longer produces empty tokens.
    fields = user_command.split()
    if len(fields) < 3:
        return None
    try:
        return int(fields[1]), int(fields[2])
    except ValueError:
        return None


def main():
    """Run the interactive comcon prompt until the user leaves.

    Prints the banner, connects a ZeroMQ REQ socket to ``ZMQ_SOCKET_ADDR``,
    shows the help text, then reads commands from stdin in a loop and
    dispatches them: ``list``, ``send ...``, ``get_data_packet <index>
    <timestamp>``, ``exit`` and ``help``. Anything else prints
    "Unknown command.".

    A malformed ``get_data_packet`` line prints a usage hint instead of
    aborting the session. Ctrl-C and end-of-input (Ctrl-D) both end the
    session with the farewell message. The socket and context are released
    on every exit path.

    Raises:
        SystemExit: with status 0 when the user enters ``exit``.
    """
    print(pyfiglet.figlet_format("COMCON", font="slant"))
    print("Talk to end devices and control them by issuing commands.\n")

    # Socket to talk to server
    print("Connecting to zmq socket")
    context = zmq.Context()
    socket = context.socket(zmq.REQ)

    try:
        socket.connect(ZMQ_SOCKET_ADDR)
        help()

        while True:
            user_command = input("> ")
            # Exact-match commands tolerate surrounding whitespace and any case.
            keyword = user_command.strip().lower()

            if keyword == "list":
                list_devices(socket)
            elif user_command.startswith("send "):
                send_to_device(socket, user_command)
            elif user_command.startswith("get_data_packet"):
                parsed_args = _parse_data_packet_args(user_command)
                if parsed_args is None:
                    # Missing or non-integer arguments: keep the prompt alive.
                    print("Usage: get_data_packet <device_addr_index> <timestamp>")
                else:
                    device_addr, timestamp = parsed_args
                    fetch_data_packet(socket, device_addr, timestamp)
            elif keyword == "exit":
                print("\nBoi Boi")
                # SystemExit still passes through the finally clause below.
                sys.exit(0)
            elif keyword == "help":
                help()
            else:
                print("Unknown command.")
    except (KeyboardInterrupt, EOFError):
        print("\nBoi Boi")
    finally:
        # linger=0 drops any unsent request so context.term() cannot block
        # when the server is unreachable.
        socket.close(linger=0)
        context.term()
# Start the interactive prompt only when run as a script, not when imported.
if __name__ == "__main__":
    main()