本文主要是介绍flink 从monggo读取PB级全部数据根据分组统计数据 样例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
简述
- 从mongoDB数据库中读取千万级数据。
- 根据某个字段进行汇总统计数据。
- 将获取的数据保存到redis中,获取最终的统计数据。
maven
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-mongodb</artifactId><version>1.1.0-1.18</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>1.8.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency> <dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.1.0</version></dependency>
代码
RedisExampleMapper
package com.wfg.flink.connector.redis;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;/*** @author wfg*/
public class RedisExampleMapper implements RedisMapper<Tuple2<String, Integer>> {@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");}@Overridepublic String getKeyFromData(Tuple2<String, Integer> data) {return data.f0;}@Overridepublic String getValueFromData(Tuple2<String, Integer> data) {return data.f1.toString();}
}
根据名字统计访问次数
package com.wfg.flink.connector.mongodb;import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson2.JSONObject;
import com.wfg.flink.connector.dto.KafkaPvDto;
import com.wfg.flink.connector.redis.RedisExampleMapper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.mongodb.source.MongoSource;
import org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionStrategy;
import org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.bson.BsonDocument;import static com.wfg.flink.connector.constants.Constants.MONGO_TEST_PV_COLLECTION;/*** @author wfg* 根据名字统计访问次数*/
public class MongoAllNameRedisCounts {public static void main(String[] args) throws Exception {String startTime = DateUtil.format(DateUtil.date(), "yyyy-MM-dd HH:mm:ss");System.out.println("StartTime:" + startTime);final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 开启Checkpointing,设置Checkpoint间隔env.enableCheckpointing(30000);// 设置Checkpoint模式env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 设置最小Checkpoint间隔env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);// 设置最大并发Checkpoint数目env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 使用RocksDB作为状态后端env.setStateBackend(new HashMapStateBackend());env.setParallelism(10);// 配置MongoDB源MongoSource<String> mongoSource = MongoSource.<String>builder().setUri("mongodb://root:123456@127.0.0.1:27017,127.0.0.1:27018,127.0.0.1:27019/admin?replicaSet=rs0&authSource=admin").setDatabase("sjzz").setCollection(MONGO_TEST_PV_COLLECTION).setFetchSize(2048)
// .setLimit(1000).setNoCursorTimeout(true).setPartitionStrategy(PartitionStrategy.SINGLE).setPartitionSize(MemorySize.ofMebiBytes(64))
// .setSamplesPerPartition(10).setDeserializationSchema(new MongoDeserializationSchema<>() {@Overridepublic String deserialize(BsonDocument document) {return document.toJson();}@Overridepublic TypeInformation<String> getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}}).build();// 创建MongoDB数据流DataStream<String> sourceStream = env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "kafka Mongo Source");
// 转换数据,提取人名作为KeyDataStream<Tuple2<String, Integer>> nameCountStream = sourceStream.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {KafkaPvDto data = JSONObject.parseObject(value, KafkaPvDto.class);return Tuple2.of(data.getUserName(), 1);}}).keyBy(value -> value.f0).reduce(new ReduceFunction<>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {return new Tuple2<>(value1.f0, value1.f1 + value2.f1);}});FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();RedisSink<Tuple2<String, Integer>> sink = new RedisSink<>(conf, new RedisExampleMapper());nameCountStream.addSink(sink);// 输出结果env.execute("Flink MongoDB Name Count ");System.out.println("-----------------------------------");System.out.println("startTime: " + startTime);System.out.println("EndTime: " + DateUtil.format(DateUtil.date(), "yyyy-MM-dd HH:mm:ss"));}
}
注意:
大数据生成参考:批量数据生成
中间件部署参考:
- kafka:部署
- mongo:部署
这篇关于flink 从monggo读取PB级全部数据根据分组统计数据 样例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!