如何在Scala中读取Hadoop集群上的gz压缩文件

2024-05-15 03:18

本文主要是介绍如何在Scala中读取Hadoop集群上的gz压缩文件,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

存在Hadoop集群上的文件,大部分都会经过压缩,如果是压缩后的文件,我们直接在应用程序中如何读取里面的数据?答案是肯定的,但是比普通的文本读取要稍微复杂一点,需要使用到Hadoop的压缩工具类支持,比如处理gz,snappy,lzo,bz压缩的,前提是首先我们的Hadoop集群得支持上面提到的各种压缩文件。

本次就给出一个读取gz压缩文件的例子核心代码:

def readHdfsWriteKafkaByDate(fs:FileSystem,date:String,conf:Configuration,topic:String,finishTimeStamp:Long):Unit={//访问hdfs文件,只读取gz结尾的压缩文件,如果是.tmp结尾的不会读取val path=new Path("/collect_data/userlog/"+date+"/log*.gz")//实例化压缩工厂编码类val factory = new CompressionCodecFactory(conf)//读取通配路径val items=fs.globStatus(path)var count=0//遍历每一个路径文件items.foreach(f=>{//打印全路径println(f.getPath)//通过全路径获取其编码val codec = factory.getCodec(f.getPath())//获取编码//读取成数据流var  stream:InputStream = null;if(codec!=null){//如果编码识别直接从编码创建输入流stream = codec.createInputStream(fs.open(f.getPath()))}else{//如果不识别则直接打开stream = fs.open(f.getPath())}val writer=new StringWriter()//将字节流转成字符串流IOUtils.copy(stream,writer,"UTF-8")//得到字符串内容val raw=writer.toString//根据字符串内容split出所有的行数据,至此解压数据完毕val raw_array=raw.split("\n")//遍历数据      raw_array.foreach(line=>{val array = line.split("--",2) //拆分数组val map = JSON.parseObject(array(1)).asScalaval userId = map.get("userId").getOrElse("").asInstanceOf[String] //为空为非法数据val time = map.get("time").getOrElse("") //为空为非法数据if(StringUtils.isNotEmpty(userId)&&(time+"").toLong<=finishTimeStamp){//只有数据pushToKafka(topic,userId,line)count=count+1}})})}

压缩和解压模块用的工具包是apache-commons下面的类:

import org.apache.commons.io.IOUtils
import org.apache.commons.lang.StringUtils

如果想在Windows上调试,可以直接设置HDFS的地址即可

-     val conf = new Configuration()//获取hadoop的conf
//    conf.set("fs.defaultFS","hdfs://192.168.10.14:8020/")//windows上调试用

至此数据已经解压并读取完毕,其实并不是很复杂,用java代码和上面的代码也差不多类似,如果直接用原生的api读取会稍微复杂,但如果我们使用Hive,Spark框架的时候,框架内部会自动帮我们完成压缩文件的读取或者写入,对用户透明,当然底层也是封装了不同压缩格式的读取和写入代码,这样以来使用者将会方便许多。

参考文章:

https://blog.matthewrathbone.com/2013/12/28/reading-data-from-hdfs-even-if-it-is-compressed


有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。 技术债不能欠,健康债更不能欠, 求道之路,与君同行。

输入图片说明



这篇关于如何在Scala中读取Hadoop集群上的gz压缩文件的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/990656

相关文章

如何在一台服务器上使用docker运行kafka集群

《如何在一台服务器上使用docker运行kafka集群》文章详细介绍了如何在一台服务器上使用Docker运行Kafka集群,包括拉取镜像、创建网络、启动Kafka容器、检查运行状态、编写启动和关闭脚本... 目录1.拉取镜像2.创建集群之间通信的网络3.将zookeeper加入到网络中4.启动kafka集群

SpringBoot使用Apache POI库读取Excel文件的操作详解

《SpringBoot使用ApachePOI库读取Excel文件的操作详解》在日常开发中,我们经常需要处理Excel文件中的数据,无论是从数据库导入数据、处理数据报表,还是批量生成数据,都可能会遇到... 目录项目背景依赖导入读取Excel模板的实现代码实现代码解析ExcelDemoInfoDTO 数据传输

Python读取TIF文件的两种方法实现

《Python读取TIF文件的两种方法实现》本文主要介绍了Python读取TIF文件的两种方法实现,包括使用tifffile库和Pillow库逐帧读取TIFF文件,具有一定的参考价值,感兴趣的可以了解... 目录方法 1:使用 tifffile 逐帧读取安装 tifffile:逐帧读取代码:方法 2:使用

Nacos集群数据同步方式

《Nacos集群数据同步方式》文章主要介绍了Nacos集群中服务注册信息的同步机制,涉及到负责节点和非负责节点之间的数据同步过程,以及DistroProtocol协议在同步中的应用... 目录引言负责节点(发起同步)DistroProtocolDistroSyncChangeTask获取同步数据getDis

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD

HDFS—集群扩容及缩容

白名单:表示在白名单的主机IP地址可以,用来存储数据。 配置白名单步骤如下: 1)在NameNode节点的/opt/module/hadoop-3.1.4/etc/hadoop目录下分别创建whitelist 和blacklist文件 (1)创建白名单 [lytfly@hadoop102 hadoop]$ vim whitelist 在whitelist中添加如下主机名称,假如集群正常工作的节

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

hadoop开启回收站配置

开启回收站功能,可以将删除的文件在不超时的情况下,恢复原数据,起到防止误删除、备份等作用。 开启回收站功能参数说明 (1)默认值fs.trash.interval = 0,0表示禁用回收站;其他值表示设置文件的存活时间。 (2)默认值fs.trash.checkpoint.interval = 0,检查回收站的间隔时间。如果该值为0,则该值设置和fs.trash.interval的参数值相等。

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数