本文主要是介绍Flink入门之DataStream API及kafka消费者,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
DataStream API
- 主要流程:
- 获取执行环境
- 读取数据源
- 转换操作
- 输出数据
- Execute触发执行
- 获取执行环境
- 根据实际情况获取StreamExceptionEnvironment.getExecutionEnvironment(conf)
- 创建本地环境StreamExecutionEnvironment.createLocalEnvironment()
- 创建远程环境createRemoteEnvironment(“hadoop102”, 37784, “jar/1.jar”)
- 参数1:主机号
- 参数2:端口号
- 参数3:作业jar包的路径
- 获取数据源
- 简单数据源
- 从集合中读取数据env.fromCollection(集合)
- 从元素列表中获取数据env.fromElements()
- 从文件中读取数据,env.readTextFIle(路径), 已废弃
- 从端口读取数据,env.socketTextStream()
- 文件数据源
- kafka数据源
- DataGen数据源
- 自定义数据源
- 简单数据源
文件数据源
使用文件数据源前,需要先添加相关依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version><scope>provided</scope>
</dependency>
public class Flink02_FileSource {public static void main(String[] args) throw Exception {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);//file sourceFileSource.FileSourceBuilder<String> fileSourceBuilder = FileSource.<String>forRecordStreamFormat(new TextLineInputFormat("utf-8"), new Path("input/word.txt"));FileSource<String> fileSource = fileSourceBuilder.build();//source 算子DataStreamSource<String> ds = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");ds.print();env.execute();}
}
DataGen数据源
主要用于生成模拟数据,也需要导入相关依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-datagen</artifactId><version>${flink.version}</version><scope>compile</scope></dependency>
public class Flink04_DataGenSource {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {@Overridepublic String map(Long value) throws Exception {return UUID.randomUUID() + "->" + value;}},100,RateLimiterStrategy.perSecond(1),Types.STRING);DataStreamSource<String> dataGenDs = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "dataGenDs");dataGenDs.print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}
Kafka消费者
-
消费方式:拉取
-
消费者对象:KafkaConsumenr
-
消费原则:
一个主题的一个分区只能被一个消费者组中的一个消费者消费
一个消费者组中的一个消费者可以消费一个主题中的多个分区 -
消费者相关的参数:
- key.deserializer 反序列化
- value.deserializer
- bootstrap.servers 集群的位置
- group.id 消费者组id (为何分组,方便同一组的消费者进行断点续传)
- auto.commit.interval.ms 自动提交间隔 默认5s
- enable.auto.commit: 开启自动提交offset偏移量
- auto.offset.reset: 当offset不存在时,offset重置,默认是最末尾的位置
- ①新的消费者组,之前没有消费过,没有记录的offset
- ②当前要消费的offset在kafka中已经不存在,可能是因为时间久了,对应的数据清理掉了
- 重置策略:
- earliest: 头,能消费到分区中现有的数据
- latest: 尾,只能消费到分区中新来的数据
- isolation.level:事务隔离级别
- 读未提交
- 读已提交
-
消费数据存在的问题
- 漏消费,导致数据丢失
- 重复消费,导致数据重复
-
shell 创建生产者对象:kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
public class Flink03_KafkaSource {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);KafkaSource<String> stringKafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092,hadoop103:9092").setGroupId("flink").setTopics("first")//优先使用消费者组记录的Offset进行消费,如果offset不存在,根据策略进行重置.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)).setValueOnlyDeserializer(new SimpleStringSchema())//如果还有别的配置需要指定,统一使用通用方法
// .setProperty("isolation.level", "read_committed").build();DataStreamSource<String> kafkaDS = env.fromSource(stringKafkaSource, WatermarkStrategy.noWatermarks(), "kafkaDS");kafkaDS.print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}
这篇关于Flink入门之DataStream API及kafka消费者的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!