Python 全栈系列255 UCS实践:按ID同步数据

2024-06-23 00:12

本文主要是介绍Python 全栈系列255 UCS实践:按ID同步数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

说明

这是一个常见的使用场景,实测下来效果良好。

内容

1 实验场景

将库中所有的数据取出,送到队列

本质上,这是一种单向不返回的模式。除了在遍历全库有用,在进行回测时也是一样的,时间就是单向不返回的。

通过UCS,将任意离散的数据记录归并到了一个更大的单位下。按照brick、block、part、shard四个层级,使得数据的管理兼顾到人的记忆特性,以及程序批量处理的效率。一个brick通常代表一万条数据,之后以千不断进位。到part级别已经是十亿的容量了。

UCS将所有数据的编号分为三类:

  • 1 数值类。从0开始编号,每条记录递增,这个就是mysql的自增id。
  • 2 时间类。以小时为brick,天为block,月为part, 年为shard。
  • 3 字符类。所有为数值、非时间类的主键采用字符编号。一般采用md5码计算32位字符,然后根据 每8个字符之和对10取余。如果数据很大,也可以考虑对100,甚至1000取余。

UCS规范已经嵌在GFGoLite服务中,通过UCS对象进行快速实现。

以下是本次实验的文件

  • 1 首先要声明worker的缓存空间名称,一般只需一次,后续其他的worker也可以使用这个空间。
  • 2 worker并不是服务状态的,所以每次启动必须要载入元数据,在结束本次执行后,要保存元数据
  • 3 worker的功能是从clickhouse中取数,然后存到stream中
  • 4 QM方面,通过声明远端服务器的RedisAgent完成
  • 5 从GlobalBuffer中获取clickhouse的连接参数
  • 6 使用CHClient来进行实际的控制
  • 7 执行前,待执行的brick_list应该被更新后放在缓存内。
  • 8 执行时,worker先取出待执行的brick_list和已经执行的brick(last_brick)
  • 9 如果last_brick是空,说明这是初始状态,cur_brick为brick_list中的第一个
  • 10 其他情况,cur_brick始终可以取到下一个,直到结尾(此时cur_brick始终等于last_brick),worker会跳过执行
  • 11 在正常执行时,worker通过ucs知道当前数据主键的范围,所以可以根据这个条件取出对应brick的原始数据
  • 12 执行结束时,worker将cur_brick更新到last_brick中。

最终,没执行一次脚本,就会搬运一个brick到远端队列。

'''
UCS顺序Worker的概念Worker采用UCS的顺序编号:id编号、时间编号Worker依赖Buffer提供运行时参数:- 1 brick列表
- 2 上一次处理brick
'''# 1 创建变量空间(Once) worker.general (TroubleShooting ts_001)
# 2 读取需要处理的brick_list(Manually)from Basefuncs import * worker_buffer_space = 'sp_worker.general'
tier1 = 'xxx'
tier2 = 'ucs_brick_ordered.sniffer'
prefix = '.'.join([worker_buffer_space,tier1,tier2]) +'.'target_redis_agent_host = 'http://IP:24118/'
target_redis_connection_hash = None 
target_stream_name = 'xxxx'
target_stream_max_len = 10000000qm = QManager(redis_agent_host = target_redis_agent_host, redis_connection_hash = target_redis_connection_hash, q_max_len = target_stream_max_len)
# ==========================  Load 
gb = GlobalBuffer()
# manually + brick_list
# gb.setx(prefix +'brick_list',brick_list,persist=True)
brick_list=  gb.getx(prefix +'brick_list')
last_brick_handled = gb.getx(prefix +'last_brick_handled')  or ''
last_runtime = gb.getx(prefix +'last_runtime')# brick_list需要保证顺序
if last_brick_handled is None:current_brick =  brick_list[0]
else:if brick_list.index(last_brick_handled) ==  len(brick_list) -1:current_brick = last_brick_handledelse:current_brick = brick_list[brick_list.index(last_brick_handled) +1]print('current_brick', current_brick)if current_brick != last_brick_handled:
# 根据buffer知道要处理的数据ucs = UCS()current_brick_bounds = ucs.get_brick_bounds(current_brick)# ==========================  Processingclick_para = gb.getx('sp_global.buffer.local.container.clickhouse.my_database.para')chc = CHClient(**click_para)# 根据bounds获取数据query_sql = 'select a, b, c, d from xxx where id >= %s and id < %s' % (current_brick_bounds[0], current_brick_bounds[1] )brick_data = chc._exe_sql(query_sql)brick_data_df = pd.DataFrame(brick_data, columns = ['a','b','c','d'])brick_data_df.columns = ['id','task_for','before','after']brick_data_df['function_type'] = 'ucs_worker'brick_data_df['rec_id'] = brick_data_df['id']brick_data_listofdict = brick_data_df.to_dict(orient='records')# ==========================  Postcur_q_len = qm.stream_len(target_stream_name)cur_write_resp = qm.parrallel_write_msg(target_stream_name, brick_data_listofdict, time_out=180)# ==========================  Updateif cur_write_resp['status']:last_brick_handled = current_brickgb.setx(prefix +'last_brick_handled', last_brick_handled, persist =True)print('current batch ', len(brick_data_listofdict),' 、target stream len',qm.stream_len(target_stream_name))
else:last_brick_handled = current_bricklast_runtime = get_time_str1()
gb.setx(prefix +'last_runtime', last_runtime)

flask_celery

后来我用了python的标准logging包 + RotateLog的方式记录,不过以下脚本仍然有用。

执行脚本

对于非标准的程序执行,通过脚本方式放在本地的home文件夹下,由celery调度。
注意,被celery执行的脚本,里面最好都写上绝对路径,因为在使用celery worker执行时,当前路径会默认为服务的启动路径 /opt/flask_celery。
例如LOG_FILE,只写tem.log,那么就会在flask_celery下发生修改。
始终注意的是,由flask celery执行的应该是简单的流转任务,而不是复杂的计算任务。如果有,就应该放在某个容器里执行。
再考虑到执行环境,flask celery是在base环境启动的,对应的包应该都能用。如果要执行特别的任务,就要在脚本里指定环境的切换。

vim /home/test_exe.sh

#!/bin/bash
# 日志文件路径
LOG_FILE="/home/tem.log"# 获取当前时间并追加到日志文件
echo "$(date '+%Y-%m-%d %H:%M:%S') - 脚本执行" >> $LOG_FILE# 检查日志文件中的记录条数
LINE_COUNT=$(wc -l < "$LOG_FILE")# 如果记录条数超过10000条,则截断日志文件以保留最新的100条记录
if [ "$LINE_COUNT" -gt 10000 ]; then# 计算需要保留的行数LINES_TO_KEEP=100# 截断日志文件tail -n $LINES_TO_KEEP $LOG_FILE > temp.log && mv temp.log $LOG_FILE
fi

然后将脚本改为可执行
chmod +x /home/test_exe.sh
执行测试


import requests as req param_dict = {'script_path': '/home/test_exe.sh'}resp = req.post('http://127.0.0.1:24104/exe_sh/',json = param_dict )In [5]: !cat tem.log
2024-06-17 14:55:54 - 脚本执行
2024-06-17 14:59:14 - 脚本执行
2024-06-17 15:21:13 - 脚本执行

这篇关于Python 全栈系列255 UCS实践:按ID同步数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1085804

相关文章

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

python: 多模块(.py)中全局变量的导入

文章目录 global关键字可变类型和不可变类型数据的内存地址单模块(单个py文件)的全局变量示例总结 多模块(多个py文件)的全局变量from x import x导入全局变量示例 import x导入全局变量示例 总结 global关键字 global 的作用范围是模块(.py)级别: 当你在一个模块(文件)中使用 global 声明变量时,这个变量只在该模块的全局命名空

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

【Prometheus】PromQL向量匹配实现不同标签的向量数据进行运算

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全栈,前后端开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi