forked from adamfast/DMRlink
-
Notifications
You must be signed in to change notification settings - Fork 0
/
rcm.py
executable file
·144 lines (122 loc) · 5.41 KB
/
rcm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/env python
#
# This work is licensed under the Creative Commons Attribution-ShareAlike
# 3.0 Unported License.To view a copy of this license, visit
# http://creativecommons.org/licenses/by-sa/3.0/ or send a letter to
# Creative Commons, 444 Castro Street, Suite 900, Mountain View,
# California, 94041, USA.
# This is a sample application that uses the Repeater Call Monitor packets to display events in the IPSC
# NOTE: dmrlink.py MUST BE CONFIGURED TO CONNECT AS A "REPEATER CALL MONITOR" PEER!!!
# ALSO NOTE, I'M NOT DONE MAKING THIS WORK, SO UNTIL THIS MESSAGE IS GONE, DON'T EXPECT GREAT THINGS.
from __future__ import print_function
from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor
from twisted.internet import task
from binascii import b2a_hex as h
import datetime
import binascii
import dmrlink
from dmrlink import IPSC, NETWORK, networks, get_info, int_id, subscriber_ids, peer_ids, talkgroup_ids, logger
__author__ = 'Cortney T. Buffington, N0MJS'
__copyright__ = 'Copyright (c) 2013, 2014 Cortney T. Buffington, N0MJS and the K0USY Group'
__credits__ = 'Adam Fast, KC0YLK; Dave Kierzkowski KD8EYF'
__license__ = 'Creative Commons Attribution-ShareAlike 3.0 Unported'
__maintainer__ = 'Cort Buffington, N0MJS'
__email__ = '[email protected]'
__status__ = 'Production'
try:
from ipsc.ipsc_message_types import *
except ImportError:
sys.exit('IPSC message types file not found or invalid')
status = True
rpt = True
nack = True
class rcmIPSC(IPSC):
def __init__(self, *args, **kwargs):
IPSC.__init__(self, *args, **kwargs)
#************************************************
# CALLBACK FUNCTIONS FOR USER PACKET TYPES
#************************************************
#
def call_mon_status(self, _network, _data):
if not status:
return
_source = _data[1:5]
_ipsc_src = _data[5:9]
_seq_num = _data[9:13]
_ts = _data[13]
_status = _data[15] # suspect [14:16] but nothing in leading byte?
_rf_src = _data[16:19]
_rf_tgt = _data[19:22]
_type = _data[22]
_prio = _data[23]
_sec = _data[24]
_source = str(int_id(_source)) + ', ' + str(get_info(int_id(_source), peer_ids))
_ipsc_src = str(int_id(_ipsc_src)) + ', ' + str(get_info(int_id(_ipsc_src), peer_ids))
_rf_src = str(int_id(_rf_src)) + ', ' + str(get_info(int_id(_rf_src), subscriber_ids))
if _type == '\x4F' or '\x51':
_rf_tgt = 'TGID: ' + str(int_id(_rf_tgt)) + ', ' + str(get_info(int_id(_rf_tgt), talkgroup_ids))
else:
_rf_tgt = 'SID: ' + str(int_id(_rf_tgt)) + ', ' + str(get_info(int_id(_rf_tgt), subscriber_ids))
print('Call Monitor - Call Status')
print('TIME: ', datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
print('DATA SOURCE: ', _source)
print('IPSC: ', _network)
print('IPSC Source: ', _ipsc_src)
print('Timeslot: ', TS[_ts])
try:
print('Status: ', STATUS[_status])
except KeyError:
print('Status (unknown): ', h(_status))
try:
print('Type: ', TYPE[_type])
except KeyError:
print('Type (unknown): ', h(_type))
print('Source Sub: ', _rf_src)
print('Target Sub: ', _rf_tgt)
print()
def call_mon_rpt(self, _network, _data):
if not rpt:
return
_source = _data[1:5]
_ts1_state = _data[5]
_ts2_state = _data[6]
_source = str(int_id(_source)) + ', ' + str(get_info(int_id(_source), peer_ids))
print('Call Monitor - Repeater State')
print('TIME: ', datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
print('DATA SOURCE: ', _source)
try:
print('TS1 State: ', REPEAT[_ts1_state])
except KeyError:
print('TS1 State (unknown): ', h(_ts1_state))
try:
print('TS2 State: ', REPEAT[_ts2_state])
except KeyError:
print('TS2 State (unknown): ', h(_ts2_state))
print()
def call_mon_nack(self, _network, _data):
if not nack:
return
_source = _data[1:5]
_nack = _data[5]
_source = get_info(int_id(_source), peer_ids)
print('Call Monitor - Transmission NACK')
print('TIME: ', datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
print('DATA SOURCE: ', _source)
try:
print('NACK Cause: ', NACK[_nack])
except KeyError:
print('NACK Cause (unknown): ', h(_nack))
print()
def repeater_wake_up(self, _network, _data):
_source = _data[1:5]
_source_dec = int_id(_source)
_source_name = get_info(_source_dec, peer_ids)
#print('({}) Repeater Wake-Up Packet Received: {} ({})' .format(_network, _source_name, _source_dec))
if __name__ == '__main__':
logger.info('DMRlink \'rcm.py\' (c) 2013, 2014 N0MJS & the K0USY Group - SYSTEM STARTING...')
for ipsc_network in NETWORK:
if NETWORK[ipsc_network]['LOCAL']['ENABLED']:
networks[ipsc_network] = rcmIPSC(ipsc_network)
reactor.listenUDP(NETWORK[ipsc_network]['LOCAL']['PORT'], networks[ipsc_network], interface=NETWORK[ipsc_network]['LOCAL']['IP'])
reactor.run()