From 8c4b01196a950cddf8ae67cc6a01204aff12dc92 Mon Sep 17 00:00:00 2001 From: Juergen Brendel Date: Mon, 31 Jul 2017 10:46:17 +1200 Subject: [PATCH 1/4] Health monitor plugin. - Generalized plugin infrastructure from watcher plugins. - Moved ICMPecho code from monitor module into dedicated plugin. - Updated documentation. - Version bump. --- DEVELOPERS.md | 2 +- PLUGINS.md | 55 ++++- README.md | 15 +- vpcrouter/__init__.py | 2 +- vpcrouter/main/__init__.py | 125 +++++++----- vpcrouter/monitor/__init__.py | 249 ----------------------- vpcrouter/monitor/common.py | 107 ++++++++++ vpcrouter/monitor/plugins/__init__.py | 0 vpcrouter/monitor/plugins/icmpecho.py | 281 ++++++++++++++++++++++++++ vpcrouter/tests/test_main.py | 31 +-- vpcrouter/tests/test_monitor.py | 47 +++-- vpcrouter/tests/test_watcher.py | 103 ++++++---- vpcrouter/watcher/__init__.py | 68 +++---- 13 files changed, 660 insertions(+), 425 deletions(-) create mode 100644 vpcrouter/monitor/common.py create mode 100644 vpcrouter/monitor/plugins/__init__.py create mode 100644 vpcrouter/monitor/plugins/icmpecho.py diff --git a/DEVELOPERS.md b/DEVELOPERS.md index c9e55d4..3f26bad 100644 --- a/DEVELOPERS.md +++ b/DEVELOPERS.md @@ -49,7 +49,7 @@ code complexity. The architecture of vpc-router is simple: * A health-monitor thread detects if there are any failed hosts - (`vpcrouter.monitor`). + (`vpcrouter.monitor.plugins.*`). 
* A configuration-watcher thread detects if there are any updates to the routing configuration (`vpcrouter.watcher.plugins.*`) * A main loop receives notifications from both those threads via queues diff --git a/PLUGINS.md b/PLUGINS.md index a8888c9..8c51e11 100644 --- a/PLUGINS.md +++ b/PLUGINS.md @@ -1,3 +1,11 @@ +# Plugins for the vpc-router + +There are two types of plugins in use: + +* Watcher plugins (getting topology information from the environment and + orchestration system) +* Health monitor plugins (checking the health of routing instances) + # How to write watcher plugins The 'watcher' is the component of vpc-router that watches for changed routing @@ -41,8 +49,8 @@ read the code there, including the docstrings. A plugin needs to implement a very basic and simple API, which is defined by the `WatcherPlugin` class. The plugin class' name should be the name of the plugin, capitalized. -Therefore, the 'http' plugin provides the 'Http' class. The -'configfile' plugin provides the 'Configfile' class, and so on. +Therefore, the 'http' plugin provides the `Http` class. The +'configfile' plugin provides the `Configfile` class, and so on. ## Example of an integrated plugin @@ -68,3 +76,46 @@ As an example, please consider the It comes with its own `setup.py`, own test cases and own requirements files. By perusing this repository you can see how to develop an external plugin for vpc-router. + + +# How to write health monitor plugins + +The 'monitor' is the component of vpc-router that watches the health of the +cluster nodes. It uses plugins so that it can easily be extended. The +design of health monitor plugins is very similar to the watcher +plugins. + +One health monitor plugin is included by default: + +* icmpecho: This uses ICMPecho (ping) requests to check that an EC2 instance is + responsive. + +A health monitor plugin communicates any detected failed instances to the main +event loop of the vpc-router via a queue.
It always sends a full list of the +currently failed instances, never a partial update. + +The main event loop also uses a second queue to send full host lists back to +the monitor whenever there has been a change in the overall host list. The +health monitor plugin then starts to monitor all the hosts in that updated +host list. + +## Location, naming convention and base class + +The 'icmpecho' health monitor plugin is included. It is an integrated +health monitor plugin (included in the vpc-router source) and is located +in the directory `vpcrouter/monitor/plugins/`. + +The `-H` / `--health` option in the vpc-router command line chooses the health +monitor plugin. It uses 'icmpecho' as default value. The name of the plugin has +to match the name of the Python file in which the plugin is implemented. For +example, the 'icmpecho' plugin is implemented in the +`vpcrouter/monitor/plugins/icmpecho.py` file. + +Every health monitor plugin should provide an implementation of the +`MonitorPlugin` base class, which is found in `vpcrouter/monitor/common.py`. +If you wish to write your own health monitor plugin, please make sure you +read the code there, including the docstrings. A plugin needs to implement +a very basic and simple API, which is defined by the `MonitorPlugin` class. + +The plugin class' name should be the name of the plugin, capitalized. +Therefore, the 'icmpecho' plugin provides the `Icmpecho` class. diff --git a/README.md b/README.md index 239b81a..c569198 100644 --- a/README.md +++ b/README.md @@ -56,6 +56,10 @@ it does not depend on either project and can also be used stand-alone. Plugins for integration with different environments are provided. For example, a [plugin to integrate with Romana](https://github.com/romana/vpcrouter-romana-plugin). +Health-checks are also implemented via plugins. 
This means that vpc-router may +either directly contact EC2 instances to check their health, or it may instead +connect to AWS status and alert information, or use the node status provided by +orchestration systems, such as Kubernetes. ## Installation and running @@ -102,7 +106,6 @@ In order to develop or extend vpc-router, please read the [developer documentation](DEVELOPERS.md) for information that might be useful to get you started. - ## Configuration ### The route spec @@ -224,15 +227,15 @@ instance does not appear healthy anymore and it is a current target for a route then the route will be automatically updated to point to an alternate target, if a healthy one is available. -Currently, the health check consists of an ICMP echo request. In the future, -this will be made configurable. +The health-check itself is implemented via plugins, which gives vpc-router the +flexibility to use a wide variety of information to determine whether an EC2 +routing instance is healthy. By default, it uses the 'icmpecho' plugin, which +utilizes an ICMPecho ('ping') request to actively check the responsiveness of +instances. ## TODO * Support for BGP listener: Allow vpc-router to act as BGP peer and receive route announcements via BGP. -* Configurable health checks. * Ability to use CloudWatch alerts, instead of active health checks to detect instance failure. - - diff --git a/vpcrouter/__init__.py b/vpcrouter/__init__.py index a9d6034..ef0c46d 100644 --- a/vpcrouter/__init__.py +++ b/vpcrouter/__init__.py @@ -15,4 +15,4 @@ """ -__version__ = "1.3.2" +__version__ = "1.4.0" diff --git a/vpcrouter/main/__init__.py b/vpcrouter/main/__init__.py index 98317b6..1de38e6 100644 --- a/vpcrouter/main/__init__.py +++ b/vpcrouter/main/__init__.py @@ -29,12 +29,13 @@ from vpcrouter import watcher -def _setup_arg_parser(plugin_class): +def _setup_arg_parser(watcher_plugin_class, health_plugin_class): """ Configure and return the argument parser for the command line options. 
- If a plugin_class is provided then call the add_arguments() call back of - the plugin class, in order to add plugin specific options. + If a watcher and/or health-monitor plugin_class is provided then call the + add_arguments() callback of the plugin class(es), in order to add plugin + specific options. Some parameters are required (vpc and region, for example), but we may be able to discover them automatically, later on. Therefore, we allow them to @@ -61,19 +62,24 @@ def _setup_arg_parser(plugin_class): help="the ID of the VPC in which to operate") parser.add_argument('-m', '--mode', dest='mode', required=True, help="name of the watcher plugin") + parser.add_argument('-H', '--health', dest='health', required=False, + default="icmpecho", + help="name of the health-check plugin") parser.add_argument('--verbose', dest="verbose", action='store_true', help="produces more output") - arglist = ["logfile", "region_name", "vpc_id", "mode", "verbose"] + arglist = ["logfile", "region_name", "vpc_id", "mode", "health", "verbose"] - # Let each watcher plugin add its own arguments - if plugin_class: - arglist.extend(plugin_class.add_arguments(parser)) + # Let each watcher and health-monitor plugin add its own arguments + if watcher_plugin_class: + arglist.extend(watcher_plugin_class.add_arguments(parser)) + if health_plugin_class: + arglist.extend(health_plugin_class.add_arguments(parser)) return parser, arglist -def parse_args(args_list, plugin_class=None): +def parse_args(args_list, watcher_plugin_class=None, health_plugin_class=None): """ Parse command line arguments and return relevant values in a dict. 
@@ -90,7 +96,8 @@ def parse_args(args_list, plugin_class=None): conf = {} # Setting up the command line argument parser - parser, arglist = _setup_arg_parser(plugin_class) + parser, arglist = _setup_arg_parser(watcher_plugin_class, + health_plugin_class) args = parser.parse_args(args_list) @@ -98,14 +105,15 @@ def parse_args(args_list, plugin_class=None): for argname in arglist: conf[argname] = getattr(args, argname) - # Sanity checking of arguments. Let the watcher plugin class check its own - # arguments. - if plugin_class is not None: - try: - plugin_class.check_arguments(conf) - except ArgsError as e: - parser.print_help() - raise e + # Sanity checking of arguments. Let the watcher and health-monitor plugin + # classes check their own arguments. + for plugin_class in [watcher_plugin_class, health_plugin_class]: + if plugin_class is not None: + try: + plugin_class.check_arguments(conf) + except ArgsError as e: + parser.print_help() + raise e return conf @@ -134,28 +142,28 @@ def setup_logging(conf): setLevel(logging.CRITICAL) -def load_plugin(mode_name): +def load_plugin(plugin_name, default_plugin_module): """ - Load a watcher plugin. + Load a plugin. Supports loading of plugins that are part of the vpcrouter, as well as - external plugins: If the mode/plugin name has a dotted notation then it + external plugins: If the plugin name has a dotted notation then it assumes it's an external plugin and the dotted notation is the complete - import path. If it's just a single word then it looks for the plugin in the - vpcrouter.watcher.plugins.* module. + import path. If it's just a single word then it looks for the plugin in + the specified default module. Return the plugin class. """ try: - if "." in mode_name: + if "."
in plugin_name: # Assume external plugin, full path - plugin_mod_name = mode_name - plugin_class_name = mode_name.split(".")[-1].capitalize() + plugin_mod_name = plugin_name + plugin_class_name = plugin_name.split(".")[-1].capitalize() else: # One of the built-in plugins - plugin_mod_name = "vpcrouter.watcher.plugins.%s" % mode_name - plugin_class_name = mode_name.capitalize() + plugin_mod_name = "%s.%s" % (default_plugin_module, plugin_name) + plugin_class_name = plugin_name.capitalize() plugin_mod = importlib.import_module(plugin_mod_name) plugin_class = getattr(plugin_mod, plugin_class_name) @@ -171,31 +179,33 @@ def load_plugin(mode_name): (plugin_mod_name, str(e))) -def _get_mode_name(args): +def _param_extract(args, short_form, long_form): """ - Quick and dirty extraction of mode name from argument list. + Quick extraction of a parameter from the command line argument list. + + In some cases we need to parse a few arguments before the official + arg-parser starts. - Need to do this before the proper arg-parser is setup, since the - mode/plugin may add arguments on its own, which we need for the parser - setup. + Returns parameter value, or None if not present. """ - mode_name = None # No -m / --mode was specified + val = None # Parameter wasn't defined for i, a in enumerate(args): - # Long form may use "--mode=foo", so need to split on '=' + # Long form may use "--xyz=foo", so need to split on '=', but it + # doesn't necessarily do that, can also be "--xyz foo". 
elems = a.split("=", 1) - if elems[0] in ["-m", "--mode"]: + if elems[0] in [short_form, long_form]: # At least make sure that an actual name was specified if len(elems) == 1: if i + 1 < len(args) and not args[i + 1].startswith("-"): - mode_name = args[i + 1] + val = args[i + 1] else: - mode_name = "" # Invalid mode was specified + val = "" # Invalid value was specified else: - mode_name = elems[1] + val = elems[1] break - return mode_name + return val def main(): @@ -203,28 +213,30 @@ def main(): Starting point of the executable. """ - # Importing all watcher plugins. - # - Each plugin is located in the vpcrouter.watcher.plugins module. - # - The name of the plugin file is the 'mode' of vpc-router, plus '.py' - # - The file has to contain a class that implements the WatcherPlugin - # interface. - # - The plugin class has to have the same name as the plugin itself, only - # capitalized. try: - # A bit of a hack: We want to load the plugin (specified via the mode - # parameter) in order to add its arguments to the argument parser. But - # this means we first need to look into the arguments to find it ... - # before looking at the arguments. So we first perform a manual search - # through the argument list for this purpose only. + # A bit of a hack: We want to load the plugins (specified via the mode + # and health parameter) in order to add their arguments to the argument + # parser. But this means we first need to look into the CLI arguments + # to find them ... before looking at the arguments. So we first perform + # a manual search through the argument list for this purpose only. 
args = sys.argv[1:] - mode_name = _get_mode_name(args) + mode_name = _param_extract(args, "-m", "--mode") if mode_name: - plugin_class = load_plugin(mode_name) + watcher_plugin_class = load_plugin(mode_name, + "vpcrouter.watcher.plugins") + else: + watcher_plugin_class = None + + health_check_name = _param_extract(args, "-H", "--health") + if health_check_name: + health_plugin_class = load_plugin(health_check_name, + "vpcrouter.monitor.plugins") else: - plugin_class = None + health_plugin_class = None - conf = parse_args(sys.argv[1:], plugin_class) + conf = parse_args(sys.argv[1:], + watcher_plugin_class, health_plugin_class) setup_logging(conf) # If we are on an EC2 instance then some data is already available to @@ -244,7 +256,8 @@ def main(): try: logging.info("*** Starting vpc-router in %s mode ***" % conf['mode']) - watcher.start_watcher(conf, plugin_class) + watcher.start_watcher(conf, + watcher_plugin_class, health_plugin_class) logging.info("*** Stopping vpc-router ***") except Exception as e: import traceback diff --git a/vpcrouter/monitor/__init__.py b/vpcrouter/monitor/__init__.py index 0b0ad06..e69de29 100644 --- a/vpcrouter/monitor/__init__.py +++ b/vpcrouter/monitor/__init__.py @@ -1,249 +0,0 @@ -""" -Copyright 2017 Pani Networks Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
- -""" - -# -# Functions for monitoring instances -# - -import logging -import ping -import Queue -import socket -import threading -import time - - -class _StopReceived(Exception): - """ - Raised after monitor thread receives stop signal. - - """ - pass - - -def my_do_one(dest_addr, ping_id, timeout, psize): - """ - Returns either the delay (in seconds) or none on timeout. - - This is a copy of the do_one function in the ping packet, but importantly, - the ID for the ping packet is different (it's now passed in from the - caller). Originally, the PID was used, which is not thread safe. - - """ - icmp = socket.getprotobyname("icmp") - try: - my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp) - except socket.error, (errno, msg): - if errno == 1: - msg = msg + ( - " - Note that ICMP messages can only be sent from processes" - " running as root." - ) - raise socket.error(msg) - raise # raise the original error - - ping.send_one_ping(my_socket, dest_addr, ping_id, psize) - delay = ping.receive_one_ping(my_socket, ping_id, timeout) - - my_socket.close() - return delay - - -def _do_ping(ip, ping_id, results): - """ - Send a single ping to a specified IP address. - - The result is either a time in seconds for the ping, or None if no result - was received from the pinged IP address in time. Store the result in the - results dict that's provided to us. - - """ - try: - res = my_do_one(ip, ping_id, 2, 16) - except Exception: - # If an unreachable name or IP is specified then we might even get an - # exception here. Still just return None in that case. - res = None - results[ip] = res - - -def _get_new_working_set(q_monitor_ips): - """ - Get a new list of IPs to work with from the queue. - - This returns None if there is no update. - - Read all the messages from the queue on which we get the IP addresses - that we have to monitor. 
We will ignore all of them, except the last - one, since maybe we received two updates in a row, but each update - is a full state, so only the last one matters. - - Raises the _StopReceived exception if the stop signal ("None") was received - on the notification queue. - - """ - new_list_of_ips = None - while True: - try: - new_list_of_ips = q_monitor_ips.get_nowait() - q_monitor_ips.task_done() - if new_list_of_ips is None: - raise _StopReceived() - except Queue.Empty: - # No more messages, all done reading monitor list for now - break - return new_list_of_ips - - -def _do_health_checks(list_of_ips): - """ - Perform a health check on a list of IP addresses. - - Each check (we use ICMP echo right now) is run in its own thread. - - Gather up the results and return the list of those addresses that failed - the test. - - TODO: Currently, this starts a thread for every single address we want to - check. That's probably not a good idea if we have thousands of addresses. - Therefore, we should implement some batching for large sets. - - """ - threads = [] - results = {} - - # Start the thread for each IP we wish to ping. - # We calculate a unique ID for the ICMP echo request sent by each thread. - # It's based on the slowly increasing time stamp (just 8 bits worth of the - # seconds since epoch)... - nowsecs = int(time.time()) % 255 - for count, ip in enumerate(list_of_ips): - ping_id = (nowsecs << 8) + count # ... plus running count of packets - thread = threading.Thread(target=_do_ping, - args=(ip, ping_id, results)) - thread.start() - threads.append(thread) - - # ... make sure all threads are done... - for thread in threads: - thread.join() - - # ... and gather up the results and send back if needed - return [k for (k, v) in results.items() if v is None] - - -def start_monitoring(q_monitor_ips, q_failed_ips, interval=2): - """ - Monitor IP addresses and send notifications if one of them has failed. 
- - This function will continuously monitor q_monitor_ips for new lists of IP - addresses to monitor. Each message received there is the full state (the - complete lists of addresses to monitor). - - Push out (return) any failed IPs on q_failed_ips. This is also a list of - IPs, which may be empty if all instances work correctly. - - If q_monitor_ips receives a 'None' instead of list then this is intepreted - as a stop signal and the function exits. - - """ - time.sleep(1) - logging.debug("Started health monitoring thread") - - # This is our working set. This list may be updated occasionally when we - # receive messages on the q_monitor_ips queue. But irrespective of any - # received updates, the list of IPs in here is regularly checked. - list_of_ips = [] - currently_failed_ips = set() - - # Accumulating failed IPs for 10 intervals before rechecking them to see if - # they are alive again - recheck_failed_interval = 10 - - try: - interval_count = 0 - while True: - # See if we should update our working set - new_ips = _get_new_working_set(q_monitor_ips) - if new_ips: - list_of_ips = new_ips - - # Don't check failed IPs for liveness on every interval. We - # keep a list of currently-failed IPs for that purpose. - live_ips_to_check = [ip for ip in list_of_ips if - ip not in currently_failed_ips] - logging.debug("Checking live IPs: %s" % - (",".join(live_ips_to_check) if live_ips_to_check - else "(none alive)")) - - # Independent of any updates: Perform health check on all IPs in - # the working set and send messages out about any failed once as - # necessary. 
- if live_ips_to_check: - failed_ips = _do_health_checks(live_ips_to_check) - if failed_ips: - q_failed_ips.put(failed_ips) - # Update list of currently failed IPs with any new ones - currently_failed_ips.update(failed_ips) - logging.info('Currently failed IPs: %s' % - ",".join(currently_failed_ips)) - - if interval_count == recheck_failed_interval: - # Ever now and then clean out our currently failed IP cache so - # that we can recheck them to see if they are still failed. - interval_count = 0 - currently_failed_ips = set() - - time.sleep(interval) - interval_count += 1 - - except _StopReceived: - # Received the stop signal, just exiting the thread function - return - - -def start_monitor_thread(interval=2): - """ - Start a thread for the monitor function. - - Specify the interval in seconds: Time between health checks of the - specified IP addresses. - - This function returns a 3-tuple consisting of the monitoring thread, the - monitor-queue and the failed-ips-queue. - - The monitor-queue is used to pass in sets of IPs for regular monitoring. A - new message here needs to consists of the complete set of IPs that should - be monitored. - - The failed-ips-queue is used by the monitoring thread to communicate back - any failed IPs it has discovered. It's up to whoever listens on this queue - ot then take actions. - - """ - # Prepare two queues for communication with the thread - q_monitor_ips = Queue.Queue() - q_failed_ips = Queue.Queue() - monitor_thread = threading.Thread(target = start_monitoring, - name = "HealthMon", - args = (q_monitor_ips, q_failed_ips, - interval)) - monitor_thread.daemon = True - monitor_thread.start() - - # Return the thread and the two queues to the caller - return (monitor_thread, q_monitor_ips, q_failed_ips) diff --git a/vpcrouter/monitor/common.py b/vpcrouter/monitor/common.py new file mode 100644 index 0000000..9bd2ef1 --- /dev/null +++ b/vpcrouter/monitor/common.py @@ -0,0 +1,107 @@ +""" +Copyright 2017 Pani Networks Inc. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" + +# +# Base class, exceptions and signals for the health monitor plugins. +# + +import Queue + + +class StopReceived(Exception): + """ + Raised after monitor thread receives stop signal. + + """ + pass + + +class MonitorPluginStopSignal(object): + """ + An object of this type received on the monitor-ips queue should signal + 'stop' to the monitoring plugin. + + """ + pass + + +class MonitorPlugin(object): + """ + Base class for all monitor plugins. + + Every plugin should implement all of these functions. + + """ + def __init__(self, conf): + """ + Gives access to the config of the program to the plugin. + + This includes all parameters, not just the ones specific to the + plugin. + + Also creates two queues: + * A queue to receive updated sets of IP addresses. + * A queue to send out notices of failed IP addresses. + + """ + self.conf = conf + self.q_monitor_ips = Queue.Queue() + self.q_failed_ips = Queue.Queue() + + def start(self): + """ + Start the health monitor thread or process. + + """ + raise NotImplementedError() + + def stop(self): + """ + Stop the health monitor thread or process. + + """ + self.q_monitor_ips.put(MonitorPluginStopSignal()) + + def get_queues(self): + """ + Return the queues, which the plugin uses to receive new IP lists and to + announce lists of failed IPs. 
+ + """ + return (self.q_monitor_ips, self.q_failed_ips) + + @classmethod + def add_arguments(cls, parser): + """ + Callback to add command line options for this plugin to the argparse + parser. + + Return list with names of new arguments. + + """ + return [] + + @classmethod + def check_arguments(cls, conf): + """ + Callback to perform sanity checking for the plugin's specific + parameters. + + Should raise exception in case of error. + + """ + return diff --git a/vpcrouter/monitor/plugins/__init__.py b/vpcrouter/monitor/plugins/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/vpcrouter/monitor/plugins/icmpecho.py b/vpcrouter/monitor/plugins/icmpecho.py new file mode 100644 index 0000000..86a6282 --- /dev/null +++ b/vpcrouter/monitor/plugins/icmpecho.py @@ -0,0 +1,281 @@ +""" +Copyright 2017 Pani Networks Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" + +# +# A monitor plugin for checking instance health with ICMP Echo requests. +# + +import logging +import ping +import Queue +import socket +import threading +import time + +from vpcrouter.errors import ArgsError +from vpcrouter.monitor import common + + +class EchoPermissionError(Exception): + """ + Indicating a permission error, usually when we try to run the ping code + not as root. + + """ + pass + + +class Icmpecho(common.MonitorPlugin): + """ + A health monitor plugin, which uses ICMP echo requests (ping) to check + instances for health. 
+ + """ + def my_do_one(self, dest_addr, ping_id, timeout, psize): + """ + Returns either the delay (in seconds) or None on timeout. + + This is a copy of the do_one function in the ping package, but + importantly, the ID for the ping packet is different (it's now passed + in from the caller). Originally, the PID was used, which is not thread + safe. + + """ + icmp = socket.getprotobyname("icmp") + try: + my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp) + except socket.error, (errno, msg): + if errno == 1: + raise EchoPermissionError() + raise # raise the original error + + ping.send_one_ping(my_socket, dest_addr, ping_id, psize) + delay = ping.receive_one_ping(my_socket, ping_id, timeout) + + my_socket.close() + return delay + + def _do_ping(self, ip, ping_id, results): + """ + Send a single ping to a specified IP address. + + The result is either a time in seconds for the ping, or None if no + result was received from the pinged IP address in time. Store the + result in the results dict that's provided to us. + + """ + res = None + try: + res = self.my_do_one(ip, ping_id, 2, 16) + except EchoPermissionError: + logging.error("Cannot send ICMP echo: Note that ICMP messages " + "can only be sent from processes running as root.") + except Exception: + # If an unreachable name or IP is specified then we might even get + # an exception here. Still just return None in that case. + pass + results[ip] = res + + def _get_new_working_set(self): + """ + Get a new list of IPs to work with from the queue. + + This returns None if there is no update. + + Read all the messages from the queue on which we get the IP addresses + that we have to monitor. We will ignore all of them, except the last + one, since maybe we received two updates in a row, but each update + is a full state, so only the last one matters. + + Raises the StopReceived exception if the stop signal (an object of + type MonitorPluginStopSignal) was received on the notification queue.
+ + """ + new_list_of_ips = None + while True: + try: + new_list_of_ips = self.q_monitor_ips.get_nowait() + self.q_monitor_ips.task_done() + if type(new_list_of_ips) is common.MonitorPluginStopSignal: + raise common.StopReceived() + except Queue.Empty: + # No more messages, all done reading monitor list for now + break + return new_list_of_ips + + def _do_health_checks(self, list_of_ips): + """ + Perform a health check on a list of IP addresses. + + Each check (we use ICMP echo right now) is run in its own thread. + + Gather up the results and return the list of those addresses that + failed the test. + + TODO: Currently, this starts a thread for every single address we want + to check. That's probably not a good idea if we have thousands of + addresses. Therefore, we should implement some batching for large + sets. + + """ + threads = [] + results = {} + + # Start the thread for each IP we wish to ping. We calculate a unique + # ID for the ICMP echo request sent by each thread. It's based on the + # slowly increasing time stamp (just 8 bits worth of the seconds since + # epoch)... + nowsecs = int(time.time()) % 255 + for count, ip in enumerate(list_of_ips): + ping_id = (nowsecs << 8) + count # ... plus running count of pkts + thread = threading.Thread(target=self._do_ping, + args=(ip, ping_id, results)) + thread.start() + threads.append(thread) + + # ... make sure all threads are done... + for thread in threads: + thread.join() + + # ... and gather up the results and send back if needed + return [k for (k, v) in results.items() if v is None] + + def start_monitoring(self): + """ + Monitor IP addresses and send notifications if one of them has failed. + + This function will continuously monitor q_monitor_ips for new lists of + IP addresses to monitor. Each message received there is the full state + (the complete lists of addresses to monitor). + + Push out (return) any failed IPs on q_failed_ips. 
This is also a list + of IPs. Nothing is sent when a check detects no failed IPs. + + If q_monitor_ips receives a MonitorPluginStopSignal instead of a list + then this is interpreted as a stop signal and the function exits. + + """ + time.sleep(1) + + # This is our working set. This list may be updated occasionally when + # we receive messages on the q_monitor_ips queue. But irrespective of + # any received updates, the list of IPs in here is regularly checked. + list_of_ips = [] + currently_failed_ips = set() + + # Accumulating failed IPs for 10 intervals before rechecking them to + # see if they are alive again + recheck_failed_interval = 10 + + try: + interval_count = 0 + while True: + # See if we should update our working set + new_ips = self._get_new_working_set() + if new_ips: + list_of_ips = new_ips + + # Don't check failed IPs for liveness on every interval. We + # keep a list of currently-failed IPs for that purpose. + live_ips_to_check = [ip for ip in list_of_ips if + ip not in currently_failed_ips] + logging.debug("Checking live IPs: %s" % + (",".join(live_ips_to_check) + if live_ips_to_check else "(none alive)")) + + # Independent of any updates: Perform health check on all IPs + # in the working set and send messages out about any failed + # ones as necessary. + if live_ips_to_check: + failed_ips = self._do_health_checks(live_ips_to_check) + if failed_ips: + self.q_failed_ips.put(failed_ips) + # Update list of currently failed IPs with any new ones + currently_failed_ips.update(failed_ips) + logging.info('Currently failed IPs: %s' % + ",".join(currently_failed_ips)) + + if interval_count == recheck_failed_interval: + # Every now and then clean out our currently failed IP cache + # so that we can recheck them to see if they are still + # failed.
+ interval_count = 0 + currently_failed_ips = set() + + time.sleep(self.conf['interval']) + interval_count += 1 + + except common.StopReceived: + # Received the stop signal, just exiting the thread function + return + + def start(self): + """ + Start the configfile change monitoring thread. + + """ + logging.info("ICMPecho health monitor plugin: Starting to watch " + "instances.") + + self.monitor_thread = threading.Thread(target = self.start_monitoring, + name = "HealthMon") + self.monitor_thread.daemon = True + self.monitor_thread.start() + + def stop(self): + """ + Stop the config change monitoring thread. + + """ + super(Icmpecho, self).stop() + self.monitor_thread.join() + logging.info("ICMPecho health monitor plugin: Stopped") + + @classmethod + def add_arguments(cls, parser): + """ + Arguments for the configfile mode. + + """ + parser.add_argument('-i', '--interval', dest='interval', + required=False, default=2, + help="ICMPecho interval in seconds " + "(only in ping mode)") + return ["interval"] + + @classmethod + def check_arguments(cls, conf): + """ + Sanity checks for options needed for configfile mode. + + As a side effect, it also converts the specified interval to a + float. + + """ + if not conf['interval']: + raise ArgsError("An ICMPecho interval needs to be specified (-i).") + + try: + conf['interval'] = float(conf['interval']) + except Exception: + raise ArgsError("Specified ICMPecho interval '%s' must be " + "a number." 
% + conf['interval']) + + if not (1 <= conf['interval'] <= 3600): + raise ArgsError("Specified ICMPecho interval must be between " + "1 and 3600 seconds") diff --git a/vpcrouter/tests/test_main.py b/vpcrouter/tests/test_main.py index 359952c..afbfc0f 100644 --- a/vpcrouter/tests/test_main.py +++ b/vpcrouter/tests/test_main.py @@ -63,39 +63,44 @@ def test_parse_args(self): {"args" : ['-l', 'foo', '-v', '123'], "exc" : SystemExit, "out" : "2"}, {"args" : ['-l', 'foo', '-v', '123', '-r', 'foo', '-m', 'http'], - "exc" : None, "plugin" : "http", + "exc" : None, "watcher_plugin" : "http", "conf" : { 'verbose': False, 'addr': 'localhost', 'mode': 'http', - 'vpc_id': '123', 'logfile': 'foo', - 'port': 33289, 'region_name': 'foo'}}, + 'vpc_id': '123', 'logfile': 'foo', 'health' : 'icmpecho', + 'interval' : 2, 'port': 33289, 'region_name': 'foo'}}, {"args" : ['-l', 'foo', '-v', '123', '-r', 'foo', '-m', 'configfile'], - "exc" : ArgsError, "plugin" : "configfile", + "exc" : ArgsError, "watcher_plugin" : "configfile", "out" : "A config file needs to be specified (-f)."}, {"args" : ['-l', 'foo', '-v', '123', '-r', 'foo', '-m', 'configfile', '-f', "/_does_not_exists"], - "exc" : ArgsError, "plugin" : "configfile", + "exc" : ArgsError, "watcher_plugin" : "configfile", "out" : "Cannot open config file"}, {"args" : ['-l', 'foo', '-v', '123', '-r', 'foo', '-m', 'configfile', '-p', '99999'], - "exc" : SystemExit, "plugin" : "configfile", + "exc" : SystemExit, "watcher_plugin" : "configfile", "out" : "2"}, {"args" : ['-l', 'foo', '-v', '123', '-r', 'foo', '-m', 'http', '-p', '99999'], - "exc" : ArgsError, "plugin" : "http", + "exc" : ArgsError, "watcher_plugin" : "http", "out" : "Invalid listen port"}, {"args" : ['-l', 'foo', '-v', '123', '-r', 'foo', '-m', 'http', '-a', '999.9'], - "exc" : ArgsError, "plugin" : "http", + "exc" : ArgsError, "watcher_plugin" : "http", "out" : "Not a valid IP address"} ] for i in inp: - if 'plugin' in i: - plc = main.load_plugin(i['plugin']) + if 
'watcher_plugin' in i: + wplc = main.load_plugin(i['watcher_plugin'], + "vpcrouter.watcher.plugins") else: - plc = None + wplc = None + + hplc = main.load_plugin(i.get('health_plugin', 'icmpecho'), + "vpcrouter.monitor.plugins") + args = i['args'] exc = i['exc'] out = i.get('out', "") @@ -104,10 +109,10 @@ def test_parse_args(self): sys.stderr = StringIO() if exc: with self.assertRaises(exc) as ex: - main.parse_args(args, plc) + main.parse_args(args, wplc, hplc) self.assertTrue(out in str(ex.exception.message)) else: - conf_is = main.parse_args(args, plc) + conf_is = main.parse_args(args, wplc, hplc) output = sys.stderr.getvalue().strip() ll = self.get_last_line(output) if not out: diff --git a/vpcrouter/tests/test_monitor.py b/vpcrouter/tests/test_monitor.py index b0007c7..d3a2711 100644 --- a/vpcrouter/tests/test_monitor.py +++ b/vpcrouter/tests/test_monitor.py @@ -24,7 +24,7 @@ import Queue import time -from vpcrouter import monitor +from vpcrouter.monitor.plugins import icmpecho # This variable determines what IP addresses are considered 'failed' when we @@ -33,20 +33,22 @@ _FAILED_PREFIX = None -class TestPing(unittest.TestCase): +class TestPingPlugin(unittest.TestCase): def test_sending_receiving_ping(self): # Only thing we can really send a ping to that doesn't leak of the host # is localhost. + conf = { + "interval" : 2 + } + p = icmpecho.Icmpecho(conf) try: - res = monitor.my_do_one("127.0.0.1", 1234, 1, 16) - except socket.error as e: - if "running as root" in str(e): - # We are not running as root, therefore, can't execute the - # ping. Need to just accept that. - print "@@@ Not running as root, can't test ping." - return - raise + res = p.my_do_one("127.0.0.1", 1234, 1, 16) + except icmpecho.EchoPermissionError: + # We are not running as root, therefore, can't execute the + # ping. Need to just accept that. + print "@@@ Not running as root, can't test ping." 
+ return self.assertTrue(res is not None) @@ -69,16 +71,21 @@ def new_do_one(ip, timeout, dummy_id, size): raise socket.gaierror() else: return 0.5 + conf = { + "interval" : 0.1 + } + p = icmpecho.Icmpecho(conf) # Now we install this new ping function in place of the original one. # Clearly, this is a white box test: We know about the inner working of # the monitoring module in order to perform our monkey patch. - monitor.my_do_one = new_do_one + p.my_do_one = new_do_one # Setup the monitor thread with a small monitoring interval (all # local, no real pings). We get back the thread and the two # communication queues. - self.monitor_thread, self.q_monitor_ips, self.q_failed_ips = \ - monitor.start_monitor_thread(0.1) + p.start() + self.plugin = p + self.q_monitor_ips, self.q_failed_ips = self.plugin.get_queues() # Install the cleanup, which will send the stop signal to the monitor # thread once we are done with our test @@ -89,8 +96,7 @@ def cleanup(self): Send the stop signal to the monitoring thread and end it. 
""" - self.q_monitor_ips.put(None) # send the stop signal - self.monitor_thread.join() + self.plugin.stop() def test_sending_receiving(self): # @@ -110,6 +116,7 @@ def test_sending_receiving(self): # Now also with some malformed input (["333.3.3.5", "10.2.2.5", "11.3.3.5"], ["333.3.3.5", "11.3.3.5"]) ] + for inp, expected_out in input_output: self.q_monitor_ips.put(inp) if expected_out is None: @@ -121,12 +128,12 @@ def test_sending_receiving(self): while True: # Read messages until we are at the last one try: - res = self.q_failed_ips.get(timeout=1) + res = self.q_failed_ips.get(timeout=0.5) + self.q_failed_ips.task_done() + self.assertEqual(sorted(res), + sorted(expected_out)) except Queue.Empty: break - self.q_failed_ips.task_done() - self.assertEqual(sorted(res), - sorted(expected_out)) def test_multi_send_single_receive(self): # @@ -158,7 +165,7 @@ def test_multi_send_single_receive(self): self.assertEqual(sorted(res), sorted(expected_out)) # Since the monitor will keep checking the IPs, we should keep getting - # results without en countering an empty queue + # results without encountering an empty queue res = self.q_failed_ips.get(timeout=1.5) res = self.q_failed_ips.get(timeout=1.5) diff --git a/vpcrouter/tests/test_watcher.py b/vpcrouter/tests/test_watcher.py index dd199d5..d3e336f 100644 --- a/vpcrouter/tests/test_watcher.py +++ b/vpcrouter/tests/test_watcher.py @@ -32,7 +32,6 @@ from watchdog.observers import Observer from vpcrouter import main -from vpcrouter import monitor from vpcrouter import watcher from vpcrouter import vpc @@ -171,9 +170,14 @@ def additional_setup(self): "file" : self.abs_fname, "region_name" : "dummy-region", "vpc_id" : "dummy-vpc", - "mode" : "configfile" + "mode" : "configfile", + "health" : "icmpecho", + "interval" : 2 } - self.plugin_class = main.load_plugin("configfile") + self.watcher_plugin_class = \ + main.load_plugin("configfile", DEFAULT_WATCHER_PLUGIN_MOD) + self.health_plugin_class = \ + main.load_plugin("icmpecho", 
DEFAULT_HEALTH_PLUGIN_MOD) # The watcher thread needs to have a config file available right at the # start, even if there's nothing in it @@ -200,16 +204,6 @@ def new_handle_spec(*args, **kwargs): pass watcher.handle_spec = vpc.handle_spec = new_handle_spec - # Monkey patch the do_one ping method, since we don't really want to - # send out ICMP echo requests when we run the tests. Will indicate - # failure for all IP addresses starting with "3." - def new_do_one(ip, timeout, dummy_id, size): - if ip.startswith("3."): - return None # indicates failure - else: - return 0.5 # indicates success - monitor.my_do_one = new_do_one - def additional_cleanup(self): shutil.rmtree(self.temp_dir) @@ -234,8 +228,11 @@ def change_event_log_tuple(self): def test_watcher_thread_no_config(self): os.remove(self.abs_fname) - self.tinfo = watcher._start_working_threads( - self.conf, self.plugin_class, 2) + watcher_plugin, health_plugin = \ + watcher.start_plugins( + self.conf, + self.watcher_plugin_class, self.health_plugin_class, + 2) time.sleep(0.5) # Config file doesn't exist yet, so we should get an error. @@ -243,16 +240,20 @@ def test_watcher_thread_no_config(self): # there, yet. 
self.lc.check( self.start_thread_log_tuple(), - ('root', - 'ERROR', + ('root', 'ERROR', "Config ignored: Cannot open file: " - "[Errno 2] No such file or directory: '%s'" % self.abs_fname)) + "[Errno 2] No such file or directory: '%s'" % self.abs_fname), + ('root', 'INFO', + 'ICMPecho health monitor plugin: Starting to watch instances.')) - watcher._stop_working_threads(self.tinfo) + watcher.stop_plugins(watcher_plugin, health_plugin) def test_watcher_thread_wrong_config(self): - self.tinfo = watcher._start_working_threads( - self.conf, self.plugin_class, 2) + watcher_plugin, health_plugin = \ + watcher.start_plugins( + self.conf, + self.watcher_plugin_class, self.health_plugin_class, + 2) time.sleep(1.2) inp = "MALFORMED" @@ -266,15 +267,33 @@ def test_watcher_thread_wrong_config(self): ('root', 'ERROR', 'Config ignored: Expected dictionary at top level')) - watcher._stop_working_threads(self.tinfo) + watcher.stop_plugins(watcher_plugin, health_plugin) def test_watcher_thread(self): - self.tinfo = watcher._start_working_threads( - self.conf, self.plugin_class, 2) + # Monkey patch the do_one ping method of the health monitor class, + # since we don't really want to send out ICMP echo requests when we run + # the tests. Will indicate failure for all IP addresses starting with + # "3." 
+ def new_do_one(self, ip, timeout, dummy_id, size): + if ip.startswith("3."): + return None # indicates failure + else: + return 0.5 # indicates success + + # We do this in the class, before the plugin is instantiated + self.health_plugin_class.my_do_one = new_do_one + + watcher_plugin, health_plugin = \ + watcher.start_plugins( + self.conf, + self.watcher_plugin_class, self.health_plugin_class, + 2) + time.sleep(2) self.lc.check( self.start_thread_log_tuple(), - ('root', 'DEBUG', 'Started health monitoring thread'), + ('root', 'INFO', + 'ICMPecho health monitor plugin: Starting to watch instances.'), ('root', 'DEBUG', 'Checking live IPs: (none alive)')) self.lc.clear() @@ -289,8 +308,7 @@ def test_watcher_thread(self): watcher._event_monitor_loop( "dummy-region", "dummy-vpc", - self.tinfo['q_monitor_ips'], self.tinfo['q_failed_ips'], - self.tinfo['watcher_plugin'].get_route_spec_queue(), + watcher_plugin, health_plugin, iterations=1, sleep_time=0.5) time.sleep(2) @@ -325,8 +343,7 @@ def test_watcher_thread(self): watcher._event_monitor_loop( "dummy-region", "dummy-vpc", - self.tinfo['q_monitor_ips'], self.tinfo['q_failed_ips'], - self.tinfo['watcher_plugin'].get_route_spec_queue(), + watcher_plugin, health_plugin, iterations=1, sleep_time=0.5) time.sleep(2) @@ -336,11 +353,14 @@ def test_watcher_thread(self): 'with: 2.2.2.2,3.3.3.3,4.4.4.4'), ('root', 'DEBUG', u'Checking live IPs: 2.2.2.2,4.4.4.4')) - watcher._stop_working_threads(self.tinfo) + watcher.stop_plugins(watcher_plugin, health_plugin) PORT = 33289 +DEFAULT_WATCHER_PLUGIN_MOD = "vpcrouter.watcher.plugins" +DEFAULT_HEALTH_PLUGIN_MOD = "vpcrouter.monitor.plugins" + class TestWatcherHttp(TestWatcherConfigfile): """ @@ -357,9 +377,14 @@ def additional_setup(self): "port" : PORT, "region_name" : "dummy-region", "vpc_id" : "dummy-vpc", - "mode" : "http" + "mode" : "http", + "health" : "icmpecho", + "interval" : 2 } - self.plugin_class = main.load_plugin("http") + self.watcher_plugin_class = \ + 
main.load_plugin("http", DEFAULT_WATCHER_PLUGIN_MOD) + self.health_plugin_class = \ + main.load_plugin("icmpecho", DEFAULT_HEALTH_PLUGIN_MOD) # Changing the listen port number of the server for each test, since we # can't reuse the socket addresses in such rapid succession PORT += 1 @@ -381,16 +406,22 @@ def change_event_log_tuple(self): return ('root', 'INFO', "New route spec posted") def test_watcher_thread_no_config(self): - self.tinfo = watcher._start_working_threads( - self.conf, self.plugin_class, 2) + self.watcher_plugin, self.health_plugin = \ + watcher.start_plugins( + self.conf, + self.watcher_plugin_class, self.health_plugin_class, + 2) time.sleep(0.5) # Config file doesn't exist yet, so we should get an error. # Health monitor is started with a second delay, so no messages from # there, yet. - self.lc.check(self.start_thread_log_tuple()) + self.lc.check( + self.start_thread_log_tuple(), + ('root', 'INFO', + 'ICMPecho health monitor plugin: Starting to watch instances.')) - watcher._stop_working_threads(self.tinfo) + watcher.stop_plugins(self.watcher_plugin, self.health_plugin) if __name__ == '__main__': diff --git a/vpcrouter/watcher/__init__.py b/vpcrouter/watcher/__init__.py index 525ed88..cd9d5ef 100644 --- a/vpcrouter/watcher/__init__.py +++ b/vpcrouter/watcher/__init__.py @@ -24,7 +24,6 @@ import Queue import time -from vpcrouter import monitor from vpcrouter import vpc from vpcrouter.currentstate import CURRENT_STATE @@ -79,7 +78,7 @@ def _update_health_monitor_with_new_ips(route_spec, all_ips, def _event_monitor_loop(region_name, vpc_id, - q_monitor_ips, q_failed_ips, q_route_spec, + watcher_plugin, health_plugin, iterations, sleep_time): """ Monitor queues to receive updates about new route specs or any detected @@ -93,6 +92,8 @@ def _event_monitor_loop(region_name, vpc_id, tests, sleep_time can be set to values less than 1. 
""" + q_route_spec = watcher_plugin.get_route_spec_queue() + q_monitor_ips, q_failed_ips = health_plugin.get_queues() time.sleep(sleep_time) # Wait to allow monitor to report results current_route_spec = {} # The last route spec we have seen @@ -145,60 +146,44 @@ def _event_monitor_loop(region_name, vpc_id, logging.error("*** Uncaught exception 1: %s" % str(e)) -def _start_working_threads(conf, plugin_class, sleep_time): +def start_plugins(conf, watcher_plugin_class, health_plugin_class, + sleep_time): """ Start the working threads: - - Health monitor + - Health monitor (the health plugin) - Config change monitor (the watcher plugin) - Return dict with thread info and the message queues that were created for - them. - - There are different means of feeding updated route spec configs to the - vpc-router. These are implemented in the watcher plugins. The method/plugin - is chosen via the "mode" command line argument. - - Independent of what mode is chosen, the specific config-watcher thread - always uses the same means of communicating updated configs back: A message - queue on which a full route spec is sent whenever the config changes. - """ - # Start the health monitoring thread - monitor_thread, q_monitor_ips, q_failed_ips = \ - monitor.start_monitor_thread(sleep_time) - # No matter what the chosen plugin to watch for config updates: We get a # plugin-handle back. This gives us a start(), stop() and # get_route_spec_queue() function. All watcher plugins provide the same # interface. - plugin = plugin_class(conf) - plugin.start() + watcher_plugin = watcher_plugin_class(conf) + watcher_plugin.start() - return { - "monitor_thread" : monitor_thread, - "q_monitor_ips" : q_monitor_ips, - "q_failed_ips" : q_failed_ips, - "watcher_plugin" : plugin - } + # Similarly for the health-monitor-plugin. It gives us a get_queues() + # function, to get the monitor-ips and failed-ips queues. 
+ health_plugin = health_plugin_class(conf) + health_plugin.start() + return watcher_plugin, health_plugin -def _stop_working_threads(thread_info): - """ - Stops and collects the workin threads. - Needs the thread-info dict created by _start_working_threads(). +def stop_plugins(watcher_plugin, health_plugin): + """ + Stops all plugins. """ logging.debug("Stopping health-check monitor...") - thread_info['q_monitor_ips'].put(None) # Stop signal for health monitor - thread_info['monitor_thread'].join() + health_plugin.stop() logging.debug("Stopping config change observer...") - thread_info['watcher_plugin'].stop() # Stop signal for config watcher + watcher_plugin.stop() -def start_watcher(conf, plugin_class, iterations=None, sleep_time=1): +def start_watcher(conf, watcher_plugin_class, health_plugin_class, + iterations=None, sleep_time=1): """ Start watcher loop, listening for config changes or failed hosts. @@ -211,7 +196,7 @@ def start_watcher(conf, plugin_class, iterations=None, sleep_time=1): This function starts a few working threads: - The watcher plugin to monitor for updated route specs. - - A health monitor thread for instances mentioned in the route spec. + - A health monitor plugin for instances mentioned in the route spec. It then drops into a loop to receive messages from the health monitoring thread and watcher plugin and re-process the config if any failed IPs are @@ -222,14 +207,15 @@ def start_watcher(conf, plugin_class, iterations=None, sleep_time=1): """ # Start the working threads (health monitor, config event monitor, etc.) # and return the thread handles and message queues in a thread-info dict. - tinfo = _start_working_threads(conf, plugin_class, sleep_time) + watcher_plugin, health_plugin = \ + start_plugins(conf, watcher_plugin_class, health_plugin_class, + sleep_time) # Start the loop to process messages from the monitoring # threads about any failed IP addresses or updated route specs. 
_event_monitor_loop(conf['region_name'], conf['vpc_id'], - tinfo['q_monitor_ips'], tinfo['q_failed_ips'], - tinfo['watcher_plugin'].get_route_spec_queue(), + watcher_plugin, health_plugin, iterations, sleep_time) - # Stopping and collecting all worker threads when we are done - _stop_working_threads(tinfo) + # Stopping plugins and collecting all worker threads when we are done + stop_plugins(watcher_plugin, health_plugin) From 22b8ff7c8015c03273ba95d3c4f983780777c989 Mon Sep 17 00:00:00 2001 From: Juergen Brendel Date: Mon, 31 Jul 2017 10:57:11 +1200 Subject: [PATCH 2/4] Refactored basic code into health-mon base class. --- vpcrouter/monitor/common.py | 107 ++++++++++++++++++++++++++ vpcrouter/monitor/plugins/icmpecho.py | 99 +----------------------- 2 files changed, 108 insertions(+), 98 deletions(-) diff --git a/vpcrouter/monitor/common.py b/vpcrouter/monitor/common.py index 9bd2ef1..df79500 100644 --- a/vpcrouter/monitor/common.py +++ b/vpcrouter/monitor/common.py @@ -19,7 +19,9 @@ # Base class, exceptions and signals for the health monitor plugins. # +import logging import Queue +import time class StopReceived(Exception): @@ -84,6 +86,111 @@ def get_queues(self): """ return (self.q_monitor_ips, self.q_failed_ips) + def get_new_working_set(self): + """ + Get a new list of IPs to work with from the queue. + + This returns None if there is no update. + + Read all the messages from the queue on which we get the IP addresses + that we have to monitor. We will ignore all of them, except the last + one, since maybe we received two updates in a row, but each update + is a full state, so only the last one matters. + + Raises the StopReceived exception if the stop signal ("None") was + received on the notification queue. 
+ + """ + new_list_of_ips = None + while True: + try: + new_list_of_ips = self.q_monitor_ips.get_nowait() + self.q_monitor_ips.task_done() + if type(new_list_of_ips) is MonitorPluginStopSignal: + raise StopReceived() + except Queue.Empty: + # No more messages, all done reading monitor list for now + break + return new_list_of_ips + + def do_health_checks(self, list_of_ips): + """ + Perform a health check on a list of IP addresses. + + Return a list of failed IP addresses. + + """ + raise NotImplementedError() + + def start_monitoring(self): + """ + Monitor IP addresses and send notifications if one of them has failed. + + This function will continuously monitor q_monitor_ips for new lists of + IP addresses to monitor. Each message received there is the full state + (the complete lists of addresses to monitor). + + Push out (return) any failed IPs on q_failed_ips. This is also a list + of IPs, which may be empty if all instances work correctly. + + If q_monitor_ips receives a 'None' instead of list then this is + intepreted as a stop signal and the function exits. + + """ + time.sleep(1) + + # This is our working set. This list may be updated occasionally when + # we receive messages on the q_monitor_ips queue. But irrespective of + # any received updates, the list of IPs in here is regularly checked. + list_of_ips = [] + currently_failed_ips = set() + + # Accumulating failed IPs for 10 intervals before rechecking them to + # see if they are alive again + recheck_failed_interval = 10 + + try: + interval_count = 0 + while True: + # See if we should update our working set + new_ips = self.get_new_working_set() + if new_ips: + list_of_ips = new_ips + + # Don't check failed IPs for liveness on every interval. We + # keep a list of currently-failed IPs for that purpose. 
+ live_ips_to_check = [ip for ip in list_of_ips if + ip not in currently_failed_ips] + logging.debug("Checking live IPs: %s" % + (",".join(live_ips_to_check) + if live_ips_to_check else "(none alive)")) + + # Independent of any updates: Perform health check on all IPs + # in the working set and send messages out about any failed + # once as necessary. + if live_ips_to_check: + failed_ips = self.do_health_checks(live_ips_to_check) + if failed_ips: + self.q_failed_ips.put(failed_ips) + # Update list of currently failed IPs with any new ones + currently_failed_ips.update(failed_ips) + logging.info('Currently failed IPs: %s' % + ",".join(currently_failed_ips)) + + if interval_count == recheck_failed_interval: + # Ever now and then clean out our currently failed IP cache + # so that we can recheck them to see if they are still + # failed. + interval_count = 0 + currently_failed_ips = set() + + time.sleep(self.conf['interval']) + interval_count += 1 + + except StopReceived: + # Received the stop signal, just exiting the thread function + return + @classmethod def add_arguments(cls, parser): """ diff --git a/vpcrouter/monitor/plugins/icmpecho.py b/vpcrouter/monitor/plugins/icmpecho.py index 86a6282..9a046f9 100644 --- a/vpcrouter/monitor/plugins/icmpecho.py +++ b/vpcrouter/monitor/plugins/icmpecho.py @@ -21,7 +21,6 @@ import logging import ping -import Queue import socket import threading import time @@ -90,34 +89,7 @@ def _do_ping(self, ip, ping_id, results): pass results[ip] = res - def _get_new_working_set(self): - """ - Get a new list of IPs to work with from the queue. - - This returns None if there is no update. - - Read all the messages from the queue on which we get the IP addresses - that we have to monitor. We will ignore all of them, except the last - one, since maybe we received two updates in a row, but each update - is a full state, so only the last one matters. 
- - Raises the StopReceived exception if the stop signal ("None") was - received on the notification queue. - - """ - new_list_of_ips = None - while True: - try: - new_list_of_ips = self.q_monitor_ips.get_nowait() - self.q_monitor_ips.task_done() - if type(new_list_of_ips) is common.MonitorPluginStopSignal: - raise common.StopReceived() - except Queue.Empty: - # No more messages, all done reading monitor list for now - break - return new_list_of_ips - - def _do_health_checks(self, list_of_ips): + def do_health_checks(self, list_of_ips): """ Perform a health check on a list of IP addresses. @@ -154,75 +126,6 @@ def _do_health_checks(self, list_of_ips): # ... and gather up the results and send back if needed return [k for (k, v) in results.items() if v is None] - def start_monitoring(self): - """ - Monitor IP addresses and send notifications if one of them has failed. - - This function will continuously monitor q_monitor_ips for new lists of - IP addresses to monitor. Each message received there is the full state - (the complete lists of addresses to monitor). - - Push out (return) any failed IPs on q_failed_ips. This is also a list - of IPs, which may be empty if all instances work correctly. - - If q_monitor_ips receives a 'None' instead of list then this is - intepreted as a stop signal and the function exits. - - """ - time.sleep(1) - - # This is our working set. This list may be updated occasionally when - # we receive messages on the q_monitor_ips queue. But irrespective of - # any received updates, the list of IPs in here is regularly checked. - list_of_ips = [] - currently_failed_ips = set() - - # Accumulating failed IPs for 10 intervals before rechecking them to - # see if they are alive again - recheck_failed_interval = 10 - - try: - interval_count = 0 - while True: - # See if we should update our working set - new_ips = self._get_new_working_set() - if new_ips: - list_of_ips = new_ips - - # Don't check failed IPs for liveness on every interval. 
We - # keep a list of currently-failed IPs for that purpose. - live_ips_to_check = [ip for ip in list_of_ips if - ip not in currently_failed_ips] - logging.debug("Checking live IPs: %s" % - (",".join(live_ips_to_check) - if live_ips_to_check else "(none alive)")) - - # Independent of any updates: Perform health check on all IPs - # in the working set and send messages out about any failed - # once as necessary. - if live_ips_to_check: - failed_ips = self._do_health_checks(live_ips_to_check) - if failed_ips: - self.q_failed_ips.put(failed_ips) - # Update list of currently failed IPs with any new ones - currently_failed_ips.update(failed_ips) - logging.info('Currently failed IPs: %s' % - ",".join(currently_failed_ips)) - - if interval_count == recheck_failed_interval: - # Ever now and then clean out our currently failed IP cache - # so that we can recheck them to see if they are still - # failed. - interval_count = 0 - currently_failed_ips = set() - - time.sleep(self.conf['interval']) - interval_count += 1 - - except common.StopReceived: - # Received the stop signal, just exiting the thread function - return - def start(self): """ Start the configfile change monitoring thread. From 4ce7ba974f4c6636e80c98fa5e4d88ef26fbcd59 Mon Sep 17 00:00:00 2001 From: Juergen Brendel Date: Mon, 31 Jul 2017 12:06:54 +1200 Subject: [PATCH 3/4] Avoid failover to failed instance. Passing currently-failed-IPs, rather than just the latest failed IPs. --- vpcrouter/monitor/common.py | 9 ++++++++- vpcrouter/tests/test_monitor.py | 2 +- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/vpcrouter/monitor/common.py b/vpcrouter/monitor/common.py index df79500..31d7abb 100644 --- a/vpcrouter/monitor/common.py +++ b/vpcrouter/monitor/common.py @@ -156,6 +156,12 @@ def start_monitoring(self): new_ips = self.get_new_working_set() if new_ips: list_of_ips = new_ips + # Update the currently-failed-IP list to only include IPs + # that are still in the spec. 
The list update may have + # removed some of the historical, failed IPs altogether. + currently_failed_ips = \ + set([ip for ip in currently_failed_ips \ + if ip in list_of_ips]) # Don't check failed IPs for liveness on every interval. We # keep a list of currently-failed IPs for that purpose. @@ -171,11 +177,12 @@ def start_monitoring(self): if live_ips_to_check: failed_ips = self.do_health_checks(live_ips_to_check) if failed_ips: - self.q_failed_ips.put(failed_ips) # Update list of currently failed IPs with any new ones currently_failed_ips.update(failed_ips) logging.info('Currently failed IPs: %s' % ",".join(currently_failed_ips)) + # Let the main loop know the full set of failed IPs + self.q_failed_ips.put(list(currently_failed_ips)) if interval_count == recheck_failed_interval: # Ever now and then clean out our currently failed IP cache diff --git a/vpcrouter/tests/test_monitor.py b/vpcrouter/tests/test_monitor.py index d3a2711..a269047 100644 --- a/vpcrouter/tests/test_monitor.py +++ b/vpcrouter/tests/test_monitor.py @@ -209,7 +209,7 @@ def test_monitor_state_change(self): seen_12 = False while True: res = self.q_failed_ips.get(timeout=1) - self.assertEqual(1, len(res)) # only should ever see one result + self.assertTrue(1 <= len(res) <= 2) # latest and currently failed if not seen_12: # Make sure we see at least one more message for 12. self.assertEqual(res[0], "12.0.0.0") From 625fc47b9e1edf1f7dfa39cb459c2c1f8082acbf Mon Sep 17 00:00:00 2001 From: Juergen Brendel Date: Mon, 31 Jul 2017 12:21:56 +1200 Subject: [PATCH 4/4] Provide default value for health monitor plugin. 
--- vpcrouter/main/__init__.py | 14 +++++++++----- vpcrouter/monitor/common.py | 2 +- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/vpcrouter/main/__init__.py b/vpcrouter/main/__init__.py index 1de38e6..80ef83a 100644 --- a/vpcrouter/main/__init__.py +++ b/vpcrouter/main/__init__.py @@ -29,6 +29,9 @@ from vpcrouter import watcher +_HEALTH_DEFAULT_PLUGIN = "icmpecho" + + def _setup_arg_parser(watcher_plugin_class, health_plugin_class): """ Configure and return the argument parser for the command line options. @@ -63,7 +66,7 @@ def _setup_arg_parser(watcher_plugin_class, health_plugin_class): parser.add_argument('-m', '--mode', dest='mode', required=True, help="name of the watcher plugin") parser.add_argument('-H', '--health', dest='health', required=False, - default="icmpecho", + default=_HEALTH_DEFAULT_PLUGIN, help="name of the health-check plugin") parser.add_argument('--verbose', dest="verbose", action='store_true', help="produces more output") @@ -179,7 +182,7 @@ def load_plugin(plugin_name, default_plugin_module): (plugin_mod_name, str(e))) -def _param_extract(args, short_form, long_form): +def _param_extract(args, short_form, long_form, default=None): """ Quick extraction of a parameter from the command line argument list. @@ -189,7 +192,7 @@ def _param_extract(args, short_form, long_form): Returns parameter value, or None if not present. """ - val = None # Parameter wasn't defined + val = default for i, a in enumerate(args): # Long form may use "--xyz=foo", so need to split on '=', but it # doesn't necessarily do that, can also be "--xyz foo". @@ -221,14 +224,15 @@ def main(): # a manual search through the argument list for this purpose only. 
args = sys.argv[1:] - mode_name = _param_extract(args, "-m", "--mode") + mode_name = _param_extract(args, "-m", "--mode", default=None) if mode_name: watcher_plugin_class = load_plugin(mode_name, "vpcrouter.watcher.plugins") else: watcher_plugin_class = None - health_check_name = _param_extract(args, "-H", "--health") + health_check_name = _param_extract(args, "-H", "--health", + default=_HEALTH_DEFAULT_PLUGIN) if health_check_name: health_plugin_class = load_plugin(health_check_name, "vpcrouter.monitor.plugins") diff --git a/vpcrouter/monitor/common.py b/vpcrouter/monitor/common.py index 31d7abb..3252d83 100644 --- a/vpcrouter/monitor/common.py +++ b/vpcrouter/monitor/common.py @@ -160,7 +160,7 @@ def start_monitoring(self): # that are still in the spec. The list update may have # removed some of the historical, failed IPs altogether. currently_failed_ips = \ - set([ip for ip in currently_failed_ips \ + set([ip for ip in currently_failed_ips if ip in list_of_ips]) # Don't check failed IPs for liveness on every interval. We