flink 从monggo读取PB级全部数据根据分组统计数据 样例

2024-06-19 01:44

本文主要是介绍flink 从monggo读取PB级全部数据根据分组统计数据 样例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

简述

  1. 从mongoDB数据库中读取千万级数据。
  2. 根据某个字段进行汇总统计数据。
  3. 将获取的数据保存到redis中,获取最终的统计数据。

maven

 		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-mongodb</artifactId><version>1.1.0-1.18</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>1.8.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency>   <dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.1.0</version></dependency>     

代码

RedisExampleMapper

package com.wfg.flink.connector.redis;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;/*** @author wfg*/
public class RedisExampleMapper implements RedisMapper<Tuple2<String, Integer>> {@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");}@Overridepublic String getKeyFromData(Tuple2<String, Integer> data) {return data.f0;}@Overridepublic String getValueFromData(Tuple2<String, Integer> data) {return data.f1.toString();}
}

根据名字统计访问次数

package com.wfg.flink.connector.mongodb;import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson2.JSONObject;
import com.wfg.flink.connector.dto.KafkaPvDto;
import com.wfg.flink.connector.redis.RedisExampleMapper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.mongodb.source.MongoSource;
import org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionStrategy;
import org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.bson.BsonDocument;import static com.wfg.flink.connector.constants.Constants.MONGO_TEST_PV_COLLECTION;/*** @author wfg* 根据名字统计访问次数*/
public class MongoAllNameRedisCounts {public static void main(String[] args) throws Exception {String startTime = DateUtil.format(DateUtil.date(), "yyyy-MM-dd HH:mm:ss");System.out.println("StartTime:" + startTime);final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 开启Checkpointing,设置Checkpoint间隔env.enableCheckpointing(30000);// 设置Checkpoint模式env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 设置最小Checkpoint间隔env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);// 设置最大并发Checkpoint数目env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 使用RocksDB作为状态后端env.setStateBackend(new HashMapStateBackend());env.setParallelism(10);// 配置MongoDB源MongoSource<String> mongoSource = MongoSource.<String>builder().setUri("mongodb://root:123456@127.0.0.1:27017,127.0.0.1:27018,127.0.0.1:27019/admin?replicaSet=rs0&authSource=admin").setDatabase("sjzz").setCollection(MONGO_TEST_PV_COLLECTION).setFetchSize(2048)
//                .setLimit(1000).setNoCursorTimeout(true).setPartitionStrategy(PartitionStrategy.SINGLE).setPartitionSize(MemorySize.ofMebiBytes(64))
//                .setSamplesPerPartition(10).setDeserializationSchema(new MongoDeserializationSchema<>() {@Overridepublic String deserialize(BsonDocument document) {return document.toJson();}@Overridepublic TypeInformation<String> getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}}).build();// 创建MongoDB数据流DataStream<String> sourceStream = env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "kafka Mongo Source");
// 转换数据,提取人名作为KeyDataStream<Tuple2<String, Integer>> nameCountStream = sourceStream.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {KafkaPvDto data = JSONObject.parseObject(value, KafkaPvDto.class);return Tuple2.of(data.getUserName(), 1);}}).keyBy(value -> value.f0).reduce(new ReduceFunction<>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {return new Tuple2<>(value1.f0, value1.f1 + value2.f1);}});FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();RedisSink<Tuple2<String, Integer>> sink = new RedisSink<>(conf, new RedisExampleMapper());nameCountStream.addSink(sink);// 输出结果env.execute("Flink MongoDB Name Count ");System.out.println("-----------------------------------");System.out.println("startTime: " + startTime);System.out.println("EndTime: " + DateUtil.format(DateUtil.date(), "yyyy-MM-dd HH:mm:ss"));}
}

注意:
大数据生成参考:批量数据生成
中间件部署参考:

  • kafka:部署
  • mongo:部署

这篇关于flink 从monggo读取PB级全部数据根据分组统计数据 样例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1073662

相关文章

详谈redis跟数据库的数据同步问题

《详谈redis跟数据库的数据同步问题》文章讨论了在Redis和数据库数据一致性问题上的解决方案,主要比较了先更新Redis缓存再更新数据库和先更新数据库再更新Redis缓存两种方案,文章指出,删除R... 目录一、Redis 数据库数据一致性的解决方案1.1、更新Redis缓存、删除Redis缓存的区别二

Redis事务与数据持久化方式

《Redis事务与数据持久化方式》该文档主要介绍了Redis事务和持久化机制,事务通过将多个命令打包执行,而持久化则通过快照(RDB)和追加式文件(AOF)两种方式将内存数据保存到磁盘,以防止数据丢失... 目录一、Redis 事务1.1 事务本质1.2 数据库事务与redis事务1.2.1 数据库事务1.

Oracle Expdp按条件导出指定表数据的方法实例

《OracleExpdp按条件导出指定表数据的方法实例》:本文主要介绍Oracle的expdp数据泵方式导出特定机构和时间范围的数据,并通过parfile文件进行条件限制和配置,文中通过代码介绍... 目录1.场景描述 2.方案分析3.实验验证 3.1 parfile文件3.2 expdp命令导出4.总结

更改docker默认数据目录的方法步骤

《更改docker默认数据目录的方法步骤》本文主要介绍了更改docker默认数据目录的方法步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录1.查看docker是否存在并停止该服务2.挂载镜像并安装rsync便于备份3.取消挂载备份和迁

使用C#如何创建人名或其他物体随机分组

《使用C#如何创建人名或其他物体随机分组》文章描述了一个随机分配人员到多个团队的代码示例,包括将人员列表随机化并根据组数分配到不同组,最后按组号排序显示结果... 目录C#创建人名或其他物体随机分组此示例使用以下代码将人员分配到组代码首先将lstPeople ListBox总结C#创建人名或其他物体随机分组

不删数据还能合并磁盘? 让电脑C盘D盘合并并保留数据的技巧

《不删数据还能合并磁盘?让电脑C盘D盘合并并保留数据的技巧》在Windows操作系统中,合并C盘和D盘是一个相对复杂的任务,尤其是当你不希望删除其中的数据时,幸运的是,有几种方法可以实现这一目标且在... 在电脑生产时,制造商常为C盘分配较小的磁盘空间,以确保软件在运行过程中不会出现磁盘空间不足的问题。但在

SpringBoot使用Apache POI库读取Excel文件的操作详解

《SpringBoot使用ApachePOI库读取Excel文件的操作详解》在日常开发中,我们经常需要处理Excel文件中的数据,无论是从数据库导入数据、处理数据报表,还是批量生成数据,都可能会遇到... 目录项目背景依赖导入读取Excel模板的实现代码实现代码解析ExcelDemoInfoDTO 数据传输

Python读取TIF文件的两种方法实现

《Python读取TIF文件的两种方法实现》本文主要介绍了Python读取TIF文件的两种方法实现,包括使用tifffile库和Pillow库逐帧读取TIFF文件,具有一定的参考价值,感兴趣的可以了解... 目录方法 1:使用 tifffile 逐帧读取安装 tifffile:逐帧读取代码:方法 2:使用

Java如何接收并解析HL7协议数据

《Java如何接收并解析HL7协议数据》文章主要介绍了HL7协议及其在医疗行业中的应用,详细描述了如何配置环境、接收和解析数据,以及与前端进行交互的实现方法,文章还分享了使用7Edit工具进行调试的经... 目录一、前言二、正文1、环境配置2、数据接收:HL7Monitor3、数据解析:HL7Busines

Mybatis拦截器如何实现数据权限过滤

《Mybatis拦截器如何实现数据权限过滤》本文介绍了MyBatis拦截器的使用,通过实现Interceptor接口对SQL进行处理,实现数据权限过滤功能,通过在本地线程变量中存储数据权限相关信息,并... 目录背景基础知识MyBATis 拦截器介绍代码实战总结背景现在的项目负责人去年年底离职,导致前期规