消息队列 think-queue tp5.0

2024-09-04 21:04
文章标签 队列 消息 queue think tp5.0

本文主要是介绍消息队列 think-queue tp5.0,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一 介绍

think-queue是tp框架消息队列插件,依赖框架框架核心版本,所以下载时需要选择对应框架核心的版本。

比如tp5.1用2.0的都可以,5.0的用1.1.6。其余版本参考composer。

composer地址:topthink/think-queue - Packagist

不同版本中项目结构不同,一般会说明插件的使用方法,比如配置文件位置。可以在项目中查找“queue.php”通过md文件查看说明,或者通过源码分析获取其存贮位置。

由于本地使用tp5.0版本,只有内容都仅适用tp5.0。

消息队列用于解决并发问题。比如处理微信支付回执……目前退款和支付总是同时给出同一单多个回执,连触发器都没拦截住,只能考虑消息队列。

二 安装&配置

composer require topthink/think-queue v1.1.6

配置文件位于 “application/extra/queue.php”。

#application/extra/queue.php
return [//驱动引擎'connector' => 'Redis',//失效时间'expire' => 60,//默认队列名'default' => 'hirorder',//ip地址'host' => '127.0.0.1',//端口'port' => 6379,//密码'password' => '',//选择数据库'select' => 0,//超时时间'timeout' => 0,//是否持久化'persistent' => false,
];

 各个驱动的具体可用配置项在“think\queue\connector”目录下。各个驱动类里的`options`属性是配置数据,写在上面的“queue”配置里即可覆盖。

不同驱动配置数据不同,可以在“think\queue\connector”中查看“options”属性。

三 原理

用redis模拟消息队列的原理很简单。

推送消息到队列中,即redis的list中。

若队列不存在会在redis中新建对应list,格式如queue:队列名。

安装插件后,可使用queue:work和queue:listen等命令。

开启两者都需要设置参数,比如work的“queue”为队列名、work的“daemon”为是否开启守护线程。

对应命令在“think-queue\src\queue\command”可查。

3.1 推送信息

use think\Queue;// 1.当前任务将由哪个类来负责处理。
// 当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法
$jobHandlerClassName = 'app\jobs\JobTest';// 2.当前任务归属的队列名称,如果为新队列,会自动创建
$jobQueueName = "helloJobQueue";// 3.当前任务所需的业务数据, 不能为 resource 类型,其他类型最终将转化为json形式的字符串
// (jobData 为对象时,存储其public属性的键值对 )
$jobData = ['ts' => time(), 'bizId' => uniqid(), 'a' => 1];// 4.将该任务推送到消息队列,等待对应的消费者去执行
$isPushed = Queue::push($jobHandlerClassName, $jobData, $jobQueueName);// database 驱动时,返回值为 1|false;redis 驱动时,返回值为 随机字符串|false
if ($isPushed !== false) {echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ" . "<br>";
} else {echo 'Oops, something went wrong.';
}

$jobHandlerClassName为处理数据的类,需要自己定义。

#application\jobs\JobTest
namespace app\jobs;
use think\queue\Job;
use app\common\model\JobsTest as JobsTestModel;
class JobTest {/*** fire方法是消息队列默认调用的方法* @param Job            $job      当前的任务对象* @param array|mixed    $data     发布任务时自定义的数据*/public function fire(Job $job, $data) {// 此处做一些 check,提前判断是否需要执行$isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);if (!$isJobStillNeedToBeDone) {$job->delete();return;}// 执行逻辑处理(即:你需要该消息队列做什么)$isJobDone = $this->doHelloJob($data);if ($isJobDone) {// 如果任务执行成功,记得删除任务$job->delete();} else {// 通过这个方法可以检查这个任务已经重试了几次了if ($job->attempts() > 3) {$job->delete();// 也可以重新发布这个任务//$job->release(2); // $delay为延迟时间,表示该任务延迟2秒后再执行}}}/*** 有些消息在到达消费者时,可能已经不再需要执行了* @param $data 发布任务时自定义的数据* @return bool 任务执行的结果*/private function checkDatabaseToSeeIfJobNeedToBeDone($data) {return true;}/*** 根据消息中的数据进行实际的业务处理...* @param $data* @return bool*/private function doHelloJob($data) {// TODO 该处为实际业务逻辑,即:对消息中的数据进行处理// $model = new JobsTestModel();// $inData = [//     'uniqId' => $data['uniqId'],//     'time' => $data['ts'],//     'content' => '队列成功的插入数据'// ];// $res = $model->save($inData);// if (! $res) {//     return false;// }// return true;var_dump("time:" . time() . " doHelloJob");}
}

源码如下

#vendor\topthink\think-queue\src
class Queue
{/** @var Connector */protected static $connector;private static function buildConnector(){$options = Config::get('queue');$type    = !empty($options['connector']) ? $options['connector'] : 'Sync';if (!isset(self::$connector)) {$class = false !== strpos($type, '\\') ? $type : '\\think\\queue\\connector\\' . Str::studly($type);self::$connector = new $class($options);}return self::$connector;}public static function __callStatic($name, $arguments){return call_user_func_array([self::buildConnector(), $name], $arguments);}
}
#vendor\topthink\think-queue\src\queue\connector
class Redis extends Connector
{
protected $options = ['expire'     => 60,'default'    => 'default','host'       => '127.0.0.1','port'       => 6379,'password'   => '','select'     => 0,'timeout'    => 0,'persistent' => false];
public function __construct($options){if (!extension_loaded('redis')) {throw new Exception('redis扩展未安装');}if (!empty($options)) {$this->options = array_merge($this->options, $options);}$func        = $this->options['persistent'] ? 'pconnect' : 'connect';$this->redis = new \Redis;$this->redis->$func($this->options['host'], $this->options['port'], $this->options['timeout']);if ('' != $this->options['password']) {$this->redis->auth($this->options['password']);}if (0 != $this->options['select']) {$this->redis->select($this->options['select']);}}
public function push($job, $data = '', $queue = null){return $this->pushRaw($this->createPayload($job, $data), $queue);}public function pushRaw($payload, $queue = null){$this->redis->rPush($this->getQueue($queue), $payload);return json_decode($payload, true)['id'];}
protected function createPayload($job, $data = '', $queue = null){$payload = $this->setMeta(parent::createPayload($job, $data), 'id', $this->getRandomId());return $this->setMeta($payload, 'attempts', 1);}
protected function getQueue($queue){return 'queues:' . ($queue ?: $this->options['default']);}
}
#vendor\topthink\think-queue\src\queue
abstract class Connector
{
protected function setMeta($payload, $key, $value){$payload       = json_decode($payload, true);$payload[$key] = $value;$payload       = json_encode($payload);if (JSON_ERROR_NONE !== json_last_error()) {throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg());}return $payload;}
protected function createPayload($job, $data = '', $queue = null){if (is_object($job)) {$payload = json_encode(['job'  => 'think\queue\CallQueuedHandler@call','data' => ['commandName' => get_class($job),'command'     => serialize(clone $job),],]);} else {$payload = json_encode($this->createPlainPayload($job, $data));}if (JSON_ERROR_NONE !== json_last_error()) {throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg());}return $payload;}
protected function createPlainPayload($job, $data){return ['job' => $job, 'data' => $data];}
}

think\Queue::push()执行think\queue\connector\Redis::__construct()调用think\queue\connector\Redis::push()。

push()中将通过父类方法think\queue\Connector::createPayload()和think\queue\Connector::setMeta() 创建的参数传入 think\queue\connector\Redis::pushRaw()中。

think\queue\Connector::createPayload()中逻辑$job可以为对象,其为对象和非对象构造json的数据格式不相同。

pushRaw()使用think\queue\connector\Redis::getQueue()设置list名字。其中使用redis的rpush命令推送数据,该命令若list不存在则新建。

getQueue()中未设置队列名称则会使用配置中的默认名称,若未设置应会报错。

3.2 处理信息

信息处理使用其自带的命令。命令文件在“think-queue\src\queue\command”中。

一般使用“queue:work”或“queue:listen”。

以queue:work为例。

线程执行

php think queue:work --queue list名 --daemon

 线程重启

php think queue:restart

源码比较多,涉及vendor\topthink\think-queue\src\queue\command\Work.php、vendor\topthink\think-queue\src\queue\command\Restart.php、vendor\topthink\think-queue\src\queue\Worker.php等。

command\Work.php中若使用守护线程则会使用死循环,死循环中执行\queue\Worker.php中的pop()函数。

think\queue\Worker::pop()中会调用think\queue\Worker::process()和think\queue\Worker::sleep()。

hink\queue\Worker::process()中通过think\queue\connectorResit::pop()(使用redis驱动)获取实例对象为$job。

其获取实例的过程,实际是解析存储的json数据,通过其中job参数实例化类,使用函数think\queue\Job::resolveAndFire()和think\queue\Job::resolve()。

hink\queue\Worker::process()中获取对象$job后调用$job->fire(),即用户定义的处理数据类中的fire()函数。

即总过程就像死循环中执行逻辑然后睡几秒后继续执行。若逻辑代码改变,需要线程重启。

若"queue:work"命令行未设置队列名也是使用默认队列名。

想一个命令执行全部队列,估计得另外写代码,应该也不难使用redis的keys匹配出数据,稍微修改下command\Work.php中的代码。

public function execute(Input $input, Output $output){$queue = $input->getOption('queue');//替换掉$delay = $input->getOption('delay');$memory = $input->getOption('memory');if ($input->getOption('daemon')) {Hook::listen('worker_daemon_start', $queue);$this->daemon($queue, $delay, $memory,$input->getOption('sleep'), $input->getOption('tries'));} else {$response = $this->worker->pop($queue, $delay, $input->getOption('sleep'),         $input->getOption('tries'));$this->output($response);}
}

3.3 进程管理

两者不同参考:

解析think-queue(围绕redis做分析)-ThinkPHP-PHP中文网

https://segmentfault.com/a/1190000020003672

总结:work 模式的性能会比listen模式高,但需要手动执行。listen不用手动执行。

感觉给work命令做个systemd好些,查过一些线程管理的linux应用。

其他方法参考:

thinkphp队列think-queue的使用以及通过supervisor实现常驻进程 - 简书

代码后台运行,不止nohup,还有Python Supervisor!

supervisor · PyPI

https://www.cnblogs.com/mklblog/p/18070785

四 使用

#application\index\command\jobtest.php
namespace app\index\command;use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\Queue;class jobtest extends Command {protected function configure() {$this->setName('jobtest')->setDescription('job queue test');}protected function execute(Input $input, Output $output) {// 1.当前任务将由哪个类来负责处理。// 当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法$jobHandlerClassName = 'app\jobs\JobTest';// 2.当前任务归属的队列名称,如果为新队列,会自动创建$jobQueueName = "helloJobQueue";// 3.当前任务所需的业务数据, 不能为 resource 类型,其他类型最终将转化为json形式的字符串// (jobData 为对象时,存储其public属性的键值对 )$jobData = ['ts' => time(), 'bizId' => uniqid(), 'a' => 1];// 4.将该任务推送到消息队列,等待对应的消费者去执行$isPushed = Queue::push($jobHandlerClassName, $jobData, $jobQueueName);// database 驱动时,返回值为 1|false;redis 驱动时,返回值为 随机字符串|falseif ($isPushed !== false) {echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ" . "<br>";} else {echo 'Oops, something went wrong.';}}
}
#application\common.php
return [// 'app\index\command\test1','app\index\command\jobtest',
];
#application\jobs\JobTest
namespace app\jobs;
use think\queue\Job;
use app\common\model\JobsTest as JobsTestModel;
class JobTest {/*** fire方法是消息队列默认调用的方法* @param Job            $job      当前的任务对象* @param array|mixed    $data     发布任务时自定义的数据*/public function fire(Job $job, $data) {// 此处做一些 check,提前判断是否需要执行$isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);if (!$isJobStillNeedToBeDone) {$job->delete();return;}// 执行逻辑处理(即:你需要该消息队列做什么)$isJobDone = $this->doHelloJob($data);if ($isJobDone) {// 如果任务执行成功,记得删除任务$job->delete();} else {// 通过这个方法可以检查这个任务已经重试了几次了if ($job->attempts() > 3) {$job->delete();// 也可以重新发布这个任务//$job->release(2); // $delay为延迟时间,表示该任务延迟2秒后再执行}}}/*** 有些消息在到达消费者时,可能已经不再需要执行了* @param $data 发布任务时自定义的数据* @return bool 任务执行的结果*/private function checkDatabaseToSeeIfJobNeedToBeDone($data) {return true;}/*** 根据消息中的数据进行实际的业务处理...* @param $data* @return bool*/private function doHelloJob($data) {// TODO 该处为实际业务逻辑,即:对消息中的数据进行处理// $model = new JobsTestModel();// $inData = [//     'uniqId' => $data['uniqId'],//     'time' => $data['ts'],//     'content' => '队列成功的插入数据'// ];// $res = $model->save($inData);// if (! $res) {//     return false;// }// return true;sleep(2);var_dump("time:" . date("Y-m-d H:i:s") . " doHelloJob");}
}
php think queue:work --queue helloJobQueue --daemonphp think jobtest

 压入数据后redis结果

> keys queues:*
1) "queues:helloJobQueue"
2) "queues:helloJobQueue:reserved"

测试结果

string(35) "time:2024-09-03 11:31:20 doHelloJob"
Processed: app\jobs\JobTest
string(35) "time:2024-09-03 11:31:22 doHelloJob"
Processed: app\jobs\JobTest

改变逻辑

private function doHelloJob($data) {sleep(2);var_dump("time:" . time() . "(0_0)" . " doHelloJob");}

若不重启代码无效,测试结果

string(35) "time:2024-09-03 11:32:23 doHelloJob"
Processed: app\jobs\JobTest
string(35) "time:2024-09-03 11:32:37 doHelloJob"
Processed: app\jobs\JobTest
php think queue:restart

queue:restar 后线程终止,需手动启动。在windows中测试……

实际开发应根据开发环境再做调整。

压入数据的程序不限制调用位置,甚至可以作为Hook。

这篇关于消息队列 think-queue tp5.0的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1137023

相关文章

hdu1180(广搜+优先队列)

此题要求最少到达目标点T的最短时间,所以我选择了广度优先搜索,并且要用到优先队列。 另外此题注意点较多,比如说可以在某个点停留,我wa了好多两次,就是因为忽略了这一点,然后参考了大神的思想,然后经过反复修改才AC的 这是我的代码 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<

poj 3190 优先队列+贪心

题意: 有n头牛,分别给他们挤奶的时间。 然后每头牛挤奶的时候都要在一个stall里面,并且每个stall每次只能占用一头牛。 问最少需要多少个stall,并输出每头牛所在的stall。 e.g 样例: INPUT: 51 102 43 65 84 7 OUTPUT: 412324 HINT: Explanation of the s

poj 2431 poj 3253 优先队列的运用

poj 2431: 题意: 一条路起点为0, 终点为l。 卡车初始时在0点,并且有p升油,假设油箱无限大。 给n个加油站,每个加油站距离终点 l 距离为 x[i],可以加的油量为fuel[i]。 问最少加几次油可以到达终点,若不能到达,输出-1。 解析: 《挑战程序设计竞赛》: “在卡车开往终点的途中,只有在加油站才可以加油。但是,如果认为“在到达加油站i时,就获得了一

C++——stack、queue的实现及deque的介绍

目录 1.stack与queue的实现 1.1stack的实现  1.2 queue的实现 2.重温vector、list、stack、queue的介绍 2.1 STL标准库中stack和queue的底层结构  3.deque的简单介绍 3.1为什么选择deque作为stack和queue的底层默认容器  3.2 STL中对stack与queue的模拟实现 ①stack模拟实现

poj3750约瑟夫环,循环队列

Description 有N个小孩围成一圈,给他们从1开始依次编号,现指定从第W个开始报数,报到第S个时,该小孩出列,然后从下一个小孩开始报数,仍是报到S个出列,如此重复下去,直到所有的小孩都出列(总人数不足S个时将循环报数),求小孩出列的顺序。 Input 第一行输入小孩的人数N(N<=64) 接下来每行输入一个小孩的名字(人名不超过15个字符) 最后一行输入W,S (W < N),用

POJ2010 贪心优先队列

c头牛,需要选n头(奇数);学校总共有f的资金, 每头牛分数score和学费cost,问合法招生方案中,中间分数(即排名第(n+1)/2)最高的是多少。 n头牛按照先score后cost从小到大排序; 枚举中间score的牛,  预处理左边与右边的最小花费和。 预处理直接优先队列贪心 public class Main {public static voi

Java并发编程之——BlockingQueue(队列)

一、什么是BlockingQueue BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种: 1. 当队列满了的时候进行入队列操作2. 当队列空了的时候进行出队列操作123 因此,当一个线程试图对一个已经满了的队列进行入队列操作时,它将会被阻塞,除非有另一个线程做了出队列操作;同样,当一个线程试图对一个空

ActiveMQ—消息特性(延迟和定时消息投递)

ActiveMQ消息特性:延迟和定时消息投递(Delay and Schedule Message Delivery) 转自:http://blog.csdn.net/kimmking/article/details/8443872 有时候我们不希望消息马上被broker投递出去,而是想要消息60秒以后发给消费者,或者我们想让消息没隔一定时间投递一次,一共投递指定的次数。。。 类似

ActiveMQ—Queue与Topic区别

Queue与Topic区别 转自:http://blog.csdn.net/qq_21033663/article/details/52458305 队列(Queue)和主题(Topic)是JMS支持的两种消息传递模型:         1、点对点(point-to-point,简称PTP)Queue消息传递模型:         通过该消息传递模型,一个应用程序(即消息生产者)可以

FreeRTOS学习笔记(六)队列

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、队列的基本内容1.1 队列的引入1.2 FreeRTOS 队列的功能与作用1.3 队列的结构体1.4 队列的使用流程 二、相关API详解2.1 xQueueCreate2.2 xQueueSend2.3 xQueueReceive2.4 xQueueSendFromISR2.5 xQueueRecei