This repository has been archived by the owner on Oct 18, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 10
/
docker-tcp-switchboard.py
executable file
·380 lines (309 loc) · 15 KB
/
docker-tcp-switchboard.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
#!/usr/bin/env python3
from twisted.protocols.portforward import *
from twisted.internet import reactor
import time, socket
import configparser, glob
import random, string
import pprint
import json
import docker
import copy
import logging
import logging.handlers
logger = logging.getLogger("docker-tcp-switchboard")
# this is a global object that keeps track of the free ports
# when requested, it allocated a new docker instance and returns it
class DockerPorts():
CONFIG_PROFILEPREFIX = "profile:"
CONFIG_DOCKEROPTIONSPREFIX = "dockeroptions:"
def __init__(self):
self.instancesByName = dict()
self.imageParams = dict()
def _getProfilesList(self, config):
out = []
for n in config.sections():
if n.startswith(self.CONFIG_PROFILEPREFIX):
out += [n[len(self.CONFIG_PROFILEPREFIX):]]
return out
def _readProfileConfig(self, config, profilename):
fullprofilename = "{}{}".format(self.CONFIG_PROFILEPREFIX, profilename)
innerport = self._parseInt(config[fullprofilename]["innerport"])
checkupport = self._parseInt(config[fullprofilename]["checkupport"]) if "checkupport" in config[fullprofilename] else innerport
return {
"outerport": int(config[fullprofilename]["outerport"]),
"innerport": innerport,
"containername": config[fullprofilename]["container"],
"checkupport": checkupport,
"limit": self._parseInt(config[fullprofilename]["limit"]) if "limit" in config[fullprofilename] else 0,
"reuse": self._parseTruthy(config[fullprofilename]["reuse"]) if "reuse" in config[fullprofilename] else False,
"dockeroptions": self._getDockerOptions(config, profilename, innerport, checkupport)
}
def _addDockerOptionsFromConfigSection(self, config, sectionname, base={}):
import collections
def update(d, u):
for k, v in u.items():
if isinstance(v, collections.Mapping):
r = update(d.get(k, {}), v)
d[k] = r
else:
d[k] = u[k]
return d
# we may need to read json values
def guessvalue(v):
if v in ["True", "False"] or all(c in string.digits for c in v) or v.startswith("[") or v.startswith("{"):
return json.loads(v)
return v
# if sectionname doesn't exist, return base
# otherwise, read keywords and values, add them to base
if sectionname in config.sections():
newvals = dict(config[sectionname])
fixedvals = {}
for (k,v) in newvals.items():
fixedvals[k] = guessvalue(v)
base = update(base, fixedvals)
return base # FIXME
def _getDockerOptions(self, config, profilename, innerport, checkupport):
out = {}
out = self._addDockerOptionsFromConfigSection(config, "dockeroptions", {})
out = self._addDockerOptionsFromConfigSection(config, "{}{}".format(self.CONFIG_DOCKEROPTIONSPREFIX, profilename), out)
out["detach"] = True
if "ports" not in out:
out["ports"] = {}
out["ports"][innerport] = None
out["ports"][checkupport] = None
# cannot use detach and remove together
# See https://github.com/docker/docker-py/issues/1477
#out["remove"] = True
#out["auto_remove"] = True
return out
def readConfig(self, fn):
# read the configfile.
config = configparser.ConfigParser()
logger.debug("Reading configfile from {}".format(fn))
config.read(fn)
# set log file
if "global" in config.sections() and "logfile" in config["global"]:
if "global" in config.sections() and "rotatelogfileat" in config["global"]:
handler = logging.handlers.TimedRotatingFileHandler(config["global"]["logfile"], when=config["global"]["rotatelogfileat"])
else:
handler = logging.FileHandler(config["global"]["logfile"])
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
# set log level
if "global" in config.sections() and "loglevel" in config["global"]:
#global logger
logger.setLevel(logging.getLevelName(config["global"]["loglevel"]))
# if there is a configdir directory, reread everything
if "global" in config.sections() and "splitconfigfiles" in config["global"]:
fnlist = [fn] + [f for f in glob.glob(config["global"]["splitconfigfiles"])]
logger.debug("Detected configdir directive. Reading configfiles from {}".format(fnlist))
config = configparser.ConfigParser()
config.read(fnlist)
if len(self._getProfilesList(config)) == 0:
logger.error("invalid configfile. No docker images")
sys.exit(1)
for profilename in self._getProfilesList(config):
conf = self._readProfileConfig(config, profilename)
logger.debug("Read config for profile {} as:\n {}".format(profilename, pprint.pformat(conf)))
self.registerProxy(profilename, conf)
return dict([(name, self.imageParams[name]["outerport"]) for name in self.imageParams.keys()])
def _parseInt(self, x):
return int(x)
def _parseTruthy(self, x):
if x.lower() in ["0", "false", "no"]:
return False
if x.lower() in ["1", "true", "yes"]:
return True
raise "Unknown truthy value {}".format(x)
def registerProxy(self, profilename, conf):
self.imageParams[profilename] = copy.deepcopy(conf)
def create(self, profilename):
containername = self.imageParams[profilename]["containername"]
dockeroptions = self.imageParams[profilename]["dockeroptions"]
imagelimit = self.imageParams[profilename]["limit"]
reuse = self.imageParams[profilename]["reuse"]
innerport = self.imageParams[profilename]["innerport"]
checkupport = self.imageParams[profilename]["checkupport"]
icount = 0
if profilename in self.instancesByName:
icount = len(self.instancesByName[profilename])
if imagelimit > 0 and icount >= imagelimit:
logger.warn("Reached max count of {} (currently {}) for image {}".format(imagelimit, icount, profilename))
return None
instance = None
if reuse and icount > 0:
logger.debug("Reusing existing instance for image {}".format(profilename))
instance = self.instancesByName[profilename][0]
else:
instance = DockerInstance(profilename, containername, innerport, checkupport, dockeroptions)
instance.start()
if profilename not in self.instancesByName:
self.instancesByName[profilename] = []
# in case of reuse, the list will have duplicates
self.instancesByName[profilename] += [instance]
return instance
def destroy(self, instance):
profilename = instance.getProfileName()
reuse = self.imageParams[profilename]["reuse"]
# in case of reuse, the list will have duplicates, but remove() does not care
self.instancesByName[profilename].remove(instance)
# stop the instance if there is no reuse, or if this is the last instance for a reused image
if not reuse or len(self.instancesByName[profilename]) == 0:
instance.stop()
# this class represents a single docker instance listening on a certain middleport.
# The middleport is managed by the DockerPorts global object
# After the docker container is started, we wait until the middleport becomes reachable
# before returning
class DockerInstance():
def __init__(self, profilename, containername, innerport, checkupport, dockeroptions):
self._profilename = profilename
self._containername = containername
self._dockeroptions = dockeroptions
self._innerport = innerport
self._checkupport = checkupport
self._instance = None
def getDockerOptions(self):
return self._dockeroptions
def getContainerName(self):
return self._containername
def getMappedPort(self, inp):
try:
return int(self._instance.attrs["NetworkSettings"]["Ports"]["{}/tcp".format(inp)][0]["HostPort"])
except Exception as e:
logger.warn("Failed to get port information for port {} from {}: {}".format(inp, self.getInstanceID(), e))
return None
def getMiddlePort(self):
return self.getMappedPort(self._innerport)
def getMiddleCheckupPort(self):
return self.getMappedPort(self._checkupport)
def getProfileName(self):
return self._profilename
def getInstanceID(self):
try:
return self._instance.id
except Exception as e:
logger.warn("Failed to get instanceid: {}".format(e))
return "None"
def start(self):
# get docker client
client = docker.from_env()
# start instance
try:
logger.debug("Starting instance {} of container {} with dockeroptions {}".format(self.getProfileName(), self.getContainerName(), pprint.pformat(self.getDockerOptions())))
clientres = client.containers.run(self.getContainerName(), **self.getDockerOptions())
self._instance = client.containers.get(clientres.id)
logger.debug("Done starting instance {} of container {}".format(self.getProfileName(), self.getContainerName()))
except Exception as e:
logger.debug("Failed to start instance {} of container {}: {}".format(self.getProfileName(), self.getContainerName(), e))
self.stop()
return False
# wait until container's checkupport is available
logger.debug("Started instance on middleport {} with ID {}".format(self.getMiddlePort(), self.getInstanceID()))
if self.__waitForOpenPort(self.getMiddleCheckupPort()):
logger.debug("Started instance on middleport {} with ID {} has open port {}".format(self.getMiddlePort(), self.getInstanceID(), self.getMiddleCheckupPort()))
return True
else:
logger.debug("Started instance on middleport {} with ID {} has closed port {}".format(self.getMiddlePort(), self.getInstanceID(), self.getMiddleCheckupPort()))
self.stop()
return False
def stop(self):
mp = self.getMiddlePort()
cid = self.getInstanceID()
logger.debug("Killing and removing {} (middleport {})".format(cid, mp))
try:
self._instance.remove(force=True)
except Exception as e:
logger.warn("Failed to remove instance for middleport {}, id {}".format(mp, cid))
return False
return True
def __isPortOpen(self, port, readtimeout=0.1):
s = socket.socket()
ret = False
logger.debug("Checking whether port {} is open...".format(port))
if port == None:
time.sleep(readtimeout)
else:
try:
s.connect(("0.0.0.0", port))
# just connecting is not enough, we should try to read and get at least 1 byte back
# since the daemon in the container might not have started accepting connections yet, while docker-proxy does
s.settimeout(readtimeout)
data = s.recv(1)
ret = len(data) > 0
except socket.error:
ret = False
logger.debug("result = ".format(ret))
s.close()
return ret
def __waitForOpenPort(self, port, timeout=5, step=0.1):
started = time.time()
while started + timeout >= time.time():
if self.__isPortOpen(port):
return True
time.sleep(step)
return False
class LoggingProxyClient(ProxyClient):
def dataReceived(self, data):
payloadlen = len(data)
self.factory.server.upBytes += payloadlen
self.peer.transport.write(data)
class LoggingProxyClientFactory(ProxyClientFactory):
protocol = LoggingProxyClient
class DockerProxyServer(ProxyServer):
clientProtocolFactory = LoggingProxyClientFactory
reactor = None
def __init__(self):
super().__init__()
self.downBytes = 0
self.upBytes = 0
self.sessionID = "".join([random.choice(string.ascii_letters) for _ in range(16)])
self.sessionStart = time.time()
# This is a reimplementation, except that we want to specify host and port...
def connectionMade(self):
# Don't read anything from the connecting client until we have
# somewhere to send it to.
self.transport.pauseProducing()
client = self.clientProtocolFactory()
client.setServer(self)
if self.reactor is None:
from twisted.internet import reactor
self.reactor = reactor
global globalDockerPorts
self.dockerinstance = globalDockerPorts.create(self.factory.profilename)
if self.dockerinstance == None:
self.transport.write(bytearray("Maximum connection-count reached. Try again later.\r\n", "utf-8"))
self.transport.loseConnection()
else:
logger.info("[Session {}] Incoming connection for image {} from {} at {}".format(self.sessionID, self.dockerinstance.getProfileName(),
self.transport.getPeer(), self.sessionStart))
self.reactor.connectTCP("0.0.0.0", self.dockerinstance.getMiddlePort(), client)
def connectionLost(self, reason):
profilename = "<none>"
if self.dockerinstance != None:
global globalDockerPorts
globalDockerPorts.destroy(self.dockerinstance)
profilename = self.dockerinstance.getProfileName()
self.dockerinstance = None
super().connectionLost(reason)
timenow = time.time()
logger.info("[Session {}] server disconnected session for image {} from {} (start={}, end={}, duration={}, upBytes={}, downBytes={}, totalBytes={})".format(
self.sessionID, profilename, self.transport.getPeer(),
self.sessionStart, timenow, timenow-self.sessionStart,
self.upBytes, self.downBytes, self.upBytes + self.downBytes))
def dataReceived(self, data):
payloadlen = len(data)
self.downBytes += payloadlen
self.peer.transport.write(data)
class DockerProxyFactory(ProxyFactory):
protocol = DockerProxyServer
def __init__(self, profilename):
self.profilename = profilename
if __name__ == "__main__":
import sys
globalDockerPorts = DockerPorts()
portsAndNames = globalDockerPorts.readConfig(sys.argv[1] if len(sys.argv) > 1 else '/etc/docker-tcp-switchboard.conf')
for (name, outerport) in portsAndNames.items():
logger.debug("Listening on port {}".format(outerport))
reactor.listenTCP(outerport, DockerProxyFactory(name), interface=sys.argv[2] if len(sys.argv) > 2 else '')
reactor.run()