本文主要是介绍消息队列 think-queue tp5.0,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
一 介绍
think-queue是tp框架消息队列插件,依赖框架框架核心版本,所以下载时需要选择对应框架核心的版本。
比如tp5.1用2.0的都可以,5.0的用1.1.6。其余版本参考composer。
composer地址:topthink/think-queue - Packagist
不同版本中项目结构不同,一般会说明插件的使用方法,比如配置文件位置。可以在项目中查找“queue.php”通过md文件查看说明,或者通过源码分析获取其存贮位置。
由于本地使用tp5.0版本,只有内容都仅适用tp5.0。
消息队列用于解决并发问题。比如处理微信支付回执……目前退款和支付总是同时给出同一单多个回执,连触发器都没拦截住,只能考虑消息队列。
二 安装&配置
composer require topthink/think-queue v1.1.6
配置文件位于 “application/extra/queue.php”。
#application/extra/queue.php
return [//驱动引擎'connector' => 'Redis',//失效时间'expire' => 60,//默认队列名'default' => 'hirorder',//ip地址'host' => '127.0.0.1',//端口'port' => 6379,//密码'password' => '',//选择数据库'select' => 0,//超时时间'timeout' => 0,//是否持久化'persistent' => false,
];
各个驱动的具体可用配置项在“think\queue\connector”目录下。各个驱动类里的`options`属性是配置数据,写在上面的“queue”配置里即可覆盖。
不同驱动配置数据不同,可以在“think\queue\connector”中查看“options”属性。
三 原理
用redis模拟消息队列的原理很简单。
推送消息到队列中,即redis的list中。
若队列不存在会在redis中新建对应list,格式如queue:队列名。
安装插件后,可使用queue:work和queue:listen等命令。
开启两者都需要设置参数,比如work的“queue”为队列名、work的“daemon”为是否开启守护线程。
对应命令在“think-queue\src\queue\command”可查。
3.1 推送信息
use think\Queue;// 1.当前任务将由哪个类来负责处理。
// 当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法
$jobHandlerClassName = 'app\jobs\JobTest';// 2.当前任务归属的队列名称,如果为新队列,会自动创建
$jobQueueName = "helloJobQueue";// 3.当前任务所需的业务数据, 不能为 resource 类型,其他类型最终将转化为json形式的字符串
// (jobData 为对象时,存储其public属性的键值对 )
$jobData = ['ts' => time(), 'bizId' => uniqid(), 'a' => 1];// 4.将该任务推送到消息队列,等待对应的消费者去执行
$isPushed = Queue::push($jobHandlerClassName, $jobData, $jobQueueName);// database 驱动时,返回值为 1|false;redis 驱动时,返回值为 随机字符串|false
if ($isPushed !== false) {echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ" . "<br>";
} else {echo 'Oops, something went wrong.';
}
$jobHandlerClassName为处理数据的类,需要自己定义。
#application\jobs\JobTest
namespace app\jobs;
use think\queue\Job;
use app\common\model\JobsTest as JobsTestModel;
class JobTest {/*** fire方法是消息队列默认调用的方法* @param Job $job 当前的任务对象* @param array|mixed $data 发布任务时自定义的数据*/public function fire(Job $job, $data) {// 此处做一些 check,提前判断是否需要执行$isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);if (!$isJobStillNeedToBeDone) {$job->delete();return;}// 执行逻辑处理(即:你需要该消息队列做什么)$isJobDone = $this->doHelloJob($data);if ($isJobDone) {// 如果任务执行成功,记得删除任务$job->delete();} else {// 通过这个方法可以检查这个任务已经重试了几次了if ($job->attempts() > 3) {$job->delete();// 也可以重新发布这个任务//$job->release(2); // $delay为延迟时间,表示该任务延迟2秒后再执行}}}/*** 有些消息在到达消费者时,可能已经不再需要执行了* @param $data 发布任务时自定义的数据* @return bool 任务执行的结果*/private function checkDatabaseToSeeIfJobNeedToBeDone($data) {return true;}/*** 根据消息中的数据进行实际的业务处理...* @param $data* @return bool*/private function doHelloJob($data) {// TODO 该处为实际业务逻辑,即:对消息中的数据进行处理// $model = new JobsTestModel();// $inData = [// 'uniqId' => $data['uniqId'],// 'time' => $data['ts'],// 'content' => '队列成功的插入数据'// ];// $res = $model->save($inData);// if (! $res) {// return false;// }// return true;var_dump("time:" . time() . " doHelloJob");}
}
源码如下
#vendor\topthink\think-queue\src
class Queue
{/** @var Connector */protected static $connector;private static function buildConnector(){$options = Config::get('queue');$type = !empty($options['connector']) ? $options['connector'] : 'Sync';if (!isset(self::$connector)) {$class = false !== strpos($type, '\\') ? $type : '\\think\\queue\\connector\\' . Str::studly($type);self::$connector = new $class($options);}return self::$connector;}public static function __callStatic($name, $arguments){return call_user_func_array([self::buildConnector(), $name], $arguments);}
}
#vendor\topthink\think-queue\src\queue\connector
class Redis extends Connector
{
protected $options = ['expire' => 60,'default' => 'default','host' => '127.0.0.1','port' => 6379,'password' => '','select' => 0,'timeout' => 0,'persistent' => false];
public function __construct($options){if (!extension_loaded('redis')) {throw new Exception('redis扩展未安装');}if (!empty($options)) {$this->options = array_merge($this->options, $options);}$func = $this->options['persistent'] ? 'pconnect' : 'connect';$this->redis = new \Redis;$this->redis->$func($this->options['host'], $this->options['port'], $this->options['timeout']);if ('' != $this->options['password']) {$this->redis->auth($this->options['password']);}if (0 != $this->options['select']) {$this->redis->select($this->options['select']);}}
public function push($job, $data = '', $queue = null){return $this->pushRaw($this->createPayload($job, $data), $queue);}public function pushRaw($payload, $queue = null){$this->redis->rPush($this->getQueue($queue), $payload);return json_decode($payload, true)['id'];}
protected function createPayload($job, $data = '', $queue = null){$payload = $this->setMeta(parent::createPayload($job, $data), 'id', $this->getRandomId());return $this->setMeta($payload, 'attempts', 1);}
protected function getQueue($queue){return 'queues:' . ($queue ?: $this->options['default']);}
}
#vendor\topthink\think-queue\src\queue
abstract class Connector
{
protected function setMeta($payload, $key, $value){$payload = json_decode($payload, true);$payload[$key] = $value;$payload = json_encode($payload);if (JSON_ERROR_NONE !== json_last_error()) {throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg());}return $payload;}
protected function createPayload($job, $data = '', $queue = null){if (is_object($job)) {$payload = json_encode(['job' => 'think\queue\CallQueuedHandler@call','data' => ['commandName' => get_class($job),'command' => serialize(clone $job),],]);} else {$payload = json_encode($this->createPlainPayload($job, $data));}if (JSON_ERROR_NONE !== json_last_error()) {throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg());}return $payload;}
protected function createPlainPayload($job, $data){return ['job' => $job, 'data' => $data];}
}
think\Queue::push()执行think\queue\connector\Redis::__construct()调用think\queue\connector\Redis::push()。
push()中将通过父类方法think\queue\Connector::createPayload()和think\queue\Connector::setMeta() 创建的参数传入 think\queue\connector\Redis::pushRaw()中。
think\queue\Connector::createPayload()中逻辑$job可以为对象,其为对象和非对象构造json的数据格式不相同。
pushRaw()使用think\queue\connector\Redis::getQueue()设置list名字。其中使用redis的rpush命令推送数据,该命令若list不存在则新建。
getQueue()中若未设置队列名称则会使用配置中的默认名称,若未设置应会报错。
3.2 处理信息
信息处理使用其自带的命令。命令文件在“think-queue\src\queue\command”中。
一般使用“queue:work”或“queue:listen”。
以queue:work为例。
线程执行
php think queue:work --queue list名 --daemon
线程重启
php think queue:restart
源码比较多,涉及vendor\topthink\think-queue\src\queue\command\Work.php、vendor\topthink\think-queue\src\queue\command\Restart.php、vendor\topthink\think-queue\src\queue\Worker.php等。
command\Work.php中若使用守护线程则会使用死循环,死循环中执行\queue\Worker.php中的pop()函数。
think\queue\Worker::pop()中会调用think\queue\Worker::process()和think\queue\Worker::sleep()。
hink\queue\Worker::process()中通过think\queue\connectorResit::pop()(使用redis驱动)获取实例对象为$job。
其获取实例的过程,实际是解析存储的json数据,通过其中job参数实例化类,使用函数think\queue\Job::resolveAndFire()和think\queue\Job::resolve()。
hink\queue\Worker::process()中获取对象$job后调用$job->fire(),即用户定义的处理数据类中的fire()函数。
即总过程就像死循环中执行逻辑然后睡几秒后继续执行。若逻辑代码改变,需要线程重启。
若"queue:work"命令行未设置队列名也是使用默认队列名。
想一个命令执行全部队列,估计得另外写代码,应该也不难使用redis的keys匹配出数据,稍微修改下command\Work.php中的代码。
public function execute(Input $input, Output $output){$queue = $input->getOption('queue');//替换掉$delay = $input->getOption('delay');$memory = $input->getOption('memory');if ($input->getOption('daemon')) {Hook::listen('worker_daemon_start', $queue);$this->daemon($queue, $delay, $memory,$input->getOption('sleep'), $input->getOption('tries'));} else {$response = $this->worker->pop($queue, $delay, $input->getOption('sleep'), $input->getOption('tries'));$this->output($response);}
}
3.3 进程管理
两者不同参考:
解析think-queue(围绕redis做分析)-ThinkPHP-PHP中文网
https://segmentfault.com/a/1190000020003672
总结:work 模式的性能会比listen模式高,但需要手动执行。listen不用手动执行。
感觉给work命令做个systemd好些,查过一些线程管理的linux应用。
其他方法参考:
thinkphp队列think-queue的使用以及通过supervisor实现常驻进程 - 简书
代码后台运行,不止nohup,还有Python Supervisor!
supervisor · PyPI
https://www.cnblogs.com/mklblog/p/18070785
四 使用
#application\index\command\jobtest.php
namespace app\index\command;use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\Queue;class jobtest extends Command {protected function configure() {$this->setName('jobtest')->setDescription('job queue test');}protected function execute(Input $input, Output $output) {// 1.当前任务将由哪个类来负责处理。// 当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法$jobHandlerClassName = 'app\jobs\JobTest';// 2.当前任务归属的队列名称,如果为新队列,会自动创建$jobQueueName = "helloJobQueue";// 3.当前任务所需的业务数据, 不能为 resource 类型,其他类型最终将转化为json形式的字符串// (jobData 为对象时,存储其public属性的键值对 )$jobData = ['ts' => time(), 'bizId' => uniqid(), 'a' => 1];// 4.将该任务推送到消息队列,等待对应的消费者去执行$isPushed = Queue::push($jobHandlerClassName, $jobData, $jobQueueName);// database 驱动时,返回值为 1|false;redis 驱动时,返回值为 随机字符串|falseif ($isPushed !== false) {echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ" . "<br>";} else {echo 'Oops, something went wrong.';}}
}
#application\common.php
return [// 'app\index\command\test1','app\index\command\jobtest',
];
#application\jobs\JobTest
namespace app\jobs;
use think\queue\Job;
use app\common\model\JobsTest as JobsTestModel;
class JobTest {/*** fire方法是消息队列默认调用的方法* @param Job $job 当前的任务对象* @param array|mixed $data 发布任务时自定义的数据*/public function fire(Job $job, $data) {// 此处做一些 check,提前判断是否需要执行$isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);if (!$isJobStillNeedToBeDone) {$job->delete();return;}// 执行逻辑处理(即:你需要该消息队列做什么)$isJobDone = $this->doHelloJob($data);if ($isJobDone) {// 如果任务执行成功,记得删除任务$job->delete();} else {// 通过这个方法可以检查这个任务已经重试了几次了if ($job->attempts() > 3) {$job->delete();// 也可以重新发布这个任务//$job->release(2); // $delay为延迟时间,表示该任务延迟2秒后再执行}}}/*** 有些消息在到达消费者时,可能已经不再需要执行了* @param $data 发布任务时自定义的数据* @return bool 任务执行的结果*/private function checkDatabaseToSeeIfJobNeedToBeDone($data) {return true;}/*** 根据消息中的数据进行实际的业务处理...* @param $data* @return bool*/private function doHelloJob($data) {// TODO 该处为实际业务逻辑,即:对消息中的数据进行处理// $model = new JobsTestModel();// $inData = [// 'uniqId' => $data['uniqId'],// 'time' => $data['ts'],// 'content' => '队列成功的插入数据'// ];// $res = $model->save($inData);// if (! $res) {// return false;// }// return true;sleep(2);var_dump("time:" . date("Y-m-d H:i:s") . " doHelloJob");}
}
php think queue:work --queue helloJobQueue --daemonphp think jobtest
压入数据后redis结果
> keys queues:*
1) "queues:helloJobQueue"
2) "queues:helloJobQueue:reserved"
测试结果
string(35) "time:2024-09-03 11:31:20 doHelloJob"
Processed: app\jobs\JobTest
string(35) "time:2024-09-03 11:31:22 doHelloJob"
Processed: app\jobs\JobTest
改变逻辑
private function doHelloJob($data) {sleep(2);var_dump("time:" . time() . "(0_0)" . " doHelloJob");}
若不重启代码无效,测试结果
string(35) "time:2024-09-03 11:32:23 doHelloJob"
Processed: app\jobs\JobTest
string(35) "time:2024-09-03 11:32:37 doHelloJob"
Processed: app\jobs\JobTest
php think queue:restart
queue:restar 后线程终止,需手动启动。在windows中测试……
实际开发应根据开发环境再做调整。
压入数据的程序不限制调用位置,甚至可以作为Hook。
这篇关于消息队列 think-queue tp5.0的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!