Parquet 文件生成和读取

2024-02-27 23:20
文章标签 读取 生成 parquet

本文主要是介绍Parquet 文件生成和读取,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

      • 一、什么是 Parquet
      • 二、实现 Java 读写 Parquet 的流程
        • 方式一:
        • 遇到的坑:
          • 坑1:ClassNotFoundException: com.fasterxml.jackson.annotation.JsonMerge
          • 坑2:No FileSystem for scheme "file"
          • 坑3:与 spark-sql 的引入冲突
        • 方式二:

一、什么是 Parquet

  Parquet 是一种列式存储格式,用于高效地存储和处理大规模数据集。它被广泛应用于大数据处理和分析场景中,例如 Apache Hadoop、Apache Spark 等。

  与传统的行式存储格式(如CSV和JSON)相比,Parquet 能够显著提高读写性能和存储效率。它将数据按列进行存储,而不是按行存储,这样可以更好地利用存储空间,减少 I/O 开销,并提供更高的压缩比。

二、实现 Java 读写 Parquet 的流程

方式一:

  Maven 依赖:

        <dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-avro</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.3.1</version></dependency>
[root@local~]# vim schema.avsc
{"type": "record","name": "User","fields": [{"name": "field1","type": "string"}, {"name": "field2","type": "int"}]
}
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;import java.io.File;
import java.io.IOException;public class WriteToParquet {public static void main(String[] args) {try {// 创建Schema对象Schema schema = new Schema.Parser().parse(new File("schema.avsc"));// 方式二:不需要读文件// Schema schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"},{\"name\":\"field2\",\"type\":\"int\"}]}");// 创建GenericRecord对象GenericRecord record = new GenericData.Record(schema);record.put("field1", "value1");record.put("field2", 123);// 创建ParquetWriter对象ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(new Path("output.parquet")).withSchema(schema).withCompressionCodec(CompressionCodecName.SNAPPY).build();// 将数据写入Parquet文件writer.write(record);// 关闭ParquetWriterwriter.close();// 创建ParquetReader对象ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(new Path("output.parquet")).build();// 读取Parquet文件中的数据// GenericRecord record;while ((record = reader.read()) != null) {// 处理每一条记录System.out.println(record.get("field1"));System.out.println(record.get("field2"));}// 关闭ParquetReaderreader.close();} catch (IOException e) {e.printStackTrace();}}
}
[root@local~]# java -cp /huiq/only-maven-1.0-SNAPSHOT-jar-with-dependencies.jar WriteToParquet
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
log4j:WARN No appenders could be found for logger (org.apache.htrace.core.Tracer).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
value1
123

参考:
java写parquet
java parquet AvroParquetWriter

遇到的坑:
坑1:ClassNotFoundException: com.fasterxml.jackson.annotation.JsonMerge

  一开始引入的依赖:

        <dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-avro</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.0.0</version></dependency>

  报错:

Exception in thread "main" java.lang.NoClassDefFoundError: com/fasterxml/jackson/annotation/JsonMergeat com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector.<clinit>(JacksonAnnotationIntrospector.java:50)at com.fasterxml.jackson.databind.ObjectMapper.<clinit>(ObjectMapper.java:351)at org.apache.avro.Schema.<clinit>(Schema.java:109)at org.apache.avro.Schema$Parser.parse(Schema.java:1413)at WriteToParquet.main(WriteToParquet.java:21)
Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.annotation.JsonMergeat java.net.URLClassLoader.findClass(URLClassLoader.java:381)at java.lang.ClassLoader.loadClass(ClassLoader.java:424)at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)at java.lang.ClassLoader.loadClass(ClassLoader.java:357)... 5 more

  解决:

        <dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-avro</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.0.0</version><exclusions><exclusion><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-annotations</artifactId></exclusion></exclusions></dependency>

  原因:我看当引入 hadoop-client 3.3.1 版本的时候 maven 依赖库里是 jackson-annotations-2.11.3.jar,但引入 hadoop-client 3.0.0 版本的时候 maven 依赖库里是 jackson-annotations-2.7.8.jar 执行程序会报上面那个错,于是在 3.0.0 版本中去掉 jackson-annotations 依赖后看 maven 依赖库里就是 jackson-annotations-2.11.3.jar 了。后来测试 jackson-annotations-2.6.7.jar 也正常。

坑2:No FileSystem for scheme “file”

  整合到项目中报错:org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "file"
  解决:增加如下代码

            Configuration conf = new Configuration();conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");// 或者
//            conf.set("fs.hdfs.impl",
//                    org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()
//            );
//            conf.set("fs.file.impl",
//                    org.apache.hadoop.fs.LocalFileSystem.class.getName()
//            );FileSystem fs = FileSystem.get(conf); // 这行必须有虽然没有被引用

参考:
java.io.IOException: No FileSystem for scheme: file
MapReduce 踩坑 - hadoop No FileSystem for scheme: file/hdfs
FileSystem及其源码分析

坑3:与 spark-sql 的引入冲突
        <dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.4.0</version></dependency>

  报错:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/parquet/schema/LogicalTypeAnnotationat org.apache.parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:157)at org.apache.parquet.avro.AvroParquetWriter.access$200(AvroParquetWriter.java:36)at org.apache.parquet.avro.AvroParquetWriter$Builder.getWriteSupport(AvroParquetWriter.java:190)at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:533)at com.heheda.app.SparkWriteCsvToParquet.main(SparkWriteCsvToParquet.java:46)
Caused by: java.lang.ClassNotFoundException: org.apache.parquet.schema.LogicalTypeAnnotationat java.net.URLClassLoader.findClass(URLClassLoader.java:381)at java.lang.ClassLoader.loadClass(ClassLoader.java:424)at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)at java.lang.ClassLoader.loadClass(ClassLoader.java:357)... 5 more

  一开始的思路:

        <dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.4.0</version><exclusion><groupId>org.apache.parquet</groupId><artifactId>parquet-column</artifactId></exclusion></dependency>

  接着又报错:

Exception in thread "main" java.lang.AbstractMethodError: org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(Lorg/apache/parquet/bytes/BytesInput;IILorg/apache/parquet/column/statistics/Statistics;Lorg/apache/parquet/column/Encoding;Lorg
/apache/parquet/column/Encoding;Lorg/apache/parquet/column/Encoding;)V	at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:59)at org.apache.parquet.column.impl.ColumnWriterBase.writePage(ColumnWriterBase.java:387)at org.apache.parquet.column.impl.ColumnWriteStoreBase.flush(ColumnWriteStoreBase.java:186)at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:29)at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:308)at com.heheda.app.SparkWriteCsvToParquet.main(SparkWriteCsvToParquet.java:52)

注:文章里说不需要 Hadoop 也行,但我没成功,提交到有 Hadoop 环境的服务器上可以运行,但本地 Idea 中报错生成了 parquet 空文件或者没有文件生成:

Exception in thread "main" java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblemsat org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:736)at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:271)at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:287)at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:324)at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:294)at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:439)at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:428)at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:459)at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:433)at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:521)at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:500)at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)at org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:327)at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:292)at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:646)at WriteToParquet.main(WriteToParquet.java:33)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblemsat org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:548)at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:569)at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:592)at org.apache.hadoop.util.Shell.<clinit>(Shell.java:689)at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:3741)at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:3736)at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3520)at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:288)at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:524)at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)at org.apache.parquet.hadoop.util.HadoopOutputFile.fromPath(HadoopOutputFile.java:58)at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:643)... 1 more
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:468)at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:439)at org.apache.hadoop.util.Shell.<clinit>(Shell.java:516)... 11 more
方式二:

  网上许多写入 parquet 需要在本地安装 haddop 环境,下面介绍一种不需要安装 haddop 即可写入 parquet 文件的方式;

  来自:列式存储格式之parquet读写

  Maven 依赖:

        <dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId><version>1.8.2</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-core</artifactId><version>1.2.1</version></dependency><dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-hadoop</artifactId><version>1.8.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.parquet/parquet-avro --><dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-avro</artifactId><version>1.8.1</version></dependency>
public class User {private String id;private String name;private String password;public User() {}public User(String id, String name, String password) {this.id = id;this.name = name;this.password = password;}public String getId() {return id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}@Overridepublic String toString() {return "User{" +"id='" + id + '\'' +", name='" + name + '\'' +", password='" + password + '\'' +'}';}
}

注:这种方式的 User 实体类和上面方式的 schema.avsc 文件中的 "name": "User" 有冲突,报错:

Exception in thread "main" org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file file:/heheda/output.parquetat org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:254)at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)at WriteToParquet.main(WriteToParquet.java:55)
Caused by: java.lang.ClassCastException: User cannot be cast to org.apache.avro.generic.IndexedRecordat org.apache.avro.generic.GenericData.setField(GenericData.java:818)at org.apache.parquet.avro.AvroRecordConverter.set(AvroRecordConverter.java:396)at org.apache.parquet.avro.AvroRecordConverter$2.add(AvroRecordConverter.java:132)at org.apache.parquet.avro.AvroConverters$BinaryConverter.addBinary(AvroConverters.java:64)at org.apache.parquet.column.impl.ColumnReaderBase$2$6.writeValue(ColumnReaderBase.java:390)at org.apache.parquet.column.impl.ColumnReaderBase.writeCurrentValueToConverter(ColumnReaderBase.java:440)at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:30)at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:229)... 3 more
import org.apache.avro.reflect.ReflectData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY;public class WriteToParquet {public static void main(String[] args) {try {List<User> users = new ArrayList<>();User user1 = new User("1","huangchixin","123123");User user2 = new User("2","huangchixin2","123445");users.add(user1);users.add(user2);Path dataFile = new Path("output.parquet");ParquetWriter<User> writer = AvroParquetWriter.<User>builder(dataFile).withSchema(ReflectData.AllowNull.get().getSchema(User.class)).withDataModel(ReflectData.get()).withConf(new Configuration()).withCompressionCodec(SNAPPY).withWriteMode(OVERWRITE).build();for (User user : users) {writer.write(user);}writer.close();} catch (IOException e) {e.printStackTrace();}}
}

  Idea 本地执行:

在这里插入图片描述

这篇关于Parquet 文件生成和读取的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/753873

相关文章

AI一键生成 PPT

AI一键生成 PPT 操作步骤 作为一名打工人,是不是经常需要制作各种PPT来分享我的生活和想法。但是,你们知道,有时候灵感来了,时间却不够用了!😩直到我发现了Kimi AI——一个能够自动生成PPT的神奇助手!🌟 什么是Kimi? 一款月之暗面科技有限公司开发的AI办公工具,帮助用户快速生成高质量的演示文稿。 无论你是职场人士、学生还是教师,Kimi都能够为你的办公文

pdfmake生成pdf的使用

实际项目中有时会有根据填写的表单数据或者其他格式的数据,将数据自动填充到pdf文件中根据固定模板生成pdf文件的需求 文章目录 利用pdfmake生成pdf文件1.下载安装pdfmake第三方包2.封装生成pdf文件的共用配置3.生成pdf文件的文件模板内容4.调用方法生成pdf 利用pdfmake生成pdf文件 1.下载安装pdfmake第三方包 npm i pdfma

poj 1258 Agri-Net(最小生成树模板代码)

感觉用这题来当模板更适合。 题意就是给你邻接矩阵求最小生成树啦。~ prim代码:效率很高。172k...0ms。 #include<stdio.h>#include<algorithm>using namespace std;const int MaxN = 101;const int INF = 0x3f3f3f3f;int g[MaxN][MaxN];int n

poj 1287 Networking(prim or kruscal最小生成树)

题意给你点与点间距离,求最小生成树。 注意点是,两点之间可能有不同的路,输入的时候选择最小的,和之前有道最短路WA的题目类似。 prim代码: #include<stdio.h>const int MaxN = 51;const int INF = 0x3f3f3f3f;int g[MaxN][MaxN];int P;int prim(){bool vis[MaxN];

poj 2349 Arctic Network uva 10369(prim or kruscal最小生成树)

题目很麻烦,因为不熟悉最小生成树的算法调试了好久。 感觉网上的题目解释都没说得很清楚,不适合新手。自己写一个。 题意:给你点的坐标,然后两点间可以有两种方式来通信:第一种是卫星通信,第二种是无线电通信。 卫星通信:任何两个有卫星频道的点间都可以直接建立连接,与点间的距离无关; 无线电通信:两个点之间的距离不能超过D,无线电收发器的功率越大,D越大,越昂贵。 计算无线电收发器D

hdu 1102 uva 10397(最小生成树prim)

hdu 1102: 题意: 给一个邻接矩阵,给一些村庄间已经修的路,问最小生成树。 解析: 把已经修的路的权值改为0,套个prim()。 注意prim 最外层循坏为n-1。 代码: #include <iostream>#include <cstdio>#include <cstdlib>#include <algorithm>#include <cstri

【生成模型系列(初级)】嵌入(Embedding)方程——自然语言处理的数学灵魂【通俗理解】

【通俗理解】嵌入(Embedding)方程——自然语言处理的数学灵魂 关键词提炼 #嵌入方程 #自然语言处理 #词向量 #机器学习 #神经网络 #向量空间模型 #Siri #Google翻译 #AlexNet 第一节:嵌入方程的类比与核心概念【尽可能通俗】 嵌入方程可以被看作是自然语言处理中的“翻译机”,它将文本中的单词或短语转换成计算机能够理解的数学形式,即向量。 正如翻译机将一种语言

poj 3723 kruscal,反边取最大生成树。

题意: 需要征募女兵N人,男兵M人。 每征募一个人需要花费10000美元,但是如果已经招募的人中有一些关系亲密的人,那么可以少花一些钱。 给出若干的男女之间的1~9999之间的亲密关系度,征募某个人的费用是10000 - (已经征募的人中和自己的亲密度的最大值)。 要求通过适当的招募顺序使得征募所有人的费用最小。 解析: 先设想无向图,在征募某个人a时,如果使用了a和b之间的关系

Thymeleaf:生成静态文件及异常处理java.lang.NoClassDefFoundError: ognl/PropertyAccessor

我们需要引入包: <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-thymeleaf</artifactId></dependency><dependency><groupId>org.springframework</groupId><artifactId>sp

matlab读取NC文件(含group)

matlab读取NC文件(含group): NC文件数据结构: 代码: % 打开 NetCDF 文件filename = 'your_file.nc'; % 替换为你的文件名% 使用 netcdf.open 函数打开文件ncid = netcdf.open(filename, 'NC_NOWRITE');% 查看文件中的组% 假设我们想读取名为 "group1" 的组groupName