-
Notifications
You must be signed in to change notification settings - Fork 130
/
Copy pathmini_device.py
executable file
·139 lines (105 loc) · 3.51 KB
/
mini_device.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#!/usr/bin/python
"""
This sample application is a server that supports many core services that
applications need to present data on a BACnet network. It supports Who-Is
and I-Am for device binding, Read and Write Property, Read and Write
Property Multiple, and COV subscriptions.
Change the process_task() function to do something on a regular INTERVAL
number of seconds.
"""
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.core import run
from bacpypes.task import RecurringTask
from bacpypes.app import BIPSimpleApplication
from bacpypes.object import AnalogValueObject, BinaryValueObject
from bacpypes.local.device import LocalDeviceObject
from bacpypes.service.cov import ChangeOfValueServices
from bacpypes.service.object import ReadWritePropertyMultipleServices
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# settings
INTERVAL = 1.0
# globals
test_application = None
test_av = None
test_bv = None
@bacpypes_debugging
class SampleApplication(
BIPSimpleApplication, ReadWritePropertyMultipleServices, ChangeOfValueServices
):
pass
@bacpypes_debugging
class DoSomething(RecurringTask):
def __init__(self, interval):
if _debug:
DoSomething._debug("__init__ %r", interval)
RecurringTask.__init__(self, interval * 1000)
# save the interval
self.interval = interval
# make a list of test values
self.test_values = [
("active", 1.0),
("inactive", 2.0),
("active", 3.0),
("inactive", 4.0),
]
def process_task(self):
if _debug:
DoSomething._debug("process_task")
global test_av, test_bv
# pop the next value
next_value = self.test_values.pop(0)
self.test_values.append(next_value)
if _debug:
DoSomething._debug(" - next_value: %r", next_value)
# change the point
test_av.presentValue = next_value[1]
test_bv.presentValue = next_value[0]
def main():
global test_av, test_bv, test_application
# make a parser
parser = ConfigArgumentParser(description=__doc__)
# parse the command line arguments
args = parser.parse_args()
if _debug:
_log.debug("initialization")
if _debug:
_log.debug(" - args: %r", args)
# make a device object
this_device = LocalDeviceObject(ini=args.ini)
if _debug:
_log.debug(" - this_device: %r", this_device)
# make a sample application
test_application = SampleApplication(this_device, args.ini.address)
# make an analog value object
test_av = AnalogValueObject(
objectIdentifier=("analogValue", 1),
objectName="av",
presentValue=0.0,
statusFlags=[0, 0, 0, 0],
covIncrement=1.0,
)
_log.debug(" - test_av: %r", test_av)
# add it to the device
test_application.add_object(test_av)
_log.debug(" - object list: %r", this_device.objectList)
# make a binary value object
test_bv = BinaryValueObject(
objectIdentifier=("binaryValue", 1),
objectName="bv",
presentValue="inactive",
statusFlags=[0, 0, 0, 0],
)
_log.debug(" - test_bv: %r", test_bv)
# add it to the device
test_application.add_object(test_bv)
# binary value task
do_something_task = DoSomething(INTERVAL)
do_something_task.install_task()
_log.debug("running")
run()
_log.debug("fini")
if __name__ == "__main__":
main()