forked from CodelyTV/php-ddd-example
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRabbitMqEventBus.php
63 lines (54 loc) · 1.84 KB
/
RabbitMqEventBus.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
<?php
declare(strict_types=1);
namespace CodelyTv\Shared\Infrastructure\Bus\Event\RabbitMq;
use AMQPException;
use CodelyTv\Shared\Domain\Bus\Event\DomainEvent;
use CodelyTv\Shared\Domain\Bus\Event\EventBus;
use CodelyTv\Shared\Infrastructure\Bus\Event\DomainEventJsonSerializer;
use CodelyTv\Shared\Infrastructure\Bus\Event\MySql\MySqlDoctrineEventBus;
use function Lambdish\Phunctional\each;
/**
 * Event bus that publishes domain events through a RabbitMqConnection exchange.
 *
 * When publishing an event throws an AMQPException, that event is handed to
 * the injected failover event bus instead. Any other exception propagates.
 */
final class RabbitMqEventBus implements EventBus
{
    private RabbitMqConnection $connection;
    private string $exchangeName;
    private MySqlDoctrineEventBus $failoverPublisher;

    /**
     * @param RabbitMqConnection    $connection        Connection used to obtain the exchange.
     * @param string                $exchangeName      Name passed to the connection to look up the exchange.
     * @param MySqlDoctrineEventBus $failoverPublisher Bus that receives events whose publication raised an AMQPException.
     */
    public function __construct(
        RabbitMqConnection $connection,
        string $exchangeName,
        MySqlDoctrineEventBus $failoverPublisher
    ) {
        $this->failoverPublisher = $failoverPublisher;
        $this->exchangeName = $exchangeName;
        $this->connection = $connection;
    }

    /**
     * Publishes every given event, one at a time, in the order received.
     *
     * @param DomainEvent ...$events Events to publish.
     */
    public function publish(DomainEvent ...$events): void
    {
        each($this->publishOrFailover(), $events);
    }

    /**
     * Builds the per-event callback: try the exchange first and fall back to
     * the failover publisher only when an AMQPException is thrown.
     */
    private function publishOrFailover(): callable
    {
        return function (DomainEvent $domainEvent): void {
            try {
                $this->sendToExchange($domainEvent);
            } catch (AMQPException $exception) {
                // The AMQP error itself is intentionally not rethrown; the event
                // is delegated to the failover bus instead.
                $this->failoverPublisher->publish($domainEvent);
            }
        };
    }

    /**
     * Serializes the event to JSON and publishes it on the configured exchange,
     * using the event name as routing key and the event id as message id.
     */
    private function sendToExchange(DomainEvent $event): void
    {
        $payload = DomainEventJsonSerializer::serialize($event);
        $eventName = $event::eventName();
        $attributes = [
            'message_id' => $event->eventId(),
            'content_type' => 'application/json',
            'content_encoding' => 'utf-8',
        ];

        // The exchange is resolved only after the message has been fully built.
        $exchange = $this->connection->exchange($this->exchangeName);
        $exchange->publish($payload, $eventName, AMQP_NOPARAM, $attributes);
    }
}