package _0722rdd/*** Created by Administrator on 2018/7/16.*/
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Word-count job intended to be packaged and launched with `spark-submit`.
 *
 * The job runs in four steps:
 *  1. create the SparkContext,
 *  2. read the input text into an RDD,
 *  3. count the words, order them by descending count and coalesce the result,
 *  4. save the result as text files on HDFS.
 */
object Wordcount_product {

  /**
   * Job entry point.
   *
   * @param args command-line arguments; not used by this job
   */
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkConf and the SparkContext.
    // Spark fails with "A master URL must be set in your configuration" /
    // "An application name must be set in your configuration" when either is missing.
    // Every setting is loaded through SparkConf; extra ones can be added with
    // conf.set("key", "value").
    //
    // Local mode is left disabled so that the master can be supplied by the launcher
    // (e.g. spark-submit --master). "local[*]" would use every CPU core that is free
    // at run time:
    //   .setMaster("local[*]")
    val conf = new SparkConf().setAppName("idea_start_wc")
    val sc = new SparkContext(conf)

    // Everything after the context exists runs inside try/finally so that the
    // context is stopped even when configuration lookup or the job itself fails.
    try {
      // Number of partitions to coalesce the result into.
      // NOTE(review): the configuration key is an empty string, so this lookup is
      // expected to throw NoSuchElementException. The real key (presumably the one
      // passed through `--conf key=value` at submit time) must be filled in — confirm.
      val coalesceNum: Int = Integer.parseInt(conf.get(""))

      // 2. Read the data into an RDD, 3. process it through the RDD API.
      //
      // Alternative inputs that were tried:
      //   (1) a file on HDFS, uploaded beforehand with
      //       bin/hdfs dfs -put /opt/modules/spark-2.1.0-bin-2.7.3/ /
      //         sc.textFile("hdfs://")
      //         sc.textFile("file:///opt/modules/spark-2.1.0-bin-2.7.3/")
      //   (2) a local file: read data/word.txt from the project and store the
      //       result under result/wc in the project
      //         val path = "data/word.txt"
      //         val savePath = s"result/wc"
      val resultRdd: RDD[(String, Int)] =
        sc.textFile("file:///opt/modules/cdh5.7.0/spark-2.1.0-bin-2.6.0-cdh5.7.0/")
          .flatMap(_.split(" "))
          .map((_, 1))
          .reduceByKey(_ + _) // the word count itself is complete at this point
          // Sort by descending count: negate the count, use it as the key for an
          // ascending sortByKey, then restore the (word, count) shape.
          .map(t => (-t._2, t._1))
          .sortByKey()
          .map(t => (t._2, -t._1))
          // repartition vs. coalesce: repartition(n) always shuffles, whereas
          // coalesce(n) merges existing partitions without a shuffle by default.
          .coalesce(coalesceNum)

      // 4. Save the result.
      resultRdd.saveAsTextFile("hdfs://hadoop:8020/sparkrwordcount_20191114")

      // Print to the console instead:
      //   resultRdd.foreachPartition(iter => iter.foreach(println))

      // Block the calling thread (about 27.8 hours) so that the results can be
      // inspected on the web page before the application exits.
      Thread.sleep(100000000)
    } finally {
      // Shut the application down cleanly.
      sc.stop()
    }
  }
}



3.上传到linux (ws-hadoop01)










# Minute-resolution timestamp (format YYYYmmddHHMM); it is not referenced in the
# visible part of the command below.
date=`date +"%Y%m%d%H%M"`
# Submit _0722rdd.Wordcount_product to YARN in client deploy mode, giving the driver
# 1G of memory and 1 core and requesting a single executor with 1G of memory and 1 core.
# NOTE(review): the command stops at a dangling `--conf \` — the key=value pair and
# the application jar path appear to be missing here; restore them before running.
/opt/modules/cdh5.7.0/spark-2.1.0-bin-2.6.0-cdh5.7.0/bin/spark-submit \
--master yarn \
--deploy-mode client \
--class _0722rdd.Wordcount_product \
--driver-memory   1G \
--driver-cores 1 \
--executor-memory 1G \
--executor-cores 1 \
--num-executors 1 \
--conf \



