Flink写出数据到 MySql 控制事务,保证Exactly_Once

2024-05-10 13:32

本文主要是介绍Flink写出数据到 MySql 控制事务,保证Exactly_Once,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • 一、MySql Sink
    • 二、控制事务代码
        • 1、主线代码
        • 2、Druid 数据库连接池类

一、MySql Sink

要想使用TwoPhaseCommitSinkFunction,存储系统必须支持事务

Mysql Sink继承TwoPhaseCommitSinkFunction抽象类,分两个阶段提交Sink,保证Exactly_Once:

  • ①做checkpoint
  • ② 提交事务

二、控制事务代码

1、主线代码

参数详解:

  • ① 输入的类型
  • ② connection数据库连接对象
  • ③ 什么都不指定,为了泛型,写void
public class MySqlTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<Tuple2<String, Integer>,MySqlTwoPhaseCommitSink.ConnectionState, Void> {// 定义可用的构造函数public MySqlTwoPhaseCommitSink() {super(new KryoSerializer<>(MySqlTwoPhaseCommitSink.ConnectionState.class, new ExecutionConfig()),VoidSerializer.INSTANCE);}@Overrideprotected ConnectionState beginTransaction() throws Exception {System.out.println("=====> beginTransaction... ");//使用连接池,不使用单个连接//Class.forName("com.mysql.jdbc.Driver");//Connection conn = DriverManager.getConnection("jdbc:mysql://172.16.200// .101:3306/bigdata?characterEncoding=UTF-8", "root", "123456");Connection connection = DruidConnectionPool.getConnection();connection.setAutoCommit(false);//设定不自动提交return new ConnectionState(connection);}@Overrideprotected void invoke(ConnectionState transaction, Tuple2<String, Integer> value, Context context) throws Exception {Connection connection = transaction.connection;PreparedStatement pstm = connection.prepareStatement("INSERT INTO t_wordcount (word, counts) VALUES (?, ?) ON" +" DUPLICATE KEY UPDATE counts = ?");pstm.setString(1, value.f0);pstm.setInt(2, value.f1);pstm.setInt(3, value.f1);pstm.executeUpdate();pstm.close();}// 先不做处理@Overrideprotected void preCommit(ConnectionState transaction) throws Exception {System.out.println("=====> preCommit... " + transaction);}//提交事务@Overrideprotected void commit(ConnectionState transaction) {System.out.println("=====> commit... ");Connection connection = transaction.connection;try {connection.commit();connection.close();} catch (SQLException e) {throw new RuntimeException("提交事物异常");}}//回滚事务@Overrideprotected void abort(ConnectionState transaction) {System.out.println("=====> abort... ");Connection connection = transaction.connection;try {connection.rollback();connection.close();} catch (SQLException e) {throw new RuntimeException("回滚事物异常");}}//定义建立数据库连接的方法public static class ConnectionState {private final transient Connection connection;public ConnectionState(Connection connection) {this.connection = connection;}}
}
2、Druid 数据库连接池类

为什么要用连接池?

  • 因为每个数据库连接都要控制事务
import com.alibaba.druid.pool.DruidDataSourceFactory;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;public class DruidConnectionPool {private transient static DataSource dataSource = null;private transient static Properties props = new Properties();// 静态代码块static {props.put("driverClassName", "com.mysql.jdbc.Driver");props.put("url", "jdbc:mysql://localhost:3306/day01?characterEncoding=utf8");props.put("username", "root");props.put("password", "123456");try {dataSource = DruidDataSourceFactory.createDataSource(props);} catch (Exception e) {e.printStackTrace();}}private DruidConnectionPool() {}public static Connection getConnection() throws SQLException {return dataSource.getConnection();}
}

这篇关于Flink写出数据到 MySql 控制事务,保证Exactly_Once的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/976573

相关文章

mysql索引四(组合索引)

单列索引,即一个索引只包含单个列,一个表可以有多个单列索引,但这不是组合索引;组合索引,即一个索引包含多个列。 因为有事,下面内容全部转自:https://www.cnblogs.com/farmer-cabbage/p/5793589.html 为了形象地对比单列索引和组合索引,为表添加多个字段:    CREATE TABLE mytable( ID INT NOT NULL, use

mysql索引三(全文索引)

前面分别介绍了mysql索引一(普通索引)、mysql索引二(唯一索引)。 本文学习mysql全文索引。 全文索引(也称全文检索)是目前搜索引擎使用的一种关键技术。它能够利用【分词技术】等多种算法智能分析出文本文字中关键词的频率和重要性,然后按照一定的算法规则智能地筛选出我们想要的搜索结果。 在MySql中,创建全文索引相对比较简单。例如:我们有一个文章表(article),其中有主键ID(

mysql索引二(唯一索引)

前文中介绍了MySQL中普通索引用法,和没有索引的区别。mysql索引一(普通索引) 下面学习一下唯一索引。 创建唯一索引的目的不是为了提高访问速度,而只是为了避免数据出现重复。唯一索引可以有多个但索引列的值必须唯一,索引列的值允许有空值。如果能确定某个数据列将只包含彼此各不相同的值,在为这个数据列创建索引的时候就应该使用关键字UNIQUE,把它定义为一个唯一索引。 添加数据库唯一索引的几种

mysql索引一(普通索引)

mysql的索引分为两大类,聚簇索引、非聚簇索引。聚簇索引是按照数据存放的物理位置为顺序的,而非聚簇索引则不同。聚簇索引能够提高多行检索的速度、非聚簇索引则对单行检索的速度很快。         在这两大类的索引类型下,还可以降索引分为4个小类型:         1,普通索引:最基本的索引,没有任何限制,是我们经常使用到的索引。         2,唯一索引:与普通索引

【服务器运维】MySQL数据存储至数据盘

查看磁盘及分区 [root@MySQL tmp]# fdisk -lDisk /dev/sda: 21.5 GB, 21474836480 bytes255 heads, 63 sectors/track, 2610 cylindersUnits = cylinders of 16065 * 512 = 8225280 bytesSector size (logical/physical)

SQL Server中,查询数据库中有多少个表,以及数据库其余类型数据统计查询

sqlserver查询数据库中有多少个表 sql server 数表:select count(1) from sysobjects where xtype='U'数视图:select count(1) from sysobjects where xtype='V'数存储过程select count(1) from sysobjects where xtype='P' SE

SQL Server中,always on服务器的相关操作

在SQL Server中,建立了always on服务,可用于数据库的同步备份,当数据库出现问题后,always on服务会自动切换主从服务器。 例如192.168.1.10为主服务器,12为从服务器,当主服务器出现问题后,always on自动将主服务器切换为12,保证数据库正常访问。 对于always on服务器有如下操作: 1、切换主从服务器:假如需要手动切换主从服务器时(如果两个服务

SQL Server中,isnull()函数以及null的用法

SQL Serve中的isnull()函数:          isnull(value1,value2)         1、value1与value2的数据类型必须一致。         2、如果value1的值不为null,结果返回value1。         3、如果value1为null,结果返回vaule2的值。vaule2是你设定的值。        如

SQL Server中,添加数据库到AlwaysOn高可用性组条件

1、将数据添加到AlwaysOn高可用性组,需要满足以下条件: 2、更多具体AlwaysOn设置,参考:https://msdn.microsoft.com/zh-cn/library/windows/apps/ff878487(v=sql.120).aspx 注:上述资源来自MSDN。

SQL Server中,用Restore DataBase把数据库还原到指定的路径

restore database 数据库名 from disk='备份文件路径' with move '数据库文件名' to '数据库文件放置路径', move '日志文件名' to '日志文件存放置路径' Go 如: restore database EaseWe from disk='H:\EaseWe.bak' with move 'Ease