大数据-玩转数据-双流JOIN

2023-10-03 20:54
文章标签 数据 玩转 join 双流

本文主要是介绍大数据-玩转数据-双流JOIN,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、双流JOIN

在Flink中, 支持两种方式的流的Join: Window Join和Interval Join

二、Window Join

窗口join会join具有相同的key并且处于同一个窗口中的两个流的元素.
注意:
1.所有的窗口join都是 inner join, 意味着a流中的元素如果在b流中没有对应的, 则a流中这个元素就不会处理(就是忽略掉了)
2.join成功后的元素的会以所在窗口的最大时间作为其时间戳. 例如窗口[5,10), 则元素会以9作为自己的时间戳。
Window join 仍然可分为 滚动窗口、滑动窗口Join、会话窗口Join

滚动窗口Join代码段示例
在这里插入图片描述

package com.lyh.flink12;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;/*** @Author lizhenchao@atguigu.cn* @Date 2021/1/24 22:09*/
public class Flink01_Join_Window_Tumbling {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> s1 = env.socketTextStream("hadoop100", 8888)  // 在socket终端只输入毫秒级别的时间戳.map(value -> {String[] datas = value.split(",");return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {return element.getTs() * 1000;}}));SingleOutputStreamOperator<WaterSensor> s2 = env.socketTextStream("hadoop100", 9999)  // 在socket终端只输入毫秒级别的时间戳.map(value -> {String[] datas = value.split(",");return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {return element.getTs() * 1000;}}));s1.join(s2).where(WaterSensor::getId).equalTo(WaterSensor::getId).window(TumblingEventTimeWindows.of(Time.seconds(5))) // 必须使用窗口.apply(new JoinFunction<WaterSensor, WaterSensor, String>() {@Overridepublic String join(WaterSensor first, WaterSensor second) throws Exception {return "first: " + first + ", second: " + second;}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}

运行结果:
在这里插入图片描述

三、Interval Join

间隔流join(Interval Join), 是指使用一个流的数据按照key去join另外一条流的指定范围的数据.
如下图: 橙色的流去join绿色的流.范围是由橙色流的event-time + lower bound和event-time + upper bound来决定的.
orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound

在这里插入图片描述
Interval Join只支持event-time
必须是keyBy之后的流才可以interval join

package com.lyh.flink12;

import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.planner.expressions.In;
import org.apache.flink.util.Collector;

import java.time.Duration;public class  Sql_Join_Windows_Interval{public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(2);SingleOutputStreamOperator<WaterSensor> s1 = env.socketTextStream("hadoop100", 8888).map(value -> {String[] data = value.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long timestamp) {return element.getTs();}}));SingleOutputStreamOperator<WaterSensor> s2 = env.socketTextStream("hadoop100", 9999).map(value -> {String[] data = value.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long timestamp) {return element.getTs();}}));s1.keyBy(WaterSensor::getId).intervalJoin(s2.keyBy(WaterSensor::getId)).between(Time.seconds(-2),Time.seconds(3)).process(new ProcessJoinFunction<WaterSensor, WaterSensor, String>() {@Overridepublic void processElement(WaterSensor left,WaterSensor right,Context ctx,Collector<String> out) throws Exception {out.collect(left + "," + right);}}).print();try{env.execute();} catch (Exception e){e.printStackTrace();}}}

运行结果:
在这里插入图片描述

这篇关于大数据-玩转数据-双流JOIN的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/701

相关文章

Redis的数据过期策略和数据淘汰策略

《Redis的数据过期策略和数据淘汰策略》本文主要介绍了Redis的数据过期策略和数据淘汰策略,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录一、数据过期策略1、惰性删除2、定期删除二、数据淘汰策略1、数据淘汰策略概念2、8种数据淘汰策略

轻松上手MYSQL之JSON函数实现高效数据查询与操作

《轻松上手MYSQL之JSON函数实现高效数据查询与操作》:本文主要介绍轻松上手MYSQL之JSON函数实现高效数据查询与操作的相关资料,MySQL提供了多个JSON函数,用于处理和查询JSON数... 目录一、jsON_EXTRACT 提取指定数据二、JSON_UNQUOTE 取消双引号三、JSON_KE

Python给Excel写入数据的四种方法小结

《Python给Excel写入数据的四种方法小结》本文主要介绍了Python给Excel写入数据的四种方法小结,包含openpyxl库、xlsxwriter库、pandas库和win32com库,具有... 目录1. 使用 openpyxl 库2. 使用 xlsxwriter 库3. 使用 pandas 库

SpringBoot定制JSON响应数据的实现

《SpringBoot定制JSON响应数据的实现》本文主要介绍了SpringBoot定制JSON响应数据的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们... 目录前言一、如何使用@jsonView这个注解?二、应用场景三、实战案例注解方式编程方式总结 前言

使用Python在Excel中创建和取消数据分组

《使用Python在Excel中创建和取消数据分组》Excel中的分组是一种通过添加层级结构将相邻行或列组织在一起的功能,当分组完成后,用户可以通过折叠或展开数据组来简化数据视图,这篇博客将介绍如何使... 目录引言使用工具python在Excel中创建行和列分组Python在Excel中创建嵌套分组Pyt

在Rust中要用Struct和Enum组织数据的原因解析

《在Rust中要用Struct和Enum组织数据的原因解析》在Rust中,Struct和Enum是组织数据的核心工具,Struct用于将相关字段封装为单一实体,便于管理和扩展,Enum用于明确定义所有... 目录为什么在Rust中要用Struct和Enum组织数据?一、使用struct组织数据:将相关字段绑

在Mysql环境下对数据进行增删改查的操作方法

《在Mysql环境下对数据进行增删改查的操作方法》本文介绍了在MySQL环境下对数据进行增删改查的基本操作,包括插入数据、修改数据、删除数据、数据查询(基本查询、连接查询、聚合函数查询、子查询)等,并... 目录一、插入数据:二、修改数据:三、删除数据:1、delete from 表名;2、truncate

数据库使用之union、union all、各种join的用法区别解析

《数据库使用之union、unionall、各种join的用法区别解析》:本文主要介绍SQL中的Union和UnionAll的区别,包括去重与否以及使用时的注意事项,还详细解释了Join关键字,... 目录一、Union 和Union All1、区别:2、注意点:3、具体举例二、Join关键字的区别&php

Java实现Elasticsearch查询当前索引全部数据的完整代码

《Java实现Elasticsearch查询当前索引全部数据的完整代码》:本文主要介绍如何在Java中实现查询Elasticsearch索引中指定条件下的全部数据,通过设置滚动查询参数(scrol... 目录需求背景通常情况Java 实现查询 Elasticsearch 全部数据写在最后需求背景通常情况下

Java中注解与元数据示例详解

《Java中注解与元数据示例详解》Java注解和元数据是编程中重要的概念,用于描述程序元素的属性和用途,:本文主要介绍Java中注解与元数据的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参... 目录一、引言二、元数据的概念2.1 定义2.2 作用三、Java 注解的基础3.1 注解的定义3.2 内