-
Notifications
You must be signed in to change notification settings - Fork 130
/
Copy pathTCPServer.py
executable file
·135 lines (103 loc) · 3.62 KB
/
TCPServer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/env python
"""
This simple TCP server application listens for one or more client connections
and echoes the incoming lines back to the client. There is no conversion from
incoming streams of content into a line or any other higher-layer concept
of a packet.
"""
import os
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ArgumentParser
from bacpypes.core import run
from bacpypes.comm import PDU, Client, bind, ApplicationServiceElement
from bacpypes.tcp import TCPServerDirector
# some debugging
# _debug gates the `if _debug:` logging guards used throughout this module
_debug = 0
# module-level logger; ModuleLogger is handed this module's globals
# NOTE(review): presumably it expects _debug to already be defined -- confirm
# against bacpypes.debugging before reordering these two lines
_log = ModuleLogger(globals())
# settings -- defaults, each overridable through an environment variable
# of the same name
SERVER_HOST = os.environ.get('SERVER_HOST', 'any')
SERVER_PORT = int(os.environ.get('SERVER_PORT', 9000))

# an idle timeout of 0 (also the fallback when the variable is unset) is
# stored as None
IDLE_TIMEOUT = int(os.environ.get('IDLE_TIMEOUT', 0)) or None

# globals -- parsed command line arguments, assigned by main()
args = None
#
# EchoMaster
#
class EchoMaster(Client):
    """Client that echoes every PDU it receives back to the PDU's source."""

    def confirmation(self, pdu):
        """Echo a received PDU.

        A new PDU carrying the same ``pduData`` is built, addressed to the
        ``pduSource`` of the incoming one, and sent back down the stack.
        """
        if _debug:
            EchoMaster._debug('confirmation %r', pdu)

        # the reply goes to whoever sent the original
        echo_pdu = PDU(pdu.pduData, destination=pdu.pduSource)

        # hand it to the layer below
        self.request(echo_pdu)


bacpypes_debugging(EchoMaster)
#
# MiddleManASE
#
class MiddleManASE(ApplicationServiceElement):
    """
    Application service element bound to the director, which is a
    ServiceAccessPoint.  The director notifies it when a client connects
    and a new actor is added, when a connection is closed and its actor is
    going away, and when a socket error occurs.
    """

    def indication(self, add_actor=None, del_actor=None, actor_error=None, error=None):
        """Log the reported event and optionally greet a new connection.

        add_actor   -- actor for a newly connected client
        del_actor   -- actor whose connection is going away
        actor_error -- actor on which an error was reported
        error       -- the error that accompanies ``actor_error``

        Only the module-level ``args`` namespace (set by ``main()``) is
        read here, so no ``global`` declaration is needed.
        """
        if add_actor:
            if _debug:
                MiddleManASE._debug("indication add_actor=%r", add_actor)

            # greet the new peer only when --hello was given
            if args.hello:
                greeting = PDU(b'Hello, world!\n', destination=add_actor.peer)
                self.elementService.indication(greeting)

        # the remaining notifications are only logged
        if del_actor and _debug:
            MiddleManASE._debug("indication del_actor=%r", del_actor)

        if actor_error and _debug:
            MiddleManASE._debug("indication actor_error=%r error=%r", actor_error, error)


bacpypes_debugging(MiddleManASE)
#
# __main__
#
def main():
    """Parse the command line, assemble the echo server, and run it.

    The parsed arguments are stored in the module-level ``args`` so that
    ``MiddleManASE`` can consult ``args.hello``.  A ``TCPServerDirector`` is
    created on the requested address, an ``EchoMaster`` and a
    ``MiddleManASE`` are bound to it, and control is handed to ``run()``.
    """
    global args

    # describe the command line
    cli = ArgumentParser(description=__doc__)
    cli.add_argument(
        "host",
        nargs='?',
        default=SERVER_HOST,
        help="listening address of server or 'any' (default %r)" % (SERVER_HOST,),
    )
    cli.add_argument(
        "port",
        nargs='?',
        type=int,
        default=SERVER_PORT,
        help="server port (default %r)" % (SERVER_PORT,),
    )
    cli.add_argument(
        "--idle-timeout",
        nargs='?',
        type=int,
        default=IDLE_TIMEOUT,
        help="idle connection timeout",
    )
    cli.add_argument(
        "--hello",
        action="store_true",
        default=False,
        help="send a hello message to a client when it connects",
    )
    args = cli.parse_args()

    if _debug:
        _log.debug("initialization")
        _log.debug(" - args: %r", args)

    # the keyword 'any' is translated to the empty string, which Python's
    # socket layer treats as "all interfaces"
    bind_host = '' if args.host == "any" else args.host
    server_address = (bind_host, args.port)
    if _debug:
        _log.debug(" - server_address: %r", server_address)

    # the director listens on the address and manages the connections
    this_director = TCPServerDirector(server_address, idle_timeout=args.idle_timeout)
    if _debug:
        _log.debug(" - this_director: %r", this_director)

    # the client that echoes the data back
    echo_master = EchoMaster()
    if _debug:
        _log.debug(" - echo_master: %r", echo_master)

    # wire the echo client and the service element to the director
    bind(echo_master, this_director)
    middle_man = MiddleManASE()
    bind(middle_man, this_director)

    _log.debug("running")

    run()

    _log.debug("fini")


if __name__ == "__main__":
    main()