-
Notifications
You must be signed in to change notification settings - Fork 130
/
Copy pathCOVClientApp.py
executable file
·187 lines (137 loc) · 6.07 KB
/
COVClientApp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
#!/usr/bin/env python
"""
Configured with a subscription context object which is passed to the
application, it sends a SubscribeCOVRequest and listens for confirmed or
unconfirmed COV notifications, lines them up with the context, and passes the
APDU to the context to print out.
Making multiple subscription contexts and keeping them active based on their
lifetime is left as an exercise for the reader.
"""
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.core import run, deferred
from bacpypes.iocb import IOCB
from bacpypes.pdu import Address
from bacpypes.apdu import SubscribeCOVRequest, SimpleAckPDU
from bacpypes.errors import ExecutionError
from bacpypes.app import BIPSimpleApplication
from bacpypes.local.device import LocalDeviceObject
# some debugging
# _debug is the flag tested by the `if _debug:` guards in this module;
# any truthy value lets the guarded debug calls run
_debug = 0
_log = ModuleLogger(globals())
# globals
# the application instance; stays None until main() assigns it
this_application = None
# SubscriptionContext instances keyed by their subscriberProcessIdentifier
subscription_contexts = {}
# process identifier handed to the next SubscriptionContext that is created
next_proc_id = 1
#
# SubscriptionContext
#
@bacpypes_debugging
class SubscriptionContext:
    """Parameters of one COV subscription and the handler for its notifications.

    Each instance takes the current value of the module-level ``next_proc_id``
    counter as its subscriber process identifier, advances the counter, and
    registers itself in ``subscription_contexts`` under that identifier.
    """

    def __init__(self, address, objid, confirmed=None, lifetime=None):
        """Record the subscription parameters and register this context.

        address   -- destination for the subscription request
        objid     -- identifier of the object to monitor
        confirmed -- stored as issueConfirmedNotifications (None leaves it unset)
        lifetime  -- stored as the subscription lifetime (None leaves it unset)
        """
        if _debug:
            SubscriptionContext._debug("__init__ %r %r confirmed=%r lifetime=%r", address, objid, confirmed, lifetime)
        global subscription_contexts, next_proc_id

        # claim the next unique process identifier and advance the counter
        proc_id = next_proc_id
        next_proc_id = proc_id + 1

        # where subscription requests are sent
        self.address = address

        # the values that end up in the SubscribeCOVRequest
        self.subscriberProcessIdentifier = proc_id
        self.monitoredObjectIdentifier = objid
        self.issueConfirmedNotifications = confirmed
        self.lifetime = lifetime

        # keep track of this context so notifications can be matched to it
        subscription_contexts[proc_id] = self

    def cov_notification(self, apdu):
        """Print the source, monitored object, and property values of a notification."""
        if _debug:
            SubscriptionContext._debug("cov_notification %r", apdu)

        source = apdu.pduSource
        monitored = apdu.monitoredObjectIdentifier

        # rash assumption: every property value is a single application
        # encoded tag, so only the first tag of each value is decoded
        changes = []
        for element in apdu.listOfValues:
            decoded = element.value.tagList[0].app_to_object().value
            changes.append("{} = {}".format(element.propertyIdentifier, str(decoded)))

        print("{} {} changed\n {}".format(source, monitored, ",\n ".join(changes)))
#
# SubscribeCOVApplication
#
@bacpypes_debugging
class SubscribeCOVApplication(BIPSimpleApplication):
    """Application that sends SubscribeCOV requests and handles COV notifications.

    Incoming notifications are matched to a SubscriptionContext through the
    module-level ``subscription_contexts`` dict, using the subscriber process
    identifier carried in the notification and the address it came from.
    """

    def __init__(self, *args):
        """Pass all positional arguments straight through to BIPSimpleApplication."""
        if _debug: SubscribeCOVApplication._debug("__init__ %r", args)
        BIPSimpleApplication.__init__(self, *args)

    def send_subscription(self, context):
        """Build a SubscribeCOVRequest from `context` and submit it.

        context -- object providing address, subscriberProcessIdentifier,
                   monitoredObjectIdentifier, issueConfirmedNotifications
                   and lifetime attributes (e.g. a SubscriptionContext)
        """
        if _debug: SubscribeCOVApplication._debug("send_subscription %r", context)

        # build a request
        request = SubscribeCOVRequest(
            subscriberProcessIdentifier=context.subscriberProcessIdentifier,
            monitoredObjectIdentifier=context.monitoredObjectIdentifier,
        )
        request.pduDestination = context.address

        # optional parameters, only set when the context supplies them
        if context.issueConfirmedNotifications is not None:
            request.issueConfirmedNotifications = context.issueConfirmedNotifications
        if context.lifetime is not None:
            request.lifetime = context.lifetime

        # make an IOCB
        iocb = IOCB(request)
        if _debug: SubscribeCOVApplication._debug("    - iocb: %r", iocb)

        # callback when it is acknowledged
        iocb.add_callback(self.subscription_acknowledged)

        # submit through this instance rather than the module-level
        # this_application global, so the method also works when that
        # global is unset or refers to a different application
        self.request_io(iocb)

    def subscription_acknowledged(self, iocb):
        """IOCB callback: log the response or error of a subscription request."""
        if _debug: SubscribeCOVApplication._debug("subscription_acknowledged %r", iocb)

        # do something for success
        if iocb.ioResponse:
            if _debug: SubscribeCOVApplication._debug("    - response: %r", iocb.ioResponse)

        # do something for error/reject/abort
        if iocb.ioError:
            if _debug: SubscribeCOVApplication._debug("    - error: %r", iocb.ioError)

    def do_ConfirmedCOVNotificationRequest(self, apdu):
        """Hand a confirmed COV notification to its context and acknowledge it.

        Raises ExecutionError('services', 'unknownSubscription') when no
        context is registered for the process identifier, or when the
        notification did not come from the context's address.
        """
        if _debug: SubscribeCOVApplication._debug("do_ConfirmedCOVNotificationRequest %r", apdu)

        # look up the process identifier
        context = subscription_contexts.get(apdu.subscriberProcessIdentifier, None)
        if not context or apdu.pduSource != context.address:
            if _debug: SubscribeCOVApplication._debug("    - no context")

            # this is turned into an ErrorPDU and sent back to the client
            raise ExecutionError('services', 'unknownSubscription')

        # now tell the context object
        context.cov_notification(apdu)

        # success
        response = SimpleAckPDU(context=apdu)
        if _debug: SubscribeCOVApplication._debug("    - simple_ack: %r", response)

        # return the result
        self.response(response)

    def do_UnconfirmedCOVNotificationRequest(self, apdu):
        """Hand an unconfirmed COV notification to its context.

        Notifications with an unknown process identifier or an unexpected
        source address are dropped without a reply.
        """
        if _debug: SubscribeCOVApplication._debug("do_UnconfirmedCOVNotificationRequest %r", apdu)

        # look up the process identifier
        context = subscription_contexts.get(apdu.subscriberProcessIdentifier, None)
        if not context or apdu.pduSource != context.address:
            if _debug: SubscribeCOVApplication._debug("    - no context")
            return

        # now tell the context object
        context.cov_notification(apdu)
#
# __main__
#
def main():
    """Build the application, queue one COV subscription, and run the stack."""
    global this_application

    # parse the command line arguments
    parser = ConfigArgumentParser(description=__doc__)
    options = parser.parse_args()

    if _debug:
        _log.debug("initialization")
        _log.debug(" - args: %r", options)

    # the local device object is described by the INI settings
    device = LocalDeviceObject(ini=options.ini)
    if _debug:
        _log.debug(" - this_device: %r", device)

    # the application is published through the module-level global
    this_application = SubscribeCOVApplication(device, options.ini.address)

    # a single subscription: unconfirmed notifications, lifetime of 60
    subscription = SubscriptionContext(
        Address("10.0.1.31"),
        ('analogValue', 1),
        confirmed=False,
        lifetime=60,
    )

    # the request goes out once the stack is ready
    deferred(this_application.send_subscription, subscription)

    _log.debug("running")

    run()

    _log.debug("fini")
# run main() only when this file is executed as a script, not when imported
if __name__ == "__main__":
    main()