本文主要是介绍Python 全栈系列255 UCS实践:按ID同步数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
说明
这是一个常见的使用场景,实测下来效果良好。
内容
1 实验场景
将库中所有的数据取出,送到队列
本质上,这是一种单向不返回的模式。除了在遍历全库有用,在进行回测时也是一样的,时间就是单向不返回的。
通过UCS,将任意离散的数据记录归并到了一个更大的单位下。按照brick、block、part、shard四个层级,使得数据的管理兼顾到人的记忆特性,以及程序批量处理的效率。一个brick通常代表一万条数据,之后以千不断进位。到part级别已经是十亿的容量了。
UCS将所有数据的编号分为三类:
- 1 数值类。从0开始编号,每条记录递增,这个就是mysql的自增id。
- 2 时间类。以小时为brick,天为block,月为part, 年为shard。
- 3 字符类。所有为数值、非时间类的主键采用字符编号。一般采用md5码计算32位字符,然后根据 每8个字符之和对10取余。如果数据很大,也可以考虑对100,甚至1000取余。
UCS规范已经嵌在GFGoLite服务中,通过UCS对象进行快速实现。
以下是本次实验的文件
- 1 首先要声明worker的缓存空间名称,一般只需一次,后续其他的worker也可以使用这个空间。
- 2 worker并不是服务状态的,所以每次启动必须要载入元数据,在结束本次执行后,要保存元数据
- 3 worker的功能是从clickhouse中取数,然后存到stream中
- 4 QM方面,通过声明远端服务器的RedisAgent完成
- 5 从GlobalBuffer中获取clickhouse的连接参数
- 6 使用CHClient来进行实际的控制
- 7 执行前,待执行的brick_list应该被更新后放在缓存内。
- 8 执行时,worker先取出待执行的brick_list和已经执行的brick(last_brick)
- 9 如果last_brick是空,说明这是初始状态,cur_brick为brick_list中的第一个
- 10 其他情况,cur_brick始终可以取到下一个,直到结尾(此时cur_brick始终等于last_brick),worker会跳过执行
- 11 在正常执行时,worker通过ucs知道当前数据主键的范围,所以可以根据这个条件取出对应brick的原始数据
- 12 执行结束时,worker将cur_brick更新到last_brick中。
最终,没执行一次脚本,就会搬运一个brick到远端队列。
'''
UCS顺序Worker的概念Worker采用UCS的顺序编号:id编号、时间编号Worker依赖Buffer提供运行时参数:- 1 brick列表
- 2 上一次处理brick
'''# 1 创建变量空间(Once) worker.general (TroubleShooting ts_001)
# 2 读取需要处理的brick_list(Manually)from Basefuncs import * worker_buffer_space = 'sp_worker.general'
tier1 = 'xxx'
tier2 = 'ucs_brick_ordered.sniffer'
prefix = '.'.join([worker_buffer_space,tier1,tier2]) +'.'target_redis_agent_host = 'http://IP:24118/'
target_redis_connection_hash = None
target_stream_name = 'xxxx'
target_stream_max_len = 10000000qm = QManager(redis_agent_host = target_redis_agent_host, redis_connection_hash = target_redis_connection_hash, q_max_len = target_stream_max_len)
# ========================== Load
gb = GlobalBuffer()
# manually + brick_list
# gb.setx(prefix +'brick_list',brick_list,persist=True)
brick_list= gb.getx(prefix +'brick_list')
last_brick_handled = gb.getx(prefix +'last_brick_handled') or ''
last_runtime = gb.getx(prefix +'last_runtime')# brick_list需要保证顺序
if last_brick_handled is None:current_brick = brick_list[0]
else:if brick_list.index(last_brick_handled) == len(brick_list) -1:current_brick = last_brick_handledelse:current_brick = brick_list[brick_list.index(last_brick_handled) +1]print('current_brick', current_brick)if current_brick != last_brick_handled:
# 根据buffer知道要处理的数据ucs = UCS()current_brick_bounds = ucs.get_brick_bounds(current_brick)# ========================== Processingclick_para = gb.getx('sp_global.buffer.local.container.clickhouse.my_database.para')chc = CHClient(**click_para)# 根据bounds获取数据query_sql = 'select a, b, c, d from xxx where id >= %s and id < %s' % (current_brick_bounds[0], current_brick_bounds[1] )brick_data = chc._exe_sql(query_sql)brick_data_df = pd.DataFrame(brick_data, columns = ['a','b','c','d'])brick_data_df.columns = ['id','task_for','before','after']brick_data_df['function_type'] = 'ucs_worker'brick_data_df['rec_id'] = brick_data_df['id']brick_data_listofdict = brick_data_df.to_dict(orient='records')# ========================== Postcur_q_len = qm.stream_len(target_stream_name)cur_write_resp = qm.parrallel_write_msg(target_stream_name, brick_data_listofdict, time_out=180)# ========================== Updateif cur_write_resp['status']:last_brick_handled = current_brickgb.setx(prefix +'last_brick_handled', last_brick_handled, persist =True)print('current batch ', len(brick_data_listofdict),' 、target stream len',qm.stream_len(target_stream_name))
else:last_brick_handled = current_bricklast_runtime = get_time_str1()
gb.setx(prefix +'last_runtime', last_runtime)
flask_celery
后来我用了python的标准logging包 + RotateLog的方式记录,不过以下脚本仍然有用。
执行脚本
对于非标准的程序执行,通过脚本方式放在本地的home文件夹下,由celery调度。
注意,被celery执行的脚本,里面最好都写上绝对路径,因为在使用celery worker执行时,当前路径会默认为服务的启动路径 /opt/flask_celery。
例如LOG_FILE,只写tem.log,那么就会在flask_celery下发生修改。
始终注意的是,由flask celery执行的应该是简单的流转任务,而不是复杂的计算任务。如果有,就应该放在某个容器里执行。
再考虑到执行环境,flask celery是在base环境启动的,对应的包应该都能用。如果要执行特别的任务,就要在脚本里指定环境的切换。
vim /home/test_exe.sh
#!/bin/bash
# 日志文件路径
LOG_FILE="/home/tem.log"# 获取当前时间并追加到日志文件
echo "$(date '+%Y-%m-%d %H:%M:%S') - 脚本执行" >> $LOG_FILE# 检查日志文件中的记录条数
LINE_COUNT=$(wc -l < "$LOG_FILE")# 如果记录条数超过10000条,则截断日志文件以保留最新的100条记录
if [ "$LINE_COUNT" -gt 10000 ]; then# 计算需要保留的行数LINES_TO_KEEP=100# 截断日志文件tail -n $LINES_TO_KEEP $LOG_FILE > temp.log && mv temp.log $LOG_FILE
fi
然后将脚本改为可执行
chmod +x /home/test_exe.sh
执行测试
import requests as req param_dict = {'script_path': '/home/test_exe.sh'}resp = req.post('http://127.0.0.1:24104/exe_sh/',json = param_dict )In [5]: !cat tem.log
2024-06-17 14:55:54 - 脚本执行
2024-06-17 14:59:14 - 脚本执行
2024-06-17 15:21:13 - 脚本执行
这篇关于Python 全栈系列255 UCS实践:按ID同步数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!