-
Notifications
You must be signed in to change notification settings - Fork 0
/
elliptical_investigation.py
144 lines (115 loc) · 5.39 KB
/
elliptical_investigation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import sys
import network
import ntptime
# ruff: noqa: E402
sys.path.append("")
from micropython import const
import uasyncio as asyncio
import aioble
import bluetooth
import time
import random
import struct
def dstTime(now=None):
    """Return US Eastern wall-clock time as a ``time.localtime()`` tuple.

    Daylight saving time follows the US rule: it starts on the second
    Sunday of March and ends on the first Sunday of November.  The fixed
    offsets applied below (UTC-5 for EST, UTC-4 for EDT) are only
    meaningful if the clock holds UTC.
    NOTE(review): assumes the RTC was set to UTC (e.g. via ntptime) - confirm.

    Args:
        now: Seconds since the epoch to convert.  Defaults to the current
            ``time.time()``.

    Returns:
        The time tuple produced by ``time.localtime()`` for the shifted
        timestamp.
    """
    if now is None:
        now = time.time()
    year = time.localtime(now)[0]

    # Day-of-week correction for this year; integer arithmetic avoids the
    # float rounding of the textbook form int(5 * year / 4 + 1).
    offset = (5 * year // 4 + 1) % 7

    # Second Sunday of March, 02:00 EST == 07:00 UTC.
    dst_start = time.mktime((year, 3, 14 - offset, 7, 0, 0, 0, 0, 0))
    # First Sunday of November, 02:00 EDT == 06:00 UTC.
    dst_end = time.mktime((year, 11, 7 - offset, 6, 0, 0, 0, 0, 0))

    if dst_start <= now < dst_end:
        return time.localtime(now - 14400)  # EDT: UTC-4h
    return time.localtime(now - 18000)  # EST: UTC-5h
async def read_characteristic(char):
    """Read a characteristic once and return a timestamped hex dump.

    This must be a coroutine: it awaits ``char.read`` and the caller awaits
    the result.

    Args:
        char: Object exposing an awaitable ``read(timeout_ms=...)`` method
            (main() passes the objects returned by
            ``service.characteristic``).

    Returns:
        ``(readTime, readData)`` when the read yields ``bytes``:
        ``readTime`` is an integer millisecond timestamp and ``readData``
        is the payload rendered as literal ``\\xNN`` escapes.  ``None`` when
        the read fails (the reason is printed) or yields a non-bytes value.
    """
    try:
        result = await char.read(timeout_ms=5000)
    except TypeError as e:
        print(" ", "??NoneType??,", e)
        return None
    except ValueError:
        print(" ", "No read")
        return None
    except asyncio.TimeoutError:
        print(" ", "Timeout")
        return None

    # Milliseconds from the device clock plus 30 years of 365.25 days.
    # NOTE(review): looks like this shifts a 2000-based device epoch toward
    # the 1970 Unix epoch; the constant is 12 h larger than the exact
    # 946684800 s gap - confirm intent before relying on absolute times.
    readTime = (time.time_ns() // 1_000_000) + (31557600000 * 30)

    if isinstance(result, bytes):
        readData = "".join("\\x%02x" % i for i in result)
        return (readTime, readData)

    print(" ", readTime, " Read:", result)
    return None
async def find_device():
    """Scan for the "SCHWINN 470" and return its device handle.

    Returns:
        ``result.device`` of the first scan result advertising that name,
        or ``None`` if the scan finishes without seeing it.
    """
    wanted_name = "SCHWINN 470"
    # 5 second active scan with a very low interval/window (to maximise
    # detection rate).
    async with aioble.scan(5000, interval_us=30000, window_us=30000, active=True) as scanner:
        async for result in scanner:
            advertised = result.name()
            if advertised != wanted_name:
                continue
            # Matching is by advertised name only; the advertised services
            # are not checked.
            print("Result: ", result)
            print("Name: ", advertised, "RSSI:", result.rssi)
            return result.device
    return None
# Service that main() looks up on the connected device.
__SERVICE_UUID = '98186d60-2f47-11e6-8899-0002a5d5c51b'
serviceUUIDs = {__SERVICE_UUID}

# Investigation notes: characteristic UUID - code (flag / event recorded).
# 5ec4e520-9804-11e3-b4b9-0002a5d5c51b - 10 (GATTC Service Done)
# e3f9af20-2674-11e3-879e-0002a5d5c51b - x2 (Flag Read) - never returns anything
# 4e349c00-999e-11e3-b341-0002a5d5c51b - x2 (Flag Read) - never returns anything
# 4ed124e0-9803-11e3-b14c-0002a5d5c51b - x2 (Flag Read) - never returns anything
# 1717b3c0-9803-11e3-90e1-0002a5d5c51b - x8 (Flag Write) - gets a timeout on write
# 35ddd0a0-9803-11e3-9a8b-0002a5d5c51b - 18 (GATTC Notify)
# 5c7d82a0-9803-11e3-8a6c-0002a5d5c51b - 16 (GATTC READ Done)
# 6be8f580-9803-11e3-ab03-0002a5d5c51b - 16 (GATTC READ Done)
# a46a4a80-9803-11e3-8f3c-0002a5d5c51b - 16 (GATTC READ Done)
# d57cda20-9803-11e3-8426-0002a5d5c51b - 16 (GATTC Read Done)
# ec865fc0-9803-11e3-8bf6-0002a5d5c51b - x4 (Flag Write No Response)
# 7241b880-a560-11e3-9f31-0002a5d5c51b - x2 (Flag Read)

# The four "Flag Read" characteristics, in the column order main() prints.
__FLAG_READ_UUID_1 = 'e3f9af20-2674-11e3-879e-0002a5d5c51b'
__FLAG_READ_UUID_2 = '4e349c00-999e-11e3-b341-0002a5d5c51b'
__FLAG_READ_UUID_3 = '4ed124e0-9803-11e3-b14c-0002a5d5c51b'
__FLAG_READ_UUID_4 = '7241b880-a560-11e3-9f31-0002a5d5c51b'
readUUIDs = [
    __FLAG_READ_UUID_1,
    __FLAG_READ_UUID_2,
    __FLAG_READ_UUID_3,
    __FLAG_READ_UUID_4,
]
async def main():
    """Find the elliptical, connect, and poll the flag-read characteristics.

    For every UUID in ``serviceUUIDs`` the service is looked up, a header
    line listing ``readUUIDs`` is printed, and the characteristics are then
    read in a loop, printing one ``time,data`` pair per characteristic on
    each row.  Polling of a service stops as soon as any read fails.

    The connection is used as an async context manager so the device is
    disconnected on every exit path, including errors.
    """
    device = await find_device()
    if not device:
        print("Device not found")
        return

    try:
        print("Connecting to", device)
        connection = await device.connect()
    except asyncio.TimeoutError:
        print("Timeout during connection")
        return

    async with connection:
        await asyncio.sleep_ms(1000)
        for uuidString in serviceUUIDs:
            uuid = bluetooth.UUID(uuidString)
            print("uuidString", uuidString, "UUID: ", uuid)
            try:
                service = await connection.service(uuid)
                if service is None:
                    # Without this guard the lookup below would fail with an
                    # uncaught AttributeError on None.
                    print("Service not found:", uuidString)
                    continue

                # Header row: one "<uuid>,," column per characteristic.
                print(" ".join(f"{readUUID},," for readUUID in readUUIDs))

                # Resolve the characteristics once rather than on every
                # polling iteration.
                read_chars = []
                for readUUID in readUUIDs:
                    read_chars.append(
                        await service.characteristic(bluetooth.UUID(readUUID))
                    )

                while True:
                    readInfo = []
                    for read_char in read_chars:
                        readInfo.append(await read_characteristic(read_char))
                    if None in readInfo:
                        # read_characteristic has already printed the reason.
                        print("Read failed; stopping")
                        break
                    print(", ".join(
                        f"{readTime},{readData}"
                        for readTime, readData in readInfo
                    ))
            except TypeError as e:
                print("??TypeError??,", e)
# Start the event loop and run main() to completion whenever this file is executed.
asyncio.run(main())