采用OpenReplicator解析MySQL binlog

2024-09-02 11:08

本文主要是介绍采用OpenReplicator解析MySQL binlog,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!


欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。


欢迎跳转到本文的原文链接:https://honeypps.com/backend/read-mysql-binlog-by-using-openreplicator/

Open Replicator是一个用Java编写的MySQL binlog分析程序。Open Replicator 首先连接到MySQL(就像一个普通的MySQL Slave一样),然后接收和分析binlog,最终将分析得出的binlog events以回调的方式通知应用。Open Replicator可以被应用到MySQL数据变化的实时推送,多Master到单Slave的数据同步等多种应用场景。Open Replicator目前只支持MySQL5.0及以上版本。
Open Replicator项目地址:https://github.com/whitesock/open-replicator

binlog事件分析结构图

这里写图片描述

在阅读下面的内容时,首先需要对binlog有一定的了解,可以 参考《MySQL Binlog解析》。

这里通过open-replicator解析binlog日志事件(binlog-format = row)。binlog日志事件里存在两种操作:DDL和DML,当DDL时输出一条sql,当DML时输出相关行信息。可以参考下面:

DDL(CREATE, ALTER, DROP, TRUNCATE,主要用在定义或改变表的结构):

{"eventId": 1,"databaseName": "canal_test","tableName": "`company`","eventType": 2,"timestamp": 1477033198000,"timestampReceipt": 1477033248780,"binlogName": "mysql-bin.000006","position": 353,"nextPostion": 468,"serverId": 2,"before": null,"after": null,"isDdl": true,"sql": "DROP TABLE `company` /* generated by server */"
}

DML(SELECT, UPDATE, INSERT, DELETE,对数据库里的数据进行操作):

{"eventId": 0,"databaseName": "canal_test","tableName": "person","eventType": 24,"timestamp": 1477030734000,"timestampReceipt": 1477032161988,"binlogName": "mysql-bin.000006","position": 242,"nextPostion": 326,"serverId": 2,"before": {"id": "3","sex": "f","address": "shanghai","age": "23","name": "zzh3"},"after": {"id": "3","sex": "m","address": "shanghai","age": "23","name": "zzh3"},"isDdl": false,"sql": null
}

相关的类文件如下:
CDCEvent.java

package or;import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;import com.google.code.or.binlog.BinlogEventV4;
import com.google.code.or.binlog.BinlogEventV4Header;
import com.google.code.or.binlog.impl.event.AbstractBinlogEventV4;public class CDCEvent {private long eventId = 0;//事件唯一标识private String databaseName = null;private String tableName = null;private int eventType = 0;//事件类型private long timestamp = 0;//事件发生的时间戳[MySQL服务器的时间]private long timestampReceipt = 0;//Open-replicator接收到的时间戳[CDC执行的时间戳]private String binlogName = null;// binlog file nameprivate long position = 0;private long nextPostion = 0;private long serverId = 0;private Map<String,String> before = null;private Map<String,String> after = null;private Boolean isDdl= null;private String sql = null;private static AtomicLong uuid = new AtomicLong(0);public CDCEvent(){}public CDCEvent(final AbstractBinlogEventV4 are, String databaseName, String tableName){this.init(are);this.databaseName = databaseName;this.tableName = tableName;}private void init(final BinlogEventV4 be){this.eventId = uuid.getAndAdd(1);BinlogEventV4Header header = be.getHeader();this.timestamp = header.getTimestamp();this.eventType = header.getEventType();this.serverId = header.getServerId();this.timestampReceipt = header.getTimestampOfReceipt();this.position = header.getPosition();this.nextPostion = header.getNextPosition();this.binlogName = header.getBinlogFileName();}@Overridepublic String toString(){StringBuilder builder = new StringBuilder();builder.append("{ eventId:").append(eventId);builder.append(",databaseName:").append(databaseName);builder.append(",tableName:").append(tableName);builder.append(",eventType:").append(eventType);builder.append(",timestamp:").append(timestamp);builder.append(",timestampReceipt:").append(timestampReceipt);builder.append(",binlogName:").append(binlogName);builder.append(",position:").append(position);builder.append(",nextPostion:").append(nextPostion);builder.append(",serverId:").append(serverId);builder.append(",isDdl:").append(isDdl);builder.append(",sql:").append(sql);builder.append(",before:").append(before);builder.append(",after:").append(after).append("}");return builder.toString();}
// 省略Getter和Setter方法	
}

open-replicator的解析主要是通过注册Listener的形式实现的,整个过程最重要的步骤在下面:
InstanceListener.java

package or;import java.util.HashMap;
import java.util.List;
import java.util.Map;import or.keeper.TableInfoKeeper;
import or.manager.CDCEventManager;
import or.model.ColumnInfo;
import or.model.TableInfo;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import com.google.code.or.binlog.BinlogEventListener;
import com.google.code.or.binlog.BinlogEventV4;
import com.google.code.or.binlog.impl.event.DeleteRowsEvent;
import com.google.code.or.binlog.impl.event.FormatDescriptionEvent;
import com.google.code.or.binlog.impl.event.QueryEvent;
import com.google.code.or.binlog.impl.event.TableMapEvent;
import com.google.code.or.binlog.impl.event.UpdateRowsEvent;
import com.google.code.or.binlog.impl.event.WriteRowsEvent;
import com.google.code.or.binlog.impl.event.XidEvent;
import com.google.code.or.common.glossary.Column;
import com.google.code.or.common.glossary.Pair;
import com.google.code.or.common.glossary.Row;
import com.google.code.or.common.util.MySQLConstants;public class InstanceListener implements BinlogEventListener{private static final Logger logger = LoggerFactory.getLogger(InstanceListener.class);@Overridepublic void onEvents(BinlogEventV4 be) {if(be == null){logger.error("binlog event is null");return;}int eventType = be.getHeader().getEventType();switch(eventType){case MySQLConstants.FORMAT_DESCRIPTION_EVENT:{logger.trace("FORMAT_DESCRIPTION_EVENT");break;}case MySQLConstants.TABLE_MAP_EVENT://每次ROW_EVENT前都伴随一个TABLE_MAP_EVENT事件,保存一些表信息,如tableId, tableName, databaseName, 而ROW_EVENT只有tableId{TableMapEvent tme = (TableMapEvent)be;TableInfoKeeper.saveTableIdMap(tme);logger.trace("TABLE_MAP_EVENT:tableId:{}",tme.getTableId());break;}case MySQLConstants.DELETE_ROWS_EVENT:{DeleteRowsEvent dre = (DeleteRowsEvent) be;long tableId = dre.getTableId();logger.trace("DELETE_ROW_EVENT:tableId:{}",tableId);TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);String databaseName = tableInfo.getDatabaseName();String tableName = tableInfo.getTableName();List<Row> rows = dre.getRows();for(Row row:rows){List<Column> before = row.getColumns();Map<String,String> beforeMap = getMap(before,databaseName,tableName);if(beforeMap !=null && beforeMap.size()>0){CDCEvent cdcEvent = new CDCEvent(dre,databaseName,tableName);cdcEvent.setIsDdl(false);cdcEvent.setSql(null);cdcEvent.setBefore(beforeMap);CDCEventManager.queue.addLast(cdcEvent);logger.info("cdcEvent:{}",cdcEvent);}}break;}case MySQLConstants.UPDATE_ROWS_EVENT:{UpdateRowsEvent upe = (UpdateRowsEvent)be;long tableId = upe.getTableId();logger.info("UPDATE_ROWS_EVENT:tableId:{}",tableId);TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);String databaseName = tableInfo.getDatabaseName();String tableName = tableInfo.getTableName();List<Pair<Row>> rows = upe.getRows();for(Pair<Row> p:rows){List<Column> colsBefore = p.getBefore().getColumns();List<Column> colsAfter = p.getAfter().getColumns();Map<String,String> beforeMap = getMap(colsBefore,databaseName,tableName);Map<String,String> afterMap = getMap(colsAfter,databaseName,tableName);if(beforeMap!=null && afterMap!=null && beforeMap.size()>0 && afterMap.size()>0){CDCEvent cdcEvent = new CDCEvent(upe,databaseName,tableName);cdcEvent.setIsDdl(false);cdcEvent.setSql(null);cdcEvent.setBefore(beforeMap);cdcEvent.setAfter(afterMap);CDCEventManager.queue.addLast(cdcEvent);logger.info("cdcEvent:{}",cdcEvent);}}break;}case MySQLConstants.WRITE_ROWS_EVENT:{WriteRowsEvent wre = (WriteRowsEvent)be;long tableId = wre.getTableId();logger.trace("WRITE_ROWS_EVENT:tableId:{}",tableId);TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);String databaseName = tableInfo.getDatabaseName();String tableName = tableInfo.getTableName();List<Row> rows = wre.getRows();for(Row row: rows){List<Column> after = row.getColumns();Map<String,String> afterMap = getMap(after,databaseName,tableName);if(afterMap!=null && afterMap.size()>0){CDCEvent cdcEvent = new CDCEvent(wre,databaseName,tableName);cdcEvent.setIsDdl(false);cdcEvent.setSql(null);cdcEvent.setAfter(afterMap);CDCEventManager.queue.addLast(cdcEvent);logger.info("cdcEvent:{}",cdcEvent);}}break;}case MySQLConstants.QUERY_EVENT:{QueryEvent qe = (QueryEvent)be;TableInfo tableInfo = createTableInfo(qe);if(tableInfo == null)break;String databaseName = tableInfo.getDatabaseName();String tableName = tableInfo.getTableName();logger.trace("QUERY_EVENT:databaseName:{},tableName:{}",databaseName,tableName);CDCEvent cdcEvent = new CDCEvent(qe,databaseName,tableName);cdcEvent.setIsDdl(true);cdcEvent.setSql(qe.getSql().toString());CDCEventManager.queue.addLast(cdcEvent);logger.info("cdcEvent:{}",cdcEvent);break;}case MySQLConstants.XID_EVENT:{XidEvent xe = (XidEvent)be;logger.trace("XID_EVENT: xid:{}",xe.getXid());break;}default:{logger.trace("DEFAULT:{}",eventType);break;}}}/*** ROW_EVENT中是没有Column信息的,需要通过MysqlConnection(下面会讲到)的方式读取列名信息,* 然后跟取回的List<Column>进行映射。* * @param cols* @param databaseName* @param tableName* @return*/private Map<String,String> getMap(List<Column> cols, String databaseName, String tableName){Map<String,String> map = new HashMap<>();if(cols == null || cols.size()==0){return null;}String fullName = databaseName+"."+tableName;List<ColumnInfo> columnInfoList = TableInfoKeeper.getColumns(fullName);if(columnInfoList == null)return null;if(columnInfoList.size() != cols.size()){TableInfoKeeper.refreshColumnsMap();if(columnInfoList.size() != cols.size()){logger.warn("columnInfoList.size is not equal to cols.");return null;}}for(int i=0;i<columnInfoList.size(); i++){if(cols.get(i).getValue()==null)map.put(columnInfoList.get(i).getName(),"");elsemap.put(columnInfoList.get(i).getName(), cols.get(i).toString());}return map;}/*** 从sql中提取Table信息,因为QUERY_EVENT是对应DATABASE这一级别的,不像ROW_EVENT是对应TABLE这一级别的,* 所以需要通过从sql中提取TABLE信息,封装到TableInfo对象中* * @param qe* @return*/	private TableInfo createTableInfo(QueryEvent qe){String sql = qe.getSql().toString().toLowerCase();TableInfo ti = new TableInfo();String databaseName = qe.getDatabaseName().toString();String tableName = null;if(checkFlag(sql,"table")){tableName = getTableName(sql,"table");} else if(checkFlag(sql,"truncate")){tableName = getTableName(sql,"truncate");} else{logger.warn("can not find table name from sql:{}",sql);return null;}ti.setDatabaseName(databaseName);ti.setTableName(tableName);ti.setFullName(databaseName+"."+tableName);return ti;}private boolean checkFlag(String sql, String flag){String[] ss = sql.split(" ");for(String s:ss){if(s.equals(flag)){return true;}}return false;}private String getTableName(String sql, String flag){String[] ss = sql.split("\\.");String tName = null;if (ss.length > 1) {String[] strs = ss[1].split(" ");tName = strs[0];} else {String[] strs = sql.split(" ");boolean start = false;for (String s : strs) {if (s.indexOf(flag) >= 0) {start = true;continue;}if (start && !s.isEmpty()) {tName = s;break;}}}tName.replaceAll("`", "").replaceAll(";", "");//del "("[create table person(....]int index = tName.indexOf('(');if(index>0){tName = tName.substring(0, index);}return tName;}
}

上面所涉及到的TableInfo .java如下:

package or.model;public class TableInfo {private String databaseName;private String tableName;private String fullName;// 省略Getter和Setter@Overridepublic boolean equals(Object o){if(this == o)return true;if(o == null || this.getClass()!=o.getClass())return false;TableInfo tableInfo = (TableInfo)o;if(!this.databaseName.equals(tableInfo.getDatabaseName()))return false;if(!this.tableName.equals(tableInfo.getTableName()))return false;if(!this.fullName.equals(tableInfo.getFullName()))return false;return true;}@Overridepublic int hashCode(){int result = this.tableName.hashCode();result = 31*result+this.databaseName.hashCode();result = 31*result+this.fullName.hashCode();return result;}
}

接着需要有个地方保存从TABLE_MAP_EVENT中提取到的信息,TableInfoKeeper .java

package or.keeper;import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;import or.MysqlConnection;
import or.model.ColumnInfo;
import or.model.TableInfo;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import com.google.code.or.binlog.impl.event.TableMapEvent;public class TableInfoKeeper {private static final Logger logger = LoggerFactory.getLogger(TableInfoKeeper.class);private static Map<Long,TableInfo> tabledIdMap = new ConcurrentHashMap<>();private static Map<String,List<ColumnInfo>> columnsMap = new ConcurrentHashMap<>();static{columnsMap = MysqlConnection.getColumns();}public static void saveTableIdMap(TableMapEvent tme){long tableId = tme.getTableId();tabledIdMap.remove(tableId);TableInfo table = new TableInfo();table.setDatabaseName(tme.getDatabaseName().toString());table.setTableName(tme.getTableName().toString());table.setFullName(tme.getDatabaseName()+"."+tme.getTableName());tabledIdMap.put(tableId, table);}public static synchronized void refreshColumnsMap(){Map<String,List<ColumnInfo>> map = MysqlConnection.getColumns();if(map.size()>0){
//			logger.warn("refresh and clear cols.");columnsMap = map;
//			logger.warn("refresh and switch cols:{}",map);}else{logger.error("refresh columnsMap error.");}}public static TableInfo getTableInfo(long tableId){return tabledIdMap.get(tableId);}public static List<ColumnInfo> getColumns(String fullName){return columnsMap.get(fullName);}
}

正如上面InstanceListener中提到的,有些信息需要直接从MySQL中读取,比如数据库表的列信息,相关的类MysqlConnection如下:

package or;import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;import or.model.BinlogInfo;
import or.model.BinlogMasterStatus;
import or.model.ColumnInfo;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MysqlConnection {private static final Logger logger = LoggerFactory.getLogger(MysqlConnection.class);private static Connection conn;private static String host;private static int port;private static String user;private static String password;public static void setConnection(String hostArg, int portArg, String userArg, String passwordArg){try {if(conn == null || conn.isClosed()){Class.forName("com.mysql.jdbc.Driver");host = hostArg;port = portArg;user = userArg;password = passwordArg;conn = DriverManager.getConnection("jdbc:mysql://"+host+":"+port+"/",user,password);logger.info("connected to mysql:{} : {}",user,password);}} catch (ClassNotFoundException e) {logger.error(e.getMessage(),e);} catch (SQLException e) {logger.error(e.getMessage(),e);}}public static Connection getConnection(){try {if(conn == null || conn.isClosed()){setConnection(host,port,user,password);}} catch (SQLException e) {logger.error(e.getMessage(),e);}return conn;}/*** 获取Column信息* * @return*/public static Map<String,List<ColumnInfo>> getColumns(){Map<String,List<ColumnInfo>> cols = new HashMap<>();Connection conn = getConnection();try {DatabaseMetaData metaData = conn.getMetaData();ResultSet r = metaData.getCatalogs();String tableType[] = {"TABLE"};while(r.next()){String databaseName = r.getString("TABLE_CAT");ResultSet result = metaData.getTables(databaseName, null, null, tableType);while(result.next()){String tableName = result.getString("TABLE_NAME");
//					System.out.println(result.getInt("TABLE_ID"));String key = databaseName +"."+tableName;ResultSet colSet = metaData.getColumns(databaseName, null, tableName, null);cols.put(key, new ArrayList<ColumnInfo>());while(colSet.next()){ColumnInfo columnInfo = new ColumnInfo(colSet.getString("COLUMN_NAME"),colSet.getString("TYPE_NAME"));cols.get(key).add(columnInfo);}}}} catch (SQLException e) { logger.error(e.getMessage(),e);}	return cols;}/*** 参考* mysql> show binary logs*  +------------------+-----------+*	| Log_name         | File_size |*	+------------------+-----------+*	| mysql-bin.000001 |       126 |*	| mysql-bin.000002 |       126 |*	| mysql-bin.000003 |      6819 |*	| mysql-bin.000004 |      1868 |*	+------------------+-----------+*/public static List<BinlogInfo> getBinlogInfo(){List<BinlogInfo> binlogList = new ArrayList<>();Connection conn = null;Statement statement = null;ResultSet resultSet = null;try {conn = getConnection();statement = conn.createStatement();resultSet = statement.executeQuery("show binary logs");while(resultSet.next()){BinlogInfo binlogInfo = new BinlogInfo(resultSet.getString("Log_name"),resultSet.getLong("File_size"));binlogList.add(binlogInfo);}} catch (Exception e) {logger.error(e.getMessage(),e);} finally{try {if(resultSet != null)resultSet.close();if(statement != null)statement.close();if(conn != null)conn.close();} catch (SQLException e) {logger.error(e.getMessage(),e);}}return binlogList;}/*** 参考:	* mysql> show master status;* 	+------------------+----------+--------------+------------------+* 	| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB |* 	+------------------+----------+--------------+------------------+* 	| mysql-bin.000004 |     1868 |              |                  |* 	+------------------+----------+--------------+------------------+* @return*/public static BinlogMasterStatus getBinlogMasterStatus(){BinlogMasterStatus binlogMasterStatus = new BinlogMasterStatus();Connection conn = null;Statement statement = null;ResultSet resultSet = null;try {conn = getConnection();statement = conn.createStatement();resultSet = statement.executeQuery("show master status");while(resultSet.next()){binlogMasterStatus.setBinlogName(resultSet.getString("File"));binlogMasterStatus.setPosition(resultSet.getLong("Position"));}} catch (Exception e) {logger.error(e.getMessage(),e);} finally{try {if(resultSet != null)resultSet.close();if(statement != null)statement.close();if(conn != null)conn.close();} catch (SQLException e) {logger.error(e.getMessage(),e);}}return binlogMasterStatus;}/*** 获取open-replicator所连接的mysql服务器的serverid信息* @return*/public static int getServerId(){int serverId=6789;Connection conn = null;Statement statement = null;ResultSet resultSet = null;try {conn = getConnection();statement = conn.createStatement();resultSet = statement.executeQuery("show variables like 'server_id'");while(resultSet.next()){serverId = resultSet.getInt("Value");}} catch (Exception e) {logger.error(e.getMessage(),e);} finally{try {if(resultSet != null)resultSet.close();if(statement != null)statement.close();if(conn != null)conn.close();} catch (SQLException e) {logger.error(e.getMessage(),e);}}return serverId;}
}

上面代码设计的附加类(BinlogInfo.java; BinlogMasterStatus.java; ColumnInfo.java)

package or.model;
public class BinlogInfo {private String binlogName;private Long fileSize;// 省略Getter和Setter
}package or.model;
public class BinlogMasterStatus {private String binlogName;private long position;// 省略Getter和Setter
}package or.model;
public class ColumnInfo {private String name;private String type;// 省略Getter和Setter
}

最后还要有个地方存储解析之后的事件信息,这里简要设计下,采用一个ConcurrentLinkedDeque好了(CDCEventManager.java)

package or.manager;
import java.util.concurrent.ConcurrentLinkedDeque;
import or.CDCEvent;
public class CDCEventManager {public static final ConcurrentLinkedDeque<CDCEvent> queue = new ConcurrentLinkedDeque<>();
}

所有的准备工作都完成了,下面可以解析binlog日志了:

package or.test;import java.util.concurrent.TimeUnit;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import or.CDCEvent;
import or.InstanceListener;
import or.MysqlConnection;
import or.OpenReplicatorPlus;
import or.manager.CDCEventManager;
import or.model.BinlogMasterStatus;import com.google.code.or.OpenReplicator;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;public class OpenReplicatorTest {private static final Logger logger = LoggerFactory.getLogger(OpenReplicatorTest.class);private static final String host = "xx.xx.xx.60";private static final int port = 3306;private static final String user = "****";private static final String password = "****";public static void main(String[] args){OpenReplicator or = new OpenReplicator ();or.setUser(user);or.setPassword(password);or.setHost(host);or.setPort(port);MysqlConnection.setConnection(host, port, user, password);//		or.setServerId(MysqlConnection.getServerId());//配置里的serverId是open-replicator(作为一个slave)的id,不是master的serverIdBinlogMasterStatus bms = MysqlConnection.getBinlogMasterStatus();or.setBinlogFileName(bms.getBinlogName());
//		or.setBinlogFileName("mysql-bin.000004");or.setBinlogPosition(4);or.setBinlogEventListener(new InstanceListener());try {or.start();} catch (Exception e) {logger.error(e.getMessage(),e);}Thread thread = new Thread(new PrintCDCEvent());thread.start();}public static class PrintCDCEvent implements Runnable{@Overridepublic void run() {while(true){if(CDCEventManager.queue.isEmpty() == false){CDCEvent ce = CDCEventManager.queue.pollFirst();Gson gson = new GsonBuilder().setPrettyPrinting().serializeNulls().create();String prettyStr1 = gson.toJson(ce);System.out.println(prettyStr1);	}else{try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}}}		}
}

时间运行旧了会遇到这样一个问题:

16-10-21 10:41:49.365 ERROR[binlog-parser-1 AbstractBinlogParser.run:247] failed to parse binlog
java.io.EOFException: nullat com.google.code.or.io.util.ActiveBufferedInputStream.read(ActiveBufferedInputStream.java:169) ~[open-replicator-1.0.7.jar:na]at com.google.code.or.io.impl.XInputStreamImpl.doFill(XInputStreamImpl.java:236) ~[open-replicator-1.0.7.jar:na]at com.google.code.or.io.impl.XInputStreamImpl.read(XInputStreamImpl.java:213) ~[open-replicator-1.0.7.jar:na]at com.google.code.or.io.impl.XInputStreamImpl.readInt(XInputStreamImpl.java:141) ~[open-replicator-1.0.7.jar:na]at com.google.code.or.io.impl.XInputStreamImpl.readInt(XInputStreamImpl.java:61) ~[open-replicator-1.0.7.jar:na]at com.google.code.or.binlog.impl.ReplicationBasedBinlogParser.doParse(ReplicationBasedBinlogParser.java:91) ~[open-replicator-1.0.7.jar:na]at com.google.code.or.binlog.impl.AbstractBinlogParser$Task.run(AbstractBinlogParser.java:244) ~[open-replicator-1.0.7.jar:na]at java.lang.Thread.run(Unknown Source) [na:1.7.0_80]
16-10-21 10:41:49.371 INFO [binlog-parser-1 TransportImpl.disconnect:121] disconnected from xx.xx.xx.60:3306

初步解决方案(extends OpenReplicator然后添加重试机制):

package or;import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.code.or.OpenReplicator;public class OpenReplicatorPlus extends OpenReplicator{private static final Logger logger = LoggerFactory.getLogger(OpenReplicatorPlus.class);private volatile boolean autoRestart = true;@Overridepublic void stopQuietly(long timeout, TimeUnit unit){super.stopQuietly(timeout, unit);if(autoRestart){try {TimeUnit.SECONDS.sleep(10);logger.error("Restart OpenReplicator");} catch (InterruptedException e) {e.printStackTrace();}}}
}

最后只需将OpenReplicatorTest.java中的OpenReplicator or = new OpenReplicator ();改为OpenReplicator or = new OpenReplicatorPlus ();即可。

大功告成~~


参考资料

  1. 谈谈对Canal(增量数据订阅与消费)的理解
  2. MySQL主备复制原理、实现及异常处理
  3. https://github.com/whitesock/open-replicator

欢迎跳转到本文的原文链接:https://honeypps.com/backend/read-mysql-binlog-by-using-openreplicator/


欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。


这篇关于采用OpenReplicator解析MySQL binlog的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1129780

相关文章

网页解析 lxml 库--实战

lxml库使用流程 lxml 是 Python 的第三方解析库,完全使用 Python 语言编写,它对 XPath表达式提供了良好的支 持,因此能够了高效地解析 HTML/XML 文档。本节讲解如何通过 lxml 库解析 HTML 文档。 pip install lxml lxm| 库提供了一个 etree 模块,该模块专门用来解析 HTML/XML 文档,下面来介绍一下 lxml 库

SQL中的外键约束

外键约束用于表示两张表中的指标连接关系。外键约束的作用主要有以下三点: 1.确保子表中的某个字段(外键)只能引用父表中的有效记录2.主表中的列被删除时,子表中的关联列也会被删除3.主表中的列更新时,子表中的关联元素也会被更新 子表中的元素指向主表 以下是一个外键约束的实例展示

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

如何去写一手好SQL

MySQL性能 最大数据量 抛开数据量和并发数,谈性能都是耍流氓。MySQL没有限制单表最大记录数,它取决于操作系统对文件大小的限制。 《阿里巴巴Java开发手册》提出单表行数超过500万行或者单表容量超过2GB,才推荐分库分表。性能由综合因素决定,抛开业务复杂度,影响程度依次是硬件配置、MySQL配置、数据表设计、索引优化。500万这个值仅供参考,并非铁律。 博主曾经操作过超过4亿行数据

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

MySQL数据库宕机,启动不起来,教你一招搞定!

作者介绍:老苏,10余年DBA工作运维经验,擅长Oracle、MySQL、PG、Mongodb数据库运维(如安装迁移,性能优化、故障应急处理等)公众号:老苏畅谈运维欢迎关注本人公众号,更多精彩与您分享。 MySQL数据库宕机,数据页损坏问题,启动不起来,该如何排查和解决,本文将为你说明具体的排查过程。 查看MySQL error日志 查看 MySQL error日志,排查哪个表(表空间

【C++】_list常用方法解析及模拟实现

相信自己的力量,只要对自己始终保持信心,尽自己最大努力去完成任何事,就算事情最终结果是失败了,努力了也不留遗憾。💓💓💓 目录   ✨说在前面 🍋知识点一:什么是list? •🌰1.list的定义 •🌰2.list的基本特性 •🌰3.常用接口介绍 🍋知识点二:list常用接口 •🌰1.默认成员函数 🔥构造函数(⭐) 🔥析构函数 •🌰2.list对象

MySQL高性能优化规范

前言:      笔者最近上班途中突然想丰富下自己的数据库优化技能。于是在查阅了多篇文章后,总结出了这篇! 数据库命令规范 所有数据库对象名称必须使用小写字母并用下划线分割 所有数据库对象名称禁止使用mysql保留关键字(如果表名中包含关键字查询时,需要将其用单引号括起来) 数据库对象的命名要能做到见名识意,并且最后不要超过32个字符 临时库表必须以tmp_为前缀并以日期为后缀,备份

[MySQL表的增删改查-进阶]

🌈个人主页:努力学编程’ ⛅个人推荐: c语言从初阶到进阶 JavaEE详解 数据结构 ⚡学好数据结构,刷题刻不容缓:点击一起刷题 🌙心灵鸡汤:总有人要赢,为什么不能是我呢 💻💻💻数据库约束 🔭🔭🔭约束类型 not null: 指示某列不能存储 NULL 值unique: 保证某列的每行必须有唯一的值default: 规定没有给列赋值时的默认值.primary key:

MySQL-CRUD入门1

文章目录 认识配置文件client节点mysql节点mysqld节点 数据的添加(Create)添加一行数据添加多行数据两种添加数据的效率对比 数据的查询(Retrieve)全列查询指定列查询查询中带有表达式关于字面量关于as重命名 临时表引入distinct去重order by 排序关于NULL 认识配置文件 在我们的MySQL服务安装好了之后, 会有一个配置文件, 也就