Compressing Flink's HDFS output to gzip

2024-06-08 21:58

This article walks through how to compress the data that Flink writes to HDFS into gzip format by plugging a Hadoop compression codec into a custom sink writer. The implementation consists of the four classes below.

BaseRow.class

import java.io.Serializable;

/**
 * Holds the partition directory and the data to be written out.
 */
public class BaseRow implements Serializable {

    /** Partition directory. */
    private String partPath;

    /** Output data. */
    private String result;

    public BaseRow() {
    }

    public BaseRow(String partPath, String result) {
        this.partPath = partPath;
        this.result = result;
    }

    public String getPartPath() {
        return partPath;
    }

    public void setPartPath(String partPath) {
        this.partPath = partPath;
    }

    public String getResult() {
        return result;
    }

    public void setResult(String result) {
        this.result = result;
    }

    @Override
    public String toString() {
        return "BaseRow{" +
                "partPath='" + partPath + '\'' +
                ", result='" + result + '\'' +
                '}';
    }
}
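In the usage example at the end of this article, partPath is left null, but the field is evidently meant to let a custom bucketer route each record into its own partition directory. Below is a minimal sketch of such a bucketer, assuming the legacy BucketingSink connector (flink-connector-filesystem); the class name PartPathBucketer is made up:

import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
import org.apache.hadoop.fs.Path;

// Hypothetical bucketer that places each BaseRow under basePath/partPath.
public class PartPathBucketer implements Bucketer<BaseRow> {

    private static final long serialVersionUID = 1L;

    @Override
    public Path getBucketPath(Clock clock, Path basePath, BaseRow element) {
        // Fall back to the base path when no partition directory is set.
        if (element.getPartPath() == null) {
            return basePath;
        }
        return new Path(basePath, element.getPartPath());
    }
}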

CompressionOutputStreamWrapper.class

Hadoop's CompressionOutputStream does not report a write position, and the position of the underlying FSDataOutputStream lags behind because of the codec's internal buffering, so this wrapper pairs the codec stream with a position counter that the writer maintains itself.

import org.apache.hadoop.io.compress.CompressionOutputStream;

import java.io.Serializable;

public class CompressionOutputStreamWrapper implements Serializable {

    private CompressionOutputStream compressionOutputStream;

    private long pos;

    public CompressionOutputStreamWrapper() {
    }

    public CompressionOutputStreamWrapper(CompressionOutputStream compressionOutputStream, long pos) {
        this.compressionOutputStream = compressionOutputStream;
        this.pos = pos;
    }

    public CompressionOutputStream getCompressionOutputStream() {
        return compressionOutputStream;
    }

    public void setCompressionOutputStream(CompressionOutputStream compressionOutputStream) {
        this.compressionOutputStream = compressionOutputStream;
    }

    public long getPos() {
        return pos;
    }

    public void setPos(long pos) {
        this.pos = pos;
    }

    @Override
    public String toString() {
        return "CompressionOutputStreamWrapper{" +
                "compressionOutputStream=" + compressionOutputStream +
                ", pos=" + pos +
                '}';
    }
}

MyStreamWriterBase.class

This class is adapted from Flink's StreamWriterBase: it opens the part file as before, but wraps the raw stream with the configured Hadoop compression codec and reports the manually tracked position instead of the stream's own.

import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;

public abstract class MyStreamWriterBase<T> implements Writer<T> {

    private static final long serialVersionUID = 2L;

    /** The {@code FSDataOutputStream} for the current part file. */
    private transient FSDataOutputStream outStream;

    /** The codec stream wrapping {@code outStream}; all records are written through it. */
    private transient CompressionOutputStream compressionOutputStream;

    /** Carries the codec stream together with the manually tracked write position. */
    private transient CompressionOutputStreamWrapper compressionOutputStreamWrapper;

    private boolean syncOnFlush;

    /** Fully qualified class name of the Hadoop compression codec, e.g. GzipCodec. */
    private String compressionCodec;

    public MyStreamWriterBase() {
    }

    public MyStreamWriterBase(String compressionCodec) {
        this.compressionCodec = compressionCodec;
    }

    protected MyStreamWriterBase(MyStreamWriterBase<T> other) {
        this.syncOnFlush = other.syncOnFlush;
        this.compressionCodec = other.compressionCodec;
    }

    /**
     * Controls whether to sync {@link FSDataOutputStream} on flush.
     */
    public void setSyncOnFlush(boolean syncOnFlush) {
        this.syncOnFlush = syncOnFlush;
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        if (outStream != null) {
            throw new IllegalStateException("Writer has already been opened");
        }
        outStream = fs.create(path, false);

        // Instantiate the configured Hadoop codec and wrap the raw output stream with it.
        Class<?> codecClass;
        try {
            codecClass = Class.forName(compressionCodec);
        } catch (ClassNotFoundException e) {
            throw new IOException("Compression codec class not found: " + compressionCodec, e);
        }
        Configuration conf = fs.getConf();
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
        compressionOutputStream = codec.createOutputStream(outStream);
        compressionOutputStreamWrapper = new CompressionOutputStreamWrapper();
        compressionOutputStreamWrapper.setCompressionOutputStream(compressionOutputStream);
        compressionOutputStreamWrapper.setPos(0);
    }

    @Override
    public long flush() throws IOException {
        if (outStream == null) {
            throw new IllegalStateException("Writer is not open");
        }
        // Note: when syncOnFlush is set, nothing is flushed here;
        // the codec stream is only flushed on close.
        if (!syncOnFlush) {
            compressionOutputStream.flush();
        }
        return compressionOutputStreamWrapper.getPos();
    }

    @Override
    public long getPos() throws IOException {
        if (outStream == null) {
            throw new IllegalStateException("Writer is not open");
        }
        // Return the manually tracked position; the codec stream cannot report one itself.
        return compressionOutputStreamWrapper.getPos();
    }

    @Override
    public void close() throws IOException {
        if (compressionOutputStream != null) {
            flush();
            compressionOutputStream.close();
            compressionOutputStream = null;
        }
        if (outStream != null) {
            outStream.close();
            outStream = null;
        }
    }

    public boolean isSyncOnFlush() {
        return syncOnFlush;
    }

    protected CompressionOutputStream getCompressionStream() {
        if (compressionOutputStream == null) {
            throw new IllegalStateException("Output stream has not been opened");
        }
        return compressionOutputStream;
    }

    public CompressionOutputStreamWrapper getCompressionOutputStreamWrapper() {
        return compressionOutputStreamWrapper;
    }

    public void setCompressionOutputStreamWrapper(CompressionOutputStreamWrapper compressionOutputStreamWrapper) {
        this.compressionOutputStreamWrapper = compressionOutputStreamWrapper;
    }
}

MyStringWriter.class

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionOutputStream;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.UnsupportedCharsetException;

public class MyStringWriter<T> extends MyStreamWriterBase<T> {

    private static final long serialVersionUID = 1L;

    private String charsetName;

    private transient Charset charset;

    /**
     * Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to convert
     * strings to bytes.
     */
    public MyStringWriter() {
        this("UTF-8");
    }

    public MyStringWriter(String compressionCodec, String charsetName) {
        super(compressionCodec);
        if (StringUtils.isBlank(charsetName)) {
            this.charsetName = "UTF-8";
        } else {
            this.charsetName = charsetName;
        }
    }

    /**
     * Creates a new {@code StringWriter} that uses the given charset to convert
     * strings to bytes.
     *
     * @param charsetName Name of the charset to be used, must be valid input for {@code Charset.forName(charsetName)}
     */
    public MyStringWriter(String charsetName) {
        this.charsetName = charsetName;
    }

    protected MyStringWriter(MyStringWriter<T> other) {
        super(other);
        this.charsetName = other.charsetName;
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);
        try {
            this.charset = Charset.forName(charsetName);
        } catch (IllegalCharsetNameException e) {
            throw new IOException("The charset " + charsetName + " is not valid.", e);
        } catch (UnsupportedCharsetException e) {
            throw new IOException("The charset " + charsetName + " is not supported.", e);
        }
    }

    @Override
    public void write(T element) throws IOException {
        BaseRow baseRow = (BaseRow) element;
        CompressionOutputStreamWrapper compressionOutputStreamWrapper = getCompressionOutputStreamWrapper();
        CompressionOutputStream outputStream = compressionOutputStreamWrapper.getCompressionOutputStream();
        byte[] bytes = baseRow.getResult().getBytes(charset);
        outputStream.write(bytes);
        outputStream.write('\n');
        // Advance the tracked position by the number of uncompressed bytes written,
        // so the sink's rolling logic is based on uncompressed size.
        long pos = compressionOutputStreamWrapper.getPos();
        pos += bytes.length + 1;
        compressionOutputStreamWrapper.setPos(pos);
    }

    @Override
    public MyStringWriter<T> duplicate() {
        return new MyStringWriter<>(this);
    }

    String getCharsetName() {
        return charsetName;
    }
}
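Before wiring the writer into a job, it can be exercised against the local file system and the output verified with plain java.util.zip. Below is a minimal smoke test, assuming hadoop-common is on the classpath; the path /tmp/gzip-writer-test.gz and class name GzipWriterSmokeTest are made up:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.zip.GZIPInputStream;

public class GzipWriterSmokeTest {

    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        Path path = new Path("/tmp/gzip-writer-test.gz");
        // The writer creates the file with overwrite=false, so clear leftovers first.
        fs.delete(path, false);

        // Write two records through the gzip-compressing writer.
        MyStringWriter<BaseRow> writer =
                new MyStringWriter<>("org.apache.hadoop.io.compress.GzipCodec", null);
        writer.open(fs, path);
        writer.write(new BaseRow(null, "hello"));
        writer.write(new BaseRow(null, "world"));
        writer.close();

        // Read the file back with plain java.util.zip to verify it is valid gzip.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new FileInputStream("/tmp/gzip-writer-test.gz"))))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}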

Usage

DataStream<BaseRow> dataStream = rowDataStream
        .map((MapFunction<Row, BaseRow>) row -> {
            try {
                return new BaseRow(null, (String) row.getField(0));
            } catch (Exception ex) {
                return null;
            }
        })
        // Drop records that failed to convert; a null would NPE inside the writer.
        .filter(Objects::nonNull);
sink.setWriter(new MyStringWriter<>("org.apache.hadoop.io.compress.GzipCodec", null));
dataStream.addSink(sink);

Passing null as the charset name makes MyStringWriter fall back to UTF-8.
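The snippet above assumes sink has already been constructed. Below is a sketch of that missing wiring, assuming the legacy BucketingSink from flink-connector-filesystem (setPartSuffix requires a newer 1.x version of the connector); the output path and rolling size are made up:

import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

BucketingSink<BaseRow> sink = new BucketingSink<>("hdfs:///user/flink/output");
// Roll part files at roughly 128 MB of *uncompressed* data, since
// MyStringWriter advances getPos() by the uncompressed byte count.
sink.setBatchSize(128L * 1024 * 1024);
// Make finished part files end in .gz so downstream tools recognize them.
sink.setPartSuffix(".gz");

These calls go before the sink.setWriter(...) and dataStream.addSink(sink) lines shown above.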

That concludes this walkthrough of compressing Flink's HDFS output to gzip; hopefully it is a useful reference.



