Skip to content

Commit

Permalink
Merge pull request #48 from romana/issue-45-avoid
Browse files Browse the repository at this point in the history
Infrastructure for 'questionable' IP addresses
  • Loading branch information
jbrendel authored Aug 31, 2017
2 parents 8876bb8 + 9310def commit bcbc4ab
Show file tree
Hide file tree
Showing 8 changed files with 354 additions and 189 deletions.
2 changes: 1 addition & 1 deletion vpcrouter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@
"""

__version__ = "1.7.2"
__version__ = "1.7.3"
28 changes: 15 additions & 13 deletions vpcrouter/currentstate/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def __init__(self):
self.versions = ""
self.plugins = []
self.failed_ips = []
self.questionable_ips = []
self.working_set = []
self.route_spec = {}
self.routes = {}
Expand Down Expand Up @@ -87,14 +88,15 @@ def get_state_repr(self, path):
"""
if path == "ips":
return {
"failed_ips" : self.failed_ips,
"working_set" : self.working_set,
"failed_ips" : self.failed_ips,
"questionable_ips" : self.questionable_ips,
"working_set" : self.working_set,
}

if path == "route_info":
return {
"route_spec" : self.route_spec,
"routes" : self.routes,
"route_spec" : self.route_spec,
"routes" : self.routes,
}

if path == "plugins":
Expand All @@ -105,16 +107,16 @@ def get_state_repr(self, path):

if path == "":
return {
"SERVER" : {
"version" : self.versions,
"start_time" : self.starttime.isoformat(),
"current_time" : datetime.datetime.now().isoformat()
"SERVER" : {
"version" : self.versions,
"start_time" : self.starttime.isoformat(),
"current_time" : datetime.datetime.now().isoformat()
},
"params" : self.render_main_params(),
"plugins" : {"_href" : "/plugins"},
"ips" : {"_href" : "/ips"},
"route_info" : {"_href" : "/route_info"},
"vpc" : {"_href" : "/vpc"}
"params" : self.render_main_params(),
"plugins" : {"_href" : "/plugins"},
"ips" : {"_href" : "/ips"},
"route_info" : {"_href" : "/route_info"},
"vpc" : {"_href" : "/vpc"}
}

def as_json(self, path="", with_indent=False):
Expand Down
21 changes: 13 additions & 8 deletions vpcrouter/monitor/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,19 @@ def __init__(self, conf, thread_name):
This includes all parameters, not just the ones specific to the
plugin.
Also creates two queues:
Also creates three queues:
* A queue to receive updated sets of IP addresses.
* A queue to send out notices of failed IP addresses.
* A queue to inform about questionable or failing IPs (still
operational, but with some indication that it will soon change).
"""
self.conf = conf
self.q_monitor_ips = Queue.Queue()
self.q_failed_ips = Queue.Queue()
self.thread_name = thread_name
self.conf = conf
self.thread_name = thread_name

self.q_monitor_ips = Queue.Queue()
self.q_failed_ips = Queue.Queue()
self.q_questionable_ips = Queue.Queue()

def get_plugin_name(self):
return type(self).__name__.lower()
Expand All @@ -86,11 +90,12 @@ def stop(self):

def get_queues(self):
"""
Return the queues, which the plugin uses to receive new IP lists and to
announce lists of failed IPs.
Return the queues, which the plugin uses to receive new IP lists, to
announce lists of failed IPs and to communicate about questionable IP
addresses.
"""
return (self.q_monitor_ips, self.q_failed_ips)
return (self.q_monitor_ips, self.q_failed_ips, self.q_questionable_ips)

def get_new_working_set(self):
"""
Expand Down
144 changes: 93 additions & 51 deletions vpcrouter/monitor/plugins/multi.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,11 @@ def __init__(self, conf, **kwargs):

super(Multi, self).__init__(conf, "MultiHealth")

self.my_wait_interval = 0
self.plugins = []
self.monitor_ip_queues = {}
self.failed_ip_queues = {}
self.my_wait_interval = 0
self.plugins = []
self.monitor_ip_queues = {}
self.failed_ip_queues = {}
self.questionable_ip_queues = {}

# For testing purposes, it is convenient to supply already pre-created
# test plugins. We do this by calling the Multi plugin with an extra
Expand All @@ -132,6 +133,8 @@ def __init__(self, conf, **kwargs):
plugins_and_names = test_plugins

# Load and start each sub-plugin
self.failed_queue_lookup = {}
self.questionable_queue_lookup = {}
for pname, pc in plugins_and_names:
if test_plugins:
# In test configuration we get already initialized instances
Expand All @@ -150,9 +153,11 @@ def __init__(self, conf, **kwargs):
# belongs to what plugin is so that we can produce nicer logging
# messages. For the logic of the multi-plugin it doesn't really
# matter which plugin reports what failed IP.
q_monitor_ips, q_failed_ips = plugin.get_queues()
self.monitor_ip_queues[pname] = q_monitor_ips
self.failed_ip_queues[pname] = q_failed_ips
q_monitor_ips, q_failed_ips, q_questionable_ips = \
plugin.get_queues()
self.monitor_ip_queues[pname] = q_monitor_ips
self.failed_queue_lookup[pname] = q_failed_ips
self.questionable_queue_lookup[pname] = q_questionable_ips

# Also calculate our waiting interval: Double the max of each
# plugin's interval. That's sufficient to make sure we get updates
Expand All @@ -161,11 +166,12 @@ def __init__(self, conf, **kwargs):
plugin.get_monitor_interval())
self.my_wait_interval *= 2

# We will keep the reported failed IP addresses in an accumulating
# buffer. This is important, since otherwise, an update from one
# plugin may wipe out the update provided just before from another
# plugin.
self.expiring_data_set = ExpireSet(self.my_wait_interval * 10)
# We will keep the reportedly failed and questionable IP addresses in
# accumulating buffers. This is important, since otherwise, an update
# from one plugin may wipe out the update provided just before from
# another plugin.
self.report_failed_acc = ExpireSet(self.my_wait_interval * 10)
self.report_questionable_acc = ExpireSet(self.my_wait_interval * 10)

def get_monitor_interval(self):
"""
Expand Down Expand Up @@ -196,6 +202,66 @@ def get_info(self):
}
}

def _accumulate_ips_from_plugins(self, ip_type_name, plugin_queue_lookup,
ip_accumulator):
"""
Retrieve all IPs of a given type from all sub-plugins.
ip_type_name: A name of the type of IP we are working with.
Used for nice log messages. Example 'failed',
'questionable'.
plugin_queue_lookup: Dictionary to lookup the queues (of a given type)
for a plugins, by plugin name.
ip_accumulator: An expiring data set for this type of IP address.
Returns either a set of addresses to send out on our own reporting
queues, or None.
"""
all_reported_ips = set()
for pname, q in plugin_queue_lookup.items():
# Get all the IPs of the specified type from all the plugins.
ips = utils.read_last_msg_from_queue(q)
if ips:
logging.debug("Sub-plugin '%s' reported %d "
"%s IPs: %s" %
(pname, len(ips), ip_type_name,
",".join(ips)))
all_reported_ips.update(ips) # merge all the lists
else:
logging.debug("Sub-plugin '%s' reported no "
"%s IPs." % (pname, ip_type_name))

# Send out the combined list of reported IPs. The receiver of this
# message expects this list to always be the full list of IPs. So, IF
# they get a message, it needs to be complete, since otherwise any IP
# not mentioned in this update is considered healthy.
#
# Since different sub-plugins may report different IPs at different
# times (and not always at the same time), we need to accumulate those
# IPs that are recorded by different sub-plugins over time.
#
# We use an 'expiring data set' to store those: If any plugin refreshes
# an IP as failed then the entry remains, otherwise, it will expire
# after some time. The expiring data set therefore, is an accumulation
# of recently reported IPs. We always report this set, whenever we send
# out an update of IPs.
#
# Each type of IP (for example, 'failed' or 'questionable') has its own
# accumulator, which was passed in to this function.
if all_reported_ips:
ip_accumulator.update(all_reported_ips)
current_ips = ip_accumulator.get()
logging.info("Multi-plugin health monitor: "
"Reporting combined list of %s "
"IPs: %s" %
(ip_type_name,
",".join(current_ips)))
return current_ips
else:
logging.debug("No failed IPs to report.")
return None

def start_monitoring(self):
"""
Pass IP lists to monitor sub-plugins and get results from them.
Expand All @@ -221,45 +287,21 @@ def start_monitoring(self):
for q in self.monitor_ip_queues.values():
q.put(new_ips)

# Get any notifications about failed IPs from the plugins.
failed_ips = set()
for pname, q in self.failed_ip_queues.items():
fips = utils.read_last_msg_from_queue(q)
if fips:
logging.debug("Sub-plugin '%s' reported %d "
"failed IPs: %s" %
(pname, len(fips), ",".join(fips)))
failed_ips.update(fips)
else:
logging.debug("Sub-plugin '%s' reported no "
"failed IPs." % pname)

# Send out the combined list of failed IPs. The receiver of
# this message expects this list to always be the full list of
# IPs. So, IF they get a message, it needs to be complete,
# since otherwise any IP not mentioned in this update is
# considered healthy.
#
# Since different sub-plugins may report different IPs at
# different times (and not always at the same time), we need to
# accumulate those IPs that are recorded by different
# sub-plugins over time.
#
# We use an 'expiring data set' to store those: If any plugin
# refreshes an IP as failed then the entry remains, otherwise,
# it will expire after some time. The expiring data set
# therefore, is an accumulation of recently reported failed
# IPs. We always report this set, whenever we send out an
# update of failed IPs.
if failed_ips:
self.expiring_data_set.update(failed_ips)
current_failed_ips = self.expiring_data_set.get()
logging.info("Multi-plugin health monitor: "
"Reporting combined list of failed "
"IPs: %s" % ",".join(current_failed_ips))
self.q_failed_ips.put(current_failed_ips)
else:
logging.debug("No failed IPs to report.")
# Get any notifications about failed or questionable IPs from
# the plugins.
all_failed_ips = self._accumulate_ips_from_plugins(
"failed",
self.failed_queue_lookup,
self.report_failed_acc)
if all_failed_ips:
self.q_failed_ips.put(all_failed_ips)

all_questionable_ips = self._accumulate_ips_from_plugins(
"questionable",
self.questionable_queue_lookup,
self.report_questionable_acc)
if all_questionable_ips:
self.q_questionable_ips.put(all_questionable_ips)

time.sleep(self.get_monitor_interval())

Expand Down
39 changes: 30 additions & 9 deletions vpcrouter/tests/test_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ def new_do_health_checks(addrs):
# communication queues.
p.start()
self.plugin = p
self.q_monitor_ips, self.q_failed_ips = self.plugin.get_queues()
self.q_monitor_ips, self.q_failed_ips, self.q_questionable_ips = \
self.plugin.get_queues()

# Install the cleanup, which will send the stop signal to the monitor
# thread once we are done with our test
Expand Down Expand Up @@ -266,7 +267,8 @@ def new_tcp_check(ip, results):

p.start()
self.plugin = p
self.q_monitor_ips, self.q_failed_ips = self.plugin.get_queues()
self.q_monitor_ips, self.q_failed_ips, self.q_questionable_ips = \
self.plugin.get_queues()

self.addCleanup(self.cleanup)

Expand Down Expand Up @@ -323,11 +325,16 @@ def get_monitor_interval(self):
def start(self):
pass

def send(self, items):
def send_failed(self, items):
# This allows us to force the plugin to 'report' specified
# failed IP addresses.
self.q_failed_ips.put(items)

def send_questionable(self, items):
# This allows us to force the plugin to 'report' specified
# questionable IP addresses.
self.q_questionable_ips.put(items)

conf = {}
t1 = Testplugin(conf, "t1")
t2 = Testplugin(conf, "t2")
Expand All @@ -338,7 +345,7 @@ def send(self, items):
self.mp = mp
mp.start()

qm, qf = mp.get_queues()
qm, qf, qq = mp.get_queues()

# Test that new monitor IPs are passed on.
time.sleep(1)
Expand All @@ -351,29 +358,43 @@ def send(self, items):
# Sending various failed IPs through the two plugins. We should get
# accumulated results...
self.assertTrue(utils.read_last_msg_from_queue(qf) is None)
t1.send(["10.1.1.1", "10.1.1.2"])
t1.send_failed(["10.1.1.1", "10.1.1.2"])
time.sleep(0.5)
self.assertEqual(sorted(utils.read_last_msg_from_queue(qf)),
["10.1.1.1", "10.1.1.2"])
t2.send(["10.1.1.3"])
t2.send_failed(["10.1.1.3"])
time.sleep(0.5)
self.assertEqual(sorted(utils.read_last_msg_from_queue(qf)),
["10.1.1.1", "10.1.1.2", "10.1.1.3"])
t1.send(["10.1.1.1"])
t1.send_failed(["10.1.1.1"])
time.sleep(0.5)
self.assertEqual(sorted(utils.read_last_msg_from_queue(qf)),
["10.1.1.1", "10.1.1.2", "10.1.1.3"])
time.sleep(1)
t1.send(["10.1.1.1"])
t1.send_failed(["10.1.1.1"])
time.sleep(2)
self.assertEqual(sorted(utils.read_last_msg_from_queue(qf)),
["10.1.1.1", "10.1.1.2", "10.1.1.3"])
t1.send(["10.1.1.2"])
t1.send_failed(["10.1.1.2"])
time.sleep(1)
# ... but without refresh, some results should eventually disappear
self.assertEqual(sorted(utils.read_last_msg_from_queue(qf)),
["10.1.1.1", "10.1.1.2"])

# Now test questionable IPs
t1.send_questionable(["10.1.2.3", "10.2.3.4"])
time.sleep(0.5)
self.assertEqual(sorted(utils.read_last_msg_from_queue(qq)),
["10.1.2.3", "10.2.3.4"])
self.assertFalse(utils.read_last_msg_from_queue(qf))

t1.send_questionable(["10.9.9.9"])
t2.send_questionable(["10.2.2.2", "10.3.3.3"])
time.sleep(0.5)
self.assertEqual(sorted(utils.read_last_msg_from_queue(qq)),
["10.1.2.3", "10.2.2.2", "10.2.3.4",
"10.3.3.3", "10.9.9.9"])


if __name__ == '__main__':
unittest.main()
Loading

0 comments on commit bcbc4ab

Please sign in to comment.