Skip to content

Commit

Permalink
v3.1.6 适配4.2.12的携程onTask回调
Browse files Browse the repository at this point in the history
  • Loading branch information
kiss291323003 committed Jan 10, 2019
1 parent 743f6dc commit 249f01f
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 36 deletions.
104 changes: 73 additions & 31 deletions src/Core.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
use EasySwoole\EasySwoole\Swoole\PipeMessage\OnCommand;
use EasySwoole\EasySwoole\Swoole\Task\AbstractAsyncTask;
use EasySwoole\EasySwoole\Swoole\Task\SuperClosure;
use Swoole\Server\Task;

////////////////////////////////////////////////////////////////////
// _ooOoo_ //
Expand Down Expand Up @@ -275,45 +276,86 @@ private function registerDefaultCallBack(\swoole_server $server,int $serverType)
}
//注册默认的on task,finish 不经过 event register。因为on task需要返回值。不建议重写onTask,否则es自带的异步任务事件失效
//其次finish逻辑在同进程中实现、
EventHelper::on($server,EventRegister::onTask,function (\swoole_server $server, $taskId, $fromWorkerId,$taskObj){
if(is_string($taskObj) && class_exists($taskObj)){
$ref = new \ReflectionClass($taskObj);
if($ref->implementsInterface(QuickTaskInterface::class)){
if(Config::getInstance()->getConf('MAIN_SERVER.SETTING.task_enable_coroutine')){
EventHelper::on($server,EventRegister::onTask,function (\swoole_server $server, Task $task){
$taskObj = $task->data;
if(is_string($taskObj) && class_exists($taskObj)){
$ref = new \ReflectionClass($taskObj);
if($ref->implementsInterface(QuickTaskInterface::class)){
try{
$taskObj::run($server,$task->id,$task->worker_id,$task->flags);
}catch (\Throwable $throwable){
Trigger::getInstance()->throwable($throwable);
}
return;
}else if($ref->isSubclassOf(AbstractAsyncTask::class)){
$taskObj = new $taskObj;
}
}
if($taskObj instanceof AbstractAsyncTask){
try{
$taskObj::run($server,$taskId,$fromWorkerId);
$ret = $taskObj->__onTaskHook($task->id,$task->worker_id,$task->flags);
if($ret !== null){
$taskObj->__onFinishHook($ret,$task->id);
}
}catch (\Throwable $throwable){
Trigger::getInstance()->throwable($throwable);
}
return;
}else if($ref->isSubclassOf(AbstractAsyncTask::class)){
$taskObj = new $taskObj;
}
}
if($taskObj instanceof AbstractAsyncTask){
try{
$ret = $taskObj->__onTaskHook($taskId,$fromWorkerId);
if($ret !== null){
$taskObj->__onFinishHook($ret,$taskId);
}else if($taskObj instanceof SuperClosure){
try{
return $taskObj( $server, $task->id,$task->worker_id,$task->flags);
}catch (\Throwable $throwable){
Trigger::getInstance()->throwable($throwable);
}
}else if(is_callable($taskObj)){
try{
call_user_func($taskObj,$server,$task->id,$task->worker_id,$task->flags);
}catch (\Throwable $throwable){
Trigger::getInstance()->throwable($throwable);
}
}catch (\Throwable $throwable){
Trigger::getInstance()->throwable($throwable);
}
}else if($taskObj instanceof SuperClosure){
try{
return $taskObj( $server, $taskId, $fromWorkerId);
}catch (\Throwable $throwable){
Trigger::getInstance()->throwable($throwable);
return null;
});
}else{
EventHelper::on($server,EventRegister::onTask,function (\swoole_server $server, $taskId, $fromWorkerId,$taskObj){
if(is_string($taskObj) && class_exists($taskObj)){
$ref = new \ReflectionClass($taskObj);
if($ref->implementsInterface(QuickTaskInterface::class)){
try{
$taskObj::run($server,$taskId,$fromWorkerId);
}catch (\Throwable $throwable){
Trigger::getInstance()->throwable($throwable);
}
return;
}else if($ref->isSubclassOf(AbstractAsyncTask::class)){
$taskObj = new $taskObj;
}
}
}else if(is_callable($taskObj)){
try{
call_user_func($taskObj,$server,$taskId,$fromWorkerId);
}catch (\Throwable $throwable){
Trigger::getInstance()->throwable($throwable);
if($taskObj instanceof AbstractAsyncTask){
try{
$ret = $taskObj->__onTaskHook($taskId,$fromWorkerId);
if($ret !== null){
$taskObj->__onFinishHook($ret,$taskId);
}
}catch (\Throwable $throwable){
Trigger::getInstance()->throwable($throwable);
}
}else if($taskObj instanceof SuperClosure){
try{
return $taskObj( $server, $taskId, $fromWorkerId);
}catch (\Throwable $throwable){
Trigger::getInstance()->throwable($throwable);
}
}else if(is_callable($taskObj)){
try{
call_user_func($taskObj,$server,$taskId,$fromWorkerId);
}catch (\Throwable $throwable){
Trigger::getInstance()->throwable($throwable);
}
}
}
return null;
});

return null;
});
}
EventHelper::on($server,EventRegister::onFinish,function (){
//空逻辑
});
Expand Down
7 changes: 4 additions & 3 deletions src/Swoole/Task/AbstractAsyncTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,18 @@
abstract class AbstractAsyncTask
{
private $data = null;

final public function __construct($data = null)
{
$this->data = $data;
}
/*
* if has return ,do finish call
*/
function __onTaskHook($taskId,$fromWorkerId)
function __onTaskHook($taskId,$fromWorkerId,$flags = null)
{
try{
return $this->run($this->data,$taskId,$fromWorkerId);
return $this->run($this->data,$taskId,$fromWorkerId,$flags);
}catch (\Throwable $throwable){
$this->onException($throwable);
}
Expand All @@ -37,7 +38,7 @@ function __onFinishHook($finishData,$task_id)
}
}

abstract protected function run($taskData,$taskId,$fromWorkerId);
abstract protected function run($taskData,$taskId,$fromWorkerId,$flags = null);

abstract protected function finish($result,$task_id);

Expand Down
2 changes: 1 addition & 1 deletion src/Swoole/Task/QuickTaskInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@

interface QuickTaskInterface
{
static function run(\swoole_server $server,int $taskId,int $fromWorkerId);
static function run(\swoole_server $server,int $taskId,int $fromWorkerId,$flags = null);
}
2 changes: 1 addition & 1 deletion src/SysConst.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

class SysConst
{
const EASYSWOOLE_VERSION = '3.1.5';
const EASYSWOOLE_VERSION = '3.1.6';
const ERROR_HANDLER = 'ERROR_HANDLER';
const SHUTDOWN_FUNCTION = 'SHUTDOWN_FUNCTION';

Expand Down

0 comments on commit 249f01f

Please sign in to comment.