Skip to content

Commit

Permalink
Complete Database queue driver and tested in real scenario
Browse files Browse the repository at this point in the history
  • Loading branch information
Shadow243 committed Oct 31, 2024
1 parent 8dd16b5 commit 46fae2f
Show file tree
Hide file tree
Showing 19 changed files with 346 additions and 74 deletions.
1 change: 1 addition & 0 deletions lib/db.php
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ static public function build_dsn() {
* @return boolean|integer|array
*/
static public function execute($dbh, $sql, $args, $type = false, $all = false) {

if (!$dbh) {
return false;
}
Expand Down
12 changes: 0 additions & 12 deletions services/Commands/Hm_CheckMailCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,7 @@ protected function configure()
protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->info("Checking for new mail...");

// Example: Call the mail checking service from the container
// $imap = $this->getService('Hm_Imap');
// $newMessages = $imap->search('UNSEEN');
Hm_ProcessNewEmail::dispatch(email: '[email protected]');

if (!empty($newMessages)) {
$this->success('You have new messages!');
// dispatch event
} else {
$this->info('No new messages.');
}

return Command::SUCCESS;
}
}
10 changes: 10 additions & 0 deletions services/Contracts/Queue/Hm_Queueable.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

namespace Services\Contracts\Queue;

use Services\Core\Jobs\Hm_BaseJob;

interface Hm_Queueable
{
public function process(Hm_BaseJob $job): void;
}
5 changes: 4 additions & 1 deletion services/Core/Events/Hm_BaseEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@

namespace Services\Core\Events;

use Services\Traits\Hm_Serializes;

abstract class Hm_BaseEvent
{

use Hm_Serializes;

protected array $params;

public function __construct(...$params)
Expand Down
2 changes: 1 addition & 1 deletion services/Core/Events/Hm_EventDispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public static function listen(string $eventClass, string $listenerClass): void

public static function dispatch($event): void
{
$eventClass = get_class($event);
$eventClass = get_class($event);
// Check if there are listeners for this event
if (isset(self::$listeners[$eventClass])) {
foreach (self::$listeners[$eventClass] as $listenerClass) {
Expand Down
26 changes: 24 additions & 2 deletions services/Core/Jobs/Hm_BaseJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@

abstract class Hm_BaseJob implements Hm_Job
{
protected string $driver = 'database';
public function __construct(protected array $data = []) {}
public string $driver = 'database';
public int $tries = 3;
protected int $attempts = 0;

public function __construct(protected array $data = []) {
$this->data = $data;
}

public function handle(): void {}
public function failed(): void {}
Expand All @@ -17,4 +22,21 @@ public function getDriver(): string
return $this->driver;
}

public function getAttempts(): int
{
return $this->attempts;
}

// Method to increment the attempt count
public function incrementAttempts(): void
{
$this->attempts++;
}

// Check if the job has exceeded the max attempts
public function hasExceededMaxAttempts(): bool
{
return $this->attempts >= $this->tries;
}

}
102 changes: 96 additions & 6 deletions services/Core/Queue/Drivers/Hm_DatabaseQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,23 @@
use PDO;
use Hm_DB;
use Services\Core\Jobs\Hm_BaseJob;
use Services\Contracts\Queue\Hm_Queueable;
use Services\Contracts\Queue\Hm_ShouldQueue;

/**
* Class Hm_DatabaseQueue
* @package App\Queue\Drivers
* @package Services\Core\Queue\Drivers
*/
class Hm_DatabaseQueue implements Hm_ShouldQueue
class Hm_DatabaseQueue implements Hm_ShouldQueue, Hm_Queueable
{
protected const FAILED_JOBS_TABLE = 'hm_failed_jobs';

/**
* Hm_DatabaseQueue constructor.
* @param Hm_DB $db
* @param PDO $dbConnection
*/
public function __construct(private Hm_DB $db, protected PDO $dbConnection) {
}
public function __construct(private Hm_DB $db, protected PDO $dbConnection) {}

/**
* Push the job to the queue
Expand All @@ -30,7 +31,12 @@ public function __construct(private Hm_DB $db, protected PDO $dbConnection) {
*/
public function push(Hm_BaseJob $job): void {
$sql = "INSERT INTO hm_jobs (payload) VALUES (:payload)";
$this->db->execute($this->dbConnection, $sql, ['payload' => serialize($job)], 'insert');
try {
// Use the __serialize method from the Serializer trait
$this->db->execute($this->dbConnection, $sql, ['payload' => serialize($job)], 'insert');
} catch (\Throwable $th) {
throw new \Exception("Failed to push job to the queue: " . $th->getMessage());
}
}

/**
Expand All @@ -45,7 +51,11 @@ public function pop(): ?Hm_BaseJob {
if ($jobRecord) {
$deleteSql = "DELETE FROM hm_jobs WHERE id = :id";
$this->db->execute($this->dbConnection, $deleteSql, ['id' => $jobRecord['id']], 'modify');
return unserialize($jobRecord['payload']);

// Use the __unserialize method from the Serializer trait
$job = unserialize($jobRecord['payload']);
$job->incrementAttempts();
return $job;
}

return null;
Expand All @@ -64,4 +74,84 @@ public function release(Hm_BaseJob $job, int $delay = 0): void {
}
$this->push($job);
}

/**
* Process the job and handle failures.
*
* @param Hm_BaseJob $job
* @param int $maxAttempts
* @return void
*/
public function process(Hm_BaseJob $job): void
{
try {
$job->handle();
} catch (\Exception $e) {
$job->incrementAttempts();
if ($job->getAttempts() >= $job->tries) {
$this->fail($job, $e);
} else {
$this->release($job, 5);
}
}
}

/**
* Move job to failed jobs table after max attempts.
*
* @param Hm_BaseJob $job
* @param Exception $exception
* @return void
*/
public function fail(Hm_BaseJob $job, \Exception $exception): void
{
$sql = "INSERT INTO " . self::FAILED_JOBS_TABLE . " (payload, failed_at, exception) VALUES (:payload, :failed_at, :exception)";
$this->db->execute(
$this->dbConnection,
$sql,
[
'payload' => serialize($job), // This still requires serialization, keep in mind
'failed_at' => (new \DateTime())->format('Y-m-d H:i:s'),
'exception' => $exception->getMessage()
],
'insert'
);
}

/**
* Retry a failed job by moving it back to the main queue.
*
* @param int $failedJobId
* @return void
*/
public function retry(int $failedJobId): void
{
$sql = "SELECT * FROM " . self::FAILED_JOBS_TABLE . " WHERE id = :id";
$failedJobRecord = $this->db->execute($this->dbConnection, $sql, ['id' => $failedJobId], 'select');

if ($failedJobRecord) {
$job = unserialize($failedJobRecord['payload']);

// Remove from failed jobs table
$deleteSql = "DELETE FROM " . self::FAILED_JOBS_TABLE . " WHERE id = :id";
$this->db->execute($this->dbConnection, $deleteSql, ['id' => $failedJobId], 'modify');

// Push back to the main queue
$this->push($job);
}
}

/**
* Log the failed job.
*
* @param Hm_BaseJob $job
* @return void
*/
protected function logFailedJob(Hm_BaseJob $job): void {
$sql = "INSERT INTO failed_jobs (payload, attempts) VALUES (:payload, :attempts)";
$this->db->execute($this->dbConnection, $sql, [
'payload' => serialize($job), // This still requires serialization
'attempts' => $job->getAttempts()
], 'insert');
}
}
6 changes: 5 additions & 1 deletion services/Core/Queue/Drivers/Hm_RedisQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
namespace Services\Core\Queue\Drivers;

use Hm_Redis;
use Services\Jobs\Hm_BaseJob;
use Services\Core\Jobs\Hm_BaseJob;
use Services\Contracts\Queue\Hm_ShouldQueue;

/**
Expand Down Expand Up @@ -66,4 +66,8 @@ public function release(Hm_BaseJob $job, int $delay = 0): void {
}
$this->push($job);
}
public function process(Hm_BaseJob $job): void
{
//TO DO: Implement process() method
}
}
26 changes: 5 additions & 21 deletions services/Core/Queue/Hm_JobDispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Services\Core\Jobs\Hm_BaseJob;
use Services\Contracts\Queue\Hm_ShouldQueue;
use Services\Core\Hm_Container;
use Symfony\Component\DependencyInjection\ContainerInterface;

/**
Expand All @@ -12,32 +13,17 @@
*/
class Hm_JobDispatcher
{
protected Hm_QueueManager $queueManager;
protected string $defaultDriver;

/**
* Hm_JobDispatcher constructor.
* @param Hm_QueueManager $queueManager
* @param string $defaultDriver
*/
public function __construct(ContainerInterface $container, string $defaultDriver = 'redis')
{
$this->queueManager = $container->get('Hm_QueueManager');//$this->queueManager = $queueManager;
$this->defaultDriver = $defaultDriver;
}

/**
* Dispatch the job to the queue
*
* @param Hm_BaseJob $job
* @param string|null $queue
* @return void
*/
public function dispatch(Hm_BaseJob $job, string $queue = null): void {
if ($job instanceof Hm_ShouldQueue) {
$driver = $job->driver ?? $this->defaultDriver;
$queueDriver = $this->queueManager->getDriver($driver);

static public function dispatch(Hm_BaseJob $job): void {
if (is_subclass_of($job, Hm_ShouldQueue::class)) {
$driver = $job->driver;
$queueDriver = Hm_Container::getContainer()->get('queue.manager')->getDriver($driver);
if ($queueDriver) {
$queueDriver->push($job);
} else {
Expand All @@ -46,7 +32,5 @@ public function dispatch(Hm_BaseJob $job, string $queue = null): void {
}else {
$job->handle();
}
// $driver = $this->queueManager->getDriver($queue ?? $this->defaultDriver);
// $driver->push($job);
}
}
10 changes: 5 additions & 5 deletions services/Core/Queue/Hm_QueueWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ public function __construct(Hm_ShouldQueue $queue)
* @return void
*/
public function work(): void {
dd($this->queue);
while ($job = $this->queue->pop())
{
try {
$job->handle();
// dd($job);
$this->queue->process($job);
} catch (\Exception $e) {
$job->failed();
// Optionally release the job back to the queue with a delay
$this->queue->release($job, 30);
// $job->failed();
// // Optionally release the job back to the queue with a delay
// $this->queue->release($job, 30);
}
}
}
Expand Down
19 changes: 16 additions & 3 deletions services/Events/Hm_NewEmailProcessedEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,23 @@

namespace Services\Events;

use Services\Core\Events\Hm_BaseEvent;
use Services\Traits\Hm_Dispatchable;
use Services\Core\Events\Hm_BaseEvent;
use Services\Traits\Hm_InteractsWithQueue;
use Services\Contracts\Queue\Hm_ShouldQueue;

class Hm_NewEmailProcessedEvent extends Hm_BaseEvent
class Hm_NewEmailProcessedEvent extends Hm_BaseEvent implements Hm_ShouldQueue
{
use Hm_Dispatchable;
use Hm_Dispatchable, Hm_InteractsWithQueue;

/**
* Create a new event instance.
* @param $email
*
* @return void
*/
public function __construct(public string $email)
{
$this->email = $email;
}
}
Loading

0 comments on commit 46fae2f

Please sign in to comment.