Preface
This article takes a look at Distinct Aggregation in the flink Table API.
Example
    // Distinct can be applied to GroupBy Aggregation, GroupBy Window Aggregation and Over Window Aggregation.
    Table orders = tableEnv.scan("Orders");

    // Distinct aggregation on group by
    Table groupByDistinctResult = orders
        .groupBy("a")
        .select("a, b.sum.distinct as d");

    // Distinct aggregation on time window group by
    Table groupByWindowDistinctResult = orders
        .window(Tumble.over("5.minutes").on("rowtime").as("w")).groupBy("a, w")
        .select("a, b.sum.distinct as d");

    // Distinct aggregation on over window
    Table result = orders
        .window(Over
            .partitionBy("a")
            .orderBy("rowtime")
            .preceding("UNBOUNDED_RANGE")
            .as("w"))
        .select("a, b.avg.distinct over w, b.max over w, b.min over w");

    // User-defined aggregation function can also be used with DISTINCT modifiers
    Table orders = tEnv.scan("Orders");

    // Use distinct aggregation for user-defined aggregate functions
    tEnv.registerFunction("myUdagg", new MyUdagg());
    orders.groupBy("users").select("users, myUdagg.distinct(points) as myDistinctResult");
- Distinct Aggregation can be applied to both built-in and user-defined aggregation functions. It can be used in three kinds of aggregation: GroupBy Aggregation, GroupBy Window Aggregation, and Over Window Aggregation.
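To make the effect of the distinct modifier concrete, here is a minimal sketch in plain Java (no Flink dependency) of what groupBy("a").select("a, b.sum.distinct as d") computes compared with a plain sum. The class name DistinctSumSketch, the Order type, and the sample rows are assumptions made up for illustration; this models the semantics only and is not how flink implements it.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical illustration class; not part of flink.
final class DistinctSumSketch {

    /** One input row with the two columns used by the example: a (key) and b (value). */
    static final class Order {
        final String a;
        final long b;

        Order(String a, long b) {
            this.a = a;
            this.b = b;
        }
    }

    /** Models "a, b.sum.distinct as d": each distinct value of b is summed once per key. */
    static Map<String, Long> distinctSumByKey(List<Order> orders) {
        // Collect the set of distinct b values seen for every key a.
        Map<String, Set<Long>> seen = new TreeMap<>();
        for (Order o : orders) {
            seen.computeIfAbsent(o.a, k -> new HashSet<>()).add(o.b);
        }
        // Sum each set.
        Map<String, Long> result = new TreeMap<>();
        for (Map.Entry<String, Set<Long>> e : seen.entrySet()) {
            long sum = 0L;
            for (long v : e.getValue()) {
                sum += v;
            }
            result.put(e.getKey(), sum);
        }
        return result;
    }

    /** Models "a, b.sum as d" for comparison: duplicates are summed every time they occur. */
    static Map<String, Long> sumByKey(List<Order> orders) {
        Map<String, Long> result = new TreeMap<>();
        for (Order o : orders) {
            result.merge(o.a, o.b, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Order> orders = Arrays.asList(
            new Order("x", 1), new Order("x", 1), new Order("x", 2),
            new Order("y", 5), new Order("y", 5));
        System.out.println(sumByKey(orders));         // {x=4, y=10}
        System.out.println(distinctSumByKey(orders)); // {x=3, y=5}
    }
}
```

The duplicate rows ("x", 1) and ("y", 5) contribute only once to the distinct sum, which is the difference the modifier makes.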
AggregateFunction
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/functions/AggregateFunction.scala
/**
  * Base class for User-Defined Aggregates.
  *
  * The behavior of an [[AggregateFunction]] can be defined by implementing a series of custom
  * methods. An [[AggregateFunction]] needs at least three methods:
  *  - createAccumulator,
  *  - accumulate, and
  *  - getValue.
  *
  * There are a few other methods that can be optional to have:
  *  - retract,
  *  - merge, and
  *  - resetAccumulator
  *
  * All these methods must be declared publicly, not static and named exactly as the names
  * mentioned above. The methods createAccumulator and getValue are defined in the
  * [[AggregateFunction]] functions, while other methods are explained below.
  *
  *
  * {{{
  * Processes the input values and update the provided accumulator instance. The method
  * accumulate can be overloaded with different custom types and arguments. An AggregateFunction
  * requires at least one accumulate() method.
  *
  * @param accumulator           the accumulator which contains the current aggregated results
  * @param [user defined inputs] the input value (usually obtained from a new arrived data).
  *
  * def accumulate(accumulator: ACC, [user defined inputs]): Unit
  * }}}
  *
  *
  * {{{
  * Retracts the input values from the accumulator instance. The current design assumes the
  * inputs are the values that have been previously accumulated. The method retract can be
  * overloaded with different custom types and arguments. This function must be implemented for
  * datastream bounded over aggregate.
  *
  * @param accumulator           the accumulator which contains the current aggregated results
  * @param [user defined inputs] the input value (usually obtained from a new arrived data).
  *
  * def retract(accumulator: ACC, [user defined inputs]): Unit
  * }}}
  *
  *
  * {{{
  * Merges a group of accumulator instances into one accumulator instance. This function must be
  * implemented for datastream session window grouping aggregate and dataset grouping aggregate.
  *
  * @param accumulator  the accumulator which will keep the merged aggregate results. It should
  *                     be noted that the accumulator may contain the previous aggregated
  *                     results. Therefore user should not replace or clean this instance in the
  *                     custom merge method.
  * @param its          an [[java.lang.Iterable]] pointed to a group of accumulators that will be
  *                     merged.
  *
  * def merge(accumulator: ACC, its: java.lang.Iterable[ACC]): Unit
  * }}}
  *
  *
  * {{{
  * Resets the accumulator for this [[AggregateFunction]]. This function must be implemented for
  * dataset grouping aggregate.
  *
  * @param accumulator  the accumulator which needs to be reset
  *
  * def resetAccumulator(accumulator: ACC): Unit
  * }}}
  *
  *
  * @tparam T   the type of the aggregation result
  * @tparam ACC the type of the aggregation accumulator. The accumulator is used to keep the
  *             aggregated values which are needed to compute an aggregation result.
  *             AggregateFunction represents its state using accumulator, thereby the state of the
  *             AggregateFunction must be put into the accumulator.
  */
abstract class AggregateFunction[T, ACC] extends UserDefinedFunction {

  /**
    * Creates and init the Accumulator for this [[AggregateFunction]].
    *
    * @return the accumulator with the initial value
    */
  def createAccumulator(): ACC

  /**
    * Called every time when an aggregation result should be materialized.
    * The returned value could be either an early and incomplete result
    * (periodically emitted as data arrive) or the final result of the
    * aggregation.
    *
    * @param accumulator the accumulator which contains the current
    *                    aggregated results
    * @return the aggregation result
    */
  def getValue(accumulator: ACC): T

  /**
    * Returns true if this AggregateFunction can only be applied in an OVER window.
    *
    * @return true if the AggregateFunction requires an OVER window, false otherwise.
    */
  def requiresOver: Boolean = false

  /**
    * Returns the TypeInformation of the AggregateFunction's result.
    *
    * @return The TypeInformation of the AggregateFunction's result or null if the result type
    *         should be automatically inferred.
    */
  def getResultType: TypeInformation[T] = null

  /**
    * Returns the TypeInformation of the AggregateFunction's accumulator.
    *
    * @return The TypeInformation of the AggregateFunction's accumulator or null if the
    *         accumulator type should be automatically inferred.
    */
  def getAccumulatorType: TypeInformation[ACC] = null
}
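- AggregateFunction extends UserDefinedFunction. It has two type parameters: T is the type of the aggregation result, and ACC is the type of the accumulator. It declares createAccumulator, getValue, requiresOver, getResultType, and getAccumulatorType. Of these, subclasses must implement createAccumulator and getValue; the others have default implementations.
- The accumulate method is not declared in the class itself, but every subclass must define and implement at least one. It takes the accumulator (ACC) followed by the user-defined input values and returns Unit (void in Java). The retract, merge, and resetAccumulator methods are optional, and a subclass defines them only when the operation it is used in requires them.
- A datastream bounded over aggregate requires retract, which takes ACC followed by the user-defined input values and returns Unit. A datastream session window grouping aggregate and a dataset grouping aggregate require merge, which takes ACC and a java.lang.Iterable[ACC] and returns Unit. A dataset grouping aggregate also requires resetAccumulator, which takes ACC and returns Unit.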
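The contract described above can be sketched as a plain Java class that exposes the same method names and shapes. This is only an illustration under stated assumptions: the names AvgAggregateSketch and AvgAcc are hypothetical, and the class deliberately does not extend flink's class so that it runs without flink on the classpath. In a real job it would extend org.apache.flink.table.functions.AggregateFunction<Double, AvgAcc>, and flink would look up accumulate, retract, merge, and resetAccumulator by name.

```java
import java.util.Arrays;

// Hypothetical sketch mirroring the AggregateFunction contract in plain Java.
// In a real job: "public class AvgAggregateSketch extends AggregateFunction<Double, AvgAcc>".
final class AvgAggregateSketch {

    /** ACC: all state of the aggregate lives in the accumulator. */
    static final class AvgAcc {
        long sum = 0L;
        long count = 0L;
    }

    /** Required: creates the accumulator with its initial value. */
    public AvgAcc createAccumulator() {
        return new AvgAcc();
    }

    /** Required: folds one input value into the accumulator. */
    public void accumulate(AvgAcc acc, long value) {
        acc.sum += value;
        acc.count++;
    }

    /** Optional: undoes a previously accumulated value (bounded over aggregates). */
    public void retract(AvgAcc acc, long value) {
        acc.sum -= value;
        acc.count--;
    }

    /** Optional: merges other accumulators into acc without discarding what acc already holds. */
    public void merge(AvgAcc acc, Iterable<AvgAcc> its) {
        for (AvgAcc other : its) {
            acc.sum += other.sum;
            acc.count += other.count;
        }
    }

    /** Optional: resets the accumulator to its initial state. */
    public void resetAccumulator(AvgAcc acc) {
        acc.sum = 0L;
        acc.count = 0L;
    }

    /** Required: materializes the current result (T); null when nothing has been accumulated. */
    public Double getValue(AvgAcc acc) {
        if (acc.count == 0) {
            return null;
        }
        return (double) acc.sum / acc.count;
    }

    public static void main(String[] args) {
        AvgAggregateSketch f = new AvgAggregateSketch();
        AvgAcc acc = f.createAccumulator();
        f.accumulate(acc, 1L);
        f.accumulate(acc, 2L);
        f.accumulate(acc, 3L);
        System.out.println(f.getValue(acc)); // 2.0

        f.retract(acc, 3L);
        System.out.println(f.getValue(acc)); // 1.5

        AvgAcc other = f.createAccumulator();
        f.accumulate(other, 6L);
        f.merge(acc, Arrays.asList(other));
        System.out.println(f.getValue(acc)); // 3.0

        f.resetAccumulator(acc);
        System.out.println(f.getValue(acc)); // null
    }
}
```

Note that merge adds to the target accumulator instead of replacing it, in line with the Scaladoc remark that the accumulator may already contain previously aggregated results.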
Summary
- Distinct Aggregation in the Table API can be applied to both built-in and user-defined aggregation functions, in GroupBy Aggregation, GroupBy Window Aggregation, and Over Window Aggregation.
- AggregateFunction extends UserDefinedFunction and has two type parameters: T for the aggregation result and ACC for the accumulator. It declares createAccumulator, getValue, requiresOver, getResultType, and getAccumulatorType; subclasses must implement createAccumulator and getValue.
- AggregateFunction does not declare accumulate, but subclasses must define at least one; it takes ACC followed by the user-defined input values and returns Unit. The retract, merge, and resetAccumulator methods are optional and depend on the operation: a datastream bounded over aggregate requires retract (ACC plus the user-defined inputs); a datastream session window grouping aggregate and a dataset grouping aggregate require merge (ACC and java.lang.Iterable[ACC]); a dataset grouping aggregate requires resetAccumulator (ACC). All three return Unit.
doc
- Aggregations