本文主要是介绍Spark Streaming模拟网络热搜词和黑客过滤,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
1.网络热搜词
* Created by Jason Shu on 2017/8/5. */
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
object Top5 { def main(args:Array[String]){ val conf=new SparkConf() conf.setAppName("Top5").setMaster("spark://SparkMaster:7077")//此时程序在Spark集群模式运行 val ssc=new StreamingContext(conf,Seconds(5))//创建StreamingContext,两个参数分别为SparkConf和Durations val hottestStream=ssc.socketTextStream("SparkMaster:7077", 9999)//设置socket端口号,通过socket端口来手动输入数据 val searchPair=hottestStream.map(_.split("")(1)).map(item=>(item,1)) val hottestDStream=searchPair.reduceByKeyAndWindow((v1:Int,v2:Int)=>v1+v2,Seconds(60),Seconds(20))//设置窗口,时间为60秒,设置滑动,时间为20秒 hottestDStream.transform(hottestItemRDD=>{ val top5=hottestItemRDD.map(pair=>(pair._2,pair._1)).sortByKey(false) .map(pair=>(pair._2,pair._1)).take(3) for(item<-top5){ println(item) } hottestItemRDD} ).print() ssc.start() ssc.awaitTermination() }}
启动hadoop
$HADOOP_HOME$/sbin#./start-dfs.sh
启动Spark
$SPARK_HOME$/sbin#./strat-all.sh
此时jps看一下进程
3857 SecondaryNameNode
3665 NameNode
4021 Master
4061 Jps
Spark和Hadoop都启动之后,将程序打包到集群上运行
$SPARK_HOME$/bin#./spark-submit --class com.dt.sparkstreaming.Top5 --master spark://SparkMaster:7077 /root/Documents/top5.jar
打开Socket端口
nc -lk 9999
手动输入数据
spark hadoop flume spark SQL
spark streaming socket println
sortbykey transform start namenode
master hadoop SQL server catch
exception RDD jps secondary namenode
sbin home submit top5
得到前5热搜词
spark
SQL
hadoop
start
namenode
2.黑客过滤
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}object BlackListFilter {def main(args:Array[String]): Unit ={val conf=new SparkConf()conf.setAppName("BlackListFilter").setMaster("spark://SparkMaster:7077")val sc=new SparkContext(conf)val ssc=new StreamingContext(sc,Seconds(2))//创建StreamingContext,设置每一秒刷新一次。val blackList=Array(("Jim",true),("Kim",true),("KAT",true))//设置需要过滤的黑名单val blackListRDD=ssc.sparkContext.parallelize(blackList,3)//设置并行度,这里指定为3val socketText=ssc.socketTextStream("SparkMaster:7077",9999)//对输入数据进行转换,(id, user) => (user, id user) ,以便对每个批次RDD,与之前定义好的黑名单进行leftOuterJoin操作。val users = socketText.map { l => (l.split(" ")(1),l) }//调用左外连接操作leftOuterJoin,进行黑名单匹配,过滤掉。val validRddDS = users.transform(ld => {val ljoinRdd = ld.leftOuterJoin(blackListRDD)val fRdd = ljoinRdd.filter(tuple => {if(tuple._2._2.getOrElse(false)) {false} else {true}})val validRdd = fRdd.map(tuple => tuple._2._1)validRdd})validRddDS.print()//打印白名单ssc.start()ssc.awaitTermination()}
}
在终端9999中输入一下内容
0001 Kim
0003 hack
0002 Slick
得到如下内容
hack
Slick
可以看到已经将黑名单里面的Kim给过滤掉。
这篇关于Spark Streaming模拟网络热搜词和黑客过滤的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!