Flink (3): DataSet Source, Transform, and Sink Operators, and Counters

2024-05-24 10:32


I. Overview

A Flink batch (DataSet) program follows the flow Source -> Transform -> Sink, which the sections below cover operator by operator.
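As a quick orientation, here is a minimal sketch of that flow as a WordCount job. The input and output paths (data/input.txt, data/output) are illustrative placeholders, not from the original article.

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem

object MinimalBatchPipeline {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Source: read a text file line by line
    val lines = env.readTextFile("data/input.txt")

    // Transform: classic WordCount
    val counts = lines
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .groupBy(0)
      .sum(1)

    // Sink: write the result; file sinks are lazy, so execute() triggers the job
    counts.writeAsText("data/output", FileSystem.WriteMode.OVERWRITE)
    env.execute("Minimal DataSet pipeline")
  }
}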

II. Source

[Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/#data-sources]

1. File-based sources

(1)readTextFile(path) / TextInputFormat - Reads files line wise and returns them as Strings.

(2)readTextFileWithValue(path) / TextValueInputFormat - Reads files line wise and returns them as StringValues. StringValues are mutable strings.

(3)readCsvFile(path) / CsvInputFormat - Parses files of comma (or another char) delimited fields. Returns a DataSet of tuples, case class objects, or POJOs. Supports the basic java types and their Value counterparts as field types.

(4)readFileOfPrimitives(path, delimiter) / PrimitiveInputFormat - Parses files of new-line (or another char sequence) delimited primitive data types such as String or Integer using the given delimiter.

(5)readSequenceFile(Key, Value, path) / SequenceFileInputFormat - Creates a JobConf and reads file from the specified path with type SequenceFileInputFormat, Key class and Value class and returns them as Tuple2<Key, Value>.
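To illustrate a few of the readers above, here is a minimal hedged sketch. The paths, the Person case class, and the assumption that people.csv contains exactly the columns name and age are illustrative, not from the original.

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object FileSourcesSketch {
  case class Person(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // (1) readTextFile: one String per line
    val lines: DataSet[String] = env.readTextFile("data/hello.txt")

    // (3) readCsvFile: parse delimited fields into a case class
    val people: DataSet[Person] =
      env.readCsvFile[Person]("data/people.csv", ignoreFirstLine = true)

    // (4) readFileOfPrimitives: one primitive value per delimited record
    val numbers: DataSet[Int] = env.readFileOfPrimitives[Int]("data/numbers.txt", "\n")

    lines.print()
    people.print()
    numbers.print()
  }
}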

2. Collection-based sources

(1)fromCollection(Seq) - Creates a data set from a Seq. All elements in the collection must be of the same type.

(2)fromCollection(Iterator) - Creates a data set from an Iterator. The class specifies the data type of the elements returned by the iterator.

(3)fromElements(elements: _*) - Creates a data set from the given sequence of objects. All objects must be of the same type.

(4)fromParallelCollection(SplittableIterator) - Creates a data set from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.

(5)generateSequence(from, to) - Generates the sequence of numbers in the given interval, in parallel
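The two parallel variants, fromParallelCollection and generateSequence, are not exercised by the example code later in this section, so here is a minimal sketch. It assumes the standard org.apache.flink.util.NumberSequenceIterator as the SplittableIterator.

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.util.NumberSequenceIterator

object ParallelCollectionSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // generateSequence: the numbers 1..100, generated in parallel
    val seq = env.generateSequence(1, 100)

    // fromParallelCollection: a SplittableIterator is split across the parallel source tasks
    val par = env.fromParallelCollection(new NumberSequenceIterator(1L, 100L))

    println(seq.count()) // 100
    println(par.count()) // 100
  }
}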

3. Compressed files

By default, files with the following extensions are read directly (Flink decompresses them transparently based on the extension); note that decompression is not parallelizable:

Compression method | File extensions | Parallelizable
DEFLATE            | .deflate        | no
GZip               | .gz, .gzip      | no
Bzip2              | .bz2            | no
XZ                 | .xz             | no


4. Example code

package com.bd.flink._1203DataSet

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

/**
  * Created by Administrator on 2019/12/3.
  */
object DataSetDataSourceApp {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // from a collection
//    fromCollection(env)
    // from a file
//    fromTextFile(env)
    // from a folder
//    fromTextFileFolder(env)
    // from a CSV file
//    fromCsvFile(env)
    // read files recursively
//    fromRecursiveFiles(env)
    // read compressed files
    fromCompressFile(env)
  }

  /**
    * 1. Create from a collection
    * @param env
    */
  def fromCollection(env: ExecutionEnvironment): Unit = {
    import org.apache.flink.api.scala._
    val data = 1 to 10
    env.fromCollection(data).print()
  }

  /**
    * 2. Read from a file
    * @param env
    */
  def fromTextFile(env: ExecutionEnvironment): Unit = {
    import org.apache.flink.api.scala._
    val filepath = "data//hello.txt"
    env.readTextFile(filepath).print()
  }

  /**
    * 3. Read from a folder
    * @param env
    */
  def fromTextFileFolder(env: ExecutionEnvironment): Unit = {
    import org.apache.flink.api.scala._
    val filepath = "data//"
    env.readTextFile(filepath).print()
  }

  /**
    * 4. Read from a CSV file
    * @param env
    */
  def fromCsvFile(env: ExecutionEnvironment): Unit = {
    import org.apache.flink.api.scala._
    val filepath = "data//people.csv"

    // parse the CSV as a tuple: all columns
    env.readCsvFile[(String, Int, String)](filepath, ignoreFirstLine = true).print()

    // parse the CSV as a tuple: only columns 1 and 3 (includedFields selects the columns)
    env.readCsvFile[(String, String)](filepath, ignoreFirstLine = true, includedFields = Array(0, 2)).print()

    // parse the CSV into a case class
    case class MyClass(name: String, age: Int)
    env.readCsvFile[MyClass](filepath, ignoreFirstLine = true, includedFields = Array(0, 1)).print()

    // parse the CSV into a Java POJO
    // output:
    // People{name='Bob', age=32, work='Developer'}
    // People{name='Jorge', age=30, work='Developer'}
    env.readCsvFile[People](filepath, ignoreFirstLine = true, pojoFields = Array("name", "age", "work")).print()
  }

  /**
    * 5. Read the contents of a folder recursively
    * @param env
    */
  // Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/#data-sources
  def fromRecursiveFiles(env: ExecutionEnvironment): Unit = {
    val filepath = "data//"
//    env.readTextFile(filepath).print()
//    println("----------------separator------------------")
    val parameters = new Configuration
    parameters.setBoolean("recursive.file.enumeration", true)
    env.readTextFile(filepath).withParameters(parameters).print()
  }

  /**
    * 6. Read from compressed files
    * @param env
    */
  // .deflate, .gz, .gzip, .bz2, .xz
  def fromCompressFile(env: ExecutionEnvironment): Unit = {
    val filePath = "data//compress"
    env.readTextFile(filePath).print() // readTextFile reads compressed files directly
  }
}

III. Transformation

[Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/#dataset-transformations]

1. Basic operators

Map, FlatMap, MapPartition, Filter, Reduce, and so on.
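The example code below exercises most of these; Reduce is not shown there, so here is a minimal sketch of reduce on a whole DataSet and on a grouped DataSet (the data is illustrative, not from the original).

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object ReduceSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromCollection(List(1, 2, 3, 4, 5))

    // reduce: combine elements pairwise into a single value (sum = 15)
    data.reduce(_ + _).print()

    // grouped reduce: sum the second field within each key group
    val pairs = env.fromCollection(List(("a", 1), ("a", 2), ("b", 3)))
    pairs.groupBy(0).reduce((x, y) => (x._1, x._2 + y._2)).print()
  }
}
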
2. Example code

package com.bd.flink._1203DataSet

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment

import scala.collection.mutable.ListBuffer
// implicit conversions
import org.apache.flink.api.scala._

/**
  * Created by Administrator on 2019/12/3.
  */
object DataSetTransformation {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
//    mapFunction(env)
//    filterFunction(env)
//    firstFunction(env)
//    flatMapFunction(env)
//    joinFunction(env)
//    outerjoinFunction(env)
    crossFunction(env)
  }

  // 1. map
  def mapFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 10))
    // variant 1
//    data.map(x => x * 2).print()
    print("===============separator=============")
    // variant 2
    data.map(_ * 2).print()
  }

  // 2. filter
  def filterFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 10))
    // variant 1
//    data.map(x => x * 2).print()
    print("===============separator=============")
    // variant 2
    data.filter(_ > 2).print()
  }

  // 3. mapPartition
  // a DataSource of 100 elements whose results are written to a database
  def mapPartitonFunction(env: ExecutionEnvironment): Unit = {
    val students = new ListBuffer[String]
    for (i <- 1 to 100) {
      students.append("student: " + i)
    }
    val data = env.fromCollection(students)
    // mapPartition: one database connection per partition; map would open one connection per element
    data.mapPartition(x => {
      // obtain the connection
      val connect = "this is the database connection" // connect = DBUtils.getConnection
      // every element has to be stored in the database
      // -> apply the operation to x
      val result = connect + x
      x
    }).print()
  }

  // 4. first-n
  def firstFunction(env: ExecutionEnvironment): Unit = {
    val info = ListBuffer[(Int, String)]()
    info.append((1, "hadoop"))
    info.append((1, "yarn"))
    info.append((3, "mapreduce"))
    info.append((3, "hbase"))
    info.append((5, "spark"))
    info.append((5, "storm"))
    info.append((5, "solr"))
    info.append((5, "zookeeper"))
    val data = env.fromCollection(info)
    // print the first three elements
    data.first(3).print()
    // after grouping, take the first two of each group
    data.groupBy(0).first(2).print()
    // after grouping, sort within each group
    // sort by name in ascending/descending order
    data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print()
  }

  def flatMapFunction(env: ExecutionEnvironment): Unit = {
    val info = ListBuffer[String]()
    info.append("hadoop,spark")
    info.append("flink,spark")
    info.append("flume,spark")
    val data = env.fromCollection(info)
//    data.print()
//    data.map(x => x.split(",")).print()
    // flatMap effect
//    data.flatMap(_.split(",")).print()
    // WordCount implementation
    data.flatMap(_.split(",")).map((_, 1)).groupBy(0).sum(1).print()
  }

  // distinct
  def distinctFunction(env: ExecutionEnvironment): Unit = {
    val info = ListBuffer[String]()
    info.append("hadoop,spark")
    info.append("flink,spark")
    info.append("flume,spark")
    val data = env.fromCollection(info)
    data.flatMap(_.split(",")).distinct().print()
  }

  // join
  def joinFunction(env: ExecutionEnvironment): Unit = {
    val info1 = ListBuffer[(Int, String)]()
    info1.append((1, "hadoop"))
    info1.append((1, "yarn"))
    info1.append((3, "mapreduce"))
    info1.append((3, "hbase"))

    val info2 = ListBuffer[(Int, String)]()
    info2.append((1, "Shanghai"))
    info2.append((2, "Beijing"))
    info2.append((4, "Shenzhen"))
    info2.append((5, "WuHan"))

    // Example and explanation:
    // In this case tuple fields are used as keys. "0" is the join field on the first tuple
    // "1" is the join field on the second tuple.
//    val result = input1.join(input2).where(0).equalTo(1)
    val data1 = env.fromCollection(info1)
    val data2 = env.fromCollection(info2)

    // Part 1: output is
    // ((1,hadoop),(1,Shanghai))
    // ((1,yarn),(1,Shanghai))
    data1.join(data2).where(0).equalTo(0).print()

    // Part 2: output as tuples
    // (1,hadoop,Shanghai)
    // (1,yarn,Shanghai)
    data1.join(data2).where(0).equalTo(0).apply((first, second) => {
      (first._1, first._2, second._2)
    }).print()
  }

  // outer join
  def outerjoinFunction(env: ExecutionEnvironment): Unit = {
    val info1 = ListBuffer[(Int, String)]()
    info1.append((1, "hadoop"))
    info1.append((1, "yarn"))
    info1.append((3, "mapreduce"))
    info1.append((3, "hbase"))

    val info2 = ListBuffer[(Int, String)]()
    info2.append((1, "Shanghai"))
    info2.append((2, "Beijing"))
    info2.append((4, "Shenzhen"))
    info2.append((5, "WuHan"))

    val data1 = env.fromCollection(info1)
    val data2 = env.fromCollection(info2)
//    data1.join(data2).where(0).equalTo(0).print()

    // (1) left outer join, output as tuples
    data1.leftOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
      if (second == null) { // note: handle the non-matching (null) side
        (first._1, first._2, "-")
      } else {
        (first._1, first._2, second._2)
      }
    }).print()

    // (2) right outer join, output as tuples
    data1.rightOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
      if (first == null) { // note: handle the non-matching (null) side
        (second._1, "-", second._2)
      } else {
        (first._1, first._2, second._2)
      }
    }).print()

    // (3) full outer join, output as tuples
    data1.fullOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
      if (second == null) {
        (first._1, first._2, "-")
      } else if (first == null) { // note: handle the non-matching (null) side
        (second._1, "-", second._2)
      } else {
        (first._1, first._2, second._2)
      }
    }).print()
  }

  // Cartesian product
  // result: 2 * 3 = 6 elements
  // (皇马,1)
  // (皇马,4)
  // (皇马,2)
  // (巴萨,1)
  // (巴萨,4)
  // (巴萨,2)
  def crossFunction(env: ExecutionEnvironment): Unit = {
    val info1 = List("皇马", "巴萨")
    val info2 = List(1, 4, 2)
    val data1 = env.fromCollection(info1)
    val data2 = env.fromCollection(info2)
    data1.cross(data2).print()
  }
}


IV. Sink

[Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/#data-sinks]

1. Basic operators

writeAsText, writeAsCsv, print, write, output
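The example code below only demonstrates writeAsText; here is a minimal hedged sketch of writeAsCsv on a tuple DataSet (the output path is an illustrative placeholder, not from the original).

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem

object CsvSinkSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val pairs = env.fromCollection(List(("hadoop", 1), ("spark", 2), ("flink", 3)))

    // writeAsCsv only works on tuple DataSets; arguments: path, row delimiter, field delimiter, write mode
    pairs.writeAsCsv("data/output-csv", "\n", ",", FileSystem.WriteMode.OVERWRITE)

    // file sinks are lazy, so an explicit execute() is required
    env.execute("writeAsCsv sketch")
  }
}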

2. Example code

package com.bd.flink._1203DataSet

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.core.fs.FileSystem.WriteMode

/**
  * Created by Administrator on 2019/12/4.
  */
object DataSetSink {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = 1.to(10)

    import org.apache.flink.api.scala._
    val text = env.fromCollection(data)

    val filepath = "data//output"

    // writeAsText
    // FileSystem.WriteMode.OVERWRITE overwrites an existing file
    // by default the output goes to a single file
    text.writeAsText(filepath, FileSystem.WriteMode.OVERWRITE)

    // with parallelism 2, the output goes to two files under the data//output directory
    text.writeAsText(filepath, FileSystem.WriteMode.OVERWRITE).setParallelism(2)

    env.execute("DataSet Sink")
  }
}

 

V. Counters

1. What it does

Count elements globally across all partitions, instead of each partition keeping only its own count.

2. Implementation code

package com.bd.flink._1203DataSet

import org.apache.flink.api.common.accumulators.LongCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem

/**
  * Created by Administrator on 2019/12/4.
  * Counter (accumulator)
  */
object DataSetCounter {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val data = env.fromElements("hadoop", "hdfs", "yarn", "spark", "flume", "flink")

    // A counter that does NOT behave as expected:
    // with parallelism greater than 1, each partition counts only its own elements
//    data.map(new RichMapFunction[String, Long]() {
//      var counter = 0L
//      override def map(in: String): Long = {
//        counter = counter + 1
//        println("counter :" + counter)
//        counter
//      }
//    }).setParallelism(3).print()

    // Improvement: use an accumulator
    val result = data.map(new RichMapFunction[String, String]() {
      // step 1: define the accumulator
      val counterAcc = new LongCounter()

      override def open(parameters: Configuration): Unit = {
        // step 2: register the accumulator
        getRuntimeContext.addAccumulator("ele-counts-scala", counterAcc)
      }

      override def map(in: String): String = {
        counterAcc.add(1)
//        println("counter :" + counterAcc)
        in
      }
    }) //.setParallelism(3)

//    result.print()
    val filepath = "data//output-scala-count"

    // write to files by default
    result.writeAsText(filepath, FileSystem.WriteMode.OVERWRITE).setParallelism(3) // the parallelism makes no difference to the accumulator

    // execute the job and obtain the result
    val jobResult = env.execute("CounterAcc")
    // step 3: fetch the accumulator result
    val num = jobResult.getAccumulatorResult[Long]("ele-counts-scala")
    println("num:" + num)
  }
}

 
