flink 从monggo读取PB级全部数据根据分组统计数据 样例

2024-06-19 01:44

本文主要是介绍flink 从monggo读取PB级全部数据根据分组统计数据 样例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

简述

  1. 从mongoDB数据库中读取千万级数据。
  2. 根据某个字段进行汇总统计数据。
  3. 将获取的数据保存到redis中,获取最终的统计数据。

maven

 		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-mongodb</artifactId><version>1.1.0-1.18</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>1.8.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency>   <dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.1.0</version></dependency>     

代码

RedisExampleMapper

package com.wfg.flink.connector.redis;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;/*** @author wfg*/
public class RedisExampleMapper implements RedisMapper<Tuple2<String, Integer>> {@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");}@Overridepublic String getKeyFromData(Tuple2<String, Integer> data) {return data.f0;}@Overridepublic String getValueFromData(Tuple2<String, Integer> data) {return data.f1.toString();}
}

根据名字统计访问次数

package com.wfg.flink.connector.mongodb;import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson2.JSONObject;
import com.wfg.flink.connector.dto.KafkaPvDto;
import com.wfg.flink.connector.redis.RedisExampleMapper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.mongodb.source.MongoSource;
import org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionStrategy;
import org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.bson.BsonDocument;import static com.wfg.flink.connector.constants.Constants.MONGO_TEST_PV_COLLECTION;/*** @author wfg* 根据名字统计访问次数*/
public class MongoAllNameRedisCounts {public static void main(String[] args) throws Exception {String startTime = DateUtil.format(DateUtil.date(), "yyyy-MM-dd HH:mm:ss");System.out.println("StartTime:" + startTime);final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 开启Checkpointing,设置Checkpoint间隔env.enableCheckpointing(30000);// 设置Checkpoint模式env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 设置最小Checkpoint间隔env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);// 设置最大并发Checkpoint数目env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 使用RocksDB作为状态后端env.setStateBackend(new HashMapStateBackend());env.setParallelism(10);// 配置MongoDB源MongoSource<String> mongoSource = MongoSource.<String>builder().setUri("mongodb://root:123456@127.0.0.1:27017,127.0.0.1:27018,127.0.0.1:27019/admin?replicaSet=rs0&authSource=admin").setDatabase("sjzz").setCollection(MONGO_TEST_PV_COLLECTION).setFetchSize(2048)
//                .setLimit(1000).setNoCursorTimeout(true).setPartitionStrategy(PartitionStrategy.SINGLE).setPartitionSize(MemorySize.ofMebiBytes(64))
//                .setSamplesPerPartition(10).setDeserializationSchema(new MongoDeserializationSchema<>() {@Overridepublic String deserialize(BsonDocument document) {return document.toJson();}@Overridepublic TypeInformation<String> getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}}).build();// 创建MongoDB数据流DataStream<String> sourceStream = env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "kafka Mongo Source");
// 转换数据,提取人名作为KeyDataStream<Tuple2<String, Integer>> nameCountStream = sourceStream.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {KafkaPvDto data = JSONObject.parseObject(value, KafkaPvDto.class);return Tuple2.of(data.getUserName(), 1);}}).keyBy(value -> value.f0).reduce(new ReduceFunction<>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {return new Tuple2<>(value1.f0, value1.f1 + value2.f1);}});FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();RedisSink<Tuple2<String, Integer>> sink = new RedisSink<>(conf, new RedisExampleMapper());nameCountStream.addSink(sink);// 输出结果env.execute("Flink MongoDB Name Count ");System.out.println("-----------------------------------");System.out.println("startTime: " + startTime);System.out.println("EndTime: " + DateUtil.format(DateUtil.date(), "yyyy-MM-dd HH:mm:ss"));}
}

注意:
大数据生成参考:批量数据生成
中间件部署参考:

  • kafka:部署
  • mongo:部署

这篇关于flink 从monggo读取PB级全部数据根据分组统计数据 样例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1073662

相关文章

Python在二进制文件中进行数据搜索的实战指南

《Python在二进制文件中进行数据搜索的实战指南》在二进制文件中搜索特定数据是编程中常见的任务,尤其在日志分析、程序调试和二进制数据处理中尤为重要,下面我们就来看看如何使用Python实现这一功能吧... 目录简介1. 二进制文件搜索概述2. python二进制模式文件读取(rb)2.1 二进制模式与文本

Springboot配置文件相关语法及读取方式详解

《Springboot配置文件相关语法及读取方式详解》本文主要介绍了SpringBoot中的两种配置文件形式,即.properties文件和.yml/.yaml文件,详细讲解了这两种文件的语法和读取方... 目录配置文件的形式语法1、key-value形式2、数组形式读取方式1、通过@value注解2、通过

C#实现将XML数据自动化地写入Excel文件

《C#实现将XML数据自动化地写入Excel文件》在现代企业级应用中,数据处理与报表生成是核心环节,本文将深入探讨如何利用C#和一款优秀的库,将XML数据自动化地写入Excel文件,有需要的小伙伴可以... 目录理解XML数据结构与Excel的对应关系引入高效工具:使用Spire.XLS for .NETC

Qt实现对Word网页的读取功能

《Qt实现对Word网页的读取功能》文章介绍了几种在Qt中实现Word文档(.docx/.doc)读写功能的方法,包括基于QAxObject的COM接口调用、DOCX模板替换及跨平台解决方案,重点讨论... 目录1. 核心实现方式2. 基于QAxObject的COM接口调用(Windows专用)2.1 环境

MySQL数据目录迁移的完整过程

《MySQL数据目录迁移的完整过程》文章详细介绍了将MySQL数据目录迁移到新硬盘的整个过程,包括新硬盘挂载、创建新的数据目录、迁移数据(推荐使用两遍rsync方案)、修改MySQL配置文件和重启验证... 目录1,新硬盘挂载(如果有的话)2,创建新的 mysql 数据目录3,迁移 MySQL 数据(推荐两

Python数据验证神器Pydantic库的使用和实践中的避坑指南

《Python数据验证神器Pydantic库的使用和实践中的避坑指南》Pydantic是一个用于数据验证和设置的库,可以显著简化API接口开发,文章通过一个实际案例,展示了Pydantic如何在生产环... 目录1️⃣ 崩溃时刻:当你的API接口又双叒崩了!2️⃣ 神兵天降:3行代码解决验证难题3️⃣ 深度

MySQL快速复制一张表的四种核心方法(包括表结构和数据)

《MySQL快速复制一张表的四种核心方法(包括表结构和数据)》本文详细介绍了四种复制MySQL表(结构+数据)的方法,并对每种方法进行了对比分析,适用于不同场景和数据量的复制需求,特别是针对超大表(1... 目录一、mysql 复制表(结构+数据)的 4 种核心方法(面试结构化回答)方法 1:CREATE

详解C++ 存储二进制数据容器的几种方法

《详解C++存储二进制数据容器的几种方法》本文主要介绍了详解C++存储二进制数据容器,包括std::vector、std::array、std::string、std::bitset和std::ve... 目录1.std::vector<uint8_t>(最常用)特点:适用场景:示例:2.std::arra

MySQL中的DELETE删除数据及注意事项

《MySQL中的DELETE删除数据及注意事项》MySQL的DELETE语句是数据库操作中不可或缺的一部分,通过合理使用索引、批量删除、避免全表删除、使用TRUNCATE、使用ORDERBY和LIMI... 目录1. 基本语法单表删除2. 高级用法使用子查询删除删除多表3. 性能优化策略使用索引批量删除避免

MySQL 数据库进阶之SQL 数据操作与子查询操作大全

《MySQL数据库进阶之SQL数据操作与子查询操作大全》本文详细介绍了SQL中的子查询、数据添加(INSERT)、数据修改(UPDATE)和数据删除(DELETE、TRUNCATE、DROP)操作... 目录一、子查询:嵌套在查询中的查询1.1 子查询的基本语法1.2 子查询的实战示例二、数据添加:INSE