sparkstreaming的实时黑名单过滤太慢

2024-08-24 19:58

本文主要是介绍sparkstreaming的实时黑名单过滤太慢,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

官网推荐如下这种方法进行过滤,但是这种方法其实有很大弊端,left out join如果黑名单数据量很大就会很伤,其实真不好。

object TransformBlackList {def main(args: Array[String]): Unit = {//获取streamingContextval sc=new StreamingContext(new SparkConf().setAppName("transform").setMaster("local[2]"),Durations.seconds(8))/*** 创建模拟数据*/val black=List(("lily",true))//需要sparkContextval blackRDD=sc.sparkContext.parallelize(black)//监听h15上的9999端口val logs=sc.socketTextStream("localhost", 9999)//分隔mapval ds=logs.map { x => (x.split(" ")(1),x)}//创建transform操作val endDs =ds.transform( my=>{//左内连接:对于rdd和DStream连接     join是rdd和rdd连接val joinsRDD=my.leftOuterJoin(blackRDD)joinsRDD.foreach(x=>println(x))//过滤val endRDD=joinsRDD.filter(tuple=>{/*** 举例说明:* val cd=scores.getOrElse("Bob", 0)* 如果scores包含Bob,那么返回Bob,如果不包含,那么返回0*///意思是:tuple._2._2能get到值,返回值,如果不能得到值,返回falseif (tuple._2._2.getOrElse(false)) {false}else{true}})//返回值endRDD.map(_._2._1)})//打印endDs.print()//开启sc.start()//等待sc.awaitTermination()//关闭资源sc.stop()}
}

解决方案(我就不提供代码了,提供以下思路):

我是把黑名单查询出来放入到hashmap中,然后广播出去,处理业务逻辑时,IF(!hashmap.containtKey(key))不在里面的数据进行处理,这样你的时间复杂度永远为1,如果你用left out join 来做200万的黑名单乘以你的sparkstreaming窗口处理的数据量1万条,所以left join方式很不好

这篇关于sparkstreaming的实时黑名单过滤太慢的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1103460

相关文章

SpringIntegration消息路由之Router的条件路由与过滤功能

《SpringIntegration消息路由之Router的条件路由与过滤功能》本文详细介绍了Router的基础概念、条件路由实现、基于消息头的路由、动态路由与路由表、消息过滤与选择性路由以及错误处理... 目录引言一、Router基础概念二、条件路由实现三、基于消息头的路由四、动态路由与路由表五、消息过滤

java streamfilter list 过滤的实现

《javastreamfilterlist过滤的实现》JavaStreamAPI中的filter方法是过滤List集合中元素的一个强大工具,可以轻松地根据自定义条件筛选出符合要求的元素,本文就来... 目录1. 创建一个示例List2. 使用Stream的filter方法进行过滤3. 自定义过滤条件1. 定

Redis如何实现刷票过滤

《Redis如何实现刷票过滤》:本文主要介绍Redis如何实现刷票过滤问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录引言一、概述二、技术选型三、搭建开发环境四、使用Redis存储数据四、使用SpringBoot开发应用五、 实现同一IP每天刷票不得超过次数六

Mybatis拦截器如何实现数据权限过滤

《Mybatis拦截器如何实现数据权限过滤》本文介绍了MyBatis拦截器的使用,通过实现Interceptor接口对SQL进行处理,实现数据权限过滤功能,通过在本地线程变量中存储数据权限相关信息,并... 目录背景基础知识MyBATis 拦截器介绍代码实战总结背景现在的项目负责人去年年底离职,导致前期规

深入探索协同过滤:从原理到推荐模块案例

文章目录 前言一、协同过滤1. 基于用户的协同过滤(UserCF)2. 基于物品的协同过滤(ItemCF)3. 相似度计算方法 二、相似度计算方法1. 欧氏距离2. 皮尔逊相关系数3. 杰卡德相似系数4. 余弦相似度 三、推荐模块案例1.基于文章的协同过滤推荐功能2.基于用户的协同过滤推荐功能 前言     在信息过载的时代,推荐系统成为连接用户与内容的桥梁。本文聚焦于

C#实战|大乐透选号器[6]:实现实时显示已选择的红蓝球数量

哈喽,你好啊,我是雷工。 关于大乐透选号器在前面已经记录了5篇笔记,这是第6篇; 接下来实现实时显示当前选中红球数量,蓝球数量; 以下为练习笔记。 01 效果演示 当选择和取消选择红球或蓝球时,在对应的位置显示实时已选择的红球、蓝球的数量; 02 标签名称 分别设置Label标签名称为:lblRedCount、lblBlueCount

pandas数据过滤

Pandas 数据过滤方法 Pandas 提供了多种方法来过滤数据,可以根据不同的条件进行筛选。以下是一些常见的 Pandas 数据过滤方法,结合实例进行讲解,希望能帮你快速理解。 1. 基于条件筛选行 可以使用布尔索引来根据条件过滤行。 import pandas as pd# 创建示例数据data = {'Name': ['Alice', 'Bob', 'Charlie', 'Dav

三.海量数据实时分析-FlinkCDC实现Mysql数据同步到Doris

FlinkCDC 同步Mysql到Doris 参考:https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/zh/docs/get-started/quickstart/mysql-to-doris/ 1.安装Flink 下载 Flink 1.18.0,下载后把压缩包上传到服务器,使用tar -zxvf flink-xxx-

Java8特性:分组、提取字段、去重、过滤、差集、交集

总结下自己使用过的特性 将对象集合根据某个字段分组 //根据id分组Map<String, List<Bean>> newMap = successCf.stream().collect(Collectors.groupingBy(b -> b.getId().trim())); 获取对象集合里面的某个字段的集合 List<Bean> list = new ArrayList<>