Python 全栈系列243 S2S flask_celery

2024-05-12 11:04

本文主要是介绍Python 全栈系列243 S2S flask_celery,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

说明

按现有的几个架构部件,构建数据流。

S = Redis Stream。这个可以作为缓冲队列和简单任务队列,速度非常快,至少是万条/秒的速度。
Q = RabbitMQ。这个作为任务队列,消息也主要是元数据。读速比较慢,但有一些特性,然后自带前端,作为任务队列比较合适。
M = Mongo。这个作为数据主库还是比较合适的。具有丰富的数据操作模式,同时性能也不错。
C = ClickHouse。这个特别适合作为任务数据库。因为列式存储的特性,其吞吐性能,简单统计功能甚至逼近了程序处理的速度。例如,存储10万条数据,大约也就3秒;统计900万数据某个字段的长度,时间也不到5秒。(过去在处理上,基本上按照100万条/秒来评估默认的程序处理能力)

RabbitMQ和Redis Streams都是流行的队列系统,用于处理消息传递任务,但它们在效率和应用场景上有所不同。
RabbitMQ是基于AMQP(高级消息队列协议)的开源消息代理,它提供了可靠的消息传递机制,能够保证消息的持久性,即使在发送或接收过程中出现故障也不会丢失消息。RabbitMQ适用于需要高可靠性和复杂路由策略的生产环境,特别是在分布式系统中,它能够很好地处理复杂的异步消息传递任务。
另一方面,Redis Streams是Redis 5.0版本引入的新特性,它提供了一个持久化的消息队列系统。Redis Streams的设计理念在于提供高性能的发布/订阅模型,尤其适合于即时消息处理场景。与RabbitMQ相比,Redis Streams在性能上具有优势,因为它利用了Redis本身的高性能特性,使得消息的读写速度非常快。
在效率方面,Redis Streams通常被认为比RabbitMQ更快,特别是在处理大量实时数据流时。这是因为Redis作为一个内存中的键值存储系统,本身就具有很高的读写速度,而Streams作为其一部分,也继承了这种高效性。此外,Redis Streams的无锁设计进一步提升了其性能,使得它在处理并发请求时更加高效。
然而,RabbitMQ在某些情况下可能更适合使用,特别是当需要处理复杂的异步任务、保证消息的顺序性以及实现可靠的消息传递时。RabbitMQ的这些特性使得它在金融、医疗等关键行业中被广泛应用。
总之,在选择RabbitMQ还是Redis Streams时,应考虑到具体的应用场景、性能需求和可靠性要求。如果追求极致的性能和实时性,Redis Streams可能是更好的选择;而如果需要更高的可靠性和复杂的路由功能,RabbitMQ可能更为合适。

本次目标是搭建一个worker,可以通过参数化方式,完成两个S间的流转。除了M和C之前一般不会直接流转,那么应该有 4*3 - 2 = 10 种组件间的流转。

内容

整体的实现逻辑顺序为:

  • 1 使用QManager完成S2S的动作(函数)
  • 2 将函数定义为celery task
  • 3 将flask-celery发布为systemd服务

1 S2S 函数

S2S应该是一种最常见的任务

首先是QManager, 这个是对RedisAgent进行封装和集成的对象,本质上是个二传手。

QManager 集成了:

  • 1 判断队列是否可以写入
  • 2 并行写入
  • 3 fetch和range两种方式取数
  • 4 删除消息
import requests as req 
class QManager:def __init__(self , batch_size = 1000, redis_agent_host = 'http://172.17.0.1:24021/',redis_connection_hash =None,q_max_len = 100000):self.batch_size = batch_sizeself.redis_agent_host = redis_agent_hostself.redis_connection_hash = redis_connection_hashself.q_max_len = q_max_lendef auto_connect(self, db_server_name):print('这里应该根据某个参数值,自动切换为合适的连接')def info(self):return req.post(self.redis_agent_host + 'info/',json = {'connection_hash':self.redis_connection_hash}).json()# redis没有提供命令来列出streams# def qname_list(self, stream_name = '*'):#     return req.post(self.redis_agent_host + 'info_stream/',json = {'stream_name':stream_name}).json()# 查看队列长度def stream_len(self, stream_name):cur_len_resp = req.post(self.redis_agent_host + 'len_of_queue/',json ={'stream_name':stream_name,'connection_hash':self.redis_connection_hash}).json()return cur_len_resp['data']# 创建队列和分组def ensure_group(self, stream_name, group_name ='group1', start_point='0'):return req.post(self.redis_agent_host +'ensure_group/',json ={'stream_name':stream_name,'group_name':group_name,'start_point':start_point}).json()# 判断队列是否可以插入def _is_q_available(self,stream_name):cur_len = self.stream_len(stream_name)if cur_len + self.batch_size >=self.q_max_len:return False else:return True #  基于并发方法,向数据库存数【队列Write相关-写入消息】- 其实是使用pipeline - 最好单次一万左右def parrallel_write_msg(self,stream_name, data_listofdict = None, time_out = 30,is_return_msg_id_list=False):resp_dict = req.post(self.redis_agent_host + 'batch_add_msg/',json ={'connection_hash':self.redis_connection_hash,'stream_name':stream_name,'msg_dict_list':data_listofdict,'maxlen':self.q_max_len,'is_return_msg_id_list':is_return_msg_id_list},timeout=time_out).json()return resp_dict# 读取# 批量获取数据 getdef xrange(self, stream_name, count = None):cur_count = count or self.batch_size recs_resp = req.post(self.redis_agent_host + 'xrange/',json ={'connection_hash':self.redis_connection_hash, 'stream_name':stream_name,'count':cur_count}).json()return recs_resp# 批量获取数据 fetchdef xfetch(self, stream_name, count = None,group_name = 'group1' , consumer_name = 'consumer1'):cur_count = count or self.batch_sizereturn req.post(self.redis_agent_host + 'fetch_msg/',json = {'connection_hash':self.redis_connection_hash,'stream_name':stream_name,'group_name':group_name,'consumer_name':consumer_name,'count':cur_count}).json()# 批量删除消息def xdel(self,stream_name,mid_or_list =None):if len(mid_or_list):return req.post(self.redis_agent_host  + 'del_msg/',json ={'connection_hash':self.redis_connection_hash, 'stream_name':stream_name,'mid_or_list':mid_or_list}).json()@staticmethoddef extract_msg_id(some_msg_list):return [x['_msg_id'] for x in some_msg_list]

基于此,稍微修改就可以完成S2S的任务

按照边的方式,给到left和right的参数信息。使用这些信息分别初始化left和right的QManager。最后按照配置里的约定,执行n次同步。每次执行时,都会看下目标队列是否已满,若已满则放弃写入,否则执行写入,然后删除消息。

# local
cfg = {'target_q_max_len': 10,'source_read_batch_num':1,'target_write_batch_num':1,'source_redis_agent_host':'http://172.17.0.1:24021/','source_connection_hash':None,'target_redis_agent_host':'http://172.17.0.1:24021/','target_connection_hash':None,'source_stream':'.'.join(['STREAM','test','test', 'stream_in']),'target_stream':'.'.join(['STREAM','test','test', 'stream_out'])}# read
source_qm = QManager(batch_size =cfg['source_read_batch_num'],redis_agent_host = cfg['source_redis_agent_host'],redis_connection_hash = cfg['source_connection_hash'])
# write
target_qm = QManager(batch_size =cfg['target_write_batch_num'],redis_agent_host = cfg['target_redis_agent_host'],redis_connection_hash = cfg['target_connection_hash'])# 确保队列的存在
if True:source_qm.ensure_group(cfg['source_stream'])target_qm.ensure_group(cfg['target_stream'])'''
主逻辑:- 1 判断目标队列是否满,如果是,那么直接退出
- 2 从源队列取数(采用xrange方法),如果没有数据,直接退出【每对stream之间,只会有一个 sniffer 】
- 3 将源队列数据写入目标队列
- 4 从源队列中删除这些数据'''print('source q len ', source_qm.stream_len(cfg['source_stream']))
print('target q len ', target_qm.stream_len(cfg['target_stream']))for _ in range(cfg['max_exec_cnt']):if target_qm._is_q_available(cfg['target_stream']):print('target q ok')msg_num_limit = min(cfg['source_read_batch_num'],cfg['target_write_batch_num'])msg_list = source_qm.xrange(cfg['source_stream'], count=msg_num_limit)['data']if len(msg_list) == 0:print('source q empty')breakelse:# 写入目标队列target_qm.parrallel_write_msg(cfg['target_stream'], data_listofdict= msg_list)# 将写入的消息从源队列删除to_del_msg_id_list = source_qm.extract_msg_id(msg_list)source_qm.xdel(cfg['source_stream'], mid_or_list= to_del_msg_id_list)else:break

2 Celery Task

然后将上述功能函数写入Flask-Celery

第一部分是在 celery的修饰器下,将任务函数搬进去。然后在app下定义了任务的调用,主要是用到了delay方法,实现异步调用。

# =======================以下是正式的内容
@celery_.task
def s2s_handler(cfg_dict = None):cfg = cfg_dict# readsource_qm = QManager(batch_size =cfg['source_read_batch_num'],redis_agent_host = cfg['source_redis_agent_host'],redis_connection_hash = cfg['source_connection_hash'])# writetarget_qm = QManager(batch_size =cfg['target_write_batch_num'],redis_agent_host = cfg['target_redis_agent_host'],redis_connection_hash = cfg['target_connection_hash'])print('source q len ', source_qm.stream_len(cfg['source_stream']))print('target q len ', target_qm.stream_len(cfg['target_stream']))for _ in range(cfg['max_exec_cnt']):if target_qm._is_q_available(cfg['target_stream']):print('target q ok')msg_num_limit = min(cfg['source_read_batch_num'],cfg['target_write_batch_num'])msg_list = source_qm.xrange(cfg['source_stream'], count=msg_num_limit)['data']if len(msg_list) == 0:print('source q empty')breakelse:# 写入目标队列target_qm.parrallel_write_msg(cfg['target_stream'], data_listofdict= msg_list)# 将写入的消息从源队列删除to_del_msg_id_list = source_qm.extract_msg_id(msg_list)source_qm.xdel(cfg['source_stream'], mid_or_list= to_del_msg_id_list)else:break# 执行任务的路由 POST
@app.route("/s2s/", methods=['GET','POST'] )
def s2s():input_data = request.get_json()# 发送任务到celery,并返回任务ID,后续可以根据此任务ID获取任务结果result = s2s_handler.delay(input_data)return result.id

调用测试,存入一万条消息(之前还有70条残留),任务执行后,source_q中的数据将会逐渐流转到target_q

# debug - 样例数据写入源队列
data_listofdict = [{'msg_id': i, 'data':'test'} for i in range(10000)]
source_qm.parrallel_write_msg(cfg['source_stream'], data_listofdict= data_listofdict)print('source q len ', source_qm.stream_len(cfg['source_stream']))
print('target q len ', target_qm.stream_len(cfg['target_stream']))source q len  10070
target q len  230import requests as req 
# 假设是发往本机: 注意,地址是127.0.0.1
cfg1 = {'target_q_max_len': 100000,'source_read_batch_num':1,'target_write_batch_num':1,'source_redis_agent_host':'http://127.0.0.1:24021/','source_connection_hash':None,'target_redis_agent_host':'http://127.0.0.1:24021/','target_connection_hash':None,'source_stream':'.'.join(['STREAM','test','test', 'stream_in']),'target_stream':'.'.join(['STREAM','test','test', 'stream_out']),'max_exec_cnt':10}resp = req.post('http://127.0.0.1:24104/s2s/',json = cfg1 )# 返回任务号
In [9]: resp.text
Out[9]: '177e57b7-09c5-43f0-ae1f-0cbe8e41dbf5'# 流转了10条消息
In [10]: print('source q len ', source_qm.stream_len(cfg['source_stream']))...: print('target q len ', target_qm.stream_len(cfg['target_stream']))
source q len  10060
target q len  240

3 Systemd Service

由于服务是在宿主机启动的,而且是基础服务,所以使用systemd配置自启动。启动命令有点小坑,可参考 一次搞定 Linux systemd 服务脚本

本次要点就在于要用forking启动【采用sh脚本启动其他进程时Type须为forking】,因为要启动flask和celery两个服务才行。

[Unit]   
Description=test        # 简单描述服务
After=network.target    # 描述服务类别,表示本服务需要在network服务启动后在启动
Before=xxx.service      # 表示需要在某些服务启动之前启动,After和Before字段只涉及启动顺序,不涉及依赖关系[Service] 
Type=forking            # 设置服务的启动方式
User=USER               # 设置服务运行的用户
Group=USER              # 设置服务运行的用户组
WorkingDirectory=/PATH  # 设置服务运行的路径(cwd)
KillMode=control-group  # 定义systemd如何停止服务
Restart=no              # 定义服务进程退出后,systemd的重启方式,默认是不重启
ExecStart=/start.sh     # 服务启动命令,命令需要绝对路径(采用sh脚本启动其他进程时Type须为forking)[Install]   
WantedBy=multi-user.target  # 多用户

然后就好了
在这里插入图片描述

这篇关于Python 全栈系列243 S2S flask_celery的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/982442

相关文章

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

python: 多模块(.py)中全局变量的导入

文章目录 global关键字可变类型和不可变类型数据的内存地址单模块(单个py文件)的全局变量示例总结 多模块(多个py文件)的全局变量from x import x导入全局变量示例 import x导入全局变量示例 总结 global关键字 global 的作用范围是模块(.py)级别: 当你在一个模块(文件)中使用 global 声明变量时,这个变量只在该模块的全局命名空

【Python编程】Linux创建虚拟环境并配置与notebook相连接

1.创建 使用 venv 创建虚拟环境。例如,在当前目录下创建一个名为 myenv 的虚拟环境: python3 -m venv myenv 2.激活 激活虚拟环境使其成为当前终端会话的活动环境。运行: source myenv/bin/activate 3.与notebook连接 在虚拟环境中,使用 pip 安装 Jupyter 和 ipykernel: pip instal

科研绘图系列:R语言扩展物种堆积图(Extended Stacked Barplot)

介绍 R语言的扩展物种堆积图是一种数据可视化工具,它不仅展示了物种的堆积结果,还整合了不同样本分组之间的差异性分析结果。这种图形表示方法能够直观地比较不同物种在各个分组中的显著性差异,为研究者提供了一种有效的数据解读方式。 加载R包 knitr::opts_chunk$set(warning = F, message = F)library(tidyverse)library(phyl

【机器学习】高斯过程的基本概念和应用领域以及在python中的实例

引言 高斯过程(Gaussian Process,简称GP)是一种概率模型,用于描述一组随机变量的联合概率分布,其中任何一个有限维度的子集都具有高斯分布 文章目录 引言一、高斯过程1.1 基本定义1.1.1 随机过程1.1.2 高斯分布 1.2 高斯过程的特性1.2.1 联合高斯性1.2.2 均值函数1.2.3 协方差函数(或核函数) 1.3 核函数1.4 高斯过程回归(Gauss

【生成模型系列(初级)】嵌入(Embedding)方程——自然语言处理的数学灵魂【通俗理解】

【通俗理解】嵌入(Embedding)方程——自然语言处理的数学灵魂 关键词提炼 #嵌入方程 #自然语言处理 #词向量 #机器学习 #神经网络 #向量空间模型 #Siri #Google翻译 #AlexNet 第一节:嵌入方程的类比与核心概念【尽可能通俗】 嵌入方程可以被看作是自然语言处理中的“翻译机”,它将文本中的单词或短语转换成计算机能够理解的数学形式,即向量。 正如翻译机将一种语言

【学习笔记】 陈强-机器学习-Python-Ch15 人工神经网络(1)sklearn

系列文章目录 监督学习:参数方法 【学习笔记】 陈强-机器学习-Python-Ch4 线性回归 【学习笔记】 陈强-机器学习-Python-Ch5 逻辑回归 【课后题练习】 陈强-机器学习-Python-Ch5 逻辑回归(SAheart.csv) 【学习笔记】 陈强-机器学习-Python-Ch6 多项逻辑回归 【学习笔记 及 课后题练习】 陈强-机器学习-Python-Ch7 判别分析 【学

nudepy,一个有趣的 Python 库!

更多资料获取 📚 个人网站:ipengtao.com 大家好,今天为大家分享一个有趣的 Python 库 - nudepy。 Github地址:https://github.com/hhatto/nude.py 在图像处理和计算机视觉应用中,检测图像中的不适当内容(例如裸露图像)是一个重要的任务。nudepy 是一个基于 Python 的库,专门用于检测图像中的不适当内容。该

pip-tools:打造可重复、可控的 Python 开发环境,解决依赖关系,让代码更稳定

在 Python 开发中,管理依赖关系是一项繁琐且容易出错的任务。手动更新依赖版本、处理冲突、确保一致性等等,都可能让开发者感到头疼。而 pip-tools 为开发者提供了一套稳定可靠的解决方案。 什么是 pip-tools? pip-tools 是一组命令行工具,旨在简化 Python 依赖关系的管理,确保项目环境的稳定性和可重复性。它主要包含两个核心工具:pip-compile 和 pip

flume系列之:查看flume系统日志、查看统计flume日志类型、查看flume日志

遍历指定目录下多个文件查找指定内容 服务器系统日志会记录flume相关日志 cat /var/log/messages |grep -i oom 查找系统日志中关于flume的指定日志 import osdef search_string_in_files(directory, search_string):count = 0