本文主要是介绍Sqlite3+RabbitMQ+Celery Python从零开始搭建一个持久化层的生产者消费者服务模型,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
1. 一张图说明我们接下来要造什么:
本次实验所使用到的玩具:
【1】RabbitMQ:消息队列中间件,做broker角色
【2】Celery:异步调度中间件,做workers的调度者(包工头)
【3】Sqlite:异步计算结果存储的角色
我们的 workers 可以理解为多进程/多线程,然后通过Celery来进行智能化的调度,至于他怎么调度的我们暂时不关心,然后我们使用RabbitMQ作为消息队列缓存的中间服务,当我们接收到生产者的数据的时候就会统一访问到我们的Celery进行调度执行,然后我们可以通过Celery来异步获取计算结果,当然了我们不想等待可以完全交给他执行,他执行完会把数据存入到结果数据库,我们再通过Celery来访问结果数据库(Backend)来获取我们之前计算的结果。
2. 安装配置顺序
- Step1: 安装Erlang(Erlang是一种通用的面向并发的编程语言)
- Step2: 安装RabbitMQ(第三方消息缓存中间件)
- Step3: 安装Sqlite3并配置环境变量
- (可选)安装SqliteStudio来预览数据库数据
- Step4: Python 安装 celery
- Step5: 启动 RabbitMQ服务并测试可视化UI
- (可选)配置VSCode-Celery任务启动
- Step6: 编写Worker-Python代码(消费者)
- Step7: 编写Producer-Python代码(生产者)
- Step8: 新建一个DB数据库
- Step9: 启动 celery 绑定
broker
到RabbitMQ,绑定backend
到新建的DB数据库 - Step10: 发起Producer的处理请求
- Step11: 通过任务ID来访问曾经处理过的任务
- —— 完成
3. 安装需要的文件:
打包下载地址
4. 配置详细步骤
4.1 安装 Erlang
双击安装 otp_win64_23.2.exe
RabbitMQ是用Erlang写的,所以必须安装Erlang虚拟机
4.2 安装 RabbitMQ
也是双击安装
4.3 安装Sqlite3驱动并配置环境变量
在电脑任意一个地方新建一个文件夹用来存Sqlite3数据库驱动,比如我手动新建:sqlite
目录在
C:\Program Files\sqlite
那么你的Path环境变量新建一个的时候就需要把这个目录填上。因人而异,弱智操作,自行体会。
然后把那些和sqlite有关的解压,复制到自己的目录:(结果如下)
4.4 Python 安装 celery
pip install celery
4.5 启动 RabbitMQ服务并测试可视化UI
我们可以看到我们新安装的程序里:
可以从这里启动,也可以直接跑到RabbitMQ的老家那里去启动:
启动管理插件:sbin/rabbitmq-plugins enable rabbitmq_management
启动rabbitmq:sbin/rabbitmq-server -detached
WEB-UI 管理地址:http://localhost:15672/
,自己找他在哪个端口刨出来的,官网有说到,另外自己可以从启动输出的日志找到哦
默认密码和账户都是:guest
打开:
4.6 编写Worker-Python代码(消费者)
新建一个DB数据库放在同一个目录,这个就省略操作了。
不用建表,到时候celery自己会建表的。
from celery import Celery
import sqlite3
broker = 'amqp://guest@localhost//' #缓存消息队列
backend = 'db+sqlite:///qw.db' #结果暂存数据库
app = Celery('task', broker=broker, backend=backend)# param1:app的名字,也就是这个代码文件的名字
# param2:消息中间件,amqp协议访问消息队列,我们使用RabbitMQ,你可以用Redis(严格来说他不是消息队列,而是持久化内存数据库)
# param3:结果存储# 当我们启动任务的时候可以通过异步进行获取结果,也可以通过查询任务id来对结果数据库访问查询@app.task
def add(x, y):return x + y
注意一下协议的格式,特别是像sqlite这种小数据库,celery+sqlite用法的资料并不多,但无奈不想装MySQL先用lite来替代着。
4.7 编写VSCode 任务命令
或者你可以在命令同目录下启动:celery -A task worker --loglevel=info
注意:task 是我的这个代码的名字(严格来说是app的名字)
4.8 启动 celery
4.9 编写生产者
简单的用法我都写在下面了:
from task import add# 我们通过Celery执行worker的代码
# 然后再通过Celery取数据出来,再写入数据库
# 注意在只有一个参数要传时 需要写成列表的形式[v1,]# 这个时候只有在Celery服务器启动的时候才能进行
# Method1 异步获取数据结果
re = add.apply_async((23,1)) # do 23+1
print(re.get())'''
# Method2 异步获取数据结果
re = add.delay(23,1)
print(re.get())
''''''
# 根据任务ID去获取以前计算的结果
re = add.AsyncResult('0b0ebce2-cb17-4b32-8847-68d793205009')
print(re.get())
'''
如上演示了:add这个函数的并发命令执行
我们执行完可以去SQLiteStudio看下:
首先他自己建表了,result就是我们的执行结果,你不要自己尝试是写Sqlite的代码去访问他的result数据,这是不对的,这个序列化的数据是Celery写进来的,至于他是怎样编码的鬼知道,我们正确的操作应该是再通过Celery去访问,可以根据任务ID去获取曾经计算的结果。
re = add.AsyncResult('0b0ebce2-cb17-4b32-8847-68d793205009')
print(re.get())
OK,至此完成了一个持久化层的生产者消费者服务模型搭建。
这个教程当玩具玩下还是可以的,具体的生产环境各种注意事项有空再写吧,学无止境。
这篇关于Sqlite3+RabbitMQ+Celery Python从零开始搭建一个持久化层的生产者消费者服务模型的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!