Skip to content

Commit

Permalink
Scheduling Comlete, next step Notification with channels: telegram,sl…
Browse files Browse the repository at this point in the history
…ack,twilio,nexmo,broadcast
  • Loading branch information
Shadow243 committed Nov 8, 2024
1 parent dac1a73 commit 1b03512
Show file tree
Hide file tree
Showing 22 changed files with 441 additions and 133 deletions.
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ WIN_CACERT_DIR=

JS_EXCLUDE_DEPS=

QUEUE_ENABLED=false
QUEUE_DRIVER=database

AWS_ACCESS_KEY_ID='your-aws-access-key'
Expand Down
1 change: 0 additions & 1 deletion lib/cache.php
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,6 @@ protected function noop_get($key, $default) {
* @return string
*/
protected function key_hash($key) {
dd($this->session->get('fingerprint'));
return sprintf('hm_cache_%s', hash('sha256', (sprintf('%s%s%s%s', $key, SITE_ID,
$this->session->get('fingerprint'), $this->session->get('username')))));
}
Expand Down
14 changes: 14 additions & 0 deletions services/Core/Commands/Hm_QueueWorkCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,17 @@
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;

/**
* Class Hm_QueueWorkCommand
* @package Services\Core\Commands
*/
class Hm_QueueWorkCommand extends Hm_BaseCommand
{
protected static $defaultName = 'queue:work';

/**
* Configure the command.
*/
protected function configure()
{
$this
Expand All @@ -27,6 +34,13 @@ protected function configure()
->addOption('tries', null, InputOption::VALUE_OPTIONAL, 'Number of times to attempt a job before logging it failed', 1);
}

/**
* Execute the console command.
*
* @param InputInterface $input
* @param OutputInterface $output
* @return int Command exit code.
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
$connection = $input->getArgument('connection') ?: env('QUEUE_DRIVER', 'database');
Expand Down
22 changes: 19 additions & 3 deletions services/Core/Commands/Hm_ScheduleRunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,22 @@
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

/**
* Class Hm_ScheduleRunCommand
* @package Services\Core\Commands
*/
class Hm_ScheduleRunCommand extends Hm_BaseCommand
{
// Default name for the command
/**
* The name of the command.
*
* @var string
*/
protected static $defaultName = 'schedule:run';

/**
* Configure the command.
*/
protected function configure()
{
$this
Expand All @@ -21,11 +32,16 @@ protected function configure()
;
}

/**
* Execute the console command.
*
* @param InputInterface $input
* @param OutputInterface $output
* @return int Command exit code.
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
// Get the scheduler instance from the container
$scheduler = Hm_Container::getContainer()->get('scheduler');
// Run the tasks that are due
$scheduler->run();

$output->writeln("All due scheduled tasks have been executed.");
Expand Down
100 changes: 100 additions & 0 deletions services/Core/Commands/Hm_SchedulerWorkCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
<?php

namespace Services\Core\Commands;

use Services\Core\Hm_Container;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

/**
* Class Hm_SchedulerWorkCommand
* @package Services\Core\Commands
*/
class Hm_SchedulerWorkCommand extends Hm_BaseCommand
{
/**
* The name of the command.
*
* @var string
*/
protected static $defaultName = 'schedule:work';

/**
* Flag to indicate if the scheduler should stop running.
* @var bool
*/
private $shouldStop = false;

/**
* Store the last run time for each task to prevent overlapping runs.
* @var array
*/
private $lastRunTimes = [];

/**
* Configure the command.
*/
protected function configure()
{
$this
->setDescription('Continuously run the scheduler to execute due tasks')
->setHelp('This command runs the scheduler in a loop to continuously check and execute scheduled tasks.');
}

/**
* Execute the console command.
*
* @param InputInterface $input
* @param OutputInterface $output
* @return int Command exit code.
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
$scheduler = Hm_Container::getContainer()->get('scheduler');
$output->writeln("Scheduler started. Press Ctrl+C to stop.");

if (function_exists('pcntl_signal')) {
pcntl_signal(SIGINT, function () {
$this->shouldStop = true;
});
}

while (!$this->shouldStop) {
foreach ($scheduler->getTasks() as $task) {
$taskId = spl_object_hash($task);
$currentTime = new \DateTime('now', new \DateTimeZone($task->getTimezone()));

$lastRunTime = isset($this->lastRunTimes[$taskId]) ? $this->lastRunTimes[$taskId] : $currentTime;

$this->lastRunTimes[$taskId] = $currentTime;

if ($task->isDue() && $currentTime > $lastRunTime) {
$output->writeln("Running task: {$task->getName()} at " . $currentTime->format('Y-m-d H:i:s'));
$task->run();
$output->writeln("Task: {$task->getName()} added to queue");
}
}

// Wait one minute before the next loop iteration
sleep(60);

// Dispatch any pending signals
if (function_exists('pcntl_signal_dispatch')) {
pcntl_signal_dispatch();
}
}

$output->writeln("Scheduler stopped gracefully.");

return Command::SUCCESS;
}

/**
* Stops the scheduler loop gracefully.
*/
public function stop()
{
$this->shouldStop = true;
}
}
22 changes: 21 additions & 1 deletion services/Core/Hm_Container.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Services\Providers\{Hm_CommandServiceProvider, Hm_EventServiceProvider, Hm_SchedulerServiceProvider, Hm_QueueServiceProvider};

/**
* Class Hm_Container
* @package Services\Core
*/
class Hm_Container
{
private static $container = null;
Expand All @@ -16,6 +20,12 @@ class Hm_Container
private function __construct() {}
private function __clone() {}

/**
* Set the container
*
* @param ContainerBuilder $containerBuilder
* @return ContainerBuilder
*/
public static function setContainer(ContainerBuilder $containerBuilder): ContainerBuilder
{
if (self::$container === null) {
Expand All @@ -25,12 +35,17 @@ public static function setContainer(ContainerBuilder $containerBuilder): Contain
return self::$container;
}

/**
* Bind the container
*
* @return ContainerBuilder
*/
public static function bind(): ContainerBuilder
{
$config = self::$container->get('config');

if ($config->get('queue_enabled')) {

if ($config->get('queue_driver') === 'database') {
// Register Hm_DB
self::$container->set('db.connection', Hm_DB::connect(self::$container->get('config')));
Expand Down Expand Up @@ -67,6 +82,11 @@ public static function bind(): ContainerBuilder
return self::$container;
}

/**
* Get the container
*
* @return ContainerBuilder
*/
public static function getContainer(): ContainerBuilder
{
return self::$container;
Expand Down
33 changes: 30 additions & 3 deletions services/Core/Jobs/Hm_BaseJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,61 @@

abstract class Hm_BaseJob implements Hm_Job
{
public string $driver = 'database';
public string $driver = '';
public int $tries = 3;
protected int $attempts = 0;

public function __construct(protected array $data = []) {
$this->data = $data;
}

/**
* Execute the job.
*
* @return void
*/
public function handle(): void {}
/**
* Handle a job failure.
*
* @return void
*/
public function failed(): void {}

/**
* Get the driver name for the job.
*
* @return int
*/
public function getDriver(): string
{
return $this->driver;
}

/**
* Get the number of times the job has been attempted.
*
* @return int
*/
public function getAttempts(): int
{
return $this->attempts;
}

// Method to increment the attempt count
/**
* Method to increment the attempt count
*
*/
public function incrementAttempts(): void
{
$this->attempts++;
}

// Check if the job has exceeded the max attempts
/**
* Determine if the job has exceeded the maximum number of attempts.
*
* @return bool
*/
public function hasExceededMaxAttempts(): bool
{
return $this->attempts >= $this->tries;
Expand Down
1 change: 1 addition & 0 deletions services/Core/Queue/Hm_JobDispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class Hm_JobDispatcher
static public function dispatch(Hm_BaseJob $job): void {
if (is_subclass_of($job, Hm_ShouldQueue::class)) {
$driver = $job->driver;
dd($driver);
$queueDriver = Hm_Container::getContainer()->get('queue.manager')->getDriver($driver);
if ($queueDriver) {
$queueDriver->push($job);
Expand Down
1 change: 1 addition & 0 deletions services/Core/Queue/Hm_QueueManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public function addDriver(string $name, Hm_ShouldQueue $driver): void

public function getDriver(string $name): Hm_ShouldQueue
{
dump("Getting driver $name");
return $this->drivers[$name];
}
}
1 change: 0 additions & 1 deletion services/Core/Queue/Hm_QueueWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ public function work(): void {
while ($job = $this->queue->pop())
{
try {
// dd($job);
$this->queue->process($job);
} catch (\Exception $e) {
// $job->failed();
Expand Down
4 changes: 2 additions & 2 deletions services/Core/Scheduling/Hm_CacheMutex.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ public function release($task)
*/
private function getMutexKey($task)
{
return 'mutex_' . hash('sha256', get_class($task) . $task->name . json_encode($task->command));
// return 'mutex_' . hash('sha256', $task->name);
// return 'mutex_' . hash('sha256', get_class($task) . $task->name . json_encode($task->command));
return 'mutex_' . hash('sha256', $task->name);
}

/**
Expand Down
Loading

0 comments on commit 1b03512

Please sign in to comment.