canal_json_to_doris

2024-08-21 06:12
Tags: json doris canal


Syncing with Flink used too many resources, so I wrote a sync program in Python.

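# -*- coding: utf-8 -*-
# Python 2 script (print statements, Queue module).
import json
import time
import pymysql
import requests
from kafka import KafkaConsumer
import threading
import Queue
from datetime import datetime

## A single topic is used to sync all tables.
## Important: do not declare NOT NULL columns when creating the target tables!
## nohup python  -u canal_doris.py > doris.log 2>&1 &
## -u disables output buffering
## The topic should preferably be configured with a single partition.

##### Configuration start
tjtimesecond = 30  ## interval between statistics log lines, in seconds
## Doris settings
tourl = "192.168.150.25"
toport = 9030
touser = "root"
topassword = ""
todatabase = "ods"
topic = ['sync_other_all']
setfilename = "canal_doris_set.json"
##### Configuration end

## Initialization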
tj = {}     # per-table statistics
datas = {}  # per-table row buffers
lasttjtime = 0
queue = Queue.Queue()
consumer = KafkaConsumer(
    group_id='ggg',
    enable_auto_commit=False,
    auto_offset_reset='earliest',
    bootstrap_servers=['zoo1:9092', 'zoo2:9092', 'zoo3:9092'],
    max_poll_records=500
)
consumer.subscribe(topics=topic)

## Table settings
with open(setfilename, 'r') as file:
    content = file.read()
tableset = json.loads(content)
## Initialization end


def sendweixinerror(msg):
    # Send a text alert to a WeCom (Enterprise WeChat) group robot webhook.
    url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=www'
    # json.dumps escapes quotes and newlines in msg, unlike manual string concatenation.
    datas = json.dumps({"msgtype": "text", "text": {"content": str(msg)}})
    headers = {"Content-Type": "application/json"}
    response = requests.post(url=url, data=datas, headers=headers)


def execDoris(tourl, toport, touser, topassword, todatabase, sql):
    # Run one SQL statement against Doris over the MySQL protocol.
    # Errors are printed and swallowed; an empty string is returned in that case.
    try:
        db = pymysql.connect(host=tourl, port=toport, user=touser, password=topassword, database=todatabase)
        cursor = db.cursor()
        cursor.execute(query=sql)
        res = cursor.fetchall()
        cursor.close()
        db.close()
        return res
    except Exception as e:
        print(e)
        return ""


## Initialize the start time
def inittable(k, x):
    # Compute the start timestamp (ms), create the statistics entry, then register the table.
    if 'tsStartBeforeHoursFromnow' in x:
        time0 = time.time() - x['tsStartBeforeHoursFromnow'] * 3600
        x['fromtime'] = time0 * 1000  ## ms
    else:
        x['fromtime'] = 0
    tj[k] = {"count": 0, "tsdate": "", "partinfo": {}, "ts": "", "sendcount": 0}
    tableset[k] = x


for k in tableset:
    inittable(k, tableset[k])


def doris_put_delete2(setkey, db, table, data0):
    # Fallback: delete rows with a SQL statement when the Stream Load delete fails.
    # Note: values are concatenated into the SQL text without escaping.
    conds = []
    for one in data0:
        parts = []
        for k in one:
            parts.append(k + "='" + one[k] + "'")
        conds.append("(" + " and ".join(parts) + ")")
    wheresql = " or ".join(conds)
    sql = " delete from `" + db + "`.`" + table + "` where " + wheresql
    print sql
    try:
        rs = execDoris(tourl, toport, touser, topassword, todatabase, sql)
    except Exception as e:
        sendweixinerror(db + table + " sync error 2, updates for this table are stopped")
        tableset[setkey]['ifstop'] = True
        tj[setkey]['ifstop'] = True


def doris_put_delete(setkey, db, table, data0):
    # Delete rows through Stream Load (merge_type DELETE); fall back to SQL on failure.
    response = None
    try:
        response = requests.put(url="http://192.168.150.24:8040/api/" + db + "/" + table + "/_stream_load",
                                data=json.dumps(data0),
                                headers={"format": "json", "strip_outer_array": "true",
                                         "Expect": "100-continue", "merge_type": "DELETE"},
                                auth=('root', ''), allow_redirects=False, verify=False)
        if 'ErrorURL' in response.text:
            print "delete" + db + table
            sendweixinerror(db + table + " stream load delete error 1, trying SQL delete ")
            print response.text
            print json.dumps(data0)
            doris_put_delete2(setkey, db, table, data0)
    except Exception as e:
        print "delete" + db + table
        # str(e): concatenating the exception object itself raises TypeError.
        sendweixinerror(db + table + " stream load delete error 1, trying SQL delete " + str(e))
        if response is not None:  # the request itself may have failed
            print response.text
        print json.dumps(data0)
        doris_put_delete2(setkey, db, table, data0)


def doris_put_append(setkey, db, table, data0):
    # Insert or update rows through Stream Load (merge_type APPEND); stop the table on failure.
    response = None
    try:
        response = requests.put(url="http://192.168.150.24:8040/api/" + db + "/" + table + "/_stream_load",
                                data=json.dumps(data0),
                                headers={"format": "json", "strip_outer_array": "true",
                                         "Expect": "100-continue", "merge_type": "APPEND"},
                                auth=('root', ''), allow_redirects=False, verify=False)
        if 'ErrorURL' in response.text:
            print "append:" + db + table
            sendweixinerror(db + table + " sync error, updates for this table are stopped")
            print response.text
            print json.dumps(data0)
            tableset[setkey]['ifstop'] = True
    except Exception as e:
        print "append:" + db + table
        sendweixinerror(db + table + " sync error, updates for this table are stopped")
        if response is not None:  # the request itself may have failed
            print response.text
        print json.dumps(data0)
        tableset[setkey]['ifstop'] = True
        tj[setkey]['ifstop'] = True


def doris_put_truncate(db, table):
    print " truncate table  `" + db + "`.`" + table + "`  "
    execDoris(tourl, toport, touser, topassword, todatabase, " truncate table  `" + db + "`.`" + table + "`  ")


def post2dorislist(dkey, rows):
    # Split the buffered rows of one table into delete and append batches and queue them.
    tableset[dkey]['lastsend'] = time.time()
    ifstop = tableset[dkey]['ifstop']
    if ifstop == True:
        return
    db = tableset[dkey]['db']
    table = tableset[dkey]['table']
    if len(rows) > 0:
        deletedata = []
        appenddata = []
        partinfo = {}
        ts = 0
        for rkey in rows:
            row = rows[rkey]
            if row['type'] == 'insert' or row['type'] == 'update':
                appenddata.append(changeRowData(dkey, row["data"]))
            else:
                deletedata.append(changeRowData(dkey, row["data"]))
            # Record an offset per partition and the newest timestamp seen.
            if row['partition'] not in partinfo:
                partinfo[row['partition']] = {}
            partinfo[row['partition']]['offset'] = row['offset']
            if row['ts'] > ts:
                ts = row['ts']
        if len(deletedata) > 0:
            ## doris_put_delete( db,table,deletedata )
            queue.put({"setkey": dkey, "type": "delete", "db": db, "table": table, "data": deletedata})
            # Count the deleted rows (the original added len(appenddata) here, double counting appends).
            tj[dkey]['count'] += len(deletedata)
            tj[dkey]['sendcount'] += 1
        if len(appenddata) > 0:
            ## doris_put_append( db,table,appenddata )
            queue.put({"setkey": dkey, "type": "append", "db": db, "table": table, "data": appenddata})
            tj[dkey]['count'] += len(appenddata)
            tj[dkey]['sendcount'] += 1
        if len(partinfo) > 0:
            tj[dkey]['partinfo'].update(partinfo)
        if ts > 0:
            tj[dkey]['ts'] = ts
            tj[dkey]['tsdate'] = str(datetime.fromtimestamp(ts / 1000))
    datas[dkey]['rows'] = {}


def printLog():
    # Every tjtimesecond seconds: reload the settings file to pick up new tables, then print statistics.
    global lasttjtime
    while True:
        time.sleep(1)
        if time.time() - lasttjtime > tjtimesecond:
            try:
                with open(setfilename, 'r') as file:
                    content = file.read()
                tableset0 = json.loads(content)
                ifchange = False
                for x in tableset0:
                    if x not in tableset:
                        # New tables also need 'fromtime' and a statistics entry.
                        inittable(x, tableset0[x])
                        ifchange = True
                if ifchange == True:
                    print str(datetime.now()) + ":-----change tableset"
                    print str(datetime.now()) + ":" + json.dumps(tableset)
            except Exception as e:
                print e
            lasttjtime = time.time()
            print str(datetime.now()) + ":" + json.dumps(tj)


def getkeyd(dataone, keylist):
    # Return the primary-key columns of a row, or None if any key column is missing.
    keys = {}
    for k in keylist:
        if k in dataone:
            keys[k] = dataone[k]
        else:
            return None
    return keys


def changeRowData(dkey, rowdata):
    # Rename source columns to target columns according to 'difffields'.
    try:
        if 'difffields' not in tableset[dkey]:
            return rowdata
        difffields = tableset[dkey]['difffields']
        if len(difffields) == 0:
            return rowdata
        for k in difffields:
            if k in rowdata:
                data = rowdata[k]
                del rowdata[k]
                rowdata[difffields[k]] = data
        return rowdata
    except Exception as e:
        sendweixinerror(dkey + " sync paused")
        print e
        print rowdata
        return rowdata


# Fetch messages with `consumer.poll()`
def kafkaProduce():
    print "start !"
    while True:
        time.sleep(0.001)
        messages = consumer.poll(timeout_ms=500)
        # Process the messages
        for topic_partition, message_list in messages.items():
            for message in message_list:
                jsonob = json.loads(message.value)
                datakey = jsonob['database'] + '.' + jsonob['table']
                if datakey not in tableset:
                    continue
                fromtime = tableset[datakey]['fromtime']
                if (jsonob['ts'] < fromtime):
                    continue
                maxnum = tableset[datakey]['maxnum']
                tabletype0 = tableset[datakey]['tabletype']
                keyf = tableset[datakey]['pknames']
                ifstop = tableset[datakey]['ifstop']  ## skip tables that have been stopped
                if ifstop == True:
                    continue
                if (datakey not in datas):
                    datas[datakey] = {"lasttime": time.time(), "rows": {}}
                if (jsonob['type'] == 'TRUNCATE'):
                    db = tableset[datakey]['db']
                    table = tableset[datakey]['table']
                    doris_put_truncate(db, table)
                if (jsonob['type'] == 'DELETE'):
                    for dataone in jsonob['data']:
                        keys = getkeyd(dataone, keyf)
                        if keys is None:  # row lacks a key column, cannot be addressed
                            continue
                        keyd = ",".join(keys.values())
                        datas[datakey]['rows'][keyd] = {"type": "delete", "ts": jsonob['ts'], "data": keys,
                                                        "partition": topic_partition.partition, "offset": message.offset}
                if (jsonob['type'] == 'INSERT'):
                    for dataone in jsonob['data']:
                        keys = getkeyd(dataone, keyf)
                        if keys is None:
                            continue
                        keyd = ",".join(keys.values())
                        datas[datakey]['rows'][keyd] = {"type": "insert", "ts": jsonob['ts'], "data": dataone,
                                                        "partition": topic_partition.partition, "offset": message.offset}
                if (jsonob['type'] == 'UPDATE'):
                    for key, dataone in enumerate(jsonob['data']):
                        keys = getkeyd(dataone, keyf)
                        if keys is None:
                            continue
                        keyd = ",".join(keys.values())
                        olddata = jsonob['old'][key]
                        okeyd = None
                        # okeys is None when the old row does not carry every key column.
                        okeys = getkeyd(olddata, keyf)
                        if okeys != None:
                            okeyd = ",".join(okeys.values())
                        if tabletype0 == 'unique' and (okeys == None or okeyd == keyd):
                            datas[datakey]['rows'][keyd] = {"type": "update", "ts": jsonob['ts'], "data": dataone,
                                                            "partition": topic_partition.partition, "offset": message.offset}
                        else:
                            # Otherwise: delete the old row, then insert the new one.
                            datas[datakey]['rows'][okeyd] = {"type": "delete", "ts": jsonob['ts'], "data": okeys,
                                                             "partition": topic_partition.partition, "offset": message.offset}
                            datas[datakey]['rows'][keyd] = {"type": "insert", "ts": jsonob['ts'], "data": dataone,
                                                            "partition": topic_partition.partition, "offset": message.offset}
                # Flush when the buffer for this table exceeds maxnum rows.
                if len(datas[datakey]['rows']) > maxnum:
                    post2dorislist(datakey, datas[datakey]['rows'])
        # Flush every table whose last send is older than maxsecond seconds.
        for dkey in datas:
            lastsend = tableset[dkey]['lastsend']
            maxsecond = tableset[dkey]['maxsecond']
            if (time.time() - lastsend > maxsecond):
                post2dorislist(dkey, datas[dkey]['rows'])


def consumerThread(queue):
    # Worker: take batches off the queue and send them to Doris.
    while True:
        time.sleep(0.001)
        item = queue.get()
        if (item['type'] == "delete"):
            doris_put_delete(item['setkey'], item['db'], item['table'], item['data'])
        if (item['type'] == "append"):
            doris_put_append(item['setkey'], item['db'], item['table'], item['data'])
        queue.task_done()


consumer_thread = threading.Thread(target=consumerThread, args=(queue,))
producer_thread = threading.Thread(target=kafkaProduce, args=())
log_thread = threading.Thread(target=printLog, args=())  ## logging
producer_thread.setDaemon(True)
consumer_thread.setDaemon(True)
log_thread.setDaemon(True)
producer_thread.start()
consumer_thread.start()
log_thread.start()
producer_thread.join()
sendweixinerror("producer sync paused")
consumer_thread.join()
sendweixinerror("consumer sync paused")
log_thread.join()
print "stop at " + str(datetime.now())
sendweixinerror("sync interrupted")

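
The buffering step in kafkaProduce keeps only the latest change per primary key before a batch is sent. Below is a minimal Python 3 sketch of that idea, separate from the script itself. The names `key_of` and `fold_message` are hypothetical, the message fields (`type`, `ts`, `data`, `old`) are the ones the script reads, and the Kafka partition/offset bookkeeping is left out. As a simplifying assumption, an UPDATE becomes delete plus update only when the old row carries a different key.

```python
def key_of(row, pknames):
    """Return the primary-key dict of a row, or None if a key column is missing."""
    if row is None:
        return None
    keys = {}
    for name in pknames:
        if name not in row:
            return None
        keys[name] = row[name]
    return keys


def fold_message(rows, msg, pknames):
    """Fold one canal-style message into rows, a dict keyed by the joined primary key."""
    kind = msg["type"]
    olds = msg.get("old") or []
    for i, row in enumerate(msg.get("data") or []):
        keys = key_of(row, pknames)
        if keys is None:
            continue  # the row cannot be addressed without its full key
        keyd = ",".join(str(v) for v in keys.values())
        if kind == "DELETE":
            rows[keyd] = {"type": "delete", "ts": msg["ts"], "data": keys}
        elif kind == "INSERT":
            rows[keyd] = {"type": "insert", "ts": msg["ts"], "data": row}
        elif kind == "UPDATE":
            old_keys = key_of(olds[i], pknames) if i < len(olds) else None
            if old_keys is not None and old_keys != keys:
                # The primary key changed: the row stored under the old key must go.
                okeyd = ",".join(str(v) for v in old_keys.values())
                rows[okeyd] = {"type": "delete", "ts": msg["ts"], "data": old_keys}
            rows[keyd] = {"type": "update", "ts": msg["ts"], "data": row}
    return rows


rows = {}
pk = ["testid"]
fold_message(rows, {"type": "INSERT", "ts": 1, "data": [{"testid": "1", "v": "a"}]}, pk)
fold_message(rows, {"type": "UPDATE", "ts": 2, "data": [{"testid": "1", "v": "b"}],
                    "old": [{"v": "a"}]}, pk)
fold_message(rows, {"type": "UPDATE", "ts": 3, "data": [{"testid": "2", "v": "b"}],
                    "old": [{"testid": "1"}]}, pk)
print(rows)
```

Because later changes overwrite earlier ones under the same key, three messages collapse into two pending operations here: one delete for the old key and one upsert for the new key.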
Example settings file (canal_doris_set.json):

{
    "test_canal.testpk": {
        "difffields": {},
        "lastsend": 0,
        "ifstop": false,
        "maxnum": 10000,
        "maxsecond": 5,
        "table": "testpk",
        "db": "ods",
        "tabletype": "unique",
        "pknames": ["testid"]
    }
}
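
The script reads several keys from each entry of the settings file (`db`, `table`, `pknames`, `tabletype`, `maxnum`, `maxsecond`, `ifstop`, `lastsend`) and raises a KeyError at runtime if one is missing. The following Python 3 sketch shows one possible way to check entries up front and to derive `fromtime` from the optional `tsStartBeforeHoursFromnow` key. `prepare_tableset` and `REQUIRED_KEYS` are hypothetical names that do not appear in the original script.

```python
import json
import time

# Keys the sync script reads unconditionally from every table entry.
REQUIRED_KEYS = ("db", "table", "pknames", "tabletype",
                 "maxnum", "maxsecond", "ifstop", "lastsend")


def prepare_tableset(text, now=None):
    """Parse the settings JSON, validate each entry, and add 'fromtime' in milliseconds."""
    now = time.time() if now is None else now
    tableset = json.loads(text)
    for name, conf in tableset.items():
        missing = [k for k in REQUIRED_KEYS if k not in conf]
        if missing:
            raise ValueError("%s: missing settings %s" % (name, ", ".join(missing)))
        hours = conf.get("tsStartBeforeHoursFromnow")
        # 0 means "accept every message"; otherwise start N hours before now.
        conf["fromtime"] = 0 if hours is None else (now - hours * 3600) * 1000
    return tableset


example = '''{"test_canal.testpk": {"difffields": {}, "lastsend": 0, "ifstop": false,
 "maxnum": 10000, "maxsecond": 5, "table": "testpk", "db": "ods",
 "tabletype": "unique", "pknames": ["testid"], "tsStartBeforeHoursFromnow": 2}}'''
conf = prepare_tableset(example, now=10000.0)["test_canal.testpk"]
print(conf["fromtime"])
```

Failing at startup with the table name and the missing keys is easier to diagnose than a KeyError raised later inside the consumer loop.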




http://www.chinasem.cn/article/1092348
