Flink BucketingSink with Custom AvroParquetWriter create empty file

2024-06-02 17:58


 

Question:

I have created a writer for BucketingSink. The sink and the writer run without errors, but when the writer writes the Avro GenericRecord to Parquet, the file moves from in-progress through pending to complete, yet the resulting files are empty (0 bytes). Can anyone tell me what is wrong with the code? I have tried placing the initialization of the AvroParquetWriter in the open() method, but the result is still the same.

When debugging the code, I confirmed that writer.write(element) does execute and that element contains the Avro GenericRecord data.

Streaming Data

 

BucketingSink<DataEventRecord> sink =
        new BucketingSink<DataEventRecord>("hdfs://localhost:9000/tmp/");
sink.setBucketer(new DateTimeBucketer<DataEventRecord>("yyyy-MM-dd--HHmm"));
sink.setWriter(new ParquetSinkWriter<DataEventRecord>());

 

ParquetSinkWriter

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import com.any.DataEventRecord;

public class ParquetSinkWriter<T> extends StreamWriterBase<T> {

    private transient ParquetWriter<GenericRecord> writer;
    private Path path;
    private FileSystem fs;
    private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
    private final int blockSize = 256 * 1024 * 1024;
    private final int pageSize = 64 * 1024;

    @Override // workaround
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);
        this.path = path;
        this.fs = fs;
    }

    @Override
    public void write(T event) throws IOException {
        DataEventRecord element = (DataEventRecord) event;
        if (writer == null) {
            writer = new AvroParquetWriter<GenericRecord>(this.path, element.getSchema(),
                    compressionCodecName, blockSize, pageSize);
        }
        if (writer != null) {
            GenericRecord datum = element.getRecord();
            writer.write(datum);
        }
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            writer.close();
        }
        super.close();
    }

    @Override
    public Writer<T> duplicate() {
        return new ParquetSinkWriter<T>();
    }
}

 

Answer 1:

Directly implementing Flink's Writer interface should look like the code below. A likely reason the original version produces empty files is that it extends StreamWriterBase but never writes through the stream the base class opens: the Parquet writer buffers rows in memory until close, while the sink's position tracking only sees the unused base stream. Implementing Writer directly lets flush() and getPos() report the Parquet writer's actual data size:

import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.flink.util.Preconditions;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.io.IOException;

/**
 * Parquet writer.
 *
 * @param <T>
 */
public class ParquetSinkWriter<T extends GenericRecord> implements Writer<T> {

    private static final long serialVersionUID = -975302556515811398L;

    private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
    private final int pageSize = 64 * 1024;

    private final String schemaRepresentation;

    private transient Schema schema;
    private transient ParquetWriter<GenericRecord> writer;
    private transient Path path;

    private int position;

    public ParquetSinkWriter(String schemaRepresentation) {
        this.schemaRepresentation = Preconditions.checkNotNull(schemaRepresentation);
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        this.position = 0;
        this.path = path;

        if (writer != null) {
            writer.close();
        }

        writer = createWriter();
    }

    @Override
    public long flush() throws IOException {
        Preconditions.checkNotNull(writer);
        position += writer.getDataSize();
        writer.close();
        writer = createWriter();

        return position;
    }

    @Override
    public long getPos() throws IOException {
        Preconditions.checkNotNull(writer);
        return position + writer.getDataSize();
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            writer.close();
            writer = null;
        }
    }

    @Override
    public void write(T element) throws IOException {
        Preconditions.checkNotNull(writer);
        writer.write(element);
    }

    @Override
    public Writer<T> duplicate() {
        return new ParquetSinkWriter<>(schemaRepresentation);
    }

    private ParquetWriter<GenericRecord> createWriter() throws IOException {
        if (schema == null) {
            schema = new Schema.Parser().parse(schemaRepresentation);
        }

        return AvroParquetWriter.<GenericRecord>builder(path)
                .withSchema(schema)
                .withDataModel(new GenericData())
                .withCompressionCodec(compressionCodecName)
                .withPageSize(pageSize)
                .build();
    }
}
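
For completeness, wiring this writer back into the bucketing sink from the question would look roughly like the sketch below. This is a minimal sketch, not part of the original answer: it assumes the stream now carries GenericRecord elements (required by the T extends GenericRecord bound), and the helper name attachSink and the parameter schemaJson are illustrative, with schemaJson holding the record schema as an Avro JSON string.

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

public class ParquetSinkExample {

    // schemaJson is assumed to be the Avro schema of the records as a JSON
    // string, e.g. obtained once on the client via schema.toString().
    public static void attachSink(DataStream<GenericRecord> events, String schemaJson) {
        BucketingSink<GenericRecord> sink =
                new BucketingSink<>("hdfs://localhost:9000/tmp/");
        sink.setBucketer(new DateTimeBucketer<GenericRecord>("yyyy-MM-dd--HHmm"));
        // The schema is passed as a String because org.apache.avro.Schema is
        // not serializable; the writer re-parses it lazily in createWriter().
        sink.setWriter(new ParquetSinkWriter<GenericRecord>(schemaJson));
        events.addSink(sink);
    }
}

If the stream produces DataEventRecord objects as in the question, they would need to be mapped to GenericRecord first (e.g. via getRecord()) before reaching the sink.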

 

 

