Skip to content

Commit

Permalink
Merge pull request #44 from romana/issue-43-addr-in-use-exit
Browse files Browse the repository at this point in the history
Fast failure when HTTP port not available.
  • Loading branch information
jbrendel authored Aug 23, 2017
2 parents 7bddd7c + 925ff65 commit 3c1d8e5
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 8 deletions.
2 changes: 1 addition & 1 deletion vpcrouter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@
"""

__version__ = "1.7.0"
__version__ = "1.7.1"
1 change: 1 addition & 0 deletions vpcrouter/currentstate/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def __init__(self):
self.conf = None
self.main_param_names = []
self._vpc_router_http = None
self._stop_all = False

# The following top-level items are rendered as links and can be
# accessed with separate requests.
Expand Down
31 changes: 26 additions & 5 deletions vpcrouter/main/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@

import bottle
import logging
import socket
import threading
import time

from functools import wraps

Expand Down Expand Up @@ -68,16 +70,29 @@ def _log_to_logger(*args, **kwargs):
class MyWSGIRefServer(bottle.ServerAdapter):
server = None

def __init__(self, *args, **kwargs):
if 'romana_http' in kwargs:
self.romana_http = kwargs['romana_http']
del kwargs['romana_http']
super(MyWSGIRefServer, self).__init__(*args, **kwargs)

def run(self, handler):
from wsgiref.simple_server import make_server, WSGIRequestHandler
if self.quiet:
class QuietHandler(WSGIRequestHandler):
def log_request(*args, **kw):
pass
self.options['handler_class'] = QuietHandler
self.server = make_server(self.host, self.port, handler,
**self.options)
self.server.serve_forever()
try:
self.server = make_server(self.host, self.port, handler,
**self.options)
self.romana_http.wsgi_server_started = True
logging.info("HTTP server: Started to listen...")
self.server.serve_forever()
except socket.error as e:
logging.fatal("HTTP server: Cannot open socket "
"(error %d: %s)... " %
(e.errno, e.strerror))

def stop(self):
if self.server:
Expand Down Expand Up @@ -154,7 +169,8 @@ def __init__(self, conf):
Start the HTTP server thread.
"""
self.conf = conf
self.conf = conf
self.wsgi_server_started = False
self.start()

def start(self):
Expand All @@ -167,7 +183,8 @@ def start(self):
(self.conf['addr'], self.conf['port']))

self.my_server = MyWSGIRefServer(host=self.conf['addr'],
port=self.conf['port'])
port=self.conf['port'],
romana_http=self)

self.http_thread = threading.Thread(
target = APP.run,
Expand All @@ -176,6 +193,10 @@ def start(self):

self.http_thread.daemon = True
self.http_thread.start()
time.sleep(1)
if not self.wsgi_server_started:
# Set the global flag indicating that everything should stop
CURRENT_STATE._stop_all = True

def stop(self):
"""
Expand Down
4 changes: 3 additions & 1 deletion vpcrouter/monitor/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def start_monitoring(self):

try:
interval_count = 0
while True:
while not CURRENT_STATE._stop_all:
start_time = time.time()
# See if we should update our working set
new_ips = self.get_new_working_set()
Expand Down Expand Up @@ -234,6 +234,8 @@ def start_monitoring(self):
(end_time - start_time))
interval_count += 1

logging.debug("Monitoring loop ended: Global stop")

except StopReceived:
# Received the stop signal, just exiting the thread function
return
Expand Down
5 changes: 5 additions & 0 deletions vpcrouter/tests/test_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ def new_do_health_checks(s, addrs):
('root', 'DEBUG',
'New route spec detected. Updating health-monitor '
'with: 1.1.1.1,2.2.2.2,3.3.3.3'),
('root', 'DEBUG', 'event_monitor_loop ended: Global stop'),
('root', 'DEBUG', u'Checking live IPs: 1.1.1.1,2.2.2.2,3.3.3.3'),
('root', 'INFO', u'Currently failed IPs: 3.3.3.3'))
self.lc.clear()
Expand Down Expand Up @@ -372,6 +373,7 @@ def new_do_health_checks(s, addrs):
('root', 'DEBUG',
'New route spec detected. Updating health-monitor '
'with: 2.2.2.2,3.3.3.3,4.4.4.4'),
('root', 'DEBUG', 'event_monitor_loop ended: Global stop'),
('root', 'DEBUG', u'Checking live IPs: 2.2.2.2,4.4.4.4'))

self.lc.clear()
Expand All @@ -385,6 +387,7 @@ def new_do_health_checks(s, addrs):
self.lc.check(
('root', 'DEBUG', u'Checking live IPs: 2.2.2.2,4.4.4.4'),
('root', 'DEBUG', 'Time for regular route check'),
('root', 'DEBUG', 'event_monitor_loop ended: Global stop'),
('root', 'DEBUG', u'Checking live IPs: 2.2.2.2,4.4.4.4'))

watcher.stop_plugins(watcher_plugin, health_plugin)
Expand Down Expand Up @@ -439,6 +442,8 @@ def start_thread_log_tuple(self):
('root', 'INFO',
"HTTP server: Starting to listen for requests on "
"'localhost:%d'..." % self.conf['port']),
('root', 'INFO',
'HTTP server: Started to listen...'),
('root', 'INFO',
"Http watcher plugin: Starting to watch for route spec on "
"'localhost:%d/route_spec'..." % self.conf['port'])
Expand Down
8 changes: 8 additions & 0 deletions vpcrouter/vpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,10 @@ def process_route_spec_config(con, vpc_info, route_spec, failed_ips):
If a route points at a failed IP then a new candidate is chosen.
"""
if CURRENT_STATE._stop_all:
logging.debug("Routespec processing. Stop requested, abort operation")
return

if failed_ips:
logging.debug("Route spec processing. Failed IPs: %s" %
",".join(failed_ips))
Expand Down Expand Up @@ -497,6 +501,10 @@ def handle_spec(region_name, vpc_id, route_spec, failed_ips):
Connect to region and update routes according to route spec.
"""
if CURRENT_STATE._stop_all:
logging.debug("handle_spec: Stop requested, abort operation")
return

if not route_spec:
logging.debug("handle_spec: No route spec provided")
return
Expand Down
11 changes: 10 additions & 1 deletion vpcrouter/watcher/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def _event_monitor_loop(region_name, vpc_id,
# re-created on its own.
last_route_check_time = time.time()
time_for_regular_recheck = False
while True:
while not CURRENT_STATE._stop_all:
try:
# Get the latest messages from the route-spec monitor and the
# health-check monitor. At system start the route-spec queue should
Expand Down Expand Up @@ -145,7 +145,12 @@ def _event_monitor_loop(region_name, vpc_id,
except Exception as e:
# Of course we should never get here, but if we do, better to log
# it and keep operating best we can...
import traceback
traceback.print_exc()
logging.error("*** Uncaught exception 1: %s" % str(e))
return

logging.debug("event_monitor_loop ended: Global stop")


def start_plugins(conf, watcher_plugin_class, health_plugin_class,
Expand Down Expand Up @@ -207,6 +212,10 @@ def start_watcher(conf, watcher_plugin_class, health_plugin_class,
The loop itself is in its own function to facilitate easier testing.
"""
if CURRENT_STATE._stop_all:
logging.debug("Not starting plugins: Global stop")
return

# Start the working threads (health monitor, config event monitor, etc.)
# and return the thread handles and message queues in a thread-info dict.
watcher_plugin, health_plugin = \
Expand Down

0 comments on commit 3c1d8e5

Please sign in to comment.