maxwell同步mysql到kafka(一个服务器启动多个)

2024-06-06 02:36

本文主要是介绍maxwell同步mysql到kafka(一个服务器启动多个),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

创建mysql同步用户

CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%'; 

开启mysql binlog

a.修改 /etc/my.cnf 配置

log-bin=mysql-bin  # 开启binlog
binlog-format=ROW  # 设置Binary Log记录方式为Row
server_id=1 # 记住id 后续开发会使用
# 指定binlog日志文件的名字为mysql-bin,以及其存储路径
# 如果没有对log-bin指定log文件,默认在 /var/lib/mysql目录下以mysqld-bin.00000X等作为名称。
# 而 mysqld-bin.index则记录了所有的log的文件名称
# 使用时则使用mysqlbinlog /var/lib/mysql|grep "*****"等来追踪database的操作。
log-bin=/var/lib/mysql/mysql-bin   
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock

b.重启mysql
service mysqld restart
c.查看开启状态
输入 show variables like ‘log_bin’; 查看binlog开启状态。如下图所示。
输入 show variables like ‘binlog_format’; 查看Binary Log记录方式。如下图所示。

mysql> show variables like 'log_%';
+---------------------------------+-------------+
| Variable_name | Value |
+---------------------------------+-------------+
| log_bin | ON |
| log_bin_trust_function_creators | OFF |
| log_error | .\mysql.err |
| log_queries_not_using_indexes | OFF |
| log_slave_updates | OFF |
| log_slow_queries | ON |
| log_warnings | 1 |
+---------------------------------+-------------+
没有开启log_bin的值是OFF,开启之后是ON
mysql> SHOW VARIABLES LIKE 'character%';
+--------------------------+----------------------------+
| Variable_name            | Value                      |
+--------------------------+----------------------------+
| character_set_client     | utf8                       |
| character_set_connection | utf8                       |
| character_set_database   | latin1                     |
| character_set_filesystem | binary                     |
| character_set_results    | utf8                       |
| character_set_server     | latin1                     |
| character_set_system     | utf8                       |
| character_sets_dir       | /usr/share/mysql/charsets/ |
+--------------------------+----------------------------+

安装maxwell

下载

从 v1.30.0 开始,Maxwell 不再支持 JDK1.8
使用最后一个支持 JDK1.8 版本的 Maxwell v1.29.2 进行部署
wget https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz

安装

解压即可 tar -zxvf maxwell.tar.gz

配置

vim config_1.properties

server_id=1
client_id=city_ct_63 #用于启动多个maxwell
replica_server_id=2 #用于启动多个maxwell
user=maxwell
password=123456
host=192.168.0.XX
producer=kafka
kafka.bootstrap.servers=192.168.0.XX:9092
kafka_topic=city_mysql_kafka_cdc
jdbc_options=serverTimezone=UTC
filter=exclude:*.*,include:test.mj_qyxx,include:test.mj_sbxx

vim config_2.properties

server_id=1
client_id=province_ct_63 #用于启动多个maxwell
replica_server_id=1 #用于启动多个maxwell
user=maxwell
password=123456
host=192.168.0.1x
producer=kafka
kafka.bootstrap.servers=192.168.0.xx:9092
kafka_topic=province_mysql_kafka_cdc
jdbc_options=serverTimezone=UTC
filter=exclude:*.*,include:test2.xxx

启动

bin/maxwell --config city_config.properties --daemon
bin/maxwell --config province_config.properties --daemon

验证启动进程

ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l

测试数据库全量同步

maxwell-bootstrap --database finance_result --table industry --config $MAXWELL_HOME/config.properties

其中发送过来的数据第一行及最后一行数据是标识 Maxwell 历史数据同步的,不携带任何数据。

maxwell -> kafka: 
{"database": "finance_result","table": "industry","type": "bootstrap-start","ts": 1694748250,"data": {}
}{"database": "finance_result","table": "industry","type": "bootstrap-insert","ts": 1694748250,"data": {"id": 1,"create_time": "2022-08-19 00:00:00.000000","update_time": "2022-08-19 00:00:00.000000","industry_level": 1,"industry_name": "工程建设","superior_industry_id": null}
} {"database": "finance_result","table": "industry","type": "bootstrap-insert","ts": 1694748250,"data": {"id": 2,"create_time": "2022-08-19 00:00:00.000000","update_time": "2022-08-19 00:00:00.000000","industry_level": 1,"industry_name": "轻工","superior_industry_id": null}
} {"database": "finance_result","table": "industry","type": "bootstrap-insert","ts": 1694748250,"data": {"id": 3,"create_time": "2022-08-19 00:00:00.000000","update_time": "2022-08-19 00:00:00.000000","industry_level": 2,"industry_name": "土木","superior_industry_id": 1}
}
......{"database": "finance_result","table": "industry","type": "bootstrap-complete","ts": 1694748250,"data": {}
}

测试数据库增量同步

参数说明

输出JSON字符串的格式
● data 最新的数据,修改后的数据
● old 旧数据,修改前的数据
● type 操作类型,有insert, update, delete, database-create, database-alter, database-drop, table-create, table-alter, table-drop,bootstrap-insert,int(未知类型)
● xid 事务id
● commit 同一个xid代表同一个事务,事务的最后一条语句会有commit,可以利用这个重现事务
● server_id
● thread_id
● 运行程序时添加参数–output_ddl,可以捕捉到ddl语句
● datetime列会输出为"YYYY-MM-DD hh:mm:ss",如果遇到"0000-00-00 00:00:00"会原样输出
● maxwell支持多种编码,但仅输出utf8编码
● maxwell的TIMESTAMP总是作为UTC处理,如果要调整为自己的时区,需要在后端逻辑上进行处理

创建mysql同步用户

CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%'; 

开启mysql binlog

a.修改 /etc/my.cnf 配置

log-bin=mysql-bin  # 开启binlog
binlog-format=ROW  # 设置Binary Log记录方式为Row
server_id=1 # 记住id 后续开发会使用
# 指定binlog日志文件的名字为mysql-bin,以及其存储路径
# 如果没有对log-bin指定log文件,默认在 /var/lib/mysql目录下以mysqld-bin.00000X等作为名称。
# 而 mysqld-bin.index则记录了所有的log的文件名称
# 使用时则使用mysqlbinlog /var/lib/mysql|grep "*****"等来追踪database的操作。
log-bin=/var/lib/mysql/mysql-bin   
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock

b.重启mysql
service mysqld restart
c.查看开启状态
输入 show variables like ‘log_bin’; 查看binlog开启状态。如下图所示。
输入 show variables like ‘binlog_format’; 查看Binary Log记录方式。如下图所示。

mysql> show variables like 'log_%';
+---------------------------------+-------------+
| Variable_name | Value |
+---------------------------------+-------------+
| log_bin | ON |
| log_bin_trust_function_creators | OFF |
| log_error | .\mysql.err |
| log_queries_not_using_indexes | OFF |
| log_slave_updates | OFF |
| log_slow_queries | ON |
| log_warnings | 1 |
+---------------------------------+-------------+
没有开启log_bin的值是OFF,开启之后是ON
mysql> SHOW VARIABLES LIKE 'character%';
+--------------------------+----------------------------+
| Variable_name            | Value                      |
+--------------------------+----------------------------+
| character_set_client     | utf8                       |
| character_set_connection | utf8                       |
| character_set_database   | latin1                     |
| character_set_filesystem | binary                     |
| character_set_results    | utf8                       |
| character_set_server     | latin1                     |
| character_set_system     | utf8                       |
| character_sets_dir       | /usr/share/mysql/charsets/ |
+--------------------------+----------------------------+

安装maxwell

下载

从 v1.30.0 开始,Maxwell 不再支持 JDK1.8
使用最后一个支持 JDK1.8 版本的 Maxwell v1.29.2 进行部署
wget https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz

安装

解压即可 tar -zxvf maxwell.tar.gz

配置

vim config_1.properties

server_id=1
client_id=city_ct_63 #用于启动多个maxwell
replica_server_id=2 #用于启动多个maxwell
user=maxwell
password=123456
host=192.168.0.XX
producer=kafka
kafka.bootstrap.servers=192.168.0.XX:9092
kafka_topic=city_mysql_kafka_cdc
jdbc_options=serverTimezone=UTC
filter=exclude:*.*,include:test.mj_qyxx,include:test.mj_sbxx

vim config_2.properties

server_id=1
client_id=province_ct_63 #用于启动多个maxwell
replica_server_id=1 #用于启动多个maxwell
user=maxwell
password=123456
host=192.168.0.1x
producer=kafka
kafka.bootstrap.servers=192.168.0.xx:9092
kafka_topic=province_mysql_kafka_cdc
jdbc_options=serverTimezone=UTC
filter=exclude:*.*,include:test2.xxx

启动

bin/maxwell --config city_config.properties --daemon
bin/maxwell --config province_config.properties --daemon

验证启动进程

ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l

测试数据库全量同步

maxwell-bootstrap --database finance_result --table industry --config $MAXWELL_HOME/config.properties

其中发送过来的数据第一行及最后一行数据是标识 Maxwell 历史数据同步的,不携带任何数据。

maxwell -> kafka: 
{"database": "finance_result","table": "industry","type": "bootstrap-start","ts": 1694748250,"data": {}
}{"database": "finance_result","table": "industry","type": "bootstrap-insert","ts": 1694748250,"data": {"id": 1,"create_time": "2022-08-19 00:00:00.000000","update_time": "2022-08-19 00:00:00.000000","industry_level": 1,"industry_name": "工程建设","superior_industry_id": null}
} {"database": "finance_result","table": "industry","type": "bootstrap-insert","ts": 1694748250,"data": {"id": 2,"create_time": "2022-08-19 00:00:00.000000","update_time": "2022-08-19 00:00:00.000000","industry_level": 1,"industry_name": "轻工","superior_industry_id": null}
} {"database": "finance_result","table": "industry","type": "bootstrap-insert","ts": 1694748250,"data": {"id": 3,"create_time": "2022-08-19 00:00:00.000000","update_time": "2022-08-19 00:00:00.000000","industry_level": 2,"industry_name": "土木","superior_industry_id": 1}
}
......{"database": "finance_result","table": "industry","type": "bootstrap-complete","ts": 1694748250,"data": {}
}

测试数据库增量同步

参数说明

输出JSON字符串的格式
● data 最新的数据,修改后的数据
● old 旧数据,修改前的数据
● type 操作类型,有insert, update, delete, database-create, database-alter, database-drop, table-create, table-alter, table-drop,bootstrap-insert,int(未知类型)
● xid 事务id
● commit 同一个xid代表同一个事务,事务的最后一条语句会有commit,可以利用这个重现事务
● server_id
● thread_id
● 运行程序时添加参数–output_ddl,可以捕捉到ddl语句
● datetime列会输出为"YYYY-MM-DD hh:mm:ss",如果遇到"0000-00-00 00:00:00"会原样输出
● maxwell支持多种编码,但仅输出utf8编码
● maxwell的TIMESTAMP总是作为UTC处理,如果要调整为自己的时区,需要在后端逻辑上进行处理

这篇关于maxwell同步mysql到kafka(一个服务器启动多个)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1034857

相关文章

SQL中的外键约束

外键约束用于表示两张表中的指标连接关系。外键约束的作用主要有以下三点: 1.确保子表中的某个字段(外键)只能引用父表中的有效记录2.主表中的列被删除时,子表中的关联列也会被删除3.主表中的列更新时,子表中的关联元素也会被更新 子表中的元素指向主表 以下是一个外键约束的实例展示

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

如何去写一手好SQL

MySQL性能 最大数据量 抛开数据量和并发数,谈性能都是耍流氓。MySQL没有限制单表最大记录数,它取决于操作系统对文件大小的限制。 《阿里巴巴Java开发手册》提出单表行数超过500万行或者单表容量超过2GB,才推荐分库分表。性能由综合因素决定,抛开业务复杂度,影响程度依次是硬件配置、MySQL配置、数据表设计、索引优化。500万这个值仅供参考,并非铁律。 博主曾经操作过超过4亿行数据

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

MySQL数据库宕机,启动不起来,教你一招搞定!

作者介绍:老苏,10余年DBA工作运维经验,擅长Oracle、MySQL、PG、Mongodb数据库运维(如安装迁移,性能优化、故障应急处理等)公众号:老苏畅谈运维欢迎关注本人公众号,更多精彩与您分享。 MySQL数据库宕机,数据页损坏问题,启动不起来,该如何排查和解决,本文将为你说明具体的排查过程。 查看MySQL error日志 查看 MySQL error日志,排查哪个表(表空间

springboot3打包成war包,用tomcat8启动

1、在pom中,将打包类型改为war <packaging>war</packaging> 2、pom中排除SpringBoot内置的Tomcat容器并添加Tomcat依赖,用于编译和测试,         *依赖时一定设置 scope 为 provided (相当于 tomcat 依赖只在本地运行和测试的时候有效,         打包的时候会排除这个依赖)<scope>provided

内核启动时减少log的方式

内核引导选项 内核引导选项大体上可以分为两类:一类与设备无关、另一类与设备有关。与设备有关的引导选项多如牛毛,需要你自己阅读内核中的相应驱动程序源码以获取其能够接受的引导选项。比如,如果你想知道可以向 AHA1542 SCSI 驱动程序传递哪些引导选项,那么就查看 drivers/scsi/aha1542.c 文件,一般在前面 100 行注释里就可以找到所接受的引导选项说明。大多数选项是通过"_

MySQL高性能优化规范

前言:      笔者最近上班途中突然想丰富下自己的数据库优化技能。于是在查阅了多篇文章后,总结出了这篇! 数据库命令规范 所有数据库对象名称必须使用小写字母并用下划线分割 所有数据库对象名称禁止使用mysql保留关键字(如果表名中包含关键字查询时,需要将其用单引号括起来) 数据库对象的命名要能做到见名识意,并且最后不要超过32个字符 临时库表必须以tmp_为前缀并以日期为后缀,备份

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3