This article covers the question "Flink BucketingSink with Custom AvroParquetWriter create empty file" and its answer, in the hope that it offers some reference value to developers who run into the same problem.
Question:
I have created a writer for BucketingSink. The sink and writer run without errors, and when the writer writes Avro GenericRecords to Parquet, the part file moves from in-progress through pending to complete. But the resulting files are empty, 0 bytes. Can anyone tell me what is wrong with the code? I have tried placing the initialization of the AvroParquetWriter in the open() method, but the result is still the same.
When debugging the code, I confirmed that writer.write(element) does execute and that element contains the Avro GenericRecord data.
Streaming Data
BucketingSink<DataEventRecord> sink =
        new BucketingSink<DataEventRecord>("hdfs://localhost:9000/tmp/");
sink.setBucketer(new DateTimeBucketer<DataEventRecord>("yyyy-MM-dd--HHmm"));
sink.setWriter(new ParquetSinkWriter<DataEventRecord>());
ParquetSinkWriter
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import com.any.DataEventRecord;

public class ParquetSinkWriter<T> extends StreamWriterBase<T> {

    private transient ParquetWriter<GenericRecord> writer;
    private Path path;
    private FileSystem fs;
    private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
    private final int blockSize = 256 * 1024 * 1024;
    private final int pageSize = 64 * 1024;

    // workaround
    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);
        this.path = path;
        this.fs = fs;
    }

    @Override
    public void write(T event) throws IOException {
        DataEventRecord element = (DataEventRecord) event;
        if (writer == null) {
            writer = new AvroParquetWriter<GenericRecord>(this.path, element.getSchema(),
                    compressionCodecName, blockSize, pageSize);
        }
        if (writer != null) {
            GenericRecord datum = element.getRecord();
            writer.write(datum);
        }
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            writer.close();
        }
        super.close();
    }

    @Override
    public Writer<T> duplicate() {
        return new ParquetSinkWriter<T>();
    }
}
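The likely cause of the empty files (a reconstruction based on the fix in the answer below, not something stated in the original question): BucketingSink determines the valid length of each part file through the Writer contract, but StreamWriterBase answers flush() and getPos() from the HDFS stream it opened itself, while the AvroParquetWriter above buffers rows internally and writes through its own stream. The sink therefore always sees a length of zero. For reference, the Writer interface from flink-connector-filesystem looks roughly like this (method set inferred from the overrides used in this post, Flink 1.x):

// Sketch for illustration only; reconstructed from the methods
// overridden in this post (org.apache.flink.streaming.connectors.fs.Writer).
public interface Writer<T> extends java.io.Serializable {

    // Called when a new part file is opened.
    void open(FileSystem fs, Path path) throws IOException;

    // Persist buffered data and return the current valid length;
    // BucketingSink calls this at checkpoints.
    long flush() throws IOException;

    // Current length of the part file, including buffered data.
    long getPos() throws IOException;

    void write(T element) throws IOException;

    void close() throws IOException;

    // A fresh, unopened copy of this writer for a new bucket.
    Writer<T> duplicate();
}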
Answer 1:
Directly implementing Writer, so that flush() and getPos() report the Parquet writer's actual data size, should look like this:
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

/**
 * Parquet writer.
 *
 * @param <T>
 */
public class ParquetSinkWriter<T extends GenericRecord> implements Writer<T> {

    private static final long serialVersionUID = -975302556515811398L;

    private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
    private final int pageSize = 64 * 1024;

    // the schema travels as a String because the writer is serialized with the job
    private final String schemaRepresentation;

    private transient Schema schema;
    private transient ParquetWriter<GenericRecord> writer;
    private transient Path path;

    private int position;

    public ParquetSinkWriter(String schemaRepresentation) {
        this.schemaRepresentation = Preconditions.checkNotNull(schemaRepresentation);
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        this.position = 0;
        this.path = path;

        if (writer != null) {
            writer.close();
        }

        writer = createWriter();
    }

    @Override
    public long flush() throws IOException {
        Preconditions.checkNotNull(writer);
        // ParquetWriter buffers rows in memory; closing it forces the data
        // onto disk, after which a fresh writer is created for later rows
        position += writer.getDataSize();
        writer.close();
        writer = createWriter();

        return position;
    }

    @Override
    public long getPos() throws IOException {
        Preconditions.checkNotNull(writer);
        return position + writer.getDataSize();
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            writer.close();
            writer = null;
        }
    }

    @Override
    public void write(T element) throws IOException {
        Preconditions.checkNotNull(writer);
        writer.write(element);
    }

    @Override
    public Writer<T> duplicate() {
        return new ParquetSinkWriter<>(schemaRepresentation);
    }

    private ParquetWriter<GenericRecord> createWriter() throws IOException {
        // lazily re-parse the schema on the task manager
        if (schema == null) {
            schema = new Schema.Parser().parse(schemaRepresentation);
        }

        return AvroParquetWriter.<GenericRecord>builder(path)
                .withSchema(schema)
                .withDataModel(new GenericData())
                .withCompressionCodec(compressionCodecName)
                .withPageSize(pageSize)
                .build();
    }
}
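For completeness, a minimal wiring sketch (hypothetical: the schemaJson placeholder, the dataEventStream variable, and the getRecord() mapping step are assumptions, not part of the original post). Since the corrected writer requires T extends GenericRecord, the stream must carry GenericRecord elements, so the DataEventRecord payload is extracted first:

// Hypothetical wiring; schemaJson and the getRecord() mapping are assumptions.
String schemaJson = "...";  // JSON form of the records' Avro schema, e.g. schema.toString()

BucketingSink<GenericRecord> sink =
        new BucketingSink<GenericRecord>("hdfs://localhost:9000/tmp/");
sink.setBucketer(new DateTimeBucketer<GenericRecord>("yyyy-MM-dd--HHmm"));
sink.setWriter(new ParquetSinkWriter<GenericRecord>(schemaJson));

// extract the GenericRecord payload so elements satisfy T extends GenericRecord
dataEventStream
        .map(DataEventRecord::getRecord)
        .addSink(sink);

Note the design choice in the answer: the constructor takes the schema as a String and keeps Schema, ParquetWriter, and Path transient, re-creating them in createWriter(), which keeps the writer itself serializable when the sink ships it with the job graph.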
This concludes our look at "Flink BucketingSink with Custom AvroParquetWriter create empty file". We hope the article is helpful to fellow developers!