flink table view datastream互转

2024-01-05 08:20

本文主要是介绍flink table view datastream互转,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

case class outer(f1:String,f2:Inner)
case class outerV1(f1:String,f2:Inner,f3:Int)
case class Inner(f3:String,f4:Int)

测试代码

package com.yy.table.convertimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.types.DataTypeobject streamPOJO2table {case class outer(f1:String,f2:Inner)case class outerV1(f1:String,f2:Inner,f3:Int)case class Inner(f3:String,f4:Int)def main(args: Array[String]): Unit = {// flink1.13 流处理环境初始化val env = StreamExecutionEnvironment.getExecutionEnvironmentval tEnv = StreamTableEnvironment.create(env)import org.apache.flink.streaming.api.scala._val ds1: DataStream[outer] = env.fromElements(outer("a",Inner("b",2)),outer("d",Inner("e",4)))val table1: Table = tEnv.fromDataStream(ds1)
//    table1
//      .execute()
//      .print()/*+----+--------------------------------+--------------------------------+
| op |                             f1 |                             f2 |
+----+--------------------------------+--------------------------------+
| +I |                              a |                   (f3=b, f4=2) |
| +I |                              d |                   (f3=e, f4=4) |
+----+--------------------------------+--------------------------------+*///    table1
//      .print()/*5> +I[d, Inner(e,4)]
4> +I[a, Inner(b,2)]*/tEnv.createTemporaryView("view1", ds1)val tableResult1: TableResult = tEnv.executeSql("select f1,f2,(f2.f4 + 100) as f3 from view1")tableResult1.print()/*+----+--------------------------------+--------------------------------+-------------+
| op |                             f1 |                             f2 |          f3 |
+----+--------------------------------+--------------------------------+-------------+
| +I |                              a |                   (f3=b, f4=2) |         102 |
| +I |                              d |                   (f3=e, f4=4) |         104 |
+----+--------------------------------+--------------------------------+-------------+*///val t1: Table = tEnv.sqlQuery("select f1,f2,(f2.f4 + 100) as f3 from view1")
//    t1.print()//    println(t1.getResolvedSchema)/*
+----+--------------------------------+--------------------------------+-------------+
| op |                             f1 |                             f2 |          f3 |
+----+--------------------------------+--------------------------------+-------------+
| +I |                              a |                   (f3=b, f4=2) |         102 |
| +I |                              d |                   (f3=e, f4=4) |         104 |
+----+--------------------------------+--------------------------------+-------------+
2 rows in set
(`f1` STRING,`f2` *com.yy.table.convert.streamPOJO2table$Inner<`f3` STRING, `f4` INT NOT NULL>* NOT NULL,`f3` INT NOT NULL
)*/println("---- 1 -------")// tableResult转datastreamval o1: DataStream[outerV1] = tEnv.toDataStream[outerV1](t1,classOf[outerV1])
//    o1.print()println("---- 2 -------")tEnv.executeSql("""|select|f1|,f2.f3|,f2.f4|from view1|""".stripMargin)
//      .print()/*+----+--------------------------------+--------------------------------+--------------------------------+
| op |                             f1 |                             f3 |                             f4 |
+----+--------------------------------+--------------------------------+--------------------------------+
| +I |                              a |                              b |                              c |
| +I |                              d |                              e |                              f |
+----+--------------------------------+--------------------------------+--------------------------------+*/tEnv.executeSql("""|select|f1|,(f2.f3,f2.f4)|from view1|""".stripMargin)
//      .print()env.execute("jobName1")}}

这篇关于flink table view datastream互转的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/572249

相关文章

GORM中Model和Table的区别及使用

《GORM中Model和Table的区别及使用》Model和Table是两种与数据库表交互的核心方法,但它们的用途和行为存在著差异,本文主要介绍了GORM中Model和Table的区别及使用,具有一... 目录1. Model 的作用与特点1.1 核心用途1.2 行为特点1.3 示例China编程代码2. Tab

Python实现PDF与多种图片格式之间互转(PNG, JPG, BMP, EMF, SVG)

《Python实现PDF与多种图片格式之间互转(PNG,JPG,BMP,EMF,SVG)》PDF和图片是我们日常生活和工作中常用的文件格式,有时候,我们可能需要将PDF和图片进行格式互转来满足... 目录一、介绍二、安装python库三、Python实现多种图片格式转PDF1、单张图片转换为PDF2、多张图

使用Python实现PDF与SVG互转

《使用Python实现PDF与SVG互转》SVG(可缩放矢量图形)和PDF(便携式文档格式)是两种常见且广泛使用的文件格式,本文将详细介绍如何使用Python实现SVG和PDF之间的相互转... 目录使用工具使用python将SVG转换为PDF使用Python将SVG添加到现有PDF中使用Python将PD

Java实现Excel与HTML互转

《Java实现Excel与HTML互转》Excel是一种电子表格格式,而HTM则是一种用于创建网页的标记语言,虽然两者在用途上存在差异,但有时我们需要将数据从一种格式转换为另一种格式,下面我们就来看看... Excel是一种电子表格格式,广泛用于数据处理和分析,而HTM则是一种用于创建网页的标记语言。虽然两

MVC(Model-View-Controller)和MVVM(Model-View-ViewModel)

1、MVC MVC(Model-View-Controller) 是一种常用的架构模式,用于分离应用程序的逻辑、数据和展示。它通过三个核心组件(模型、视图和控制器)将应用程序的业务逻辑与用户界面隔离,促进代码的可维护性、可扩展性和模块化。在 MVC 模式中,各组件可以与多种设计模式结合使用,以增强灵活性和可维护性。以下是 MVC 各组件与常见设计模式的关系和作用: 1. Model(模型)

Flink任务重启策略

概述 Flink支持不同的重启策略,以在故障发生时控制作业如何重启集群在启动时会伴随一个默认的重启策略,在没有定义具体重启策略时会使用该默认策略。如果在工作提交时指定了一个重启策略,该策略会覆盖集群的默认策略默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定。配置参数 restart-strategy 定义了哪个策略被使用。常用的重启策略: 固定间隔 (Fixe

vue2实践:el-table实现由用户自己控制行数的动态表格

需求 项目中需要提供一个动态表单,如图: 当我点击添加时,便添加一行;点击右边的删除时,便删除这一行。 至少要有一行数据,但是没有上限。 思路 这种每一行的数据固定,但是不定行数的,很容易想到使用el-table来实现,它可以循环读取:data所绑定的数组,来生成行数据,不同的是: 1、table里面的每一个cell,需要放置一个input来支持用户编辑。 2、最后一列放置两个b

MFC中App,Doc,MainFrame,View各指针的互相获取

纸上得来终觉浅,为了熟悉获取方法,我建了个SDI。 首先说明这四个类的执行顺序是App->Doc->Main->View 另外添加CDialog类获得各个指针的方法。 多文档的获取有点小区别,有时间也总结一下。 //  App void CSDIApp::OnApp() {      //  App      //  Doc     CDocument *pD

Caused by: android.view.WindowManager$BadTokenException: Unable to add window -- token android.os.B

一个bug日志 FATAL EXCEPTION: main03-25 14:24:07.724: E/AndroidRuntime(4135): java.lang.RuntimeException: Unable to start activity ComponentInfo{com.syyx.jingubang.ky/com.anguotech.android.activity.Init

自定义view中常用到哪些方法作用分别是什么

目录 构造函数onMeasure(int widthMeasureSpec, int heightMeasureSpec)onDraw(Canvas canvas)onLayout(boolean changed, int left, int top, int right, int bottom)onTouchEvent(MotionEvent event)onSizeChanged(int