【极数系列】Flink集成DataSource读取集合数据(07)

2024-01-30 08:52

本文主要是介绍【极数系列】Flink集成DataSource读取集合数据(07),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 01 引言
  • 02 简介概述
  • 03 基于集合读取数据
    • 3.1 集合创建数据流
    • 3.2 迭代器创建数据流
    • 3.3 给定对象创建数据流
    • 3.4 迭代并行器创建数据流
    • 3.5 基于时间间隔创建数据流
    • 3.6 自定义数据流
  • 04 源码实战demo
    • 4.1 pom.xml依赖
    • 4.2 创建集合数据流作业
    • 4.3 运行结果日志

01 引言

源码地址,一键下载可用:https://gitee.com/shawsongyue/aurora.git
模块:aurora_flink
主类:FlinkListSourceJob(集合)

02 简介概述

1.Source 是Flink程序从中读取其输入数据的地方。你可以用 StreamExecutionEnvironment.addSource(sourceFunction) 将一个 source 关联到你的程序。2.Flink 自带了许多预先实现的 source functions,不过你仍然可以通过实现 SourceFunction 接口编写自定义的非并行 source。3.也可以通过实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 类编写自定义的并行 sources。

03 基于集合读取数据

3.1 集合创建数据流

fromCollection(Collection)函数
从 Java Java.util.Collection 创建数据流。集合中的所有元素必须属于同一类型

3.2 迭代器创建数据流

fromCollection(Iterator, Class) 
从迭代器创建数据流。class 参数指定迭代器返回元素的数据类型。

3.3 给定对象创建数据流

fromElements(T ...)
从给定的对象序列中创建数据流。所有的对象必须属于同一类型。

3.4 迭代并行器创建数据流

注意!使用迭代器的时候对象必须是实现持久化的,否则报错,详情可以看我的另外一篇文章、

错误:org.apache.flink.api.common.InvalidProgramException: java.util.Arrays$ArrayItr@784c3487 is not serializable

fromParallelCollection(SplittableIterator, Class) 
从迭代器并行创建数据流。class 参数指定迭代器返回元素的数据类型

3.5 基于时间间隔创建数据流

generateSequence 
基于给定间隔内的数字序列并行生成数据流。

3.6 自定义数据流

addSource - 关联一个新的 source function。例如,你可以使用 addSource(new FlinkKafkaConsumer<>(...)) 来从 Apache Kafka 获取数据。更多详细信息见连接器。

04 源码实战demo

4.1 pom.xml依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.xsy</groupId><artifactId>aurora_flink</artifactId><version>1.0-SNAPSHOT</version><!--属性设置--><properties><!--java_JDK版本--><java.version>11</java.version><!--maven打包插件--><maven.plugin.version>3.8.1</maven.plugin.version><!--编译编码UTF-8--><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><!--输出报告编码UTF-8--><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><!--json数据格式处理工具--><fastjson.version>1.2.75</fastjson.version><!--log4j版本--><log4j.version>2.17.1</log4j.version><!--flink版本--><flink.version>1.18.0</flink.version><!--scala版本--><scala.binary.version>2.11</scala.binary.version><!--log4j依赖--><log4j.version>2.17.1</log4j.version></properties><!--通用依赖--><dependencies><!-- json --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><!--================================集成外部依赖==========================================--><!--集成日志框架 start--><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version></dependency><!--集成日志框架 end--></dependencies><!--编译打包--><build><finalName>${project.name}</finalName><!--资源文件打包--><resources><resource><directory>src/main/resources</directory></resource><resource><directory>src/main/java</directory><includes><include>**/*.xml</include></includes></resource></resources><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>org.apache.flink:force-shading</exclude><exclude>org.google.code.flindbugs:jar305</exclude><exclude>org.slf4j:*</exclude><excluder>org.apache.logging.log4j:*</excluder></excludes></artifactSet><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>org.xsy.sevenhee.flink.TestStreamJob</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><!--插件统一管理--><pluginManagement><plugins><!--maven打包插件--><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>${spring.boot.version}</version><configuration><fork>true</fork><finalName>${project.build.finalName}</finalName></configuration><executions><execution><goals><goal>repackage</goal></goals></execution></executions></plugin><!--编译打包插件--><plugin><artifactId>maven-compiler-plugin</artifactId><version>${maven.plugin.version}</version><configuration><source>${java.version}</source><target>${java.version}</target><encoding>UTF-8</encoding><compilerArgs><arg>-parameters</arg></compilerArgs></configuration></plugin></plugins></pluginManagement></build><!--配置Maven项目中需要使用的远程仓库--><repositories><repository><id>aliyun-repos</id><url>https://maven.aliyun.com/nexus/content/groups/public/</url><snapshots><enabled>false</enabled></snapshots></repository></repositories><!--用来配置maven插件的远程仓库--><pluginRepositories><pluginRepository><id>aliyun-plugin</id><url>https://maven.aliyun.com/nexus/content/groups/public/</url><snapshots><enabled>false</enabled></snapshots></pluginRepository></pluginRepositories></project>

4.2 创建集合数据流作业

注意:Flink根据集群撇嘴可能会启动多个并行度运行,可能导致数据重复处理,可以通过.setParallelism(1)设置为一个平行度运行即可

package com.aurora.source;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.NumberSequenceIterator;
import org.apache.flink.util.SplittableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import javax.sql.DataSource;
import java.util.*;/*** @description flink的list集合source应用* @author 浅夏的猫* @datetime 23:03 2024/1/28
*/
public class FlinkListSourceJob {private static final Logger logger = LoggerFactory.getLogger(FlinkListSourceJob.class);public static void main(String[] args) throws Exception {//1.创建Flink运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.设置Flink运行模式://STREAMING-流模式,BATCH-批模式,AUTOMATIC-自动模式(根据数据源的边界性来决定使用哪种模式)env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);List<String> list = Arrays.asList("测试", "开发", "运维");// 01 从集合创建数据流DataStreamSource<String> dataStreamSource_01 = env.fromCollection(list);// 02 从迭代器创建数据流,这里直接使用list的迭代器会报错,因为没有ArrayList没有进行持久化,需要深入了解的,可以看我的另外一篇文章
//        DataStreamSource<String> dataStreamSource_02 = env.fromCollection(list.iterator(),String.class);// 03 从给定的对象序列中创建数据流DataStreamSource<String> dataStreamSource_03 = env.fromElements("测试", "开发", "运维");// 04 从迭代器并行创建数据流NumberSequenceIterator splittableIterator = new NumberSequenceIterator(1,10);DataStreamSource dataStreamSource_04=env.fromParallelCollection(splittableIterator,Long.TYPE);// 05 基于给定间隔内的数字序列并行生成数据流DataStreamSource<Long> dataStreamSource_05 = env.generateSequence(1, 10);//自定义数据流DataStreamSource<String> dataStreamSource_06 = env.addSource(new SourceFunction<String>() {@Overridepublic void run(SourceContext<String> sourceContext) throws Exception {//自定义你自己的数据来源for (int i = 0; i < 10; i++) {sourceContext.collect("测试数据" + i);}}@Overridepublic void cancel() {}});//5.输出打印dataStreamSource_01.print();
//        dataStreamSource_02.print();dataStreamSource_03.print();dataStreamSource_04.print();dataStreamSource_05.print();dataStreamSource_06.print();//6.启动运行env.execute();}}

4.3 运行结果日志

在这里插入图片描述

这篇关于【极数系列】Flink集成DataSource读取集合数据(07)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/659712

相关文章

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

【Prometheus】PromQL向量匹配实现不同标签的向量数据进行运算

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全栈,前后端开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi

科研绘图系列:R语言扩展物种堆积图(Extended Stacked Barplot)

介绍 R语言的扩展物种堆积图是一种数据可视化工具,它不仅展示了物种的堆积结果,还整合了不同样本分组之间的差异性分析结果。这种图形表示方法能够直观地比较不同物种在各个分组中的显著性差异,为研究者提供了一种有效的数据解读方式。 加载R包 knitr::opts_chunk$set(warning = F, message = F)library(tidyverse)library(phyl

烟火目标检测数据集 7800张 烟火检测 带标注 voc yolo

一个包含7800张带标注图像的数据集,专门用于烟火目标检测,是一个非常有价值的资源,尤其对于那些致力于公共安全、事件管理和烟花表演监控等领域的人士而言。下面是对此数据集的一个详细介绍: 数据集名称:烟火目标检测数据集 数据集规模: 图片数量:7800张类别:主要包含烟火类目标,可能还包括其他相关类别,如烟火发射装置、背景等。格式:图像文件通常为JPEG或PNG格式;标注文件可能为X