Flink入门之DataStream API及kafka消费者

2023-12-06 07:20

本文主要是介绍Flink入门之DataStream API及kafka消费者,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

DataStream API

  1. 主要流程:
    • 获取执行环境
    • 读取数据源
    • 转换操作
    • 输出数据
    • Execute触发执行
  2. 获取执行环境
    • 根据实际情况获取StreamExceptionEnvironment.getExecutionEnvironment(conf)
    • 创建本地环境StreamExecutionEnvironment.createLocalEnvironment()
    • 创建远程环境createRemoteEnvironment(“hadoop102”, 37784, “jar/1.jar”)
      • 参数1:主机号
      • 参数2:端口号
      • 参数3:作业jar包的路径
  3. 获取数据源
    • 简单数据源
      • 从集合中读取数据env.fromCollection(集合)
      • 从元素列表中获取数据env.fromElements()
      • 从文件中读取数据,env.readTextFIle(路径), 已废弃
      • 从端口读取数据,env.socketTextStream()
    • 文件数据源
    • kafka数据源
    • DataGen数据源
    • 自定义数据源

文件数据源

使用文件数据源前,需要先添加相关依赖

 <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version><scope>provided</scope>
</dependency>
public class Flink02_FileSource {public static void main(String[] args) throw Exception {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);//file sourceFileSource.FileSourceBuilder<String> fileSourceBuilder = FileSource.<String>forRecordStreamFormat(new TextLineInputFormat("utf-8"), new Path("input/word.txt"));FileSource<String> fileSource = fileSourceBuilder.build();//source 算子DataStreamSource<String> ds = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");ds.print();env.execute();}
}

DataGen数据源

主要用于生成模拟数据,也需要导入相关依赖

 <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-datagen</artifactId><version>${flink.version}</version><scope>compile</scope></dependency>
public class Flink04_DataGenSource {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {@Overridepublic String map(Long value) throws Exception {return UUID.randomUUID() + "->" + value;}},100,RateLimiterStrategy.perSecond(1),Types.STRING);DataStreamSource<String> dataGenDs = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "dataGenDs");dataGenDs.print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

Kafka消费者

  1. 消费方式:拉取

  2. 消费者对象:KafkaConsumenr

  3. 消费原则:
    一个主题的一个分区只能被一个消费者组中的一个消费者消费
    一个消费者组中的一个消费者可以消费一个主题中的多个分区

  4. 消费者相关的参数:

    • key.deserializer 反序列化
    • value.deserializer
    • bootstrap.servers 集群的位置
    • group.id 消费者组id (为何分组,方便同一组的消费者进行断点续传)
    • auto.commit.interval.ms 自动提交间隔 默认5s
    • enable.auto.commit: 开启自动提交offset偏移量
    • auto.offset.reset: 当offset不存在时,offset重置,默认是最末尾的位置
      • ①新的消费者组,之前没有消费过,没有记录的offset
      • ②当前要消费的offset在kafka中已经不存在,可能是因为时间久了,对应的数据清理掉了
    • 重置策略:
      • earliest: 头,能消费到分区中现有的数据
      • latest: 尾,只能消费到分区中新来的数据
    • isolation.level:事务隔离级别
      • 读未提交
      • 读已提交
  5. 消费数据存在的问题

    • 漏消费,导致数据丢失
    • 重复消费,导致数据重复
  6. shell 创建生产者对象:kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first

public class Flink03_KafkaSource {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);KafkaSource<String> stringKafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092,hadoop103:9092").setGroupId("flink").setTopics("first")//优先使用消费者组记录的Offset进行消费,如果offset不存在,根据策略进行重置.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)).setValueOnlyDeserializer(new SimpleStringSchema())//如果还有别的配置需要指定,统一使用通用方法
//                .setProperty("isolation.level", "read_committed").build();DataStreamSource<String> kafkaDS = env.fromSource(stringKafkaSource, WatermarkStrategy.noWatermarks(), "kafkaDS");kafkaDS.print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

这篇关于Flink入门之DataStream API及kafka消费者的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/460931

相关文章

如何在一台服务器上使用docker运行kafka集群

《如何在一台服务器上使用docker运行kafka集群》文章详细介绍了如何在一台服务器上使用Docker运行Kafka集群,包括拉取镜像、创建网络、启动Kafka容器、检查运行状态、编写启动和关闭脚本... 目录1.拉取镜像2.创建集群之间通信的网络3.将zookeeper加入到网络中4.启动kafka集群

IDEA中的Kafka管理神器详解

《IDEA中的Kafka管理神器详解》这款基于IDEA插件实现的Kafka管理工具,能够在本地IDE环境中直接运行,简化了设置流程,为开发者提供了更加紧密集成、高效且直观的Kafka操作体验... 目录免安装:IDEA中的Kafka管理神器!简介安装必要的插件创建 Kafka 连接第一步:创建连接第二步:选

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3

数论入门整理(updating)

一、gcd lcm 基础中的基础,一般用来处理计算第一步什么的,分数化简之类。 LL gcd(LL a, LL b) { return b ? gcd(b, a % b) : a; } <pre name="code" class="cpp">LL lcm(LL a, LL b){LL c = gcd(a, b);return a / c * b;} 例题:

Java 创建图形用户界面(GUI)入门指南(Swing库 JFrame 类)概述

概述 基本概念 Java Swing 的架构 Java Swing 是一个为 Java 设计的 GUI 工具包,是 JAVA 基础类的一部分,基于 Java AWT 构建,提供了一系列轻量级、可定制的图形用户界面(GUI)组件。 与 AWT 相比,Swing 提供了许多比 AWT 更好的屏幕显示元素,更加灵活和可定制,具有更好的跨平台性能。 组件和容器 Java Swing 提供了许多

【IPV6从入门到起飞】5-1 IPV6+Home Assistant(搭建基本环境)

【IPV6从入门到起飞】5-1 IPV6+Home Assistant #搭建基本环境 1 背景2 docker下载 hass3 创建容器4 浏览器访问 hass5 手机APP远程访问hass6 更多玩法 1 背景 既然电脑可以IPV6入站,手机流量可以访问IPV6网络的服务,为什么不在电脑搭建Home Assistant(hass),来控制你的设备呢?@智能家居 @万物互联

poj 2104 and hdu 2665 划分树模板入门题

题意: 给一个数组n(1e5)个数,给一个范围(fr, to, k),求这个范围中第k大的数。 解析: 划分树入门。 bing神的模板。 坑爹的地方是把-l 看成了-1........ 一直re。 代码: poj 2104: #include <iostream>#include <cstdio>#include <cstdlib>#include <al

MySQL-CRUD入门1

文章目录 认识配置文件client节点mysql节点mysqld节点 数据的添加(Create)添加一行数据添加多行数据两种添加数据的效率对比 数据的查询(Retrieve)全列查询指定列查询查询中带有表达式关于字面量关于as重命名 临时表引入distinct去重order by 排序关于NULL 认识配置文件 在我们的MySQL服务安装好了之后, 会有一个配置文件, 也就

音视频入门基础:WAV专题(10)——FFmpeg源码中计算WAV音频文件每个packet的pts、dts的实现

一、引言 从文章《音视频入门基础:WAV专题(6)——通过FFprobe显示WAV音频文件每个数据包的信息》中我们可以知道,通过FFprobe命令可以打印WAV音频文件每个packet(也称为数据包或多媒体包)的信息,这些信息包含该packet的pts、dts: 打印出来的“pts”实际是AVPacket结构体中的成员变量pts,是以AVStream->time_base为单位的显