Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/add promisesraces cancel mechanism #39

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions examples/05-cancellable-promise.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#!/usr/bin/env php
<?php

require __DIR__.'/../vendor/autoload.php';

use M6Web\Tornado\Adapter;
use M6Web\Tornado\EventLoop;

// Choose your adapter.
$eventLoop = new Adapter\Tornado\EventLoop();
//$eventLoop = new Adapter\Tornado\SynchronousEventLoop();
//$eventLoop = new Adapter\Amp\EventLoop();
//$eventLoop = new Adapter\ReactPhp\EventLoop(new React\EventLoop\StreamSelectLoop());

function timer(EventLoop $eventLoop, string $id, int $time, \M6Web\Tornado\Promise &$promise = null)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why a reference to an object? Object are already passed by reference in Php 🤔

{
$result = 'not resolved';
try {
echo "[$id] Starting to wait $time …\n";
yield $eventLoop->delay($time);
echo "[$id] One more time !\n";
yield $eventLoop->delay($time);

if ($promise) {
echo "[$id] Cancelling other promise …\n";
$promise->cancel();
echo "[$id] Done!\n";
}

$result = $id;
} catch (\Exception $e) {
echo "[$id] XXX Cancelled …\n";
}

return $result;
}

echo "Let's start!\n";

try {
$result = $eventLoop->wait(
$eventLoop->promiseAll(
$p1 = $eventLoop->async(timer($eventLoop, 'A', 100)),
$eventLoop->async(timer($eventLoop, 'B', 10, $p1))
)
);
} catch (\M6Web\Tornado\CancelledException $e) {
$result = 'cancelled promise';
} catch (\Exception $e) {
$result = 'other exception';
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we agree that this example should not throw any exception? $result should be equal to ["not resolved", "B"] right?
If so, can you remove the last try/catch block around the wait? It's sound confusing to me


echo "async cancellation result :\n";

var_dump($result);

echo "Finished!\n";
73 changes: 73 additions & 0 deletions examples/06-advanced-cancellable-promise.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#!/usr/bin/env php
<?php

require __DIR__.'/../vendor/autoload.php';

use M6Web\Tornado\Adapter;
use M6Web\Tornado\EventLoop;

// Choose your adapter.
$eventLoop = new Adapter\Tornado\EventLoop();
//$eventLoop = new Adapter\Tornado\SynchronousEventLoop();
//$eventLoop = new Adapter\Amp\EventLoop();
//$eventLoop = new Adapter\ReactPhp\EventLoop(new React\EventLoop\StreamSelectLoop());

function timer(EventLoop $eventLoop, string $id, int $time, \M6Web\Tornado\Promise &$promise = null)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why a reference to an object? Object are already passed by reference in Php 🤔

{
$result = 'not resolved';
try {
echo "[$id] Starting to wait $time …\n";
yield $eventLoop->delay($time);
echo "[$id] Working\n";
yield $eventLoop->delay($time);
echo "[$id] Working\n";
yield $eventLoop->delay($time);

if ($promise) {
echo "[$id] Cancelling other promise …\n";
$promise->cancel();
echo "[$id] Cancellation Done!\n";
}

echo "[$id] Working after cancellation !\n";
yield $eventLoop->delay($time);
echo "[$id] Working after cancellation !\n";
yield $eventLoop->delay($time);
echo "[$id] Working after cancellation !\n";
yield $eventLoop->delay($time);
echo "[$id] Working after cancellation !\n";
yield $eventLoop->delay($time);
echo "[$id] Working after cancellation !\n";
yield $eventLoop->delay($time);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about a loop?


$result = $id;
} catch (\Exception $e) {
echo "[$id] Cancelled … : clean workspace and other stuff\n";
}

return $result;
}

echo "Let's start!\n";

try {
$result = $eventLoop->wait(
$eventLoop->promiseAll(
$promise = $eventLoop->promiseAll(
$eventLoop->async(timer($eventLoop, ' Timer A ', 100)),
$eventLoop->async(timer($eventLoop, ' Timer B ', 100))
),
$eventLoop->async(timer($eventLoop, 'Canceller', 100, $promise))
)
);
} catch (\M6Web\Tornado\CancelledException $e) {
$result = 'cancelled promise';
} catch (\Exception $e) {
$result = 'other exception';
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you just keep the relevant catch block? To provide minimalist and clear examples 😄


echo "async cancellation with result conservation:\n";

var_dump($result);

echo "Finished!\n";
18 changes: 18 additions & 0 deletions phpunit.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,24 @@
<testsuite name="Tornado Examples">
<directory>examples/tests</directory>
</testsuite>
<testsuite name="Amp">
<directory>/usr/local/var/www/Tornado/tests/Adapter/Amp/EventLoopTest.php</directory>
</testsuite>
<testsuite name="Guzzle">
<directory>/usr/local/var/www/Tornado/tests/Adapter/Guzzle/HttpClientTest.php</directory>
</testsuite>
<testsuite name="ReactPhp">
<directory>/usr/local/var/www/Tornado/tests/Adapter/ReactPhp/EventLoopTest.php</directory>
</testsuite>
<testsuite name="Symfony">
<directory>/usr/local/var/www/Tornado/tests/Adapter/Symfony/HttpClientTest.php</directory>
</testsuite>
<testsuite name="Tornado">
<file>/usr/local/var/www/Tornado/tests/Adapter/Tornado/EventLoopTest.php</file>
</testsuite>
<testsuite name="Tornado Synchronous">
<file>/usr/local/var/www/Tornado/tests/Adapter/Tornado/SynchronousEventLoopTest.php</file>
</testsuite>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Looks like your local development paths, isn't it?
IMHO this file can stay unchanged

</testsuites>

<filter>
Expand Down
87 changes: 70 additions & 17 deletions src/Adapter/Amp/EventLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace M6Web\Tornado\Adapter\Amp;

use M6Web\Tornado\Adapter\Common;
use M6Web\Tornado\CancelledException;
use M6Web\Tornado\Deferred;
use M6Web\Tornado\Promise;

Expand Down Expand Up @@ -41,13 +42,17 @@ public function wait(Promise $promise)
*/
public function async(\Generator $generator): Promise
{
$wrapper = function (\Generator $generator, \Amp\Deferred $deferred): \Generator {
/** @var Promise $currentPromise */
$currentPromise = null;

$wrapper = function (\Generator $generator, \Amp\Deferred $deferred) use (&$currentPromise): \Generator {
try {
while ($generator->valid()) {
$blockingPromise = $generator->current();
if (!$blockingPromise instanceof Promise) {
throw new \Error('Asynchronous function is yielding a ['.gettype($blockingPromise).'] instead of a Promise.');
}
$currentPromise = $blockingPromise;
$blockingPromise = Internal\PromiseWrapper::toHandledPromise(
$blockingPromise,
$this->unhandledFailingPromises
Expand Down Expand Up @@ -79,14 +84,24 @@ public function async(\Generator $generator): Promise
$deferred = new \Amp\Deferred();
\Amp\Promise\rethrow(new \Amp\Coroutine($wrapper($generator, $deferred)));

return Internal\PromiseWrapper::createUnhandled($deferred->promise(), $this->unhandledFailingPromises);
$cancellable = function () use (&$currentPromise) {
$currentPromise->cancel();
};

return Internal\PromiseWrapper::createUnhandled($deferred->promise(), $this->unhandledFailingPromises, $cancellable);
}

/**
* {@inheritdoc}
*/
public function promiseAll(Promise ...$promises): Promise
{
$cancellable = function () use (&$promises) {
foreach ($promises as $promise) {
$promise->cancel();
}
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This behavior should be clearly documented in function DocBlock: If you cancel the returned promise, every sub-promise will be cancelled too
Same for every function

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

… and must be tested 😅


return Internal\PromiseWrapper::createUnhandled(
\Amp\Promise\all(
array_map(
Expand All @@ -99,7 +114,8 @@ function (Promise $promise) {
$promises
)
),
$this->unhandledFailingPromises
$this->unhandledFailingPromises,
$cancellable
);
}

Expand All @@ -121,6 +137,8 @@ public function promiseForeach($traversable, callable $function): Promise
*/
public function promiseRace(Promise ...$promises): Promise
{
$toWrapPromises = [];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is confusing… what about using original $promises array and use a new name like ampPromises for array_map result?

$promisesCancellation = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why pre declaring this variable? Looks confusing…

if (empty($promises)) {
return $this->promiseFulfilled(null);
}
Expand All @@ -144,28 +162,45 @@ public function promiseRace(Promise ...$promises): Promise
};

$promises = array_map(
function (Promise $promise) {
return Internal\PromiseWrapper::toHandledPromise(
function (Promise $promise) use (&$toWrapPromises) {
$tempPromise = Internal\PromiseWrapper::toHandledPromise(
$promise,
$this->unhandledFailingPromises
)->getAmpPromise();
);
$toWrapPromises[] = $tempPromise;

return $tempPromise->getAmpPromise();
},
$promises
);

foreach ($promises as $index => $promise) {
\Amp\Promise\rethrow(new \Amp\Coroutine($wrapPromise($promise)));
}
$promisesCancellation = function () use (&$toWrapPromises) {
foreach ($toWrapPromises as $index => $promise) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
foreach ($toWrapPromises as $index => $promise) {
foreach ($toWrapPromises as $promise) {

$promise->cancel();
}
};

$cancellation = function () use (&$deferred, &$promisesCancellation) {
$deferred->fail(new CancelledException('promise race cancellation'));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use the cancellation reason provided when cancelling the promise

($promisesCancellation)();
};

return Internal\PromiseWrapper::createUnhandled($deferred->promise(), $this->unhandledFailingPromises);
return Internal\PromiseWrapper::createUnhandled($deferred->promise(), $this->unhandledFailingPromises, $cancellation);
}

/**
* {@inheritdoc}
*/
public function promiseFulfilled($value): Promise
{
return Internal\PromiseWrapper::createHandled(new \Amp\Success($value));
return Internal\PromiseWrapper::createHandled(
new \Amp\Success($value),
function () {
}
);
}

/**
Expand All @@ -174,7 +209,11 @@ public function promiseFulfilled($value): Promise
public function promiseRejected(\Throwable $throwable): Promise
{
// Manually created promises are considered as handled.
return Internal\PromiseWrapper::createHandled(new \Amp\Failure($throwable));
return Internal\PromiseWrapper::createHandled(
new \Amp\Failure($throwable),
function () {
}
);
}

/**
Expand All @@ -184,11 +223,16 @@ public function idle(): Promise
{
$deferred = new \Amp\Deferred();

\Amp\Loop::defer(function () use ($deferred) {
$deferedId = \Amp\Loop::defer(function () use ($deferred) {
$deferred->resolve();
});

return Internal\PromiseWrapper::createUnhandled($deferred->promise(), $this->unhandledFailingPromises);
$cancellation = function () use ($deferedId, $deferred) {
\Amp\Loop::cancel($deferedId);
$deferred->fail(new CancelledException('Delay cancelled'));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use reason passed in parameter to the cancellation function

};

return Internal\PromiseWrapper::createUnhandled($deferred->promise(), $this->unhandledFailingPromises, $cancellation);
}

/**
Expand All @@ -198,22 +242,27 @@ public function delay(int $milliseconds): Promise
{
$deferred = new \Amp\Deferred();

\Amp\Loop::delay($milliseconds, function () use ($deferred) {
$delayId = \Amp\Loop::delay($milliseconds, function () use ($deferred) {
$deferred->resolve();
});

return Internal\PromiseWrapper::createUnhandled($deferred->promise(), $this->unhandledFailingPromises);
$cancellation = function () use ($delayId, $deferred) {
\Amp\Loop::cancel($delayId);
$deferred->fail(new CancelledException('Delay cancelled'));
};

return Internal\PromiseWrapper::createUnhandled($deferred->promise(), $this->unhandledFailingPromises, $cancellation);
}

/**
* {@inheritdoc}
*/
public function deferred(): Deferred
public function deferred(callable $canceller = null): Deferred
{
return new Internal\Deferred(
$deferred = new \Amp\Deferred(),
// Manually created promises are considered as handled.
Internal\PromiseWrapper::createHandled($deferred->promise())
Internal\PromiseWrapper::createHandled($deferred->promise(), function () {})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Internal\PromiseWrapper::createHandled($deferred->promise(), function () {})
Internal\PromiseWrapper::createHandled($deferred->promise(), $canceller)

);
}

Expand All @@ -232,7 +281,9 @@ function ($watcherId, $stream) use ($deferred) {
}
);

return Internal\PromiseWrapper::createUnhandled($deferred->promise(), $this->unhandledFailingPromises);
$cancellation = function () {};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

onReadable returns a watcher ID that we can cancel, in the same way that the delay function


return Internal\PromiseWrapper::createUnhandled($deferred->promise(), $this->unhandledFailingPromises, $cancellation);
}

/**
Expand All @@ -250,7 +301,9 @@ function ($watcherId, $stream) use ($deferred) {
}
);

return Internal\PromiseWrapper::createUnhandled($deferred->promise(), $this->unhandledFailingPromises);
$cancellation = function () {};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same remark: onWritable returns a watcher ID that we can cancel, in the same way that the delay function


return Internal\PromiseWrapper::createUnhandled($deferred->promise(), $this->unhandledFailingPromises, $cancellation);
}

public function __construct()
Expand Down
Loading