本文主要是介绍Spark WordCount使用示例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
package com.sparktestimport org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}/** * 使用scala开发本地测试的Spark WordCount程序 */ object WordCount {def main(args: Array[String]): Unit = {/** * 第一步:创建Spark配置对象SparkConf,设置Spark程序的运行时的配置信息 * 例如,通过setMaster来设置程序要连接的Spark集群的Master的URL,若设置为local,则代表Spark程序在本地运行 */ val conf = new SparkConf()//创建SparkConf对象 conf.setAppName("WordCount")//设置应用程序的名称,在程序运行的监控界面可以看到名称 conf.setMaster("local")//此时程序在本地运行,不需要安装Spark集群 /** * 第二步:创建SparkContext对象 * SparkContext是Spark程序所有功能的唯一入口,无论是采用Scala、Java、Python、R等都必须有一个SparkContext * sparkContext核心作用:初始化Spark应用程序运行所运行的核心组件,包括DAGScheduler、TaskScheduler、Scheduler * 同时还会负责Spark程序往Master注册程序等 * SparkContext是整个Spark应用程序中最为重要的对象 */ val sc = new SparkContext(conf)//创建SparkContext,通过传入SparkConf实例来定制Spark运行的具体参数和配置信息 /** * 第三步:根据具体的数据来源,例如HDFS、HBase、Local FS、DB、S3等,通过SparkContext来创建RDD * RDD的创建基本有3中方式: * 3.1.根据外部的数据来源,例如HDFS等 * 3.2.scala集合 * 3.3.由其他的RDD操作产生 * 数据会被RDD划分成为一系列的Partitions,分配到每个Partition的数据属于一个Task的处理范畴 */ val lines = sc.textFile("D://spark-1.6.0-bin-hadoop2.6//README.MD",4)//读取本地文件,并设置成一个Partition // val lines:RDD[String] = sc.textFile("D://spark-1.6.0-bin-hadoop2.6//README.MD",4)//读取本地文件,并设置成一个Partition /** * 第四步:对初始的RDD进行Transformation级别的处理,例如map、filter等高阶函数的编程,来进行具体的数据计算 * 4.1将每一行的字符串拆分成单个的单词 */ val words = lines.flatMap{line => line.split(" ")}//对每一行的字符串,进行单词拆分,并把所有行的拆分结果通过flatMap合并成为一个大的单词集合 /** * 第四步:对初始的RDD进行Transformation级别的处理,例如map、filter等高阶函数的编程,来进行具体的数据计算 * 4.2在单词拆分的基础上对每个单词实例计数为1,也就是word => (word,1) */ val pairs = words.map(word => (word,1))/** * 第四步:对初始的RDD进行Transformation级别的处理,例如map、filter等高阶函数的编程,来进行具体的数据计算 * 4.3在单词实例计数为1的基础上统计每个单词在文件中出现的总次数 */ val wordCounts = pairs.reduceByKey(_+_)//对相同的Key进行Value的累加(包括local和Reducer级别同时Reduce) wordCounts.foreach(wordNumberPair => println(wordNumberPair._1 + " : " + wordNumberPair._2))sc.stop()} }
这篇关于Spark WordCount使用示例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!