数据序列化机制-Avro

2024-04-26 02:08
文章标签 数据 机制 序列化 avro

本文主要是介绍数据序列化机制-Avro,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

序列化主要是将内存缓冲区、数据结构或者对象中的数据转换为能够在网路上传输或者持久化存储(比如磁盘)中存储的二进制文件。

1.Avro的特性?

1)与语言无关

2)基于模式:Avro会序列化数据时会将模式写入其中,Avro序列化数据到一个压缩的二进制格式

3)使用类Json的格式来描述数据的结构,并且支持多种语言,像Java, C, C++, C#, Python, and Ruby。

4)序列化速度快且序列化过后数据存储体积小

5)支持多种数据类型

2.Avro的schema

Avro的Schema用JSON表示。Schema定义了简单数据类型和复杂数据类型。

基本类型

其中简单数据类型有以下8种:

类型含义
null没有值
boolean布尔值
int32位有符号整数
long64位有符号整数
float单精度(32位)的IEEE 754浮点数
double双精度(64位)的IEEE 754浮点数
bytes8位无符号字节序列
string字符串

基本类型没有属性,基本类型的名字也就是类型的名字,比如:

{"type": "string"}

复杂类型

Avro提供了6种复杂类型。分别是Record,Enum,Array,Map,Union和Fixed。

Record

Record类型使用的类型名字是 “record”,还支持其它属性的设置:

name:record类型的名字(必填)

namespace:命名空间(可选)

doc:这个类型的文档说明(可选)

aliases:record类型的别名,是个字符串数组(可选)

fields:record类型中的字段,是个对象数组(必填)。每个字段需要以下属性:

  1. name:字段名字(必填)
  2. doc:字段说明文档(可选)
  3. type:一个schema的json对象或者一个类型名字(必填)
  4. default:默认值(可选)
  5. order:排序(可选),只有3个值ascending(默认),descending或ignore
  6. aliases:别名,字符串数组(可选)

一个Record类型例子,定义一个元素类型是Long的链表:

{"type": "record", "name": "LongList","aliases": ["LinkedLongs"],                      // old name for this"fields" : [{"name": "value", "type": "long"},             // each element has a long{"name": "next", "type": ["null", "LongList"]} // optional next element]
}

 

Enum

枚举类型的类型名字是”enum”,还支持其它属性的设置:

name:枚举类型的名字(必填)
namespace:命名空间(可选)
aliases:字符串数组,别名(可选)
doc:说明文档(可选)
symbols:字符串数组,所有的枚举值(必填),不允许重复数据。

一个枚举类型的例子:

{ "type": "enum","name": "Suit","symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
}

Array

数组类型的类型名字是”array”并且只支持一个属性:

items:数组元素的schema

一个数组例子:

{"type": "array", "items": "string"}

Map

Map类型的类型名字是”map”并且只支持一个属性:

values:map值的schema

Map的key必须是字符串。

一个Map例子:

{"type": "map", "values": "long"}

Union

组合类型,表示各种类型的组合,使用数组进行组合。比如[“null”, “string”]表示类型可以为null或者string。

组合类型的默认值是看组合类型的第一个元素,因此如果一个组合类型包括null类型,那么null类型一般都会放在第一个位置,这样子的话这个组合类型的默认值就是null。

组合类型中不允许同一种类型的元素的个数不会超过1个,除了record,fixed和enum。比如组合类中有2个array类型或者2个map类型,这是不允许的。

组合类型不允许嵌套组合类型。

Fixed

混合类型的类型名字是fixed,支持以下属性:

name:名字(必填)
namespace:命名空间(可选)
aliases:字符串数组,别名(可选)
size:一个整数,表示每个值的字节数(必填)

比如16个字节数的fixed类型例子如下:

{"type": "fixed", "size": 16, "name": "md5"}

1个Avro例子

首先定义一个User的schema:

{
"namespace": "example.avro","type": "record","name": "User","fields": [{"name": "name", "type": "string"},{"name": "favorite_number",  "type": "int"},{"name": "favorite_color", "type": "string"}]
}

User有3个属性,分别是name,favorite_number和favorite_color。

json文件内容:

{"name":"format","favorite_number":1,"favorite_color":"red"}
{"name":"format2","favorite_number":2,"favorite_color":"black"}
{"name":"format3","favorite_number":666,"favorite_color":"blue"}

使用avro工具将json文件转换成avro文件:

ava -jar avro-tools-1.8.0.jar fromjson --schema-file user.avsc user.json > user.avro

可以设置压缩格式:

java -jar avro-tools-1.8.0.jar fromjson --codec snappy --schema-file user.avsc user.json > user2.avro

将avro文件反转换成json文件:

java -jar avro-tools-1.8.0.jar tojson user.avro
java -jar avro-tools-1.8.0.jar --pretty tojson user.avro

得到avro文件的meta:

java -jar avro-tools-1.8.0.jar getmeta user.avro

输出:

avro.codec    null
avro.schema    {"type":"record","name":"User","namespace":"example.avro","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":"int"},{"name":"favorite_color","type":"string"}]}

 将文本文件转换成avro文件:

java -jar avro-tools-1.8.0.jar fromtext user.txt usertxt.avro

Avro使用生成的代码进行序列化和反序列化

以上面一个例子的schema为例讲解。

Avro可以根据schema自动生成对应的类:

java -jar /path/to/avro-tools-1.8.0.jar compile schema user.avsc .

user.avsc的namespace为example.avro,name为User。最终在当前目录生成的example/avro目录下有个User.java文件。

├── example │ └── avro │ └── User.java

使用Avro生成的代码创建User:

User user1 = new User();
user1.setName("Format");
user1.setFavoriteColor("red");
user1.setFavoriteNumber(666);User user2 = new User("Format2", 66, "blue");User user3 = User.newBuilder().setName("Format3").setFavoriteNumber(6).setFavoriteColor("black").build();

可以使用有参的构造函数和无参的构造函数,也可以使用Builder构造User。

序列化:

DatumWrite接口用来把java对象转换成内存中的序列化格式,SpecificDatumWriter用来生成类并且指定生成的类型。

最后使用DataFileWriter来进行具体的序列化,create方法指定文件和schema信息,append方法用来写数据,最后写完后close文件

DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
dataFileWriter.create(user1.getSchema(), new File("users.avro"));
dataFileWriter.append(user1);
dataFileWriter.append(user2);
dataFileWriter.append(user3);
dataFileWriter.close();

反序列化:

反序列化跟序列化很像,相应的Writer换成Reader。这里只创建一个User对象是为了性能优化,每次都重用这个User对象,如果文件量很大,对象分配和垃圾收集处理的代价很昂贵。如果不考虑性能,可以使用 for (User user : dataFileReader) 循环遍历对象

File file = new File("users.avro");
DatumReader<User> userDatumReader = new SpecificDatumReader<User>(User.class);
DataFileReader<User> dataFileReader = new DataFileReader<User>(file, userDatumReader);
User user = null;
while(dataFileReader.hasNext()) {user = dataFileReader.next(user);System.out.println(user);
}

打印出:

{"name": "Format", "favorite_number": 666, "favorite_color": "red"}
{"name": "Format2", "favorite_number": 66, "favorite_color": "blue"}
{"name": "Format3", "favorite_number": 6, "favorite_color": "black"}

Avro不使用生成的代码进行序列化和反序列化

虽然Avro为我们提供了根据schema自动生成类的方法,我们也可以自己创建类,不使用Avro的自动生成工具。

创建User:

首先使用Parser读取schema信息并且创建Schema类:

Schema schema = new Schema.Parser().parse(new File("user.avsc"));

有了Schema之后可以创建record:

GenericRecord user1 = new GenericData.Record(schema);
user1.put("name", "Format");
user1.put("favorite_number", 666);
user1.put("favorite_color", "red");GenericRecord user2 = new GenericData.Record(schema);
user2.put("name", "Format2");
user2.put("favorite_number", 66);
user2.put("favorite_color", "blue");

使用GenericRecord表示User,GenericRecord会根据schema验证字段是否正确,如果put进了不存在的字段 user1.put(“favorite_animal”, “cat”) ,那么运行的时候会得到AvroRuntimeException异常。

序列化:

序列化跟生成的User类似,只不过schema是自己构造的,不是User中拿的。

Schema schema = new Schema.Parser().parse(new File("user.avsc"));
GenericRecord user1 = new GenericData.Record(schema);
user1.put("name", "Format");
user1.put("favorite_number", 666);
user1.put("favorite_color", "red");GenericRecord user2 = new GenericData.Record(schema);
user2.put("name", "Format2");
user2.put("favorite_number", 66);
user2.put("favorite_color", "blue");DatumWriter<GenericRecord> datumWriter = new SpecificDatumWriter<GenericRecord>(schema);
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
dataFileWriter.create(schema, new File("users2.avro"));
dataFileWriter.append(user1);
dataFileWriter.append(user2);
dataFileWriter.close();

反序列化:

反序列化跟生成的User类似,只不过schema是自己构造的,不是User中拿的。

Schema schema = new Schema.Parser().parse(new File("user.avsc"));
File file = new File("users2.avro");
DatumReader<GenericRecord> datumReader = new SpecificDatumReader<GenericRecord>(schema);
DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file, datumReader);
GenericRecord user = null;
while(dataFileReader.hasNext()) {user = dataFileReader.next(user);System.out.println(user);
}

打印出:

{"name": "Format", "favorite_number": 666, "favorite_color": "red"}
{"name": "Format2", "favorite_number": 66, "favorite_color": "blue"}

一些注意点

Avro解析json文件的时候,如果类型是Record并且里面有字段是union并且允许空值的话,需要进行转换。因为[“bytes”, “string”]和[“int”,”long”]这2个union类型在json中是有歧义的,第一个union在json中都会被转换成string类型,第二个union在json中都会被转换成数字类型。

所以如果json值的null的话,在avro提供的json中直接写null,否则使用只有一个键值对的对象,键是类型,值的具体的值。

比如:

{
"namespace": "example.avro","type": "record","name": "User","fields": [{"name": "name", "type": "string"},{"name": "favorite_number",  "type": ["int","null"]},{"name": "favorite_color", "type": ["string","null"]}]
}

在要转换成json文件的时候要写成这样:

{"name":"format","favorite_number":{"int":1},"favorite_color":{"string":"red"}}
{"name":"format2","favorite_number":null,"favorite_color":{"string":"black"}}
{"name":"format3","favorite_number":{"int":66},"favorite_color":null}

Spark读取Avro文件

直接遍历avro文件,得到GenericRecord进行处理:

val conf = new SparkConf().setMaster("local").setAppName("AvroTest")val sc = new SparkContext(conf)val rdd = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](this.getClass.getResource("/").toString + "users.avro")val nameRdd = rdd.map(s => s._1.datum().get("name").toString)nameRdd.collect().foreach(println)

使用Avro需要注意的地方

笔者使用Avro的时候暂时遇到了下面2个坑。先记录一下,以后遇到新的坑会更新这篇文章。

1.如果定义了unions类型的字段,而且unions中有null选项的schema,比如如下schema:

{
"namespace": "example.avro","type": "record","name": "User2","fields": [{"name": "name", "type": "string"},{"name": "favorite_number",  "type": ["null","int"]},{"name": "favorite_color", "type": ["null","string"]}]
}

这样的schema,如果不使用Avro自动生成的model代码进行insert,并且insert中的model数据有null数据的话。然后用spark读avro文件的话,会报org.apache.avro.AvroTypeException: Found null, expecting int … 这样的错误。

这一点很奇怪,但是使用Avro生成的Model进行insert的话,sprak读取就没有任何问题。 很困惑。

2.如果使用了Map类型的字段,avro生成的model中的Map的Key默认类型为CharSequence。这种model我们insert数据的话,用String是没有问题的。但是spark读取之后要根据Key拿这个Map数据的时候,永远得到的是null。

stackoverflow上有一个页面说到了这个问题。http://stackoverflow.com/questions/19728853/apache-avro-map-uses-charsequence-as-key

需要在map类型的字段里加上”avro.java.string”: “String”这个选项, 然后compile的时候使用-string参数即可。

比如以下这个schema:

{
"namespace": "example.avro","type": "record","name": "User3","fields": [{"name": "name", "type": "string"},{"name": "favorite_number",  "type": ["null","int"]},{"name": "favorite_color", "type": ["null","string"]},{"name": "scores", "type": ["null", {"type": "map", "values": "string", "avro.java.string": "String"}]}]
}

 

这篇关于数据序列化机制-Avro的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/936431

相关文章

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

【Prometheus】PromQL向量匹配实现不同标签的向量数据进行运算

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全栈,前后端开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi

烟火目标检测数据集 7800张 烟火检测 带标注 voc yolo

一个包含7800张带标注图像的数据集,专门用于烟火目标检测,是一个非常有价值的资源,尤其对于那些致力于公共安全、事件管理和烟花表演监控等领域的人士而言。下面是对此数据集的一个详细介绍: 数据集名称:烟火目标检测数据集 数据集规模: 图片数量:7800张类别:主要包含烟火类目标,可能还包括其他相关类别,如烟火发射装置、背景等。格式:图像文件通常为JPEG或PNG格式;标注文件可能为X

pandas数据过滤

Pandas 数据过滤方法 Pandas 提供了多种方法来过滤数据,可以根据不同的条件进行筛选。以下是一些常见的 Pandas 数据过滤方法,结合实例进行讲解,希望能帮你快速理解。 1. 基于条件筛选行 可以使用布尔索引来根据条件过滤行。 import pandas as pd# 创建示例数据data = {'Name': ['Alice', 'Bob', 'Charlie', 'Dav