本文主要是介绍sparkstreaming的实时黑名单过滤太慢,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
官网推荐如下这种方法进行过滤,但是这种方法其实有很大弊端,left out join如果黑名单数据量很大就会很伤,其实真不好。
object TransformBlackList {def main(args: Array[String]): Unit = {//获取streamingContextval sc=new StreamingContext(new SparkConf().setAppName("transform").setMaster("local[2]"),Durations.seconds(8))/*** 创建模拟数据*/val black=List(("lily",true))//需要sparkContextval blackRDD=sc.sparkContext.parallelize(black)//监听h15上的9999端口val logs=sc.socketTextStream("localhost", 9999)//分隔mapval ds=logs.map { x => (x.split(" ")(1),x)}//创建transform操作val endDs =ds.transform( my=>{//左内连接:对于rdd和DStream连接 join是rdd和rdd连接val joinsRDD=my.leftOuterJoin(blackRDD)joinsRDD.foreach(x=>println(x))//过滤val endRDD=joinsRDD.filter(tuple=>{/*** 举例说明:* val cd=scores.getOrElse("Bob", 0)* 如果scores包含Bob,那么返回Bob,如果不包含,那么返回0*///意思是:tuple._2._2能get到值,返回值,如果不能得到值,返回falseif (tuple._2._2.getOrElse(false)) {false}else{true}})//返回值endRDD.map(_._2._1)})//打印endDs.print()//开启sc.start()//等待sc.awaitTermination()//关闭资源sc.stop()}
}
解决方案(我就不提供代码了,提供以下思路):
我是把黑名单查询出来放入到hashmap中,然后广播出去,处理业务逻辑时,IF(!hashmap.containtKey(key))不在里面的数据进行处理,这样你的时间复杂度永远为1,如果你用left out join 来做200万的黑名单乘以你的sparkstreaming窗口处理的数据量1万条,所以left join方式很不好
这篇关于sparkstreaming的实时黑名单过滤太慢的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!