本文主要是介绍Flink之DataStream API的转换算子,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
简单转换算子
函数的实现方式
- 自定义类,实现函数接口:编码麻烦,使用灵活
- 匿名内部类:编码简单
- Lambda:编码简洁
public class Flink02_FunctionImplement {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);env.socketTextStream("hadoop102",8888).flatMap((String line, Collector<Tuple2<String, Integer>> out)->{String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word,1));}}).returns(Types.TUPLE(Types.STRING,Types.INT)).keyBy(0).sum(1).print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}public static class MyFlatMapFunction implements FlatMapFunction<String, Tuple2<String,Integer>> {private String Operator;@Overridepublic void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word,1));}}}
}
Reduce规约聚合
- reduce:规约聚合
- 聚合的原理:两两聚合,上一次的聚合值与本次新到的值进行聚合
- 泛型 T : 流中的数据类型, 从方法声明中可以看到,输入输出类型一直
- 方法: T reduce(T value1, T value2) throws Exception
- value1:上一次的聚合值
- value2:本次新到的值
public class Flink04_ReduceAggOpterator {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);DataStreamSource<Event> ds = Flink06_EventSource.getEventSource(env);ds.print("input================");//reduce:每个用户的点击次数ds.map(event->new WordCount(event.getUser(),1)).keyBy(WordCount::getWord).reduce(new ReduceFunction<WordCount>() {/**** @param value1 上次聚合的结果,第一个数据不参与聚合,直接输出* @param value2 新来的值* @return* @throws Exception*/@Overridepublic WordCount reduce(WordCount value1, WordCount value2) throws Exception {System.out.println("测试");return new WordCount(value1.getWord(),value1.getCount()+value2.getCount());}}).print("reduce");try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}
富函数
调用算子的时候,需要传入对应的用户自定义函数来完成具体的功能
- 函数:
- 普通函数
Map
filter
flatMap
reduce
富函数:基本上每个普通函数都有对应的富函数
统一接口interface RichFunction extends Function
具体使用的富函数类:
- RichMapFunction
- RichFilterFunction
- RichFlatMapFunction
- RichReduceFunction
- …
富函数功能:- 生命周期方法:
- open(): 当前算子的每个并行子任务的实例创建时会调用一次
- close():当前算子的每个并行子任务的实例销毁时(有界流),调用一次
- 获取运行时上下文对象 getRuntimeContext
- 可以获取当前作业,当前task的相关信息
- 获取不同类型的状态,进行状态编程*
getState | getListState | getReducingState | getMapState
- 生命周期方法:
- 普通函数
public class Flink05_RichFunction {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);// DataStreamSource<Event> ds = Flink06_EventSource.getEventSource(env);FileSource<String> fileSource = FileSource.<String>forRecordStreamFormat(new TextLineInputFormat(),new Path("input/enents.txt")).build();DataStreamSource<String> fileDs = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");SingleOutputStreamOperator<Event> ds = fileDs.map(new MapFunction<String, Event>() {@Overridepublic Event map(String value) throws Exception {String[] valueArr = value.split("");return new Event(valueArr[0], valueArr[1], Long.valueOf(valueArr[2]));}});ds.print("input================");ds.map(new RichMapFunction<Event, WordCount>() {/*** 生命周期open方法,当前算子实例创建时执行一次,只执行一次* @param parameters The configuration containing the parameters attached to the contract.* @throws Exception*/@Overridepublic void open(Configuration parameters) throws Exception {System.out.println("创建Redis的连接对象");}@Overridepublic WordCount map(Event value) throws Exception {System.out.println("每条数据执行一次");return new WordCount(value.getUser(),1);}/*** 生命周期的close方法* 当前算子实例销毁时执行一次* @throws Exception*/@Overridepublic void close() throws Exception {System.out.println("关闭连接对象");}}).print("map");try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}
这篇关于Flink之DataStream API的转换算子的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!