Flink写出数据到 MySql 控制事务,保证Exactly_Once

2024-05-10 13:32

本文主要是介绍Flink写出数据到 MySql 控制事务,保证Exactly_Once,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • 一、MySql Sink
    • 二、控制事务代码
        • 1、主线代码
        • 2、Druid 数据库连接池类

一、MySql Sink

要想使用TwoPhaseCommitSinkFunction,存储系统必须支持事务

Mysql Sink继承TwoPhaseCommitSinkFunction抽象类,分两个阶段提交Sink,保证Exactly_Once:

  • ①做checkpoint
  • ② 提交事务

二、控制事务代码

1、主线代码

参数详解:

  • ① 输入的类型
  • ② connection数据库连接对象
  • ③ 什么都不指定,为了泛型,写void
public class MySqlTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<Tuple2<String, Integer>,MySqlTwoPhaseCommitSink.ConnectionState, Void> {// 定义可用的构造函数public MySqlTwoPhaseCommitSink() {super(new KryoSerializer<>(MySqlTwoPhaseCommitSink.ConnectionState.class, new ExecutionConfig()),VoidSerializer.INSTANCE);}@Overrideprotected ConnectionState beginTransaction() throws Exception {System.out.println("=====> beginTransaction... ");//使用连接池,不使用单个连接//Class.forName("com.mysql.jdbc.Driver");//Connection conn = DriverManager.getConnection("jdbc:mysql://172.16.200// .101:3306/bigdata?characterEncoding=UTF-8", "root", "123456");Connection connection = DruidConnectionPool.getConnection();connection.setAutoCommit(false);//设定不自动提交return new ConnectionState(connection);}@Overrideprotected void invoke(ConnectionState transaction, Tuple2<String, Integer> value, Context context) throws Exception {Connection connection = transaction.connection;PreparedStatement pstm = connection.prepareStatement("INSERT INTO t_wordcount (word, counts) VALUES (?, ?) ON" +" DUPLICATE KEY UPDATE counts = ?");pstm.setString(1, value.f0);pstm.setInt(2, value.f1);pstm.setInt(3, value.f1);pstm.executeUpdate();pstm.close();}// 先不做处理@Overrideprotected void preCommit(ConnectionState transaction) throws Exception {System.out.println("=====> preCommit... " + transaction);}//提交事务@Overrideprotected void commit(ConnectionState transaction) {System.out.println("=====> commit... ");Connection connection = transaction.connection;try {connection.commit();connection.close();} catch (SQLException e) {throw new RuntimeException("提交事物异常");}}//回滚事务@Overrideprotected void abort(ConnectionState transaction) {System.out.println("=====> abort... ");Connection connection = transaction.connection;try {connection.rollback();connection.close();} catch (SQLException e) {throw new RuntimeException("回滚事物异常");}}//定义建立数据库连接的方法public static class ConnectionState {private final transient Connection connection;public ConnectionState(Connection connection) {this.connection = connection;}}
}
2、Druid 数据库连接池类

为什么要用连接池?

  • 因为每个数据库连接都要控制事务
import com.alibaba.druid.pool.DruidDataSourceFactory;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;public class DruidConnectionPool {private transient static DataSource dataSource = null;private transient static Properties props = new Properties();// 静态代码块static {props.put("driverClassName", "com.mysql.jdbc.Driver");props.put("url", "jdbc:mysql://localhost:3306/day01?characterEncoding=utf8");props.put("username", "root");props.put("password", "123456");try {dataSource = DruidDataSourceFactory.createDataSource(props);} catch (Exception e) {e.printStackTrace();}}private DruidConnectionPool() {}public static Connection getConnection() throws SQLException {return dataSource.getConnection();}
}

这篇关于Flink写出数据到 MySql 控制事务,保证Exactly_Once的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/976573

相关文章

Python获取中国节假日数据记录入JSON文件

《Python获取中国节假日数据记录入JSON文件》项目系统内置的日历应用为了提升用户体验,特别设置了在调休日期显示“休”的UI图标功能,那么问题是这些调休数据从哪里来呢?我尝试一种更为智能的方法:P... 目录节假日数据获取存入jsON文件节假日数据读取封装完整代码项目系统内置的日历应用为了提升用户体验,

MySQL双主搭建+keepalived高可用的实现

《MySQL双主搭建+keepalived高可用的实现》本文主要介绍了MySQL双主搭建+keepalived高可用的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,... 目录一、测试环境准备二、主从搭建1.创建复制用户2.创建复制关系3.开启复制,确认复制是否成功4.同

用js控制视频播放进度基本示例代码

《用js控制视频播放进度基本示例代码》写前端的时候,很多的时候是需要支持要网页视频播放的功能,下面这篇文章主要给大家介绍了关于用js控制视频播放进度的相关资料,文中通过代码介绍的非常详细,需要的朋友可... 目录前言html部分:JavaScript部分:注意:总结前言在javascript中控制视频播放

MyBatis 动态 SQL 优化之标签的实战与技巧(常见用法)

《MyBatis动态SQL优化之标签的实战与技巧(常见用法)》本文通过详细的示例和实际应用场景,介绍了如何有效利用这些标签来优化MyBatis配置,提升开发效率,确保SQL的高效执行和安全性,感... 目录动态SQL详解一、动态SQL的核心概念1.1 什么是动态SQL?1.2 动态SQL的优点1.3 动态S

Mysql表的简单操作(基本技能)

《Mysql表的简单操作(基本技能)》在数据库中,表的操作主要包括表的创建、查看、修改、删除等,了解如何操作这些表是数据库管理和开发的基本技能,本文给大家介绍Mysql表的简单操作,感兴趣的朋友一起看... 目录3.1 创建表 3.2 查看表结构3.3 修改表3.4 实践案例:修改表在数据库中,表的操作主要

SpringKafka消息发布之KafkaTemplate与事务支持功能

《SpringKafka消息发布之KafkaTemplate与事务支持功能》通过本文介绍的基本用法、序列化选项、事务支持、错误处理和性能优化技术,开发者可以构建高效可靠的Kafka消息发布系统,事务支... 目录引言一、KafkaTemplate基础二、消息序列化三、事务支持机制四、错误处理与重试五、性能优

Java利用JSONPath操作JSON数据的技术指南

《Java利用JSONPath操作JSON数据的技术指南》JSONPath是一种强大的工具,用于查询和操作JSON数据,类似于SQL的语法,它为处理复杂的JSON数据结构提供了简单且高效... 目录1、简述2、什么是 jsONPath?3、Java 示例3.1 基本查询3.2 过滤查询3.3 递归搜索3.4

Spring事务中@Transactional注解不生效的原因分析与解决

《Spring事务中@Transactional注解不生效的原因分析与解决》在Spring框架中,@Transactional注解是管理数据库事务的核心方式,本文将深入分析事务自调用的底层原理,解释为... 目录1. 引言2. 事务自调用问题重现2.1 示例代码2.2 问题现象3. 为什么事务自调用会失效3

mysql出现ERROR 2003 (HY000): Can‘t connect to MySQL server on ‘localhost‘ (10061)的解决方法

《mysql出现ERROR2003(HY000):Can‘tconnecttoMySQLserveron‘localhost‘(10061)的解决方法》本文主要介绍了mysql出现... 目录前言:第一步:第二步:第三步:总结:前言:当你想通过命令窗口想打开mysql时候发现提http://www.cpp

MySQL大表数据的分区与分库分表的实现

《MySQL大表数据的分区与分库分表的实现》数据库的分区和分库分表是两种常用的技术方案,本文主要介绍了MySQL大表数据的分区与分库分表的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有... 目录1. mysql大表数据的分区1.1 什么是分区?1.2 分区的类型1.3 分区的优点1.4 分