Skip to content
This repository has been archived by the owner on Jan 10, 2024. It is now read-only.

Commit

Permalink
Merge branch 'fix-slow-tasks' into 'master'
Browse files Browse the repository at this point in the history
xmlstream: Fix slow tasks

See merge request poezio/slixmpp!162
  • Loading branch information
mathieui committed Apr 30, 2021
2 parents 027545e + aaab58d commit 6f4ac7e
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 4 deletions.
49 changes: 49 additions & 0 deletions itests/test_slow_filters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import asyncio
import unittest
from slixmpp.test.integration import SlixIntegration
from slixmpp import Message


class TestSlowFilter(SlixIntegration):
async def asyncSetUp(self):
await super().asyncSetUp()
self.add_client(
self.envjid('CI_ACCOUNT1'),
self.envstr('CI_ACCOUNT1_PASSWORD'),
)
self.add_client(
self.envjid('CI_ACCOUNT2'),
self.envstr('CI_ACCOUNT2_PASSWORD'),
)
await self.connect_clients()

async def test_filters(self):
"""Make sure filters work"""
def add_a(stanza):
if isinstance(stanza, Message):
stanza['body'] = stanza['body'] + ' a'
return stanza

async def add_b(stanza):
if isinstance(stanza, Message):
stanza['body'] = stanza['body'] + ' b'
return stanza

async def add_c_wait(stanza):
if isinstance(stanza, Message):
await asyncio.sleep(2)
stanza['body'] = stanza['body'] + ' c'
return stanza
self.clients[0].add_filter('out', add_a)
self.clients[0].add_filter('out', add_b)
self.clients[0].add_filter('out', add_c_wait)
body = 'Msg body'
msg = self.clients[0].make_message(
mto=self.clients[1].boundjid, mbody=body,
)
msg.send()
message = await self.clients[1].wait_until('message')
self.assertEqual(message['body'], body + ' a b c')


suite = unittest.TestLoader().loadTestsFromTestCase(TestSlowFilter)
12 changes: 8 additions & 4 deletions slixmpp/xmlstream/xmlstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -1053,11 +1053,13 @@ async def _continue_slow_send(
"""
data = await task
self.__slow_tasks.remove(task)
for filter in self.__filters['out']:
if data is None:
return
for filter in self.__filters['out'][:]:
if filter in already_used:
continue
if iscoroutinefunction(filter):
data = await task
data = await filter(data)
else:
data = filter(data)
if data is None:
Expand Down Expand Up @@ -1093,15 +1095,17 @@ async def run_filters(self):
timeout=1,
)
if pending:
self.slow_tasks.append(task)
self.__slow_tasks.append(task)
asyncio.ensure_future(
self._continue_slow_send(
task,
already_run_filters
),
loop=self.loop,
)
raise Exception("Slow coro, rescheduling")
raise ContinueQueue(
"Slow coroutine, rescheduling filters"
)
data = task.result()
else:
data = filter(data)
Expand Down

0 comments on commit 6f4ac7e

Please sign in to comment.