《从0到1学习Flink》—— Data Sink 介绍

2024-05-02 08:32
文章标签 学习 介绍 data flink sink

本文主要是介绍《从0到1学习Flink》—— Data Sink 介绍,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言

再上一篇文章中 《从0到1学习Flink》—— Data Source 介绍 讲解了 Flink Data Source ,那么这里就来讲讲 Flink Data Sink 吧。

首先 Sink 的意思是:

大概可以猜到了吧!Data sink 有点把数据存储下来(落库)的意思。

如上图,Source 就是数据的来源,中间的 Compute 其实就是 Flink 干的事情,可以做一系列的操作,操作完后就把计算后的数据结果 Sink 到某个地方。(可以是 MySQL、ElasticSearch、Kafka、Cassandra 等)。这里我说下自己目前做告警这块就是把 Compute 计算后的结果 Sink 直接告警出来了(发送告警消息到钉钉群、邮件、短信等),这个 sink 的意思也不一定非得说成要把数据存储到某个地方去。其实官网用的 Connector 来形容要去的地方更合适,这个 Connector 可以有 MySQL、ElasticSearch、Kafka、Cassandra RabbitMQ 等。

Flink Data Sink

前面文章 《从0到1学习Flink》—— Data Source 介绍 介绍了 Flink Data Source 有哪些,这里也看看 Flink Data Sink 支持的有哪些。

看下源码有哪些呢?

可以看到有 Kafka、ElasticSearch、Socket、RabbitMQ、JDBC、Cassandra POJO、File、Print 等 Sink 的方式。

SinkFunction

从上图可以看到 SinkFunction 接口有 invoke 方法,它有一个 RichSinkFunction 抽象类。

上面的那些自带的 Sink 可以看到都是继承了 RichSinkFunction 抽象类,实现了其中的方法,那么我们要是自己定义自己的 Sink 的话其实也是要按照这个套路来做的。

这里就拿个较为简单的 PrintSinkFunction 源码来讲下:

@PublicEvolving
public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {private static final long serialVersionUID = 1L;private static final boolean STD_OUT = false;private static final boolean STD_ERR = true;private boolean target;private transient PrintStream stream;private transient String prefix;/*** Instantiates a print sink function that prints to standard out.*/public PrintSinkFunction() {}/*** Instantiates a print sink function that prints to standard out.** @param stdErr True, if the format should print to standard error instead of standard out.*/public PrintSinkFunction(boolean stdErr) {target = stdErr;}public void setTargetToStandardOut() {target = STD_OUT;}public void setTargetToStandardErr() {target = STD_ERR;}@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();// get the target streamstream = target == STD_OUT ? System.out : System.err;// set the prefix if we have a >1 parallelismprefix = (context.getNumberOfParallelSubtasks() > 1) ?((context.getIndexOfThisSubtask() + 1) + "> ") : null;}@Overridepublic void invoke(IN record) {if (prefix != null) {stream.println(prefix + record.toString());}else {stream.println(record.toString());}}@Overridepublic void close() {this.stream = null;this.prefix = null;}@Overridepublic String toString() {return "Print to " + (target == STD_OUT ? "System.out" : "System.err");}
}

可以看到它就是实现了 RichSinkFunction 抽象类,然后实现了 invoke 方法,这里 invoke 方法就是把记录打印出来了就是,没做其他的额外操作。

如何使用?

SingleOutputStreamOperator.addSink(new PrintSinkFunction<>();

这样就可以了,如果是其他的 Sink Function 的话需要换成对应的。

使用这个 Function 其效果就是打印从 Source 过来的数据,和直接 Source.print() 效果一样。

下篇文章我们将讲解下如何自定义自己的 Sink Function,并使用一个 demo 来教大家,让大家知道这个套路,且能够在自己工作中自定义自己需要的 Sink Function,来完成自己的工作需求。

最后

本文主要讲了下 Flink 的 Data Sink,并介绍了常见的 Data Sink,也看了下源码的 SinkFunction,介绍了一个简单的 Function 使用, 告诉了大家自定义 Sink Function 的套路,下篇文章带大家写个。

关注我

转载请务必注明原创地址为:http://www.54tianzhisheng.cn/2018/10/29/flink-sink/

另外我自己整理了些 Flink 的学习资料,目前已经全部放到微信公众号了。你可以加我的微信:zhisheng_tian,然后回复关键字:Flink 即可无条件获取到。

相关文章

1、《从0到1学习Flink》—— Apache Flink 介绍

2、《从0到1学习Flink》—— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门

3、《从0到1学习Flink》—— Flink 配置文件详解

4、《从0到1学习Flink》—— Data Source 介绍

5、《从0到1学习Flink》—— 如何自定义 Data Source ?

6、《从0到1学习Flink》—— Data Sink 介绍

7、《从0到1学习Flink》—— 如何自定义 Data Sink ?

这篇关于《从0到1学习Flink》—— Data Sink 介绍的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/953824

相关文章

MySQL中慢SQL优化的不同方式介绍

《MySQL中慢SQL优化的不同方式介绍》慢SQL的优化,主要从两个方面考虑,SQL语句本身的优化,以及数据库设计的优化,下面小编就来给大家介绍一下有哪些方式可以优化慢SQL吧... 目录避免不必要的列分页优化索引优化JOIN 的优化排序优化UNION 优化慢 SQL 的优化,主要从两个方面考虑,SQL 语

C++中函数模板与类模板的简单使用及区别介绍

《C++中函数模板与类模板的简单使用及区别介绍》这篇文章介绍了C++中的模板机制,包括函数模板和类模板的概念、语法和实际应用,函数模板通过类型参数实现泛型操作,而类模板允许创建可处理多种数据类型的类,... 目录一、函数模板定义语法真实示例二、类模板三、关键区别四、注意事项 ‌在C++中,模板是实现泛型编程

Python实现html转png的完美方案介绍

《Python实现html转png的完美方案介绍》这篇文章主要为大家详细介绍了如何使用Python实现html转png功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 1.增强稳定性与错误处理建议使用三层异常捕获结构:try: with sync_playwright(

Java使用多线程处理未知任务数的方案介绍

《Java使用多线程处理未知任务数的方案介绍》这篇文章主要为大家详细介绍了Java如何使用多线程实现处理未知任务数,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 知道任务个数,你可以定义好线程数规则,生成线程数去跑代码说明:1.虚拟线程池:使用 Executors.newVir

Java进阶学习之如何开启远程调式

《Java进阶学习之如何开启远程调式》Java开发中的远程调试是一项至关重要的技能,特别是在处理生产环境的问题或者协作开发时,:本文主要介绍Java进阶学习之如何开启远程调式的相关资料,需要的朋友... 目录概述Java远程调试的开启与底层原理开启Java远程调试底层原理JVM参数总结&nbsMbKKXJx

JAVA SE包装类和泛型详细介绍及说明方法

《JAVASE包装类和泛型详细介绍及说明方法》:本文主要介绍JAVASE包装类和泛型的相关资料,包括基本数据类型与包装类的对应关系,以及装箱和拆箱的概念,并重点讲解了自动装箱和自动拆箱的机制,文... 目录1. 包装类1.1 基本数据类型和对应的包装类1.2 装箱和拆箱1.3 自动装箱和自动拆箱2. 泛型2

HTML5 data-*自定义数据属性的示例代码

《HTML5data-*自定义数据属性的示例代码》HTML5的自定义数据属性(data-*)提供了一种标准化的方法在HTML元素上存储额外信息,可以通过JavaScript访问、修改和在CSS中使用... 目录引言基本概念使用自定义数据属性1. 在 html 中定义2. 通过 JavaScript 访问3.

Java深度学习库DJL实现Python的NumPy方式

《Java深度学习库DJL实现Python的NumPy方式》本文介绍了DJL库的背景和基本功能,包括NDArray的创建、数学运算、数据获取和设置等,同时,还展示了如何使用NDArray进行数据预处理... 目录1 NDArray 的背景介绍1.1 架构2 JavaDJL使用2.1 安装DJL2.2 基本操

四种Flutter子页面向父组件传递数据的方法介绍

《四种Flutter子页面向父组件传递数据的方法介绍》在Flutter中,如果父组件需要调用子组件的方法,可以通过常用的四种方式实现,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录方法 1:使用 GlobalKey 和 State 调用子组件方法方法 2:通过回调函数(Callb

Python进阶之Excel基本操作介绍

《Python进阶之Excel基本操作介绍》在现实中,很多工作都需要与数据打交道,Excel作为常用的数据处理工具,一直备受人们的青睐,本文主要为大家介绍了一些Python中Excel的基本操作,希望... 目录概述写入使用 xlwt使用 XlsxWriter读取修改概述在现实中,很多工作都需要与数据打交