Canal源码分析---模拟Slave同步binlog

2024-04-19 12:48

本文主要是介绍Canal源码分析---模拟Slave同步binlog,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言

通过分析Canal,完成模拟Slave同Master建立连接,然后同步Binlog的过程。
通过本文可以理解 Mysql的 Slave如何同Master进行同步的,可以自行开发MockSlave,同时让我们可以更好的使用canal,并且在canal出现问题的时候更好的定位问题。
本文的代码是在canal项目中提取出来的,主要目的就是理解Slave同Master的同步过程。
能力有限,如果出入欢迎指正。

注意事项

Slave同Master通信发送的数据都是Little-Endian。这个是需要特出注意的,无论是解析还是发送。
Java都是Big-Endian。
因此最好实现一个工具类,对数据进行读取。使操作更加简单,canal就是实现了一个ByteHelper来完成这个功能的。

准备工作

需要配置一下master数据库
[mysqld]
log-bin=mysql-bin #添加这一行就ok
binlog-format=ROW #选择row模式
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复

总体流程图

总体流程图

流程描述

1:连接Mysql

提供了IP和Port,用Socket直接连接Mysql。

2:解析握手信息

当连接上后,Master会立刻回消息给Slave,如果失败,会返回错误信息,如果成功则返回必要的握手信息。握手信息里面的数据在后面通信过程中会使用,需要进行保存。

3:发送认证消息

收到握手消息表示连接成功了,现在要同Master建立一个安全的连接,需要将用户名和密码发送给Master。格式如下:

bytes描述
4client_flags
4max_packet_size
1charset_number
23(filler) always 0x00…
n(Null-Terminated String) user
n(Length Coded Binary) scramble_buff (1 + x bytes),scramble411加密的密码
n(Null-Terminated String) databasename (optional)

4:解析认证消息

解析返回消息,如果有错误进行解析,获取错误提示。无错误则忽略。

5:发送323认证请求

利用握手消息的seed和密码用scramble323函数对密码进行加密,将密码发送给master。发送的内容是加密后的byte数组。

6:解析323返回消息

这个解析主要也是对错误进行处理。

7:发送数据库连接参数

主要是将如下的参数发送给Master,进行设置。

set wait_timeout=9999999
set net_write_timeout=1800
set net_read_timeout=1800
set names 'binary'

8:解析参数设置结果

设置成功后会返回一个OK的消息,如果失败会有错误提示。消息格式如下:
VERSION 4.1

bytes描述
1(Length Coded Binary) field_count, always = 0
1-9(Length Coded Binary) affected_rows,根据类型变化
1-9(Length Coded Binary) insert_id,根据类型变化
2server_status
2warning_count
n(until end of packet) message

9:发送获取checkSum的消息

发送请求给Master,发送的消息内容”select @master_binlog_checksum”。

10:解析消息,记录CheckSum

由于这个查询有结果集返回,因此需要对结果集进行解析。

a:验证消息是否有错误。
b:解析正确的消息

读取列信息,格式如下:

bytes描述
1-9columnCount
1-9extra

c:循环读取列信息

bytes描述
n(Length Coded String) catalog
n(Length Coded String) db
n(Length Coded String) table
n(Length Coded String) org_table
n(Length Coded String) name
n(Length Coded String) org_name
1(filler)
2charsetnr
4length
1type
2flags
1decimals
2(filler), always 0x00
n(Length Coded Binary) default

d:读取EOF

正常读取一个消息,不处理

e:循环读取行数据

都是字符串,按照规定的格式进行读取。(格式省略)

f:最后设置binlogChecksum的值

数值1表示CRC32的压缩 0表示不需要进行处理

11:发送获取binlog位置消息

发送查询请求给Master,查询内容为”show master status”。

12:解析消息,获得位置

由于要读取数据,因此解析的过程类似于step10。
最后获得binlog文件名称和位置两个信息。这里我是为了测试,实际上这个数据是需要存储在Slave本地的,否则Slave和Master的数据不同步了。

13:发送同步binlog请求

发送binlog请求给Master,消息格式如下:

bytes描述
1command
narg
4binlog position to start at (little endian)
2binlog flags (currently not used; always 0)
4server_id of the slave (little endian)
nbinlog file name (optional)

14:启动读取线程,循环读取

启动读取线程,接受master发送过来的同步log。
log的种类很多,canal中都有详细说明,这里不再进行消息说明,可以参考canal的代码进行理解。

代码

代码有些乱,主要看main方法就好了。

public class SlaveMain
{// 连接所有的scramble_buffprivate static byte[] joinAndCreateScrambleBuff(HandshakeInitializationPacket handshakePacket) throws IOException{byte[] dest = new byte[handshakePacket.seed.length + handshakePacket.restOfScrambleBuff.length];System.arraycopy(handshakePacket.seed, 0, dest, 0, handshakePacket.seed.length);System.arraycopy(handshakePacket.restOfScrambleBuff, 0, dest, handshakePacket.seed.length,handshakePacket.restOfScrambleBuff.length);return dest;}private static void auth323(SocketChannel channel, byte packetSequenceNumber, byte[] seed, String password)throws IOException{// auth 323Reply323Packet r323 = new Reply323Packet();// 1.对密码进行加密if (password != null && password.length() > 0){r323.seed = MySQLPasswordEncrypter.scramble323(password, new String(seed)).getBytes();}byte[] b323Body = r323.toBytes();HeaderPacket h323 = new HeaderPacket();h323.setPacketBodyLength(b323Body.length);h323.setPacketSequenceNumber((byte) (packetSequenceNumber + 1));PacketManager.write(channel, new ByteBuffer[] { ByteBuffer.wrap(h323.toBytes()), ByteBuffer.wrap(b323Body) });System.out.println("client 323 authentication packet is sent out.");// check auth resultHeaderPacket header = PacketManager.readHeader(channel, 4);byte[] body = PacketManager.readBytes(channel, header.getPacketBodyLength());assert body != null;switch (body[0]){case 0:break;case -1:ErrorPacket err = new ErrorPacket();err.fromBytes(body);throw new IOException("Error When doing Client Authentication:" + err.toString());default:throw new IOException("unpexpected packet with field_count=" + body[0]);}}// 更新操作public static OKPacket update(SocketChannel channel, String updateString) throws IOException{QueryCommandPacket cmd = new QueryCommandPacket();cmd.setQueryString(updateString);byte[] bodyBytes = cmd.toBytes();PacketManager.write(channel, bodyBytes);System.out.println("read update result...");byte[] body = PacketManager.readBytes(channel, PacketManager.readHeader(channel, 4).getPacketBodyLength());if (body[0] < 0){ErrorPacket packet = new ErrorPacket();packet.fromBytes(body);throw new IOException(packet + "\n with command: " + updateString);}OKPacket packet = new OKPacket();packet.fromBytes(body);return packet;}// 查询操作public static ResultSetPacket query(SocketChannel channel, String queryString) throws IOException{QueryCommandPacket cmd = new QueryCommandPacket();cmd.setQueryString(queryString);byte[] bodyBytes = cmd.toBytes();PacketManager.write(channel, bodyBytes);byte[] body = readNextPacket(channel);if (body[0] < 0){ErrorPacket packet = new ErrorPacket();packet.fromBytes(body);throw new IOException(packet + "\n with command: " + queryString);}ResultSetHeaderPacket rsHeader = new ResultSetHeaderPacket();rsHeader.fromBytes(body);List<FieldPacket> fields = new ArrayList<FieldPacket>();for (int i = 0; i < rsHeader.getColumnCount(); i++){FieldPacket fp = new FieldPacket();fp.fromBytes(readNextPacket(channel));fields.add(fp);}readEofPacket(channel);List<RowDataPacket> rowData = new ArrayList<RowDataPacket>();while (true){body = readNextPacket(channel);if (body[0] == -2){break;}RowDataPacket rowDataPacket = new RowDataPacket();rowDataPacket.fromBytes(body);rowData.add(rowDataPacket);}ResultSetPacket resultSet = new ResultSetPacket();resultSet.getFieldDescriptors().addAll(fields);for (RowDataPacket r : rowData){resultSet.getFieldValues().addAll(r.getColumns());}resultSet.setSourceAddress(channel.socket().getRemoteSocketAddress());return resultSet;}private static void readEofPacket(SocketChannel channel) throws IOException{byte[] eofBody = readNextPacket(channel);if (eofBody[0] != -2){throw new IOException("EOF Packet is expected, but packet with field_count=" + eofBody[0] + " is found.");}}protected static byte[] readNextPacket(SocketChannel channel) throws IOException{HeaderPacket h = PacketManager.readHeader(channel, 4);return PacketManager.readBytes(channel, h.getPacketBodyLength());}public static void main(String[] args) throws Exception{// MysqlConnector.connect()// 1.连接到MysqlServer,同时设置一些参数int soTimeout = 30 * 1000;int receiveBufferSize = 16 * 1024;int sendBufferSize = 16 * 1024;String ip = "127.0.0.1";int port = 3306;String username = "root";String password = "root";SocketChannel channel = SocketChannel.open();InetSocketAddress masterAddress = new InetSocketAddress(ip, port);channel.socket().setKeepAlive(true);channel.socket().setReuseAddress(true);channel.socket().setSoTimeout(soTimeout);channel.socket().setTcpNoDelay(true);channel.socket().setReceiveBufferSize(receiveBufferSize);channel.socket().setSendBufferSize(sendBufferSize);channel.connect(masterAddress);// MysqlConnector.negotiate()// 2.读取消息头信息// 消息头,头3个byte是消息长度,1个byte是序列号HeaderPacket header = PacketManager.readHeader(channel, 4);System.out.println("header length=" + header.getPacketBodyLength());// 3.读取指定长度的报文byte[] body = PacketManager.readBytes(channel, header.getPacketBodyLength());// 4.错误检查if (body[0] < 0){// check field_countif (body[0] == -1){ErrorPacket error = new ErrorPacket();error.fromBytes(body);throw new IOException("handshake exception:\n" + error.toString());}else if (body[0] == -2){throw new IOException("Unexpected EOF packet at handshake phase.");}else{throw new IOException("unpexpected packet with field_count=" + body[0]);}}// 5.握手消息初始化HandshakeInitializationPacket handshakePacket = new HandshakeInitializationPacket();handshakePacket.fromBytes(body);// 解析握手消息long connectionId = -1;byte charsetNumber = 33;String defaultSchema = "testdb";connectionId = handshakePacket.threadId; // 记录一下connectionSystem.out.println("handshake initialization packet received, prepare the client authentication packet to send");// 6.组织认证消息ClientAuthenticationPacket clientAuth = new ClientAuthenticationPacket();clientAuth.setCharsetNumber(charsetNumber);clientAuth.setUsername(username);// 用户名clientAuth.setPassword(password);// 密码clientAuth.setServerCapabilities(handshakePacket.serverCapabilities);// 容错情况,这个消息是握手消息中拿到的clientAuth.setDatabaseName(defaultSchema);// 默认数据库clientAuth.setScrumbleBuff(joinAndCreateScrambleBuff(handshakePacket));// 将两个不同位置的scramble_buff合并到一个数组中byte[] clientAuthPkgBody = clientAuth.toBytes();HeaderPacket h = new HeaderPacket();h.setPacketBodyLength(clientAuthPkgBody.length);// 设置报文内容长度h.setPacketSequenceNumber((byte) (header.getPacketSequenceNumber() + 1));// 序列+1// 7.发送认证信息PacketManager.write(channel,new ByteBuffer[] { ByteBuffer.wrap(h.toBytes()), ByteBuffer.wrap(clientAuthPkgBody) });System.out.println("client authentication packet is sent out.");// Mysql会立刻返回信息,解析Mysql的第二个信息// check auth result// 8.读取消息头header = null;header = PacketManager.readHeader(channel, 4);body = null;// 9.读取消息体body = PacketManager.readBytes(channel, header.getPacketBodyLength());assert body != null;if (body[0] < 0){if (body[0] == -1){ErrorPacket err = new ErrorPacket();err.fromBytes(body);throw new IOException("Error When doing Client Authentication:" + err.toString());}else if (body[0] == -2){auth323(channel, header.getPacketSequenceNumber(), handshakePacket.seed, password);}else{throw new IOException("unpexpected packet with field_count=" + body[0]);}}// Connection 已经建立完成了,接下来就可以去请求当前binlog文件名称,位置等信息。// 更新数据库信息try{update(channel, "set wait_timeout=9999999");}catch (Exception e){e.printStackTrace();}try{update(channel, "set net_write_timeout=1800");}catch (Exception e){e.printStackTrace();}try{update(channel, "set net_read_timeout=1800");}catch (Exception e){}try{// 设置服务端返回结果时不做编码转化,直接按照数据库的二进制编码进行发送,由客户端自己根据需求进行编码转化update(channel, "set names 'binary'");}catch (Exception e){e.printStackTrace();}try{// mysql5.6针对checksum支持需要设置session变量// 如果不设置会出现错误: Slave can not handle replication events with the// checksum that master is configured to log// 但也不能乱设置,需要和mysql server的checksum配置一致,不然RotateLogEvent会出现乱码// '@@global.binlog_checksum'需要去掉单引号,在mysql 5.6.29下导致master退出update(channel, "set @master_binlog_checksum= @@global.binlog_checksum");}catch (Exception e){e.printStackTrace();}// 查询binlog_checksumResultSetPacket rs = null;try{rs = query(channel, "select @master_binlog_checksum");}catch (IOException e){e.printStackTrace();}int binlogChecksum;List<String> columnValues = rs.getFieldValues();if (columnValues != null && columnValues.size() >= 1 && columnValues.get(0).toUpperCase().equals("CRC32")){binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_CRC32;}else{binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;}// 10.发送请求查询位置ResultSetPacket packet = query(channel, "show master status");List<String> fields = packet.getFieldValues();if (fields == null || fields.size() == 0){System.out.println("无法找到当前的位置");return;}EntryPosition endPosition = new EntryPosition(fields.get(0), Long.valueOf(fields.get(1)));System.out.println(String.format("fileName= %s   pos=%d", endPosition.getJournalName(), endPosition.getPosition()));// 实际上这个文件名称和位置是需要存储到本地的,然后去进行同步// 10.发送申请binlog的命令BinlogDumpCommandPacket binlogDumpCmd = new BinlogDumpCommandPacket();// 这个数据是需要查出来的binlogDumpCmd.binlogFileName = endPosition.getJournalName();binlogDumpCmd.binlogPosition = endPosition.getPosition();binlogDumpCmd.slaveServerId = 3;byte[] cmdBody = binlogDumpCmd.toBytes();HeaderPacket binlogDumpHeader = new HeaderPacket();binlogDumpHeader.setPacketBodyLength(cmdBody.length);binlogDumpHeader.setPacketSequenceNumber((byte) 0x00);PacketManager.write(channel,new ByteBuffer[] { ByteBuffer.wrap(binlogDumpHeader.toBytes()), ByteBuffer.wrap(cmdBody) });DirectLogFetcher fetcher = new DirectLogFetcher();// 11.启动读取channel的线程fetcher.start(channel);LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);LogContext context = new LogContext();context.setLogPosition(new LogPosition(binlogDumpCmd.binlogFileName));context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));// 12.循环读取消息while (fetcher.fetch()){LogEvent event = null;event = decoder.decode(fetcher, context);if (event == null){throw new RuntimeException("parse failed");}int eventType = event.getHeader().getType();System.out.println("eventType=" + eventType);switch (eventType){case LogEvent.ROTATE_EVENT:// binlogFileName = ((RotateLogEvent)// event).getFilename();break;case LogEvent.WRITE_ROWS_EVENT_V1:case LogEvent.WRITE_ROWS_EVENT:parseRowsEvent(endPosition.getJournalName(), (WriteRowsLogEvent) event);break;case LogEvent.UPDATE_ROWS_EVENT_V1:case LogEvent.UPDATE_ROWS_EVENT:// parseRowsEvent((UpdateRowsLogEvent) event);break;case LogEvent.DELETE_ROWS_EVENT_V1:case LogEvent.DELETE_ROWS_EVENT:// parseRowsEvent((DeleteRowsLogEvent) event);break;case LogEvent.QUERY_EVENT:// parseQueryEvent((QueryLogEvent) event);break;case LogEvent.ROWS_QUERY_LOG_EVENT:// parseRowsQueryEvent((RowsQueryLogEvent) event);break;case LogEvent.ANNOTATE_ROWS_EVENT:break;case LogEvent.XID_EVENT:break;default:break;}}}protected static Charset charset = Charset.forName("utf-8");protected static void parseRowsEvent(String binlogFileName, RowsLogEvent event){try{System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s]", binlogFileName,event.getHeader().getLogPos() - event.getHeader().getEventLen(), event.getTable().getDbName(),event.getTable().getTableName()));RowsLogBuffer buffer = event.getRowsBuf(charset.name());BitSet columns = event.getColumns();BitSet changeColumns = event.getChangeColumns();while (buffer.nextOneRow(columns)){// 处理row记录int type = event.getHeader().getType();if (LogEvent.WRITE_ROWS_EVENT_V1 == type || LogEvent.WRITE_ROWS_EVENT == type){// insert的记录放在before字段中parseOneRow(event, buffer, columns, true);}else if (LogEvent.DELETE_ROWS_EVENT_V1 == type || LogEvent.DELETE_ROWS_EVENT == type){// delete的记录放在before字段中parseOneRow(event, buffer, columns, false);}else{// update需要处理before/afterSystem.out.println("-------> before");parseOneRow(event, buffer, columns, false);if (!buffer.nextOneRow(changeColumns)){break;}System.out.println("-------> after");parseOneRow(event, buffer, changeColumns, true);}}}catch (Exception e){throw new RuntimeException("parse row data failed.", e);}}protected static void parseOneRow(RowsLogEvent event, RowsLogBuffer buffer, BitSet cols, boolean isAfter)throws UnsupportedEncodingException{TableMapLogEvent map = event.getTable();if (map == null){throw new RuntimeException("not found TableMap with tid=" + event.getTableId());}final int columnCnt = map.getColumnCnt();final ColumnInfo[] columnInfo = map.getColumnInfo();for (int i = 0; i < columnCnt; i++){if (!cols.get(i)){continue;}ColumnInfo info = columnInfo[i];buffer.nextValue(info.type, info.meta);if (buffer.isNull()){//}else{final Serializable value = buffer.getValue();if (value instanceof byte[]){System.out.println(new String((byte[]) value));}else{System.out.println(value);}}}}
}

Mysql抛出错误

Could not find first log file name in binary log index file

发生了这个问题,网上有很多解决方法。
发生这个问题的原因描述为:
请求binlog文件名或者位置出现了错误。
master上保存的binlog文件和位置经常会出错。(这个还没有花时间去找,有知道的留个言)
你用show master status获取的信息可能有错误。
这个时候需要讲binlog通过mysqlbinlog命令将binlog文件转换成txt,然后查看文件找到最后一个#at 123 这个123就是正确的pos,请求的时候需要设置这个123来请求
mysqlbinglog binlog文件>info.txt
还有一个比较笨但是很有效的办法,就是把master停了,然后重启就可以了。

部分参考

http://blog.csdn.net/hackerwin7/article/details/37923607
https://dev.mysql.com/doc/refman/5.5/en/binary-log.html
https://github.com/alibaba/canal

这篇关于Canal源码分析---模拟Slave同步binlog的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/917585

相关文章

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

【C++】_list常用方法解析及模拟实现

相信自己的力量,只要对自己始终保持信心,尽自己最大努力去完成任何事,就算事情最终结果是失败了,努力了也不留遗憾。💓💓💓 目录   ✨说在前面 🍋知识点一:什么是list? •🌰1.list的定义 •🌰2.list的基本特性 •🌰3.常用接口介绍 🍋知识点二:list常用接口 •🌰1.默认成员函数 🔥构造函数(⭐) 🔥析构函数 •🌰2.list对象

usaco 1.2 Transformations(模拟)

我的做法就是一个一个情况枚举出来 注意计算公式: ( 变换后的矩阵记为C) 顺时针旋转90°:C[i] [j]=A[n-j-1] [i] (旋转180°和270° 可以多转几个九十度来推) 对称:C[i] [n-j-1]=A[i] [j] 代码有点长 。。。 /*ID: who jayLANG: C++TASK: transform*/#include<

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

SWAP作物生长模型安装教程、数据制备、敏感性分析、气候变化影响、R模型敏感性分析与贝叶斯优化、Fortran源代码分析、气候数据降尺度与变化影响分析

查看原文>>>全流程SWAP农业模型数据制备、敏感性分析及气候变化影响实践技术应用 SWAP模型是由荷兰瓦赫宁根大学开发的先进农作物模型,它综合考虑了土壤-水分-大气以及植被间的相互作用;是一种描述作物生长过程的一种机理性作物生长模型。它不但运用Richard方程,使其能够精确的模拟土壤中水分的运动,而且耦合了WOFOST作物模型使作物的生长描述更为科学。 本文让更多的科研人员和农业工作者

MOLE 2.5 分析分子通道和孔隙

软件介绍 生物大分子通道和孔隙在生物学中发挥着重要作用,例如在分子识别和酶底物特异性方面。 我们介绍了一种名为 MOLE 2.5 的高级软件工具,该工具旨在分析分子通道和孔隙。 与其他可用软件工具的基准测试表明,MOLE 2.5 相比更快、更强大、功能更丰富。作为一项新功能,MOLE 2.5 可以估算已识别通道的物理化学性质。 软件下载 https://pan.quark.cn/s/57