diff --git a/slixmpp/plugins/xep_0138.py b/slixmpp/plugins/xep_0138.py index b7039f7b..d5b04b4b 100644 --- a/slixmpp/plugins/xep_0138.py +++ b/slixmpp/plugins/xep_0138.py @@ -1,21 +1,14 @@ - -# slixmpp: The Slick XMPP Library -# Copyright (C) 2011 Nathanael C. Fritz -# This file is part of slixmpp. -# See the file LICENSE for copying permission. import logging import zlib - from slixmpp.stanza import StreamFeatures -from slixmpp.xmlstream import RestartStream, register_stanza_plugin, ElementBase, StanzaBase +from slixmpp.xmlstream import register_stanza_plugin, ElementBase, StanzaBase, tostring from slixmpp.xmlstream.matcher import * from slixmpp.xmlstream.handler import * from slixmpp.plugins import BasePlugin, register_plugin log = logging.getLogger(__name__) - class Compression(ElementBase): name = 'compression' namespace = 'http://jabber.org/features/compress' @@ -44,6 +37,8 @@ def setup(self, xml): StanzaBase.setup(self, xml) self.xml.tag = self.tag_name() + def set_method(self, method): + self._set_sub_text('{}method', text=method) class Compressed(StanzaBase): name = 'compressed' @@ -56,35 +51,19 @@ def setup(self, xml): StanzaBase.setup(self, xml) self.xml.tag = self.tag_name() +class CompressFailure(StanzaBase): + name = 'failure' + namespace = 'http://jabber.org/protocol/compress' + conditions = {'setup-failed', 'unsupported-method'} + def setup(self, xml): + StanzaBase.setup(self, xml) + self.xml.tag = self.tag_name() - -class ZlibSocket(object): - - def __init__(self, socketobj): - self.__socket = socketobj - self.compressor = zlib.compressobj() - self.decompressor = zlib.decompressobj(zlib.MAX_WBITS) - - def __getattr__(self, name): - return getattr(self.__socket, name) - - def send(self, data): - sentlen = len(data) - data = self.compressor.compress(data) - data += self.compressor.flush(zlib.Z_SYNC_FLUSH) - log.debug(b'>>> (compressed)' + (data.encode("hex"))) - #return self.__socket.send(data) - sentactuallen = self.__socket.send(data) - assert(sentactuallen == len(data)) - - return sentlen - - def recv(self, *args, **kwargs): - data = self.__socket.recv(*args, **kwargs) - log.debug(b'<<< (compressed)' + data.encode("hex")) - return self.decompressor.decompress(self.decompressor.unconsumed_tail + data) - + def get_condition(self): + """Return the condition element's name.""" + for child in self.xml: + return child.tag.split('}', 1)[-1] class XEP_0138(BasePlugin): """ @@ -101,6 +80,7 @@ def plugin_init(self): self.compression_methods = {'zlib': True} register_stanza_plugin(StreamFeatures, Compression) + self.xmpp.register_stanza(CompressFailure) self.xmpp.register_stanza(Compress) self.xmpp.register_stanza(Compressed) @@ -110,11 +90,25 @@ def plugin_init(self): self._handle_compressed, instream=True)) + self.xmpp.register_handler( + Callback('CompressFailure', + StanzaPath('failure'), + self._handle_failure, + instream=True)) + self.xmpp.register_feature('compression', self._handle_compression, restart=True, order=self.config.get('order', 5)) + self.stats = { + 'rx-compress': 0, + 'tx-compress': 0, + 'rx-real': 0, + 'tx-real': 0, + } + self.xmpp.add_event_handler('disconnected', lambda _: log.debug("Compression stats %r" % self.stats)) + def register_compression_method(self, name, handler): self.compression_methods[name] = handler @@ -123,20 +117,47 @@ def _handle_compression(self, features): if method in self.compression_methods: log.info('Attempting to use %s compression' % method) c = Compress(self.xmpp) - c['method'] = method - c.send(now=True) + c.set_method(method) + str_data = tostring(c.xml, stream=self.xmpp, top_level=True) + self.xmpp.send(c) + return True return False + def _decorate_transport(self, transport): + compressor = zlib.compressobj() + decompressor = zlib.decompressobj(zlib.MAX_WBITS) + + orig_recv = self.xmpp.data_received + def zlib_recv(data): + #log.debug('zlib: received %s' % data) + plain = decompressor.decompress(decompressor.unconsumed_tail + data) + log.debug("zlib: decompressed %d bytes into %d" % (len(data), len(plain))) + self.stats['rx-compress'] += len(data) + self.stats['rx-real'] += len(plain) + orig_recv(plain) + self.xmpp.data_received = zlib_recv + + orig_write = transport.write + def zlib_write(data): + compressed = compressor.compress(data) + compressor.flush(zlib.Z_SYNC_FLUSH) + log.debug("zlib: compressed %d bytes into %d" % (len(data), len(compressed))) + #log.debug('zlib: sent %s' % compressed) + self.stats['tx-compress'] += len(compressed) + self.stats['tx-real'] += len(data) + return orig_write(compressed) + transport.write = zlib_write + + return transport + def _handle_compressed(self, stanza): self.xmpp.features.add('compression') log.debug('Stream Compressed!') - compressed_socket = ZlibSocket(self.xmpp.socket) - self.xmpp.set_socket(compressed_socket) - raise RestartStream() + self.xmpp.connection_made(self._decorate_transport(self.xmpp.transport)) def _handle_failure(self, stanza): - pass + # TODO - feature processing needs a restart + print("failure %s" % stanza.get_condition()) xep_0138 = XEP_0138 register_plugin(XEP_0138)