网站日志实时分析之Flink处理实时热门和PVUV统计

2024-09-06 20:38

本文主要是介绍网站日志实时分析之Flink处理实时热门和PVUV统计,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

点击上方蓝色字体,选择“设为星标

回复”资源“获取更多资源

大数据技术与架构

点击右侧关注,大数据开发领域最强公众号!

暴走大数据

点击右侧关注,暴走大数据!

实时热门统计

操作步骤:

  • 先从Kafka读取消费数据

  • 使用map算子对数据进行预处理

  • 过滤数据,只留住pv数据

  • 使用timewindow,每隔10秒创建一个20秒的window

  • 然后将窗口自定义预聚合,并且兹定于窗口函数,按指定输入输出case操作数据

  • 上面操作时候返回的是DataStream,那么就根据timestampEnd进行keyby

  • 使用底层API操作,对每个时间窗口内的数据进行排序,取top

package com.ongbo.hotAnalysisimport java.sql.Timestamp
import java.util.Propertiesimport org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collectorimport scala.collection.mutable.ListBuffer/*
*定义输入数据的样例类*/
case class UserBehavior(userId: Long, itemId: Long, cateGoryId: Int,behavior: String, timestamp: Long)
//定义窗口聚合结果样例类
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)object HotItems {def main(args: Array[String]): Unit = {//1:创建执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)//设置为事件事件env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//2:读取数据/*kafka源*/val properties = new Properties()properties.setProperty("bootstrap.servers","114.116.219.197:5008,114.116.220.98:5008,114.116.199.154:5008")properties.setProperty("group.id","web-consumer-group")properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")properties.setProperty("auto.offset.reset","latest")val dataStream = env.addSource(new FlinkKafkaConsumer[String]("weblog", new SimpleStringSchema(),properties))
//    val dataStream = env.readTextFile("/Users/ongbo/Maven/bin/UserBehaviorAnalysis/HotItemAnalysis/src/main/resources/UserBehavior.csv").map(data =>{System.out.println("data:"+data)val dataArray = data.split(",")
//        if(dataArray(0).equals("ij"))UserBehavior(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong)}).assignAscendingTimestamps(_.timestamp * 1000L)//3:transform处理数据val processStream = dataStream//筛选出埋点pv数据.filter(_.behavior.equals("pv"))//先对itemID进行分组.keyBy(_.itemId)//然后设置timeWindow,size为1小时,步长为5分钟的滑动窗口.timeWindow(Time.seconds(20), Time.seconds(10))//窗口聚合,按道理说应该不用窗口聚合,但是因为达到的数据可能时间顺序会扰乱,所以聚合后要keyby.aggregate(new CountAgg(), new WindowResult()).keyBy(_.windowEnd)      //按照窗口分组.process(new TopNHotItems(10))//sink:输出数据processStream.print("processStream::")
//    dataStream.print()//执行env.execute("hot Items Job")}
}/*自定义预聚合函数*/
class CountAgg() extends AggregateFunction[UserBehavior, Long, Long]{//累加器初始值override def createAccumulator(): Long = 0//每来一次就加一override def add(in: UserBehavior, acc: Long): Long = acc+1//override def getResult(acc: Long): Long = accoverride def merge(acc: Long, acc1: Long): Long = acc + acc1
}//自定义窗口函数,输出ItemViewCount
class WindowResult() extends WindowFunction[Long,ItemViewCount, Long, TimeWindow]{override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit =  {out.collect(ItemViewCount(key,window.getEnd,input.iterator.next()))}
}//自定义处理函数
class TopNHotItems(topsize: Int) extends KeyedProcessFunction[Long, ItemViewCount, String] {private var itemState: ListState[ItemViewCount] = _override def open(parameters: Configuration): Unit = {itemState = getRuntimeContext.getListState(new ListStateDescriptor[ItemViewCount]("item-state", classOf[ItemViewCount]))}override def processElement(value: ItemViewCount, ctx: KeyedProcessFunction[Long, ItemViewCount, String]#Context, out: Collector[String]): Unit = {//把每条数据存入状态列表itemState.add(value)//注册一个定时器ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)}//定时器触发时,对所有的数据排序,并输出结果override def onTimer(timestamp: Long, ctx: _root_.org.apache.flink.streaming.api.functions.KeyedProcessFunction[Long, _root_.com.ongbo.hotAnalysis.ItemViewCount, _root_.scala.Predef.String]#OnTimerContext, out: _root_.org.apache.flink.util.Collector[_root_.scala.Predef.String]): Unit = {//将所有state中的数据取出,放到一个list Buffer中val allItems: ListBuffer[ItemViewCount] = new ListBuffer()import scala.collection.JavaConversions._for(item <- itemState.get()){allItems += item}//按照点计量count大小排序,sortBy默认是升序,并且取前三个val sortedItems = allItems.sortBy(_.count)(Ordering.Long.reverse).take(topsize)//清空状态itemState.clear()//格式化输出排名结果val result : StringBuilder = new StringBuilderresult.append("时间:").append(new Timestamp(timestamp - 1)).append("\n")//输出每一个商品信息for(i<- sortedItems.indices){val currentItem = sortedItems(i)result.append("No").append(i+1).append(":").append("  商品ID:").append(currentItem.itemId).append("  浏览量:").append(currentItem.count).append("\n")}result.append("============================\n")//控制输出频率Thread.sleep(1000)out.collect(result.toString())}
}
/*自定义预聚合函数计算平均数*/
class AverageAgg() extends AggregateFunction[UserBehavior, (Long,Int), Double]{override def createAccumulator(): (Long, Int) = (0L,0)override def add(in: UserBehavior, acc: (Long, Int)): (Long, Int) = (acc._1+in.timestamp, acc._2+1)override def getResult(acc: (Long, Int)): Double = acc._1 /acc._2override def merge(acc: (Long, Int), acc1: (Long, Int)): (Long, Int) = (acc._1+acc1._1, acc._2+acc1._2)
}

实时PV统计

这里按道理应该也要从Kafka读取数据的,但是这里暂时先从本地读,因为当时本地网络的原因,暂时不在服务器上创建数据,而直接用本地的。
这个很简单,直接创建滚动窗口,从而能够计算一个小时的PV,然后每隔一个小时更新一次。
package com.ongbo.NetWorkFlow_Analysisimport org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time/*
*定义输入数据的样例类*/
case class UserBehavior(userId: Long, itemId: Long, cateGoryId: Int,behavior: String, timestamp: Long)object PageVies {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)env.setParallelism(1)//用相对路径定义数据集val resource = getClass.getResource("/UserBehavior.csv")val dataStream = env.readTextFile(resource.getPath).map(data =>{val dataArray = data.split(",")UserBehavior(dataArray(0).trim.toLong,dataArray(1).trim.toLong,dataArray(2).trim.toInt,dataArray(3).trim,dataArray(4).trim.toLong)}).assignAscendingTimestamps(_.timestamp * 1000L).filter(_.behavior.equals("pv")).map(data => ("pv", 1)).keyBy(_._1).timeWindow(Time.hours(1)).sum(1)dataStream.print("pv count")env.execute("PV")}
}

实时UV统计:布隆过滤器

我们统计UV需要注意,很多重复的user会占用到内存,所以我们采用布隆过滤器优化,减少Flink缓存user从而降低性能。而且将数据count保存在Redis,可以给后端使用的。
package com.ongbo.NetWorkFlow_Analysisimport com.ongbo.NetWorkFlow_Analysis.UniqueView.getClass
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedisobject UvWithBloom {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)env.setParallelism(1)//用相对路径定义数据集val resource = getClass.getResource("/UserBehavior.csv")val dataStream = env.readTextFile("/Users/ongbo/Maven/bin/UserBehaviorAnalysis/NetWorkFlowAnalysis/src/main/resources/UserBehavior.csv").map(data =>{val dataArray = data.split(",")UserBehavior(dataArray(0).trim.toLong,dataArray(1).trim.toLong,dataArray(2).trim.toInt,dataArray(3).trim,dataArray(4).trim.toLong)}).assignAscendingTimestamps(_.timestamp * 1000L).filter(_.behavior.equals("pv")).map( data => ("dummyKey",data.userId)).keyBy(_._1).timeWindow(Time.hours(1)).trigger(new MyTrigger()).process(new UvCountWithBloom())dataStream.print()env.execute()}
}//自定义窗口触发器
class MyTrigger() extends Trigger[(String,Long),TimeWindow]{override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {//每来一条数据就直接触发窗口操作,并清空所有状态TriggerResult.FIRE_AND_PURGE}override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUEoverride def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUEoverride def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {}
}
class UvCountWithBloom() extends ProcessWindowFunction[(String,Long),UvCount,String, TimeWindow] {// 定义Redis连接lazy val jedis = new Jedis("114.116.219.97",5000)//29位,也就是64Mlazy val bloom = new Bloom(1 << 29)override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = {//位图的存储方式 , key是windowwen,value是位图val storeKey = context.window.getEnd.toStringvar count = 0L//把每个窗口的count值,也存入Redis表里,存放内容位(windowEnd,uccount),所以要先从Redis中读取if(jedis.hget("count",storeKey) != null){
//      System.out.println(v)count = jedis.hget("count",storeKey).toLong}//用布隆过滤器判断当前用户是否已经存在val userId = elements.last._2.toStringval offset = bloom.hash(userId, 61)//定义一个标志位,判断Redis位图中有没有这一位val isExist = jedis.getbit(storeKey, offset)if(!isExist){//如果不存在位图对应位置变成1,count+1jedis.setbit(storeKey,offset,true)jedis.hset("count",storeKey,(count+1).toString)out.collect(UvCount(storeKey.toLong,count+1))}else{out.collect(UvCount(storeKey.toLong,count))}}
}class Bloom(size: Long) extends Serializable{//位图大小private val cap = if(size>0) size else 1 << 27//定义Hash函数def hash(value: String, seed: Int) : Long = {var result:Long = 0Lfor(i <- 0 until value.length){result = result * seed + value.charAt(i)}result & (cap-1)}
}

欢迎点赞+收藏+转发朋友圈素质三连

文章不错?点个【在看】吧! ????

这篇关于网站日志实时分析之Flink处理实时热门和PVUV统计的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1143059

相关文章

Springboot中分析SQL性能的两种方式详解

《Springboot中分析SQL性能的两种方式详解》文章介绍了SQL性能分析的两种方式:MyBatis-Plus性能分析插件和p6spy框架,MyBatis-Plus插件配置简单,适用于开发和测试环... 目录SQL性能分析的两种方式:功能介绍实现方式:实现步骤:SQL性能分析的两种方式:功能介绍记录

最长公共子序列问题的深度分析与Java实现方式

《最长公共子序列问题的深度分析与Java实现方式》本文详细介绍了最长公共子序列(LCS)问题,包括其概念、暴力解法、动态规划解法,并提供了Java代码实现,暴力解法虽然简单,但在大数据处理中效率较低,... 目录最长公共子序列问题概述问题理解与示例分析暴力解法思路与示例代码动态规划解法DP 表的构建与意义动

Spring Boot整合log4j2日志配置的详细教程

《SpringBoot整合log4j2日志配置的详细教程》:本文主要介绍SpringBoot项目中整合Log4j2日志框架的步骤和配置,包括常用日志框架的比较、配置参数介绍、Log4j2配置详解... 目录前言一、常用日志框架二、配置参数介绍1. 日志级别2. 输出形式3. 日志格式3.1 PatternL

nginx部署https网站的实现步骤(亲测)

《nginx部署https网站的实现步骤(亲测)》本文详细介绍了使用Nginx在保持与http服务兼容的情况下部署HTTPS,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值... 目录步骤 1:安装 Nginx步骤 2:获取 SSL 证书步骤 3:手动配置 Nginx步骤 4:测

使用C++将处理后的信号保存为PNG和TIFF格式

《使用C++将处理后的信号保存为PNG和TIFF格式》在信号处理领域,我们常常需要将处理结果以图像的形式保存下来,方便后续分析和展示,C++提供了多种库来处理图像数据,本文将介绍如何使用stb_ima... 目录1. PNG格式保存使用stb_imagephp_write库1.1 安装和包含库1.2 代码解

C#使用DeepSeek API实现自然语言处理,文本分类和情感分析

《C#使用DeepSeekAPI实现自然语言处理,文本分类和情感分析》在C#中使用DeepSeekAPI可以实现多种功能,例如自然语言处理、文本分类、情感分析等,本文主要为大家介绍了具体实现步骤,... 目录准备工作文本生成文本分类问答系统代码生成翻译功能文本摘要文本校对图像描述生成总结在C#中使用Deep

Spring Boot 整合 ShedLock 处理定时任务重复执行的问题小结

《SpringBoot整合ShedLock处理定时任务重复执行的问题小结》ShedLock是解决分布式系统中定时任务重复执行问题的Java库,通过在数据库中加锁,确保只有一个节点在指定时间执行... 目录前言什么是 ShedLock?ShedLock 的工作原理:定时任务重复执行China编程的问题使用 Shed

开启mysql的binlog日志步骤详解

《开启mysql的binlog日志步骤详解》:本文主要介绍MySQL5.7版本中二进制日志(bin_log)的配置和使用,文中通过图文及代码介绍的非常详细,需要的朋友可以参考下... 目录1.查看是否开启bin_log2.数据库会把日志放进logs目录中3.查看log日志总结 mysql版本5.71.查看

Redis如何使用zset处理排行榜和计数问题

《Redis如何使用zset处理排行榜和计数问题》Redis的ZSET数据结构非常适合处理排行榜和计数问题,它可以在高并发的点赞业务中高效地管理点赞的排名,并且由于ZSET的排序特性,可以轻松实现根据... 目录Redis使用zset处理排行榜和计数业务逻辑ZSET 数据结构优化高并发的点赞操作ZSET 结

微服务架构之使用RabbitMQ进行异步处理方式

《微服务架构之使用RabbitMQ进行异步处理方式》本文介绍了RabbitMQ的基本概念、异步调用处理逻辑、RabbitMQ的基本使用方法以及在SpringBoot项目中使用RabbitMQ解决高并发... 目录一.什么是RabbitMQ?二.异步调用处理逻辑:三.RabbitMQ的基本使用1.安装2.架构