This post walks through how to compress the data that Flink writes to HDFS into gzip format. Hopefully it offers a useful reference for developers facing the same problem; let's work through it together!
BaseRow.class
import java.io.Serializable;

/**
 * Holds the partition directory and the data to be written out.
 */
public class BaseRow implements Serializable {

    /** Partition directory. */
    private String partPath;

    /** Output data. */
    private String result;

    public BaseRow() {
    }

    public BaseRow(String partPath, String result) {
        this.partPath = partPath;
        this.result = result;
    }

    public String getPartPath() {
        return partPath;
    }

    public void setPartPath(String partPath) {
        this.partPath = partPath;
    }

    public String getResult() {
        return result;
    }

    public void setResult(String result) {
        this.result = result;
    }

    @Override
    public String toString() {
        return "BaseRow{" +
                "partPath='" + partPath + '\'' +
                ", result='" + result + '\'' +
                '}';
    }
}
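A note on partPath: the field only matters if the sink actually routes records by it (the usage example at the end of this post simply passes null). If you do fill it in, a custom Bucketer can pick it up. Below is a minimal sketch, assuming the Bucketer interface from flink-connector-filesystem; the class name BaseRowBucketer is made up for illustration:

import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
import org.apache.hadoop.fs.Path;

/** Hypothetical bucketer that routes each BaseRow into its partPath sub-directory. */
public class BaseRowBucketer implements Bucketer<BaseRow> {

    private static final long serialVersionUID = 1L;

    @Override
    public Path getBucketPath(Clock clock, Path basePath, BaseRow element) {
        // Fall back to the base path when no partition directory was set.
        if (element.getPartPath() == null) {
            return basePath;
        }
        return new Path(basePath, element.getPartPath());
    }
}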
CompressionOutputStreamWrapper.class
import org.apache.hadoop.io.compress.CompressionOutputStream;

import java.io.Serializable;

public class CompressionOutputStreamWrapper implements Serializable {

    private CompressionOutputStream compressionOutputStream;

    private long pos;

    public CompressionOutputStreamWrapper() {
    }

    public CompressionOutputStreamWrapper(CompressionOutputStream compressionOutputStream, long pos) {
        this.compressionOutputStream = compressionOutputStream;
        this.pos = pos;
    }

    public CompressionOutputStream getCompressionOutputStream() {
        return compressionOutputStream;
    }

    public void setCompressionOutputStream(CompressionOutputStream compressionOutputStream) {
        this.compressionOutputStream = compressionOutputStream;
    }

    public long getPos() {
        return pos;
    }

    public void setPos(long pos) {
        this.pos = pos;
    }

    @Override
    public String toString() {
        return "CompressionOutputStreamWrapper{" +
                "compressionOutputStream=" + compressionOutputStream +
                ", pos=" + pos +
                '}';
    }
}
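Why carry a pos alongside the stream? Hadoop's CompressionOutputStream does not expose how many uncompressed bytes have passed through it, while Flink's Writer contract needs getPos() so the sink can decide when to roll part files. The wrapper therefore keeps the count by hand. Conceptually, every write performs this bookkeeping (a sketch of what MyStringWriter.write does further below, with wrapper being a CompressionOutputStreamWrapper and line the record to emit):

byte[] bytes = line.getBytes(charset);
wrapper.getCompressionOutputStream().write(bytes);
wrapper.getCompressionOutputStream().write('\n');
// Track the uncompressed position manually; the compressed size on disk will differ.
wrapper.setPos(wrapper.getPos() + bytes.length + 1);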
MyStreamWriterBase.class
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;

public abstract class MyStreamWriterBase<T> implements Writer<T> {

    private static final long serialVersionUID = 2L;

    /**
     * The {@code FSDataOutputStream} for the current part file.
     */
    private transient FSDataOutputStream outStream;

    private transient CompressionOutputStream compressionOutputStream;

    private transient CompressionOutputStreamWrapper compressionOutputStreamWrapper;

    private boolean syncOnFlush;

    private String compressionCodec;

    public MyStreamWriterBase() {
    }

    public MyStreamWriterBase(String compressionCodec) {
        this.compressionCodec = compressionCodec;
    }

    protected MyStreamWriterBase(MyStreamWriterBase<T> other) {
        this.syncOnFlush = other.syncOnFlush;
        this.compressionCodec = other.compressionCodec;
    }

    /**
     * Controls whether to sync {@link FSDataOutputStream} on flush.
     */
    public void setSyncOnFlush(boolean syncOnFlush) {
        this.syncOnFlush = syncOnFlush;
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        if (outStream != null) {
            throw new IllegalStateException("Writer has already been opened");
        }
        outStream = fs.create(path, false);

        // Instantiate the configured codec reflectively and wrap the raw
        // output stream with its compressing stream.
        Class<?> codecClass;
        try {
            codecClass = Class.forName(compressionCodec);
        } catch (ClassNotFoundException e) {
            throw new IOException("Compression codec class not found: " + compressionCodec, e);
        }
        Configuration conf = fs.getConf();
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
        compressionOutputStream = codec.createOutputStream(outStream);
        compressionOutputStreamWrapper = new CompressionOutputStreamWrapper();
        compressionOutputStreamWrapper.setCompressionOutputStream(compressionOutputStream);
        compressionOutputStreamWrapper.setPos(0);
    }

    @Override
    public long flush() throws IOException {
        if (outStream == null) {
            throw new IllegalStateException("Writer is not open");
        }
        if (!syncOnFlush) {
            compressionOutputStream.flush();
        }
        return compressionOutputStreamWrapper.getPos();
    }

    @Override
    public long getPos() throws IOException {
        if (outStream == null) {
            throw new IllegalStateException("Writer is not open");
        }
        return compressionOutputStreamWrapper.getPos();
    }

    @Override
    public void close() throws IOException {
        if (compressionOutputStream != null) {
            flush();
            compressionOutputStream.close();
            compressionOutputStream = null;
        }
        if (outStream != null) {
            outStream.close();
            outStream = null;
        }
    }

    public boolean isSyncOnFlush() {
        return syncOnFlush;
    }

    protected CompressionOutputStream getCompressionStream() {
        if (compressionOutputStream == null) {
            throw new IllegalStateException("Output stream has not been opened");
        }
        return compressionOutputStream;
    }

    public CompressionOutputStreamWrapper getCompressionOutputStreamWrapper() {
        return compressionOutputStreamWrapper;
    }

    public void setCompressionOutputStreamWrapper(CompressionOutputStreamWrapper compressionOutputStreamWrapper) {
        this.compressionOutputStreamWrapper = compressionOutputStreamWrapper;
    }
}
MyStringWriter.class
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionOutputStream;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.UnsupportedCharsetException;

public class MyStringWriter<T> extends MyStreamWriterBase<T> {

    private static final long serialVersionUID = 1L;

    private String charsetName;

    private transient Charset charset;

    /**
     * Creates a new {@code MyStringWriter} that uses {@code "UTF-8"} charset to convert
     * strings to bytes.
     */
    public MyStringWriter() {
        this("UTF-8");
    }

    public MyStringWriter(String compressionCodec, String charsetName) {
        super(compressionCodec);
        if (StringUtils.isBlank(charsetName)) {
            this.charsetName = "UTF-8";
        } else {
            this.charsetName = charsetName;
        }
    }

    /**
     * Creates a new {@code MyStringWriter} that uses the given charset to convert
     * strings to bytes. Note that this constructor leaves the compression codec unset.
     *
     * @param charsetName Name of the charset to be used, must be valid input for {@code Charset.forName(charsetName)}
     */
    public MyStringWriter(String charsetName) {
        this.charsetName = charsetName;
    }

    protected MyStringWriter(MyStringWriter<T> other) {
        super(other);
        this.charsetName = other.charsetName;
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);
        try {
            this.charset = Charset.forName(charsetName);
        } catch (IllegalCharsetNameException e) {
            throw new IOException("The charset " + charsetName + " is not valid.", e);
        } catch (UnsupportedCharsetException e) {
            throw new IOException("The charset " + charsetName + " is not supported.", e);
        }
    }

    @Override
    public void write(T element) throws IOException {
        BaseRow baseRow = (BaseRow) element;
        CompressionOutputStreamWrapper compressionOutputStreamWrapper = getCompressionOutputStreamWrapper();
        CompressionOutputStream outputStream = compressionOutputStreamWrapper.getCompressionOutputStream();
        byte[] bytes = baseRow.getResult().getBytes(charset);
        outputStream.write(bytes);
        outputStream.write('\n');
        // Advance the logical position by the number of uncompressed bytes written.
        long pos = compressionOutputStreamWrapper.getPos();
        pos += bytes.length + 1;
        compressionOutputStreamWrapper.setPos(pos);
    }

    @Override
    public MyStringWriter<T> duplicate() {
        return new MyStringWriter<>(this);
    }

    String getCharsetName() {
        return charsetName;
    }
}
Usage
DataStream<BaseRow> dataStream = rowDataStream.map((MapFunction<Row, BaseRow>) row -> {
    try {
        return new BaseRow(null, (String) row.getField(0));
    } catch (Exception ex) {
        return null;
    }
});

sink.setWriter(new MyStringWriter<BaseRow>("org.apache.hadoop.io.compress.GzipCodec", null));
dataStream.addSink(sink);
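The snippet above uses a sink variable without showing its construction. Here is a minimal sketch of how it might be set up, assuming BucketingSink from flink-connector-filesystem (the path and batch size are placeholder values, not recommendations):

import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

BucketingSink<BaseRow> sink = new BucketingSink<>("hdfs:///tmp/flink-output"); // placeholder base path
sink.setBucketer(new BaseRowBucketer()); // optional: route records by partPath, see the sketch above
sink.setBatchSize(1024L * 1024L * 128L); // roll a part file after ~128 MB, measured via getPos()

Two things to keep in mind. First, since getPos() reports uncompressed bytes, setBatchSize is effectively an uncompressed-size threshold, so the .gz part files on disk will be considerably smaller. Second, nothing here is gzip-specific: the codec class name is instantiated reflectively, so passing, say, "org.apache.hadoop.io.compress.BZip2Codec" to MyStringWriter should work the same way, provided the codec is on the classpath.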
That concludes this post on compressing Flink's output to HDFS into gzip format. Hopefully the approach shown here is helpful to fellow developers!