canal_json_to_doris

2024-08-21 06:12
Tags: json doris canal

This post introduces canal_json_to_doris; hopefully it offers some reference value to developers facing the same problem.

Syncing with Flink consumed too many resources, so I wrote a synchronization program in Python instead.

# -*- coding:utf8 -*-
import json
import time
import pymysql
import requests
from kafka import KafkaConsumer
import threading
import Queue
from datetime import datetime

## One Kafka topic carries all tables
## When creating the target tables, do not declare columns NOT NULL (important!)
## nohup python -u canal_doris.py > doris.log 2>&1 &
## -u disables output buffering
## the topic should preferably have a single partition

##### config start
tjtimesecond = 30  ## stats log interval (seconds)
## Doris settings
tourl = "192.168.150.25"
toport = 9030
touser = "root"
topassword = ""
todatabase = "ods"
topic = ['sync_other_all']
setfilename = "canal_doris_set.json"
##### config end

## init
tj = {}
datas = {}
lasttjtime = 0
queue = Queue.Queue()
consumer = KafkaConsumer(group_id='ggg', enable_auto_commit=False,
                         auto_offset_reset='earliest',
                         bootstrap_servers=['zoo1:9092', 'zoo2:9092', 'zoo3:9092'],
                         max_poll_records=500)
consumer.subscribe(topics=topic)

## table settings
with open(setfilename, 'r') as file:
    content = file.read()
tableset = json.loads(content)
## init end

def sendweixinerror(msg):
    ## push an alert to a WeChat Work webhook
    url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=www'
    payload = '{"msgtype": "text","text": {"content": "' + str(msg) + '"}}'
    headers = {"Content-Type": "application/json"}
    requests.post(url=url, data=payload, headers=headers)

def execDoris(tourl, toport, touser, topassword, todatabase, sql):
    try:
        db = pymysql.connect(host=tourl, port=toport, user=touser,
                             password=topassword, database=todatabase)
        cursor = db.cursor()
        cursor.execute(query=sql)
        res = cursor.fetchall()
        cursor.close()
        db.close()
        return res
    except Exception as e:
        print e
        return ""

## initialize per-table start times
for k in tableset:
    x = tableset[k]
    if 'tsStartBeforeHoursFromnow' in x:
        time0 = time.time() - x['tsStartBeforeHoursFromnow'] * 3600
        x['fromtime'] = time0 * 1000  ## ms
    else:
        x['fromtime'] = 0
    tj[k] = {"count": 0, "tsdate": "", "partinfo": {}, "ts": "", "sendcount": 0}

def doris_put_delete2(setkey, db, table, data0):
    ## fallback: delete by SQL when the stream-load delete fails
    wheresql = ""
    for one in data0:
        conds = ""
        for k in one:
            conds += k + "='" + one[k] + "' and "
        wheresql += "(" + conds[:-5] + ") or"
    wheresql = wheresql[:-2]
    sql = " delete from `" + db + "`.`" + table + "` where " + wheresql
    print sql
    try:
        execDoris(tourl, toport, touser, topassword, todatabase, sql)
    except Exception as e:
        sendweixinerror(db + table + " sync error 2, updates for this table stopped")
        tableset[setkey]['ifstop'] = True
        tj[setkey]['ifstop'] = True

def doris_put_delete(setkey, db, table, data0):
    try:
        response = requests.put(
            url="http://192.168.150.24:8040/api/" + db + "/" + table + "/_stream_load",
            data=json.dumps(data0),
            headers={"format": "json", "strip_outer_array": "true",
                     "Expect": "100-continue", "merge_type": "DELETE"},
            auth=('root', ''), allow_redirects=False, verify=False)
        if 'ErrorURL' in response.text:
            print "delete" + db + table
            sendweixinerror(db + table + " stream-load delete error 1, falling back to SQL delete")
            print response.text
            print json.dumps(data0)
            doris_put_delete2(setkey, db, table, data0)
    except Exception as e:
        print "delete" + db + table
        sendweixinerror(db + table + " stream-load delete error 1, falling back to SQL delete " + str(e))
        print json.dumps(data0)
        doris_put_delete2(setkey, db, table, data0)

def doris_put_append(setkey, db, table, data0):
    try:
        response = requests.put(
            url="http://192.168.150.24:8040/api/" + db + "/" + table + "/_stream_load",
            data=json.dumps(data0),
            headers={"format": "json", "strip_outer_array": "true",
                     "Expect": "100-continue", "merge_type": "APPEND"},
            auth=('root', ''), allow_redirects=False, verify=False)
        if 'ErrorURL' in response.text:
            print "append:" + db + table
            sendweixinerror(db + table + " sync error, updates for this table stopped")
            print response.text
            print json.dumps(data0)
            tableset[setkey]['ifstop'] = True
    except Exception as e:
        print "append:" + db + table
        sendweixinerror(db + table + " sync error, updates for this table stopped")
        print json.dumps(data0)
        tableset[setkey]['ifstop'] = True
        tj[setkey]['ifstop'] = True

def doris_put_truncate(db, table):
    print " truncate table `" + db + "`.`" + table + "` "
    execDoris(tourl, toport, touser, topassword, todatabase,
              " truncate table `" + db + "`.`" + table + "` ")

def post2dorislist(dkey, rows):
    ## flush one table's buffered rows onto the worker queue
    tableset[dkey]['lastsend'] = time.time()
    if tableset[dkey]['ifstop'] == True:
        return
    db = tableset[dkey]['db']
    table = tableset[dkey]['table']
    if len(rows) > 0:
        deletedata = []
        appenddata = []
        partinfo = {}
        ts = 0
        for rkey in rows:
            row = rows[rkey]
            if row['type'] == 'insert' or row['type'] == 'update':
                appenddata.append(changeRowData(dkey, row["data"]))
            else:
                deletedata.append(changeRowData(dkey, row["data"]))
            if row['partition'] not in partinfo:
                partinfo[row['partition']] = {}
            partinfo[row['partition']]['offset'] = row['offset']
            if row['ts'] > ts:
                ts = row['ts']
        if len(deletedata) > 0:
            queue.put({"setkey": dkey, "type": "delete", "db": db, "table": table, "data": deletedata})
            tj[dkey]['count'] += len(deletedata)
            tj[dkey]['sendcount'] += 1
        if len(appenddata) > 0:
            queue.put({"setkey": dkey, "type": "append", "db": db, "table": table, "data": appenddata})
            tj[dkey]['count'] += len(appenddata)
            tj[dkey]['sendcount'] += 1
        if len(partinfo) > 0:
            tj[dkey]['partinfo'].update(partinfo)
        if ts > 0:
            tj[dkey]['ts'] = ts
            tj[dkey]['tsdate'] = str(datetime.fromtimestamp(ts / 1000))
        datas[dkey]['rows'] = {}

def printLog():
    global lasttjtime
    while True:
        time.sleep(1)
        if time.time() - lasttjtime > tjtimesecond:
            try:
                ## re-read the settings file so new tables can be added at runtime
                with open(setfilename, 'r') as file:
                    content = file.read()
                tableset0 = json.loads(content)
                ifchange = False
                for x in tableset0:
                    if x not in tableset:
                        tableset[x] = tableset0[x]
                        ifchange = True
                if ifchange == True:
                    print str(datetime.now()) + ":-----change tableset"
                    print str(datetime.now()) + ":" + json.dumps(tableset)
            except Exception as e:
                print e
            lasttjtime = time.time()
            print str(datetime.now()) + ":" + json.dumps(tj)

def getkeyd(dataone, keylist):
    ## collect the primary-key columns of one row; None if any is missing
    keys = {}
    for k in keylist:
        if k in dataone:
            keys[k] = dataone[k]
        else:
            return None
    return keys

def changeRowData(dkey, rowdata):
    ## rename columns according to the table's difffields mapping
    try:
        if 'difffields' not in tableset[dkey]:
            return rowdata
        difffields = tableset[dkey]['difffields']
        if len(difffields) == 0:
            return rowdata
        for k in difffields:
            if k in rowdata:
                data = rowdata[k]
                del rowdata[k]
                rowdata[difffields[k]] = data
        return rowdata
    except Exception as e:
        sendweixinerror(dkey + " sync paused")
        print e
        print rowdata
        return rowdata

# fetch messages with consumer.poll()
def kafkaProduce():
    print "start !"
    while True:
        time.sleep(0.001)
        messages = consumer.poll(timeout_ms=500)
        # process messages
        for topic_partition, message_list in messages.items():
            for message in message_list:
                jsonob = json.loads(message.value)
                datakey = jsonob['database'] + '.' + jsonob['table']
                if datakey not in tableset:
                    continue
                fromtime = tableset[datakey]['fromtime']
                if jsonob['ts'] < fromtime:
                    continue
                maxnum = tableset[datakey]['maxnum']
                tabletype0 = tableset[datakey]['tabletype']
                keyf = tableset[datakey]['pknames']
                if tableset[datakey]['ifstop'] == True:  ## skip tables that are paused
                    continue
                if datakey not in datas:
                    datas[datakey] = {"lasttime": time.time(), "rows": {}}
                if jsonob['type'] == 'TRUNCATE':
                    doris_put_truncate(tableset[datakey]['db'], tableset[datakey]['table'])
                if jsonob['type'] == 'DELETE':
                    for dataone in jsonob['data']:
                        keys = getkeyd(dataone, keyf)
                        keyd = ",".join(keys.values())
                        datas[datakey]['rows'][keyd] = {"type": "delete", "ts": jsonob['ts'], "data": keys,
                                                        "partition": topic_partition.partition, "offset": message.offset}
                if jsonob['type'] == 'INSERT':
                    for dataone in jsonob['data']:
                        keys = getkeyd(dataone, keyf)
                        keyd = ",".join(keys.values())
                        datas[datakey]['rows'][keyd] = {"type": "insert", "ts": jsonob['ts'], "data": dataone,
                                                        "partition": topic_partition.partition, "offset": message.offset}
                if jsonob['type'] == 'UPDATE':
                    for i, dataone in enumerate(jsonob['data']):
                        keys = getkeyd(dataone, keyf)
                        keyd = ",".join(keys.values())
                        olddata = jsonob['old'][i]
                        okeyd = None
                        okeys = getkeyd(olddata, keyf)
                        if okeys != None:
                            okeyd = ",".join(okeys.values())
                        if tabletype0 == 'unique' and (okeys == None or okeyd == keyd):
                            ## key unchanged on a unique-model table: plain upsert
                            datas[datakey]['rows'][keyd] = {"type": "update", "ts": jsonob['ts'], "data": dataone,
                                                            "partition": topic_partition.partition, "offset": message.offset}
                        else:
                            ## key changed: delete the old key, insert the new row
                            datas[datakey]['rows'][okeyd] = {"type": "delete", "ts": jsonob['ts'], "data": okeys,
                                                             "partition": topic_partition.partition, "offset": message.offset}
                            datas[datakey]['rows'][keyd] = {"type": "insert", "ts": jsonob['ts'], "data": dataone,
                                                            "partition": topic_partition.partition, "offset": message.offset}
                if len(datas[datakey]['rows']) > maxnum:
                    post2dorislist(datakey, datas[datakey]['rows'])
        for dkey in datas:
            lastsend = tableset[dkey]['lastsend']
            maxsecond = tableset[dkey]['maxsecond']
            if time.time() - lastsend > maxsecond:
                post2dorislist(dkey, datas[dkey]['rows'])

def consumerThread(queue):
    while True:
        time.sleep(0.001)
        item = queue.get()
        if item['type'] == "delete":
            doris_put_delete(item['setkey'], item['db'], item['table'], item['data'])
        if item['type'] == "append":
            doris_put_append(item['setkey'], item['db'], item['table'], item['data'])
        queue.task_done()

consumer_thread = threading.Thread(target=consumerThread, args=(queue,))
producer_thread = threading.Thread(target=kafkaProduce, args=())
log_thread = threading.Thread(target=printLog, args=())  ## stats logger
producer_thread.setDaemon(True)
consumer_thread.setDaemon(True)
log_thread.setDaemon(True)
producer_thread.start()
consumer_thread.start()
log_thread.start()
producer_thread.join()
sendweixinerror("producer sync stopped")
consumer_thread.join()
sendweixinerror("consumer sync stopped")
log_thread.join()
print "stop at " + str(datetime.now())
sendweixinerror("sync interrupted")
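When a Stream Load DELETE fails, the script falls back to a plain `DELETE ... WHERE` statement via `doris_put_delete2`. The WHERE clause it builds can be sketched in isolation like this (a simplified rebuild of that logic; `sorted()` is added here only to make the output deterministic, and values are interpolated directly into SQL just as the original does, which is fine for trusted Canal output but unsafe for untrusted input):

```python
def build_delete_where(rows):
    # One "(col='val' and ...)" group per row to delete,
    # OR-joined across rows: the same shape doris_put_delete2 emits.
    groups = []
    for one in rows:
        conds = " and ".join("%s='%s'" % (k, v) for k, v in sorted(one.items()))
        groups.append("(" + conds + ")")
    return " or ".join(groups)

where = build_delete_where([{"testid": "1"}, {"testid": "7"}])
# (testid='1') or (testid='7')
```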
The contents of `canal_doris_set.json` (one entry per `database.table` to sync):

{
  "test_canal.testpk": {
    "difffields": {},
    "lastsend": 0,
    "ifstop": false,
    "maxnum": 10000,
    "maxsecond": 5,
    "table": "testpk",
    "db": "ods",
    "tabletype": "unique",
    "pknames": ["testid"]
  }
}
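For reference, this is roughly the shape of the canal-json change event the script consumes from Kafka, and how its primary-key handling treats an UPDATE: when the key changes on a unique-model table, `kafkaProduce` rewrites the update as delete(old key) + insert(new row). The field values below are made up for illustration:

```python
import json

# Illustrative canal-json row-change event (values are invented).
event = json.loads('''{
  "database": "test_canal",
  "table": "testpk",
  "type": "UPDATE",
  "ts": 1692590000000,
  "data": [{"testid": "2", "name": "bob"}],
  "old":  [{"testid": "1"}]
}''')

def getkeyd(dataone, keylist):
    # Same helper as in the script: collect the primary-key columns
    # of one row, or return None if any key column is missing.
    keys = {}
    for k in keylist:
        if k not in dataone:
            return None
        keys[k] = dataone[k]
    return keys

pknames = ["testid"]
new_keys = getkeyd(event["data"][0], pknames)  # {"testid": "2"}
old_keys = getkeyd(event["old"][0], pknames)   # {"testid": "1"}
# A changed key means this UPDATE becomes delete(old) + insert(new).
key_changed = old_keys is not None and old_keys != new_keys  # True
```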

That wraps up this introduction to canal_json_to_doris; hopefully it is of some use.



http://www.chinasem.cn/article/1092348
