diff --git a/PLUGINS.md b/PLUGINS.md index 8c51e11..7f664df 100644 --- a/PLUGINS.md +++ b/PLUGINS.md @@ -85,10 +85,12 @@ cluster nodes. It uses plugins so that it can easily be extended. The design of health monitor plugins are very similar to the watcher plugins. -One health monitor plugin is included by default: +Two health monitor plugins are included by default: * icmpecho: This uses ICMPecho (ping) requests to check that an EC2 instance is responsive. +* tcp: This uses a TCP connection attempt to check that a process on an EC2 + instance is responsive. A health monitor plugin communicates any detected failed instances to the main event loop of the vpc-router via a queue. It always sends a full list of the @@ -101,9 +103,9 @@ host list. ## Location, naming convention and base class -The 'icmpecho' health monitor plugin is included. It is an integrated -health monitor plugin (included in the vpc-router source) and is located -in the directory `vpcrouter/monitor/plugins/`. +The 'icmpecho' and 'tcp' health monitor plugins are included. They are +integrated health monitor plugins (included in the vpc-router source) and are +located in the directory `vpcrouter/monitor/plugins/`. The `-H` / `--health` option in the vpc-router command line chooses the health monitor plugin. It uses 'icmpecho' as default value. The name of the plugin has diff --git a/README.md b/README.md index c569198..440b2c0 100644 --- a/README.md +++ b/README.md @@ -231,7 +231,17 @@ The health-check itself is implemented via plugins, which gives vpc-router the flexibility to use a wide variety of information to determine whether an EC2 routing instance is healthy. By default, it uses the 'icmpecho' plugin, which utilizes an ICMPecho ('ping') request to actively check the responsiveness of -instances. +instances. A 'tcp' plugin, which makes TCP connection attempts on a +specified port, is also provided. 
+ +Use the `--health` option to select the health monitor plugin, for example: + + $ vpcrouter --health tcp --tcp_check_port 22 --tcp_check_interval 5 ... + +or: + + $ vpcrouter --health icmpecho --icmp_check_interval 5 ... + ## TODO diff --git a/vpcrouter/__init__.py b/vpcrouter/__init__.py index ef0c46d..0ced49d 100644 --- a/vpcrouter/__init__.py +++ b/vpcrouter/__init__.py @@ -15,4 +15,4 @@ """ -__version__ = "1.4.0" +__version__ = "1.4.1" diff --git a/vpcrouter/monitor/common.py b/vpcrouter/monitor/common.py index 3252d83..83bfab8 100644 --- a/vpcrouter/monitor/common.py +++ b/vpcrouter/monitor/common.py @@ -113,6 +113,13 @@ def get_new_working_set(self): break return new_list_of_ips + def get_monitor_interval(self): + """ + Return the sleep time between monitoring intervals. + + """ + raise NotImplementedError() + def do_health_checks(self, list_of_ips): """ Perform a health check on a list of IP addresses. @@ -191,7 +198,7 @@ def start_monitoring(self): interval_count = 0 currently_failed_ips = set() - time.sleep(self.conf['interval']) + time.sleep(self.get_monitor_interval()) interval_count += 1 except StopReceived: diff --git a/vpcrouter/monitor/plugins/icmpecho.py b/vpcrouter/monitor/plugins/icmpecho.py index 9a046f9..b236fb6 100644 --- a/vpcrouter/monitor/plugins/icmpecho.py +++ b/vpcrouter/monitor/plugins/icmpecho.py @@ -89,11 +89,18 @@ def _do_ping(self, ip, ping_id, results): pass results[ip] = res + def get_monitor_interval(self): + """ + Return the sleep time between monitoring intervals. + + """ + return self.conf['icmp_check_interval'] + def do_health_checks(self, list_of_ips): """ Perform a health check on a list of IP addresses. - Each check (we use ICMP echo right now) is run in its own thread. + Each check (we use ICMP echo) is run in its own thread. Gather up the results and return the list of those addresses that failed the test. @@ -154,11 +161,12 @@ def add_arguments(cls, parser): Arguments for the configfile mode. 
""" - parser.add_argument('-i', '--interval', dest='interval', + parser.add_argument('--icmp_check_interval', + dest='icmp_check_interval', required=False, default=2, help="ICMPecho interval in seconds " - "(only in ping mode)") - return ["interval"] + "(only for 'icmpecho' health monitor plugin)") + return ["icmp_check_interval"] @classmethod def check_arguments(cls, conf): @@ -169,16 +177,16 @@ def check_arguments(cls, conf): float. """ - if not conf['interval']: - raise ArgsError("An ICMPecho interval needs to be specified (-i).") + if not conf['icmp_check_interval']: + raise ArgsError("An ICMPecho interval needs to be specified " + "(--icmp_check_interval).") try: - conf['interval'] = float(conf['interval']) + conf['icmp_check_interval'] = float(conf['icmp_check_interval']) except Exception: raise ArgsError("Specified ICMPecho interval '%s' must be " - "a number." % - conf['interval']) + "a number." % conf['icmp_check_interval']) - if not (1 <= conf['interval'] <= 3600): + if not (1 <= conf['icmp_check_interval'] <= 3600): raise ArgsError("Specified ICMPecho interval must be between " "1 and 3600 seconds") diff --git a/vpcrouter/monitor/plugins/tcp.py b/vpcrouter/monitor/plugins/tcp.py new file mode 100644 index 0000000..a86dc00 --- /dev/null +++ b/vpcrouter/monitor/plugins/tcp.py @@ -0,0 +1,175 @@ +""" +Copyright 2017 Pani Networks Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+
+"""
+
+#
+# A monitor plugin for checking instance health with a TCP connection
+# establishment attempt.
+#
+
+import logging
+import socket
+import threading
+
+from vpcrouter.errors import ArgsError
+from vpcrouter.monitor import common
+
+
+class Tcp(common.MonitorPlugin):
+    """
+    A health monitor plugin, which uses TCP connection attempts to check
+    instances for health.
+
+    """
+    def _do_tcp_check(self, ip, results):
+        """
+        Attempt to establish a TCP connection.
+
+        If not successful, record the IP in the results list.
+
+        Always closes the socket at the end, if one was created.
+
+        """
+        sock = None
+        try:
+            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            sock.settimeout(1)
+            sock.connect((ip, self.conf['tcp_check_port']))
+        except Exception:
+            # Any problem during the connection attempt? We won't diagnose it,
+            # we just indicate failure by adding the IP to the list
+            results.append(ip)
+        finally:
+            if sock is not None:
+                sock.close()
+
+    def get_monitor_interval(self):
+        """
+        Return the sleep time between monitoring intervals.
+
+        """
+        return self.conf['tcp_check_interval']
+
+    def do_health_checks(self, list_of_ips):
+        """
+        Perform a health check on a list of IP addresses.
+
+        Each check (we use a TCP connection attempt) is run in its own thread.
+
+        Gather up the results and return the list of those addresses that
+        failed the test.
+
+        TODO: Currently, this starts a thread for every single address we want
+        to check. That's probably not a good idea if we have thousands of
+        addresses. Therefore, we should implement some batching for large
+        sets.
+
+        """
+        threads = []
+        results = []
+
+        # Start one thread per IP address. Each thread appends its IP to the
+        # shared results list if the connection attempt fails...
+        for ip in list_of_ips:
+            thread = threading.Thread(target=self._do_tcp_check,
+                                      args=(ip, results))
+            thread.start()
+            threads.append(thread)
+
+        # ... make sure all threads are done...
+        for thread in threads:
+            thread.join()
+
+        # ... and send back all the failed IPs.
+        return results
+
+    def start(self):
+        """
+        Start the health monitoring thread.
+
+        """
+        logging.info("TCP health monitor plugin: Starting to watch "
+                     "instances.")
+
+        self.monitor_thread = threading.Thread(target=self.start_monitoring,
+                                               name="HealthMon")
+        self.monitor_thread.daemon = True
+        self.monitor_thread.start()
+
+    def stop(self):
+        """
+        Stop the health monitoring thread.
+
+        """
+        super(Tcp, self).stop()
+        self.monitor_thread.join()
+        logging.info("TCP health monitor plugin: Stopped")
+
+    @classmethod
+    def add_arguments(cls, parser):
+        """
+        Arguments for the TCP health monitor plugin.
+
+        """
+        parser.add_argument('--tcp_check_interval',
+                            dest='tcp_check_interval',
+                            required=False, default=2,
+                            help="TCP health-test interval in seconds "
+                                 "(only for 'tcp' health monitor plugin)")
+        parser.add_argument('--tcp_check_port',
+                            dest='tcp_check_port',
+                            required=False, default=22,
+                            help="Port for TCP health-test, default 22 "
+                                 "(only for 'tcp' health monitor plugin)")
+        return ["tcp_check_interval", "tcp_check_port"]
+
+    @classmethod
+    def check_arguments(cls, conf):
+        """
+        Sanity checks for options needed for the TCP health monitor plugin.
+
+        As a side effect, it also converts the specified interval to a
+        float and the specified port to an integer.
+
+        """
+        # Checking the interval
+        if not conf['tcp_check_interval']:
+            raise ArgsError("A TCP health-test interval needs to be "
+                            "specified (--tcp_check_interval).")
+
+        try:
+            conf['tcp_check_interval'] = float(conf['tcp_check_interval'])
+        except Exception:
+            raise ArgsError("Specified TCP health-test interval '%s' must be "
+                            "a number." % conf['tcp_check_interval'])
+
+        if not (1 <= conf['tcp_check_interval'] <= 3600):
+            raise ArgsError("Specified TCP health-test interval must be "
+                            "between 1 and 3600 seconds")
+
+        # Checking the port
+        if not conf['tcp_check_port']:
+            raise ArgsError("A port for the TCP health-test needs to be "
+                            "specified (--tcp_check_port).")
+        try:
+            conf['tcp_check_port'] = int(conf['tcp_check_port'])
+        except Exception:
+            raise ArgsError("Specified port for the TCP health-test '%s' "
+                            "must be a number." % conf['tcp_check_port'])
+
+        if not (1 <= conf['tcp_check_port'] <= 65535):
+            raise ArgsError("Specified port for TCP health-test must be "
+                            "between 1 and 65535")
diff --git a/vpcrouter/tests/test_main.py b/vpcrouter/tests/test_main.py index afbfc0f..17f8b38 100644 --- a/vpcrouter/tests/test_main.py +++ b/vpcrouter/tests/test_main.py @@ -67,7 +67,8 @@ def test_parse_args(self): "conf" : { 'verbose': False, 'addr': 'localhost', 'mode': 'http', 'vpc_id': '123', 'logfile': 'foo', 'health' : 'icmpecho', - 'interval' : 2, 'port': 33289, 'region_name': 'foo'}}, + 'icmp_check_interval' : 2, 'port': 33289, + 'region_name': 'foo'}}, {"args" : ['-l', 'foo', '-v', '123', '-r', 'foo', '-m', 'configfile'], "exc" : ArgsError, "watcher_plugin" : "configfile", diff --git a/vpcrouter/tests/test_monitor.py b/vpcrouter/tests/test_monitor.py index a269047..99dbbc5 100644 --- a/vpcrouter/tests/test_monitor.py +++ b/vpcrouter/tests/test_monitor.py @@ -24,7 +24,7 @@ import Queue import time -from vpcrouter.monitor.plugins import icmpecho +from vpcrouter.monitor.plugins import icmpecho, tcp # This variable determines what IP addresses are considered 'failed' when we @@ -39,7 +39,7 @@ def test_sending_receiving_ping(self): # Only thing we can really send a ping to that doesn't leak of the host # is localhost. 
conf = { - "interval" : 2 + "icmp_check_interval" : 2 } p = icmpecho.Icmpecho(conf) try: @@ -52,6 +52,32 @@ def test_sending_receiving_ping(self): self.assertTrue(res is not None) +class TestTcpPlugin(unittest.TestCase): + + def test_tcp_health_check(self): + # Use localhost for test, so that we don't leak packets out on the + # network during tests. We also assume that we can use port 22 to try + # to connect to. + conf = { + "tcp_check_interval" : 2, + "tcp_check_port" : 22 + } + p = tcp.Tcp(conf) + results = [] + p._do_tcp_check("127.0.0.1", results) + self.assertEqual(len(results), 0) + + # Now check for a port that we assume isn't in use. + conf = { + "tcp_check_interval" : 2, + "tcp_check_port" : 65533 + } + p = tcp.Tcp(conf) + results = [] + p._do_tcp_check("127.0.0.1", results) + self.assertEqual(results, ["127.0.0.1"]) + + class TestQueues(unittest.TestCase): def setUp(self): @@ -72,7 +98,7 @@ def new_do_one(ip, timeout, dummy_id, size): else: return 0.5 conf = { - "interval" : 0.1 + "icmp_check_interval" : 0.1 } p = icmpecho.Icmpecho(conf) # Now we install this new ping function in place of the original one. @@ -222,5 +248,36 @@ def test_monitor_state_change(self): break +class TestQueuesTcp(TestQueues): + # We can run all the same tests as before, but this time with the TCP + # health monitor plugin. + + def setUp(self): + # Monkey patch the socket connect test function + def new_tcp_check(ip, results): + # There is a more low level test about malformed IP addresses that + # we use for the ICMP plugin. We don't really need to treat this in + # some extra way for the TCP plugin, but need to indicate a failure + # for this malformed IP here as well, so that the test doesn't + # fail. That's why there is the special case test for a malformed + # IP (starting with "333."). 
+ if ip.startswith(_FAILED_PREFIX) or ip.startswith("333."): + results.append(ip) + + conf = { + "tcp_check_interval" : 0.1, + "tcp_check_port" : 22 + } + + p = tcp.Tcp(conf) + p._do_tcp_check = new_tcp_check + + p.start() + self.plugin = p + self.q_monitor_ips, self.q_failed_ips = self.plugin.get_queues() + + self.addCleanup(self.cleanup) + + if __name__ == '__main__': unittest.main() diff --git a/vpcrouter/tests/test_watcher.py b/vpcrouter/tests/test_watcher.py index d3e336f..c0d9893 100644 --- a/vpcrouter/tests/test_watcher.py +++ b/vpcrouter/tests/test_watcher.py @@ -167,12 +167,12 @@ def additional_setup(self): self.temp_dir = tempfile.mkdtemp() self.abs_fname = self.temp_dir + "/r.spec" self.conf = { - "file" : self.abs_fname, - "region_name" : "dummy-region", - "vpc_id" : "dummy-vpc", - "mode" : "configfile", - "health" : "icmpecho", - "interval" : 2 + "file" : self.abs_fname, + "region_name" : "dummy-region", + "vpc_id" : "dummy-vpc", + "mode" : "configfile", + "health" : "icmpecho", + "icmp_check_interval" : 2 } self.watcher_plugin_class = \ main.load_plugin("configfile", DEFAULT_WATCHER_PLUGIN_MOD) @@ -373,13 +373,13 @@ class TestWatcherHttp(TestWatcherConfigfile): def additional_setup(self): global PORT self.conf = { - "addr" : "localhost", - "port" : PORT, - "region_name" : "dummy-region", - "vpc_id" : "dummy-vpc", - "mode" : "http", - "health" : "icmpecho", - "interval" : 2 + "addr" : "localhost", + "port" : PORT, + "region_name" : "dummy-region", + "vpc_id" : "dummy-vpc", + "mode" : "http", + "health" : "icmpecho", + "icmp_check_interval" : 2 } self.watcher_plugin_class = \ main.load_plugin("http", DEFAULT_WATCHER_PLUGIN_MOD)