forked from argilo/secplus
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsecplus_rx_secplus_v2_decode.py
117 lines (102 loc) · 4.03 KB
/
secplus_rx_secplus_v2_decode.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#
# Copyright 2020 Clayton Smith ([email protected])
#
# This file is part of secplus.
#
# secplus is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# secplus is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with secplus. If not, see <http://www.gnu.org/licenses/>.
#
import numpy as np
from gnuradio import gr
if __name__ != "builtins": # Don't import within GRC
import secplus
class blk(gr.sync_block):
"""Decoder for Chamberlain garage door openers"""
def __init__(self, samp_rate=10000, threshold=0.02):
gr.sync_block.__init__(
self,
name='Security+ 2.0 Decoder',
in_sig=[np.float32],
out_sig=[]
)
self.samp_rate = samp_rate
self.threshold = threshold
self.last_sample = 0.0
self.last_edge = 0
self.buffer = []
self.pair = [None, None]
self.pair_time = [None, None]
def work(self, input_items, output_items):
for n, sample in enumerate(input_items[0]):
current_sample = self.nitems_read(0) + n
if self.last_sample < self.threshold <= sample:
# rising edge
self.process_edge(True, current_sample - self.last_edge)
self.last_edge = current_sample
elif self.last_sample >= self.threshold > sample:
# falling edge
self.process_edge(False, current_sample - self.last_edge)
self.last_edge = current_sample
if current_sample - self.last_edge >= 0.625e-3 * self.samp_rate:
self.buffer.append(0)
self.buffer.append(0)
self.process_buffer(current_sample)
self.buffer = []
self.last_sample = sample
return len(input_items[0])
def process_edge(self, rising, samples):
if samples < 0.125e-3 * self.samp_rate:
pass
elif samples < 0.375e-3 * self.samp_rate:
self.buffer.append(0 if rising else 1)
elif samples < 0.625e-3 * self.samp_rate:
self.buffer.append(0 if rising else 1)
self.buffer.append(0 if rising else 1)
def process_buffer(self, current_sample):
manchester = "".join(str(b) for b in self.buffer)
start = manchester.find("1010101001010101")
if start == -1:
return
if manchester[start+20:start+24] == "1010":
packet_length = 40
elif manchester[start+20:start+24] == "1001":
packet_length = 64
else:
return
manchester = manchester[start+16:start+20+(packet_length*2)]
baseband = []
for i in range(0, len(manchester), 2):
if manchester[i:i+2] == "01":
baseband.append(1)
elif manchester[i:i+2] == "10":
baseband.append(0)
else:
return
packet = baseband[2:]
if baseband[0:2] == [0, 0]:
frame_id = 0
elif baseband[0:2] == [0, 1]:
frame_id = 1
else:
return
self.pair_time[frame_id] = current_sample
if self.pair[frame_id] == packet:
return
self.pair[frame_id] = packet
if (self.pair[frame_id ^ 1] is not None) and (len(self.pair[frame_id ^ 1]) == packet_length):
if self.pair_time[frame_id] - self.pair_time[frame_id ^ 1] < 0.35 * self.samp_rate:
try:
rolling, fixed, data = secplus.decode_v2(self.pair[0] + self.pair[1])
print(secplus.pretty_v2(rolling, fixed, data))
except ValueError:
pass