Logistics Real-Time Data Warehouse: Building the Warehouse (DWS), Part 2

2024-01-05 17:04


Series Articles

Logistics Real-Time Data Warehouse: Building the Ingestion Pipeline
Logistics Real-Time Data Warehouse: Building the Warehouse
Logistics Real-Time Data Warehouse: Building the Warehouse (DIM)
Logistics Real-Time Data Warehouse: Building the Warehouse (DWD), Part 1
Logistics Real-Time Data Warehouse: Building the Warehouse (DWD), Part 2
Logistics Real-Time Data Warehouse: Building the Warehouse (DWS), Part 1
Logistics Real-Time Data Warehouse: Building the Warehouse (DWS), Part 2


Table of Contents

  • Series Articles
  • Preface
  • I. Writing the Code
    • 1. Fixing a Bug
    • 2. Trade Domain: Cargo-Type Daily Order Summary
      • 1. Entity class for cargo-type order aggregation
      • 2. Trade domain: order count and amount aggregation by cargo type
      • 3. ClickHouse DDL
    • 3. Trade Domain: Organization Daily Order Summary
      • 1. Entity class for organization order aggregation
      • 2. Organization-granularity order aggregation
      • 3. ClickHouse DDL
    • 4. Logistics Domain: Daily Transfer-Completion Summary
      • 1. Transfer-completion entity class
      • 2. Transfer-completion aggregation
      • 3. ClickHouse DDL
    • 5. Logistics Domain: Daily Dispatch Summary
      • 1. Dispatch entity class
      • 2. Dispatch aggregation
      • 3. ClickHouse DDL
    • 6. Logistics Domain: Organization Daily Successful-Delivery Summary
      • 1. Successful-delivery entity class
      • 2. Successful-delivery aggregation
      • 3. ClickHouse DDL
    • 7. Logistics Domain: Organization Daily Pickup Summary
      • 1. Pickup entity class
      • 2. Pickup aggregation
      • 3. ClickHouse DDL
    • 8. Logistics Domain: Organization and Truck-Model Daily Transport-Completion Summary
      • 1. Truck-model transport entity class
      • 2. Truck-model transport aggregation
      • 3. ClickHouse DDL
  • II. Testing the Code
    • 1. Adjust Topic Partition Counts
    • 2. Start the Cluster
    • 3. Run the Tests
      • 1. Trade domain: cargo-type daily order summary
      • 2. Trade domain: organization daily order summary
      • 3. Logistics domain: daily transfer-completion summary
      • 4. Logistics domain: daily dispatch summary
      • 5. Logistics domain: organization daily successful-delivery summary
      • 6. Logistics domain: organization daily pickup summary
      • 7. Logistics domain: organization and truck-model daily transport-completion summary
  • Summary


Preface

In the previous post we wrote quite a few shared utility classes, so the remaining parts of the build will be simpler.
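Every job in this post also reuses two helper classes from the previous post, MyTriggerFunction and MyAggregationFunction, which are not shown again here. As a reminder of the pattern they implement, here is a minimal, hypothetical sketch (not the exact classes from that post): the trigger fires the window early so the sink always holds a fresh partial result, and the aggregate-function base class implements AggregateFunction<T, T, T> so each job only has to override add().

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Hypothetical sketch: fire on every element so downstream sinks see fresh
// partial results, and fire-and-purge when the day window finally closes.
class MyTriggerFunction<T> extends Trigger<T, TimeWindow> {
    @Override
    public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time >= window.maxTimestamp() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
    }
}

// Hypothetical sketch: subclasses override add(); a null accumulator marks
// the first element of the window.
abstract class MyAggregationFunction<T> implements AggregateFunction<T, T, T> {
    @Override
    public T createAccumulator() {
        return null;
    }

    @Override
    public T getResult(T accumulator) {
        return accumulator;
    }

    @Override
    public T merge(T a, T b) {
        return a; // tumbling windows are never merged, so this is not called
    }
}

Because the early firings emit multiple rows per key and day, the ClickHouse materialized views built below deduplicate with argMax(..., ts), keeping only the latest firing.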


I. Writing the Code

1. Fixing a Bug

While testing the code later on, I found a bug in some earlier code.
In the DWD-layer file DwdTransTransFinish, the TransportTime value came out negative: the two operands of the subtraction were swapped, so it has to be corrected.
The change is at roughly line 67 of that file.

Before:
finishBean.setTransportTime(Long.parseLong(finishBean.getActualStartTime()) - Long.parseLong(finishBean.getActualEndTime()));

After:
finishBean.setTransportTime(Long.parseLong(finishBean.getActualEndTime()) - Long.parseLong(finishBean.getActualStartTime()));

Because of this bug, bad records had already been written to Kafka, so we need to delete the old topic and then recreate it.

kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic tms_dwd_trans_trans_finish
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_trans_finish

(Consuming the deleted topic with the console consumer re-creates it automatically, as long as the brokers allow topic auto-creation.)
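If auto-creation is disabled on the brokers, or you want the partition count to be right from the start, you can create the topic explicitly instead. These are standard kafka-topics.sh flags; the replication factor of 1 is an assumption for this test cluster:

kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic tms_dwd_trans_trans_finish --partitions 4 --replication-factor 1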

2. Trade Domain: Cargo-Type Daily Order Summary

Requirement: count each cargo type's order count and order amount for the day.

1. Entity class for cargo-type order aggregation

DwsTradeCargoTypeOrderDayBean.java

package com.atguigu.tms.realtime.beans;

import lombok.Builder;
import lombok.Data;

import java.math.BigDecimal;

/**
 * Trade domain: entity class for cargo-type order aggregation
 */
@Data
@Builder
public class DwsTradeCargoTypeOrderDayBean {
    // Statistics date
    String curDate;
    // Cargo type ID
    String cargoType;
    // Cargo type name
    String cargoTypeName;
    // Order amount
    BigDecimal orderAmountBase;
    // Order count
    Long orderCountBase;
    // Timestamp
    Long ts;
}

2. Trade domain: order count and amount aggregation by cargo type

DwsTradeCargoTypeOrderDay.java

package com.atguigu.tms.realtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.app.func.DimAsyncFunction;
import com.atguigu.tms.realtime.app.func.MyAggregationFunction;
import com.atguigu.tms.realtime.app.func.MyTriggerFunction;
import com.atguigu.tms.realtime.beans.DwdTradeOrderDetailBean;
import com.atguigu.tms.realtime.beans.DwsTradeCargoTypeOrderDayBean;
import com.atguigu.tms.realtime.utils.ClickHouseUtil;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

// Trade domain: aggregate order count and order amount by cargo type
public class DwsTradeCargoTypeOrderDay {
    public static void main(String[] args) throws Exception {
        // Prepare the environment
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        env.setParallelism(4);

        // Read from Kafka
        String topic = "tms_dwd_trade_order_detail";
        String groupId = "dws_trade_cargo_type_order_group";
        KafkaSource<String> kafkaSource = KafkaUtil.getKafkaSource(topic, groupId, args);
        SingleOutputStreamOperator<String> kafkaStrDS = env
                .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source")
                .uid("kafka_source");

        // Convert each element: JSON string -> entity object
        SingleOutputStreamOperator<DwsTradeCargoTypeOrderDayBean> mapDS = kafkaStrDS.map(
                new MapFunction<String, DwsTradeCargoTypeOrderDayBean>() {
                    @Override
                    public DwsTradeCargoTypeOrderDayBean map(String jsonStr) throws Exception {
                        DwdTradeOrderDetailBean dwdTradeOrderDetailBean =
                                JSON.parseObject(jsonStr, DwdTradeOrderDetailBean.class);
                        return DwsTradeCargoTypeOrderDayBean.builder()
                                .cargoType(dwdTradeOrderDetailBean.getCargoType())
                                .orderAmountBase(dwdTradeOrderDetailBean.getAmount())
                                .orderCountBase(1L)
                                .ts(dwdTradeOrderDetailBean.getTs() + 8 * 60 * 60 * 1000)
                                .build();
                    }
                });

        // Assign watermarks and extract the event-time field
        SingleOutputStreamOperator<DwsTradeCargoTypeOrderDayBean> withWatermarkDS = mapDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<DwsTradeCargoTypeOrderDayBean>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<DwsTradeCargoTypeOrderDayBean>() {
                            @Override
                            public long extractTimestamp(DwsTradeCargoTypeOrderDayBean element, long l) {
                                return element.getTs();
                            }
                        }));

        // Key by cargo type
        KeyedStream<DwsTradeCargoTypeOrderDayBean, String> keyedDS =
                withWatermarkDS.keyBy(DwsTradeCargoTypeOrderDayBean::getCargoType);

        // Open a one-day tumbling window
        WindowedStream<DwsTradeCargoTypeOrderDayBean, String, TimeWindow> windowDS =
                keyedDS.window(TumblingEventTimeWindows.of(Time.days(1)));

        // Attach the custom trigger
        WindowedStream<DwsTradeCargoTypeOrderDayBean, String, TimeWindow> triggerDS =
                windowDS.trigger(new MyTriggerFunction<DwsTradeCargoTypeOrderDayBean>());

        // Aggregate
        SingleOutputStreamOperator<DwsTradeCargoTypeOrderDayBean> aggregateDS = triggerDS.aggregate(
                new MyAggregationFunction<DwsTradeCargoTypeOrderDayBean>() {
                    @Override
                    public DwsTradeCargoTypeOrderDayBean add(DwsTradeCargoTypeOrderDayBean value, DwsTradeCargoTypeOrderDayBean accumulator) {
                        if (accumulator == null) {
                            return value;
                        }
                        accumulator.setOrderAmountBase(value.getOrderAmountBase().add(accumulator.getOrderAmountBase()));
                        accumulator.setOrderCountBase(value.getOrderCountBase() + accumulator.getOrderCountBase());
                        return accumulator;
                    }
                },
                new ProcessWindowFunction<DwsTradeCargoTypeOrderDayBean, DwsTradeCargoTypeOrderDayBean, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<DwsTradeCargoTypeOrderDayBean> elements, Collector<DwsTradeCargoTypeOrderDayBean> out) throws Exception {
                        long stt = context.window().getStart() - 8 * 60 * 60 * 1000L;
                        String curDate = DateFormatUtil.toDate(stt);
                        for (DwsTradeCargoTypeOrderDayBean bean : elements) {
                            bean.setCurDate(curDate);
                            bean.setTs(System.currentTimeMillis());
                            out.collect(bean);
                        }
                    }
                });

        // Join the cargo-type dimension
        SingleOutputStreamOperator<DwsTradeCargoTypeOrderDayBean> withCargoTypeDS = AsyncDataStream.unorderedWait(
                aggregateDS,
                new DimAsyncFunction<DwsTradeCargoTypeOrderDayBean>("dim_base_dic") {
                    @Override
                    public void join(DwsTradeCargoTypeOrderDayBean bean, JSONObject dimInfoJsonObj) {
                        bean.setCargoTypeName(dimInfoJsonObj.getString("name"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTradeCargoTypeOrderDayBean bean) {
                        return Tuple2.of("id", bean.getCargoType());
                    }
                },
                60, TimeUnit.SECONDS);

        // Write the result to ClickHouse
        withCargoTypeDS.print(">>>>");
        withCargoTypeDS.addSink(
                ClickHouseUtil.getJdbcSink("insert into dws_trade_cargo_type_order_day_base values(?,?,?,?,?,?)"));

        env.execute();
    }
}
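A note on the recurring 8 * 60 * 60 * 1000 offset: Flink aligns one-day tumbling windows to the UTC epoch, so each job shifts ts forward by eight hours before windowing to make the windows coincide with Beijing (UTC+8) calendar days, then shifts the window start back by eight hours when deriving cur_date. A small self-contained demo of the arithmetic (assuming DateFormatUtil.toDate formats in the UTC+8 JVM timezone):

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class WindowOffsetDemo {
    public static void main(String[] args) {
        long offset = 8 * 60 * 60 * 1000L;            // UTC+8, as in the jobs above
        long day = 24 * 60 * 60 * 1000L;
        // An event at 01:30 on 2024-01-05 Beijing time (17:30 Jan 4 in UTC):
        long ts = Instant.parse("2024-01-04T17:30:00Z").toEpochMilli();
        long shifted = ts + offset;                   // event time handed to Flink
        long windowStart = shifted - shifted % day;   // UTC-aligned day-window start
        // Shifting back and formatting in UTC+8 recovers the Beijing date:
        String curDate = DateTimeFormatter.ofPattern("yyyy-MM-dd")
                .withZone(ZoneId.of("Asia/Shanghai"))
                .format(Instant.ofEpochMilli(windowStart - offset));
        System.out.println(curDate);                  // prints 2024-01-05
    }
}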

3. ClickHouse DDL

Create the table and the materialized view in the tms_realtime database we created earlier.

CREATE TABLE IF NOT EXISTS dws_trade_cargo_type_order_day_base
(
    `cur_date`          Date COMMENT 'statistics date',
    `cargo_type`        String COMMENT 'cargo type ID',
    `cargo_type_name`   String COMMENT 'cargo type name',
    `order_amount_base` Decimal(38, 20) COMMENT 'order amount',
    `order_count_base`  UInt64 COMMENT 'order count',
    `ts`                UInt64 COMMENT 'timestamp'
)
ENGINE = MergeTree
ORDER BY (cur_date, cargo_type, cargo_type_name);

CREATE MATERIALIZED VIEW IF NOT EXISTS dws_trade_cargo_type_order_day
ENGINE = AggregatingMergeTree()
ORDER BY (cur_date, cargo_type, cargo_type_name) AS
SELECT
    cur_date,
    cargo_type,
    cargo_type_name,
    argMaxState(order_amount_base, ts) AS order_amount,
    argMaxState(order_count_base, ts)  AS order_count
FROM dws_trade_cargo_type_order_day_base
GROUP BY
    cur_date,
    cargo_type,
    cargo_type_name;
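Why the base-table/materialized-view split: because the custom trigger fires repeatedly before the day window closes, the job keeps inserting newer partial aggregates for the same (cur_date, cargo_type) key into the base table. The view's argMaxState(..., ts) keeps only the row with the largest ts, i.e. the most recent firing. Reading the view back requires the matching -Merge combinator, for example (the same query reappears in the test section below):

SELECT cur_date, cargo_type, cargo_type_name,
       argMaxMerge(order_amount) AS order_amount,
       argMaxMerge(order_count)  AS order_count
FROM dws_trade_cargo_type_order_day
GROUP BY cur_date, cargo_type, cargo_type_name;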

3. Trade Domain: Organization Daily Order Summary

Requirement: count each organization's order count and order amount for the day, enriched with city dimension info.

1. Entity class for organization order aggregation

DwsTradeOrgOrderDayBean.java

package com.atguigu.tms.realtime.beans;

import lombok.Builder;
import lombok.Data;

import java.math.BigDecimal;

/**
 * Trade domain: entity class for organization-granularity order aggregation
 */
@Data
@Builder
public class DwsTradeOrgOrderDayBean {
    // Statistics date
    String curDate;
    // Organization ID
    String orgId;
    // Organization name
    String orgName;
    // City ID
    String cityId;
    // City name
    String cityName;
    // Sender district ID (used only for the dimension lookup, not written to the sink)
    @TransientSink
    String senderDistrictId;
    // Order amount
    BigDecimal orderAmountBase;
    // Order count
    Long orderCountBase;
    // Timestamp
    Long ts;
}
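senderDistrictId above is marked @TransientSink because it is only needed for the dimension lookup and must not be bound to a ClickHouse column. The annotation itself was defined in an earlier post; a hypothetical sketch of its shape (the sink utility is assumed to skip annotated fields when filling the insert placeholders):

package com.atguigu.tms.realtime.beans;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Marker annotation: ClickHouseUtil.getJdbcSink is assumed to skip bean
// fields that carry it when binding the JDBC insert statement.
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TransientSink {
}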

2. Organization-granularity order aggregation

DwsTradeOrgOrderDay.java

package com.atguigu.tms.realtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.app.func.DimAsyncFunction;
import com.atguigu.tms.realtime.app.func.MyAggregationFunction;
import com.atguigu.tms.realtime.app.func.MyTriggerFunction;
import com.atguigu.tms.realtime.beans.DwdTradeOrderDetailBean;
import com.atguigu.tms.realtime.beans.DwsTradeOrgOrderDayBean;
import com.atguigu.tms.realtime.utils.ClickHouseUtil;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

// Trade domain: organization-granularity order aggregation
public class DwsTradeOrgOrderDay {
    public static void main(String[] args) throws Exception {
        // Prepare the environment
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        env.setParallelism(4);

        // Read from the Kafka order-detail fact topic
        String topic = "tms_dwd_trade_order_detail";
        String groupId = "dws_trade_org_order_group";
        KafkaSource<String> kafkaSource = KafkaUtil.getKafkaSource(topic, groupId, args);
        SingleOutputStreamOperator<String> kafkaStrDS = env
                .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source")
                .uid("kafka_source");

        // Convert each element: JSON string -> entity object
        SingleOutputStreamOperator<DwsTradeOrgOrderDayBean> mapDS = kafkaStrDS.map(
                new MapFunction<String, DwsTradeOrgOrderDayBean>() {
                    @Override
                    public DwsTradeOrgOrderDayBean map(String jsonStr) throws Exception {
                        DwdTradeOrderDetailBean dwdTradeOrderDetailBean =
                                JSON.parseObject(jsonStr, DwdTradeOrderDetailBean.class);
                        return DwsTradeOrgOrderDayBean.builder()
                                .senderDistrictId(dwdTradeOrderDetailBean.getSenderDistrictId())
                                .cityId(dwdTradeOrderDetailBean.getSenderCityId())
                                .orderAmountBase(dwdTradeOrderDetailBean.getAmount())
                                .orderCountBase(1L)
                                // Shift to UTC+8 so day windows align with Beijing dates,
                                // consistent with the other DWS jobs (the offset was
                                // missing here in the original code)
                                .ts(dwdTradeOrderDetailBean.getTs() + 8 * 60 * 60 * 1000L)
                                .build();
                    }
                });

        // Join the organization dimension
        SingleOutputStreamOperator<DwsTradeOrgOrderDayBean> withOrgDS = AsyncDataStream.unorderedWait(
                mapDS,
                new DimAsyncFunction<DwsTradeOrgOrderDayBean>("dim_base_organ") {
                    @Override
                    public void join(DwsTradeOrgOrderDayBean bean, JSONObject dimInfoJsonObj) {
                        bean.setOrgId(dimInfoJsonObj.getString("id"));
                        bean.setOrgName(dimInfoJsonObj.getString("org_name"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTradeOrgOrderDayBean bean) {
                        return Tuple2.of("region_id", bean.getSenderDistrictId());
                    }
                },
                60, TimeUnit.SECONDS);

        // Assign watermarks
        SingleOutputStreamOperator<DwsTradeOrgOrderDayBean> withWatermarkDS = withOrgDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<DwsTradeOrgOrderDayBean>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<DwsTradeOrgOrderDayBean>() {
                            @Override
                            public long extractTimestamp(DwsTradeOrgOrderDayBean element, long l) {
                                return element.getTs();
                            }
                        }));

        // Key by organization ID
        KeyedStream<DwsTradeOrgOrderDayBean, String> keyedDS =
                withWatermarkDS.keyBy(DwsTradeOrgOrderDayBean::getOrgId);

        // Open a one-day tumbling window
        WindowedStream<DwsTradeOrgOrderDayBean, String, TimeWindow> windowDS =
                keyedDS.window(TumblingEventTimeWindows.of(Time.days(1)));

        // Attach the custom trigger
        WindowedStream<DwsTradeOrgOrderDayBean, String, TimeWindow> triggerDS =
                windowDS.trigger(new MyTriggerFunction<DwsTradeOrgOrderDayBean>());

        // Aggregate
        SingleOutputStreamOperator<DwsTradeOrgOrderDayBean> aggregateDS = triggerDS.aggregate(
                new MyAggregationFunction<DwsTradeOrgOrderDayBean>() {
                    @Override
                    public DwsTradeOrgOrderDayBean add(DwsTradeOrgOrderDayBean value, DwsTradeOrgOrderDayBean accumulator) {
                        if (accumulator == null) {
                            return value;
                        }
                        accumulator.setOrderAmountBase(value.getOrderAmountBase().add(accumulator.getOrderAmountBase()));
                        accumulator.setOrderCountBase(value.getOrderCountBase() + accumulator.getOrderCountBase());
                        return accumulator;
                    }
                },
                new ProcessWindowFunction<DwsTradeOrgOrderDayBean, DwsTradeOrgOrderDayBean, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<DwsTradeOrgOrderDayBean> elements, Collector<DwsTradeOrgOrderDayBean> out) throws Exception {
                        long stt = context.window().getStart() - 8 * 60 * 60 * 1000L;
                        String curDate = DateFormatUtil.toDate(stt);
                        for (DwsTradeOrgOrderDayBean bean : elements) {
                            bean.setCurDate(curDate);
                            bean.setTs(System.currentTimeMillis());
                            out.collect(bean);
                        }
                    }
                });

        // Enrich with the city dimension
        SingleOutputStreamOperator<DwsTradeOrgOrderDayBean> withCityDS = AsyncDataStream.unorderedWait(
                aggregateDS,
                new DimAsyncFunction<DwsTradeOrgOrderDayBean>("dim_base_region_info") {
                    @Override
                    public void join(DwsTradeOrgOrderDayBean bean, JSONObject dimInfoJsonObj) {
                        bean.setCityName(dimInfoJsonObj.getString("name"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTradeOrgOrderDayBean bean) {
                        return Tuple2.of("id", bean.getCityId());
                    }
                },
                60, TimeUnit.SECONDS);

        // Write the result to ClickHouse
        withCityDS.print(">>>");
        withCityDS.addSink(
                ClickHouseUtil.getJdbcSink("insert into dws_trade_org_order_day_base values(?,?,?,?,?,?,?,?)"));

        env.execute();
    }
}

3. ClickHouse DDL

CREATE TABLE IF NOT EXISTS dws_trade_org_order_day_base
(
    `cur_date`          Date COMMENT 'statistics date',
    `org_id`            String COMMENT 'organization ID',
    `org_name`          String COMMENT 'organization name',
    `city_id`           String COMMENT 'city ID',
    `city_name`         String COMMENT 'city name',
    `order_amount_base` Decimal(38, 20) COMMENT 'order amount',
    `order_count_base`  UInt64 COMMENT 'order count',
    `ts`                UInt64 COMMENT 'timestamp'
)
ENGINE = MergeTree
ORDER BY (cur_date, org_id, org_name, city_id, city_name);

CREATE MATERIALIZED VIEW IF NOT EXISTS dws_trade_org_order_day
(
    `cur_date`     Date,
    `org_id`       String,
    `org_name`     String,
    `city_id`      String,
    `city_name`    String,
    `order_amount` AggregateFunction(argMax, Decimal(38, 20), UInt64),
    `order_count`  AggregateFunction(argMax, UInt64, UInt64)
)
ENGINE = AggregatingMergeTree()
ORDER BY (cur_date, org_id, org_name, city_id, city_name)
SETTINGS index_granularity = 8192 AS
SELECT
    cur_date,
    org_id,
    org_name,
    city_id,
    city_name,
    argMaxState(order_amount_base, ts) AS order_amount,
    argMaxState(order_count_base, ts)  AS order_count
FROM dws_trade_org_order_day_base
GROUP BY cur_date, org_id, org_name, city_id, city_name;

4. Logistics Domain: Daily Transfer-Completion Summary

Requirement: count the waybills whose transfer completed today and write the result to the corresponding ClickHouse table. (This metric is global rather than keyed, so the job below uses windowAll, which runs the window operator at parallelism 1.)

1. Transfer-completion entity class

DwsTransBoundFinishDayBean.java

package com.atguigu.tms.realtime.beans;

import lombok.Builder;
import lombok.Data;

/**
 * Logistics domain: transfer-completion entity class
 */
@Data
@Builder
public class DwsTransBoundFinishDayBean {
    // Statistics date
    String curDate;
    // Transfer-completion count
    Long boundFinishOrderCountBase;
    // Timestamp
    Long ts;
}

2. Transfer-completion aggregation

DwsTransBoundFinishDay.java

package com.atguigu.tms.realtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.atguigu.tms.realtime.app.func.MyAggregationFunction;
import com.atguigu.tms.realtime.app.func.MyTriggerFunction;
import com.atguigu.tms.realtime.beans.DwdTransDispatchDetailBean;
import com.atguigu.tms.realtime.beans.DwsTransBoundFinishDayBean;
import com.atguigu.tms.realtime.utils.ClickHouseUtil;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * Logistics domain: transfer-completion aggregation
 */
public class DwsTransBoundFinishDay {
    public static void main(String[] args) throws Exception {
        // TODO 1. Prepare the environment
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        // Parallelism setting; comment out for deployment and pass the global parallelism via args
        env.setParallelism(4);

        // TODO 2. Read from the Kafka tms_dwd_trans_bound_finish_detail topic
        String topic = "tms_dwd_trans_bound_finish_detail";
        String groupId = "dws_trans_bound_finish_day";
        KafkaSource<String> kafkaConsumer = KafkaUtil.getKafkaSource(topic, groupId, args);
        SingleOutputStreamOperator<String> source = env
                .fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "kafka_source")
                .uid("kafka_source");

        // TODO 3. Convert the data structure
        SingleOutputStreamOperator<DwsTransBoundFinishDayBean> mappedStream = source.map(jsonStr -> {
            DwdTransDispatchDetailBean dispatchDetailBean = JSON.parseObject(jsonStr, DwdTransDispatchDetailBean.class);
            return DwsTransBoundFinishDayBean.builder()
                    .boundFinishOrderCountBase(1L)
                    .ts(dispatchDetailBean.getTs() + 8 * 60 * 60 * 1000L)
                    .build();
        });

        // TODO 4. Assign watermarks
        SingleOutputStreamOperator<DwsTransBoundFinishDayBean> withWatermarkStream = mappedStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<DwsTransBoundFinishDayBean>forBoundedOutOfOrderness(Duration.ofSeconds(5L))
                        .withTimestampAssigner(new SerializableTimestampAssigner<DwsTransBoundFinishDayBean>() {
                            @Override
                            public long extractTimestamp(DwsTransBoundFinishDayBean element, long recordTimestamp) {
                                return element.getTs();
                            }
                        })
        ).uid("watermark_stream");

        // TODO 5. Open a one-day window (non-keyed, so this operator runs at parallelism 1)
        AllWindowedStream<DwsTransBoundFinishDayBean, TimeWindow> windowedStream =
                withWatermarkStream.windowAll(TumblingEventTimeWindows.of(Time.days(1L)));

        // TODO 6. Attach the custom trigger
        AllWindowedStream<DwsTransBoundFinishDayBean, TimeWindow> triggerStream =
                windowedStream.trigger(new MyTriggerFunction<DwsTransBoundFinishDayBean>());

        // TODO 7. Aggregate
        SingleOutputStreamOperator<DwsTransBoundFinishDayBean> aggregatedStream = triggerStream.aggregate(
                new MyAggregationFunction<DwsTransBoundFinishDayBean>() {
                    @Override
                    public DwsTransBoundFinishDayBean add(DwsTransBoundFinishDayBean value, DwsTransBoundFinishDayBean accumulator) {
                        if (accumulator == null) {
                            return value;
                        }
                        accumulator.setBoundFinishOrderCountBase(accumulator.getBoundFinishOrderCountBase() + value.getBoundFinishOrderCountBase());
                        return accumulator;
                    }
                },
                new ProcessAllWindowFunction<DwsTransBoundFinishDayBean, DwsTransBoundFinishDayBean, TimeWindow>() {
                    @Override
                    public void process(Context context, Iterable<DwsTransBoundFinishDayBean> elements, Collector<DwsTransBoundFinishDayBean> out) throws Exception {
                        for (DwsTransBoundFinishDayBean element : elements) {
                            String curDate = DateFormatUtil.toDate(context.window().getStart() - 8 * 60 * 60 * 1000L);
                            // Fill in the statistics date
                            element.setCurDate(curDate);
                            // Fill in the timestamp
                            element.setTs(System.currentTimeMillis());
                            out.collect(element);
                        }
                    }
                }
        ).uid("aggregate_stream");

        // TODO 8. Write to ClickHouse
        aggregatedStream.print(">>>>");
        aggregatedStream.addSink(
                ClickHouseUtil.getJdbcSink("insert into dws_trans_bound_finish_day_base values(?,?,?)")
        ).uid("clickhouse_sink");

        env.execute();
    }
}

3. ClickHouse DDL

CREATE TABLE IF NOT EXISTS dws_trans_bound_finish_day_base
(
    `cur_date`                      Date COMMENT 'statistics date',
    `bound_finish_order_count_base` UInt64 COMMENT 'transfer-completion count',
    `ts`                            UInt64 COMMENT 'timestamp'
)
ENGINE = MergeTree
ORDER BY cur_date;

CREATE MATERIALIZED VIEW IF NOT EXISTS dws_trans_bound_finish_day
(
    `cur_date`                 Date,
    `bound_finish_order_count` AggregateFunction(argMax, UInt64, UInt64)
)
ENGINE = AggregatingMergeTree()
ORDER BY cur_date
SETTINGS index_granularity = 8192 AS
SELECT
    cur_date,
    argMaxState(bound_finish_order_count_base, ts) AS bound_finish_order_count
FROM dws_trans_bound_finish_day_base
GROUP BY cur_date;

5. Logistics Domain: Daily Dispatch Summary

Requirement: count the day's dispatched orders and write the result to ClickHouse.

1. Dispatch entity class

DwsTransDispatchDayBean.java

package com.atguigu.tms.realtime.beans;

import lombok.Builder;
import lombok.Data;

/**
 * Logistics domain: dispatch entity class
 */
@Data
@Builder
public class DwsTransDispatchDayBean {
    // Statistics date
    String curDate;
    // Dispatched-order count
    Long dispatchOrderCountBase;
    // Timestamp
    Long ts;
}

2. Dispatch aggregation

DwsTransDispatchDay.java

package com.atguigu.tms.realtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.atguigu.tms.realtime.app.func.MyAggregationFunction;
import com.atguigu.tms.realtime.app.func.MyTriggerFunction;
import com.atguigu.tms.realtime.beans.DwdTransDispatchDetailBean;
import com.atguigu.tms.realtime.beans.DwsTransDispatchDayBean;
import com.atguigu.tms.realtime.utils.ClickHouseUtil;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Logistics domain: dispatch aggregation
 */
public class DwsTransDispatchDay {
    public static void main(String[] args) throws Exception {
        // TODO 1. Prepare the environment
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        // Parallelism setting; comment out for deployment and pass the global parallelism via args
        env.setParallelism(4);

        // TODO 2. Read from the Kafka tms_dwd_trans_dispatch_detail topic
        String topic = "tms_dwd_trans_dispatch_detail";
        String groupId = "dws_trans_dispatch_day";
        KafkaSource<String> kafkaConsumer = KafkaUtil.getKafkaSource(topic, groupId, args);
        SingleOutputStreamOperator<String> source = env
                .fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "kafka_source")
                .uid("kafka_source");

        // TODO 3. Convert the data structure
        SingleOutputStreamOperator<DwsTransDispatchDayBean> mappedStream = source.map(jsonStr -> {
            DwdTransDispatchDetailBean dispatchDetailBean = JSON.parseObject(jsonStr, DwdTransDispatchDetailBean.class);
            return DwsTransDispatchDayBean.builder()
                    .dispatchOrderCountBase(1L)
                    .ts(dispatchDetailBean.getTs() + 8 * 60 * 60 * 1000L)
                    .build();
        });

        // TODO 4. Assign watermarks
        SingleOutputStreamOperator<DwsTransDispatchDayBean> withWatermarkStream = mappedStream.assignTimestampsAndWatermarks(
                // WatermarkStrategy.<DwsTransDispatchDayBean>forBoundedOutOfOrderness(Duration.ofSeconds(5L))
                WatermarkStrategy.<DwsTransDispatchDayBean>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<DwsTransDispatchDayBean>() {
                            @Override
                            public long extractTimestamp(DwsTransDispatchDayBean element, long recordTimestamp) {
                                return element.getTs();
                            }
                        })
        ).uid("watermark_stream");

        // TODO 5. Open a one-day window
        AllWindowedStream<DwsTransDispatchDayBean, TimeWindow> windowedStream =
                withWatermarkStream.windowAll(TumblingEventTimeWindows.of(Time.days(1L)));

        // TODO 6. Attach the custom trigger
        AllWindowedStream<DwsTransDispatchDayBean, TimeWindow> triggerStream =
                windowedStream.trigger(new MyTriggerFunction<DwsTransDispatchDayBean>());

        // TODO 7. Aggregate
        SingleOutputStreamOperator<DwsTransDispatchDayBean> aggregatedStream = triggerStream.aggregate(
                new MyAggregationFunction<DwsTransDispatchDayBean>() {
                    @Override
                    public DwsTransDispatchDayBean add(DwsTransDispatchDayBean value, DwsTransDispatchDayBean accumulator) {
                        if (accumulator == null) {
                            return value;
                        }
                        accumulator.setDispatchOrderCountBase(accumulator.getDispatchOrderCountBase() + value.getDispatchOrderCountBase());
                        return accumulator;
                    }
                },
                new ProcessAllWindowFunction<DwsTransDispatchDayBean, DwsTransDispatchDayBean, TimeWindow>() {
                    @Override
                    public void process(Context context, Iterable<DwsTransDispatchDayBean> elements, Collector<DwsTransDispatchDayBean> out) throws Exception {
                        for (DwsTransDispatchDayBean element : elements) {
                            String curDate = DateFormatUtil.toDate(context.window().getStart() - 8 * 60 * 60 * 1000L);
                            // Fill in the statistics date
                            element.setCurDate(curDate);
                            // Fill in the timestamp
                            element.setTs(System.currentTimeMillis());
                            out.collect(element);
                        }
                    }
                }
        ).uid("aggregate_stream");

        // TODO 8. Write to ClickHouse
        aggregatedStream.print(">>>>");
        aggregatedStream.addSink(
                ClickHouseUtil.getJdbcSink("insert into dws_trans_dispatch_day_base values(?,?,?)")
        ).uid("clickhouse_stream");

        env.execute();
    }
}

3. ClickHouse DDL

CREATE TABLE IF NOT EXISTS dws_trans_dispatch_day_base
(
    `cur_date`                  Date COMMENT 'statistics date',
    `dispatch_order_count_base` UInt64 COMMENT 'dispatched-order count',
    `ts`                        UInt64 COMMENT 'timestamp'
)
ENGINE = MergeTree
ORDER BY cur_date;

CREATE MATERIALIZED VIEW IF NOT EXISTS dws_trans_dispatch_day
(
    `cur_date`             Date,
    `dispatch_order_count` AggregateFunction(argMax, UInt64, UInt64)
)
ENGINE = AggregatingMergeTree()
ORDER BY cur_date
SETTINGS index_granularity = 8192 AS
SELECT
    cur_date,
    argMaxState(dispatch_order_count_base, ts) AS dispatch_order_count
FROM dws_trans_dispatch_day_base
GROUP BY cur_date;

6. Logistics Domain: Organization Daily Successful-Delivery Summary

Requirement: count each organization's successful deliveries (waybills) for the day and write the result to ClickHouse.

1. Successful-delivery entity class

DwsTransOrgDeliverSucDayBean.java

package com.atguigu.tms.realtime.beans;

import lombok.Builder;
import lombok.Data;

/**
 * Logistics domain: successful-delivery entity class (organization granularity)
 */
@Data
@Builder
public class DwsTransOrgDeliverSucDayBean {
    // Statistics date
    String curDate;
    // Organization ID
    String orgId;
    // Organization name
    String orgName;
    // District ID (used only for the dimension lookup, not written to the sink)
    @TransientSink
    String districtId;
    // City ID
    String cityId;
    // City name
    String cityName;
    // Province ID
    String provinceId;
    // Province name
    String provinceName;
    // Successful-delivery count
    Long deliverSucCountBase;
    // Timestamp
    Long ts;
}

2. Successful-delivery aggregation

DwsTransOrgDeliverSucDay.java

package com.atguigu.tms.realtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.app.func.DimAsyncFunction;
import com.atguigu.tms.realtime.app.func.MyAggregationFunction;
import com.atguigu.tms.realtime.app.func.MyTriggerFunction;
import com.atguigu.tms.realtime.beans.DwdTransDeliverSucDetailBean;
import com.atguigu.tms.realtime.beans.DwsTransOrgDeliverSucDayBean;
import com.atguigu.tms.realtime.utils.ClickHouseUtil;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

/**
 * Logistics domain: successful-delivery aggregation by organization
 */
public class DwsTransOrgDeliverSucDay {
    public static void main(String[] args) throws Exception {
        // TODO 1. Prepare the environment
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        // Parallelism setting; comment out for deployment and pass the global parallelism via args
        env.setParallelism(4);

        // TODO 2. Read from the Kafka tms_dwd_trans_deliver_detail topic
        String topic = "tms_dwd_trans_deliver_detail";
        String groupId = "dws_trans_org_deliver_suc_day";
        KafkaSource<String> kafkaConsumer = KafkaUtil.getKafkaSource(topic, groupId, args);
        SingleOutputStreamOperator<String> source = env
                .fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "kafka_source")
                .uid("kafka_source");

        // TODO 3. Convert the data structure
        SingleOutputStreamOperator<DwsTransOrgDeliverSucDayBean> mappedStream = source.map(jsonStr -> {
            DwdTransDeliverSucDetailBean dwdTransDeliverSucDetailBean =
                    JSON.parseObject(jsonStr, DwdTransDeliverSucDetailBean.class);
            return DwsTransOrgDeliverSucDayBean.builder()
                    .districtId(dwdTransDeliverSucDetailBean.getReceiverDistrictId())
                    .cityId(dwdTransDeliverSucDetailBean.getReceiverCityId())
                    .provinceId(dwdTransDeliverSucDetailBean.getReceiverProvinceId())
                    .deliverSucCountBase(1L)
                    .ts(dwdTransDeliverSucDetailBean.getTs() + 8 * 60 * 60 * 1000L)
                    .build();
        });

        // TODO 4. Look up the organization ID from the receiver district
        SingleOutputStreamOperator<DwsTransOrgDeliverSucDayBean> withOrgIdStream = AsyncDataStream.unorderedWait(
                mappedStream,
                new DimAsyncFunction<DwsTransOrgDeliverSucDayBean>("dim_base_organ") {
                    @Override
                    public void join(DwsTransOrgDeliverSucDayBean bean, JSONObject dimJsonObj) {
                        bean.setOrgId(dimJsonObj.getString("id"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgDeliverSucDayBean bean) {
                        return Tuple2.of("region_id", bean.getDistrictId());
                    }
                },
                60, TimeUnit.SECONDS
        ).uid("with_org_id_stream");

        // TODO 5. Assign watermarks; withIdleness lets the watermark advance even
        // when some source partitions are temporarily idle
        SingleOutputStreamOperator<DwsTransOrgDeliverSucDayBean> withWatermarkStream = withOrgIdStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<DwsTransOrgDeliverSucDayBean>forBoundedOutOfOrderness(Duration.ofSeconds(5L))
                        .withTimestampAssigner(new SerializableTimestampAssigner<DwsTransOrgDeliverSucDayBean>() {
                            @Override
                            public long extractTimestamp(DwsTransOrgDeliverSucDayBean element, long recordTimestamp) {
                                return element.getTs();
                            }
                        })
                        .withIdleness(Duration.ofSeconds(20))
        ).uid("watermark_stream");

        // TODO 6. Key by organization ID
        KeyedStream<DwsTransOrgDeliverSucDayBean, String> keyedStream =
                withWatermarkStream.keyBy(DwsTransOrgDeliverSucDayBean::getOrgId);

        // TODO 7. Open a one-day window
        WindowedStream<DwsTransOrgDeliverSucDayBean, String, TimeWindow> windowStream =
                keyedStream.window(TumblingEventTimeWindows.of(Time.days(1L)));

        // TODO 8. Attach the custom trigger
        WindowedStream<DwsTransOrgDeliverSucDayBean, String, TimeWindow> triggerStream =
                windowStream.trigger(new MyTriggerFunction<>());

        // TODO 9. Aggregate
        SingleOutputStreamOperator<DwsTransOrgDeliverSucDayBean> aggregatedStream = triggerStream.aggregate(
                new MyAggregationFunction<DwsTransOrgDeliverSucDayBean>() {
                    @Override
                    public DwsTransOrgDeliverSucDayBean add(DwsTransOrgDeliverSucDayBean value, DwsTransOrgDeliverSucDayBean accumulator) {
                        if (accumulator == null) {
                            return value;
                        }
                        accumulator.setDeliverSucCountBase(accumulator.getDeliverSucCountBase() + value.getDeliverSucCountBase());
                        return accumulator;
                    }
                },
                new ProcessWindowFunction<DwsTransOrgDeliverSucDayBean, DwsTransOrgDeliverSucDayBean, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<DwsTransOrgDeliverSucDayBean> elements, Collector<DwsTransOrgDeliverSucDayBean> out) throws Exception {
                        for (DwsTransOrgDeliverSucDayBean element : elements) {
                            long stt = context.window().getStart();
                            element.setCurDate(DateFormatUtil.toDate(stt - 8 * 60 * 60 * 1000L));
                            element.setTs(System.currentTimeMillis());
                            out.collect(element);
                        }
                    }
                }
        ).uid("aggregate_stream");

        // TODO 10. Enrich with dimension info
        // 10.1 Organization name
        SingleOutputStreamOperator<DwsTransOrgDeliverSucDayBean> withOrgNameAndRegionIdStream = AsyncDataStream.unorderedWait(
                aggregatedStream,
                new DimAsyncFunction<DwsTransOrgDeliverSucDayBean>("dim_base_organ") {
                    @Override
                    public void join(DwsTransOrgDeliverSucDayBean bean, JSONObject dimJsonObj) {
                        bean.setOrgName(dimJsonObj.getString("org_name"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgDeliverSucDayBean bean) {
                        return Tuple2.of("id", bean.getOrgId());
                    }
                },
                60, TimeUnit.SECONDS
        ).uid("with_org_name_and_region_id_stream");

        // 10.2 City name
        SingleOutputStreamOperator<DwsTransOrgDeliverSucDayBean> withCityNameStream = AsyncDataStream.unorderedWait(
                withOrgNameAndRegionIdStream,
                new DimAsyncFunction<DwsTransOrgDeliverSucDayBean>("dim_base_region_info") {
                    @Override
                    public void join(DwsTransOrgDeliverSucDayBean bean, JSONObject dimJsonObj) {
                        bean.setCityName(dimJsonObj.getString("name"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgDeliverSucDayBean bean) {
                        return Tuple2.of("id", bean.getCityId());
                    }
                },
                60, TimeUnit.SECONDS
        ).uid("with_city_name_stream");

        // 10.3 Province name
        SingleOutputStreamOperator<DwsTransOrgDeliverSucDayBean> fullStream = AsyncDataStream.unorderedWait(
                withCityNameStream,
                new DimAsyncFunction<DwsTransOrgDeliverSucDayBean>("dim_base_region_info") {
                    @Override
                    public void join(DwsTransOrgDeliverSucDayBean bean, JSONObject dimJsonObj) {
                        bean.setProvinceName(dimJsonObj.getString("name"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgDeliverSucDayBean bean) {
                        return Tuple2.of("id", bean.getProvinceId());
                    }
                },
                60, TimeUnit.SECONDS
        ).uid("with_province_name_stream");

        // TODO 11. Write to ClickHouse
        fullStream.print(">>>");
        fullStream.addSink(
                ClickHouseUtil.getJdbcSink("insert into dws_trans_org_deliver_suc_day_base values(?,?,?,?,?,?,?,?,?)")
        ).uid("clickhouse_stream");

        env.execute();
    }
}

3. ClickHouse DDL

CREATE TABLE IF NOT EXISTS dws_trans_org_deliver_suc_day_base
(
    `cur_date`               Date COMMENT 'statistics date',
    `org_id`                 String COMMENT 'organization ID',
    `org_name`               String COMMENT 'organization name',
    `city_id`                String COMMENT 'city ID',
    `city_name`              String COMMENT 'city name',
    `province_id`            String COMMENT 'province ID',
    `province_name`          String COMMENT 'province name',
    `deliver_suc_count_base` UInt64 COMMENT 'successful-delivery count',
    `ts`                     UInt64 COMMENT 'timestamp'
)
ENGINE = MergeTree
ORDER BY (cur_date, org_id, org_name, city_id, city_name, province_id, province_name);

CREATE MATERIALIZED VIEW IF NOT EXISTS dws_trans_org_deliver_suc_day
(
    `cur_date`          Date,
    `org_id`            String,
    `org_name`          String,
    `city_id`           String,
    `city_name`         String,
    `province_id`       String,
    `province_name`     String,
    `deliver_suc_count` AggregateFunction(argMax, UInt64, UInt64)
)
ENGINE = AggregatingMergeTree()
ORDER BY (cur_date, org_id, org_name, city_id, city_name, province_id, province_name)
SETTINGS index_granularity = 8192 AS
SELECT
    cur_date,
    org_id,
    org_name,
    city_id,
    city_name,
    province_id,
    province_name,
    argMaxState(deliver_suc_count_base, ts) AS deliver_suc_count
FROM dws_trans_org_deliver_suc_day_base
GROUP BY cur_date, org_id, org_name, city_id, city_name, province_id, province_name;

7. Logistics Domain: Organization Daily Pickup Summary

Requirement: count each organization's pickups for the day and write the result to ClickHouse.

1. Pickup entity class

DwsTransOrgReceiveDayBean.java

package com.atguigu.tms.realtime.beans;
import lombok.Builder;
import lombok.Data;

/**
 * Logistics domain: pickup entity class (organization granularity)
 */
@Data
@Builder
public class DwsTransOrgReceiveDayBean {
    // Statistics date
    String curDate;
    // Transfer station ID
    String orgId;
    // Transfer station name
    String orgName;
    // District ID (used only for the dimension lookup, not written to the sink)
    @TransientSink
    String districtId;
    // City ID
    String cityId;
    // City name
    String cityName;
    // Province ID
    String provinceId;
    // Province name
    String provinceName;
    // Pickup count (one order counts once)
    Long receiveOrderCountBase;
    // Timestamp
    Long ts;
}

2. Pickup aggregation

DwsTransOrgReceiveDay.java

package com.atguigu.tms.realtime.app.dws;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.app.func.DimAsyncFunction;
import com.atguigu.tms.realtime.app.func.MyAggregationFunction;
import com.atguigu.tms.realtime.app.func.MyTriggerFunction;
import com.atguigu.tms.realtime.beans.DwdTransReceiveDetailBean;
import com.atguigu.tms.realtime.beans.DwsTransOrgReceiveDayBean;
import com.atguigu.tms.realtime.utils.ClickHouseUtil;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

/**
 * Logistics domain: pickup aggregation by organization
 */
public class DwsTransOrgReceiveDay {
    public static void main(String[] args) throws Exception {
        // TODO 1. Prepare the environment
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        // Parallelism setting; comment out for deployment and pass the global parallelism via args
        env.setParallelism(4);

        // TODO 2. Read from the Kafka tms_dwd_trans_receive_detail topic
        String topic = "tms_dwd_trans_receive_detail";
        String groupId = "dws_trans_org_receive_day";
        KafkaSource<String> kafkaConsumer = KafkaUtil.getKafkaSource(topic, groupId, args);
        SingleOutputStreamOperator<String> source = env
                .fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "kafka_source")
                .uid("kafka_source");

        // TODO 3. Convert the data structure
        SingleOutputStreamOperator<DwsTransOrgReceiveDayBean> mappedStream = source.map(jsonStr -> {
            DwdTransReceiveDetailBean dwdTransReceiveDetailBean =
                    JSON.parseObject(jsonStr, DwdTransReceiveDetailBean.class);
            return DwsTransOrgReceiveDayBean.builder()
                    .districtId(dwdTransReceiveDetailBean.getSenderDistrictId())
                    .provinceId(dwdTransReceiveDetailBean.getSenderProvinceId())
                    .cityId(dwdTransReceiveDetailBean.getSenderCityId())
                    .receiveOrderCountBase(1L)
                    .ts(dwdTransReceiveDetailBean.getTs() + 8 * 60 * 60 * 1000L)
                    .build();
        });

        // TODO 4. Look up the organization ID from the sender district
        SingleOutputStreamOperator<DwsTransOrgReceiveDayBean> withOrgIdStream = AsyncDataStream.unorderedWait(
                mappedStream,
                new DimAsyncFunction<DwsTransOrgReceiveDayBean>("dim_base_organ") {
                    @Override
                    public void join(DwsTransOrgReceiveDayBean bean, JSONObject dimJsonObj) {
                        bean.setOrgId(dimJsonObj.getString("id"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgReceiveDayBean bean) {
                        return Tuple2.of("region_id", bean.getDistrictId());
                    }
                },
                5 * 60, TimeUnit.SECONDS
        ).uid("with_org_id_stream");

        // TODO 5. Assign watermarks
        SingleOutputStreamOperator<DwsTransOrgReceiveDayBean> withWatermarkStream = withOrgIdStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<DwsTransOrgReceiveDayBean>forBoundedOutOfOrderness(Duration.ofSeconds(5L))
                        .withTimestampAssigner(new SerializableTimestampAssigner<DwsTransOrgReceiveDayBean>() {
                            @Override
                            public long extractTimestamp(DwsTransOrgReceiveDayBean bean, long recordTimestamp) {
                                return bean.getTs();
                            }
                        })
        ).uid("watermark_stream");

        // TODO 6. Key by organization ID
        KeyedStream<DwsTransOrgReceiveDayBean, String> keyedStream =
                withWatermarkStream.keyBy(DwsTransOrgReceiveDayBean::getOrgId);

        // TODO 7. Open a one-day window
        WindowedStream<DwsTransOrgReceiveDayBean, String, TimeWindow> windowStream =
                keyedStream.window(TumblingEventTimeWindows.of(Time.days(1L)));

        // TODO 8. Attach the custom trigger
        WindowedStream<DwsTransOrgReceiveDayBean, String, TimeWindow> triggerStream =
                windowStream.trigger(new MyTriggerFunction<DwsTransOrgReceiveDayBean>());

        // TODO 9. Aggregate
        SingleOutputStreamOperator<DwsTransOrgReceiveDayBean> aggregatedStream = triggerStream.aggregate(
                new MyAggregationFunction<DwsTransOrgReceiveDayBean>() {
                    @Override
                    public DwsTransOrgReceiveDayBean add(DwsTransOrgReceiveDayBean value, DwsTransOrgReceiveDayBean accumulator) {
                        if (accumulator == null) {
                            return value;
                        }
                        accumulator.setReceiveOrderCountBase(accumulator.getReceiveOrderCountBase() + value.getReceiveOrderCountBase());
                        return accumulator;
                    }
                },
                new ProcessWindowFunction<DwsTransOrgReceiveDayBean, DwsTransOrgReceiveDayBean, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<DwsTransOrgReceiveDayBean> elements, Collector<DwsTransOrgReceiveDayBean> out) throws Exception {
                        for (DwsTransOrgReceiveDayBean element : elements) {
                            // Fill in the statistics date
                            String curDate = DateFormatUtil.toDate(context.window().getStart() - 8 * 60 * 60 * 1000L);
                            element.setCurDate(curDate);
                            // Fill in the timestamp
                            element.setTs(System.currentTimeMillis());
                            out.collect(element);
                        }
                    }
                }
        ).uid("aggregate_stream");

        // TODO 10. Enrich with dimension info
        // 10.1 Transfer station name
        SingleOutputStreamOperator<DwsTransOrgReceiveDayBean> withOrgNameStream = AsyncDataStream.unorderedWait(
                aggregatedStream,
                new DimAsyncFunction<DwsTransOrgReceiveDayBean>("dim_base_organ") {
                    @Override
                    public void join(DwsTransOrgReceiveDayBean bean, JSONObject dimJsonObj) {
                        bean.setOrgName(dimJsonObj.getString("org_name"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgReceiveDayBean bean) {
                        return Tuple2.of("id", bean.getOrgId());
                    }
                },
                60, TimeUnit.SECONDS
        ).uid("with_org_name_stream");

        // 10.2 City name
        SingleOutputStreamOperator<DwsTransOrgReceiveDayBean> withCityNameStream = AsyncDataStream.unorderedWait(
                withOrgNameStream,
                new DimAsyncFunction<DwsTransOrgReceiveDayBean>("dim_base_region_info") {
                    @Override
                    public void join(DwsTransOrgReceiveDayBean bean, JSONObject dimJsonObj) {
                        bean.setCityName(dimJsonObj.getString("name"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgReceiveDayBean bean) {
                        return Tuple2.of("id", bean.getCityId());
                    }
                },
                60, TimeUnit.SECONDS
        ).uid("with_city_name_stream");

        // 10.3 Province name
        SingleOutputStreamOperator<DwsTransOrgReceiveDayBean> fullStream = AsyncDataStream.unorderedWait(
                withCityNameStream,
                new DimAsyncFunction<DwsTransOrgReceiveDayBean>("dim_base_region_info") {
                    @Override
                    public void join(DwsTransOrgReceiveDayBean bean, JSONObject dimJsonObj) {
                        bean.setProvinceName(dimJsonObj.getString("name"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgReceiveDayBean bean) {
                        return Tuple2.of("id", bean.getProvinceId());
                    }
                },
                60, TimeUnit.SECONDS
        ).uid("with_province_name_stream");

        // TODO 11. Write to ClickHouse
        fullStream.print(">>>");
        fullStream.addSink(
                ClickHouseUtil.getJdbcSink("insert into dws_trans_org_receive_day_base values(?,?,?,?,?,?,?,?,?)")
        ).uid("clickhouse_stream");

        env.execute();
    }
}

3. ClickHouse DDL

CREATE TABLE IF NOT EXISTS dws_trans_org_receive_day_base
(
    `cur_date`                 Date COMMENT 'statistics date',
    `org_id`                   String COMMENT 'transfer station ID',
    `org_name`                 String COMMENT 'transfer station name',
    `city_id`                  String COMMENT 'city ID',
    `city_name`                String COMMENT 'city name',
    `province_id`              String COMMENT 'province ID',
    `province_name`            String COMMENT 'province name',
    `receive_order_count_base` UInt64 COMMENT 'pickup count',
    `ts`                       UInt64 COMMENT 'timestamp'
)
ENGINE = MergeTree
ORDER BY (cur_date, org_id, org_name, city_id, city_name, province_id, province_name);

CREATE MATERIALIZED VIEW IF NOT EXISTS dws_trans_org_receive_day
(
    `cur_date`            Date,
    `org_id`              String,
    `org_name`            String,
    `city_id`             String,
    `city_name`           String,
    `province_id`         String,
    `province_name`       String,
    `receive_order_count` AggregateFunction(argMax, UInt64, UInt64)
)
ENGINE = AggregatingMergeTree()
ORDER BY (cur_date, org_id, org_name, city_id, city_name, province_id, province_name)
SETTINGS index_granularity = 8192 AS
SELECT
    cur_date,
    org_id,
    org_name,
    city_id,
    city_name,
    province_id,
    province_name,
    argMaxState(receive_order_count_base, ts) AS receive_order_count
FROM dws_trans_org_receive_day_base
GROUP BY cur_date, org_id, org_name, city_id, city_name, province_id, province_name;

8. Logistics Domain: Organization and Truck-Model Daily Transport-Completion Summary

Requirement: for each organization and truck model, count the day's completed transports together with their distance and duration, and write the result to ClickHouse.

1. Truck-model transport entity class

DwsTransOrgTruckModelTransFinishDayBean.java

package com.atguigu.tms.realtime.beans;

import lombok.Builder;
import lombok.Data;

import java.math.BigDecimal;

/**
 * Logistics domain: transport-completion entity class (organization + truck-model granularity)
 */
@Data
@Builder
public class DwsTransOrgTruckModelTransFinishDayBean {
    // Statistics date
    String curDate;
    // Organization ID
    String orgId;
    // Organization name
    String orgName;
    // Truck ID (used only for the dimension lookup, not written to the sink)
    @TransientSink
    String truckId;
    // Truck model ID
    String truckModelId;
    // Truck model name
    String truckModelName;
    // First-level organization ID used to join city info (not written to the sink)
    @TransientSink
    String joinOrgId;
    // City ID
    String cityId;
    // City name
    String cityName;
    // Transport-completion count
    Long transFinishCountBase;
    // Transport-completion distance
    BigDecimal transFinishDistanceBase;
    // Transport-completion duration
    Long transFinishDurTimeBase;
    // Timestamp
    Long ts;
}

2. Truck-model transport aggregation

DwsTransOrgTruckModelTransFinishDay.java

package com.atguigu.tms.realtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.app.func.DimAsyncFunction;
import com.atguigu.tms.realtime.app.func.MyAggregationFunction;
import com.atguigu.tms.realtime.app.func.MyTriggerFunction;
import com.atguigu.tms.realtime.beans.DwdTransTransFinishBean;
import com.atguigu.tms.realtime.beans.DwsTransOrgTruckModelTransFinishDayBean;
import com.atguigu.tms.realtime.utils.ClickHouseUtil;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

// Logistics domain: transport-completion aggregation by organization and truck model
public class DwsTransOrgTruckModelTransFinishDay {
    public static void main(String[] args) throws Exception {
        // Prepare the environment
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        env.setParallelism(4);

        // Read from the Kafka transport-completion fact topic
        String topic = "tms_dwd_trans_trans_finish";
        String groupId = "dws_trans_org_truck_model_group";
        KafkaSource<String> kafkaConsumer = KafkaUtil.getKafkaSource(topic, groupId, args);
        SingleOutputStreamOperator<String> kafkaDS = env
                .fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "kafka_source")
                .uid("kafka_source");

        // Convert each element: JSON string -> entity object
        SingleOutputStreamOperator<DwsTransOrgTruckModelTransFinishDayBean> mapDS = kafkaDS.map(
                new MapFunction<String, DwsTransOrgTruckModelTransFinishDayBean>() {
                    @Override
                    public DwsTransOrgTruckModelTransFinishDayBean map(String jsonStr) throws Exception {
                        DwdTransTransFinishBean finishBean = JSON.parseObject(jsonStr, DwdTransTransFinishBean.class);
                        return DwsTransOrgTruckModelTransFinishDayBean.builder()
                                .orgId(finishBean.getStartOrgId())
                                .orgName(finishBean.getStartOrgName())
                                .truckId(finishBean.getTruckId())
                                .transFinishCountBase(1L)
                                .transFinishDistanceBase(finishBean.getActualDistance())
                                .transFinishDurTimeBase(finishBean.getTransportTime())
                                .ts(finishBean.getTs() + 8 * 60 * 60 * 1000L)
                                .build();
                    }
                });

        // Join the truck dimension to get the truck model ID
        SingleOutputStreamOperator<DwsTransOrgTruckModelTransFinishDayBean> withTruckDS = AsyncDataStream.unorderedWait(
                mapDS,
                new DimAsyncFunction<DwsTransOrgTruckModelTransFinishDayBean>("dim_truck_info") {
                    @Override
                    public void join(DwsTransOrgTruckModelTransFinishDayBean bean, JSONObject dimInfoJsonObj) {
                        bean.setTruckModelId(dimInfoJsonObj.getString("truck_model_id"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgTruckModelTransFinishDayBean bean) {
                        return Tuple2.of("id", bean.getTruckId());
                    }
                },
                60, TimeUnit.SECONDS);

        // Specify the watermark strategy and extract the event-time field
        SingleOutputStreamOperator<DwsTransOrgTruckModelTransFinishDayBean> withWatermarkDS = withTruckDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<DwsTransOrgTruckModelTransFinishDayBean>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<DwsTransOrgTruckModelTransFinishDayBean>() {
                            @Override
                            public long extractTimestamp(DwsTransOrgTruckModelTransFinishDayBean element, long l) {
                                return element.getTs();
                            }
                        }));

        // Key by organization ID + truck model ID
        KeyedStream<DwsTransOrgTruckModelTransFinishDayBean, String> keyDS = withWatermarkDS.keyBy(
                new KeySelector<DwsTransOrgTruckModelTransFinishDayBean, String>() {
                    @Override
                    public String getKey(DwsTransOrgTruckModelTransFinishDayBean bean) throws Exception {
                        return bean.getOrgId() + "+" + bean.getTruckModelId();
                    }
                });

        // Open a one-day tumbling window
        WindowedStream<DwsTransOrgTruckModelTransFinishDayBean, String, TimeWindow> windowDS =
                keyDS.window(TumblingEventTimeWindows.of(Time.days(1)));

        // Attach the custom trigger
        WindowedStream<DwsTransOrgTruckModelTransFinishDayBean, String, TimeWindow> triggerDS =
                windowDS.trigger(new MyTriggerFunction<DwsTransOrgTruckModelTransFinishDayBean>());

        // Aggregate
        SingleOutputStreamOperator<DwsTransOrgTruckModelTransFinishDayBean> aggregateDS = triggerDS.aggregate(
                new MyAggregationFunction<DwsTransOrgTruckModelTransFinishDayBean>() {
                    @Override
                    public DwsTransOrgTruckModelTransFinishDayBean add(DwsTransOrgTruckModelTransFinishDayBean value, DwsTransOrgTruckModelTransFinishDayBean accumulator) {
                        if (accumulator == null) {
                            return value;
                        }
                        accumulator.setTransFinishCountBase(value.getTransFinishCountBase() + accumulator.getTransFinishCountBase());
                        accumulator.setTransFinishDistanceBase(value.getTransFinishDistanceBase().add(accumulator.getTransFinishDistanceBase()));
                        accumulator.setTransFinishDurTimeBase(value.getTransFinishDurTimeBase() + accumulator.getTransFinishDurTimeBase());
                        return accumulator;
                    }
                },
                new ProcessWindowFunction<DwsTransOrgTruckModelTransFinishDayBean, DwsTransOrgTruckModelTransFinishDayBean, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<DwsTransOrgTruckModelTransFinishDayBean> elements, Collector<DwsTransOrgTruckModelTransFinishDayBean> out) throws Exception {
                        long stt = context.window().getStart() - 8 * 60 * 60 * 1000L;
                        String curDate = DateFormatUtil.toDate(stt);
                        for (DwsTransOrgTruckModelTransFinishDayBean element : elements) {
                            element.setCurDate(curDate);
                            element.setTs(System.currentTimeMillis());
                            out.collect(element);
                        }
                    }
                });

        // Enrich with dimension info
        // Truck model name
        SingleOutputStreamOperator<DwsTransOrgTruckModelTransFinishDayBean> withTruckModelDS = AsyncDataStream.unorderedWait(
                aggregateDS,
                new DimAsyncFunction<DwsTransOrgTruckModelTransFinishDayBean>("dim_truck_model") {
                    @Override
                    public void join(DwsTransOrgTruckModelTransFinishDayBean bean, JSONObject dimInfoJsonObj) {
                        bean.setTruckModelName(dimInfoJsonObj.getString("model_name"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgTruckModelTransFinishDayBean bean) {
                        return Tuple2.of("id", bean.getTruckModelId());
                    }
                },
                60, TimeUnit.SECONDS);

        // ID of the organization's transfer center (parent organization, falling
        // back to the organization itself when there is no parent)
        SingleOutputStreamOperator<DwsTransOrgTruckModelTransFinishDayBean> withJoinOrgIdDS = AsyncDataStream.unorderedWait(
                withTruckModelDS,
                new DimAsyncFunction<DwsTransOrgTruckModelTransFinishDayBean>("dim_base_organ") {
                    @Override
                    public void join(DwsTransOrgTruckModelTransFinishDayBean bean, JSONObject dimInfoJsonObj) {
                        String orgParentId = dimInfoJsonObj.getString("org_parent_id");
                        bean.setJoinOrgId(orgParentId != null ? orgParentId : bean.getOrgId());
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgTruckModelTransFinishDayBean bean) {
                        return Tuple2.of("id", bean.getOrgId());
                    }
                },
                60, TimeUnit.SECONDS);

        // Use the transfer-center ID to fetch the city ID from the organization table
        SingleOutputStreamOperator<DwsTransOrgTruckModelTransFinishDayBean> withCityIdDS = AsyncDataStream.unorderedWait(
                withJoinOrgIdDS,
                new DimAsyncFunction<DwsTransOrgTruckModelTransFinishDayBean>("dim_base_organ") {
                    @Override
                    public void join(DwsTransOrgTruckModelTransFinishDayBean bean, JSONObject dimInfoJsonObj) {
                        bean.setCityId(dimInfoJsonObj.getString("region_id"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgTruckModelTransFinishDayBean bean) {
                        return Tuple2.of("id", bean.getJoinOrgId());
                    }
                },
                60, TimeUnit.SECONDS);

        // Use the city ID to fetch the city name from the region table
        SingleOutputStreamOperator<DwsTransOrgTruckModelTransFinishDayBean> withCityNameDS = AsyncDataStream.unorderedWait(
                withCityIdDS,
                new DimAsyncFunction<DwsTransOrgTruckModelTransFinishDayBean>("dim_base_region_info") {
                    @Override
                    public void join(DwsTransOrgTruckModelTransFinishDayBean bean, JSONObject dimInfoJsonObj) {
                        bean.setCityName(dimInfoJsonObj.getString("name"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgTruckModelTransFinishDayBean bean) {
                        return Tuple2.of("id", bean.getCityId());
                    }
                },
                60, TimeUnit.SECONDS);

        // Write the result to ClickHouse
        withCityNameDS.print(">>>");
        withCityNameDS.addSink(
                ClickHouseUtil.getJdbcSink("insert into dws_trans_org_truck_model_trans_finish_day_base values(?,?,?,?,?,?,?,?,?,?,?)"));

        env.execute();
    }
}

3. ClickHouse DDL

CREATE TABLE IF NOT EXISTS dws_trans_org_truck_model_trans_finish_day_base
(
    `cur_date`                   Date COMMENT 'statistics date',
    `org_id`                     String COMMENT 'organization ID',
    `org_name`                   String COMMENT 'organization name',
    `truck_model_id`             String COMMENT 'truck model ID',
    `truch_model_name`           String COMMENT 'truck model name',
    `city_id`                    String COMMENT 'city ID',
    `city_name`                  String COMMENT 'city name',
    `trans_finish_count_base`    UInt64 COMMENT 'transport-completion count',
    `trans_finish_distance_base` Decimal(38, 20) COMMENT 'transport-completion distance',
    `trans_finish_dur_time_base` UInt64 COMMENT 'transport-completion duration',
    `ts`                         UInt64 COMMENT 'timestamp'
)
ENGINE = MergeTree
ORDER BY (cur_date, org_id, org_name, truck_model_id, truch_model_name, city_id, city_name);

CREATE MATERIALIZED VIEW IF NOT EXISTS dws_trans_org_truck_model_trans_finish_day
(
    `cur_date`              Date,
    `org_id`                String,
    `org_name`              String,
    `truck_model_id`        String,
    `truch_model_name`      String,
    `city_id`               String,
    `city_name`             String,
    `trans_finish_count`    AggregateFunction(argMax, UInt64, UInt64),
    `trans_finish_distance` AggregateFunction(argMax, Decimal(38, 20), UInt64),
    `trans_finish_dur_time` AggregateFunction(argMax, UInt64, UInt64)
)
ENGINE = AggregatingMergeTree()
ORDER BY (cur_date, org_id, org_name, truck_model_id, truch_model_name, city_id, city_name)
SETTINGS index_granularity = 8192 AS
SELECT
    cur_date,
    org_id,
    org_name,
    truck_model_id,
    truch_model_name,
    city_id,
    city_name,
    argMaxState(trans_finish_count_base, ts)    AS trans_finish_count,
    argMaxState(trans_finish_distance_base, ts) AS trans_finish_distance,
    argMaxState(trans_finish_dur_time_base, ts) AS trans_finish_dur_time
FROM dws_trans_org_truck_model_trans_finish_day_base
GROUP BY cur_date, org_id, org_name, truck_model_id, truch_model_name, city_id, city_name;

II. Testing the Code

1. Adjust Topic Partition Counts

Make sure each Kafka topic has the same number of partitions as the parallelism configured in the Flink jobs: 4.

kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic tms_dwd_trade_order_detail --partitions 4
kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic tms_dwd_trans_bound_finish_detail --partitions 4
kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic tms_dwd_trans_dispatch_detail --partitions 4
kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic tms_dwd_trans_deliver_detail --partitions 4
kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic tms_dwd_trans_trans_finish --partitions 4
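After altering, it is worth confirming that each topic really has four partitions; --describe is a standard kafka-topics.sh flag:

kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic tms_dwd_trade_order_detail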

2. Start the Cluster

Start HDFS, ZooKeeper, Kafka, HBase, Redis, and ClickHouse.
Because we are testing all seven tables in one go, start all four ODS- and DWD-layer applications:
OdsApp, DwdBoundRelevantApp, DwdOrderRelevantApp, and DwdTransTransFinish.

3. Run the Tests

You can start one DWS application at a time, generate data, and check the ClickHouse tables, or start all seven at once so you only have to generate data once. Some kinds of records are rare, so you may need to generate data several times.

1. Trade domain: cargo-type daily order summary

Test query:

SELECT cur_date, cargo_type, cargo_type_name,
       argMaxMerge(order_amount) AS order_amount,
       argMaxMerge(order_count)  AS order_count
FROM dws_trade_cargo_type_order_day
GROUP BY cur_date, cargo_type, cargo_type_name
LIMIT 10;


2. Trade domain: organization daily order summary

Test query:

SELECT cur_date, org_id, org_name, city_id, city_name,
       argMaxMerge(order_amount) AS order_amount,
       argMaxMerge(order_count)  AS order_count
FROM dws_trade_org_order_day
GROUP BY cur_date, org_id, org_name, city_id, city_name
LIMIT 10;


3. Logistics domain: daily transfer-completion summary

Test query:

SELECT cur_date,
       argMaxMerge(bound_finish_order_count) AS bound_finish_order_count
FROM dws_trans_bound_finish_day
GROUP BY cur_date
LIMIT 10;


4. Logistics domain: daily dispatch summary

Test query:

SELECT cur_date,
       argMaxMerge(dispatch_order_count) AS dispatch_order_count
FROM dws_trans_dispatch_day
GROUP BY cur_date
LIMIT 10;


5. Logistics domain: organization daily successful-delivery summary

Test query:

SELECT cur_date, org_id, org_name, city_id, city_name, province_id, province_name,
       argMaxMerge(deliver_suc_count) AS deliver_suc_count
FROM dws_trans_org_deliver_suc_day
GROUP BY cur_date, org_id, org_name, city_id, city_name, province_id, province_name
LIMIT 10;


6. Logistics domain: organization daily pickup summary

Test query:

SELECT cur_date, org_id, org_name, city_id, city_name, province_id, province_name,
       argMaxMerge(receive_order_count) AS receive_order_count
FROM dws_trans_org_receive_day
GROUP BY cur_date, org_id, org_name, city_id, city_name, province_id, province_name
LIMIT 10;


7. Logistics domain: organization and truck-model daily transport-completion summary

Test query:

SELECT cur_date, org_id, org_name, truck_model_id, truch_model_name, city_id, city_name,
       argMaxMerge(trans_finish_count)    AS trans_finish_count,
       argMaxMerge(trans_finish_distance) AS trans_finish_distance,
       argMaxMerge(trans_finish_dur_time) AS trans_finish_dur_time
FROM dws_trans_org_truck_model_trans_finish_day
GROUP BY cur_date, org_id, org_name, truck_model_id, truch_model_name, city_id, city_name
LIMIT 10;



Summary

With this, the DWS layer of the real-time data warehouse is complete, and all of the code has been pushed to GitHub.
URL: https://github.com/lcc-666/tms-parent
