canal_json_to_doris

2024-08-21 06:12
Tags: json, doris, canal

This article introduces canal_json_to_doris; hopefully it offers some reference value to developers working on the same problem.

Syncing with Flink consumed too many resources, so I wrote a small sync program in Python instead. It consumes Canal JSON messages from Kafka, buffers row changes per table keyed by primary key, and flushes batches to Doris via Stream Load.

# -*- coding:utf8 -*-
import json
import time
import pymysql
import requests
from kafka import KafkaConsumer
import threading
import Queue
from datetime import datetime

## Use a single topic to sync all tables
## Do not declare columns NOT NULL when creating the target tables (important!)
## nohup python -u canal_doris.py > doris.log 2>&1 &
## -u disables output buffering
## The topic should ideally use a single partition

##### Configuration start
tjtimesecond = 30  ## stats logging interval in seconds
## Doris settings
tourl = "192.168.150.25"
toport = 9030
touser = "root"
topassword = ""
todatabase = "ods"
topic = ['sync_other_all']
setfilename = "canal_doris_set.json"
##### Configuration end

## Initialization
tj = {}      ## per-table statistics
datas = {}   ## per-table row buffers
lasttjtime = 0
queue = Queue.Queue()
consumer = KafkaConsumer(group_id='ggg', enable_auto_commit=False,
                         auto_offset_reset='earliest',
                         bootstrap_servers=['zoo1:9092', 'zoo2:9092', 'zoo3:9092'],
                         max_poll_records=500)
consumer.subscribe(topics=topic)

## Table settings
with open(setfilename, 'r') as file:
    content = file.read()
tableset = json.loads(content)
## Initialization end


def sendweixinerror(msg):
    ## Push an error message to a WeChat Work webhook
    url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=www'
    payload = '{"msgtype": "text","text": {"content": "' + str(msg) + '"}}'
    headers = {"Content-Type": "application/json"}
    requests.post(url=url, data=payload, headers=headers)


def execDoris(tourl, toport, touser, topassword, todatabase, sql):
    ## Execute a SQL statement against Doris over the MySQL protocol
    try:
        db = pymysql.connect(host=tourl, port=toport, user=touser,
                             password=topassword, database=todatabase)
        cursor = db.cursor()
        cursor.execute(query=sql)
        res = cursor.fetchall()
        cursor.close()
        db.close()
        return res
    except Exception as e:
        print(e)
        return ""


## Initialize each table's start timestamp
for k in tableset:
    x = tableset[k]
    if 'tsStartBeforeHoursFromnow' in x:
        time0 = time.time() - x['tsStartBeforeHoursFromnow'] * 3600
        x['fromtime'] = time0 * 1000  ## ms
    else:
        x['fromtime'] = 0
    tj[k] = {"count": 0, "tsdate": "", "partinfo": {}, "ts": "", "sendcount": 0}


def doris_put_delete2(setkey, db, table, data0):
    ## Fallback: delete rows with a SQL DELETE when the stream-load delete fails
    wheresql = ""
    for one in data0:
        cond = ""
        for k in one:
            cond += k + "='" + one[k] + "' and "
        wheresql += "(" + cond[:-5] + ") or"
    wheresql = wheresql[:-2]
    sql = " delete from `" + db + "`.`" + table + "` where " + wheresql
    print sql
    try:
        execDoris(tourl, toport, touser, topassword, todatabase, sql)
    except Exception as e:
        sendweixinerror(db + table + " sync error 2, updates for this table stopped")
        tableset[setkey]['ifstop'] = True
        tj[setkey]['ifstop'] = True


def doris_put_delete(setkey, db, table, data0):
    ## Delete rows via Doris Stream Load (merge_type=DELETE)
    response = None
    try:
        response = requests.put(
            url="http://192.168.150.24:8040/api/" + db + "/" + table + "/_stream_load",
            data=json.dumps(data0),
            headers={"format": "json", "strip_outer_array": "true",
                     "Expect": "100-continue", "merge_type": "DELETE"},
            auth=('root', ''), allow_redirects=False, verify=False)
        if 'ErrorURL' in response.text:
            print "delete" + db + table
            sendweixinerror(db + table + " stream-load delete error, trying SQL delete")
            print response.text
            print json.dumps(data0)
            doris_put_delete2(setkey, db, table, data0)
    except Exception as e:
        print "delete" + db + table
        sendweixinerror(db + table + " stream-load delete error, trying SQL delete " + str(e))
        if response is not None:
            print response.text
        print json.dumps(data0)
        doris_put_delete2(setkey, db, table, data0)


def doris_put_append(setkey, db, table, data0):
    ## Insert/upsert rows via Doris Stream Load (merge_type=APPEND)
    response = None
    try:
        response = requests.put(
            url="http://192.168.150.24:8040/api/" + db + "/" + table + "/_stream_load",
            data=json.dumps(data0),
            headers={"format": "json", "strip_outer_array": "true",
                     "Expect": "100-continue", "merge_type": "APPEND"},
            auth=('root', ''), allow_redirects=False, verify=False)
        if 'ErrorURL' in response.text:
            print "append:" + db + table
            sendweixinerror(db + table + " sync error, updates for this table stopped")
            print response.text
            print json.dumps(data0)
            tableset[setkey]['ifstop'] = True
    except Exception as e:
        print "append:" + db + table
        sendweixinerror(db + table + " sync error, updates for this table stopped")
        if response is not None:
            print response.text
        print json.dumps(data0)
        tableset[setkey]['ifstop'] = True
        tj[setkey]['ifstop'] = True


def doris_put_truncate(db, table):
    print " truncate table `" + db + "`.`" + table + "` "
    execDoris(tourl, toport, touser, topassword, todatabase,
              " truncate table `" + db + "`.`" + table + "` ")


def post2dorislist(dkey, rows):
    ## Flush a table's buffered rows onto the load queue
    tableset[dkey]['lastsend'] = time.time()
    if tableset[dkey]['ifstop'] == True:
        return
    db = tableset[dkey]['db']
    table = tableset[dkey]['table']
    if len(rows) > 0:
        deletedata = []
        appenddata = []
        partinfo = {}
        ts = 0
        for rkey in rows:
            row = rows[rkey]
            if row['type'] == 'insert' or row['type'] == 'update':
                appenddata.append(changeRowData(dkey, row["data"]))
            else:
                deletedata.append(changeRowData(dkey, row["data"]))
            if row['partition'] not in partinfo:
                partinfo[row['partition']] = {}
            partinfo[row['partition']]['offset'] = row['offset']
            if row['ts'] > ts:
                ts = row['ts']
        if len(deletedata) > 0:
            queue.put({"setkey": dkey, "type": "delete", "db": db, "table": table, "data": deletedata})
            tj[dkey]['count'] += len(deletedata)
            tj[dkey]['sendcount'] += 1
        if len(appenddata) > 0:
            queue.put({"setkey": dkey, "type": "append", "db": db, "table": table, "data": appenddata})
            tj[dkey]['count'] += len(appenddata)
            tj[dkey]['sendcount'] += 1
        if len(partinfo) > 0:
            tj[dkey]['partinfo'].update(partinfo)
        if ts > 0:
            tj[dkey]['ts'] = ts
            tj[dkey]['tsdate'] = str(datetime.fromtimestamp(ts / 1000))
        datas[dkey]['rows'] = {}


def printLog():
    ## Periodically reload table settings and print statistics
    global lasttjtime
    while True:
        time.sleep(1)
        if time.time() - lasttjtime > tjtimesecond:
            try:
                with open(setfilename, 'r') as file:
                    content = file.read()
                tableset0 = json.loads(content)
                ifchange = False
                for x in tableset0:
                    if x not in tableset:
                        tableset[x] = tableset0[x]
                        ifchange = True
                if ifchange == True:
                    print str(datetime.now()) + ":-----change tableset"
                    print str(datetime.now()) + ":" + json.dumps(tableset)
            except Exception as e:
                print e
            lasttjtime = time.time()
            print str(datetime.now()) + ":" + json.dumps(tj)


def getkeyd(dataone, keylist):
    ## Extract the primary-key columns of a row; None if a key column is missing
    keys = {}
    for k in keylist:
        if k in dataone:
            keys[k] = dataone[k]
        else:
            return None
    return keys


def changeRowData(dkey, rowdata):
    ## Rename columns according to the table's difffields mapping
    try:
        if 'difffields' not in tableset[dkey]:
            return rowdata
        difffields = tableset[dkey]['difffields']
        if len(difffields) == 0:
            return rowdata
        for k in difffields:
            if k in rowdata:
                data = rowdata[k]
                del rowdata[k]
                rowdata[difffields[k]] = data
        return rowdata
    except Exception as e:
        sendweixinerror(dkey + " sync paused")
        print e
        print rowdata
        return rowdata


# Fetch messages with consumer.poll()
def kafkaProduce():
    print "start !"
    while True:
        time.sleep(0.001)
        messages = consumer.poll(timeout_ms=500)
        # process messages
        for topic_partition, message_list in messages.items():
            for message in message_list:
                jsonob = json.loads(message.value)
                datakey = jsonob['database'] + '.' + jsonob['table']
                if datakey not in tableset:
                    continue
                fromtime = tableset[datakey]['fromtime']
                if jsonob['ts'] < fromtime:
                    continue
                maxnum = tableset[datakey]['maxnum']
                tabletype0 = tableset[datakey]['tabletype']
                keyf = tableset[datakey]['pknames']
                if tableset[datakey]['ifstop'] == True:  ## skip tables that are stopped
                    continue
                if datakey not in datas:
                    datas[datakey] = {"lasttime": time.time(), "rows": {}}
                if jsonob['type'] == 'TRUNCATE':
                    doris_put_truncate(tableset[datakey]['db'], tableset[datakey]['table'])
                if jsonob['type'] == 'DELETE':
                    for dataone in jsonob['data']:
                        keys = getkeyd(dataone, keyf)
                        keyd = ",".join(keys.values())
                        datas[datakey]['rows'][keyd] = {"type": "delete", "ts": jsonob['ts'], "data": keys,
                                                        "partition": topic_partition.partition, "offset": message.offset}
                if jsonob['type'] == 'INSERT':
                    for dataone in jsonob['data']:
                        keys = getkeyd(dataone, keyf)
                        keyd = ",".join(keys.values())
                        datas[datakey]['rows'][keyd] = {"type": "insert", "ts": jsonob['ts'], "data": dataone,
                                                        "partition": topic_partition.partition, "offset": message.offset}
                if jsonob['type'] == 'UPDATE':
                    for key, dataone in enumerate(jsonob['data']):
                        keys = getkeyd(dataone, keyf)
                        keyd = ",".join(keys.values())
                        olddata = jsonob['old'][key]
                        okeyd = None
                        okeys = getkeyd(olddata, keyf)
                        if okeys != None:
                            okeyd = ",".join(okeys.values())
                        if tabletype0 == 'unique' and (okeys == None or okeyd == keyd):
                            ## primary key unchanged: a plain upsert is enough
                            datas[datakey]['rows'][keyd] = {"type": "update", "ts": jsonob['ts'], "data": dataone,
                                                            "partition": topic_partition.partition, "offset": message.offset}
                        else:
                            ## primary key changed: delete the old row, insert the new one
                            datas[datakey]['rows'][okeyd] = {"type": "delete", "ts": jsonob['ts'], "data": okeys,
                                                             "partition": topic_partition.partition, "offset": message.offset}
                            datas[datakey]['rows'][keyd] = {"type": "insert", "ts": jsonob['ts'], "data": dataone,
                                                            "partition": topic_partition.partition, "offset": message.offset}
                if len(datas[datakey]['rows']) > maxnum:
                    post2dorislist(datakey, datas[datakey]['rows'])
        for dkey in datas:
            lastsend = tableset[dkey]['lastsend']
            maxsecond = tableset[dkey]['maxsecond']
            if time.time() - lastsend > maxsecond:
                post2dorislist(dkey, datas[dkey]['rows'])


def consumerThread(queue):
    ## Drain the load queue and push batches to Doris
    while True:
        time.sleep(0.001)
        item = queue.get()
        if item['type'] == "delete":
            doris_put_delete(item['setkey'], item['db'], item['table'], item['data'])
        if item['type'] == "append":
            doris_put_append(item['setkey'], item['db'], item['table'], item['data'])
        queue.task_done()


consumer_thread = threading.Thread(target=consumerThread, args=(queue,))
producer_thread = threading.Thread(target=kafkaProduce, args=())
log_thread = threading.Thread(target=printLog, args=())  ## logging
producer_thread.setDaemon(True)
consumer_thread.setDaemon(True)
log_thread.setDaemon(True)
producer_thread.start()
consumer_thread.start()
log_thread.start()
producer_thread.join()
sendweixinerror("producer sync paused")
consumer_thread.join()
sendweixinerror("consumer sync paused")
log_thread.join()
print "stop at " + str(datetime.now())
sendweixinerror("sync interrupted")
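The two Stream Load calls in the script differ only in `merge_type`. As a minimal sketch, the request construction can be pulled into a helper; the BE host and port (192.168.150.24:8040) are the same deployment-specific assumptions the script makes, and `stream_load_request` is a hypothetical name, not part of the original code:

```python
def stream_load_request(db, table, merge_type, be_host="192.168.150.24", be_port=8040):
    """Return (url, headers) for a JSON Stream Load with the given merge type."""
    url = "http://%s:%d/api/%s/%s/_stream_load" % (be_host, be_port, db, table)
    headers = {
        "format": "json",             # payload is JSON
        "strip_outer_array": "true",  # treat the outer array as multiple rows
        "Expect": "100-continue",     # lets the FE/BE redirect before the body is sent
        "merge_type": merge_type,     # "APPEND" for upserts, "DELETE" for deletes
    }
    return url, headers
```

It would then be called as `requests.put(url, data=json.dumps(rows), headers=headers, auth=('root', ''))`, mirroring `doris_put_append` and `doris_put_delete` above.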
Example canal_doris_set.json (one entry per table to sync):

{
    "test_canal.testpk": {
        "difffields": {},
        "lastsend": 0,
        "ifstop": false,
        "maxnum": 10000,
        "maxsecond": 5,
        "table": "testpk",
        "db": "ods",
        "tabletype": "unique",
        "pknames": ["testid"]
    }
}
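The core idea of the script is the per-table buffer keyed by primary key: repeated changes to the same row collapse into a single Stream Load row. A self-contained sketch of that keying logic, with a made-up sample event and `buffer_event` as a hypothetical helper standing in for the inline loop in kafkaProduce:

```python
def getkeyd(dataone, keylist):
    # Extract the primary-key columns of a row; None if any key column is missing.
    keys = {}
    for k in keylist:
        if k not in dataone:
            return None
        keys[k] = dataone[k]
    return keys

def buffer_event(rows, event, pknames):
    # Key each changed row by its primary key, so a later change to the same
    # row overwrites the earlier buffered one.
    for dataone in event["data"]:
        keys = getkeyd(dataone, pknames)
        keyd = ",".join(keys.values())
        rows[keyd] = {"type": event["type"].lower(), "ts": event["ts"], "data": dataone}
    return rows

rows = {}
buffer_event(rows, {"type": "INSERT", "ts": 1, "data": [{"testid": "1", "v": "a"}]}, ["testid"])
buffer_event(rows, {"type": "UPDATE", "ts": 2, "data": [{"testid": "1", "v": "b"}]}, ["testid"])
# The two events collapse into a single buffered row for testid=1.
```

This is why the script only needs one upsert (or one delete) per primary key per flush, regardless of how many binlog events arrived for that row in between.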
