Skip to content
This repository has been archived by the owner on Jan 10, 2024. It is now read-only.

Update XEP-0138 plugin to work with slixmpp #23

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 63 additions & 41 deletions slixmpp/plugins/xep_0138.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,14 @@

# slixmpp: The Slick XMPP Library
# Copyright (C) 2011 Nathanael C. Fritz
# This file is part of slixmpp.
# See the file LICENSE for copying permission.
import logging
import zlib


from slixmpp.stanza import StreamFeatures
from slixmpp.xmlstream import RestartStream, register_stanza_plugin, ElementBase, StanzaBase
from slixmpp.xmlstream import register_stanza_plugin, ElementBase, StanzaBase, tostring
from slixmpp.xmlstream.matcher import *
from slixmpp.xmlstream.handler import *
from slixmpp.plugins import BasePlugin, register_plugin

log = logging.getLogger(__name__)


class Compression(ElementBase):
name = 'compression'
namespace = 'http://jabber.org/features/compress'
Expand Down Expand Up @@ -44,6 +37,8 @@ def setup(self, xml):
StanzaBase.setup(self, xml)
self.xml.tag = self.tag_name()

def set_method(self, method):
self._set_sub_text('{}method', text=method)

class Compressed(StanzaBase):
name = 'compressed'
Expand All @@ -56,35 +51,19 @@ def setup(self, xml):
StanzaBase.setup(self, xml)
self.xml.tag = self.tag_name()

class CompressFailure(StanzaBase):
name = 'failure'
namespace = 'http://jabber.org/protocol/compress'
conditions = {'setup-failed', 'unsupported-method'}

def setup(self, xml):
StanzaBase.setup(self, xml)
self.xml.tag = self.tag_name()


class ZlibSocket(object):

def __init__(self, socketobj):
self.__socket = socketobj
self.compressor = zlib.compressobj()
self.decompressor = zlib.decompressobj(zlib.MAX_WBITS)

def __getattr__(self, name):
return getattr(self.__socket, name)

def send(self, data):
sentlen = len(data)
data = self.compressor.compress(data)
data += self.compressor.flush(zlib.Z_SYNC_FLUSH)
log.debug(b'>>> (compressed)' + (data.encode("hex")))
#return self.__socket.send(data)
sentactuallen = self.__socket.send(data)
assert(sentactuallen == len(data))

return sentlen

def recv(self, *args, **kwargs):
data = self.__socket.recv(*args, **kwargs)
log.debug(b'<<< (compressed)' + data.encode("hex"))
return self.decompressor.decompress(self.decompressor.unconsumed_tail + data)

def get_condition(self):
"""Return the condition element's name."""
for child in self.xml:
return child.tag.split('}', 1)[-1]

class XEP_0138(BasePlugin):
"""
Expand All @@ -101,6 +80,7 @@ def plugin_init(self):
self.compression_methods = {'zlib': True}

register_stanza_plugin(StreamFeatures, Compression)
self.xmpp.register_stanza(CompressFailure)
self.xmpp.register_stanza(Compress)
self.xmpp.register_stanza(Compressed)

Expand All @@ -110,11 +90,25 @@ def plugin_init(self):
self._handle_compressed,
instream=True))

self.xmpp.register_handler(
Callback('CompressFailure',
StanzaPath('failure'),
self._handle_failure,
instream=True))

self.xmpp.register_feature('compression',
self._handle_compression,
restart=True,
order=self.config.get('order', 5))

self.stats = {
'rx-compress': 0,
'tx-compress': 0,
'rx-real': 0,
'tx-real': 0,
}
self.xmpp.add_event_handler('disconnected', lambda _: log.debug("Compression stats %r" % self.stats))

def register_compression_method(self, name, handler):
self.compression_methods[name] = handler

Expand All @@ -123,20 +117,48 @@ def _handle_compression(self, features):
if method in self.compression_methods:
log.info('Attempting to use %s compression' % method)
c = Compress(self.xmpp)
c['method'] = method
c.send(now=True)
c.set_method(method)
str_data = tostring(c.xml, stream=self.xmpp, top_level=True)
self.xmpp.send(c)

return True
return False

def _decorate_transport(self, transport):
compressor = zlib.compressobj()
decompressor = zlib.decompressobj(zlib.MAX_WBITS)

orig_recv = self.xmpp.data_received
def zlib_recv(data):
#log.debug('zlib: received %s' % data)
plain = decompressor.decompress(decompressor.unconsumed_tail + data)
log.debug("zlib: decompressed %d bytes into %d" % (len(data), len(plain)))
self.stats['rx-compress'] += len(data)
self.stats['rx-real'] += len(plain)
orig_recv(plain)
self.xmpp.data_received = zlib_recv

orig_write = transport.write
def zlib_write(data):
compressed = compressor.compress(data) + compressor.flush(zlib.Z_SYNC_FLUSH)
log.debug("zlib: compressed %d bytes into %d" % (len(data), len(compressed)))
#log.debug('zlib: sent %s' % compressed)
self.stats['tx-compress'] += len(compressed)
self.stats['tx-real'] += len(data)
return orig_write(compressed)
transport.write = zlib_write

return transport

def _handle_compressed(self, stanza):
self.xmpp.features.add('compression')
log.debug('Stream Compressed!')
compressed_socket = ZlibSocket(self.xmpp.socket)
self.xmpp.set_socket(compressed_socket)
raise RestartStream()
self.xmpp.event_when_connected = 'zlib_enabled'
self.xmpp.connection_made(self._decorate_transport(self.xmpp.transport))

def _handle_failure(self, stanza):
pass
# TODO - feature processing needs a restart
print("failure %s" % stanza.get_condition())

xep_0138 = XEP_0138
register_plugin(XEP_0138)