Compressing Flink output to HDFS into gzip format

2024-06-08 21:58

This article shows how to compress the data that Flink writes to HDFS into gzip format. Hopefully it offers a useful reference for developers facing the same problem; if you need it, follow along below.

BaseRow.class

import java.io.Serializable;

/**
 * Holds the partition directory and the record to be written.
 */
public class BaseRow implements Serializable {

    /** Partition directory. */
    private String partPath;

    /** Output data. */
    private String result;

    public BaseRow() {
    }

    public BaseRow(String partPath, String result) {
        this.partPath = partPath;
        this.result = result;
    }

    public String getPartPath() {
        return partPath;
    }

    public void setPartPath(String partPath) {
        this.partPath = partPath;
    }

    public String getResult() {
        return result;
    }

    public void setResult(String result) {
        this.result = result;
    }

    @Override
    public String toString() {
        return "BaseRow{" +
                "partPath='" + partPath + '\'' +
                ", result='" + result + '\'' +
                '}';
    }
}

CompressionOutputStreamWrapper.class

import org.apache.hadoop.io.compress.CompressionOutputStream;

import java.io.Serializable;

public class CompressionOutputStreamWrapper implements Serializable {

    private CompressionOutputStream compressionOutputStream;
    private long pos;

    public CompressionOutputStreamWrapper() {
    }

    public CompressionOutputStreamWrapper(CompressionOutputStream compressionOutputStream, long pos) {
        this.compressionOutputStream = compressionOutputStream;
        this.pos = pos;
    }

    public CompressionOutputStream getCompressionOutputStream() {
        return compressionOutputStream;
    }

    public void setCompressionOutputStream(CompressionOutputStream compressionOutputStream) {
        this.compressionOutputStream = compressionOutputStream;
    }

    public long getPos() {
        return pos;
    }

    public void setPos(long pos) {
        this.pos = pos;
    }

    @Override
    public String toString() {
        return "CompressionOutputStreamWrapper{" +
                "compressionOutputStream=" + compressionOutputStream +
                ", pos=" + pos +
                '}';
    }
}

MyStreamWriterBase.class

import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;

public abstract class MyStreamWriterBase<T> implements Writer<T> {

    private static final long serialVersionUID = 2L;

    /** The {@code FSDataOutputStream} for the current part file. */
    private transient FSDataOutputStream outStream;

    /** Compressing stream wrapped around {@code outStream}. */
    private transient CompressionOutputStream compressionOutputStream;

    /** Carries the compressing stream together with the logical (uncompressed) position. */
    private transient CompressionOutputStreamWrapper compressionOutputStreamWrapper;

    private boolean syncOnFlush;

    /** Fully qualified class name of the Hadoop {@link CompressionCodec} to use. */
    private String compressionCodec;

    public MyStreamWriterBase() {
    }

    public MyStreamWriterBase(String compressionCodec) {
        this.compressionCodec = compressionCodec;
    }

    protected MyStreamWriterBase(MyStreamWriterBase<T> other) {
        this.syncOnFlush = other.syncOnFlush;
        this.compressionCodec = other.compressionCodec;
    }

    /**
     * Controls whether to sync {@link FSDataOutputStream} on flush.
     */
    public void setSyncOnFlush(boolean syncOnFlush) {
        this.syncOnFlush = syncOnFlush;
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        if (outStream != null) {
            throw new IllegalStateException("Writer has already been opened");
        }
        outStream = fs.create(path, false);

        // Instantiate the configured codec (e.g. GzipCodec) and wrap the raw output stream with it.
        Class<?> codecClass;
        try {
            codecClass = Class.forName(compressionCodec);
        } catch (ClassNotFoundException e) {
            throw new IOException("Compression codec class not found: " + compressionCodec, e);
        }
        Configuration conf = fs.getConf();
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
        compressionOutputStream = codec.createOutputStream(outStream);

        compressionOutputStreamWrapper = new CompressionOutputStreamWrapper();
        compressionOutputStreamWrapper.setCompressionOutputStream(compressionOutputStream);
        compressionOutputStreamWrapper.setPos(0);
    }

    @Override
    public long flush() throws IOException {
        if (outStream == null) {
            throw new IllegalStateException("Writer is not open");
        }
        if (!syncOnFlush) {
            compressionOutputStream.flush();
        }
        return compressionOutputStreamWrapper.getPos();
    }

    @Override
    public long getPos() throws IOException {
        if (outStream == null) {
            throw new IllegalStateException("Writer is not open");
        }
        // The compressed stream buffers internally, so report the logical position
        // tracked by the wrapper rather than the position of the underlying stream.
        return compressionOutputStreamWrapper.getPos();
    }

    @Override
    public void close() throws IOException {
        if (compressionOutputStream != null) {
            flush();
            compressionOutputStream.close();
            compressionOutputStream = null;
        }
        if (outStream != null) {
            outStream.close();
            outStream = null;
        }
    }

    public boolean isSyncOnFlush() {
        return syncOnFlush;
    }

    /**
     * Returns the current compressing output stream, if the stream is open.
     */
    protected CompressionOutputStream getCompressionStream() {
        if (compressionOutputStream == null) {
            throw new IllegalStateException("Output stream has not been opened");
        }
        return compressionOutputStream;
    }

    public CompressionOutputStreamWrapper getCompressionOutputStreamWrapper() {
        return compressionOutputStreamWrapper;
    }

    public void setCompressionOutputStreamWrapper(CompressionOutputStreamWrapper compressionOutputStreamWrapper) {
        this.compressionOutputStreamWrapper = compressionOutputStreamWrapper;
    }
}

MyStringWriter.class

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionOutputStream;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.UnsupportedCharsetException;

public class MyStringWriter<T> extends MyStreamWriterBase<T> {

    private static final long serialVersionUID = 1L;

    private String charsetName;

    private transient Charset charset;

    /**
     * Creates a new {@code MyStringWriter} that uses the {@code "UTF-8"} charset to convert
     * strings to bytes.
     */
    public MyStringWriter() {
        this("UTF-8");
    }

    public MyStringWriter(String compressionCodec, String charsetName) {
        super(compressionCodec);
        if (StringUtils.isBlank(charsetName)) {
            this.charsetName = "UTF-8";
        } else {
            this.charsetName = charsetName;
        }
    }

    /**
     * Creates a new {@code MyStringWriter} that uses the given charset to convert
     * strings to bytes.
     *
     * @param charsetName Name of the charset to be used, must be valid input for {@code Charset.forName(charsetName)}
     */
    public MyStringWriter(String charsetName) {
        this.charsetName = charsetName;
    }

    protected MyStringWriter(MyStringWriter<T> other) {
        super(other);
        this.charsetName = other.charsetName;
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);
        try {
            this.charset = Charset.forName(charsetName);
        } catch (IllegalCharsetNameException e) {
            throw new IOException("The charset " + charsetName + " is not valid.", e);
        } catch (UnsupportedCharsetException e) {
            throw new IOException("The charset " + charsetName + " is not supported.", e);
        }
    }

    @Override
    public void write(T element) throws IOException {
        BaseRow baseRow = (BaseRow) element;
        CompressionOutputStreamWrapper compressionOutputStreamWrapper = getCompressionOutputStreamWrapper();
        CompressionOutputStream outputStream = compressionOutputStreamWrapper.getCompressionOutputStream();

        byte[] bytes = baseRow.getResult().getBytes(charset);
        outputStream.write(bytes);
        outputStream.write('\n');

        // Track the logical (uncompressed) position: record bytes plus the trailing newline.
        long pos = compressionOutputStreamWrapper.getPos();
        pos += bytes.length + 1;
        compressionOutputStreamWrapper.setPos(pos);
    }

    @Override
    public MyStringWriter<T> duplicate() {
        return new MyStringWriter<>(this);
    }

    String getCharsetName() {
        return charsetName;
    }
}
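The original post stops at the classes above. As a quick sanity check, here is a minimal smoke test (not part of the original article; the local path and record values are made up for illustration) that writes a couple of BaseRow records through MyStringWriter against the local Hadoop FileSystem and reads the result back via GZIPInputStream to confirm the output really is gzip:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.zip.GZIPInputStream;

public class MyStringWriterSmokeTest {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.getLocal(conf);
        Path path = new Path("/tmp/my-string-writer-test.gz");   // illustrative path
        fs.delete(path, false);                                  // open() refuses to overwrite

        MyStringWriter<BaseRow> writer =
                new MyStringWriter<>("org.apache.hadoop.io.compress.GzipCodec", "UTF-8");
        writer.open(fs, path);
        writer.write(new BaseRow("dt=2024-06-08", "hello gzip"));
        writer.write(new BaseRow("dt=2024-06-08", "second line"));
        writer.close();

        // Decompress with plain GZIPInputStream to verify the file is valid gzip.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(fs.open(path))))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}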

Usage

DataStream<BaseRow> dataStream = rowDataStream.map((MapFunction<Row, BaseRow>) row -> {
    try {
        // No partition directory is set here, so getPartPath() stays null.
        return new BaseRow(null, (String) row.getField(0));
    } catch (Exception ex) {
        // Records that fail to convert become null and should be filtered out before the sink.
        return null;
    }
});

sink.setWriter(new MyStringWriter<>("org.apache.hadoop.io.compress.GzipCodec", null));
dataStream.addSink(sink);
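The snippet above does not show how `sink` itself is built. Since MyStreamWriterBase implements `org.apache.flink.streaming.connectors.fs.Writer`, the sink is presumably a `BucketingSink`; the sketch below is an assumption about how it might be wired up. The HDFS base path, the batch size, and the `PartPathBucketer` helper are all illustrative and not from the original article; the bucketer simply routes each record to the directory carried in `BaseRow.partPath`.

import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.hadoop.fs.Path;

/** Routes each BaseRow to the partition directory it carries (hypothetical helper). */
public class PartPathBucketer implements Bucketer<BaseRow> {
    @Override
    public Path getBucketPath(Clock clock, Path basePath, BaseRow element) {
        // Fall back to the base path when no partition directory is set.
        return element.getPartPath() == null
                ? basePath
                : new Path(basePath, element.getPartPath());
    }
}

// Wiring it together (base path and batch size are illustrative):
BucketingSink<BaseRow> sink = new BucketingSink<>("hdfs:///user/flink/output");
sink.setBucketer(new PartPathBucketer());
// getPos() reports the uncompressed byte count tracked by CompressionOutputStreamWrapper,
// so this threshold rolls part files by uncompressed size, not compressed file size.
sink.setBatchSize(128L * 1024 * 1024);
sink.setWriter(new MyStringWriter<>("org.apache.hadoop.io.compress.GzipCodec", null));
dataStream.addSink(sink);

To switch to a different compression format, only the codec class name passed to MyStringWriter needs to change, e.g. "org.apache.hadoop.io.compress.BZip2Codec".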

That concludes this article on compressing Flink output to HDFS into gzip format; hopefully it is of some help.


