This article introduces canal_json_to_doris; hopefully it offers some reference value to developers solving the same problem.

Flink-based sync was consuming too many resources, so I wrote a sync program in Python instead. It runs three threads: a Kafka consumer that buffers Canal change events per table, a worker that flushes each batch to Doris via Stream Load (falling back to a SQL DELETE when a stream-load delete fails), and a logger that prints statistics and hot-reloads the table settings file.
# -*- coding: utf8 -*-
import json
import time
import pymysql
import requests
from kafka import KafkaConsumer
import threading
import Queue
from datetime import datetime

## Sync all tables through a single topic
## Do not declare columns NOT NULL when creating the target tables -- important!
## nohup python -u canal_doris.py > doris.log 2>&1 &
## -u disables output buffering
## The topic should ideally use a single partition

##### Configuration start
tjtimesecond = 30  ## stats logging interval (seconds)
## Doris settings
tourl = "192.168.150.25"
toport = 9030
touser = "root"
topassword = ""
todatabase = "ods"
topic = ['sync_other_all']
setfilename = "canal_doris_set.json"
##### Configuration end

## Initialization
tj = {}  ## per-table sync statistics
datas = {}  ## per-table row buffers
lasttjtime = 0
queue = Queue.Queue()
## auto-commit is disabled; progress is only tracked in the tj statistics
consumer = KafkaConsumer(group_id='ggg', enable_auto_commit=False, auto_offset_reset='earliest',
                         bootstrap_servers=['zoo1:9092', 'zoo2:9092', 'zoo3:9092'], max_poll_records=500)
consumer.subscribe(topics=topic)

## Table settings
with open(setfilename, 'r') as file:
    content = file.read()
tableset = json.loads(content)
## Initialization end

def sendweixinerror(msg):
    ## Push an alert message to a WeChat Work webhook
    url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=www'
    payload = '{"msgtype": "text","text": {"content": "' + str(msg) + '"}}'
    headers = {"Content-Type": "application/json"}
    requests.post(url=url, data=payload, headers=headers)

def execDoris(tourl, toport, touser, topassword, todatabase, sql):
    ## Run a SQL statement against Doris over the MySQL protocol
    try:
        db = pymysql.connect(host=tourl, port=toport, user=touser, password=topassword, database=todatabase)
        cursor = db.cursor()
        cursor.execute(query=sql)
        res = cursor.fetchall()
        cursor.close()
        db.close()
        return res
    except Exception as e:
        print(e)
        return ""

## Initialize per-table start times and statistics
for k in tableset:
    x = tableset[k]
    if 'tsStartBeforeHoursFromnow' in x:
        time0 = time.time() - x['tsStartBeforeHoursFromnow'] * 3600
        x['fromtime'] = time0 * 1000  ## ms
    else:
        x['fromtime'] = 0
    tj[k] = {"count": 0, "tsdate": "", "partinfo": {}, "ts": "", "sendcount": 0}

def doris_put_delete2(setkey, db, table, data0):
    ## Fallback: delete rows with a plain SQL DELETE
    wheresql = ""
    for one in data0:
        cond = ""
        for k in one:
            cond += k + "='" + one[k] + "' and "
        wheresql += "(" + cond[:-5] + ") or"
    wheresql = wheresql[:-2]
    sql = " delete from `" + db + "`.`" + table + "` where " + wheresql
    print sql
    try:
        rs = execDoris(tourl, toport, touser, topassword, todatabase, sql)
    except Exception as e:
        sendweixinerror(db + table + " sync error 2, updates for this table stopped")
        tableset[setkey]['ifstop'] = True
        tj[setkey]['ifstop'] = True

def doris_put_delete(setkey, db, table, data0):
    ## Delete rows via Stream Load (merge_type=DELETE); fall back to SQL on failure
    try:
        response = requests.put(url="http://192.168.150.24:8040/api/" + db + "/" + table + "/_stream_load",
                                data=json.dumps(data0),
                                headers={"format": "json", "strip_outer_array": "true",
                                         "Expect": "100-continue", "merge_type": "DELETE"},
                                auth=('root', ''), allow_redirects=False, verify=False)
        if 'ErrorURL' in response.text:
            print "delete" + db + table
            sendweixinerror(db + table + " stream-load delete error 1, trying SQL delete ")
            print response.text
            print json.dumps(data0)
            doris_put_delete2(setkey, db, table, data0)
    except Exception as e:
        print "delete" + db + table
        sendweixinerror(db + table + " stream-load delete error 1, trying SQL delete " + str(e))
        print json.dumps(data0)
        doris_put_delete2(setkey, db, table, data0)

def doris_put_append(setkey, db, table, data0):
    ## Upsert rows via Stream Load (merge_type=APPEND); stop the table on failure
    try:
        response = requests.put(url="http://192.168.150.24:8040/api/" + db + "/" + table + "/_stream_load",
                                data=json.dumps(data0),
                                headers={"format": "json", "strip_outer_array": "true",
                                         "Expect": "100-continue", "merge_type": "APPEND"},
                                auth=('root', ''), allow_redirects=False, verify=False)
        if 'ErrorURL' in response.text:
            print "append:" + db + table
            sendweixinerror(db + table + " sync error, updates for this table stopped")
            print response.text
            print json.dumps(data0)
            tableset[setkey]['ifstop'] = True
    except Exception as e:
        print "append:" + db + table
        sendweixinerror(db + table + " sync error, updates for this table stopped")
        print str(e)
        print json.dumps(data0)
        tableset[setkey]['ifstop'] = True
        tj[setkey]['ifstop'] = True

def doris_put_truncate(db, table):
    print " truncate table `" + db + "`.`" + table + "` "
    execDoris(tourl, toport, touser, topassword, todatabase, " truncate table `" + db + "`.`" + table + "` ")

def post2dorislist(dkey, rows):
    ## Flush one table's buffered rows onto the worker queue
    tableset[dkey]['lastsend'] = time.time()
    ifstop = tableset[dkey]['ifstop']
    if ifstop == True:
        return
    db = tableset[dkey]['db']
    table = tableset[dkey]['table']
    if len(rows) > 0:
        deletedata = []
        appenddata = []
        partinfo = {}
        ts = 0
        for rkey in rows:
            row = rows[rkey]
            if row['type'] == 'insert' or row['type'] == 'update':
                appenddata.append(changeRowData(dkey, row["data"]))
                if row['partition'] not in partinfo:
                    partinfo[row['partition']] = {}
                partinfo[row['partition']]['offset'] = row['offset']
                if row['ts'] > ts:
                    ts = row['ts']
            else:
                deletedata.append(changeRowData(dkey, row["data"]))
                if row['partition'] not in partinfo:
                    partinfo[row['partition']] = {}
                partinfo[row['partition']]['offset'] = row['offset']
                if row['ts'] > ts:
                    ts = row['ts']
        if len(deletedata) > 0:
            ## doris_put_delete(db, table, deletedata)
            queue.put({"setkey": dkey, "type": "delete", "db": db, "table": table, "data": deletedata})
            tj[dkey]['count'] += len(deletedata)
            tj[dkey]['sendcount'] += 1
        if len(appenddata) > 0:
            ## doris_put_append(db, table, appenddata)
            queue.put({"setkey": dkey, "type": "append", "db": db, "table": table, "data": appenddata})
            tj[dkey]['count'] += len(appenddata)
            tj[dkey]['sendcount'] += 1
        if len(partinfo) > 0:
            tj[dkey]['partinfo'].update(partinfo)
        if ts > 0:
            tj[dkey]['ts'] = ts
            tj[dkey]['tsdate'] = str(datetime.fromtimestamp(ts / 1000))
        datas[dkey]['rows'] = {}

def printLog():
    ## Periodically print statistics and hot-reload the settings file for new tables
    global lasttjtime
    while True:
        time.sleep(1)
        if time.time() - lasttjtime > tjtimesecond:
            try:
                with open(setfilename, 'r') as file:
                    content = file.read()
                tableset0 = json.loads(content)
                ifchange = False
                for x in tableset0:
                    if x not in tableset:
                        tableset[x] = tableset0[x]
                        ## initialize the runtime fields a hot-added table needs
                        if 'tsStartBeforeHoursFromnow' in tableset[x]:
                            tableset[x]['fromtime'] = (time.time() - tableset[x]['tsStartBeforeHoursFromnow'] * 3600) * 1000
                        else:
                            tableset[x]['fromtime'] = 0
                        tj[x] = {"count": 0, "tsdate": "", "partinfo": {}, "ts": "", "sendcount": 0}
                        ifchange = True
                if ifchange == True:
                    print str(datetime.now()) + ":-----change tableset"
                    print str(datetime.now()) + ":" + json.dumps(tableset)
            except Exception as e:
                print e
            lasttjtime = time.time()
            print str(datetime.now()) + ":" + json.dumps(tj)

def getkeyd(dataone, keylist):
    ## Pull the primary-key columns out of a row; None if any key column is missing
    keys = {}
    for k in keylist:
        if k in dataone:
            keys[k] = dataone[k]
        else:
            return None
    return keys

def changeRowData(dkey, rowdata):
    ## Rename columns according to the table's difffields mapping
    try:
        if 'difffields' not in tableset[dkey]:
            return rowdata
        difffields = tableset[dkey]['difffields']
        if len(difffields) == 0:
            return rowdata
        for k in difffields:
            if k in rowdata:
                data = rowdata[k]
                del rowdata[k]
                rowdata[difffields[k]] = data
        return rowdata
    except Exception as e:
        sendweixinerror(dkey + " sync paused")
        print e
        print rowdata
        return rowdata
# Fetch messages with consumer.poll()
def kafkaProduce():
    print "start !"
    while True:
        time.sleep(0.001)
        messages = consumer.poll(timeout_ms=500)
        # Process messages
        for topic_partition, message_list in messages.items():
            for message in message_list:
                jsonob = json.loads(message.value)
                datakey = jsonob['database'] + '.' + jsonob['table']
                if datakey not in tableset:
                    continue
                fromtime = tableset[datakey]['fromtime']
                if jsonob['ts'] < fromtime:
                    continue
                maxnum = tableset[datakey]['maxnum']
                tabletype0 = tableset[datakey]['tabletype']
                keyf = tableset[datakey]['pknames']
                ifstop = tableset[datakey]['ifstop']  ## skip tables that are already stopped
                if ifstop == True:
                    continue
                if datakey not in datas:
                    datas[datakey] = {"lasttime": time.time(), "rows": {}}
                if jsonob['type'] == 'TRUNCATE':
                    db = tableset[datakey]['db']
                    table = tableset[datakey]['table']
                    doris_put_truncate(db, table)
                if jsonob['type'] == 'DELETE':
                    for dataone in jsonob['data']:
                        keys = getkeyd(dataone, keyf)
                        keyd = ",".join(keys.values())
                        datas[datakey]['rows'][keyd] = {"type": "delete", "ts": jsonob['ts'], "data": keys,
                                                        "partition": topic_partition.partition, "offset": message.offset}
                if jsonob['type'] == 'INSERT':
                    for dataone in jsonob['data']:
                        keys = getkeyd(dataone, keyf)
                        keyd = ",".join(keys.values())
                        datas[datakey]['rows'][keyd] = {"type": "insert", "ts": jsonob['ts'], "data": dataone,
                                                        "partition": topic_partition.partition, "offset": message.offset}
                if jsonob['type'] == 'UPDATE':
                    for key, dataone in enumerate(jsonob['data']):
                        keys = getkeyd(dataone, keyf)
                        keyd = ",".join(keys.values())
                        olddata = jsonob['old'][key]
                        okeyd = None
                        okeys = getkeyd(olddata, keyf)
                        if okeys != None:
                            okeyd = ",".join(okeys.values())
                        if tabletype0 == 'unique' and (okeys == None or okeyd == keyd):
                            datas[datakey]['rows'][keyd] = {"type": "update", "ts": jsonob['ts'], "data": dataone,
                                                            "partition": topic_partition.partition, "offset": message.offset}
                        else:
                            ## primary key changed: delete the old row, then insert the new one
                            datas[datakey]['rows'][okeyd] = {"type": "delete", "ts": jsonob['ts'], "data": okeys,
                                                             "partition": topic_partition.partition, "offset": message.offset}
                            datas[datakey]['rows'][keyd] = {"type": "insert", "ts": jsonob['ts'], "data": dataone,
                                                            "partition": topic_partition.partition, "offset": message.offset}
                if len(datas[datakey]['rows']) > maxnum:
                    post2dorislist(datakey, datas[datakey]['rows'])
        for dkey in datas:
            lastsend = tableset[dkey]['lastsend']
            maxsecond = tableset[dkey]['maxsecond']
            if time.time() - lastsend > maxsecond:
                post2dorislist(dkey, datas[dkey]['rows'])

def consumerThread(queue):
    ## Worker: drain the queue and push batches to Doris
    while True:
        time.sleep(0.001)
        item = queue.get()
        if item['type'] == "delete":
            doris_put_delete(item['setkey'], item['db'], item['table'], item['data'])
        if item['type'] == "append":
            doris_put_append(item['setkey'], item['db'], item['table'], item['data'])
        queue.task_done()

consumer_thread = threading.Thread(target=consumerThread, args=(queue,))
producer_thread = threading.Thread(target=kafkaProduce, args=())
log_thread = threading.Thread(target=printLog, args=())  ## logging
producer_thread.setDaemon(True)
consumer_thread.setDaemon(True)
log_thread.setDaemon(True)
producer_thread.start()
consumer_thread.start()
log_thread.start()
producer_thread.join()
sendweixinerror("produce同步暂停")
consumer_thread.join()
sendweixinerror("consumer同步暂停")
log_thread.join()
print "stop at " + str(datetime.now())
sendweixinerror("同步中断")
{"test_canal.testpk": {"difffields": {},"lastsend": 0,"ifstop": false,"maxnum": 10000,"maxsecond": 5,"table": "testpk","db": "ods","tabletype": "unique","pknames": ["testid"]}
}
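
Two optional per-table settings are left unused in the sample above: difffields renames a source column to a differently named Doris column, and tsStartBeforeHoursFromnow makes the script skip events whose ts is older than that many hours at startup. A hypothetical entry using both (the orders table and its columns are made up for illustration):

{
    "test_canal.orders": {
        "difffields": {"desc": "order_desc"},
        "tsStartBeforeHoursFromnow": 24,
        "lastsend": 0,
        "ifstop": false,
        "maxnum": 10000,
        "maxsecond": 5,
        "table": "orders",
        "db": "ods",
        "tabletype": "unique",
        "pknames": ["order_id"]
    }
}

Because printLog re-reads canal_doris_set.json every tjtimesecond seconds, an entry like this can be added while the process is running and will be picked up without a restart.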
That wraps up this look at canal_json_to_doris; I hope it is of some help.