Skip to content
This repository has been archived by the owner on Jan 10, 2024. It is now read-only.

Commit

Permalink
Update XEP-0138 plugin to work with slixmpp
Browse files Browse the repository at this point in the history
  • Loading branch information
mzealey committed Jun 13, 2021
1 parent b1411d8 commit b82d11a
Showing 1 changed file with 62 additions and 41 deletions.
103 changes: 62 additions & 41 deletions slixmpp/plugins/xep_0138.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,14 @@

# slixmpp: The Slick XMPP Library
# Copyright (C) 2011 Nathanael C. Fritz
# This file is part of slixmpp.
# See the file LICENSE for copying permission.
import logging
import zlib


from slixmpp.stanza import StreamFeatures
from slixmpp.xmlstream import RestartStream, register_stanza_plugin, ElementBase, StanzaBase
from slixmpp.xmlstream import register_stanza_plugin, ElementBase, StanzaBase, tostring
from slixmpp.xmlstream.matcher import *
from slixmpp.xmlstream.handler import *
from slixmpp.plugins import BasePlugin, register_plugin

log = logging.getLogger(__name__)


class Compression(ElementBase):
name = 'compression'
namespace = 'http://jabber.org/features/compress'
Expand Down Expand Up @@ -44,6 +37,8 @@ def setup(self, xml):
StanzaBase.setup(self, xml)
self.xml.tag = self.tag_name()

def set_method(self, method):
self._set_sub_text('{}method', text=method)

class Compressed(StanzaBase):
name = 'compressed'
Expand All @@ -56,35 +51,19 @@ def setup(self, xml):
StanzaBase.setup(self, xml)
self.xml.tag = self.tag_name()

class CompressFailure(StanzaBase):
name = 'failure'
namespace = 'http://jabber.org/protocol/compress'
conditions = {'setup-failed', 'unsupported-method'}

def setup(self, xml):
StanzaBase.setup(self, xml)
self.xml.tag = self.tag_name()


class ZlibSocket(object):

def __init__(self, socketobj):
self.__socket = socketobj
self.compressor = zlib.compressobj()
self.decompressor = zlib.decompressobj(zlib.MAX_WBITS)

def __getattr__(self, name):
return getattr(self.__socket, name)

def send(self, data):
sentlen = len(data)
data = self.compressor.compress(data)
data += self.compressor.flush(zlib.Z_SYNC_FLUSH)
log.debug(b'>>> (compressed)' + (data.encode("hex")))
#return self.__socket.send(data)
sentactuallen = self.__socket.send(data)
assert(sentactuallen == len(data))

return sentlen

def recv(self, *args, **kwargs):
data = self.__socket.recv(*args, **kwargs)
log.debug(b'<<< (compressed)' + data.encode("hex"))
return self.decompressor.decompress(self.decompressor.unconsumed_tail + data)

def get_condition(self):
"""Return the condition element's name."""
for child in self.xml:
return child.tag.split('}', 1)[-1]

class XEP_0138(BasePlugin):
"""
Expand All @@ -101,6 +80,7 @@ def plugin_init(self):
self.compression_methods = {'zlib': True}

register_stanza_plugin(StreamFeatures, Compression)
self.xmpp.register_stanza(CompressFailure)
self.xmpp.register_stanza(Compress)
self.xmpp.register_stanza(Compressed)

Expand All @@ -110,11 +90,25 @@ def plugin_init(self):
self._handle_compressed,
instream=True))

self.xmpp.register_handler(
Callback('CompressFailure',
StanzaPath('failure'),
self._handle_failure,
instream=True))

self.xmpp.register_feature('compression',
self._handle_compression,
restart=True,
order=self.config.get('order', 5))

self.stats = {
'rx-compress': 0,
'tx-compress': 0,
'rx-real': 0,
'tx-real': 0,
}
self.xmpp.add_event_handler('disconnected', lambda _: log.debug("Compression stats %r" % self.stats))

def register_compression_method(self, name, handler):
self.compression_methods[name] = handler

Expand All @@ -123,20 +117,47 @@ def _handle_compression(self, features):
if method in self.compression_methods:
log.info('Attempting to use %s compression' % method)
c = Compress(self.xmpp)
c['method'] = method
c.send(now=True)
c.set_method(method)
str_data = tostring(c.xml, stream=self.xmpp, top_level=True)
self.xmpp.send(c)

return True
return False

def _decorate_transport(self, transport):
compressor = zlib.compressobj()
decompressor = zlib.decompressobj(zlib.MAX_WBITS)

orig_recv = self.xmpp.data_received
def zlib_recv(data):
#log.debug('zlib: received %s' % data)
plain = decompressor.decompress(decompressor.unconsumed_tail + data)
log.debug("zlib: decompressed %d bytes into %d" % (len(data), len(plain)))
self.stats['rx-compress'] += len(data)
self.stats['rx-real'] += len(plain)
orig_recv(plain)
self.xmpp.data_received = zlib_recv

orig_write = transport.write
def zlib_write(data):
compressed = compressor.compress(data) + compressor.flush(zlib.Z_SYNC_FLUSH)
log.debug("zlib: compressed %d bytes into %d" % (len(data), len(compressed)))
#log.debug('zlib: sent %s' % compressed)
self.stats['tx-compress'] += len(compressed)
self.stats['tx-real'] += len(data)
return orig_write(compressed)
transport.write = zlib_write

return transport

def _handle_compressed(self, stanza):
self.xmpp.features.add('compression')
log.debug('Stream Compressed!')
compressed_socket = ZlibSocket(self.xmpp.socket)
self.xmpp.set_socket(compressed_socket)
raise RestartStream()
self.xmpp.connection_made(self._decorate_transport(self.xmpp.transport))

def _handle_failure(self, stanza):
pass
# TODO - feature processing needs a restart
print("failure %s" % stanza.get_condition())

xep_0138 = XEP_0138
register_plugin(XEP_0138)

0 comments on commit b82d11a

Please sign in to comment.