diff --git a/pysonofflan/cli.py b/pysonofflan/cli.py index 2734765..bcf8b37 100644 --- a/pysonofflan/cli.py +++ b/pysonofflan/cli.py @@ -116,7 +116,7 @@ async def device_id_callback(device: SonoffSwitch): if device.basic_info is not None: device.shared_state['device_id_at_current_ip'] = \ device.device_id - device.keep_running = False + device.shutdown_event_loop() SonoffSwitch( host=ip, @@ -143,9 +143,10 @@ def state(config: dict): async def state_callback(device): if device.basic_info is not None: - print_device_details(device) + if device.available: + print_device_details(device) - device.shutdown_event_loop() + device.shutdown_event_loop() logger.info("Initialising SonoffSwitch with host %s" % config['host']) SonoffSwitch( @@ -215,21 +216,27 @@ def switch_device(host, inching, new_state): async def update_callback(device: SonoffSwitch): if device.basic_info is not None: - if inching is None: - logger.info("\nInitial state:") - print_device_details(device) - device.client.keep_running = False - if new_state == "on": - await device.turn_on() - else: - await device.turn_off() - else: - logger.info("Inching device activated by switching ON for " - "%ss" % inching) + if device.available: - logger.info("\nNew state:") - print_device_details(device) + if inching is None: + print_device_details(device) + + if device.is_on: + if new_state == "on": + device.shutdown_event_loop() + else: + await device.turn_off() + + elif device.is_off: + if new_state == "off": + device.shutdown_event_loop() + else: + await device.turn_on() + + else: + logger.info("Inching device activated by switching ON for " + "%ss" % inching) SonoffSwitch( host=host, diff --git a/pysonofflan/client.py b/pysonofflan/client.py index 23f4719..92b48f4 100644 --- a/pysonofflan/client.py +++ b/pysonofflan/client.py @@ -4,26 +4,90 @@ import random import time from typing import Dict, Union, Callable, Awaitable +import asyncio +import enum import websockets from websockets.framing import OP_CLOSE, parse_close, OP_PING, OP_PONG +logger = logging.getLogger(__name__) + +V6_DEFAULT_TIMEOUT = 10 +V6_DEFAULT_PING_INTERVAL = 300 + +class InvalidState(Exception): + """ + Exception raised when an operation is forbidden in the current state. + """ + +CLOSE_CODES = { + 1000: "OK", + 1001: "going away", + 1002: "protocol error", + 1003: "unsupported type", + # 1004 is reserved + 1005: "no status code [internal]", + 1006: "connection closed abnormally [internal]", + 1007: "invalid data", + 1008: "policy violation", + 1009: "message too big", + 1010: "extension required", + 1011: "unexpected error", + 1015: "TLS failure [internal]", +} + +# A WebSocket connection goes through the following four states, in order: + +class State(enum.IntEnum): + CONNECTING, OPEN, CLOSING, CLOSED = range(4) + +class ConnectionClosed(InvalidState): + """ + Exception raised when trying to read or write on a closed connection. + Provides the connection close code and reason in its ``code`` and + ``reason`` attributes respectively. + """ + + def __init__(self, code, reason): + self.code = code + self.reason = reason + message = "WebSocket connection is closed: " + message += format_close(code, reason) + super().__init__(message) + +def format_close(code, reason): + """ + Display a human-readable version of the close code and reason. + """ + if 3000 <= code < 4000: + explanation = "registered" + elif 4000 <= code < 5000: + explanation = "private use" + else: + explanation = CLOSE_CODES.get(code, "unknown") + result = "code = {} ({}), ".format(code, explanation) + + if reason: + result += "reason = {}".format(reason) + else: + result += "no reason" + + return result class SonoffLANModeClientProtocol(websockets.WebSocketClientProtocol): """Customised WebSocket client protocol to ignore pong payload match.""" - async def read_data_frame(self, max_size): + @asyncio.coroutine + def read_data_frame(self, max_size): """ Copied from websockets.WebSocketCommonProtocol to change pong handling """ - logger = logging.getLogger(__name__) - while True: - frame = await self.read_frame(max_size) + frame = yield from self.read_frame(max_size) if frame.opcode == OP_CLOSE: self.close_code, self.close_reason = parse_close(frame.data) - await self.write_close_frame(frame.data) + yield from self.write_close_frame(frame.data) return elif frame.opcode == OP_PING: @@ -31,7 +95,7 @@ async def read_data_frame(self, max_size): logger.debug( "%s - received ping, sending pong: %s", self.side, ping_hex ) - await self.pong(frame.data) + yield from self.pong(frame.data) elif frame.opcode == OP_PONG: # Acknowledge pings on solicited pongs, regardless of payload @@ -52,6 +116,166 @@ async def read_data_frame(self, max_size): else: return frame + def __init__(self, **kwds): + + logger.debug("__init__()" ) + + if float(websockets.__version__) < 7.0: + + self.ping_interval = V6_DEFAULT_PING_INTERVAL + self.ping_timeout = V6_DEFAULT_TIMEOUT + + #self.close_code: int + #self.close_reason: str + + # Task sending keepalive pings. + self.keepalive_ping_task = None + + super().__init__(**kwds) + + def connection_open(self): + + logger.debug("connection_open()") + + super().connection_open() + + if float(websockets.__version__) < 7.0: + + # Start the task that sends pings at regular intervals. + self.keepalive_ping_task = asyncio.ensure_future( + self.keepalive_ping(), loop=self.loop + ) + + @asyncio.coroutine + def keepalive_ping(self): + + logger.debug("keepalive_ping()" ) + + if float(websockets.__version__) >= 7.0: + + super().keepalive_ping() + + else: + + """ + Send a Ping frame and wait for a Pong frame at regular intervals. + This coroutine exits when the connection terminates and one of the + following happens: + - :meth:`ping` raises :exc:`ConnectionClosed`, or + - :meth:`close_connection` cancels :attr:`keepalive_ping_task`. + """ + if self.ping_interval is None: + return + + try: + while True: + + yield from asyncio.sleep(self.ping_interval, loop=self.loop) + + # ping() cannot raise ConnectionClosed, only CancelledError: + # - If the connection is CLOSING, keepalive_ping_task will be + # canceled by close_connection() before ping() returns. + # - If the connection is CLOSED, keepalive_ping_task must be + # canceled already. + + ping_waiter = yield from self.ping() + + if self.ping_timeout is not None: + try: + yield from asyncio.wait_for( + ping_waiter, self.ping_timeout, loop=self.loop + ) + + except asyncio.TimeoutError: + logger.debug("%s ! timed out waiting for pong", self.side) + self.fail_connection(1011) + break + + except asyncio.CancelledError: + raise + + except Exception: + logger.warning("Unexpected exception in keepalive ping task", exc_info=True) + + @asyncio.coroutine + def close_connection(self): + + logger.debug("close_connection()") + + yield from super().close_connection() + + logger.debug("super.close_connection() finished" ) + + if float(websockets.__version__) < 7.0: + + # Cancel the keepalive ping task. + if self.keepalive_ping_task is not None: + self.keepalive_ping_task.cancel() + + + + def abort_keepalive_pings(self): + + logger.debug("abort_keepalive_pings()") + + if float(websockets.__version__) >= 7.0: + super().abort_keepalive_pings() + + else: + + """ + Raise ConnectionClosed in pending keepalive pings. + They'll never receive a pong once the connection is closed. + """ + assert self.state is State.CLOSED + exc = ConnectionClosed(self.close_code, self.close_reason) + exc.__cause__ = self.transfer_data_exc # emulate raise ... from ... + + try: + + for ping in self.pings.values(): + ping.set_exception(exc) + + except asyncio.InvalidStateError: + pass + + """ No Need to do this as in V6, this is done in super.close_connection() + + if self.pings: + pings_hex = ', '.join( + binascii.hexlify(ping_id).decode() or '[empty]' + for ping_id in self.pings + ) + plural = 's' if len(self.pings) > 1 else '' + logger.debug( + "%s - aborted pending ping%s: %s", self.side, plural, pings_hex + )""" + + def connection_lost(self, exc): + + logger.debug("connection_lost()" ) + + if float(websockets.__version__) < 7.0: + + logger.debug("%s - event = connection_lost(%s)", self.side, exc) + self.state = State.CLOSED + logger.debug("%s - state = CLOSED", self.side) + if self.close_code is None: + self.close_code = 1006 + if self.close_reason is None: + self.close_reason = "" + logger.debug( + "%s x code = %d, reason = %s", + self.side, + self.close_code, + self.close_reason or "[no reason]", + ) + + self.abort_keepalive_pings() + + super().connection_lost(exc) + + class SonoffLANModeClient: """ @@ -81,9 +305,10 @@ def __init__(self, host: str, self.timeout = timeout self.logger = logger self.websocket = None - self.keep_running = True self.event_handler = event_handler - self.connected = False + self.connected_event = asyncio.Event() + self.disconnected_event = asyncio.Event() + if self.logger is None: self.logger = logging.getLogger(__name__) @@ -112,29 +337,29 @@ async def connect(self): subprotocols=['chat'], klass=SonoffLANModeClientProtocol ) - self.connected = True except websockets.InvalidMessage as ex: self.logger.error('SonoffLANModeClient connection failed: %s' % ex) raise ex async def close_connection(self): self.logger.debug('Closing websocket from client close_connection') - self.connected = False + self.connected_event.clear() + self.disconnected_event.set() if self.websocket is not None: + self.logger.debug('calling websocket.close') await self.websocket.close() - + self.websocket = None # Ensure we cannot close multiple times + self.logger.debug('websocket was closed') + async def receive_message_loop(self): try: - while self.keep_running: + while True: self.logger.debug('Waiting for messages on websocket') message = await self.websocket.recv() await self.event_handler(message) self.logger.debug('Message passed to handler, should loop now') finally: - self.logger.debug('receive_message_loop finally block reached: ' - 'closing websocket') - if self.websocket is not None: - await self.websocket.close() + self.logger.debug('receive_message_loop finally block reached') async def send_online_message(self): self.logger.debug('Sending user online message over websocket') diff --git a/pysonofflan/sonoffdevice.py b/pysonofflan/sonoffdevice.py index 7d9db5b..287bce1 100644 --- a/pysonofflan/sonoffdevice.py +++ b/pysonofflan/sonoffdevice.py @@ -7,6 +7,7 @@ import logging from typing import Callable, Awaitable, Dict +import traceback import websockets from .client import SonoffLANModeClient @@ -34,105 +35,217 @@ def __init__(self, self.shared_state = shared_state self.basic_info = None self.params = {} - self.send_updated_params_task = None self.params_updated_event = None self.loop = loop + self.tasks = [] # store the tasks that this module create s in a sequence + self.new_loop = False # use to decide if we should shutdown the loop on exit + self.messages_received = 0 if logger is None: self.logger = logging.getLogger(__name__) else: self.logger = logger - self.logger.debug( - 'Initializing SonoffLANModeClient class in SonoffDevice') - self.client = SonoffLANModeClient( - host, - self.handle_message, - ping_interval=ping_interval, - timeout=timeout, - logger=self.logger - ) - try: if self.loop is None: + + self.new_loop = True self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) - asyncio.set_event_loop(self.loop) + self.logger.debug( + 'Initializing SonoffLANModeClient class in SonoffDevice') + self.client = SonoffLANModeClient( + host, + self.handle_message, + ping_interval=ping_interval, + timeout=timeout, + logger=self.logger + ) + self.message_ping_event = asyncio.Event() + self.message_acknowledged_event = asyncio.Event() self.params_updated_event = asyncio.Event() - self.send_updated_params_task = self.loop.create_task( - self.send_updated_params_loop() - ) - if not self.loop.is_running(): - self.loop.run_until_complete(self.setup_connection()) - else: - asyncio.ensure_future(self.setup_connection()) + self.tasks.append(self.loop.create_task(self.send_updated_params_loop())) + + self.tasks.append(self.loop.create_task(self.send_availability_loop())) + + self.setup_connection_task = self.loop.create_task(self.setup_connection(not self.new_loop)) + self.tasks.append(self.setup_connection_task) + + if self.new_loop: + self.loop.run_until_complete(self.setup_connection_task) except asyncio.CancelledError: self.logger.debug('SonoffDevice loop ended, returning') - async def setup_connection(self): + async def setup_connection(self, retry): self.logger.debug('setup_connection is active on the event loop') + + retry_count = 0 + + while True: + connected = False + try: + self.logger.debug('setup_connection yielding to connect()') + await self.client.connect() + self.logger.debug( + 'setup_connection yielding to send_online_message()') + await self.client.send_online_message() + + connected = True + + except websockets.InvalidMessage as ex: + self.logger.warn('Unable to connect: %s' % ex) + await self.wait_before_retry(retry_count) + except ConnectionRefusedError: + self.logger.warn('Unable to connect: connection refused') + await self.wait_before_retry(retry_count) + except websockets.exceptions.ConnectionClosed: + self.logger.warn('Connection closed unexpectedly during setup') + await self.wait_before_retry(retry_count) + except OSError as ex: + self.logger.warn('OSError in setup_connection(): %s', format(ex) ) + await self.wait_before_retry(retry_count) + except Exception as ex: + self.logger.error('Unexpected error in setup_connection(): %s', format(ex) ) + await self.wait_before_retry(retry_count) + + if connected: + retry_count = 0 # reset retry count after successful connection + try: + self.logger.debug( + 'setup_connection yielding to receive_message_loop()') + await self.client.receive_message_loop() + + except websockets.InvalidMessage as ex: + self.logger.warn('Unable to connect: %s' % ex) + except websockets.exceptions.ConnectionClosed: + self.logger.warn('Connection closed in receive_message_loop()') + except OSError as ex: + self.logger.warn('OSError in receive_message_loop(): %s', format(ex) ) + + except asyncio.CancelledError: + self.logger.debug('receive_message_loop() cancelled' ) + break + + except Exception as ex: + self.logger.error('Unexpected error in receive_message_loop(): %s', format(ex) ) + + finally: + self.message_ping_event.set() + self.logger.debug('finally: closing websocket from setup_connection') + await self.client.close_connection() + + if not retry: + break + + retry_count +=1 + + self.shutdown_event_loop() + self.logger.debug('exiting setup_connection()') + + async def wait_before_retry(self, retry_count): try: - self.logger.debug('setup_connection yielding to connect()') - await self.client.connect() - self.logger.debug( - 'setup_connection yielding to send_online_message()') - await self.client.send_online_message() - self.logger.debug( - 'setup_connection yielding to receive_message_loop()') - await self.client.receive_message_loop() - except websockets.InvalidMessage as ex: - self.logger.error('Unable to connect: %s' % ex) - self.shutdown_event_loop() - except ConnectionRefusedError: - self.logger.error('Unable to connect: connection refused') - self.shutdown_event_loop() - except websockets.exceptions.ConnectionClosed: - self.logger.error('Connection closed unexpectedly') - self.shutdown_event_loop() - finally: - self.logger.debug( - 'finally: closing websocket from setup_connection') - await self.client.close_connection() - self.logger.debug('setup_connection resumed, exiting') + wait_times = [0.5,1,2,5,10,30,60] # increasing backoff each retry attempt + + if retry_count >= len(wait_times): + retry_count = len(wait_times) -1 + + wait_time = wait_times[retry_count] + + self.logger.debug('Waiting %i seconds before retry', wait_time) + + await asyncio.sleep(wait_time) + + except Exception as ex: + self.logger.error('Unexpected error in wait_before_retry(): %s', format(ex) ) + + async def send_availability_loop(self): + + try: + while True: + await self.client.disconnected_event.wait() + + if self.callback_after_update is not None: + await self.callback_after_update(self) + self.client.disconnected_event.clear() + finally: + self.logger.debug('exiting send_availability_loop()') async def send_updated_params_loop(self): self.logger.debug( 'send_updated_params_loop is active on the event loop') try: + self.logger.debug( 'Starting loop waiting for device params to change') - while self.client.keep_running: + + while True: self.logger.debug( 'send_updated_params_loop now awaiting event') + await self.params_updated_event.wait() + + await self.client.connected_event.wait() + self.logger.debug('Connected!') update_message = self.client.get_update_payload( self.device_id, self.params ) - await self.client.send(update_message) - self.params_updated_event.clear() - self.logger.debug('Update message sent, event cleared, should ' - 'loop now') - finally: - self.logger.debug( - 'send_updated_params_loop finally block reached: ' - 'closing websocket') - await self.client.close_connection() - self.logger.debug( - 'send_updated_params_loop resumed outside loop, exiting') + try: + self.message_ping_event.clear() + self.message_acknowledged_event.clear() + await self.client.send(update_message) + + await asyncio.wait_for(self.message_ping_event.wait(), 2) + + if self.message_acknowledged_event.is_set(): + self.params_updated_event.clear() + self.logger.debug('Update message sent, event cleared, should ' + 'loop now') + else: + self.logger.warn( + "we didn't get an acknowledge message, we have probably been disconnected!") + # message 'ping', but not an acknowledgement, so loop + # if we were disconnected we will wait for reconnection + # if it was another type of message, we will resend change + + + except websockets.exceptions.ConnectionClosed: + self.logger.error('Connection closed unexpectedly in send()') + except asyncio.TimeoutError: + self.logger.warn('Update message not received, close connection, then loop') + await self.client.close_connection() # closing connection causes cascade failure in setup_connection and reconnect + except OSError as ex: + self.logger.warn('OSError in send(): %s', format(ex) ) + + except asyncio.CancelledError: + self.logger.debug('send_updated_params_loop cancelled') + break + + except Exception as ex: + self.logger.error('Unexpected error in send(): %s', format(ex) ) + + except asyncio.CancelledError: + self.logger.debug('send_updated_params_loop cancelled') + + except Exception as ex: + self.logger.error('Unexpected error in send(): %s', format(ex) ) + + finally: + self.logger.debug('send_updated_params_loop finally block reached') def update_params(self, params): self.logger.debug( 'Scheduling params update message to device: %s' % params - ) + ) self.params = params self.params_updated_event.set() @@ -141,6 +254,10 @@ async def handle_message(self, message): Receive message sent by the device and handle it, either updating state or storing basic device info """ + + self.messages_received +=1 # ensure debug messages are unique to stop deduplication by logger + self.message_ping_event.set() + response = json.loads(message) if ( @@ -148,26 +265,42 @@ async def handle_message(self, message): and 'deviceid' in response ): self.logger.debug( - 'Received basic device info, storing in instance') + 'Message: %i: Received basic device info, storing in instance', self.messages_received) self.basic_info = response + + if self.client.connected_event.is_set(): # only mark message as accepted if we are already online (otherwise this is an initial connection message) + self.message_acknowledged_event.set() + + if self.callback_after_update is not None: + await self.callback_after_update(self) + elif 'action' in response and response['action'] == "update": + self.logger.debug( - 'Received update from device, updating internal state to: %s' - % response['params']) - self.params = response['params'] + 'Message: %i: Received update from device, updating internal state to: %s' + , self.messages_received , response['params'] ) - if self.callback_after_update is not None: + if not self.client.connected_event.is_set(): + self.client.connected_event.set() + self.client.disconnected_event.clear() + send_update = True + + if not self.params_updated_event.is_set(): # only update internal state if there is not a new message queued to be sent + + if self.params != response['params']: # only send client update message if there is a change + self.params = response['params'] + send_update = True + + if send_update and self.callback_after_update is not None: await self.callback_after_update(self) + else: self.logger.error( 'Unknown message received from device: ' % message) raise Exception('Unknown message received from device') def shutdown_event_loop(self): - self.logger.debug( - 'shutdown_event_loop called, setting keep_running to ' - 'False') - self.client.keep_running = False + self.logger.debug('shutdown_event_loop called') try: # Hide Cancelled Error exceptions during shutdown @@ -182,32 +315,43 @@ def shutdown_exception_handler(loop, context): # Handle shutdown gracefully by waiting for all tasks # to be cancelled tasks = asyncio.gather( - *asyncio.all_tasks(loop=self.loop), + *self.tasks, loop=self.loop, return_exceptions=True ) - - tasks.add_done_callback(lambda t: self.loop.stop()) + + if self.new_loop: + tasks.add_done_callback(lambda t: self.loop.stop()) + tasks.cancel() # Keep the event loop running until it is either # destroyed or all tasks have really terminated - while ( - not tasks.done() - and not self.loop.is_closed() - and not self.loop.is_running() - ): - self.loop.run_forever() + + if self.new_loop: + while ( + not tasks.done() + and not self.loop.is_closed() + and not self.loop.is_running() + ): + self.loop.run_forever() + + except Exception as ex: + self.logger.error('Unexpected error in shutdown_event_loop(): %s', format(ex) ) + finally: - if ( - hasattr(self.loop, "shutdown_asyncgens") - and not self.loop.is_running() - ): - # Python 3.5 - self.loop.run_until_complete( - self.loop.shutdown_asyncgens() - ) - self.loop.close() + if self.new_loop: + + if ( + hasattr(self.loop, "shutdown_asyncgens") + and not self.loop.is_running() + ): + # Python 3.5 + self.loop.run_until_complete( + self.loop.shutdown_asyncgens() + ) + self.loop.close() + @property def device_id(self) -> str: @@ -256,3 +400,8 @@ def __repr__(self): return "<%s at %s>" % ( self.__class__.__name__, self.host) + + @property + def available(self) -> bool: + + return self.client.connected_event.is_set() diff --git a/pysonofflan/sonoffswitch.py b/pysonofflan/sonoffswitch.py index 495047a..1d509a0 100644 --- a/pysonofflan/sonoffswitch.py +++ b/pysonofflan/sonoffswitch.py @@ -69,14 +69,17 @@ def state(self) -> str: SWITCH_STATE_UNKNOWN :rtype: str """ - state = self.params['switch'] - + try: + state = self.params['switch'] + except: + state = SonoffSwitch.SWITCH_STATE_UNKNOWN + if state == "off": return SonoffSwitch.SWITCH_STATE_OFF elif state == "on": return SonoffSwitch.SWITCH_STATE_ON else: - self.logger.warning("Unknown state %s returned.", state) + self.logger.debug("Unknown state %s returned.", state) return SonoffSwitch.SWITCH_STATE_UNKNOWN @state.setter @@ -157,11 +160,12 @@ async def pre_callback_after_update(self, _): "Inching switch activated, waiting %ss before " "turning OFF again" % self.inching_seconds) - self.loop.call_later( + inching_task = self.loop.call_later( self.inching_seconds, self.callback_to_turn_off_inching ) + self.tasks.append(inching_task) await self.turn_on() else: self.logger.debug("Not inching switch, calling parent callback") diff --git a/requirements_dev.txt b/requirements_dev.txt index 703efd2..815930e 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -1,18 +1,18 @@ -pip==19.1 +pip==19.1.1 bump2version==0.5.10 -wheel==0.33.0 +wheel==0.33.4 watchdog==0.9.0 flake8==3.7.7 tox==3.11.1 -coverage==4.5.2 -Sphinx==1.8.4 -pytest==4.4.1 +coverage==4.5.3 +Sphinx==2.0.1 +pytest==4.5.0 twine==1.13.0 click==7.0 -restructuredtext-lint==1.2.2 +restructuredtext-lint==1.3.0 collective.checkdocs==0.2 -coveralls==1.6.0 -pipupgrade==1.4.0 +coveralls==1.7.0 +pipupgrade==1.5.0 cmarkgfm==0.4.2 -mock==2.0.0 -mocket==2.6.0 +mock==3.0.5 +mocket==2.7.3 diff --git a/tests/test_cli.py b/tests/test_cli.py index 706771e..0dbb036 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -50,6 +50,13 @@ def test_cli_no_device_id(self): result = runner.invoke(cli.cli, ['--device_id']) assert 'Error: --device_id option requires an argument' in \ result.output + + def test_cli_no_host_id(self): + """Test the CLI.""" + runner = CliRunner() + result = runner.invoke(cli.cli, ['--host']) + assert 'Error: --host option requires an argument' in \ + result.output def test_cli_state(self): """Test the CLI."""