This article introduces Flink (3): the DataSet Source, Transform, and Sink operators, as well as counters (accumulators), and is intended as a practical reference for developers working with the batch API.
一、Overview
A Flink batch (DataSet) program is built from Source, Transform, and Sink operators, in that order.
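To make the pipeline concrete before looking at each stage, here is a minimal, self-contained sketch (not taken from the code below; object name and element values are illustrative) that strings the three stages together:

```scala
import org.apache.flink.api.scala._

object PipelineSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.fromElements("hadoop,spark", "flink,spark")  // Source: create a DataSet
      .flatMap(_.split(","))                         // Transform: split lines into words
      .map((_, 1))                                   // Transform: word -> (word, 1)
      .groupBy(0)
      .sum(1)                                        // Transform: count per word
      .print()                                       // Sink: print also triggers execution
  }
}
```

Note that in the DataSet API, print() acts as a sink and triggers execution itself, so no explicit execute() call is needed here.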
二、Source
Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/#data-sources
1. File-based sources
(1)readTextFile(path) / TextInputFormat - Reads files line wise and returns them as Strings.
(2)readTextFileWithValue(path) / TextValueInputFormat - Reads files line wise and returns them as StringValues. StringValues are mutable strings.
(3)readCsvFile(path) / CsvInputFormat - Parses files of comma (or another char) delimited fields. Returns a DataSet of tuples, case class objects, or POJOs. Supports the basic java types and their Value counterparts as field types.
(4)readFileOfPrimitives(path, delimiter) / PrimitiveInputFormat - Parses files of new-line (or another char sequence) delimited primitive data types such as String or Integer using the given delimiter.
(5)readSequenceFile(Key, Value, path) / SequenceFileInputFormat - Creates a JobConf and reads file from the specified path with type SequenceFileInputFormat, Key class and Value class and returns them as Tuple2<Key, Value>.
2. Collection-based sources
(1)fromCollection(Seq) - Creates a data set from a Seq. All elements in the collection must be of the same type.
(2)fromCollection(Iterator) - Creates a data set from an Iterator. The class specifies the data type of the elements returned by the iterator.
(3)fromElements(elements: _*) - Creates a data set from the given sequence of objects. All objects must be of the same type.
(4)fromParallelCollection(SplittableIterator) - Creates a data set from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.
(5)generateSequence(from, to) - Generates the sequence of numbers in the given interval, in parallel.
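The DataSetDataSourceApp example later in this section only demonstrates fromCollection, so here is a minimal hedged sketch (object name and values are illustrative) of fromElements and generateSequence:

```scala
import org.apache.flink.api.scala._

object CollectionSourcesSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // fromElements: a fixed set of objects, all of the same type
    env.fromElements("hadoop", "spark", "flink").print()

    // generateSequence: the numbers 1..5, generated in parallel
    env.generateSequence(1, 5).print()
  }
}
```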
3. Compressed files
Files in the following compression formats can be read directly by default; however, they cannot be split, so each compressed file is processed by a single subtask:
| Compression method | File extensions | Parallelizable |
| --- | --- | --- |
| DEFLATE | .deflate | no |
| GZip | .gz, .gzip | no |
| Bzip2 | .bz2 | no |
| XZ | .xz | no |
4. Example code
```scala
package com.bd.flink._1203DataSet

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

/**
  * Created by Administrator on 2019/12/3.
  */
object DataSetDataSourceApp {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Create from a collection
    // fromCollection(env)

    // Create from a file
    // fromTextFile(env)

    // Create from a folder
    // fromTextFileFolder(env)

    // Create from a CSV file
    // fromCsvFile(env)

    // Read files recursively
    // fromRecursiveFiles(env)

    // Read compressed files
    fromCompressFile(env)
  }

  /**
    * 1. Create from a collection
    */
  def fromCollection(env: ExecutionEnvironment): Unit = {
    import org.apache.flink.api.scala._
    val data = 1 to 10
    env.fromCollection(data).print()
  }

  /**
    * 2. Read data from a file
    */
  def fromTextFile(env: ExecutionEnvironment): Unit = {
    import org.apache.flink.api.scala._
    val filepath = "data//hello.txt"
    env.readTextFile(filepath).print()
  }

  /**
    * 3. Read data from a folder
    */
  def fromTextFileFolder(env: ExecutionEnvironment): Unit = {
    import org.apache.flink.api.scala._
    val filepath = "data//"
    env.readTextFile(filepath).print()
  }

  /**
    * 4. Read from a CSV file
    */
  def fromCsvFile(env: ExecutionEnvironment): Unit = {
    import org.apache.flink.api.scala._
    val filepath = "data//people.csv"

    // Parse the CSV into tuples: all columns
    env.readCsvFile[(String, Int, String)](filepath, ignoreFirstLine = true).print()

    // Parse the CSV into tuples: only columns 1 and 3 (includedFields selects the columns)
    env.readCsvFile[(String, String)](filepath, ignoreFirstLine = true, includedFields = Array(0, 2)).print()

    // Parse the CSV into a case class
    case class MyClass(name: String, age: Int)
    env.readCsvFile[MyClass](filepath, ignoreFirstLine = true, includedFields = Array(0, 1)).print()

    // Parse the CSV into a Java POJO
    // Output:
    // People{name='Bob', age=32, work='Developer'}
    // People{name='Jorge', age=30, work='Developer'}
    env.readCsvFile[People](filepath, ignoreFirstLine = true, pojoFields = Array("name", "age", "work")).print()
  }

  /**
    * 5. Read the contents of a folder recursively
    * Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/#data-sources
    */
  def fromRecursiveFiles(env: ExecutionEnvironment): Unit = {
    val filepath = "data//"
    // env.readTextFile(filepath).print()
    // println("----------------separator------------------")
    val parameters = new Configuration
    parameters.setBoolean("recursive.file.enumeration", true)
    env.readTextFile(filepath).withParameters(parameters).print()
  }

  /**
    * 6. Read from compressed files (.deflate, .gz, .gzip, .bz2, .xz)
    */
  def fromCompressFile(env: ExecutionEnvironment): Unit = {
    val filePath = "data//compress"
    env.readTextFile(filePath).print() // readTextFile reads compressed files directly
  }
}
```
三、Transformation
Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/#dataset-transformations
1. Basic operators
Map, FlatMap, MapPartition, Filter, Reduce, and so on.
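The example code in the next subsection exercises Map, Filter, MapPartition, First-n, FlatMap, Distinct, Join, OuterJoin, and Cross, but not Reduce, so here is a minimal sketch of it (object name and numbers are illustrative, not from the original code):

```scala
import org.apache.flink.api.scala._

object ReduceSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Reduce repeatedly combines pairs of elements into a single result: 1+2+3+4+5 = 15
    env.fromElements(1, 2, 3, 4, 5)
      .reduce(_ + _)
      .print()
  }
}
```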
2. Example code
```scala
package com.bd.flink._1203DataSet

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment

import scala.collection.mutable.ListBuffer

// Implicit conversions
import org.apache.flink.api.scala._

/**
  * Created by Administrator on 2019/12/3.
  */
object DataSetTransformation {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // mapFunction(env)
    // filterFunction(env)
    // firstFunction(env)
    // flatMapFunction(env)
    // joinFunction(env)
    // outerjoinFunction(env)
    crossFunction(env)
  }

  // 1. map
  def mapFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 10))
    // Variant 1
    // data.map(x => x * 2).print()
    print("===============separator=============")
    // Variant 2
    data.map(_ * 2).print()
  }

  // 2. filter
  def filterFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 10))
    // Variant 1
    // data.map(x => x * 2).print()
    print("===============separator=============")
    // Variant 2
    data.filter(_ > 2).print()
  }

  // 3. mapPartition
  // DataSource of 100 elements whose results are inserted into a database
  def mapPartitonFunction(env: ExecutionEnvironment): Unit = {
    val students = new ListBuffer[String]
    for (i <- 1 to 100) {
      students.append("student: " + i)
    }
    val data = env.fromCollection(students)
    // mapPartition creates one database connection per partition,
    // whereas map would create one connection per element
    data.mapPartition(x => {
      // Obtain the connection
      val connect = "this is a database connection" // connect = DBUtils.getConnection
      // Every element of the partition would be stored through this connection
      val result = connect + x
      x
    }).print()
  }

  // 4. first-n
  def firstFunction(env: ExecutionEnvironment): Unit = {
    val info = ListBuffer[(Int, String)]()
    info.append((1, "hadoop"))
    info.append((1, "yarn"))
    info.append((3, "mapreduce"))
    info.append((3, "hbase"))
    info.append((5, "spark"))
    info.append((5, "storm"))
    info.append((5, "solr"))
    info.append((5, "zookeeper"))
    val data = env.fromCollection(info)
    // Print the first three elements
    data.first(3).print()
    // Group, then take the first two elements of each group
    data.groupBy(0).first(2).print()
    // Group, sort within each group by name (ascending/descending), then take the first two
    data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print()
  }

  def flatMapFunction(env: ExecutionEnvironment): Unit = {
    val info = ListBuffer[String]()
    info.append("hadoop,spark")
    info.append("flink,spark")
    info.append("flume,spark")
    val data = env.fromCollection(info)
    // data.print()
    // data.map(x => x.split(",")).print()
    // flatMap effect
    // data.flatMap(_.split(",")).print()
    // WordCount implementation
    data.flatMap(_.split(",")).map((_, 1)).groupBy(0).sum(1).print()
  }

  // distinct
  def distinctFunction(env: ExecutionEnvironment): Unit = {
    val info = ListBuffer[String]()
    info.append("hadoop,spark")
    info.append("flink,spark")
    info.append("flume,spark")
    val data = env.fromCollection(info)
    data.flatMap(_.split(",")).distinct().print()
  }

  // join
  def joinFunction(env: ExecutionEnvironment): Unit = {
    val info1 = ListBuffer[(Int, String)]()
    info1.append((1, "hadoop"))
    info1.append((1, "yarn"))
    info1.append((3, "mapreduce"))
    info1.append((3, "hbase"))
    val info2 = ListBuffer[(Int, String)]()
    info2.append((1, "Shanghai"))
    info2.append((2, "Beijing"))
    info2.append((4, "Shenzhen"))
    info2.append((5, "WuHan"))
    // Example and explanation:
    // In this case tuple fields are used as keys. "0" is the join field on the first tuple,
    // "1" is the join field on the second tuple.
    // val result = input1.join(input2).where(0).equalTo(1)
    val data1 = env.fromCollection(info1)
    val data2 = env.fromCollection(info2)
    // Part 1: output is
    // ((1,hadoop),(1,Shanghai))
    // ((1,yarn),(1,Shanghai))
    data1.join(data2).where(0).equalTo(0).print()
    // Part 2: output as tuples
    // (1,hadoop,Shanghai)
    // (1,yarn,Shanghai)
    data1.join(data2).where(0).equalTo(0).apply((first, second) => {
      (first._1, first._2, second._2)
    }).print()
  }

  // outer join
  def outerjoinFunction(env: ExecutionEnvironment): Unit = {
    val info1 = ListBuffer[(Int, String)]()
    info1.append((1, "hadoop"))
    info1.append((1, "yarn"))
    info1.append((3, "mapreduce"))
    info1.append((3, "hbase"))
    val info2 = ListBuffer[(Int, String)]()
    info2.append((1, "Shanghai"))
    info2.append((2, "Beijing"))
    info2.append((4, "Shenzhen"))
    info2.append((5, "WuHan"))
    val data1 = env.fromCollection(info1)
    val data2 = env.fromCollection(info2)
    // data1.join(data2).where(0).equalTo(0).print()
    // (1) Left outer join, output as tuples
    data1.leftOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
      if (second == null) { // note: the right side can be null
        (first._1, first._2, "-")
      } else {
        (first._1, first._2, second._2)
      }
    }).print()
    // (2) Right outer join, output as tuples
    data1.rightOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
      if (first == null) { // note: the left side can be null
        (second._1, "-", second._2)
      } else {
        (first._1, first._2, second._2)
      }
    }).print()
    // (3) Full outer join, output as tuples
    data1.fullOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
      if (second == null) {
        (first._1, first._2, "-")
      } else if (first == null) { // note: either side can be null
        (second._1, "-", second._2)
      } else {
        (first._1, first._2, second._2)
      }
    }).print()
  }

  // Cartesian product (cross)
  // Result: 2 * 3 = 6 elements
  // (皇马,1)
  // (皇马,4)
  // (皇马,2)
  // (巴萨,1)
  // (巴萨,4)
  // (巴萨,2)
  def crossFunction(env: ExecutionEnvironment): Unit = {
    val info1 = List("皇马", "巴萨")
    val info2 = List(1, 4, 2)
    val data1 = env.fromCollection(info1)
    val data2 = env.fromCollection(info2)
    data1.cross(data2).print()
  }
}
```
四、Sink
Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/#data-sinks
1. Basic operators
writeAsText, writeAsCsv, print, write, output
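The example in the next subsection only uses writeAsText. Below is a minimal sketch (object name and output path are illustrative, not from the original code) of writeAsCsv, which writes each tuple as one CSV row; because the write* sinks are lazy, an explicit execute() is required:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem

object CsvSinkSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val pairs = env.fromElements((1, "hadoop"), (2, "spark"), (3, "flink"))

    // Each tuple becomes one line, fields separated by the default "," delimiter
    pairs.writeAsCsv("data//output-csv", writeMode = FileSystem.WriteMode.OVERWRITE)

    // write* sinks do not trigger execution by themselves
    env.execute("CsvSinkSketch")
  }
}
```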
2. Example code
```scala
package com.bd.flink._1203DataSet

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.core.fs.FileSystem.WriteMode

/**
  * Created by Administrator on 2019/12/4.
  */
object DataSetSink {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = 1.to(10)

    import org.apache.flink.api.scala._
    val text = env.fromCollection(data)

    val filepath = "data//output"

    // writeAsText
    // FileSystem.WriteMode.OVERWRITE overwrites an existing output
    // With the default parallelism the result is written to a single file
    text.writeAsText(filepath, FileSystem.WriteMode.OVERWRITE)

    // With parallelism 2 the result is written to two files under data//output
    text.writeAsText(filepath, FileSystem.WriteMode.OVERWRITE).setParallelism(2)

    env.execute("DataSet Sink")
  }
}
```
五、Counters
1. Purpose
Count elements uniformly across all partitions, instead of each partition keeping only its own local count.
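Flink accumulators provide exactly this behavior. Before the full program in the next subsection, here is a condensed, runnable sketch of the three steps involved (object name, accumulator name, and the DiscardingOutputFormat sink are illustrative choices, not from the original code):

```scala
import org.apache.flink.api.common.accumulators.LongCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

object AccumulatorSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromElements("hadoop", "hdfs", "yarn", "spark")

    val mapped = data.map(new RichMapFunction[String, String] {
      val counterAcc = new LongCounter()                                  // Step 1: define the counter
      override def open(parameters: Configuration): Unit =
        getRuntimeContext.addAccumulator("sketch-counter", counterAcc)    // Step 2: register it
      override def map(in: String): String = { counterAcc.add(1); in }
    }).setParallelism(3)

    // A sink is needed so the job has something to execute; this one discards the data
    mapped.output(new DiscardingOutputFormat[String]())

    val jobResult = env.execute("AccumulatorSketch")
    // Step 3: read the value merged across all subtasks from the JobExecutionResult
    println(jobResult.getAccumulatorResult[Long]("sketch-counter"))
  }
}
```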
2. Implementation code
```scala
package com.bd.flink._1203DataSet

import org.apache.flink.api.common.accumulators.LongCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem

/**
  * Created by Administrator on 2019/12/4.
  * Counter (accumulator)
  */
object DataSetCounter {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val data = env.fromElements("hadoop", "hdfs", "yarn", "spark", "flume", "flink")

    // A naive counter that does NOT behave as expected:
    // with parallelism > 1, each subtask counts only the elements of its own partition
    // data.map(new RichMapFunction[String, Long]() {
    //   var counter = 0L
    //   override def map(in: String): Long = {
    //     counter = counter + 1
    //     println("counter :" + counter)
    //     counter
    //   }
    // }).setParallelism(3).print()

    // Improvement: use an accumulator
    val result = data.map(new RichMapFunction[String, String]() {
      // Step 1: define the counter
      val counterAcc = new LongCounter()

      override def open(parameters: Configuration): Unit = {
        // Step 2: register the counter
        getRuntimeContext.addAccumulator("ele-counts-scala", counterAcc)
      }

      override def map(in: String): String = {
        counterAcc.add(1)
        // println("counter :" + counterAcc)
        in
      }
    }) // .setParallelism(3)

    // result.print()
    val filepath = "data//output-scala-count"
    // Write to files; the sink parallelism does not affect the accumulator result
    result.writeAsText(filepath, FileSystem.WriteMode.OVERWRITE).setParallelism(3)

    // Execute and obtain the JobExecutionResult
    val jobResult = env.execute("CounterAcc")

    // Step 3: retrieve the counter value
    val num = jobResult.getAccumulatorResult[Long]("ele-counts-scala")
    println("num:" + num)
  }
}
```
This concludes the article on Flink (3): the DataSet Source, Transform, and Sink operators, and counters. Hopefully it is a useful reference.