Spark+Hbase 亿级流量分析实战(小巧高性能的ETL)

2024-01-10 10:48

本文主要是介绍Spark+Hbase 亿级流量分析实战(小巧高性能的ETL),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在上一篇文章 大猪 已经介绍了日志存储设计方案 ,我们数据已经落地到数据中心上了,那接下来如何ETL呢?毕竟可是生产环境级别的,可不能乱来。其实只要解决几个问题即可,不必要引入很大级别的组件来做,当然了各有各的千秋,本文主要从 易懂小巧简洁高性能 这三个方面去设计出发点,顺便还实现了一个精巧的 Filebeat。

9028759-f64f78b25e312817.png

要实现的功能就是扫描每天的增量日志并写入Hbase中

9028759-1e383558758292e8.png

需要搞定下面几个不务正业的小老弟

9028759-c294a9d51452c16b.png
  1. 需要把文件中的每一行数据都取出来
  2. 能处理超过10G以上的大日志文件,并且只能占用机器一定的内存,越小越好
  3. 从上图可以看到标黄的是已经写入Hbase的数据,不能重复读取
  4. 非活跃文件不能扫,因为文件过多会影响整体读取IO性能
  5. 读取中的过程要保证增量数据不能录入,因为要保证offset的时候写入mysql稳定不跳跃
9028759-4085e8474dcefb43.png

大猪 根据线上的生产环境一一把上面的功能重新分析给实现一下。

从第一点看还是比较简单的嘛?但是我们要结合上面的 5 个问题来看才行。

总结一句话就是:要实现一个高性能而且能随时重启继续工作的 loghub ETL 程序

实际也必需这样做,因为生产环境容不得马虎,不然就等着被BOSS

9028759-a51c023de6fb202d.png
9028759-bfc932f12f86e08f.png

需要有一个读取所有日志文件方法

9028759-1ea5a6cf0b8a638f.png

还要实现一个保存并读取文件进度的方法

9028759-1853910d9211c234.png

由于不能把一个日志文件全部读入内存进行处理
所以还需要一个能根据索引一行一行接着读取数据的方法

9028759-d689a95bd39e85e4.png

最后剩下一个Hbase的连接池小工具

9028759-682336126ccefe66.png

几个核心方法已经写完了,接着是我们的主程序

def run(logPath: File, defaultOffsetDay: String): Unit = {val sdfstr = Source.fromFile(seekDayFile).getLines().mkStringval offsetDay = Option(if (sdfstr == "") null else sdfstr)//读取设置读取日期的倒数一天之后的日期文件夹val noneOffsetFold = logPath.listFiles().filter(_.getName >= LocalDate.parse(offsetDay.getOrElse(defaultOffsetDay)).minusDays(1).toString).sortBy(f => LocalDate.parse(f.getName).toEpochDay)//读取文件夹中的所有日志文件,并取出索引进行匹配val filesPar = noneOffsetFold.flatMap(files(_, file => file.getName.endsWith(".log"))).map(file => (file, seeks().getOrDefault(MD5Hash.getMD5AsHex(file.getAbsolutePath.getBytes()), 0), file.length())).filter(tp2 => {//过滤出新文件,与有增量的日志文件val fileMd5 = MD5Hash.getMD5AsHex(tp2._1.getAbsolutePath.getBytes())val result = offsets.asScala.filter(m => fileMd5.equals(m._1))result.isEmpty || tp2._3 > result.head._2}).parfilesPar.tasksupport = poolval willUpdateOffset = new util.HashMap[String, Long]()val formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")var logTime:String = nullfilesPar.foreach(tp3 => {val hbaseClient = HbasePool.getTable//因为不能全量读取数据,所有只能一条一条读取,批量提出交给HbaseClient的客户端的mutate方式优雅处理//foreach 里面的部分就是我们的业务处理部分lines(tp3._1, tp3._2, tp3._3, () => {willUpdateOffset.put(tp3._1.getAbsolutePath, tp3._3)offsets.put(MD5Hash.getMD5AsHex(tp3._1.getAbsolutePath.getBytes), tp3._3)}).foreach(line => {val jsonObject = parse(line)val time = (jsonObject \ "time").extract[Long]val data = jsonObject \ "data"val dataMap = data.values.asInstanceOf[Map[String, Any]].filter(_._2 != null).map(x => x._1 -> x._2.toString)val uid = dataMap("uid")logTime = time.getLocalDateTime.toStringval rowkey = uid.take(2) + "|" + time.getLocalDateTime.format(formatter) + "|" + uid.substring(2, 8)val row = new Put(Bytes.toBytes(rowkey))dataMap.foreach(tp2 => row.addColumn(Bytes.toBytes("info"), Bytes.toBytes(tp2._1), Bytes.toBytes(tp2._2)))hbaseClient.mutate(row)})hbaseClient.flush()})//更新索引到文件上writeSeek(willUpdateOffset)//更新索引日期到文件上writeSeekDay(noneOffsetFold.last.getName)//把 logTime offset 写到mysql中,方便Spark+Hbase程序读取并计算}

程序很精简,没有任何没用的功能在里面,线上的生产环境就应该是这子的了。
大家还可以根据需求加入程序退出发邮件通知功能之类的。
真正去算了一下也就100行功能代码,而且占用极小的内存,都不到100M,很精很精。

9028759-31d65a54c8cfda02.png

传送门 完整ETL程序源码

心明眼亮的你、从此刻开始。

9028759-7586fee586f2075b.png
9028759-4c3677dea1cf2734.png

这篇关于Spark+Hbase 亿级流量分析实战(小巧高性能的ETL)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/590528

相关文章

Go标准库常见错误分析和解决办法

《Go标准库常见错误分析和解决办法》Go语言的标准库为开发者提供了丰富且高效的工具,涵盖了从网络编程到文件操作等各个方面,然而,标准库虽好,使用不当却可能适得其反,正所谓工欲善其事,必先利其器,本文将... 目录1. 使用了错误的time.Duration2. time.After导致的内存泄漏3. jsO

Spring Boot + MyBatis Plus 高效开发实战从入门到进阶优化(推荐)

《SpringBoot+MyBatisPlus高效开发实战从入门到进阶优化(推荐)》本文将详细介绍SpringBoot+MyBatisPlus的完整开发流程,并深入剖析分页查询、批量操作、动... 目录Spring Boot + MyBATis Plus 高效开发实战:从入门到进阶优化1. MyBatis

MyBatis 动态 SQL 优化之标签的实战与技巧(常见用法)

《MyBatis动态SQL优化之标签的实战与技巧(常见用法)》本文通过详细的示例和实际应用场景,介绍了如何有效利用这些标签来优化MyBatis配置,提升开发效率,确保SQL的高效执行和安全性,感... 目录动态SQL详解一、动态SQL的核心概念1.1 什么是动态SQL?1.2 动态SQL的优点1.3 动态S

Pandas使用SQLite3实战

《Pandas使用SQLite3实战》本文主要介绍了Pandas使用SQLite3实战,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学... 目录1 环境准备2 从 SQLite3VlfrWQzgt 读取数据到 DataFrame基础用法:读

Spring事务中@Transactional注解不生效的原因分析与解决

《Spring事务中@Transactional注解不生效的原因分析与解决》在Spring框架中,@Transactional注解是管理数据库事务的核心方式,本文将深入分析事务自调用的底层原理,解释为... 目录1. 引言2. 事务自调用问题重现2.1 示例代码2.2 问题现象3. 为什么事务自调用会失效3

找不到Anaconda prompt终端的原因分析及解决方案

《找不到Anacondaprompt终端的原因分析及解决方案》因为anaconda还没有初始化,在安装anaconda的过程中,有一行是否要添加anaconda到菜单目录中,由于没有勾选,导致没有菜... 目录问题原因问http://www.chinasem.cn题解决安装了 Anaconda 却找不到 An

Spring定时任务只执行一次的原因分析与解决方案

《Spring定时任务只执行一次的原因分析与解决方案》在使用Spring的@Scheduled定时任务时,你是否遇到过任务只执行一次,后续不再触发的情况?这种情况可能由多种原因导致,如未启用调度、线程... 目录1. 问题背景2. Spring定时任务的基本用法3. 为什么定时任务只执行一次?3.1 未启用

Python实战之屏幕录制功能的实现

《Python实战之屏幕录制功能的实现》屏幕录制,即屏幕捕获,是指将计算机屏幕上的活动记录下来,生成视频文件,本文主要为大家介绍了如何使用Python实现这一功能,希望对大家有所帮助... 目录屏幕录制原理图像捕获音频捕获编码压缩输出保存完整的屏幕录制工具高级功能实时预览增加水印多平台支持屏幕录制原理屏幕

C++ 各种map特点对比分析

《C++各种map特点对比分析》文章比较了C++中不同类型的map(如std::map,std::unordered_map,std::multimap,std::unordered_multima... 目录特点比较C++ 示例代码 ​​​​​​代码解释特点比较1. std::map底层实现:基于红黑

最新Spring Security实战教程之Spring Security安全框架指南

《最新SpringSecurity实战教程之SpringSecurity安全框架指南》SpringSecurity是Spring生态系统中的核心组件,提供认证、授权和防护机制,以保护应用免受各种安... 目录前言什么是Spring Security?同类框架对比Spring Security典型应用场景传统