网站日志实时分析之Flink处理实时热门和PVUV统计

2024-09-06 20:38

本文主要是介绍网站日志实时分析之Flink处理实时热门和PVUV统计,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

点击上方蓝色字体,选择“设为星标

回复”资源“获取更多资源

大数据技术与架构

点击右侧关注,大数据开发领域最强公众号!

暴走大数据

点击右侧关注,暴走大数据!

实时热门统计

操作步骤:

  • 先从Kafka读取消费数据

  • 使用map算子对数据进行预处理

  • 过滤数据,只留住pv数据

  • 使用timewindow,每隔10秒创建一个20秒的window

  • 然后将窗口自定义预聚合,并且兹定于窗口函数,按指定输入输出case操作数据

  • 上面操作时候返回的是DataStream,那么就根据timestampEnd进行keyby

  • 使用底层API操作,对每个时间窗口内的数据进行排序,取top

package com.ongbo.hotAnalysisimport java.sql.Timestamp
import java.util.Propertiesimport org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collectorimport scala.collection.mutable.ListBuffer/*
*定义输入数据的样例类*/
case class UserBehavior(userId: Long, itemId: Long, cateGoryId: Int,behavior: String, timestamp: Long)
//定义窗口聚合结果样例类
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)object HotItems {def main(args: Array[String]): Unit = {//1:创建执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)//设置为事件事件env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//2:读取数据/*kafka源*/val properties = new Properties()properties.setProperty("bootstrap.servers","114.116.219.197:5008,114.116.220.98:5008,114.116.199.154:5008")properties.setProperty("group.id","web-consumer-group")properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")properties.setProperty("auto.offset.reset","latest")val dataStream = env.addSource(new FlinkKafkaConsumer[String]("weblog", new SimpleStringSchema(),properties))
//    val dataStream = env.readTextFile("/Users/ongbo/Maven/bin/UserBehaviorAnalysis/HotItemAnalysis/src/main/resources/UserBehavior.csv").map(data =>{System.out.println("data:"+data)val dataArray = data.split(",")
//        if(dataArray(0).equals("ij"))UserBehavior(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong)}).assignAscendingTimestamps(_.timestamp * 1000L)//3:transform处理数据val processStream = dataStream//筛选出埋点pv数据.filter(_.behavior.equals("pv"))//先对itemID进行分组.keyBy(_.itemId)//然后设置timeWindow,size为1小时,步长为5分钟的滑动窗口.timeWindow(Time.seconds(20), Time.seconds(10))//窗口聚合,按道理说应该不用窗口聚合,但是因为达到的数据可能时间顺序会扰乱,所以聚合后要keyby.aggregate(new CountAgg(), new WindowResult()).keyBy(_.windowEnd)      //按照窗口分组.process(new TopNHotItems(10))//sink:输出数据processStream.print("processStream::")
//    dataStream.print()//执行env.execute("hot Items Job")}
}/*自定义预聚合函数*/
class CountAgg() extends AggregateFunction[UserBehavior, Long, Long]{//累加器初始值override def createAccumulator(): Long = 0//每来一次就加一override def add(in: UserBehavior, acc: Long): Long = acc+1//override def getResult(acc: Long): Long = accoverride def merge(acc: Long, acc1: Long): Long = acc + acc1
}//自定义窗口函数,输出ItemViewCount
class WindowResult() extends WindowFunction[Long,ItemViewCount, Long, TimeWindow]{override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit =  {out.collect(ItemViewCount(key,window.getEnd,input.iterator.next()))}
}//自定义处理函数
class TopNHotItems(topsize: Int) extends KeyedProcessFunction[Long, ItemViewCount, String] {private var itemState: ListState[ItemViewCount] = _override def open(parameters: Configuration): Unit = {itemState = getRuntimeContext.getListState(new ListStateDescriptor[ItemViewCount]("item-state", classOf[ItemViewCount]))}override def processElement(value: ItemViewCount, ctx: KeyedProcessFunction[Long, ItemViewCount, String]#Context, out: Collector[String]): Unit = {//把每条数据存入状态列表itemState.add(value)//注册一个定时器ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)}//定时器触发时,对所有的数据排序,并输出结果override def onTimer(timestamp: Long, ctx: _root_.org.apache.flink.streaming.api.functions.KeyedProcessFunction[Long, _root_.com.ongbo.hotAnalysis.ItemViewCount, _root_.scala.Predef.String]#OnTimerContext, out: _root_.org.apache.flink.util.Collector[_root_.scala.Predef.String]): Unit = {//将所有state中的数据取出,放到一个list Buffer中val allItems: ListBuffer[ItemViewCount] = new ListBuffer()import scala.collection.JavaConversions._for(item <- itemState.get()){allItems += item}//按照点计量count大小排序,sortBy默认是升序,并且取前三个val sortedItems = allItems.sortBy(_.count)(Ordering.Long.reverse).take(topsize)//清空状态itemState.clear()//格式化输出排名结果val result : StringBuilder = new StringBuilderresult.append("时间:").append(new Timestamp(timestamp - 1)).append("\n")//输出每一个商品信息for(i<- sortedItems.indices){val currentItem = sortedItems(i)result.append("No").append(i+1).append(":").append("  商品ID:").append(currentItem.itemId).append("  浏览量:").append(currentItem.count).append("\n")}result.append("============================\n")//控制输出频率Thread.sleep(1000)out.collect(result.toString())}
}
/*自定义预聚合函数计算平均数*/
class AverageAgg() extends AggregateFunction[UserBehavior, (Long,Int), Double]{override def createAccumulator(): (Long, Int) = (0L,0)override def add(in: UserBehavior, acc: (Long, Int)): (Long, Int) = (acc._1+in.timestamp, acc._2+1)override def getResult(acc: (Long, Int)): Double = acc._1 /acc._2override def merge(acc: (Long, Int), acc1: (Long, Int)): (Long, Int) = (acc._1+acc1._1, acc._2+acc1._2)
}

实时PV统计

这里按道理应该也要从Kafka读取数据的,但是这里暂时先从本地读,因为当时本地网络的原因,暂时不在服务器上创建数据,而直接用本地的。
这个很简单,直接创建滚动窗口,从而能够计算一个小时的PV,然后每隔一个小时更新一次。
package com.ongbo.NetWorkFlow_Analysisimport org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time/*
*定义输入数据的样例类*/
case class UserBehavior(userId: Long, itemId: Long, cateGoryId: Int,behavior: String, timestamp: Long)object PageVies {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)env.setParallelism(1)//用相对路径定义数据集val resource = getClass.getResource("/UserBehavior.csv")val dataStream = env.readTextFile(resource.getPath).map(data =>{val dataArray = data.split(",")UserBehavior(dataArray(0).trim.toLong,dataArray(1).trim.toLong,dataArray(2).trim.toInt,dataArray(3).trim,dataArray(4).trim.toLong)}).assignAscendingTimestamps(_.timestamp * 1000L).filter(_.behavior.equals("pv")).map(data => ("pv", 1)).keyBy(_._1).timeWindow(Time.hours(1)).sum(1)dataStream.print("pv count")env.execute("PV")}
}

实时UV统计:布隆过滤器

我们统计UV需要注意,很多重复的user会占用到内存,所以我们采用布隆过滤器优化,减少Flink缓存user从而降低性能。而且将数据count保存在Redis,可以给后端使用的。
package com.ongbo.NetWorkFlow_Analysisimport com.ongbo.NetWorkFlow_Analysis.UniqueView.getClass
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedisobject UvWithBloom {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)env.setParallelism(1)//用相对路径定义数据集val resource = getClass.getResource("/UserBehavior.csv")val dataStream = env.readTextFile("/Users/ongbo/Maven/bin/UserBehaviorAnalysis/NetWorkFlowAnalysis/src/main/resources/UserBehavior.csv").map(data =>{val dataArray = data.split(",")UserBehavior(dataArray(0).trim.toLong,dataArray(1).trim.toLong,dataArray(2).trim.toInt,dataArray(3).trim,dataArray(4).trim.toLong)}).assignAscendingTimestamps(_.timestamp * 1000L).filter(_.behavior.equals("pv")).map( data => ("dummyKey",data.userId)).keyBy(_._1).timeWindow(Time.hours(1)).trigger(new MyTrigger()).process(new UvCountWithBloom())dataStream.print()env.execute()}
}//自定义窗口触发器
class MyTrigger() extends Trigger[(String,Long),TimeWindow]{override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {//每来一条数据就直接触发窗口操作,并清空所有状态TriggerResult.FIRE_AND_PURGE}override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUEoverride def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUEoverride def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {}
}
class UvCountWithBloom() extends ProcessWindowFunction[(String,Long),UvCount,String, TimeWindow] {// 定义Redis连接lazy val jedis = new Jedis("114.116.219.97",5000)//29位,也就是64Mlazy val bloom = new Bloom(1 << 29)override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = {//位图的存储方式 , key是windowwen,value是位图val storeKey = context.window.getEnd.toStringvar count = 0L//把每个窗口的count值,也存入Redis表里,存放内容位(windowEnd,uccount),所以要先从Redis中读取if(jedis.hget("count",storeKey) != null){
//      System.out.println(v)count = jedis.hget("count",storeKey).toLong}//用布隆过滤器判断当前用户是否已经存在val userId = elements.last._2.toStringval offset = bloom.hash(userId, 61)//定义一个标志位,判断Redis位图中有没有这一位val isExist = jedis.getbit(storeKey, offset)if(!isExist){//如果不存在位图对应位置变成1,count+1jedis.setbit(storeKey,offset,true)jedis.hset("count",storeKey,(count+1).toString)out.collect(UvCount(storeKey.toLong,count+1))}else{out.collect(UvCount(storeKey.toLong,count))}}
}class Bloom(size: Long) extends Serializable{//位图大小private val cap = if(size>0) size else 1 << 27//定义Hash函数def hash(value: String, seed: Int) : Long = {var result:Long = 0Lfor(i <- 0 until value.length){result = result * seed + value.charAt(i)}result & (cap-1)}
}

欢迎点赞+收藏+转发朋友圈素质三连

文章不错?点个【在看】吧! ????

这篇关于网站日志实时分析之Flink处理实时热门和PVUV统计的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1143059

相关文章

Go语言使用Buffer实现高性能处理字节和字符

《Go语言使用Buffer实现高性能处理字节和字符》在Go中,bytes.Buffer是一个非常高效的类型,用于处理字节数据的读写操作,本文将详细介绍一下如何使用Buffer实现高性能处理字节和... 目录1. bytes.Buffer 的基本用法1.1. 创建和初始化 Buffer1.2. 使用 Writ

Redis主从/哨兵机制原理分析

《Redis主从/哨兵机制原理分析》本文介绍了Redis的主从复制和哨兵机制,主从复制实现了数据的热备份和负载均衡,而哨兵机制可以监控Redis集群,实现自动故障转移,哨兵机制通过监控、下线、选举和故... 目录一、主从复制1.1 什么是主从复制1.2 主从复制的作用1.3 主从复制原理1.3.1 全量复制

C++中实现调试日志输出

《C++中实现调试日志输出》在C++编程中,调试日志对于定位问题和优化代码至关重要,本文将介绍几种常用的调试日志输出方法,并教你如何在日志中添加时间戳,希望对大家有所帮助... 目录1. 使用 #ifdef _DEBUG 宏2. 加入时间戳:精确到毫秒3.Windows 和 MFC 中的调试日志方法MFC

SpringBoot如何使用TraceId日志链路追踪

《SpringBoot如何使用TraceId日志链路追踪》文章介绍了如何使用TraceId进行日志链路追踪,通过在日志中添加TraceId关键字,可以将同一次业务调用链上的日志串起来,本文通过实例代码... 目录项目场景:实现步骤1、pom.XML 依赖2、整合logback,打印日志,logback-sp

Python视频处理库VidGear使用小结

《Python视频处理库VidGear使用小结》VidGear是一个高性能的Python视频处理库,本文主要介绍了Python视频处理库VidGear使用小结,文中通过示例代码介绍的非常详细,对大家的... 目录一、VidGear的安装二、VidGear的主要功能三、VidGear的使用示例四、VidGea

Python结合requests和Cheerio处理网页内容的操作步骤

《Python结合requests和Cheerio处理网页内容的操作步骤》Python因其简洁明了的语法和强大的库支持,成为了编写爬虫程序的首选语言之一,requests库是Python中用于发送HT... 目录一、前言二、环境搭建三、requests库的基本使用四、Cheerio库的基本使用五、结合req

使用Python处理CSV和Excel文件的操作方法

《使用Python处理CSV和Excel文件的操作方法》在数据分析、自动化和日常开发中,CSV和Excel文件是非常常见的数据存储格式,ython提供了强大的工具来读取、编辑和保存这两种文件,满足从基... 目录1. CSV 文件概述和处理方法1.1 CSV 文件格式的基本介绍1.2 使用 python 内

Redis主从复制的原理分析

《Redis主从复制的原理分析》Redis主从复制通过将数据镜像到多个从节点,实现高可用性和扩展性,主从复制包括初次全量同步和增量同步两个阶段,为优化复制性能,可以采用AOF持久化、调整复制超时时间、... 目录Redis主从复制的原理主从复制概述配置主从复制数据同步过程复制一致性与延迟故障转移机制监控与维

Redis连接失败:客户端IP不在白名单中的问题分析与解决方案

《Redis连接失败:客户端IP不在白名单中的问题分析与解决方案》在现代分布式系统中,Redis作为一种高性能的内存数据库,被广泛应用于缓存、消息队列、会话存储等场景,然而,在实际使用过程中,我们可能... 目录一、问题背景二、错误分析1. 错误信息解读2. 根本原因三、解决方案1. 将客户端IP添加到Re

如何使用celery进行异步处理和定时任务(django)

《如何使用celery进行异步处理和定时任务(django)》文章介绍了Celery的基本概念、安装方法、如何使用Celery进行异步任务处理以及如何设置定时任务,通过Celery,可以在Web应用中... 目录一、celery的作用二、安装celery三、使用celery 异步执行任务四、使用celery