A First Look at Stream-Batch Unification in Flink 1.14.3

2023-10-31 08:40

This article is a hands-on look at stream-batch unification in Flink 1.14.3. Hopefully it offers some reference value to developers facing the same questions; if you need it, follow along and learn together!

Preface

Flink has been touting stream-batch unification since 1.10, and 1.14 is said to be a milestone, so I took it for a spin.

What's Changed

DataSet Is Gone

As I vaguely recall, the old Flink 1.8 was a lot like Spark, likewise split into Stream processing and DataSet batch processing. In the new version:

package com.zhiyong.flinkStudy;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.SortPartitionOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class FlinkDatasetDemo1 {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> data = env.fromElements("hehe", "haha", "哈哈", "哈哈"); // old versions returned DataSet
        String[] str1 = {"hehe1", "haha1", "哈哈1", "哈哈1"};
        DataSource<String> data1 = env.fromElements(str1); // old versions returned DataSet

        AggregateOperator<Tuple2<String, Integer>> result = data.flatMap(new FlatMapFunction1())
                .groupBy(0)
                .sum(1);
        result.print();

        System.out.println("**************************");

        SortPartitionOperator<Tuple2<String, Integer>> result1 = data1.flatMap(new FlatMapFunction2())
                .groupBy(0)
                .sum(1)
                .sortPartition(1, Order.DESCENDING);
        result1.print();
    }

    private static class FlatMapFunction1 implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String cell : value.split("\\s+")) {
                out.collect(Tuple2.of(cell, 1));
            }
        }
    }

    private static class FlatMapFunction2 implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] split = value.split("\\s+");
            for (int i = 0; i < split.length; i++) {
                out.collect(new Tuple2<>(split[i], 1));
            }
        }
    }
}

After running:

log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.utils.PlanGenerator).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
(hehe,1)
(haha,1)
(哈哈,2)
**************************
(哈哈1,2)
(hehe1,1)
(haha1,1)

Process finished with exit code 0

The results are of course unchanged, but the DataSet I remember is gone, replaced by DataSource. Drilling into it shows:

package org.apache.flink.api.java.operators;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.NonParallelInput;
import org.apache.flink.api.common.operators.GenericDataSourceBase;
import org.apache.flink.api.common.operators.OperatorInformation;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.SplitDataProperties;
import org.apache.flink.configuration.Configuration;

/**
 * An operation that creates a new data set (data source). The operation acts as the data set on
 * which to apply further transformations. It encapsulates additional configuration parameters, to
 * customize the execution.
 *
 * @param <OUT> The type of the elements produced by this data source.
 */
@Public
public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {

    private final InputFormat<OUT, ?> inputFormat;

    private final String dataSourceLocationName;

    private Configuration parameters;

    private SplitDataProperties<OUT> splitDataProperties;

    // --------------------------------------------------------------------------------------------

    /**
     * Creates a new data source.
     *
     * @param context The environment in which the data source gets executed.
     * @param inputFormat The input format that the data source executes.
     * @param type The type of the elements produced by this input format.
     */
    public DataSource(
            ExecutionEnvironment context,
            InputFormat<OUT, ?> inputFormat,
            TypeInformation<OUT> type,
            String dataSourceLocationName) {
        super(context, type);

        this.dataSourceLocationName = dataSourceLocationName;

        if (inputFormat == null) {
            throw new IllegalArgumentException("The input format may not be null.");
        }

        this.inputFormat = inputFormat;

        if (inputFormat instanceof NonParallelInput) {
            this.parallelism = 1;
        }
    }

    /**
     * Gets the input format that is executed by this data source.
     *
     * @return The input format that is executed by this data source.
     */
    @Internal
    public InputFormat<OUT, ?> getInputFormat() {
        return this.inputFormat;
    }

    /**
     * Pass a configuration to the InputFormat.
     *
     * @param parameters Configuration parameters
     */
    public DataSource<OUT> withParameters(Configuration parameters) {
        this.parameters = parameters;
        return this;
    }

    /** @return Configuration for the InputFormat. */
    public Configuration getParameters() {
        return this.parameters;
    }

    /**
     * Returns the {@link org.apache.flink.api.java.io.SplitDataProperties} for the {@link
     * org.apache.flink.core.io.InputSplit}s of this DataSource for configurations.
     *
     * <p>SplitDataProperties can help to generate more efficient execution plans.
     *
     * <p><b> IMPORTANT: Incorrect configuration of SplitDataProperties can cause wrong results!
     * </b>
     *
     * @return The SplitDataProperties for the InputSplits of this DataSource.
     */
    @PublicEvolving
    public SplitDataProperties<OUT> getSplitDataProperties() {
        if (this.splitDataProperties == null) {
            this.splitDataProperties = new SplitDataProperties<OUT>(this);
        }
        return this.splitDataProperties;
    }

    // --------------------------------------------------------------------------------------------

    protected GenericDataSourceBase<OUT, ?> translateToDataFlow() {
        String name =
                this.name != null
                        ? this.name
                        : "at "
                                + dataSourceLocationName
                                + " ("
                                + inputFormat.getClass().getName()
                                + ")";
        if (name.length() > 150) {
            name = name.substring(0, 150);
        }

        @SuppressWarnings({"unchecked", "rawtypes"})
        GenericDataSourceBase<OUT, ?> source =
                new GenericDataSourceBase(
                        this.inputFormat, new OperatorInformation<OUT>(getType()), name);
        source.setParallelism(parallelism);
        if (this.parameters != null) {
            source.getParameters().addAll(this.parameters);
        }
        if (this.splitDataProperties != null) {
            source.setSplitDataProperties(this.splitDataProperties);
        }
        return source;
    }
}

Digging further down:

package org.apache.flink.api.java.operators;

import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.util.OperatorValidationUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

/**
 * Base class of all operators in the Java API.
 *
 * @param <OUT> The type of the data set produced by this operator.
 * @param <O> The type of the operator, so that we can return it.
 */
@Public
public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<OUT> {
}

And one level further:

package org.apache.flink.api.java;

import org.apache.flink.annotation.Public;
// ... intermediate imports omitted ...
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * A DataSet represents a collection of elements of the same type.
 *
 * <p>A DataSet can be transformed into another DataSet by applying a transformation as for example
 *
 * <ul>
 *   <li>{@link DataSet#map(org.apache.flink.api.common.functions.MapFunction)},
 *   <li>{@link DataSet#reduce(org.apache.flink.api.common.functions.ReduceFunction)},
 *   <li>{@link DataSet#join(DataSet)}, or
 *   <li>{@link DataSet#coGroup(DataSet)}.
 * </ul>
 *
 * @param <T> The type of the DataSet, i.e., the type of the elements of the DataSet.
 */
@Public
public abstract class DataSet<T> {
}

The new version has deprecated operating on DataSet directly; batch jobs now go through the brand-new DataSource!!!

[Missing image: image-20220321213641398.png]
As you can see, what we use now is DataSource, a subclass of Operator, which in turn is a subclass of DataSet.

DataStream Now Has Implementation Classes

A data source that mocks one record per second:

package com.zhiyong.flinkStudy;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.ArrayList;
import java.util.Random;

/**
 * @program: study
 * @description: WordCount data source for Flink; produces 1 record per second
 * @author: zhiyong
 * @create: 2022-03-17 00:06
 **/
public class WordCountSource1ps implements SourceFunction<String> {

    private boolean needRun = true;

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        while (needRun) {
            ArrayList<String> result = new ArrayList<>();
            for (int i = 0; i < 20; i++) {
                result.add("zhiyong" + i);
            }
            sourceContext.collect(result.get(new Random().nextInt(20)));
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        needRun = false;
    }
}

The DataStream program:

package com.zhiyong.flinkStudy;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;

import java.util.Collection;

/**
 * @program: study
 * @description: Flink DataStream demo
 * @author: zhiyong
 * @create: 2022-03-17 00:06
 **/
public class FlinkDataStreamDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // avoids the "insufficient network resources" error

        SingleOutputStreamOperator<Tuple2<String, Integer>> result1 = env.addSource(new WordCountSource1ps())
                .flatMap(new FlatMapFunction1())
                .keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
                    @Override
                    public Object getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                .sum(1);

        DataStream<Tuple2<String, Integer>> result2 = env.addSource(new WordCountSource1ps())
                .flatMap(new FlatMapFunction1())
                .keyBy(0) // deprecated method
                .sum(1);

//        SingleOutputStreamOperator<Tuple2<String, Integer>> result3 = env.addSource(new WordCountSource1ps())
//                .flatMap(new FlatMapFunction1())
//                .keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
//                    @Override
//                    public Object getKey(Tuple2<String, Integer> value) throws Exception {
//                        return value.f0;
//                    }
//                })
//                .window(new WindowAssigner<Tuple2<String, Integer>, Window>() {
//                    @Override
//                    public Collection<Window> assignWindows(Tuple2<String, Integer> element, long timestamp, WindowAssignerContext context) {
//                        return null;
//                    }
//
//                    @Override
//                    public Trigger<Tuple2<String, Integer>, Window> getDefaultTrigger(StreamExecutionEnvironment env) {
//                        return null;
//                    }
//
//                    @Override
//                    public TypeSerializer<Window> getWindowSerializer(ExecutionConfig executionConfig) {
//                        return null;
//                    }
//
//                    @Override
//                    public boolean isEventTime() {
//                        return false;
//                    }
//                })
//                .sum(1);

        SingleOutputStreamOperator<Tuple2<String, Integer>> result4 = env.addSource(new WordCountSource1ps())
                .flatMap(new FlatMapFunction1())
                .keyBy(0) // keyBy: deprecated method
                .timeWindow(Time.seconds(30)) // timeWindow: deprecated method
                .sum(1);

        SingleOutputStreamOperator<Tuple2<String, Integer>> result5 = env.addSource(new WordCountSource1ps())
                .flatMap(new FlatMapFunction1())
                .keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
                    @Override
                    public Object getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .sum(1);

        //result1.print();
        //result2.print();
        //result3.print();
        //result4.print();
        result5.print();

        env.execute("Without this line the job never runs and the process just exits with code 0");
    }

    public static class FlatMapFunction1 implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String cell : value.split("\\s+")) {
                out.collect(Tuple2.of(cell, 1));
            }
        }
    }
}

With result1.print() enabled, the run produces:

log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.ClosureCleaner).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
10> (zhiyong16,1)
30> (zhiyong7,1)
32> (zhiyong14,1)
33> (zhiyong3,1)
29> (zhiyong12,1)
2> (zhiyong15,1)
10> (zhiyong16,2)
2> (zhiyong15,2)
10> (zhiyong16,3)
30> (zhiyong7,2)
17> (zhiyong18,1)
30> (zhiyong7,3)
35> (zhiyong19,1)
35> (zhiyong19,2)
4> (zhiyong4,1)
18> (zhiyong5,1)
35> (zhiyong8,1)
18> (zhiyong5,2)
25> (zhiyong11,1)
23> (zhiyong2,1)
23> (zhiyong2,2)
25> (zhiyong11,2)
35> (zhiyong8,2)
18> (zhiyong5,3)
35> (zhiyong19,3)
35> (zhiyong8,3)
33> (zhiyong3,2)
35> (zhiyong19,4)
35> (zhiyong8,4)
35> (zhiyong8,5)
4> (zhiyong0,1)
23> (zhiyong2,3)
32> (zhiyong14,2)
10> (zhiyong10,1)
25> (zhiyong11,3)
35> (zhiyong17,1)
35> (zhiyong17,2)
32> (zhiyong14,3)

Process finished with exit code 130

The new SingleOutputStreamOperator class:

[Missing image: image-20220317002947550.png]

The official description:

SingleOutputStreamOperator represents a user defined transformation applied on a DataStream with one predefined output type.
Type parameters:
<T> – The type of the elements in this stream.

Clearly it is a subclass of DataStream.

If you run into this error:

log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.ClosureCleaner).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:258)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
    at akka.dispatch.OnComplete.internal(Future.scala:300)
    at akka.dispatch.OnComplete.internal(Future.scala:297)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
    at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
    at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
    at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
    at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    ... 4 more
Caused by: java.io.IOException: Insufficient number of network buffers: required 37, but only 3 available. The total number of network buffers is currently set to 2048 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max'.
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalCreateBufferPool(NetworkBufferPool.java:386)
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:364)
    at org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:279)
    at org.apache.flink.runtime.io.network.partition.ResultPartition.setup(ResultPartition.java:151)
    at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.setup(BufferWritingResultPartition.java:95)
    at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:969)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:664)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:750)

Process finished with exit code 1

It means network resources are insufficient. The simplest fix is to lower the parallelism, which reduces the network demand:

env.setParallelism(1);
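Alternatively, you could give the network stack more memory instead of shrinking the job. The configuration keys below are the ones named in the error message itself; wiring them into a local environment this way is my own sketch, not something this post actually tested:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NetworkBufferConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // raise the memory dedicated to network buffers (keys taken from the error text);
        // the concrete sizes here are illustrative assumptions
        conf.setString("taskmanager.memory.network.min", "256mb");
        conf.setString("taskmanager.memory.network.max", "256mb");
        conf.setString("taskmanager.memory.network.fraction", "0.2");
        // createLocalEnvironment(parallelism, conf) hands the settings to the embedded MiniCluster
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(4, conf);
    }
}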

After that, result2.print() produces normal output:

log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.ClosureCleaner).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
(zhiyong3,1)
(zhiyong0,1)
(zhiyong14,1)
(zhiyong3,2)
(zhiyong18,1)
(zhiyong1,1)
(zhiyong13,1)
(zhiyong19,1)
(zhiyong13,2)
(zhiyong4,1)
(zhiyong3,3)
(zhiyong9,1)
(zhiyong0,2)
(zhiyong12,1)
(zhiyong10,1)
(zhiyong6,1)
(zhiyong19,2)
(zhiyong18,2)
(zhiyong15,1)
(zhiyong6,2)
(zhiyong4,2)
(zhiyong16,1)
(zhiyong15,2)
(zhiyong6,3)
(zhiyong10,2)
(zhiyong4,3)

Process finished with exit code 130

Treating the subclass as its parent type and calling the parent's methods generally causes no problems.
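To illustrate (a snippet of my own, not from the original run): since SingleOutputStreamOperator extends DataStream, the upcast is implicit and no manual cast is even required:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UpcastDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // fromElements returns DataStreamSource, a subclass of SingleOutputStreamOperator
        SingleOutputStreamOperator<Tuple2<String, Integer>> narrow =
                env.fromElements(Tuple2.of("zhiyong", 1));
        // implicit upcast: every SingleOutputStreamOperator is a DataStream
        DataStream<Tuple2<String, Integer>> wide = narrow;
        wide.print();
        env.execute();
    }
}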

Now let's try DataStream windows:

SingleOutputStreamOperator<Tuple2<String, Integer>> result4 = env.addSource(new WordCountSource1ps())
        .flatMap(new FlatMapFunction1())
        .keyBy(0) // keyBy: deprecated method
        .timeWindow(Time.seconds(30)) // timeWindow: deprecated method
        .sum(1);

[Missing image: image-20220317224544571.png]

It still works, but it is deprecated. Drilling into timeWindow:

/**
 * Windows this {@code KeyedStream} into tumbling time windows.
 *
 * <p>This is a shortcut for either {@code .window(TumblingEventTimeWindows.of(size))} or {@code
 * .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic set
 * using {@link
 * org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
 *
 * @param size The size of the window.
 * @deprecated Please use {@link #window(WindowAssigner)} with either {@link
 *     TumblingEventTimeWindows} or {@link TumblingProcessingTimeWindows}. For more information,
 *     see the deprecation notice on {@link TimeCharacteristic}
 */
@Deprecated
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
    if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
        return window(TumblingProcessingTimeWindows.of(size));
    } else {
        return window(TumblingEventTimeWindows.of(size));
    }
}

Clearly the timeWindow operator is no longer recommended.

The window operator in result3, which news up a raw WindowAssigner, is obviously a bit cumbersome to use. The source code also says you can use .window(TumblingEventTimeWindows.of(size)) or .window(TumblingProcessingTimeWindows.of(size)), i.e. tumbling event-time windows or tumbling processing-time windows.
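Following that deprecation notice, result4 rewritten with the non-deprecated API would look roughly like this (my own sketch; it assumes the same WordCountSource1ps and FlatMapFunction1 from the demo above, plus an import of TumblingProcessingTimeWindows):

SingleOutputStreamOperator<Tuple2<String, Integer>> result4New = env.addSource(new WordCountSource1ps())
        .flatMap(new FlatMapFunction1())
        // a KeySelector instead of the deprecated keyBy(0)
        .keyBy((KeySelector<Tuple2<String, Integer>, String>) value -> value.f0)
        // an explicit assigner instead of the deprecated timeWindow(Time.seconds(30))
        .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
        .sum(1);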

Clicking into WindowAssigner:

package org.apache.flink.streaming.api.windowing.assigners;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;

import java.io.Serializable;
import java.util.Collection;

/**
 * A {@code WindowAssigner} assigns zero or more {@link Window Windows} to an element.
 *
 * <p>In a window operation, elements are grouped by their key (if available) and by the windows to
 * which it was assigned. The set of elements with the same key and window is called a pane. When a
 * {@link Trigger} decides that a certain pane should fire the {@link
 * org.apache.flink.streaming.api.functions.windowing.WindowFunction} is applied to produce output
 * elements for that pane.
 *
 * @param <T> The type of elements that this WindowAssigner can assign windows to.
 * @param <W> The type of {@code Window} that this assigner assigns.
 */
@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
    private static final long serialVersionUID = 1L;

    /**
     * Returns a {@code Collection} of windows that should be assigned to the element.
     *
     * @param element The element to which windows should be assigned.
     * @param timestamp The timestamp of the element.
     * @param context The {@link WindowAssignerContext} in which the assigner operates.
     */
    public abstract Collection<W> assignWindows(
            T element, long timestamp, WindowAssignerContext context);

    /** Returns the default trigger associated with this {@code WindowAssigner}. */
    public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);

    /**
     * Returns a {@link TypeSerializer} for serializing windows that are assigned by this {@code
     * WindowAssigner}.
     */
    public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);

    /**
     * Returns {@code true} if elements are assigned to windows based on event time, {@code false}
     * otherwise.
     */
    public abstract boolean isEventTime();

    /**
     * A context provided to the {@link WindowAssigner} that allows it to query the current
     * processing time.
     *
     * <p>This is provided to the assigner by its containing {@link
     * org.apache.flink.streaming.runtime.operators.windowing.WindowOperator}, which, in turn, gets
     * it from the containing {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
     */
    public abstract static class WindowAssignerContext {
        /** Returns the current processing time. */
        public abstract long getCurrentProcessingTime();
    }
}

The parent class WindowAssigner has subclasses:

[Missing image: image-20220317225928735.png]

Besides the two tumbling assigners (TumblingEventTimeWindows and TumblingProcessingTimeWindows), there are of course two sliding ones (SlidingEventTimeWindows and SlidingProcessingTimeWindows).

A quick try of a sliding window:

SingleOutputStreamOperator<Tuple2<String, Integer>> result5 = env.addSource(new WordCountSource1ps())
        .flatMap(new FlatMapFunction1())
        .keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
            @Override
            public Object getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        })
        .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .sum(1);

Output:

(zhiyong17,1)
(zhiyong17,1)
(zhiyong7,1)
(zhiyong15,1)
(zhiyong19,1)
(zhiyong4,2)
(zhiyong4,2)
(zhiyong0,1)
(zhiyong15,1)
(zhiyong7,1)
(zhiyong9,2)
(zhiyong18,1)
(zhiyong19,1)
(zhiyong8,1)

Process finished with exit code 130

The operator APIs have changed a lot. The deprecated old APIs still limp along for now, but we will certainly have to get used to the new ones over time; the old ones may stop working in some future release.

DSL (Table API) Updates

While building the settings object for the execution environment, I found that the much-hyped BlinkPlanner has been deprecated!!!

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner() // useBlinkPlanner() is deprecated
        .inStreamingMode()
        .build();

[Missing image: image-20220318001030442.png]

Drilling in, the source code says:

/**
 * @deprecated The old planner has been removed in Flink 1.14. Since there is only one
 *     planner left (previously called the 'blink' planner), this setting will throw an
 *     exception.
 */
@Deprecated
public Builder useOldPlanner() {
    throw new TableException(
            "The old planner has been removed in Flink 1.14. "
                    + "Please upgrade your table program to use the default "
                    + "planner (previously called the 'blink' planner).");
}

/**
 * Sets the Blink planner as the required module.
 *
 * <p>This is the default behavior.
 *
 * @deprecated The old planner has been removed in Flink 1.14. Since there is only one
 *     planner left (previously called the 'blink' planner), this setting is obsolete and
 *     will be removed in future versions.
 */
@Deprecated
public Builder useBlinkPlanner() {
    return this;
}

Well then, both planner settings are on their way out.

package com.zhiyong.flinkStudy;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$; // enables $("fieldName")

/**
 * @program: study
 * @description: Stream-batch unification with the Table API
 * @author: zhiyong
 * @create: 2022-03-17 23:52
 **/
public class FlinkTableApiDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Flink 1.14 no longer needs a planner setting
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        DataStreamSource<String> data = env.addSource(new WordCountSource1ps());

        Table table1 = tableEnv.fromDataStream(data, "word"); // deprecated
        Table table1_1 = table1.where($("word").like("%5%"));
        System.out.println("tableEnv.explain(table1_1) = " + tableEnv.explain(table1_1)); // deprecated
        tableEnv.toAppendStream(table1_1, Row.class).print("table1_1"); // deprecated

        System.out.println("env.getExecutionPlan() = " + env.getExecutionPlan());
        env.execute();
    }
}

After running:

tableEnv.explain(table1_1) = == Abstract Syntax Tree ==
LogicalFilter(condition=[LIKE($0, _UTF-16LE'%5%')])
+- LogicalTableScan(table=[[Unregistered_DataStream_1]])

== Optimized Physical Plan ==
Calc(select=[word], where=[LIKE(word, _UTF-16LE'%5%')])
+- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[word])

== Optimized Execution Plan ==
Calc(select=[word], where=[LIKE(word, _UTF-16LE'%5%')])
+- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[word])

env.getExecutionPlan() = {
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Custom Source",
    "pact" : "Data Source",
    "contents" : "Source: Custom Source",
    "parallelism" : 1
  }, {
    "id" : 4,
    "type" : "SourceConversion(table=[Unregistered_DataStream_1], fields=[word])",
    "pact" : "Operator",
    "contents" : "SourceConversion(table=[Unregistered_DataStream_1], fields=[word])",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 5,
    "type" : "Calc(select=[word], where=[LIKE(word, _UTF-16LE'%5%')])",
    "pact" : "Operator",
    "contents" : "Calc(select=[word], where=[LIKE(word, _UTF-16LE'%5%')])",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 4,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 6,
    "type" : "SinkConversionToRow",
    "pact" : "Operator",
    "contents" : "SinkConversionToRow",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 5,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 7,
    "type" : "Sink: Print to Std. Out",
    "pact" : "Data Sink",
    "contents" : "Sink: Print to Std. Out",
    "parallelism" : 36,
    "predecessors" : [ {
      "id" : 6,
      "ship_strategy" : "REBALANCE",
      "side" : "second"
    } ]
  } ]
}
table1_1:8> +I[zhiyong5]
table1_1:9> +I[zhiyong5]
table1_1:10> +I[zhiyong5]
table1_1:11> +I[zhiyong15]
table1_1:12> +I[zhiyong15]

Process finished with exit code 130

In a browser, open:

https://flink.apache.org/visualizer/

Paste the JSON string printed above into the text box on that site and click Draw:

[Missing image: image-20220318003647976.png]

You can see the DAG:

[Missing image: image-20220318003736080.png]

The Table API still works fine, but there are far too many deprecated methods:

[Missing image: image-20220318003843589.png]

For example, the deprecated fromDataStream:

/**
 * Converts the given {@link DataStream} into a {@link Table} with specified field names.
 *
 * <p>There are two modes for mapping original fields to the fields of the {@link Table}:
 *
 * <p>1. Reference input fields by name: All fields in the schema definition are referenced by
 * name (and possibly renamed using an alias (as). Moreover, we can define proctime and rowtime
 * attributes at arbitrary positions using arbitrary names (except those that exist in the
 * result schema). In this mode, fields can be reordered and projected out. This mode can be
 * used for any input type, including POJOs.
 *
 * <p>Example:
 *
 * <pre>{@code
 * DataStream<Tuple2<String, Long>> stream = ...
 * // reorder the fields, rename the original 'f0' field to 'name' and add event-time
 * // attribute named 'rowtime'
 * Table table = tableEnv.fromDataStream(stream, "f1, rowtime.rowtime, f0 as 'name'");
 * }</pre>
 *
 * <p>2. Reference input fields by position: In this mode, fields are simply renamed. Event-time
 * attributes can replace the field on their position in the input data (if it is of correct
 * type) or be appended at the end. Proctime attributes must be appended at the end. This mode
 * can only be used if the input type has a defined field order (tuple, case class, Row) and
 * none of the {@code fields} references a field of the input type.
 *
 * <p>Example:
 *
 * <pre>{@code
 * DataStream<Tuple2<String, Long>> stream = ...
 * // rename the original fields to 'a' and 'b' and extract the internally attached timestamp into an event-time
 * // attribute named 'rowtime'
 * Table table = tableEnv.fromDataStream(stream, "a, b, rowtime.rowtime");
 * }</pre>
 *
 * @param dataStream The {@link DataStream} to be converted.
 * @param fields The fields expressions to map original fields of the DataStream to the fields
 *     of the {@link Table}.
 * @param <T> The type of the {@link DataStream}.
 * @return The converted {@link Table}.
 * @deprecated use {@link #fromDataStream(DataStream, Expression...)}
 */
@Deprecated
<T> Table fromDataStream(DataStream<T> dataStream, String fields);

Likewise the deprecated explain:

/**
 * Returns the AST of the specified Table API and SQL queries and the execution plan to compute
 * the result of the given {@link Table}.
 *
 * @param table The table for which the AST and execution plan will be returned.
 * @deprecated use {@link Table#explain(ExplainDetail...)}.
 */
@Deprecated
String explain(Table table);

And the deprecated toAppendStream:

/**
 * Converts the given {@link Table} into an append {@link DataStream} of a specified type.
 *
 * <p>The {@link Table} must only have insert (append) changes. If the {@link Table} is also
 * modified by update or delete changes, the conversion will fail.
 *
 * <p>The fields of the {@link Table} are mapped to {@link DataStream} fields as follows:
 *
 * <ul>
 *   <li>{@link Row} and {@link org.apache.flink.api.java.tuple.Tuple} types: Fields are mapped
 *       by position, field types must match.
 *   <li>POJO {@link DataStream} types: Fields are mapped by field name, field types must match.
 * </ul>
 *
 * @param table The {@link Table} to convert.
 * @param clazz The class of the type of the resulting {@link DataStream}.
 * @param <T> The type of the resulting {@link DataStream}.
 * @return The converted {@link DataStream}.
 * @deprecated Use {@link #toDataStream(Table, Class)} instead. It integrates with the new type
 *     system and supports all kinds of {@link DataTypes} that the table runtime can produce.
 *     The semantics might be slightly different for raw and structured types. Use {@code
 *     toDataStream(DataTypes.of(TypeInformation.of(Class)))} if {@link TypeInformation} should
 *     be used as source of truth.
 */
@Deprecated
<T> DataStream<T> toAppendStream(Table table, Class<T> clazz);

After replacing them with the new methods suggested in the source:

package com.zhiyong.flinkStudy;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.apache.flink.table.api.Expressions.$; // enables $("fieldName")

/**
 * @program: study
 * @description: Stream-batch unification with the Table API
 * @author: zhiyong
 * @create: 2022-03-17 23:52
 **/
public class FlinkTableApiDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Flink 1.14 no longer needs a planner setting
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        DataStreamSource<String> data = env.addSource(new WordCountSource1ps());

        System.out.println("***********new methods**************");

        ArrayList<String> strings = new ArrayList<>();
        strings.add("f0"); // must be f0
        List<DataType> dataTypes = new ArrayList<DataType>();
        dataTypes.add(DataTypes.STRING());
        Schema schema = Schema.newBuilder().fromFields(strings, dataTypes).build();
        List<Schema.UnresolvedColumn> columns = schema.getColumns();
        for (Schema.UnresolvedColumn column : columns) {
            System.out.println("column = " + column);
        }

        Table table2 = tableEnv.fromDataStream(data, schema);
        Table table2_1 = table2.where($("f0").like("%5%")); // must be f0
        System.out.println("table2_1.explain() = " + table2_1.explain(ExplainDetail.JSON_EXECUTION_PLAN));
        tableEnv.toDataStream(table2_1, Row.class).print("table2_1");

        System.out.println("env.getExecutionPlan() = " + env.getExecutionPlan());
        env.execute();
    }
}

The field name must be written as "f0"; I haven't had time to dig through the source to figure out exactly why. Writing anything else throws:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to find a field named 'word' in the physical data type derived from the given type information for schema declaration. Make sure that the type information is not a generic raw type. Currently available fields are: [f0]
    at org.apache.flink.table.catalog.SchemaTranslator.patchDataTypeFromColumn(SchemaTranslator.java:327)
    at org.apache.flink.table.catalog.SchemaTranslator.patchDataTypeFromDeclaredSchema(SchemaTranslator.java:314)
    at org.apache.flink.table.catalog.SchemaTranslator.createConsumingResult(SchemaTranslator.java:213)
    at org.apache.flink.table.catalog.SchemaTranslator.createConsumingResult(SchemaTranslator.java:158)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.fromStreamInternal(StreamTableEnvironmentImpl.java:294)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.fromDataStream(StreamTableEnvironmentImpl.java:232)
    at com.zhiyong.flinkStudy.FlinkTableApiDemo1.main(FlinkTableApiDemo1.java:65)

Process finished with exit code 1
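My reading, which is an assumption rather than something verified against the source: a DataStream<String> carries an atomic type, so the derived physical row has exactly one field, which Flink names f0 by default. If a friendlier name is wanted, renaming on the Table side should work, roughly like this sketch (reusing tableEnv, data, and schema from the code above):

Table table2 = tableEnv.fromDataStream(data, schema);      // the single column is "f0"
Table renamed = table2.renameColumns($("f0").as("word"));  // rename it afterwards
Table filtered = renamed.where($("word").like("%5%"));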

Batch Processing with the DSL (Table API)

Having tried stream processing above, let's now try batch.

Since batch processing no longer works on DataSet directly but on DataSource, the following operators are gone:

tableEnv.fromDataSet(data1); // in old Flink, data1 was a DataSet instance; this API created a Table from a DataSet
tableEnv.toDataSet(table1);  // in old Flink, table1 was a Table instance; this API converted it back to a DataSet

If you build the TableEnvironment for the DSL like this:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

it turns out to be of limited use: the resulting tableEnv instance exposes very few usable methods.
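To be fair, such a plain TableEnvironment can still run pure Table programs end to end; what it lacks is the DataStream bridge (fromDataStream/toDataStream). A minimal sketch of my own, not from the original experiment:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class PureTableEnvDemo {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);
        // fromValues + execute().print() work without any DataStream involvement
        Table t = tableEnv.fromValues("hehe1", "haha1", "哈哈1");
        t.execute().print();
    }
}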

For bridging to DataStream, though, the following approach works:

package com.zhiyong.flinkStudy;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @program: study
 * @description: Stream-batch unification with Flink's DSL
 * @author: zhiyong
 * @create: 2022-03-18 01:48
 **/
public class FlinkTableApiDemo2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(env);

        String[] str1 = {"hehe1", "haha1", "哈哈1", "哈哈1"};
        Table table1 = streamTableEnv.fromValues(str1);
        Table table1_1 = table1.where($("f0").like("%h%"));
        DataStream<Row> batchTable1 = streamTableEnv.toDataStream(table1_1);
        batchTable1.print();

        System.out.println("*************************");

        DataStreamSource<String> dataStream2 = env.fromElements(str1);
        Table table2 = streamTableEnv.fromDataStream(dataStream2);
        Table table2_1 = table2.where($("f0").like("%哈%"));
        DataStream<Row> batchTable2 = streamTableEnv.toDataStream(table2_1);
        batchTable2.print();

        env.execute();
    }
}

After running:

log4j:WARN No appenders could be found for logger (org.apache.flink.table.module.ModuleManager).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
*************************
20> +I[哈哈1]
21> +I[哈哈1]
20> +I[hehe1]
21> +I[haha1]Process finished with exit code 0

Evidently, Flink 1.14.3 can now process batch data directly in the streaming way, instead of separating stream and batch as old Flink 1.8 did. The batch Env and its APIs are still around, but they are mostly deprecated and may never be needed again. This shows that at the DSL (Table API) level in Flink 1.14.3 no distinction is required: just convert everything to DataStream uniformly. And the DataStream side does not care whether a Table came from a stream environment or a batch one.

Stream-Batch Unification with DataStream

package com.zhiyong.flinkStudy;

import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @program: study
 * @description: Stream-batch unification with Flink SQL
 * @author: zhiyong
 * @create: 2022-03-21 22:32
 **/
public class FlinkSqlApiDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(env);

        DataStreamSource<String> data1 = env.addSource(new WordCountSource1ps());
        String inputPath = "E:/study/flink/data/test1";
        DataStreamSource<String> data2 = env.readTextFile(inputPath);

        data1.print("data1");
        data2.print("data2");

        env.execute();
    }
}

After running:

data1> zhiyong19
data2> 好
data2> 喜欢
data2> 数码宝贝
data2> 宝宝 宝贝
data2> 宝贝 好 喜欢
data2> 123
data2> 123
data2> 123
data2> 哈哈 haha
data2> hehe 呵呵 呵呵 呵呵 呵呵
data2> hehe
data1> zhiyong17
data1> zhiyong7
data1> zhiyong7
data1> zhiyong5
data1> zhiyong11
data1> zhiyong18
data1> zhiyong14
data1> zhiyong13
data1> zhiyong5
data1> zhiyong8

Process finished with exit code 130

As you can see, in Flink 1.14.3 DataStream alone is enough. Batch or stream, everything can simply be treated as a stream.
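For completeness (my own addition, not exercised in the original experiment): since Flink 1.12 the same DataStream program can also be pinned to an explicit batch execution mode, which switches bounded jobs to batch-style scheduling and shuffles:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExecutionModeDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // BATCH: run bounded DataStream jobs the batch way;
        // AUTOMATIC: pick BATCH when every source is bounded, STREAMING otherwise
        env.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);
    }
}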

Stream-Batch Unification with the DSL (Table API)

package com.zhiyong.flinkStudy;

import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @program: study
 * @description: Stream-batch unification with Flink SQL
 * @author: zhiyong
 * @create: 2022-03-21 22:32
 **/
public class FlinkSqlApiDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(env);

        DataStreamSource<String> data1 = env.addSource(new WordCountSource1ps());
        String inputPath = "E:/study/flink/data/test1";
        DataStreamSource<String> data2 = env.readTextFile(inputPath);

        Table streamTable = streamTableEnv.fromDataStream(data1);
        Table batchTable = streamTableEnv.fromDataStream(data2);
        Table streamTable1 = streamTable.where($("f0").like("%2%"));
        Table batchTable1 = batchTable.where($("f0").like("%2%"));

        DataStream<Row> s1 = streamTableEnv.toDataStream(streamTable1);
        DataStream<Row> s2 = streamTableEnv.toDataStream(batchTable1);
        s1.print();
        s2.print();

        env.execute();
    }
}

After running:

+I[123]
+I[123]
+I[123]
+I[zhiyong2]
+I[zhiyong12]
+I[zhiyong12]
+I[zhiyong12]
+I[zhiyong12]
+I[zhiyong12]

Process finished with exit code 130

This way, as long as the computation logic is the same, we can use a single set of operators in Flink without deliberately distinguishing stream from batch. Old Flink 1.8 still required two separate programs; from Flink 1.14.3 at the latest, that is no longer necessary. Better code reuse means far less dev, debug, and subsequent ops work. For now, this is something Spark can only look up to.

Stream-Batch Unification with SQL

SQL is a higher-level wrapper over Table, better suited to business developers who don't need to care about the underlying platform internals [the so-called SQL Boys]. Since stream-batch unification is already achieved at the Table level, the SQL level can necessarily achieve it too.

package com.zhiyong.flinkStudy;

import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @program: study
 * @description: Stream-batch unification with Flink SQL
 * @author: zhiyong
 * @create: 2022-03-21 22:32
 **/
public class FlinkSqlApiDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(env);

        DataStreamSource<String> data1 = env.addSource(new WordCountSource1ps());
        String inputPath = "E:/study/flink/data/test1";
        DataStreamSource<String> data2 = env.readTextFile(inputPath);

        Table streamTable = streamTableEnv.fromDataStream(data1);
        Table batchTable = streamTableEnv.fromDataStream(data2);
        Table streamTable1 = streamTable.where($("f0").like("%2%"));
        Table batchTable1 = batchTable.where($("f0").like("%2%"));

        Table t1 = streamTableEnv.sqlQuery("SeLeCt UPPER(f0) frOm " + streamTable1);
        Table t2 = streamTableEnv.sqlQuery("SeLeCt UPPER(f0) frOm " + batchTable1);

        DataStream<Row> s1 = streamTableEnv.toDataStream(t1);
        DataStream<Row> s2 = streamTableEnv.toDataStream(t2);
        s1.print();
        s2.print();

        env.execute();
    }
}

After running:

+I[123]
+I[123]
+I[123]
+I[ZHIYONG2]
+I[ZHIYONG2]
+I[ZHIYONG2]
+I[ZHIYONG12]
+I[ZHIYONG12]
+I[ZHIYONG12]
+I[ZHIYONG12]
+I[ZHIYONG12]
+I[ZHIYONG2]
+I[ZHIYONG12]

Process finished with exit code 130

This likewise proves that in Flink 1.14.3 one and the same API achieves stream-batch unification in SQL. With unification at the SQL level, the SQL Boys writing business code need to care even less about the underlying implementation. Technological progress keeps lowering the technical bar for business staff... but that is not a bad thing.

Summary

In Flink 1.14.3, stream-batch unification is achievable at the top SQL layer, the next DSL layer, and the middle DataStream layer alike. SQL builds on the DSL, the DSL builds on DataStream, and both SQL and DSL calls yield Table objects, while Table and DataStream convert back and forth seamlessly in 1.14.3, which makes everything remarkably convenient. Compared with Flink 1.8 the APIs have changed drastically; SQL Boys may hardly notice, but platform and component developers face a real hurdle and must invest time and effort in studying the changes. Still, unifying the stream and batch APIs cuts development, testing, and operations work, so the price is worth paying.

For platform and component developers who understand the internals, Flink 1.14 is a milestone in every sense. As for the SQL Boys, whether they get it or not hardly matters. DSL users dislike SQL that runs to thousands of lines; SQL Boys dislike code that executes top to bottom. Zebras with white stripes on black and zebras with black stripes on white may take a while yet to understand each other. The way of heaven does not contend, yet it wins all the same.

That wraps up this look at stream-batch unification in Flink 1.14.3; hopefully it is of some help to fellow developers!


