Logistics Real-Time Data Warehouse: Building the Warehouse (DWS), Part 2
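Series Articles
Logistics Real-Time Data Warehouse: Building the Collection Pipeline
Logistics Real-Time Data Warehouse: Building the Warehouse
Logistics Real-Time Data Warehouse: Building the Warehouse (DIM)
Logistics Real-Time Data Warehouse: Building the Warehouse (DWD), Part 1
Logistics Real-Time Data Warehouse: Building the Warehouse (DWD), Part 2
Logistics Real-Time Data Warehouse: Building the Warehouse (DWS), Part 1
Logistics Real-Time Data Warehouse: Building the Warehouse (DWS), Part 2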
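Table of Contents
- Series Articles
- Preface
- I. Writing the Code
  - 1. Fixing a Bug
  - 2. Trade Domain: Daily Order Summary by Cargo Type
    - 1. Entity class for trade-domain order aggregation by cargo type
    - 2. Trade domain: aggregating order count and order amount by cargo type
    - 3. Creating the ClickHouse Tables
  - 3. Trade Domain: Daily Order Summary by Organization
    - 1. Entity class for trade-domain order aggregation by organization
    - 2. Trade domain: order statistics by organization
    - 3. Creating the ClickHouse Tables
  - 4. Logistics Domain: Daily Transfer-Completion Summary
    - 1. Entity class for logistics-domain transfer completion
    - 2. Logistics domain: transfer-completion statistics
    - 3. Creating the ClickHouse Tables
  - 5. Logistics Domain: Daily Dispatch Summary
    - 1. Entity class for logistics-domain dispatch statistics
    - 2. Logistics domain: dispatch aggregation
    - 3. Creating the ClickHouse Tables
  - 6. Logistics Domain: Daily Successful-Delivery Summary by Organization
    - 1. Entity class for logistics-domain successful-delivery statistics by organization
    - 2. Logistics domain: successful-delivery statistics by organization
    - 3. Creating the ClickHouse Tables
  - 7. Logistics Domain: Daily Pickup Summary by Organization
    - 1. Entity class for logistics-domain pickup statistics by organization
    - 2. Logistics domain: pickup aggregation by organization
    - 3. Creating the ClickHouse Tables
  - 8. Logistics Domain: Daily Transport-Completion Summary by Organization and Truck Category
    - 1. Entity class for logistics-domain statistics by organization and truck category
    - 2. Logistics domain: aggregation by organization and truck category
    - 3. Creating the ClickHouse Tables
- II. Testing the Code
  - 1. Changing the Number of Topic Partitions
  - 2. Starting the Cluster
  - 3. Testing the Code
    - 1. Trade Domain: Daily Order Summary by Cargo Type
    - 2. Trade Domain: Daily Order Summary by Organization
    - 3. Logistics Domain: Daily Transfer-Completion Summary
    - 4. Logistics Domain: Daily Dispatch Summary
    - 5. Logistics Domain: Daily Successful-Delivery Summary by Organization
    - 6. Logistics Domain: Daily Pickup Summary by Organization
    - 7. Logistics Domain: Daily Transport-Completion Summary by Organization and Truck Category
- Summary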
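Preface
In the previous post we wrote most of the shared utility classes, so the remaining jobs are simpler to build.
I. Writing the Code
1. Fixing a Bug
While testing the code for this post, I found a bug in code from an earlier post.
In the DWD-layer file DwdTransTransFinish, the computed TransportTime value came out negative. The cause was that the two operands of the subtraction were in the wrong order, so the line needs to be corrected.
It is at around line 67 of that file.
Original code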
finishBean.setTransportTime(Long.parseLong(finishBean.getActualStartTime()) - Long.parseLong(finishBean.getActualEndTime()));
Corrected code
finishBean.setTransportTime(Long.parseLong(finishBean.getActualEndTime()) - Long.parseLong(finishBean.getActualStartTime()));
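To make the direction of the subtraction explicit, here is a minimal standalone sketch of the same calculation. The class and method names (TransportTimeSketch, transportTime) are hypothetical and not part of the project; the sketch only assumes, as the corrected line does, that both times arrive as epoch-millisecond strings.

```java
final class TransportTimeSketch {

    /**
     * Returns the transport duration in milliseconds: end minus start.
     * Both arguments are assumed to be epoch-millisecond strings.
     */
    static long transportTime(String actualStartTime, String actualEndTime) {
        long start = Long.parseLong(actualStartTime);
        long end = Long.parseLong(actualEndTime);
        // End minus start is non-negative whenever the trip ends after it starts.
        return end - start;
    }

    public static void main(String[] args) {
        System.out.println(transportTime("1000", "4000"));
    }
}
```

Swapping the two operands, as the original line did, yields the same magnitude with a negative sign.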
Because of this bug, incorrect records have already been written to Kafka, so we need to delete the existing topic and then generate it again.
kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic tms_dwd_trans_trans_finish
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_trans_finish
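2. Trade Domain: Daily Order Summary by Cargo Type
Requirement: for each cargo type, count the orders placed and total the order amount for the current day.
1. Entity class for trade-domain order aggregation by cargo type
DwsTradeCargoTypeOrderDayBean.java
package com.atguigu.tms.realtime.beans;

import lombok.Builder;
import lombok.Data;

import java.math.BigDecimal;

/**
 * Entity class for trade-domain order aggregation by cargo type
 */
@Data
@Builder
public class DwsTradeCargoTypeOrderDayBean {
    // Current date
    String curDate;

    // Cargo type ID
    String cargoType;

    // Cargo type name
    String cargoTypeName;

    // Order amount
    BigDecimal orderAmountBase;

    // Order count
    Long orderCountBase;

    // Timestamp
    Long ts;
}
2. Trade domain: aggregating order count and order amount by cargo type
DwsTradeCargoTypeOrderDay.java
package com.atguigu.tms.realtime.app.dws;

import com.alibaba.fastjson.JSON;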
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.app.func.DimAsyncFunction;
import com.atguigu.tms.realtime.app.func.MyAggregationFunction;
import com.atguigu.tms.realtime.app.func.MyTriggerFunction;
import com.atguigu.tms.realtime.beans.DwdTradeOrderDetailBean;
import com.atguigu.tms.realtime.beans.DwsTradeCargoTypeOrderDayBean;
import com.atguigu.tms.realtime.utils.ClickHouseUtil;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

// Trade domain: aggregate order count and order amount by cargo type
public class DwsTradeCargoTypeOrderDay {
    public static void main(String[] args) throws Exception {
        // Prepare the environment
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        env.setParallelism(4);

        // Read data from Kafka
        String topic = "tms_dwd_trade_order_detail";
        String groupId = "dws_trade_cargo_type_order_group";
        KafkaSource<String> kafkaSource = KafkaUtil.getKafkaSource(topic, groupId, args);
        SingleOutputStreamOperator<String> kafkaStrDS = env
                .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source")
                .uid("kafka_source");

        // Convert each record: JSON string -> entity object
        SingleOutputStreamOperator<DwsTradeCargoTypeOrderDayBean> mapDS = kafkaStrDS.map(
                new MapFunction<String, DwsTradeCargoTypeOrderDayBean>() {
                    @Override
                    public DwsTradeCargoTypeOrderDayBean map(String jsonStr) throws Exception {
                        DwdTradeOrderDetailBean dwdTradeOrderDetailBean = JSON.parseObject(jsonStr, DwdTradeOrderDetailBean.class);
                        DwsTradeCargoTypeOrderDayBean bean = DwsTradeCargoTypeOrderDayBean.builder()
                                .cargoType(dwdTradeOrderDetailBean.getCargoType())
                                .orderAmountBase(dwdTradeOrderDetailBean.getAmount())
                                .orderCountBase(1L)
                                // Shift by 8 hours so that the one-day windows line up with UTC+8 days
                                .ts(dwdTradeOrderDetailBean.getTs() + 8 * 60 * 60 * 1000)
                                .build();
                        return bean;
                    }
                }
        );

        // Assign watermarks and extract the event-time field
        SingleOutputStreamOperator<DwsTradeCargoTypeOrderDayBean> withWatermarkDS = mapDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<DwsTradeCargoTypeOrderDayBean>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<DwsTradeCargoTypeOrderDayBean>() {
                            @Override
                            public long extractTimestamp(DwsTradeCargoTypeOrderDayBean element, long l) {
                                return element.getTs();
                            }
                        })
        );

        // Key the stream by cargo type
        KeyedStream<DwsTradeCargoTypeOrderDayBean, String> keyedDS = withWatermarkDS.keyBy(DwsTradeCargoTypeOrderDayBean::getCargoType);

        // Open a one-day tumbling window
        WindowedStream<DwsTradeCargoTypeOrderDayBean, String, TimeWindow> windowDS = keyedDS.window(TumblingEventTimeWindows.of(Time.days(1)));

        // Attach the custom trigger
        WindowedStream<DwsTradeCargoTypeOrderDayBean, String, TimeWindow> triggerDS = windowDS.trigger(new MyTriggerFunction<DwsTradeCargoTypeOrderDayBean>());

        // Aggregate
        SingleOutputStreamOperator<DwsTradeCargoTypeOrderDayBean> aggregateDS = triggerDS.aggregate(
                new MyAggregationFunction<DwsTradeCargoTypeOrderDayBean>() {
                    @Override
                    public DwsTradeCargoTypeOrderDayBean add(DwsTradeCargoTypeOrderDayBean value, DwsTradeCargoTypeOrderDayBean accumulator) {
                        if (accumulator == null) {
                            return value;
                        }
                        accumulator.setOrderAmountBase(value.getOrderAmountBase().add(accumulator.getOrderAmountBase()));
                        accumulator.setOrderCountBase(value.getOrderCountBase() + accumulator.getOrderCountBase());
                        return accumulator;
                    }
                },
                new ProcessWindowFunction<DwsTradeCargoTypeOrderDayBean, DwsTradeCargoTypeOrderDayBean, String, TimeWindow>() {
                    @Override
                    public void process(String s, ProcessWindowFunction<DwsTradeCargoTypeOrderDayBean, DwsTradeCargoTypeOrderDayBean, String, TimeWindow>.Context context, Iterable<DwsTradeCargoTypeOrderDayBean> elements, Collector<DwsTradeCargoTypeOrderDayBean> out) throws Exception {
                        // Undo the 8-hour shift to get the real start of the day
                        long sst = context.window().getStart() - 8 * 60 * 60 * 1000L;
                        for (DwsTradeCargoTypeOrderDayBean bean : elements) {
                            String curDate = DateFormatUtil.toDate(sst);
                            bean.setCurDate(curDate);
                            bean.setTs(System.currentTimeMillis());
                            out.collect(bean);
                        }
                    }
                }
        );

        // Join the cargo-type dimension
        SingleOutputStreamOperator<DwsTradeCargoTypeOrderDayBean> withCargoTypeDS = AsyncDataStream.unorderedWait(
                aggregateDS,
                new DimAsyncFunction<DwsTradeCargoTypeOrderDayBean>("dim_base_dic") {
                    @Override
                    public void join(DwsTradeCargoTypeOrderDayBean bean, JSONObject dimInfoJsonObj) {
                        bean.setCargoTypeName(dimInfoJsonObj.getString("name"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTradeCargoTypeOrderDayBean bean) {
                        return Tuple2.of("id", bean.getCargoType());
                    }
                },
                60,
                TimeUnit.SECONDS
        );

        // Write the result to ClickHouse
        withCargoTypeDS.print(">>>>");
        withCargoTypeDS.addSink(ClickHouseUtil.getJdbcSink("insert into dws_trade_cargo_type_order_day_base values(?,?,?,?,?,?)"));

        env.execute();
    }
}
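The job adds 8 hours to each event timestamp before windowing and subtracts 8 hours from the window start afterwards. The standalone sketch below illustrates why, under two assumptions that are not confirmed by this post: that a one-day tumbling window with no offset is aligned to the UTC epoch, and that DateFormatUtil.toDate formats the timestamp in UTC+8. The class name DayWindowSketch and its methods are hypothetical.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

final class DayWindowSketch {
    static final long DAY_MS = 24 * 60 * 60 * 1000L;
    static final long SHIFT_MS = 8 * 60 * 60 * 1000L;

    /** Start of the epoch-aligned one-day window (zero offset) that contains ts. */
    static long windowStart(long ts) {
        return ts - Math.floorMod(ts, DAY_MS);
    }

    /** Statistics date for an event: shift by 8h, window, shift back, format in UTC+8. */
    static LocalDate curDate(long eventTs) {
        long shiftedWindowStart = windowStart(eventTs + SHIFT_MS);
        long realDayStart = shiftedWindowStart - SHIFT_MS;
        // Assumption: the project's date formatter works in UTC+8.
        return Instant.ofEpochMilli(realDayStart).atOffset(ZoneOffset.ofHours(8)).toLocalDate();
    }

    public static void main(String[] args) {
        // 07:00 on 2023-01-01 in UTC+8 is still 2022-12-31 in UTC.
        long eventTs = Instant.parse("2022-12-31T23:00:00Z").toEpochMilli();
        System.out.println(curDate(eventTs));
    }
}
```

Without the shift, an order placed at 07:00 local time would fall into the window of the previous UTC day; with it, the window boundaries coincide with local midnight.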
3. Creating the ClickHouse Tables
Create the table and the materialized view in the tms_realtime database that we created earlier.
CREATE TABLE IF NOT EXISTS dws_trade_cargo_type_order_day_base
(
    `cur_date` Date COMMENT 'Statistics date',
    `cargo_type` String COMMENT 'Cargo type ID',
    `cargo_type_name` String COMMENT 'Cargo type name',
    `order_amount_base` Decimal(38, 20) COMMENT 'Order amount',
    `order_count_base` UInt64 COMMENT 'Order count',
    `ts` UInt64 COMMENT 'Timestamp'
)
ENGINE = MergeTree
ORDER BY (cur_date, cargo_type, cargo_type_name);

CREATE MATERIALIZED VIEW IF NOT EXISTS dws_trade_cargo_type_order_day
ENGINE = AggregatingMergeTree()
ORDER BY (cur_date, cargo_type, cargo_type_name) AS
SELECT
    cur_date,
    cargo_type,
    cargo_type_name,
    argMaxState(order_amount_base, ts) AS order_amount,
    argMaxState(order_count_base, ts) AS order_count
FROM dws_trade_cargo_type_order_day_base
GROUP BY
    cur_date,
    cargo_type,
    cargo_type_name;
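The materialized view stores argMaxState(value, ts), which keeps the value belonging to the largest ts in each group. The plain-Java sketch below mimics that behavior, assuming (this post does not show MyTriggerFunction) that the job writes a fresh cumulative row to the base table every time the custom trigger fires, so the row with the newest ts carries the current total. ArgMaxSketch and Row are hypothetical names used only for illustration.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

final class ArgMaxSketch {

    /** One row of a base table: a grouping key, a cumulative amount, and the emit time. */
    static final class Row {
        final String key;
        final BigDecimal amount;
        final long ts;

        Row(String key, BigDecimal amount, long ts) {
            this.key = key;
            this.amount = amount;
            this.ts = ts;
        }
    }

    /** For each key, returns the amount of the row with the largest ts. */
    static Map<String, BigDecimal> latestPerKey(Iterable<Row> rows) {
        Map<String, Row> latest = new HashMap<>();
        for (Row row : rows) {
            Row seen = latest.get(row.key);
            if (seen == null || row.ts > seen.ts) {
                latest.put(row.key, row);
            }
        }
        Map<String, BigDecimal> result = new HashMap<>();
        for (Map.Entry<String, Row> entry : latest.entrySet()) {
            result.put(entry.getKey(), entry.getValue().amount);
        }
        return result;
    }
}
```

On the ClickHouse side, reading such a view would typically use the matching -Merge combinator, for example argMaxMerge(order_amount) with the same GROUP BY columns.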
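3. Trade Domain: Daily Order Summary by Organization
Requirement: for each organization, count the orders placed and total the order amount for the current day, then add the city dimension.
1. Entity class for trade-domain order aggregation by organization
DwsTradeOrgOrderDayBean.java
package com.atguigu.tms.realtime.beans;

import lombok.Builder;
import lombok.Data;

import java.math.BigDecimal;

/**
 * Entity class for trade-domain order aggregation by organization
 */
@Data
@Builder
public class DwsTradeOrgOrderDayBean {
    // Date
    String curDate;

    // Organization ID
    String orgId;

    // Organization name
    String orgName;

    // City ID
    String cityId;

    // City name
    String cityName;

    // Sender district ID (not written to ClickHouse)
    @TransientSink
    String senderDistrictId;

    // Order amount
    BigDecimal orderAmountBase;

    // Order count
    Long orderCountBase;

    // Timestamp
    Long ts;
}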
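The bean has nine fields, but the ClickHouse table for this job has eight columns: the field marked @TransientSink is only needed for the dimension lookup. How ClickHouseUtil from the earlier post handles the annotation is not shown here, so the following is only a sketch of one way such a marker could be honored through reflection. SkipInSink, OrgBean, and sinkValues are hypothetical stand-ins, not the project's classes.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

final class TransientFieldSketch {

    /** Hypothetical stand-in for the project's @TransientSink annotation. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface SkipInSink {}

    /** Hypothetical bean: three fields, one of which must not reach the sink. */
    static final class OrgBean {
        String orgId = "org-1";
        @SkipInSink
        String senderDistrictId = "district-9";
        Long orderCountBase = 5L;
    }

    /** Collects the values that would be bound to the "?" placeholders, skipping marked fields. */
    static List<Object> sinkValues(Object bean) {
        List<Object> values = new ArrayList<>();
        for (Field field : bean.getClass().getDeclaredFields()) {
            if (field.isSynthetic() || field.isAnnotationPresent(SkipInSink.class)) {
                continue;
            }
            field.setAccessible(true);
            try {
                values.add(field.get(bean));
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot read field " + field.getName(), e);
            }
        }
        return values;
    }
}
```

With this approach the number of "?" placeholders in the insert statement must equal the number of unmarked fields, which matches the eight placeholders used by the job for this bean.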
2. Trade domain: order statistics by organization
DwsTradeOrgOrderDay.java
package com.atguigu.tms.realtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.app.func.DimAsyncFunction;
import com.atguigu.tms.realtime.app.func.MyAggregationFunction;
import com.atguigu.tms.realtime.app.func.MyTriggerFunction;
import com.atguigu.tms.realtime.beans.DwdTradeOrderDetailBean;
import com.atguigu.tms.realtime.beans.DwsTradeOrgOrderDayBean;
import com.atguigu.tms.realtime.utils.ClickHouseUtil;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

// Trade domain: order aggregation by organization
public class DwsTradeOrgOrderDay {
    public static void main(String[] args) throws Exception {
        // Prepare the environment
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        env.setParallelism(4);

        // Read from the real-time order-detail topic in Kafka
        String topic = "tms_dwd_trade_order_detail";
        String groupId = "dws_trade_org_order_group";
        KafkaSource<String> kafkaSource = KafkaUtil.getKafkaSource(topic, groupId, args);
        SingleOutputStreamOperator<String> kafkaStrDS = env
                .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source")
                .uid("kafka_source");

        // Convert each record: JSON string -> entity object
        SingleOutputStreamOperator<DwsTradeOrgOrderDayBean> mapDS = kafkaStrDS.map(
                new MapFunction<String, DwsTradeOrgOrderDayBean>() {
                    @Override
                    public DwsTradeOrgOrderDayBean map(String jsonStr) throws Exception {
                        DwdTradeOrderDetailBean dwdTradeOrderDetailBean = JSON.parseObject(jsonStr, DwdTradeOrderDetailBean.class);
                        DwsTradeOrgOrderDayBean bean = DwsTradeOrgOrderDayBean.builder()
                                .senderDistrictId(dwdTradeOrderDetailBean.getSenderDistrictId())
                                .cityId(dwdTradeOrderDetailBean.getSenderCityId())
                                .orderAmountBase(dwdTradeOrderDetailBean.getAmount())
                                .orderCountBase(1L)
                                // Shift by 8 hours, as the other DWS jobs do; the window start is shifted back below.
                                // (The original listing omitted this shift, which is inconsistent with the subtraction later on.)
                                .ts(dwdTradeOrderDetailBean.getTs() + 8 * 60 * 60 * 1000L)
                                .build();
                        return bean;
                    }
                }
        );

        // Join the organization dimension
        SingleOutputStreamOperator<DwsTradeOrgOrderDayBean> withOrgDS = AsyncDataStream.unorderedWait(
                mapDS,
                new DimAsyncFunction<DwsTradeOrgOrderDayBean>("dim_base_organ") {
                    @Override
                    public void join(DwsTradeOrgOrderDayBean bean, JSONObject dimInfoJsonObj) {
                        bean.setOrgId(dimInfoJsonObj.getString("id"));
                        bean.setOrgName(dimInfoJsonObj.getString("org_name"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTradeOrgOrderDayBean bean) {
                        return Tuple2.of("region_id", bean.getSenderDistrictId());
                    }
                },
                60, TimeUnit.SECONDS
        );

        // Assign watermarks
        SingleOutputStreamOperator<DwsTradeOrgOrderDayBean> withWatermarkDS = withOrgDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<DwsTradeOrgOrderDayBean>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<DwsTradeOrgOrderDayBean>() {
                            @Override
                            public long extractTimestamp(DwsTradeOrgOrderDayBean element, long l) {
                                return element.getTs();
                            }
                        })
        );

        // Key the stream by organization ID
        KeyedStream<DwsTradeOrgOrderDayBean, String> keyedDS = withWatermarkDS.keyBy(DwsTradeOrgOrderDayBean::getOrgId);

        // Open a one-day tumbling window
        WindowedStream<DwsTradeOrgOrderDayBean, String, TimeWindow> windowDS = keyedDS.window(TumblingEventTimeWindows.of(Time.days(1)));

        // Attach the custom trigger
        WindowedStream<DwsTradeOrgOrderDayBean, String, TimeWindow> triggerDS = windowDS.trigger(new MyTriggerFunction<DwsTradeOrgOrderDayBean>());

        // Aggregate
        SingleOutputStreamOperator<DwsTradeOrgOrderDayBean> aggregateDS = triggerDS.aggregate(
                new MyAggregationFunction<DwsTradeOrgOrderDayBean>() {
                    @Override
                    public DwsTradeOrgOrderDayBean add(DwsTradeOrgOrderDayBean value, DwsTradeOrgOrderDayBean accumulator) {
                        if (accumulator == null) {
                            return value;
                        }
                        accumulator.setOrderAmountBase(value.getOrderAmountBase().add(accumulator.getOrderAmountBase()));
                        accumulator.setOrderCountBase(value.getOrderCountBase() + accumulator.getOrderCountBase());
                        return accumulator;
                    }
                },
                new ProcessWindowFunction<DwsTradeOrgOrderDayBean, DwsTradeOrgOrderDayBean, String, TimeWindow>() {
                    @Override
                    public void process(String s, ProcessWindowFunction<DwsTradeOrgOrderDayBean, DwsTradeOrgOrderDayBean, String, TimeWindow>.Context context, Iterable<DwsTradeOrgOrderDayBean> elements, Collector<DwsTradeOrgOrderDayBean> out) throws Exception {
                        // Undo the 8-hour shift to get the real start of the day
                        long stt = context.window().getStart() - 8 * 60 * 60 * 1000;
                        String curDate = DateFormatUtil.toDate(stt);
                        for (DwsTradeOrgOrderDayBean bean : elements) {
                            bean.setCurDate(curDate);
                            bean.setTs(System.currentTimeMillis());
                            out.collect(bean);
                        }
                    }
                }
        );

        // Add the city dimension
        SingleOutputStreamOperator<DwsTradeOrgOrderDayBean> withCityDS = AsyncDataStream.unorderedWait(
                aggregateDS,
                new DimAsyncFunction<DwsTradeOrgOrderDayBean>("dim_base_region_info") {
                    @Override
                    public void join(DwsTradeOrgOrderDayBean bean, JSONObject dimInfoJsonObj) {
                        bean.setCityName(dimInfoJsonObj.getString("name"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTradeOrgOrderDayBean bean) {
                        return Tuple2.of("id", bean.getCityId());
                    }
                },
                60, TimeUnit.SECONDS
        );

        // Write the result to ClickHouse
        withCityDS.print(">>>");
        withCityDS.addSink(ClickHouseUtil.getJdbcSink("insert into dws_trade_org_order_day_base values(?,?,?,?,?,?,?,?)"));

        env.execute();
    }
}
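The two dimension joins in this job follow the same pattern: getCondition names the lookup column and value, and join copies fields from the matching dimension row into the bean, all with a 60-second timeout. The sketch below reproduces that flow in plain Java with CompletableFuture. It is not the project's DimAsyncFunction; the in-memory map stands in for the dim_base_organ table, and every name and sample value in it is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class DimLookupSketch {

    /** Hypothetical, trimmed-down bean. */
    static final class OrderBean {
        String senderDistrictId;
        String orgId;
        String orgName;
    }

    // Hypothetical in-memory stand-in for dim_base_organ, keyed by region_id.
    static final Map<String, Map<String, String>> DIM_BASE_ORGAN = new HashMap<>();

    static {
        Map<String, String> row = new HashMap<>();
        row.put("id", "7");
        row.put("org_name", "Demo Transfer Station");
        DIM_BASE_ORGAN.put("110101", row);
    }

    /** Looks up the dimension row asynchronously and copies its fields into the bean. */
    static CompletableFuture<OrderBean> enrich(OrderBean bean) {
        return CompletableFuture
                .supplyAsync(() -> DIM_BASE_ORGAN.get(bean.senderDistrictId)) // the "getCondition" step
                .thenApply(dimRow -> {                                        // the "join" step
                    if (dimRow != null) {
                        bean.orgId = dimRow.get("id");
                        bean.orgName = dimRow.get("org_name");
                    }
                    return bean;
                });
    }

    /** Waits for the lookup, giving up after 60 seconds like the job's timeout. */
    static OrderBean enrichBlocking(OrderBean bean) {
        try {
            return enrich(bean).get(60, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("Dimension lookup failed", e);
        }
    }
}
```

In the job itself, the organization lookup runs before the window because the stream is keyed by orgId, while the city name is only attached after aggregation, so that lookup happens once per emitted result and not once per order.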
3. Creating the ClickHouse Tables
CREATE TABLE IF NOT EXISTS dws_trade_org_order_day_base
(
    `cur_date` Date COMMENT 'Statistics date',
    `org_id` String COMMENT 'Organization ID',
    `org_name` String COMMENT 'Organization name',
    `city_id` String COMMENT 'City ID',
    `city_name` String COMMENT 'City name',
    `order_amount_base` Decimal(38, 20) COMMENT 'Order amount',
    `order_count_base` UInt64 COMMENT 'Order count',
    `ts` UInt64 COMMENT 'Timestamp'
)
ENGINE = MergeTree
ORDER BY (cur_date, org_id, org_name, city_id, city_name);

CREATE MATERIALIZED VIEW IF NOT EXISTS dws_trade_org_order_day
(
    `cur_date` Date,
    `org_id` String,
    `org_name` String,
    `city_id` String,
    `city_name` String,
    `order_amount` AggregateFunction(argMax, Decimal(38, 20), UInt64),
    `order_count` AggregateFunction(argMax, UInt64, UInt64)
)
ENGINE = AggregatingMergeTree()
ORDER BY (cur_date, org_id, org_name, city_id, city_name)
SETTINGS index_granularity = 8192 AS
SELECT
    cur_date,
    org_id,
    org_name,
    city_id,
    city_name,
    argMaxState(order_amount_base, ts) AS order_amount,
    argMaxState(order_count_base, ts) AS order_count
FROM dws_trade_org_order_day_base
GROUP BY
    cur_date,
    org_id,
    org_name,
    city_id,
    city_name;
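4. Logistics Domain: Daily Transfer-Completion Summary
Requirement: count the waybills whose transfer completed during the current day and write the result to the corresponding ClickHouse table.
1. Entity class for logistics-domain transfer completion
DwsTransBoundFinishDayBean.java
package com.atguigu.tms.realtime.beans;

import lombok.Builder;
import lombok.Data;

/**
 * Entity class for logistics-domain transfer completion
 */
@Data
@Builder
public class DwsTransBoundFinishDayBean {
    // Statistics date
    String curDate;

    // Transfer-completion count
    Long boundFinishOrderCountBase;

    // Timestamp
    Long ts;
}
2. Logistics domain: transfer-completion statistics
DwsTransBoundFinishDay.java
package com.atguigu.tms.realtime.app.dws;

import com.alibaba.fastjson.JSON;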
import com.atguigu.tms.realtime.app.func.MyAggregationFunction;
import com.atguigu.tms.realtime.app.func.MyTriggerFunction;
import com.atguigu.tms.realtime.beans.DwdTransDispatchDetailBean;
import com.atguigu.tms.realtime.beans.DwsTransBoundFinishDayBean;
import com.atguigu.tms.realtime.utils.ClickHouseUtil;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * Logistics domain: transfer-completion statistics
 */
public class DwsTransBoundFinishDay {
    public static void main(String[] args) throws Exception {
        // TODO 1. Prepare the environment
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        // Parallelism is set here for testing; comment this out when deploying and set the global parallelism through args
        env.setParallelism(4);

        // TODO 2. Read from the Kafka topic tms_dwd_trans_bound_finish_detail
        String topic = "tms_dwd_trans_bound_finish_detail";
        String groupId = "dws_trans_bound_finish_day";
        KafkaSource<String> kafkaConsumer = KafkaUtil.getKafkaSource(topic, groupId, args);
        SingleOutputStreamOperator<String> source = env
                .fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "kafka_source")
                .uid("kafka_source");

        // TODO 3. Convert the data structure
        SingleOutputStreamOperator<DwsTransBoundFinishDayBean> mappedStream = source.map(jsonStr -> {
            DwdTransDispatchDetailBean dispatchDetailBean = JSON.parseObject(jsonStr, DwdTransDispatchDetailBean.class);
            return DwsTransBoundFinishDayBean.builder()
                    .boundFinishOrderCountBase(1L)
                    .ts(dispatchDetailBean.getTs() + 8 * 60 * 60 * 1000L)
                    .build();
        });

        // TODO 4. Assign watermarks
        SingleOutputStreamOperator<DwsTransBoundFinishDayBean> withWatermarkStream = mappedStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<DwsTransBoundFinishDayBean>forBoundedOutOfOrderness(Duration.ofSeconds(5L))
                        .withTimestampAssigner(new SerializableTimestampAssigner<DwsTransBoundFinishDayBean>() {
                            @Override
                            public long extractTimestamp(DwsTransBoundFinishDayBean element, long recordTimestamp) {
                                return element.getTs();
                            }
                        })
        ).uid("watermark_stream");

        // TODO 5. Open a one-day tumbling window over the whole stream
        AllWindowedStream<DwsTransBoundFinishDayBean, TimeWindow> windowedStream =
                withWatermarkStream.windowAll(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.days(1L)));

        // TODO 6. Attach the trigger
        AllWindowedStream<DwsTransBoundFinishDayBean, TimeWindow> triggerStream = windowedStream.trigger(new MyTriggerFunction<DwsTransBoundFinishDayBean>());

        // TODO 7. Aggregate
        SingleOutputStreamOperator<DwsTransBoundFinishDayBean> aggregatedStream = triggerStream.aggregate(
                new MyAggregationFunction<DwsTransBoundFinishDayBean>() {
                    @Override
                    public DwsTransBoundFinishDayBean add(DwsTransBoundFinishDayBean value, DwsTransBoundFinishDayBean accumulator) {
                        if (accumulator == null) {
                            return value;
                        }
                        accumulator.setBoundFinishOrderCountBase(accumulator.getBoundFinishOrderCountBase() + value.getBoundFinishOrderCountBase());
                        return accumulator;
                    }
                },
                new ProcessAllWindowFunction<DwsTransBoundFinishDayBean, DwsTransBoundFinishDayBean, TimeWindow>() {
                    @Override
                    public void process(Context context, Iterable<DwsTransBoundFinishDayBean> elements, Collector<DwsTransBoundFinishDayBean> out) throws Exception {
                        for (DwsTransBoundFinishDayBean element : elements) {
                            String curDate = DateFormatUtil.toDate(context.window().getStart() - 8 * 60 * 60 * 1000L);
                            // Fill in the statistics date
                            element.setCurDate(curDate);
                            // Fill in the timestamp
                            element.setTs(System.currentTimeMillis());
                            out.collect(element);
                        }
                    }
                }
        ).uid("aggregate_stream");

        // TODO 8. Write to ClickHouse
        aggregatedStream.print(">>>>");
        aggregatedStream.addSink(ClickHouseUtil.getJdbcSink("insert into dws_trans_bound_finish_day_base values(?,?,?)"))
                .uid("clickhouse_sink");

        env.execute();
    }
}
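Every add method in these jobs starts with a null check on the accumulator. The sketch below shows that fold pattern in isolation, assuming (MyAggregationFunction comes from the previous post and is not shown here) that the accumulator starts out as null and the first element simply becomes the accumulator. NullAccumulatorSketch and CountBean are hypothetical names.

```java
import java.util.List;

final class NullAccumulatorSketch {

    /** Hypothetical bean that carries a running count. */
    static final class CountBean {
        long count;

        CountBean(long count) {
            this.count = count;
        }
    }

    /** Same shape as the add(value, accumulator) methods in the jobs. */
    static CountBean add(CountBean value, CountBean accumulator) {
        if (accumulator == null) {
            // First element of the window: it becomes the accumulator.
            return value;
        }
        accumulator.count += value.count;
        return accumulator;
    }

    /** Folds a list of beans, starting from a null accumulator (an assumption, see above). */
    static CountBean fold(List<CountBean> values) {
        CountBean accumulator = null;
        for (CountBean value : values) {
            accumulator = add(value, accumulator);
        }
        return accumulator;
    }
}
```

Reusing the first element as the accumulator avoids having to build an empty bean, at the cost of mutating an input record.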
3. Creating the ClickHouse Tables
CREATE TABLE IF NOT EXISTS dws_trans_bound_finish_day_base
(
    `cur_date` Date COMMENT 'Statistics date',
    `bound_finish_order_count_base` UInt64 COMMENT 'Transfer-completion count',
    `ts` UInt64 COMMENT 'Timestamp'
)
ENGINE = MergeTree
ORDER BY cur_date;

CREATE MATERIALIZED VIEW IF NOT EXISTS dws_trans_bound_finish_day
(
    `cur_date` Date,
    `bound_finish_order_count` AggregateFunction(argMax, UInt64, UInt64)
)
ENGINE = AggregatingMergeTree()
ORDER BY cur_date
SETTINGS index_granularity = 8192 AS
SELECT
    cur_date,
    argMaxState(bound_finish_order_count_base, ts) AS bound_finish_order_count
FROM dws_trans_bound_finish_day_base
GROUP BY cur_date;
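5. Logistics Domain: Daily Dispatch Summary
Requirement: count the waybills dispatched during the current day and write the result to ClickHouse.
1. Entity class for logistics-domain dispatch statistics
DwsTransDispatchDayBean.java
package com.atguigu.tms.realtime.beans;

import lombok.Builder;
import lombok.Data;

/**
 * Entity class for logistics-domain dispatch statistics
 */
@Data
@Builder
public class DwsTransDispatchDayBean {
    // Statistics date
    String curDate;

    // Dispatch count
    Long dispatchOrderCountBase;

    // Timestamp
    Long ts;
}
2. Logistics domain: dispatch aggregation
DwsTransDispatchDay.java
package com.atguigu.tms.realtime.app.dws;

import com.alibaba.fastjson.JSON;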
import com.atguigu.tms.realtime.app.func.MyAggregationFunction;
import com.atguigu.tms.realtime.app.func.MyTriggerFunction;
import com.atguigu.tms.realtime.beans.DwdTransDispatchDetailBean;
import com.atguigu.tms.realtime.beans.DwsTransDispatchDayBean;
import com.atguigu.tms.realtime.utils.ClickHouseUtil;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Logistics domain: dispatch aggregation
 */
public class DwsTransDispatchDay {
    public static void main(String[] args) throws Exception {
        // TODO 1. Prepare the environment
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        // Parallelism is set here for testing; comment this out when deploying and set the global parallelism through args
        env.setParallelism(4);

        // TODO 2. Read from the Kafka topic tms_dwd_trans_dispatch_detail
        String topic = "tms_dwd_trans_dispatch_detail";
        String groupId = "dws_trans_dispatch_day";
        KafkaSource<String> kafkaConsumer = KafkaUtil.getKafkaSource(topic, groupId, args);
        SingleOutputStreamOperator<String> source = env
                .fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "kafka_source")
                .uid("kafka_source");

        // TODO 3. Convert the data structure
        SingleOutputStreamOperator<DwsTransDispatchDayBean> mappedStream = source.map(jsonStr -> {
            DwdTransDispatchDetailBean dispatchDetailBean = JSON.parseObject(jsonStr, DwdTransDispatchDetailBean.class);
            return DwsTransDispatchDayBean.builder()
                    .dispatchOrderCountBase(1L)
                    .ts(dispatchDetailBean.getTs() + 8 * 60 * 60 * 1000L)
                    .build();
        });

        // TODO 4. Assign watermarks
        SingleOutputStreamOperator<DwsTransDispatchDayBean> withWatermarkStream = mappedStream.assignTimestampsAndWatermarks(
                // WatermarkStrategy.<DwsTransDispatchDayBean>forBoundedOutOfOrderness(Duration.ofSeconds(5L))
                WatermarkStrategy.<DwsTransDispatchDayBean>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<DwsTransDispatchDayBean>() {
                            @Override
                            public long extractTimestamp(DwsTransDispatchDayBean element, long recordTimestamp) {
                                return element.getTs();
                            }
                        })
        ).uid("watermark_stream");

        // TODO 5. Open a one-day tumbling window over the whole stream
        AllWindowedStream<DwsTransDispatchDayBean, TimeWindow> windowedStream =
                withWatermarkStream.windowAll(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.days(1L)));

        // TODO 6. Attach the trigger
        AllWindowedStream<DwsTransDispatchDayBean, TimeWindow> triggerStream = windowedStream.trigger(new MyTriggerFunction<DwsTransDispatchDayBean>());

        // TODO 7. Aggregate
        SingleOutputStreamOperator<DwsTransDispatchDayBean> aggregatedStream = triggerStream.aggregate(
                new MyAggregationFunction<DwsTransDispatchDayBean>() {
                    @Override
                    public DwsTransDispatchDayBean add(DwsTransDispatchDayBean value, DwsTransDispatchDayBean accumulator) {
                        if (accumulator == null) {
                            return value;
                        }
                        accumulator.setDispatchOrderCountBase(accumulator.getDispatchOrderCountBase() + value.getDispatchOrderCountBase());
                        return accumulator;
                    }
                },
                new ProcessAllWindowFunction<DwsTransDispatchDayBean, DwsTransDispatchDayBean, TimeWindow>() {
                    @Override
                    public void process(Context context, Iterable<DwsTransDispatchDayBean> elements, Collector<DwsTransDispatchDayBean> out) throws Exception {
                        for (DwsTransDispatchDayBean element : elements) {
                            String curDate = DateFormatUtil.toDate(context.window().getStart() - 8 * 60 * 60 * 1000L);
                            // Fill in the statistics date
                            element.setCurDate(curDate);
                            // Fill in the timestamp
                            element.setTs(System.currentTimeMillis());
                            out.collect(element);
                        }
                    }
                }
        ).uid("aggregate_stream");

        // TODO 8. Write to ClickHouse
        aggregatedStream.print(">>>>");
        aggregatedStream.addSink(ClickHouseUtil.getJdbcSink("insert into dws_trans_dispatch_day_base values(?,?,?)"))
                .uid("clickhouse_stream");

        env.execute();
    }
}
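The jobs in this post switch between forBoundedOutOfOrderness(Duration.ofSeconds(5)) and forMonotonousTimestamps(). The sketch below illustrates the difference with a small plain-Java model, based on my understanding that Flink's bounded-out-of-orderness generator emits a watermark of (largest timestamp seen) - (allowed delay) - 1 ms, and that the monotonous variant behaves like the same generator with a delay of zero. WatermarkSketch is a hypothetical class, not Flink API.

```java
final class WatermarkSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    WatermarkSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Start low enough that computing the first watermark cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    /** Records an event's timestamp; only the largest one seen so far matters. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Watermark that would be emitted right now. */
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        WatermarkSketch bounded = new WatermarkSketch(5_000L);
        WatermarkSketch monotonous = new WatermarkSketch(0L);
        for (long ts : new long[] {1_000L, 3_000L, 2_000L}) {
            bounded.onEvent(ts);
            monotonous.onEvent(ts);
        }
        System.out.println(bounded.currentWatermark() + " " + monotonous.currentWatermark());
    }
}
```

With zero allowed delay, the event at 2,000 ms arrives after the watermark has already reached 2,999 ms, so it counts as late. For a one-day window that matters mostly for events arriving around midnight.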
3. Creating the ClickHouse Tables
CREATE TABLE IF NOT EXISTS dws_trans_dispatch_day_base
(
    `cur_date` Date COMMENT 'Statistics date',
    `dispatch_order_count_base` UInt64 COMMENT 'Dispatch count',
    `ts` UInt64 COMMENT 'Timestamp'
)
ENGINE = MergeTree
ORDER BY cur_date;

CREATE MATERIALIZED VIEW IF NOT EXISTS dws_trans_dispatch_day
(
    `cur_date` Date,
    `dispatch_order_count` AggregateFunction(argMax, UInt64, UInt64)
)
ENGINE = AggregatingMergeTree()
ORDER BY cur_date
SETTINGS index_granularity = 8192 AS
SELECT
    cur_date,
    argMaxState(dispatch_order_count_base, ts) AS dispatch_order_count
FROM dws_trans_dispatch_day_base
GROUP BY cur_date;
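6. Logistics Domain: Daily Successful-Delivery Summary by Organization
Requirement: for each organization, count the successful deliveries (waybills) for the current day and write the result to ClickHouse.
1. Entity class for logistics-domain successful-delivery statistics by organization
DwsTransOrgDeliverSucDayBean.java
package com.atguigu.tms.realtime.beans;

import lombok.Builder;
import lombok.Data;

/**
 * Entity class for logistics-domain successful-delivery statistics by organization
 */
@Data
@Builder
public class DwsTransOrgDeliverSucDayBean {
    // Statistics date
    String curDate;

    // Organization ID
    String orgId;

    // Organization name
    String orgName;

    // District ID (not written to ClickHouse)
    @TransientSink
    String districtId;

    // City ID
    String cityId;

    // City name
    String cityName;

    // Province ID
    String provinceId;

    // Province name
    String provinceName;

    // Successful-delivery count
    Long deliverSucCountBase;

    // Timestamp
    Long ts;
}
2. Logistics domain: successful-delivery statistics by organization
DwsTransOrgDeliverSucDay.java
package com.atguigu.tms.realtime.app.dws;

import com.alibaba.fastjson.JSON;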
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.app.func.DimAsyncFunction;
import com.atguigu.tms.realtime.app.func.MyAggregationFunction;
import com.atguigu.tms.realtime.app.func.MyTriggerFunction;
import com.atguigu.tms.realtime.beans.DwdTransDeliverSucDetailBean;
import com.atguigu.tms.realtime.beans.DwsTransOrgDeliverSucDayBean;
import com.atguigu.tms.realtime.utils.ClickHouseUtil;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

/**
 * Logistics domain: successful-delivery statistics by organization
 */
public class DwsTransOrgDeliverSucDay {
    public static void main(String[] args) throws Exception {
        // TODO 1. Prepare the environment
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        // Parallelism is set here for testing; comment this out when deploying and set the global parallelism through args
        env.setParallelism(4);

        // TODO 2. Read from the Kafka topic tms_dwd_trans_deliver_detail
        String topic = "tms_dwd_trans_deliver_detail";
        String groupId = "dws_trans_org_deliver_suc_day";
        KafkaSource<String> kafkaConsumer = KafkaUtil.getKafkaSource(topic, groupId, args);
        SingleOutputStreamOperator<String> source = env
                .fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "kafka_source")
                .uid("kafka_source");

        // TODO 3. Convert the data structure
        SingleOutputStreamOperator<DwsTransOrgDeliverSucDayBean> mappedStream = source.map(jsonStr -> {
            DwdTransDeliverSucDetailBean dwdTransDeliverSucDetailBean = JSON.parseObject(jsonStr, DwdTransDeliverSucDetailBean.class);
            return DwsTransOrgDeliverSucDayBean.builder()
                    .districtId(dwdTransDeliverSucDetailBean.getReceiverDistrictId())
                    .cityId(dwdTransDeliverSucDetailBean.getReceiverCityId())
                    .provinceId(dwdTransDeliverSucDetailBean.getReceiverProvinceId())
                    .deliverSucCountBase(1L)
                    .ts(dwdTransDeliverSucDetailBean.getTs() + 8 * 60 * 60 * 1000L)
                    .build();
        });

        // TODO 4. Look up dimension data
        // Get the organization ID
        SingleOutputStreamOperator<DwsTransOrgDeliverSucDayBean> withOrgIdStream = AsyncDataStream.unorderedWait(
                mappedStream,
                new DimAsyncFunction<DwsTransOrgDeliverSucDayBean>("dim_base_organ") {
                    @Override
                    public void join(DwsTransOrgDeliverSucDayBean bean, JSONObject dimJsonObj) {
                        bean.setOrgId(dimJsonObj.getString("id"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgDeliverSucDayBean bean) {
                        return Tuple2.of("region_id", bean.getDistrictId());
                    }
                },
                60, TimeUnit.SECONDS
        ).uid("with_org_id_stream");

        // TODO 5. Assign watermarks
        SingleOutputStreamOperator<DwsTransOrgDeliverSucDayBean> withWatermarkStream = withOrgIdStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<DwsTransOrgDeliverSucDayBean>forBoundedOutOfOrderness(Duration.ofSeconds(5L))
                        .withTimestampAssigner(new SerializableTimestampAssigner<DwsTransOrgDeliverSucDayBean>() {
                            @Override
                            public long extractTimestamp(DwsTransOrgDeliverSucDayBean element, long recordTimestamp) {
                                return element.getTs();
                            }
                        })
                        .withIdleness(Duration.ofSeconds(20))
        ).uid("watermark_stream");

        // TODO 6. Key the stream by organization ID
        KeyedStream<DwsTransOrgDeliverSucDayBean, String> keyedStream = withWatermarkStream.keyBy(DwsTransOrgDeliverSucDayBean::getOrgId);

        // TODO 7. Open a one-day tumbling window
        WindowedStream<DwsTransOrgDeliverSucDayBean, String, TimeWindow> windowStream =
                keyedStream.window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.days(1L)));

        // TODO 8. Attach the trigger
        WindowedStream<DwsTransOrgDeliverSucDayBean, String, TimeWindow> triggerStream = windowStream.trigger(new MyTriggerFunction<>());

        // TODO 9. Aggregate
        SingleOutputStreamOperator<DwsTransOrgDeliverSucDayBean> aggregatedStream = triggerStream.aggregate(
                new MyAggregationFunction<DwsTransOrgDeliverSucDayBean>() {
                    @Override
                    public DwsTransOrgDeliverSucDayBean add(DwsTransOrgDeliverSucDayBean value, DwsTransOrgDeliverSucDayBean accumulator) {
                        if (accumulator == null) {
                            return value;
                        }
                        accumulator.setDeliverSucCountBase(accumulator.getDeliverSucCountBase() + value.getDeliverSucCountBase());
                        return accumulator;
                    }
                },
                new ProcessWindowFunction<DwsTransOrgDeliverSucDayBean, DwsTransOrgDeliverSucDayBean, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<DwsTransOrgDeliverSucDayBean> elements, Collector<DwsTransOrgDeliverSucDayBean> out) throws Exception {
                        for (DwsTransOrgDeliverSucDayBean element : elements) {
                            long stt = context.window().getStart();
                            element.setCurDate(DateFormatUtil.toDate(stt - 8 * 60 * 60 * 1000L));
                            element.setTs(System.currentTimeMillis());
                            out.collect(element);
                        }
                    }
                }
        ).uid("aggregate_stream");

        // TODO 10. Fill in the remaining dimension data
        // 10.1 Add the organization name
        SingleOutputStreamOperator<DwsTransOrgDeliverSucDayBean> withOrgNameAndRegionIdStream = AsyncDataStream.unorderedWait(
                aggregatedStream,
                new DimAsyncFunction<DwsTransOrgDeliverSucDayBean>("dim_base_organ") {
                    @Override
                    public void join(DwsTransOrgDeliverSucDayBean bean, JSONObject dimJsonObj) {
                        bean.setOrgName(dimJsonObj.getString("org_name"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgDeliverSucDayBean bean) {
                        return Tuple2.of("id", bean.getOrgId());
                    }
                },
                60, TimeUnit.SECONDS
        ).uid("with_org_name_and_region_id_stream");

        // 10.2 Add the city name
        SingleOutputStreamOperator<DwsTransOrgDeliverSucDayBean> withCityNameStream = AsyncDataStream.unorderedWait(
                withOrgNameAndRegionIdStream,
                new DimAsyncFunction<DwsTransOrgDeliverSucDayBean>("dim_base_region_info") {
                    @Override
                    public void join(DwsTransOrgDeliverSucDayBean bean, JSONObject dimJsonObj) {
                        bean.setCityName(dimJsonObj.getString("name"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgDeliverSucDayBean bean) {
                        return Tuple2.of("id", bean.getCityId());
                    }
                },
                60, TimeUnit.SECONDS
        ).uid("with_city_name_stream");

        // 10.3 Add the province name
        SingleOutputStreamOperator<DwsTransOrgDeliverSucDayBean> fullStream = AsyncDataStream.unorderedWait(
                withCityNameStream,
                new DimAsyncFunction<DwsTransOrgDeliverSucDayBean>("dim_base_region_info") {
                    @Override
                    public void join(DwsTransOrgDeliverSucDayBean bean, JSONObject dimJsonObj) {
                        bean.setProvinceName(dimJsonObj.getString("name"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgDeliverSucDayBean bean) {
                        return Tuple2.of("id", bean.getProvinceId());
                    }
                },
                60, TimeUnit.SECONDS
        ).uid("with_province_name_stream");

        // TODO 11. Write to ClickHouse
        fullStream.print(">>>");
        fullStream.addSink(ClickHouseUtil.getJdbcSink("insert into dws_trans_org_deliver_suc_day_base values(?,?,?,?,?,?,?,?,?)"))
                .uid("clickhouse_stream");

        env.execute();
    }
}
3.Creating the ClickHouse tables
CREATE TABLE IF NOT EXISTS dws_trans_org_deliver_suc_day_base
(
    `cur_date` Date COMMENT 'statistics date',
    `org_id` String COMMENT 'organization ID',
    `org_name` String COMMENT 'organization name',
    `city_id` String COMMENT 'city ID',
    `city_name` String COMMENT 'city name',
    `province_id` String COMMENT 'province ID',
    `province_name` String COMMENT 'province name',
    `deliver_suc_count_base` UInt64 COMMENT 'successful delivery count',
    `ts` UInt64 COMMENT 'timestamp'
)
ENGINE = MergeTree
ORDER BY (cur_date, org_id, org_name, city_id, city_name, province_id, province_name);

CREATE MATERIALIZED VIEW IF NOT EXISTS dws_trans_org_deliver_suc_day
(
    `cur_date` Date,
    `org_id` String,
    `org_name` String,
    `city_id` String,
    `city_name` String,
    `province_id` String,
    `province_name` String,
    `deliver_suc_count` AggregateFunction(argMax, UInt64, UInt64)
)
ENGINE = AggregatingMergeTree()
ORDER BY (cur_date, org_id, org_name, city_id, city_name, province_id, province_name)
SETTINGS index_granularity = 8192 AS
SELECT cur_date, org_id, org_name, city_id, city_name, province_id, province_name, argMaxState(deliver_suc_count_base, ts) AS deliver_suc_count
FROM dws_trans_org_deliver_suc_day_base
GROUP BY cur_date, org_id, org_name, city_id, city_name, province_id, province_name;
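The materialized view stores `argMaxState(deliver_suc_count_base, ts)`, and the test queries later read it back with `argMaxMerge`. A minimal sketch of what that buys us, assuming the custom trigger fires more than once per day window so that several cumulative rows per key reach the base table: for each group, only the value carried by the row with the largest `ts` survives. The class and key format below are hypothetical and only simulate the idea in memory; they are not ClickHouse code.

```java
import java.util.HashMap;
import java.util.Map;

// Toy in-memory model of argMax(value, ts) per group key (hypothetical, for illustration only).
class ArgMaxDemo {
    // key -> {largest ts seen so far, value that came with it}
    private final Map<String, long[]> state = new HashMap<>();

    // Corresponds to inserting one row into the base table.
    void insert(String key, long value, long ts) {
        long[] current = state.get(key);
        if (current == null || ts > current[0]) {
            state.put(key, new long[]{ts, value});
        }
    }

    // Corresponds to reading the merged result for one group.
    long merge(String key) {
        return state.get(key)[1];
    }

    public static void main(String[] args) {
        ArgMaxDemo demo = new ArgMaxDemo();
        String key = "2024-01-01|org-1";
        demo.insert(key, 3L, 100L); // first cumulative count
        demo.insert(key, 7L, 200L); // newest cumulative count
        demo.insert(key, 5L, 150L); // older row arriving late is ignored
        System.out.println(demo.merge(key));
    }
}
```

Because each emitted row already holds the running total for the day, keeping the newest row is enough; summing the rows would count the same deliveries several times.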
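7.Logistics domain: organization-level daily pickup summary table
Requirement: count the day's pickups per organization and write the result to ClickHouse.
1.Entity class for organization-level pickup statistics
DwsTransOrgReceiveDayBean.java
package com.atguigu.tms.realtime.beans;

import lombok.Builder;
import lombok.Data;

/**
 * Entity class for organization-level pickup statistics in the logistics domain
 */
@Data
@Builder
public class DwsTransOrgReceiveDayBean {
    // Statistics date
    String curDate;

    // Transfer station ID
    String orgId;

    // Transfer station name
    String orgName;

    // District ID (not written to the sink)
    @TransientSink
    String districtId;

    // City ID
    String cityId;

    // City name
    String cityName;

    // Province ID
    String provinceId;

    // Province name
    String provinceName;

    // Pickup count (one order counts once)
    Long receiveOrderCountBase;

    // Timestamp
    Long ts;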
}
2.Organization-level pickup aggregation
DwsTransOrgReceiveDay.java
package com.atguigu.tms.realtime.app.dws;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.app.func.DimAsyncFunction;
import com.atguigu.tms.realtime.app.func.MyAggregationFunction;
import com.atguigu.tms.realtime.app.func.MyTriggerFunction;
import com.atguigu.tms.realtime.beans.DwdTransReceiveDetailBean;
import com.atguigu.tms.realtime.beans.DwsTransOrgReceiveDayBean;
import com.atguigu.tms.realtime.utils.ClickHouseUtil;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

/**
 * Organization-level pickup aggregation for the logistics domain
 */
public class DwsTransOrgReceiveDay {
    public static void main(String[] args) throws Exception {
        // TODO 1. Prepare the environment
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        // Parallelism setting; comment this out for deployment and set the global parallelism through args
        env.setParallelism(4);

        // TODO 2. Read data from the given topic
        String topic = "tms_dwd_trans_receive_detail";
        String groupId = "dws_trans_org_receive_day";
        KafkaSource<String> kafkaConsumer = KafkaUtil.getKafkaSource(topic, groupId, args);
        SingleOutputStreamOperator<String> source = env
                .fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "kafka_source")
                .uid("kafka_source");

        // TODO 3. Convert the data structure
        SingleOutputStreamOperator<DwsTransOrgReceiveDayBean> mappedStream = source.map(jsonStr -> {
            DwdTransReceiveDetailBean dwdTransReceiveDetailBean = JSON.parseObject(jsonStr, DwdTransReceiveDetailBean.class);
            return DwsTransOrgReceiveDayBean.builder()
                    .districtId(dwdTransReceiveDetailBean.getSenderDistrictId())
                    .provinceId(dwdTransReceiveDetailBean.getSenderProvinceId())
                    .cityId(dwdTransReceiveDetailBean.getSenderCityId())
                    .receiveOrderCountBase(1L)
                    .ts(dwdTransReceiveDetailBean.getTs() + 8 * 60 * 60 * 1000L)
                    .build();
        });

        // TODO 4. Join dimension information
        // Look up the organization ID by district
        SingleOutputStreamOperator<DwsTransOrgReceiveDayBean> withOrgIdStream = AsyncDataStream.unorderedWait(
                mappedStream,
                new DimAsyncFunction<DwsTransOrgReceiveDayBean>("dim_base_organ") {
                    @Override
                    public void join(DwsTransOrgReceiveDayBean bean, JSONObject dimJsonObj) {
                        bean.setOrgId(dimJsonObj.getString("id"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgReceiveDayBean bean) {
                        return Tuple2.of("region_id", bean.getDistrictId());
                    }
                },
                5 * 60, TimeUnit.SECONDS
        ).uid("with_org_id_stream");

        // TODO 5. Set watermarks
        SingleOutputStreamOperator<DwsTransOrgReceiveDayBean> withWatermarkStream = withOrgIdStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<DwsTransOrgReceiveDayBean>forBoundedOutOfOrderness(Duration.ofSeconds(5L))
                        .withTimestampAssigner(new SerializableTimestampAssigner<DwsTransOrgReceiveDayBean>() {
                            @Override
                            public long extractTimestamp(DwsTransOrgReceiveDayBean bean, long recordTimestamp) {
                                return bean.getTs();
                            }
                        })
        ).uid("watermark_stream");

        // TODO 6. Key by orgId
        KeyedStream<DwsTransOrgReceiveDayBean, String> keyedStream = withWatermarkStream.keyBy(DwsTransOrgReceiveDayBean::getOrgId);

        // TODO 7. Open a one-day tumbling window
        WindowedStream<DwsTransOrgReceiveDayBean, String, TimeWindow> windowStream =
                keyedStream.window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.days(1L)));

        // TODO 8. Attach the custom trigger
        WindowedStream<DwsTransOrgReceiveDayBean, String, TimeWindow> triggerStream =
                windowStream.trigger(new MyTriggerFunction<DwsTransOrgReceiveDayBean>());

        // TODO 9. Aggregate
        SingleOutputStreamOperator<DwsTransOrgReceiveDayBean> aggregatedStream = triggerStream.aggregate(
                new MyAggregationFunction<DwsTransOrgReceiveDayBean>() {
                    @Override
                    public DwsTransOrgReceiveDayBean add(DwsTransOrgReceiveDayBean value, DwsTransOrgReceiveDayBean accumulator) {
                        if (accumulator == null) {
                            return value;
                        }
                        accumulator.setReceiveOrderCountBase(accumulator.getReceiveOrderCountBase() + value.getReceiveOrderCountBase());
                        return accumulator;
                    }
                },
                new ProcessWindowFunction<DwsTransOrgReceiveDayBean, DwsTransOrgReceiveDayBean, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<DwsTransOrgReceiveDayBean> elements, Collector<DwsTransOrgReceiveDayBean> out) throws Exception {
                        for (DwsTransOrgReceiveDayBean element : elements) {
                            // Fill in the statistics date
                            String curDate = DateFormatUtil.toDate(context.window().getStart() - 8 * 60 * 60 * 1000L);
                            element.setCurDate(curDate);
                            // Refresh the timestamp
                            element.setTs(System.currentTimeMillis());
                            out.collect(element);
                        }
                    }
                }
        ).uid("aggregate_stream");

        // TODO 10. Fill in the dimension information
        // 10.1 Add the transfer station name
        SingleOutputStreamOperator<DwsTransOrgReceiveDayBean> withOrgNameStream = AsyncDataStream.unorderedWait(
                aggregatedStream,
                new DimAsyncFunction<DwsTransOrgReceiveDayBean>("dim_base_organ") {
                    @Override
                    public void join(DwsTransOrgReceiveDayBean bean, JSONObject dimJsonObj) {
                        bean.setOrgName(dimJsonObj.getString("org_name"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgReceiveDayBean bean) {
                        return Tuple2.of("id", bean.getOrgId());
                    }
                },
                60, TimeUnit.SECONDS
        ).uid("with_org_name_stream");

        // 10.2 Add the city name
        SingleOutputStreamOperator<DwsTransOrgReceiveDayBean> withCityNameStream = AsyncDataStream.unorderedWait(
                withOrgNameStream,
                new DimAsyncFunction<DwsTransOrgReceiveDayBean>("dim_base_region_info") {
                    @Override
                    public void join(DwsTransOrgReceiveDayBean bean, JSONObject dimJsonObj) {
                        bean.setCityName(dimJsonObj.getString("name"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgReceiveDayBean bean) {
                        return Tuple2.of("id", bean.getCityId());
                    }
                },
                60, TimeUnit.SECONDS
        ).uid("with_city_name_stream");

        // 10.3 Add the province name
        SingleOutputStreamOperator<DwsTransOrgReceiveDayBean> fullStream = AsyncDataStream.unorderedWait(
                withCityNameStream,
                new DimAsyncFunction<DwsTransOrgReceiveDayBean>("dim_base_region_info") {
                    @Override
                    public void join(DwsTransOrgReceiveDayBean bean, JSONObject dimJsonObj) {
                        bean.setProvinceName(dimJsonObj.getString("name"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgReceiveDayBean bean) {
                        return Tuple2.of("id", bean.getProvinceId());
                    }
                },
                60, TimeUnit.SECONDS
        ).uid("with_province_name_stream");

        // TODO 11. Write to ClickHouse
        fullStream.print(">>>");
        fullStream.addSink(ClickHouseUtil.getJdbcSink(
                "insert into dws_trans_org_receive_day_base values(?,?,?,?,?,?,?,?,?)")
        ).uid("clickhouse_stream");

        env.execute();
    }
}
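The job adds `8 * 60 * 60 * 1000L` to `ts` before windowing and subtracts the same amount from the window start when it fills in `curDate`. A minimal sketch of why, under two assumptions that the post does not state: the one-day tumbling window is aligned to UTC midnight (no window offset is passed), and `DateFormatUtil.toDate` formats in the Asia/Shanghai zone. The class below is hypothetical and uses only `java.time`; `toDate` here is a stand-in for the project utility.

```java
import java.time.Instant;
import java.time.ZoneId;

// Hypothetical demo of the +8h / -8h shift around a one-day tumbling window.
class DayWindowDemo {
    static final long DAY_MS = 24 * 60 * 60 * 1000L;
    static final long OFFSET_MS = 8 * 60 * 60 * 1000L; // UTC+8

    // Start of the UTC-aligned one-day window that contains ts.
    static long windowStart(long ts) {
        return ts - Math.floorMod(ts, DAY_MS);
    }

    // Mirrors the job: shift the event time forward, window it, shift the start back.
    static String curDate(long eventTsMillis) {
        long shifted = eventTsMillis + OFFSET_MS;
        long localMidnight = windowStart(shifted) - OFFSET_MS;
        return toDate(localMidnight);
    }

    // Stand-in for DateFormatUtil.toDate (assumed to format in Asia/Shanghai).
    static String toDate(long millis) {
        return Instant.ofEpochMilli(millis)
                .atZone(ZoneId.of("Asia/Shanghai"))
                .toLocalDate()
                .toString();
    }

    public static void main(String[] args) {
        // 23:00 UTC on 2023-12-31 is 07:00 on 2024-01-01 in Beijing.
        long ts = Instant.parse("2023-12-31T23:00:00Z").toEpochMilli();
        System.out.println(curDate(ts));             // date with the shift
        System.out.println(toDate(windowStart(ts))); // date without the shift
    }
}
```

With the shift, an early-morning Beijing event lands in the window of its own local day; without it, the event would be grouped with the previous UTC day.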
3.Creating the ClickHouse tables
CREATE TABLE IF NOT EXISTS dws_trans_org_receive_day_base
(
    `cur_date` Date COMMENT 'statistics date',
    `org_id` String COMMENT 'transfer station ID',
    `org_name` String COMMENT 'transfer station name',
    `city_id` String COMMENT 'city ID',
    `city_name` String COMMENT 'city name',
    `province_id` String COMMENT 'province ID',
    `province_name` String COMMENT 'province name',
    `receive_order_count_base` UInt64 COMMENT 'pickup count',
    `ts` UInt64 COMMENT 'timestamp'
)
ENGINE = MergeTree
ORDER BY (cur_date, org_id, org_name, city_id, city_name, province_id, province_name);

CREATE MATERIALIZED VIEW IF NOT EXISTS dws_trans_org_receive_day
(
    `cur_date` Date,
    `org_id` String,
    `org_name` String,
    `city_id` String,
    `city_name` String,
    `province_id` String,
    `province_name` String,
    `receive_order_count` AggregateFunction(argMax, UInt64, UInt64)
)
ENGINE = AggregatingMergeTree()
ORDER BY (cur_date, org_id, org_name, city_id, city_name, province_id, province_name)
SETTINGS index_granularity = 8192 AS
SELECT cur_date, org_id, org_name, city_id,city_name,province_id, province_name, argMaxState(receive_order_count_base, ts) AS receive_order_count
FROM dws_trans_org_receive_day_base
GROUP BY cur_date, org_id, org_name,city_id,city_name, province_id, province_name;
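8.Logistics domain: organization and truck-model level daily transport-completed summary table
Requirement: for each organization and truck model, compute the day's number of completed transports, the mileage, and the elapsed time, and write the result to ClickHouse.
1.Entity class for organization and truck-model level statistics
DwsTransOrgTruckModelTransFinishDayBean.java
package com.atguigu.tms.realtime.beans;

import lombok.Builder;
import lombok.Data;

import java.math.BigDecimal;

/**
 * Entity class for organization and truck-model level statistics in the logistics domain
 */
@Data
@Builder
public class DwsTransOrgTruckModelTransFinishDayBean {
    // Statistics date
    String curDate;

    // Organization ID
    String orgId;

    // Organization name
    String orgName;

    // Truck ID (not written to the sink)
    @TransientSink
    String truckId;

    // Truck model ID
    String truckModelId;

    // Truck model name
    String truckModelName;

    // First-level organization ID used to look up the city (not written to the sink)
    @TransientSink
    String joinOrgId;

    // City ID
    String cityId;

    // City name
    String cityName;

    // Number of completed transports
    Long transFinishCountBase;

    // Mileage of completed transports
    BigDecimal transFinishDistanceBase;

    // Elapsed time of completed transports
    Long transFinishDurTimeBase;

    // Timestamp
    Long ts;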
}
2.Organization and truck-model level aggregation
DwsTransOrgTruckModelTransFinishDay.java
package com.atguigu.tms.realtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.app.func.DimAsyncFunction;
import com.atguigu.tms.realtime.app.func.MyAggregationFunction;
import com.atguigu.tms.realtime.app.func.MyTriggerFunction;
import com.atguigu.tms.realtime.beans.DwdTransTransFinishBean;
import com.atguigu.tms.realtime.beans.DwsTransOrgTruckModelTransFinishDayBean;
import com.atguigu.tms.realtime.utils.ClickHouseUtil;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

// Organization and truck-model level aggregation for the logistics domain
public class DwsTransOrgTruckModelTransFinishDay {
    public static void main(String[] args) throws Exception {
        // Prepare the environment
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        env.setParallelism(4);

        // Read from the transport fact topic in Kafka
        String topic = "tms_dwd_trans_trans_finish";
        String groupId = "dws_trans_org_truck_model_group";
        KafkaSource<String> kafkaConsumer = KafkaUtil.getKafkaSource(topic, groupId, args);
        SingleOutputStreamOperator<String> kafkaDS = env
                .fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "kafka_source")
                .uid("kafka_source");

        // Convert each record: jsonStr -> entity class
        SingleOutputStreamOperator<DwsTransOrgTruckModelTransFinishDayBean> mapDS = kafkaDS.map(
                new MapFunction<String, DwsTransOrgTruckModelTransFinishDayBean>() {
                    @Override
                    public DwsTransOrgTruckModelTransFinishDayBean map(String jsonStr) throws Exception {
                        DwdTransTransFinishBean finishBean = JSON.parseObject(jsonStr, DwdTransTransFinishBean.class);
                        DwsTransOrgTruckModelTransFinishDayBean bean = DwsTransOrgTruckModelTransFinishDayBean.builder()
                                .orgId(finishBean.getStartOrgId())
                                .orgName(finishBean.getStartOrgName())
                                .truckId(finishBean.getTruckId())
                                .transFinishCountBase(1L)
                                .transFinishDistanceBase(finishBean.getActualDistance())
                                .transFinishDurTimeBase(finishBean.getTransportTime())
                                .ts(finishBean.getTs() + 8 * 60 * 60 * 1000L)
                                .build();
                        return bean;
                    }
                }
        );

        // Join the truck dimension to get the truck model ID
        SingleOutputStreamOperator<DwsTransOrgTruckModelTransFinishDayBean> withTruckDS = AsyncDataStream.unorderedWait(
                mapDS,
                new DimAsyncFunction<DwsTransOrgTruckModelTransFinishDayBean>("dim_truck_info") {
                    @Override
                    public void join(DwsTransOrgTruckModelTransFinishDayBean bean, JSONObject dimInfoJsonObj) {
                        bean.setTruckModelId(dimInfoJsonObj.getString("truck_model_id"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgTruckModelTransFinishDayBean bean) {
                        return Tuple2.of("id", bean.getTruckId());
                    }
                },
                60, TimeUnit.SECONDS
        );

        // Specify the watermark strategy and extract the event-time field
        SingleOutputStreamOperator<DwsTransOrgTruckModelTransFinishDayBean> withWatermarkDS = withTruckDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<DwsTransOrgTruckModelTransFinishDayBean>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<DwsTransOrgTruckModelTransFinishDayBean>() {
                            @Override
                            public long extractTimestamp(DwsTransOrgTruckModelTransFinishDayBean element, long l) {
                                return element.getTs();
                            }
                        })
        );

        // Key by organization ID + truck model ID
        KeyedStream<DwsTransOrgTruckModelTransFinishDayBean, String> keyDS = withWatermarkDS.keyBy(
                new KeySelector<DwsTransOrgTruckModelTransFinishDayBean, String>() {
                    @Override
                    public String getKey(DwsTransOrgTruckModelTransFinishDayBean bean) throws Exception {
                        return bean.getOrgId() + "+" + bean.getTruckModelId();
                    }
                }
        );

        // Open a one-day tumbling window
        WindowedStream<DwsTransOrgTruckModelTransFinishDayBean, String, TimeWindow> windowDS =
                keyDS.window(TumblingEventTimeWindows.of(Time.days(1)));

        // Attach the custom trigger
        WindowedStream<DwsTransOrgTruckModelTransFinishDayBean, String, TimeWindow> triggerDS =
                windowDS.trigger(new MyTriggerFunction<DwsTransOrgTruckModelTransFinishDayBean>());

        // Aggregate
        SingleOutputStreamOperator<DwsTransOrgTruckModelTransFinishDayBean> aggregateDS = triggerDS.aggregate(
                new MyAggregationFunction<DwsTransOrgTruckModelTransFinishDayBean>() {
                    @Override
                    public DwsTransOrgTruckModelTransFinishDayBean add(DwsTransOrgTruckModelTransFinishDayBean value, DwsTransOrgTruckModelTransFinishDayBean accumulator) {
                        if (accumulator == null) {
                            return value;
                        }
                        accumulator.setTransFinishCountBase(value.getTransFinishCountBase() + accumulator.getTransFinishCountBase());
                        accumulator.setTransFinishDistanceBase(value.getTransFinishDistanceBase().add(accumulator.getTransFinishDistanceBase()));
                        accumulator.setTransFinishDurTimeBase(value.getTransFinishDurTimeBase() + accumulator.getTransFinishDurTimeBase());
                        return accumulator;
                    }
                },
                new ProcessWindowFunction<DwsTransOrgTruckModelTransFinishDayBean, DwsTransOrgTruckModelTransFinishDayBean, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<DwsTransOrgTruckModelTransFinishDayBean> elements, Collector<DwsTransOrgTruckModelTransFinishDayBean> out) throws Exception {
                        Long stt = context.window().getStart() - 8 * 60 * 60 * 1000L;
                        String curDate = DateFormatUtil.toDate(stt);
                        for (DwsTransOrgTruckModelTransFinishDayBean element : elements) {
                            element.setCurDate(curDate);
                            element.setTs(System.currentTimeMillis());
                            out.collect(element);
                        }
                    }
                }
        );

        // Join dimension information
        // Get the truck model name
        SingleOutputStreamOperator<DwsTransOrgTruckModelTransFinishDayBean> withTruckModelDS = AsyncDataStream.unorderedWait(
                aggregateDS,
                new DimAsyncFunction<DwsTransOrgTruckModelTransFinishDayBean>("dim_truck_model") {
                    @Override
                    public void join(DwsTransOrgTruckModelTransFinishDayBean bean, JSONObject dimInfoJsonObj) {
                        bean.setTruckModelName(dimInfoJsonObj.getString("model_name"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgTruckModelTransFinishDayBean bean) {
                        return Tuple2.of("id", bean.getTruckModelId());
                    }
                },
                60, TimeUnit.SECONDS
        );

        // Get the ID of the transfer center that the organization belongs to
        SingleOutputStreamOperator<DwsTransOrgTruckModelTransFinishDayBean> withJoinOrgIdDS = AsyncDataStream.unorderedWait(
                withTruckModelDS,
                new DimAsyncFunction<DwsTransOrgTruckModelTransFinishDayBean>("dim_base_organ") {
                    @Override
                    public void join(DwsTransOrgTruckModelTransFinishDayBean bean, JSONObject dimInfoJsonObj) {
                        String orgParentId = dimInfoJsonObj.getString("org_parent_id");
                        bean.setJoinOrgId(orgParentId != null ? orgParentId : bean.getOrgId());
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgTruckModelTransFinishDayBean bean) {
                        return Tuple2.of("id", bean.getOrgId());
                    }
                },
                60, TimeUnit.SECONDS
        );

        // Use the transfer center ID to get the city ID from the organization table
        SingleOutputStreamOperator<DwsTransOrgTruckModelTransFinishDayBean> withCityIdDS = AsyncDataStream.unorderedWait(
                withJoinOrgIdDS,
                new DimAsyncFunction<DwsTransOrgTruckModelTransFinishDayBean>("dim_base_organ") {
                    @Override
                    public void join(DwsTransOrgTruckModelTransFinishDayBean bean, JSONObject dimInfoJsonObj) {
                        bean.setCityId(dimInfoJsonObj.getString("region_id"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgTruckModelTransFinishDayBean bean) {
                        return Tuple2.of("id", bean.getJoinOrgId());
                    }
                },
                60, TimeUnit.SECONDS
        );

        // Use the city ID to get the city name from the region table
        SingleOutputStreamOperator<DwsTransOrgTruckModelTransFinishDayBean> withCityNameDS = AsyncDataStream.unorderedWait(
                withCityIdDS,
                new DimAsyncFunction<DwsTransOrgTruckModelTransFinishDayBean>("dim_base_region_info") {
                    @Override
                    public void join(DwsTransOrgTruckModelTransFinishDayBean bean, JSONObject dimInfoJsonObj) {
                        bean.setCityName(dimInfoJsonObj.getString("name"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgTruckModelTransFinishDayBean bean) {
                        return Tuple2.of("id", bean.getCityId());
                    }
                },
                60, TimeUnit.SECONDS
        );

        // Write the result to ClickHouse
        withCityNameDS.print(">>>");
        withCityNameDS.addSink(ClickHouseUtil.getJdbcSink(
                "insert into dws_trans_org_truck_model_trans_finish_day_base values(?,?,?,?,?,?,?,?,?,?,?)"));

        env.execute();
    }
}
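The last three dimension joins form a chain: find the organization's parent (falling back to the organization itself when `org_parent_id` is null), read that organization's `region_id` as the city ID, then look up the city name. A minimal sketch of the same chain with plain maps; the map contents and the class name are made up for illustration, and the real job reads these values through `DimAsyncFunction`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-ins for dim_base_organ and dim_base_region_info.
class OrgCityLookupDemo {
    static final Map<String, String> ORG_PARENT = new HashMap<>();  // org id -> org_parent_id
    static final Map<String, String> ORG_REGION = new HashMap<>();  // org id -> region_id
    static final Map<String, String> REGION_NAME = new HashMap<>(); // region id -> name

    static {
        // Made-up sample rows: org "10" is a branch under transfer center "1".
        ORG_PARENT.put("10", "1");
        ORG_REGION.put("10", "110105");
        ORG_REGION.put("1", "110100");
        REGION_NAME.put("110100", "Beijing");
        REGION_NAME.put("110105", "Chaoyang");
    }

    static String cityNameFor(String orgId) {
        // Step 1: use the parent organization if there is one, else the organization itself.
        String parentId = ORG_PARENT.get(orgId);
        String joinOrgId = parentId != null ? parentId : orgId;
        // Step 2: the region of that organization is treated as the city ID.
        String cityId = ORG_REGION.get(joinOrgId);
        // Step 3: resolve the city name.
        return REGION_NAME.get(cityId);
    }

    public static void main(String[] args) {
        System.out.println(cityNameFor("10")); // branch resolves through its parent
        System.out.println(cityNameFor("1"));  // transfer center resolves through itself
    }
}
```

The fallback matters because a transfer center has no parent; without it the second lookup would be keyed on null and the city fields would stay empty.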
3.Creating the ClickHouse tables
CREATE TABLE IF NOT EXISTS dws_trans_org_truck_model_trans_finish_day_base
(
    `cur_date` Date COMMENT 'statistics date',
    `org_id` String COMMENT 'organization ID',
    `org_name` String COMMENT 'organization name',
    `truck_model_id` String COMMENT 'truck model ID',
    `truch_model_name` String COMMENT 'truck model name',
    `city_id` String COMMENT 'city ID',
    `city_name` String COMMENT 'city name',
    `trans_finish_count_base` UInt64 COMMENT 'completed transport count',
    `trans_finish_distance_base` Decimal(38, 20) COMMENT 'completed transport mileage',
    `trans_finish_dur_time_base` UInt64 COMMENT 'completed transport elapsed time',
    `ts` UInt64 COMMENT 'timestamp'
)
ENGINE = MergeTree
ORDER BY (cur_date, org_id, org_name, truck_model_id, truch_model_name, city_id, city_name);

CREATE MATERIALIZED VIEW IF NOT EXISTS dws_trans_org_truck_model_trans_finish_day
(
    `cur_date` Date,
    `org_id` String,
    `org_name` String,
    `truck_model_id` String,
    `truch_model_name` String,
    `city_id` String,
    `city_name` String,
    `trans_finish_count` AggregateFunction(argMax, UInt64, UInt64),
    `trans_finish_distance` AggregateFunction(argMax, Decimal(38, 20), UInt64),
    `trans_finish_dur_time` AggregateFunction(argMax, UInt64, UInt64)
)
ENGINE = AggregatingMergeTree()
ORDER BY (cur_date, org_id, org_name, truck_model_id, truch_model_name, city_id, city_name)
SETTINGS index_granularity = 8192 AS
SELECT cur_date, org_id, org_name, truck_model_id, truch_model_name, city_id, city_name, argMaxState(trans_finish_count_base, ts) AS trans_finish_count, argMaxState(trans_finish_distance_base, ts) AS trans_finish_distance, argMaxState(trans_finish_dur_time_base, ts) AS trans_finish_dur_time
FROM dws_trans_org_truck_model_trans_finish_day_base
GROUP BY cur_date, org_id, org_name, truck_model_id, truch_model_name, city_id, city_name;
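二、Testing the code
1.Changing the topic partition count
Make sure every Kafka topic read by the jobs has the same number of partitions as the Flink parallelism set in the code, which is 4.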
kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic tms_dwd_trade_order_detail --partitions 4
kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic tms_dwd_trans_bound_finish_detail --partitions 4
kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic tms_dwd_trans_dispatch_detail --partitions 4
kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic tms_dwd_trans_deliver_detail --partitions 4
kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic tms_dwd_trans_trans_finish --partitions 4
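The post does not spell out why the partition count should match the parallelism. One likely reason is watermarks: an event-time operator advances to the minimum watermark across its parallel inputs, so a subtask that never receives data can hold every window back unless idleness is configured (only the delivery-success job above calls `withIdleness`). The sketch below is a toy model of that rule, not Flink code; the round-robin partition assignment and all names are assumptions made for illustration.

```java
// Toy model: the downstream watermark is the minimum over all parallel subtasks.
class WatermarkDemo {
    // Number of partitions read by one subtask under a simple round-robin assignment (assumed).
    static int partitionsFor(int subtask, int partitions, int parallelism) {
        int count = 0;
        for (int p = subtask; p < partitions; p += parallelism) {
            count++;
        }
        return count;
    }

    // A subtask without any partition sees no events, so its watermark stays at Long.MIN_VALUE.
    static long downstreamWatermark(int partitions, int parallelism, long latestEventTs) {
        long min = Long.MAX_VALUE;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            boolean hasData = partitionsFor(subtask, partitions, parallelism) > 0;
            long watermark = hasData ? latestEventTs : Long.MIN_VALUE;
            min = Math.min(min, watermark);
        }
        return min;
    }

    public static void main(String[] args) {
        // 4 partitions, parallelism 4: every subtask has data and the watermark advances.
        System.out.println(downstreamWatermark(4, 4, 1_000L));
        // 1 partition, parallelism 4: three subtasks are empty and the watermark is stuck.
        System.out.println(downstreamWatermark(1, 4, 1_000L));
    }
}
```

In this model, matching the partition count to the parallelism is the simplest way to make sure no subtask is starved of data.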
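2.Starting the cluster
Start HDFS, ZooKeeper, Kafka, HBase, Redis, and ClickHouse.
Because we test all seven tables in one go, start all four ODS-layer and DWD-layer programs:
OdsApp, DwdBoundRelevantApp, DwdOrderRelevantApp, and DwdTransTransFinish.
3.Running the tests
You can start one DWS application at a time, generate data, and check the ClickHouse database, or start all seven and generate data only once. Some kinds of records are rare, so you may need to generate data several times.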
1.Trade domain: cargo-type-level daily order summary table
Test query
select cur_date,cargo_type,cargo_type_name,argMaxMerge(order_amount) as order_amount,argMaxMerge(order_count) as order_count
from dws_trade_cargo_type_order_day
group by cur_date,cargo_type,cargo_type_name
LIMIT 10;
2.Trade domain: organization-level daily order summary table
Test query
select cur_date,org_id,org_name,city_id,city_name,argMaxMerge(order_amount) as order_amount,argMaxMerge(order_count) as order_count
from dws_trade_org_order_day
group by cur_date,org_id,org_name,city_id,city_name
LIMIT 10;
3.Logistics domain: daily transfer-completed summary table
Test query
select cur_date,argMaxMerge(bound_finish_order_count) as bound_finish_order_count
from dws_trans_bound_finish_day
group by cur_date
LIMIT 10;
4.Logistics domain: daily dispatch summary table
Test query
select cur_date,argMaxMerge(dispatch_order_count) as dispatch_order_count
from dws_trans_dispatch_day
group by cur_date
LIMIT 10;
5.Logistics domain: organization-level daily successful-delivery summary table
Test query
SELECT cur_date,org_id,org_name,city_id,city_name,province_id,province_name,argMaxMerge(deliver_suc_count) AS deliver_suc_count
FROM dws_trans_org_deliver_suc_day
GROUP BY cur_date,org_id,org_name,city_id,city_name,province_id,province_name
LIMIT 10;
6.Logistics domain: organization-level daily pickup summary table
Test query
SELECT cur_date,org_id,org_name,city_id,city_name,province_id,province_name,argMaxMerge(receive_order_count) AS receive_order_count
FROM dws_trans_org_receive_day
GROUP BY cur_date,org_id,org_name,city_id,city_name,province_id,province_name
LIMIT 10;
7.Logistics domain: organization and truck-model level daily transport-completed summary table
Test query
SELECT cur_date, org_id, org_name, truck_model_id, truch_model_name, city_id, city_name, argMaxMerge(trans_finish_count) AS trans_finish_count, argMaxMerge(trans_finish_distance) AS trans_finish_distance, argMaxMerge(trans_finish_dur_time) AS trans_finish_dur_time
FROM dws_trans_org_truck_model_trans_finish_day
GROUP BY cur_date, org_id, org_name, truck_model_id, truch_model_name, city_id, city_name
LIMIT 10;
Summary
With this, the DWS layer of the real-time data warehouse is complete, and all of the code has been pushed to GitHub.
URL:https://github.com/lcc-666/tms-parent