Compressing Flink Output to HDFS into gzip Format

2024-06-08 21:58

This article shows how to compress the data that a Flink job writes to HDFS into gzip format by plugging a custom, compression-aware Writer into the filesystem sink. The complete classes and their usage follow.

BaseRow.class

import java.io.Serializable;

/**
 * Holds the partition directory and the data to be written out.
 */
public class BaseRow implements Serializable {

    /** Partition directory. */
    private String partPath;

    /** Output data. */
    private String result;

    public BaseRow() {
    }

    public BaseRow(String partPath, String result) {
        this.partPath = partPath;
        this.result = result;
    }

    public String getPartPath() {
        return partPath;
    }

    public void setPartPath(String partPath) {
        this.partPath = partPath;
    }

    public String getResult() {
        return result;
    }

    public void setResult(String result) {
        this.result = result;
    }

    @Override
    public String toString() {
        return "BaseRow{" +
                "partPath='" + partPath + '\'' +
                ", result='" + result + '\'' +
                '}';
    }
}
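The partPath field carries the target partition directory. The usage example at the end of this article passes null for it, but with the legacy BucketingSink it would typically be consumed by a custom Bucketer. A minimal sketch, assuming the Bucketer interface from flink-connector-filesystem (this class is not part of the original article):

import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
import org.apache.hadoop.fs.Path;

/** Hypothetical bucketer that routes each BaseRow to the directory named by partPath. */
public class PartPathBucketer implements Bucketer<BaseRow> {

    private static final long serialVersionUID = 1L;

    @Override
    public Path getBucketPath(Clock clock, Path basePath, BaseRow element) {
        // Fall back to the base path when no partition directory is set.
        if (element.getPartPath() == null) {
            return basePath;
        }
        return new Path(basePath, element.getPartPath());
    }
}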

CompressionOutputStreamWrapper.class

import org.apache.hadoop.io.compress.CompressionOutputStream;

import java.io.Serializable;

public class CompressionOutputStreamWrapper implements Serializable {

    private CompressionOutputStream compressionOutputStream;

    private long pos;

    public CompressionOutputStreamWrapper() {
    }

    public CompressionOutputStreamWrapper(CompressionOutputStream compressionOutputStream, long pos) {
        this.compressionOutputStream = compressionOutputStream;
        this.pos = pos;
    }

    public CompressionOutputStream getCompressionOutputStream() {
        return compressionOutputStream;
    }

    public void setCompressionOutputStream(CompressionOutputStream compressionOutputStream) {
        this.compressionOutputStream = compressionOutputStream;
    }

    public long getPos() {
        return pos;
    }

    public void setPos(long pos) {
        this.pos = pos;
    }

    @Override
    public String toString() {
        return "CompressionOutputStreamWrapper{" +
                "compressionOutputStream=" + compressionOutputStream +
                ", pos=" + pos +
                '}';
    }
}

MyStreamWriterBase.class

import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;

public abstract class MyStreamWriterBase<T> implements Writer<T> {

    private static final long serialVersionUID = 2L;

    /** The {@code FSDataOutputStream} for the current part file. */
    private transient FSDataOutputStream outStream;

    /** The compressing stream wrapped around {@code outStream}. */
    private transient CompressionOutputStream compressionOutputStream;

    /** Pairs the compressing stream with a manually tracked (uncompressed) position. */
    private transient CompressionOutputStreamWrapper compressionOutputStreamWrapper;

    private boolean syncOnFlush;

    /** Fully qualified class name of the Hadoop compression codec to use. */
    private String compressionCodec;

    public MyStreamWriterBase() {
    }

    public MyStreamWriterBase(String compressionCodec) {
        this.compressionCodec = compressionCodec;
    }

    protected MyStreamWriterBase(MyStreamWriterBase<T> other) {
        this.syncOnFlush = other.syncOnFlush;
        this.compressionCodec = other.compressionCodec;
    }

    /**
     * Controls whether to sync {@link FSDataOutputStream} on flush.
     */
    public void setSyncOnFlush(boolean syncOnFlush) {
        this.syncOnFlush = syncOnFlush;
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        if (outStream != null) {
            throw new IllegalStateException("Writer has already been opened");
        }
        outStream = fs.create(path, false);

        // Resolve the codec class by name and wrap the raw output stream with it.
        Class<?> codecClass;
        try {
            codecClass = Class.forName(compressionCodec);
        } catch (ClassNotFoundException e) {
            throw new IOException("Compression codec class not found: " + compressionCodec, e);
        }
        Configuration conf = fs.getConf();
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
        compressionOutputStream = codec.createOutputStream(outStream);
        compressionOutputStreamWrapper = new CompressionOutputStreamWrapper();
        compressionOutputStreamWrapper.setCompressionOutputStream(compressionOutputStream);
        compressionOutputStreamWrapper.setPos(0);
    }

    @Override
    public long flush() throws IOException {
        if (outStream == null) {
            throw new IllegalStateException("Writer is not open");
        }
        // The compressed stream is only flushed when sync-on-flush is disabled.
        if (!syncOnFlush) {
            compressionOutputStream.flush();
        }
        return compressionOutputStreamWrapper.getPos();
    }

    @Override
    public long getPos() throws IOException {
        if (outStream == null) {
            throw new IllegalStateException("Writer is not open");
        }
        // This is the number of uncompressed bytes written, tracked by the wrapper,
        // because a CompressionOutputStream cannot report a usable position itself.
        return compressionOutputStreamWrapper.getPos();
    }

    @Override
    public void close() throws IOException {
        if (compressionOutputStream != null) {
            flush();
            compressionOutputStream.close();
            compressionOutputStream = null;
        }
        if (outStream != null) {
            outStream.close();
            outStream = null;
        }
    }

    public boolean isSyncOnFlush() {
        return syncOnFlush;
    }

    /**
     * Returns the current compressing output stream, if the stream is open.
     */
    protected CompressionOutputStream getCompressionStream() {
        if (compressionOutputStream == null) {
            throw new IllegalStateException("Output stream has not been opened");
        }
        return compressionOutputStream;
    }

    public CompressionOutputStreamWrapper getCompressionOutputStreamWrapper() {
        return compressionOutputStreamWrapper;
    }

    public void setCompressionOutputStreamWrapper(CompressionOutputStreamWrapper compressionOutputStreamWrapper) {
        this.compressionOutputStreamWrapper = compressionOutputStreamWrapper;
    }
}
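The key step in open() is resolving the codec class by its fully qualified name and instantiating it against the filesystem's Hadoop configuration. A minimal standalone sketch of that resolution step (hypothetical demo code, assuming hadoop-common is on the classpath):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class CodecResolutionDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Same lookup as in MyStreamWriterBase#open: class name -> codec instance.
        Class<?> codecClass = Class.forName("org.apache.hadoop.io.compress.GzipCodec");
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
        System.out.println(codec.getDefaultExtension()); // prints ".gz"
    }
}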

MyStringWriter.class

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionOutputStream;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.UnsupportedCharsetException;

public class MyStringWriter<T> extends MyStreamWriterBase<T> {

    private static final long serialVersionUID = 1L;

    private String charsetName;

    private transient Charset charset;

    /**
     * Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to convert
     * strings to bytes.
     */
    public MyStringWriter() {
        this("UTF-8");
    }

    public MyStringWriter(String compressionCodec, String charsetName) {
        super(compressionCodec);
        if (StringUtils.isBlank(charsetName)) {
            this.charsetName = "UTF-8";
        } else {
            this.charsetName = charsetName;
        }
    }

    /**
     * Creates a new {@code StringWriter} that uses the given charset to convert
     * strings to bytes.
     *
     * @param charsetName Name of the charset to be used, must be valid input for {@code Charset.forName(charsetName)}
     */
    public MyStringWriter(String charsetName) {
        this.charsetName = charsetName;
    }

    protected MyStringWriter(MyStringWriter<T> other) {
        super(other);
        this.charsetName = other.charsetName;
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);
        try {
            this.charset = Charset.forName(charsetName);
        } catch (IllegalCharsetNameException e) {
            throw new IOException("The charset " + charsetName + " is not valid.", e);
        } catch (UnsupportedCharsetException e) {
            throw new IOException("The charset " + charsetName + " is not supported.", e);
        }
    }

    @Override
    public void write(T element) throws IOException {
        BaseRow baseRow = (BaseRow) element;
        CompressionOutputStreamWrapper compressionOutputStreamWrapper = getCompressionOutputStreamWrapper();
        CompressionOutputStream outputStream = compressionOutputStreamWrapper.getCompressionOutputStream();
        byte[] bytes = baseRow.getResult().getBytes(charset);
        outputStream.write(bytes);
        outputStream.write('\n');
        // Advance the tracked position by the uncompressed bytes just written (+1 for the newline).
        long pos = compressionOutputStreamWrapper.getPos();
        pos += bytes.length + 1;
        compressionOutputStreamWrapper.setPos(pos);
    }

    @Override
    public MyStringWriter<T> duplicate() {
        return new MyStringWriter<>(this);
    }

    String getCharsetName() {
        return charsetName;
    }
}
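To sanity-check the writer outside of a Flink job, it can be driven directly against a local Hadoop FileSystem. A hedged sketch (test scaffolding not present in the original article; the /tmp path is an arbitrary choice, and the target file must not already exist):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MyStringWriterSmokeTest {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        MyStringWriter<BaseRow> writer =
                new MyStringWriter<>("org.apache.hadoop.io.compress.GzipCodec", "UTF-8");
        writer.open(fs, new Path("/tmp/my-string-writer-test.gz"));
        writer.write(new BaseRow(null, "hello gzip"));
        System.out.println("uncompressed bytes written: " + writer.getPos());
        writer.close();
        // The result should be readable with: gunzip -c /tmp/my-string-writer-test.gz
    }
}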

Usage

DataStream<BaseRow> dataStream = rowDataStream.map((MapFunction<Row, BaseRow>) row -> {
    try {
        return new BaseRow(null, (String) row.getField(0));
    } catch (Exception ex) {
        return null;
    }
});

sink.setWriter(new MyStringWriter("org.apache.hadoop.io.compress.GzipCodec", null));
dataStream.addSink(sink);
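The snippet above assumes sink has already been constructed. A hedged sketch of the wiring with the legacy BucketingSink from flink-connector-filesystem (the path, batch size, and bucketer here are illustrative assumptions, not from the original article):

import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

BucketingSink<BaseRow> sink = new BucketingSink<>("hdfs:///data/output");
// getPos() reports uncompressed bytes, so this rolls part files after
// roughly 128 MB of uncompressed data.
sink.setBatchSize(1024L * 1024L * 128L);
// Optional: route rows by BaseRow.partPath (see the PartPathBucketer sketch above).
sink.setBucketer(new PartPathBucketer());

followed by the setWriter and addSink calls shown above. Because the codec is resolved by class name, switching to another Hadoop codec such as org.apache.hadoop.io.compress.BZip2Codec only requires changing the string passed to MyStringWriter.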

That concludes this article on compressing Flink output to HDFS into gzip format; I hope it proves helpful to fellow developers.


