BeansTalkd 做消息队列服务

2023-11-21 04:52

本文主要是介绍BeansTalkd 做消息队列服务,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

无意间看到这个仓库讲php关于 BeanStalkd 的扩展,然后就去了解了一下beanstalkd,用它可以用来做队列服务。

话不多说,安装一下试试。

首先 sudo apt search beanstalk 搜索一下发现


Sorting... Done
Full Text Search... Done
awscli/focal-updates,focal-updates 1.18.69-1ubuntu0.20.04.1 all
  Universal Command Line Environment for AWS

beanstalkd/focal,now 1.11-1 amd64 [installed]
  simple, in-memory, workqueue service

python-celery-common/focal,focal 4.2.1-5ubuntu1 all
  async task/job queue based on message passing (common files)

python-celery-doc/focal,focal 4.2.1-5ubuntu1 all
  async task/job queue based on message passing (Documentation)

python3-celery/focal,focal 4.2.1-5ubuntu1 all
  async task/job queue based on message passing (Python3 version)

ruby-beaneater/focal,focal 1.0.0-1 all
  simple beanstalkd client for Ruby
 

好了,找到这个beanstalkd了

然后安装

 sudo apt-get install beanstalkd 

Reading package lists... Done
Building dependency tree       
Reading state information... Done
Suggested packages:
  doc-base
The following NEW packages will be installed:
  beanstalkd
0 upgraded, 1 newly installed, 0 to remove and 90 not upgraded.
Need to get 43.9 kB of archives.
After this operation, 125 kB of additional disk space will be used.
Get:1 http://cn.archive.ubuntu.com/ubuntu focal/universe amd64 beanstalkd amd64 1.11-1 [43.9 kB]
Get:1 http://cn.archive.ubuntu.com/ubuntu focal/universe amd64 beanstalkd amd64 1.11-1 [43.9 kB]
Fetched 31.3 kB in 4s (7,047 B/s)     
Selecting previously unselected package beanstalkd.
(Reading database ... 256731 files and directories currently installed.)
Preparing to unpack .../beanstalkd_1.11-1_amd64.deb ...
Unpacking beanstalkd (1.11-1) ...
Setting up beanstalkd (1.11-1) ...
Created symlink /etc/systemd/system/multi-user.target.wants/beanstalkd.service → /lib/systemd/system/beanstalkd.service.
beanstalkd.socket is a disabled or a static unit, not starting it.
Processing triggers for man-db (2.9.1-1) ...
Processing triggers for systemd (245.4-4ubuntu3.11) ...
 

安装成功,这个东西会监听 11300 端口
然后使用这个工具来看看


监控工具:

wget https://github.com/src-d/beanstool/releases/download/v0.2.0/beanstool_v0.2.0_linux_amd64.tar.gz

获取文件后解压

tar -xvzf beanstool_v0.2.0_linux_amd64.tar.gz

然后拷贝到 /usr/local/bin/

sudo cp beanstool_v0.2.0_linux_amd64/beanstool /usr/local/bin/

这样就直接用 beanstool 了

查看当前状态

beanstool stats

结果

+---------+----------+----------+----------+----------+----------+----------+----------+
| Name    | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |
+---------+----------+----------+----------+----------+----------+----------+----------+
| default | 0        | 0        | 0        | 0        | 0        | 0        | 0        |
+---------+----------+----------+----------+----------+----------+----------+----------+

然后使用composer的vendor包

composer require pda/pheanstalk
安装完毕后创建一个input.php文件做生产者


<?php
require __DIR__ . '/vendor/autoload.php';

use Pheanstalk\Pheanstalk;

$pheanstalk = Pheanstalk::create('127.0.0.1');

// Queue a Job
$pheanstalk
  ->useTube('testtube')
  ->put("job payload goes here\n");

$pheanstalk
    ->useTube('testtube')
    ->put(
        json_encode(['test' => 'data']),  // encode data in payload
        Pheanstalk::DEFAULT_PRIORITY,     // default priority
        30, // delay by 30s
        60  // beanstalk will retry job after 60s
     );


再创建一个output.php文件做消费者


<?php
require __DIR__ . '/vendor/autoload.php';
use Pheanstalk\Pheanstalk;

$pheanstalk = Pheanstalk::create('127.0.0.1');

// we want jobs from 'testtube' only.
$pheanstalk->watch('testtube');

// this hangs until a Job is produced.
$job = $pheanstalk->reserve();

try {
    $jobPayload = $job->getData();
    // do work.

    sleep(2);
    // If it's going to take a long time, periodically
    // tell beanstalk we're alive to stop it rescheduling the job.
    $pheanstalk->touch($job);
    sleep(2);

    // eventually we're done, delete job.
    $pheanstalk->delete($job);
}
catch(\Exception $e) {
    // handle exception.
    // and let some other worker retry.
    $pheanstalk->release($job); 
}
 

然后执行一下  php input.php

查看 状态

 
$ beanstool stats
+----------+----------+----------+----------+----------+----------+----------+----------+
| Name     | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |
+----------+----------+----------+----------+----------+----------+----------+----------+
| default  | 0        | 0        | 0        | 0        | 0        | 0        | 0        |
| testtube | 0        | 1        | 1        | 0        | 0        | 0        | 2        |
+----------+----------+----------+----------+----------+----------+----------+----------+
 
我们看到有一个在ready状态,一个在delayed状态,这是由于第二次的put采用了延时30s,然后过一段时间后再看

 
$ beanstool stats
+----------+----------+----------+----------+----------+----------+----------+----------+
| Name     | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |
+----------+----------+----------+----------+----------+----------+----------+----------+
| default  | 0        | 0        | 0        | 0        | 0        | 0        | 0        |
| testtube | 0        | 0        | 2        | 0        | 0        | 0        | 2        |
+----------+----------+----------+----------+----------+----------+----------+----------+
复制代码
已经有2个在ready状态了。

此时我们用消费者执行一下 php output.php 

与此同时迅速看状态

复制代码
$ beanstool stats
+----------+----------+----------+----------+----------+----------+----------+----------+
| Name     | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |
+----------+----------+----------+----------+----------+----------+----------+----------+
| default  | 0        | 0        | 0        | 0        | 0        | 0        | 0        |
| testtube | 0        | 0        | 1        | 1        | 0        | 0        | 2        |
+----------+----------+----------+----------+----------+----------+----------+----------+
再次执行
$ beanstool stats
+----------+----------+----------+----------+----------+----------+----------+----------+
| Name     | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |
+----------+----------+----------+----------+----------+----------+----------+----------+
| default  | 0        | 0        | 0        | 0        | 0        | 0        | 0        |
| testtube | 0        | 0        | 1        | 0        | 0        | 0        | 2        |
+----------+----------+----------+----------+----------+----------+----------+----------+
复制代码
因为我们有sleep(2),所以要尽量快点操作这个状态监控的命令,可以看到有一个拿出来放入了reserved,然后就消失了(实际上这是后面的代码delete导致的,因为已经消费完毕)

再次执行 php output.php 

 
$ beanstool stats:
+----------+----------+----------+----------+----------+----------+----------+----------+
| Name     | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |
+----------+----------+----------+----------+----------+----------+----------+----------+
| default  | 0        | 0        | 0        | 0        | 0        | 0        | 0        |
| testtube | 0        | 0        | 0        | 1        | 0        | 0        | 2        |
+----------+----------+----------+----------+----------+----------+----------+----------+

$ beanstool stats:
+---------+----------+----------+----------+----------+----------+----------+----------+
| Name    | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |
+---------+----------+----------+----------+----------+----------+----------+----------+
| default | 0        | 0        | 0        | 0        | 0        | 0        | 0        |
+---------+----------+----------+----------+----------+----------+----------+----------+
 
同样也是迅速观测这个状态,发现消费1个,然后删除1个,现在队列空了,这说明确实是符合我们的期望的。

然后回到文章开头提到的扩展,这个扩展就是帮我们实现了composer装的那个pheanstalk包。


这个扩展如何安装呢?

步骤如下:

克隆项目

git clone https://gitee.com/qzfzz/php-beanstalk.git
进行编译 

phpize

然后

./configure

之后

make && make install

sudo make install

然后修改 php.ini

sudo gedit /etc/php/7.4/cli/php.ini /etc/php/7.4/apache2/php.ini

加上这句

extension=beanstalk

重启apache2

sudo /etc/init.d/apache2 restart
之后就可以使用这个扩展了。

这个扩展它封装为函数了,可以看到他有个例子文件

简单的找了几个例子

复制代码
<?php

$config = [
  'host' => '127.0.0.1',
  'port' => 11300
];
$beanstalk_obj = beanstalk_connect($config['host'], $config['port']);
$last_job_id = beanstalk_put($beanstalk_obj, "message detail");
beanstalk_delete($beanstalk_obj, $last_job_id);
$last_job_id = beanstalk_putInTube($beanstalk_obj, 'tubea', "message detail");
复制代码
可以看到使用 connect 连接, put 塞入新的job消息, putInTube 来塞入指定管道的tubea,delete来删除等等,具体可以看看源代码学习一下,我对比了一下这两种方式实现效率。

第一种采用composer包(我还特意去掉了加载文件所需要的时间)

复制代码
<?phprequire __DIR__ . '/vendor/autoload.php';

use Pheanstalk\Pheanstalk;

$start = microtime( true );
$pheanstalk = Pheanstalk::create('127.0.0.1');

$pheanstalk
  ->useTube('testtube')
  ->put(date("Y-m-d H:i:s") . "job payload goes here\n");

$end = microtime(true);

echo ($end - $start) * 1000 . " ms";
复制代码
执行需要2.59ms

第二种直接用扩展函数


<?php
$start = microtime(true);

$config = [
  'host' => '127.0.0.1',
  'port' => 11300
];

$beanstalk_object = beanstalk_connect($config['host'], $config['port']);
$last_job_id = beanstalk_putInTube($beanstalk_object, 'testtube', date("Y-m-d H:i:s") . "job payload goes here\n");

$end = microtime(true);

echo ($end - $start) * 1000 . " ms";
复制代码
执行需要0.34ms

不得不说,扩展就是扩展,就是快的多啊!

我另外测试了一下投递极限

循环投递10000次消息,大概在500ms左右

复制代码
$start = microtime( true );
for ($i=0; $i < 10000; $i++) { 
  $pheanstalk
  ->useTube('testtube')
  ->put(date("Y-m-d H:i:s") . "job payload goes here\n");
}
$end = microtime( true );
echo ($end - $start) * 1000 . " ms";


 
也就意味着1秒钟只能投递20000条消息,这比rabbitmq的投递慢多了(参见这篇文章)

但是它执行1次投递消息的时候却比这个rabbitmq的代码执行的快,原因是我测试了一下这个rabbitmq的连接上就耗费了9ms

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

更别提还有这两步了

$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
这样来看,想迅速投递一条短消息,或者建立小信息量的job用beanstalk是很不错的,如果有大量消息集中投递,那使用 rabbitmq 是很不错的。

另外这个beanstalk投递可以延时,非常适合有些时候需要在当前时间一段时间后执行某个任务,往后弄个类似于一次性的定时器的功能,这个东西值得尝试。

而且如果使用while死循环将 output.php 脚本一直挂着跑,它就能一直消费消息了,这样就等于有个后端进程一直能帮我们做消费者处理堆积的任务了,特殊场景可以考虑一下这个方案。

Windows 安装 Beanstalkd

下载并安装 cygwin

1. 添加镜像地址 - http://mirrors.aliyun.com
2. 下载 `gcc`、`gcc-core`、`make`、`automake`, 在 Devel 分支下

下载 beanstalkd-win 包

1. 解压并进入beanstalkd-win目录
2. 打开CMD窗口,运行 ./beanstalkd.exe -l 127.0.0.1 -p 11300
 

这篇关于BeansTalkd 做消息队列服务的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/399943

相关文章

hdu1180(广搜+优先队列)

此题要求最少到达目标点T的最短时间,所以我选择了广度优先搜索,并且要用到优先队列。 另外此题注意点较多,比如说可以在某个点停留,我wa了好多两次,就是因为忽略了这一点,然后参考了大神的思想,然后经过反复修改才AC的 这是我的代码 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<

【区块链 + 人才服务】可信教育区块链治理系统 | FISCO BCOS应用案例

伴随着区块链技术的不断完善,其在教育信息化中的应用也在持续发展。利用区块链数据共识、不可篡改的特性, 将与教育相关的数据要素在区块链上进行存证确权,在确保数据可信的前提下,促进教育的公平、透明、开放,为教育教学质量提升赋能,实现教育数据的安全共享、高等教育体系的智慧治理。 可信教育区块链治理系统的顶层治理架构由教育部、高校、企业、学生等多方角色共同参与建设、维护,支撑教育资源共享、教学质量评估、

poj 3190 优先队列+贪心

题意: 有n头牛,分别给他们挤奶的时间。 然后每头牛挤奶的时候都要在一个stall里面,并且每个stall每次只能占用一头牛。 问最少需要多少个stall,并输出每头牛所在的stall。 e.g 样例: INPUT: 51 102 43 65 84 7 OUTPUT: 412324 HINT: Explanation of the s

poj 2431 poj 3253 优先队列的运用

poj 2431: 题意: 一条路起点为0, 终点为l。 卡车初始时在0点,并且有p升油,假设油箱无限大。 给n个加油站,每个加油站距离终点 l 距离为 x[i],可以加的油量为fuel[i]。 问最少加几次油可以到达终点,若不能到达,输出-1。 解析: 《挑战程序设计竞赛》: “在卡车开往终点的途中,只有在加油站才可以加油。但是,如果认为“在到达加油站i时,就获得了一

【区块链 + 人才服务】区块链集成开发平台 | FISCO BCOS应用案例

随着区块链技术的快速发展,越来越多的企业开始将其应用于实际业务中。然而,区块链技术的专业性使得其集成开发成为一项挑战。针对此,广东中创智慧科技有限公司基于国产开源联盟链 FISCO BCOS 推出了区块链集成开发平台。该平台基于区块链技术,提供一套全面的区块链开发工具和开发环境,支持开发者快速开发和部署区块链应用。此外,该平台还可以提供一套全面的区块链开发教程和文档,帮助开发者快速上手区块链开发。

poj3750约瑟夫环,循环队列

Description 有N个小孩围成一圈,给他们从1开始依次编号,现指定从第W个开始报数,报到第S个时,该小孩出列,然后从下一个小孩开始报数,仍是报到S个出列,如此重复下去,直到所有的小孩都出列(总人数不足S个时将循环报数),求小孩出列的顺序。 Input 第一行输入小孩的人数N(N<=64) 接下来每行输入一个小孩的名字(人名不超过15个字符) 最后一行输入W,S (W < N),用

POJ2010 贪心优先队列

c头牛,需要选n头(奇数);学校总共有f的资金, 每头牛分数score和学费cost,问合法招生方案中,中间分数(即排名第(n+1)/2)最高的是多少。 n头牛按照先score后cost从小到大排序; 枚举中间score的牛,  预处理左边与右边的最小花费和。 预处理直接优先队列贪心 public class Main {public static voi

Java并发编程之——BlockingQueue(队列)

一、什么是BlockingQueue BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种: 1. 当队列满了的时候进行入队列操作2. 当队列空了的时候进行出队列操作123 因此,当一个线程试图对一个已经满了的队列进行入队列操作时,它将会被阻塞,除非有另一个线程做了出队列操作;同样,当一个线程试图对一个空

ActiveMQ—消息特性(延迟和定时消息投递)

ActiveMQ消息特性:延迟和定时消息投递(Delay and Schedule Message Delivery) 转自:http://blog.csdn.net/kimmking/article/details/8443872 有时候我们不希望消息马上被broker投递出去,而是想要消息60秒以后发给消费者,或者我们想让消息没隔一定时间投递一次,一共投递指定的次数。。。 类似

基于SpringBoot的宠物服务系统+uniapp小程序+LW参考示例

系列文章目录 1.基于SSM的洗衣房管理系统+原生微信小程序+LW参考示例 2.基于SpringBoot的宠物摄影网站管理系统+LW参考示例 3.基于SpringBoot+Vue的企业人事管理系统+LW参考示例 4.基于SSM的高校实验室管理系统+LW参考示例 5.基于SpringBoot的二手数码回收系统+原生微信小程序+LW参考示例 6.基于SSM的民宿预订管理系统+LW参考示例 7.基于