Flink基础之DataStream API

2023-12-07 10:20
文章标签 基础 api flink datastream

本文主要是介绍Flink基础之DataStream API,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

流的合并

  1. union联合:被unioin的流中的数据类型必须一致
  2. connect连接:合并的两条流的数据类型可以不一致
    • connec后,得到的是ConnectedStreams
    • 合并后需要根据数据流是否经过keyby分区
      • coConnect: 将两条数据流合并为同一数据类型
      • keyedConnect
public class Flink09_UnionConnectStream {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);DataStreamSource<Integer> ds1 = env.fromElements(1, 2, 3, 4, 5, 6, 7);DataStreamSource<Integer> ds2 = env.fromElements(8, 9);DataStreamSource<String> ds3 = env.fromElements("a", "b", "c");DataStream<Integer> unionDs = ds1.union(ds2);unionDs.print();//connectConnectedStreams<Integer, String> connectDs = ds1.connect(ds3);//处理connectDs.process(new CoProcessFunction<Integer, String, String>() {@Overridepublic void processElement1(Integer value, CoProcessFunction<Integer, String, String>.Context ctx, Collector<String> out) throws Exception {out.collect(value.toString());}@Overridepublic void processElement2(String value, CoProcessFunction<Integer, String, String>.Context ctx, Collector<String> out) throws Exception {out.collect(value.toUpperCase());}}).print("connect");try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

Sink输出算子

目前所使用的大多数Sink, 都是基于2PC的方式来保证状态精确一次性。2PC 即 two face commit, 两阶段提交,该机制的实现必须要开启Flink的检查点。

  1. FileSink:fileSink = FileSink.<数据流泛型>forRowFormat(输出路径, 数据流编码器)
    • 文件滚动策略 .withRollingPolicy().builder()
      • 文件多大滚动.withMaxPartSize(MemorySize.parse(“10m”))
      • 多长时间滚动一次 .withRolloverInterval(Duration.ofSeconds(10))
      • 多久不活跃滚动 .withInactivityInterval(Duration.ofSeconds(5))
    • 目录滚动策略:一般设置为按照天或者小时或者其他时间间隔
    • 文件输出配置:可以设置输出文件的前缀和后缀
public class Flink01_FileSink {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(2000);//默认是最大并行度env.setParallelism(1);DataStreamSource<Event> ds = Flink06_EventSource.getEventSource(env);//FileSinkFileSink<String> stringFileSink = FileSink.<String>forRowFormat(new Path("output"),new SimpleStringEncoder<>()).withRollingPolicy(//文件滚动策略DefaultRollingPolicy.builder().withMaxPartSize(MemorySize.parse("10m"))//文件多大滚动.withRolloverInterval(Duration.ofSeconds(10))//多久滚动.withInactivityInterval(Duration.ofSeconds(5))//多久不活跃滚动.build()).withBucketAssigner(//目录滚动策略new DateTimeBucketAssigner<>("yyyy-MM-dd HH-mm")).withBucketCheckInterval(1000L)//检查的间隔.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("atguigu").withPartSuffix(".log").build()).build();ds.map(JSON::toJSONString).sinkTo(stringFileSink);try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}
  1. Kafka Sink(重点)
    • 生产者对象:KafkaProducer
    • Kafka生产者分区策略:
      • 如果明确指定分区号,直接用
      • 如果没有指定分区号,但是Record中带了key,就按照key的hash值对分区数取余得到分区号
      • 如果没有指定相关分区号,使用粘性分区策略
    • 生产者相关配置
      • key.serializer : key的序列化器
      • value.serializer: value的序列化器
      • bootstrap.servers: 集群位置
      • retries: 重试次数
      • batch.size 批次大小
      • linger.ms 批次超时时间
      • acks 应答级别
      • transaction.id 事务ID
    • Shell中开启Kafka消费者的命令:kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
public class Flink02_KafkaSink {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);//开启检查点env.enableCheckpointing(5000);DataStreamSource<Event> ds = Flink06_EventSource.getEventSource(env);//KafkaSinkKafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("hadoop102:9092,hadoop103:9092").setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("first").setValueSerializationSchema(new SimpleStringSchema()).build())//语义//AT_LEAST_ONCE:至少一次,表示数据可能重复,需要考虑去重操作//EXACTLY_ONCE:精确一次//kafka transaction timeout is larger than broker//kafka超时时间:1H//broker超时时间:15分钟//                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)//数据传输的保障.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)//数据传输的保障.setTransactionalIdPrefix("flink"+ RandomUtils.nextInt(0,100000))
//                .setProperty(ProducerConfig.RETRIES_CONFIG,"10").setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,"600000").build();ds.map(JSON::toJSONString).sinkTo(kafkaSink);//写入到kafka 生产者//shell 消费者:kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic firsttry {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

为了在Shell中开启消费者更为便捷,这里写了一个小脚本,用来动态的设置主题并开启相应的Kafka消费者,脚本名称为kc.sh.

#!/bin/bash# 检查参数数量
if [ $# -lt 1 ]; thenecho "Usage: $0 <topic>"exit 1
fi# 从命令行参数获取主题
topic=$1# Kafka配置
bootstrap_server="hadoop102:9092"# 构建kafka-console-consumer命令
consumer_command="kafka-console-consumer.sh --bootstrap-server $bootstrap_server --topic $topic"# 打印消费命令
echo "Running Kafka Consumer for topic: $topic"
echo "Command: $consumer_command"# 执行消费命令
$consumer_command

这篇关于Flink基础之DataStream API的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/465500

相关文章

C#基础之委托详解(Delegate)

《C#基础之委托详解(Delegate)》:本文主要介绍C#基础之委托(Delegate),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1. 委托定义2. 委托实例化3. 多播委托(Multicast Delegates)4. 委托的用途事件处理回调函数LINQ

基于Flask框架添加多个AI模型的API并进行交互

《基于Flask框架添加多个AI模型的API并进行交互》:本文主要介绍如何基于Flask框架开发AI模型API管理系统,允许用户添加、删除不同AI模型的API密钥,感兴趣的可以了解下... 目录1. 概述2. 后端代码说明2.1 依赖库导入2.2 应用初始化2.3 API 存储字典2.4 路由函数2.5 应

C#集成DeepSeek模型实现AI私有化的流程步骤(本地部署与API调用教程)

《C#集成DeepSeek模型实现AI私有化的流程步骤(本地部署与API调用教程)》本文主要介绍了C#集成DeepSeek模型实现AI私有化的方法,包括搭建基础环境,如安装Ollama和下载DeepS... 目录前言搭建基础环境1、安装 Ollama2、下载 DeepSeek R1 模型客户端 ChatBo

Java调用DeepSeek API的最佳实践及详细代码示例

《Java调用DeepSeekAPI的最佳实践及详细代码示例》:本文主要介绍如何使用Java调用DeepSeekAPI,包括获取API密钥、添加HTTP客户端依赖、创建HTTP请求、处理响应、... 目录1. 获取API密钥2. 添加HTTP客户端依赖3. 创建HTTP请求4. 处理响应5. 错误处理6.

0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeek R1模型的操作流程

《0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeekR1模型的操作流程》DeepSeekR1模型凭借其强大的自然语言处理能力,在未来具有广阔的应用前景,有望在多个领域发... 目录0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeek R1模型,3步搞定一个应

Deepseek R1模型本地化部署+API接口调用详细教程(释放AI生产力)

《DeepseekR1模型本地化部署+API接口调用详细教程(释放AI生产力)》本文介绍了本地部署DeepSeekR1模型和通过API调用将其集成到VSCode中的过程,作者详细步骤展示了如何下载和... 目录前言一、deepseek R1模型与chatGPT o1系列模型对比二、本地部署步骤1.安装oll

浅析如何使用Swagger生成带权限控制的API文档

《浅析如何使用Swagger生成带权限控制的API文档》当涉及到权限控制时,如何生成既安全又详细的API文档就成了一个关键问题,所以这篇文章小编就来和大家好好聊聊如何用Swagger来生成带有... 目录准备工作配置 Swagger权限控制给 API 加上权限注解查看文档注意事项在咱们的开发工作里,API

一分钟带你上手Python调用DeepSeek的API

《一分钟带你上手Python调用DeepSeek的API》最近DeepSeek非常火,作为一枚对前言技术非常关注的程序员来说,自然都想对接DeepSeek的API来体验一把,下面小编就来为大家介绍一下... 目录前言免费体验API-Key申请首次调用API基本概念最小单元推理模型智能体自定义界面总结前言最

JAVA调用Deepseek的api完成基本对话简单代码示例

《JAVA调用Deepseek的api完成基本对话简单代码示例》:本文主要介绍JAVA调用Deepseek的api完成基本对话的相关资料,文中详细讲解了如何获取DeepSeekAPI密钥、添加H... 获取API密钥首先,从DeepSeek平台获取API密钥,用于身份验证。添加HTTP客户端依赖使用Jav

C#使用DeepSeek API实现自然语言处理,文本分类和情感分析

《C#使用DeepSeekAPI实现自然语言处理,文本分类和情感分析》在C#中使用DeepSeekAPI可以实现多种功能,例如自然语言处理、文本分类、情感分析等,本文主要为大家介绍了具体实现步骤,... 目录准备工作文本生成文本分类问答系统代码生成翻译功能文本摘要文本校对图像描述生成总结在C#中使用Deep