A Look at flink Table's Distinct Aggregation

2024-06-02 17:48


 

This article takes a look at Distinct Aggregation in flink Table.

Example

 
```java
// Distinct can be applied to GroupBy Aggregation, GroupBy Window Aggregation and Over Window Aggregation.
Table orders = tableEnv.scan("Orders");

// Distinct aggregation on group by
Table groupByDistinctResult = orders
    .groupBy("a")
    .select("a, b.sum.distinct as d");

// Distinct aggregation on time window group by
Table groupByWindowDistinctResult = orders
    .window(Tumble.over("5.minutes").on("rowtime").as("w")).groupBy("a, w")
    .select("a, b.sum.distinct as d");

// Distinct aggregation on over window
Table result = orders
    .window(Over
        .partitionBy("a")
        .orderBy("rowtime")
        .preceding("UNBOUNDED_RANGE")
        .as("w"))
    .select("a, b.avg.distinct over w, b.max over w, b.min over w");

// User-defined aggregation function can also be used with DISTINCT modifiers
Table orders = tEnv.scan("Orders");

// Use distinct aggregation for user-defined aggregate functions
tEnv.registerFunction("myUdagg", new MyUdagg());
orders.groupBy("users").select("users, myUdagg.distinct(points) as myDistinctResult");
```

  • Distinct Aggregation can be applied to built-in as well as user-defined aggregate functions; for built-in aggregations it works in GroupBy Aggregation, GroupBy Window Aggregation, and Over Window Aggregation. The MyUdagg class used in the last snippet is not shown in the docs; a hedged sketch follows below.
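
The example above registers MyUdagg without showing its implementation. Below is a minimal sketch of what such a class might look like, modeled on the user-defined aggregate examples in the Flink documentation; the class name MyUdagg, the SumAccum accumulator, and the "sum of points" semantics are assumptions for illustration, not code from the article. It implements only the three required methods: createAccumulator, accumulate, and getValue.

```java
import org.apache.flink.table.functions.AggregateFunction;

// A hypothetical MyUdagg that sums a long-typed "points" column.
public class MyUdagg extends AggregateFunction<Long, MyUdagg.SumAccum> {

    // The accumulator is a plain POJO; it carries the whole state of the function.
    public static class SumAccum {
        public long sum = 0L;
    }

    @Override
    public SumAccum createAccumulator() {
        // Create and initialize the accumulator (required).
        return new SumAccum();
    }

    // Not declared in the base class, but at least one accumulate() is required;
    // it processes one input value and updates the accumulator.
    public void accumulate(SumAccum acc, long points) {
        acc.sum += points;
    }

    @Override
    public Long getValue(SumAccum acc) {
        // Materialize the current aggregation result (required).
        return acc.sum;
    }
}
```

With a class like this on the classpath, the tEnv.registerFunction("myUdagg", new MyUdagg()) call from the example works, and the .distinct modifier makes the framework de-duplicate the points values per group before they reach accumulate.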

AggregateFunction

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/functions/AggregateFunction.scala

 
```scala
/**
  * Base class for User-Defined Aggregates.
  *
  * The behavior of an [[AggregateFunction]] can be defined by implementing a series of custom
  * methods. An [[AggregateFunction]] needs at least three methods:
  *  - createAccumulator,
  *  - accumulate, and
  *  - getValue.
  *
  * There are a few other methods that can be optional to have:
  *  - retract,
  *  - merge, and
  *  - resetAccumulator
  *
  * All these methods must be declared publicly, not static and named exactly as the names
  * mentioned above. The methods createAccumulator and getValue are defined in the
  * [[AggregateFunction]] functions, while other methods are explained below.
  *
  * {{{
  * Processes the input values and update the provided accumulator instance. The method
  * accumulate can be overloaded with different custom types and arguments. An AggregateFunction
  * requires at least one accumulate() method.
  *
  * @param accumulator           the accumulator which contains the current aggregated results
  * @param [user defined inputs] the input value (usually obtained from a new arrived data).
  *
  * def accumulate(accumulator: ACC, [user defined inputs]): Unit
  * }}}
  *
  * {{{
  * Retracts the input values from the accumulator instance. The current design assumes the
  * inputs are the values that have been previously accumulated. The method retract can be
  * overloaded with different custom types and arguments. This function must be implemented for
  * datastream bounded over aggregate.
  *
  * @param accumulator           the accumulator which contains the current aggregated results
  * @param [user defined inputs] the input value (usually obtained from a new arrived data).
  *
  * def retract(accumulator: ACC, [user defined inputs]): Unit
  * }}}
  *
  * {{{
  * Merges a group of accumulator instances into one accumulator instance. This function must be
  * implemented for datastream session window grouping aggregate and dataset grouping aggregate.
  *
  * @param accumulator the accumulator which will keep the merged aggregate results. It should
  *                    be noted that the accumulator may contain the previous aggregated
  *                    results. Therefore user should not replace or clean this instance in the
  *                    custom merge method.
  * @param its         an [[java.lang.Iterable]] pointed to a group of accumulators that will be
  *                    merged.
  *
  * def merge(accumulator: ACC, its: java.lang.Iterable[ACC]): Unit
  * }}}
  *
  * {{{
  * Resets the accumulator for this [[AggregateFunction]]. This function must be implemented for
  * dataset grouping aggregate.
  *
  * @param accumulator the accumulator which needs to be reset
  *
  * def resetAccumulator(accumulator: ACC): Unit
  * }}}
  *
  * @tparam T   the type of the aggregation result
  * @tparam ACC the type of the aggregation accumulator. The accumulator is used to keep the
  *             aggregated values which are needed to compute an aggregation result.
  *             AggregateFunction represents its state using accumulator, thereby the state of the
  *             AggregateFunction must be put into the accumulator.
  */
abstract class AggregateFunction[T, ACC] extends UserDefinedFunction {

  /**
    * Creates and init the Accumulator for this [[AggregateFunction]].
    *
    * @return the accumulator with the initial value
    */
  def createAccumulator(): ACC

  /**
    * Called every time when an aggregation result should be materialized.
    * The returned value could be either an early and incomplete result
    * (periodically emitted as data arrive) or the final result of the
    * aggregation.
    *
    * @param accumulator the accumulator which contains the current
    *                    aggregated results
    * @return the aggregation result
    */
  def getValue(accumulator: ACC): T

  /**
    * Returns true if this AggregateFunction can only be applied in an OVER window.
    *
    * @return true if the AggregateFunction requires an OVER window, false otherwise.
    */
  def requiresOver: Boolean = false

  /**
    * Returns the TypeInformation of the AggregateFunction's result.
    *
    * @return The TypeInformation of the AggregateFunction's result or null if the result type
    *         should be automatically inferred.
    */
  def getResultType: TypeInformation[T] = null

  /**
    * Returns the TypeInformation of the AggregateFunction's accumulator.
    *
    * @return The TypeInformation of the AggregateFunction's accumulator or null if the
    *         accumulator type should be automatically inferred.
    */
  def getAccumulatorType: TypeInformation[ACC] = null
}
```

  • AggregateFunction extends UserDefinedFunction; it takes two type parameters, T for the aggregation result and ACC for the accumulator; it defines the createAccumulator, getValue, getResultType, and getAccumulatorType methods (of these, subclasses must implement createAccumulator and getValue)
  • An accumulate method is not declared in AggregateFunction itself, but subclasses must define and implement one; it takes the accumulator (ACC) plus the user-defined input values and returns void; the retract, merge, and resetAccumulator methods are optional, to be defined and implemented by subclasses as needed
  • For datastream bounded over aggregates, retract must be implemented; it takes the accumulator (ACC) plus the user-defined input values and returns void; for datastream session window grouping aggregates and dataset grouping aggregates, merge must be implemented; it takes an ACC and a java.lang.Iterable<ACC> and returns void; for dataset grouping aggregates, resetAccumulator must be implemented; it takes an ACC and returns void (see the sketch right after this list)
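
To illustrate these optional methods, here is how the hypothetical MyUdagg sketch from earlier could be extended. The method bodies below are assumptions modeled on the signatures in the Scaladoc above, not code from the article, and they belong inside the MyUdagg class body:

```java
// Optional methods for the MyUdagg sketch (assumed implementations).

// Required for datastream bounded over aggregates: removes a previously
// accumulated value, e.g. when a row falls out of an OVER window.
public void retract(SumAccum acc, long points) {
    acc.sum -= points;
}

// Required for datastream session window grouping aggregates and dataset
// grouping aggregates: folds a group of accumulators into acc. Note that acc
// may already hold partial results, so it is updated in place, never replaced.
public void merge(SumAccum acc, Iterable<SumAccum> it) {
    for (SumAccum other : it) {
        acc.sum += other.sum;
    }
}

// Required for dataset grouping aggregates: resets the accumulator state.
public void resetAccumulator(SumAccum acc) {
    acc.sum = 0L;
}
```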

Summary

  • A Table's Distinct Aggregation can be applied to built-in as well as user-defined aggregate functions; for built-in aggregations it works in GroupBy Aggregation, GroupBy Window Aggregation, and Over Window Aggregation
  • AggregateFunction extends UserDefinedFunction; it takes two type parameters, T for the aggregation result and ACC for the accumulator; it defines the createAccumulator, getValue, getResultType, and getAccumulatorType methods (of these, subclasses must implement createAccumulator and getValue)
  • An accumulate method is not declared in AggregateFunction itself, but subclasses must define and implement one; it takes the accumulator (ACC) plus the user-defined input values and returns void; retract, merge, and resetAccumulator are optional, to be defined and implemented as needed (for datastream bounded over aggregates, retract must be implemented, taking the accumulator plus the user-defined inputs and returning void; for datastream session window grouping aggregates and dataset grouping aggregates, merge must be implemented, taking an ACC and a java.lang.Iterable<ACC> and returning void; for dataset grouping aggregates, resetAccumulator must be implemented, taking an ACC and returning void)

doc

  • Aggregations
