Flink入门之DataStream API及kafka消费者

2023-12-06 07:20

本文主要是介绍Flink入门之DataStream API及kafka消费者,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

DataStream API

  1. 主要流程:
    • 获取执行环境
    • 读取数据源
    • 转换操作
    • 输出数据
    • Execute触发执行
  2. 获取执行环境
    • 根据实际情况获取StreamExceptionEnvironment.getExecutionEnvironment(conf)
    • 创建本地环境StreamExecutionEnvironment.createLocalEnvironment()
    • 创建远程环境createRemoteEnvironment(“hadoop102”, 37784, “jar/1.jar”)
      • 参数1:主机号
      • 参数2:端口号
      • 参数3:作业jar包的路径
  3. 获取数据源
    • 简单数据源
      • 从集合中读取数据env.fromCollection(集合)
      • 从元素列表中获取数据env.fromElements()
      • 从文件中读取数据,env.readTextFIle(路径), 已废弃
      • 从端口读取数据,env.socketTextStream()
    • 文件数据源
    • kafka数据源
    • DataGen数据源
    • 自定义数据源

文件数据源

使用文件数据源前,需要先添加相关依赖

 <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version><scope>provided</scope>
</dependency>
public class Flink02_FileSource {public static void main(String[] args) throw Exception {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);//file sourceFileSource.FileSourceBuilder<String> fileSourceBuilder = FileSource.<String>forRecordStreamFormat(new TextLineInputFormat("utf-8"), new Path("input/word.txt"));FileSource<String> fileSource = fileSourceBuilder.build();//source 算子DataStreamSource<String> ds = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");ds.print();env.execute();}
}

DataGen数据源

主要用于生成模拟数据,也需要导入相关依赖

 <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-datagen</artifactId><version>${flink.version}</version><scope>compile</scope></dependency>
public class Flink04_DataGenSource {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {@Overridepublic String map(Long value) throws Exception {return UUID.randomUUID() + "->" + value;}},100,RateLimiterStrategy.perSecond(1),Types.STRING);DataStreamSource<String> dataGenDs = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "dataGenDs");dataGenDs.print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

Kafka消费者

  1. 消费方式:拉取

  2. 消费者对象:KafkaConsumenr

  3. 消费原则:
    一个主题的一个分区只能被一个消费者组中的一个消费者消费
    一个消费者组中的一个消费者可以消费一个主题中的多个分区

  4. 消费者相关的参数:

    • key.deserializer 反序列化
    • value.deserializer
    • bootstrap.servers 集群的位置
    • group.id 消费者组id (为何分组,方便同一组的消费者进行断点续传)
    • auto.commit.interval.ms 自动提交间隔 默认5s
    • enable.auto.commit: 开启自动提交offset偏移量
    • auto.offset.reset: 当offset不存在时,offset重置,默认是最末尾的位置
      • ①新的消费者组,之前没有消费过,没有记录的offset
      • ②当前要消费的offset在kafka中已经不存在,可能是因为时间久了,对应的数据清理掉了
    • 重置策略:
      • earliest: 头,能消费到分区中现有的数据
      • latest: 尾,只能消费到分区中新来的数据
    • isolation.level:事务隔离级别
      • 读未提交
      • 读已提交
  5. 消费数据存在的问题

    • 漏消费,导致数据丢失
    • 重复消费,导致数据重复
  6. shell 创建生产者对象:kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first

public class Flink03_KafkaSource {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);KafkaSource<String> stringKafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092,hadoop103:9092").setGroupId("flink").setTopics("first")//优先使用消费者组记录的Offset进行消费,如果offset不存在,根据策略进行重置.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)).setValueOnlyDeserializer(new SimpleStringSchema())//如果还有别的配置需要指定,统一使用通用方法
//                .setProperty("isolation.level", "read_committed").build();DataStreamSource<String> kafkaDS = env.fromSource(stringKafkaSource, WatermarkStrategy.noWatermarks(), "kafkaDS");kafkaDS.print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

这篇关于Flink入门之DataStream API及kafka消费者的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/460931

相关文章

Spring Boot + MyBatis Plus 高效开发实战从入门到进阶优化(推荐)

《SpringBoot+MyBatisPlus高效开发实战从入门到进阶优化(推荐)》本文将详细介绍SpringBoot+MyBatisPlus的完整开发流程,并深入剖析分页查询、批量操作、动... 目录Spring Boot + MyBATis Plus 高效开发实战:从入门到进阶优化1. MyBatis

基于Flask框架添加多个AI模型的API并进行交互

《基于Flask框架添加多个AI模型的API并进行交互》:本文主要介绍如何基于Flask框架开发AI模型API管理系统,允许用户添加、删除不同AI模型的API密钥,感兴趣的可以了解下... 目录1. 概述2. 后端代码说明2.1 依赖库导入2.2 应用初始化2.3 API 存储字典2.4 路由函数2.5 应

Python FastAPI入门安装使用

《PythonFastAPI入门安装使用》FastAPI是一个现代、快速的PythonWeb框架,用于构建API,它基于Python3.6+的类型提示特性,使得代码更加简洁且易于绶护,这篇文章主要介... 目录第一节:FastAPI入门一、FastAPI框架介绍什么是ASGI服务(WSGI)二、FastAP

一文详解kafka开启kerberos认证的完整步骤

《一文详解kafka开启kerberos认证的完整步骤》这篇文章主要为大家详细介绍了kafka开启kerberos认证的完整步骤,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、kerberos安装部署二、准备机器三、Kerberos Server 安装1、配置krb5.con

C#集成DeepSeek模型实现AI私有化的流程步骤(本地部署与API调用教程)

《C#集成DeepSeek模型实现AI私有化的流程步骤(本地部署与API调用教程)》本文主要介绍了C#集成DeepSeek模型实现AI私有化的方法,包括搭建基础环境,如安装Ollama和下载DeepS... 目录前言搭建基础环境1、安装 Ollama2、下载 DeepSeek R1 模型客户端 ChatBo

Debezium 与 Apache Kafka 的集成方式步骤详解

《Debezium与ApacheKafka的集成方式步骤详解》本文详细介绍了如何将Debezium与ApacheKafka集成,包括集成概述、步骤、注意事项等,通过KafkaConnect,D... 目录一、集成概述二、集成步骤1. 准备 Kafka 环境2. 配置 Kafka Connect3. 安装 D

Java调用DeepSeek API的最佳实践及详细代码示例

《Java调用DeepSeekAPI的最佳实践及详细代码示例》:本文主要介绍如何使用Java调用DeepSeekAPI,包括获取API密钥、添加HTTP客户端依赖、创建HTTP请求、处理响应、... 目录1. 获取API密钥2. 添加HTTP客户端依赖3. 创建HTTP请求4. 处理响应5. 错误处理6.

Deepseek R1模型本地化部署+API接口调用详细教程(释放AI生产力)

《DeepseekR1模型本地化部署+API接口调用详细教程(释放AI生产力)》本文介绍了本地部署DeepSeekR1模型和通过API调用将其集成到VSCode中的过程,作者详细步骤展示了如何下载和... 目录前言一、deepseek R1模型与chatGPT o1系列模型对比二、本地部署步骤1.安装oll

浅析如何使用Swagger生成带权限控制的API文档

《浅析如何使用Swagger生成带权限控制的API文档》当涉及到权限控制时,如何生成既安全又详细的API文档就成了一个关键问题,所以这篇文章小编就来和大家好好聊聊如何用Swagger来生成带有... 目录准备工作配置 Swagger权限控制给 API 加上权限注解查看文档注意事项在咱们的开发工作里,API

一分钟带你上手Python调用DeepSeek的API

《一分钟带你上手Python调用DeepSeek的API》最近DeepSeek非常火,作为一枚对前言技术非常关注的程序员来说,自然都想对接DeepSeek的API来体验一把,下面小编就来为大家介绍一下... 目录前言免费体验API-Key申请首次调用API基本概念最小单元推理模型智能体自定义界面总结前言最