Spark Streaming模拟网络热搜词和黑客过滤

2023-12-10 16:59

本文主要是介绍Spark Streaming模拟网络热搜词和黑客过滤,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.网络热搜词

* Created by Jason Shu on 2017/8/5.      */      
import org.apache.spark.streaming.StreamingContext      
import org.apache.spark.SparkConf      
import org.apache.spark.streaming.Seconds      
object Top5 {      def main(args:Array[String]){      val conf=new SparkConf()      conf.setAppName("Top5").setMaster("spark://SparkMaster:7077")//此时程序在Spark集群模式运行      val ssc=new StreamingContext(conf,Seconds(5))//创建StreamingContext,两个参数分别为SparkConf和Durations      val hottestStream=ssc.socketTextStream("SparkMaster:7077", 9999)//设置socket端口号,通过socket端口来手动输入数据      val searchPair=hottestStream.map(_.split("")(1)).map(item=>(item,1))    val hottestDStream=searchPair.reduceByKeyAndWindow((v1:Int,v2:Int)=>v1+v2,Seconds(60),Seconds(20))//设置窗口,时间为60秒,设置滑动,时间为20秒      hottestDStream.transform(hottestItemRDD=>{      val top5=hottestItemRDD.map(pair=>(pair._2,pair._1)).sortByKey(false)      .map(pair=>(pair._2,pair._1)).take(3)      for(item<-top5){      println(item)      }      hottestItemRDD}      ).print()      ssc.start()      ssc.awaitTermination()      }} 
启动hadoop
$HADOOP_HOME$/sbin#./start-dfs.sh
启动Spark
$SPARK_HOME$/sbin#./strat-all.sh
此时jps看一下进程
3857 SecondaryNameNode
3665 NameNode
4021 Master
4061 Jps
Spark和Hadoop都启动之后,将程序打包到集群上运行
$SPARK_HOME$/bin#./spark-submit --class com.dt.sparkstreaming.Top5  --master spark://SparkMaster:7077  /root/Documents/top5.jar
打开Socket端口
nc -lk 9999
手动输入数据
spark hadoop flume spark SQL
spark streaming socket println
sortbykey transform start namenode
master hadoop SQL server catch
exception RDD jps secondary namenode
sbin home submit top5

得到前5热搜词

spark
SQL
hadoop
start
namenode

2.黑客过滤


import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}object BlackListFilter {def main(args:Array[String]): Unit ={val conf=new SparkConf()conf.setAppName("BlackListFilter").setMaster("spark://SparkMaster:7077")val sc=new SparkContext(conf)val ssc=new StreamingContext(sc,Seconds(2))//创建StreamingContext,设置每一秒刷新一次。val blackList=Array(("Jim",true),("Kim",true),("KAT",true))//设置需要过滤的黑名单val blackListRDD=ssc.sparkContext.parallelize(blackList,3)//设置并行度,这里指定为3val socketText=ssc.socketTextStream("SparkMaster:7077",9999)//对输入数据进行转换,(id, user) => (user, id user) ,以便对每个批次RDD,与之前定义好的黑名单进行leftOuterJoin操作。val users = socketText.map { l => (l.split(" ")(1),l) }//调用左外连接操作leftOuterJoin,进行黑名单匹配,过滤掉。val validRddDS = users.transform(ld => {val ljoinRdd = ld.leftOuterJoin(blackListRDD)val fRdd = ljoinRdd.filter(tuple => {if(tuple._2._2.getOrElse(false)) {false} else {true}})val validRdd = fRdd.map(tuple => tuple._2._1)validRdd})validRddDS.print()//打印白名单ssc.start()ssc.awaitTermination()}
}

在终端9999中输入一下内容

0001 Kim
0003 hack
0002 Slick


得到如下内容

hack
Slick
可以看到已经将黑名单里面的Kim给过滤掉。




























































这篇关于Spark Streaming模拟网络热搜词和黑客过滤的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/477718

相关文章

如何通过海康威视设备网络SDK进行Java二次开发摄像头车牌识别详解

《如何通过海康威视设备网络SDK进行Java二次开发摄像头车牌识别详解》:本文主要介绍如何通过海康威视设备网络SDK进行Java二次开发摄像头车牌识别的相关资料,描述了如何使用海康威视设备网络SD... 目录前言开发流程问题和解决方案dll库加载不到的问题老旧版本sdk不兼容的问题关键实现流程总结前言作为

SSID究竟是什么? WiFi网络名称及工作方式解析

《SSID究竟是什么?WiFi网络名称及工作方式解析》SID可以看作是无线网络的名称,类似于有线网络中的网络名称或者路由器的名称,在无线网络中,设备通过SSID来识别和连接到特定的无线网络... 当提到 Wi-Fi 网络时,就避不开「SSID」这个术语。简单来说,SSID 就是 Wi-Fi 网络的名称。比如

Java实现任务管理器性能网络监控数据的方法详解

《Java实现任务管理器性能网络监控数据的方法详解》在现代操作系统中,任务管理器是一个非常重要的工具,用于监控和管理计算机的运行状态,包括CPU使用率、内存占用等,对于开发者和系统管理员来说,了解这些... 目录引言一、背景知识二、准备工作1. Maven依赖2. Gradle依赖三、代码实现四、代码详解五

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

Mybatis拦截器如何实现数据权限过滤

《Mybatis拦截器如何实现数据权限过滤》本文介绍了MyBatis拦截器的使用,通过实现Interceptor接口对SQL进行处理,实现数据权限过滤功能,通过在本地线程变量中存储数据权限相关信息,并... 目录背景基础知识MyBATis 拦截器介绍代码实战总结背景现在的项目负责人去年年底离职,导致前期规

深入探索协同过滤:从原理到推荐模块案例

文章目录 前言一、协同过滤1. 基于用户的协同过滤(UserCF)2. 基于物品的协同过滤(ItemCF)3. 相似度计算方法 二、相似度计算方法1. 欧氏距离2. 皮尔逊相关系数3. 杰卡德相似系数4. 余弦相似度 三、推荐模块案例1.基于文章的协同过滤推荐功能2.基于用户的协同过滤推荐功能 前言     在信息过载的时代,推荐系统成为连接用户与内容的桥梁。本文聚焦于

【C++】_list常用方法解析及模拟实现

相信自己的力量,只要对自己始终保持信心,尽自己最大努力去完成任何事,就算事情最终结果是失败了,努力了也不留遗憾。💓💓💓 目录   ✨说在前面 🍋知识点一:什么是list? •🌰1.list的定义 •🌰2.list的基本特性 •🌰3.常用接口介绍 🍋知识点二:list常用接口 •🌰1.默认成员函数 🔥构造函数(⭐) 🔥析构函数 •🌰2.list对象

Linux 网络编程 --- 应用层

一、自定义协议和序列化反序列化 代码: 序列化反序列化实现网络版本计算器 二、HTTP协议 1、谈两个简单的预备知识 https://www.baidu.com/ --- 域名 --- 域名解析 --- IP地址 http的端口号为80端口,https的端口号为443 url为统一资源定位符。CSDNhttps://mp.csdn.net/mp_blog/creation/editor

usaco 1.2 Transformations(模拟)

我的做法就是一个一个情况枚举出来 注意计算公式: ( 变换后的矩阵记为C) 顺时针旋转90°:C[i] [j]=A[n-j-1] [i] (旋转180°和270° 可以多转几个九十度来推) 对称:C[i] [n-j-1]=A[i] [j] 代码有点长 。。。 /*ID: who jayLANG: C++TASK: transform*/#include<

ASIO网络调试助手之一:简介

多年前,写过几篇《Boost.Asio C++网络编程》的学习文章,一直没机会实践。最近项目中用到了Asio,于是抽空写了个网络调试助手。 开发环境: Win10 Qt5.12.6 + Asio(standalone) + spdlog 支持协议: UDP + TCP Client + TCP Server 独立的Asio(http://www.think-async.com)只包含了头文件,不依