实时数仓链路分享:kafka =SparkStreaming=kudu集成kerberos

本文主要是介绍实时数仓链路分享:kafka =SparkStreaming=kudu集成kerberos,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

点击上方蓝色字体,选择“设为星标

回复”资源“获取更多资源

大数据技术与架构

点击右侧关注,大数据开发领域最强公众号!

暴走大数据

点击右侧关注,暴走大数据!

本文档主要介绍在cdh集成kerberos情况下,sparkstreaming怎么消费kafka数据,并存储在kudu里面
  • 假设kafka集成kerberos

  • 假设kudu集成kerberos

  • 假设用非root用户操作

  • spark基于yarn-cluster模式

代码编写,这里只介绍关键代码
  • 主类,以下代码仅供参考

package deng.yb.sparkStreaming;import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kudu.spark.kudu.KuduContext;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.springframework.context.support.ClassPathXmlApplicationContext;import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;import deng.yb.sparkStreaming.kafka.KafkaTools;
import deng.yb.sparkStreaming.utils.NginxInfo;
import deng.yb.sparkStreaming.utils.SpringContextUtil;/*** Hello world!**/
@SuppressWarnings("unchecked")
public class EApp {private static final Logger logger = Logger.getLogger(App.class);private static final String BEAN_CONF = "classpath:spring/spring-bean.xml";private static Map<String, String> conf = new HashMap<String, String>();/*** epp接口-request*/private static final String EPP_REQUEST = "POST /api/sky_server_data_app/track/user_time HTTP/1.1";/*** app接口-request*/private static final String APP_REQUEST = "POST /api/sky_server_data_app/code/app_code HTTP/1.1";/*** 在spring 配置的参数id*/private static final String CONFIG = "commonConfig";/*** 以下配置参数皆为配置key spark模式*/private static final String MASTER = "master";/*** spark-appName*/private static final String APP_NAME = "appName";/*** 自定义字段*/private static final String COLUMNS = "columns";/*** topic*/private static final String TOPIC = "topic";/*** 表名*/private static final String TABLE = "tables";static {String[] confs = new String[] { BEAN_CONF };// 把actx设置进去,后续可以共用SpringContextUtil.setApplicationContext(new ClassPathXmlApplicationContext(confs));conf = (Map<String, String>) SpringContextUtil.getBean(CONFIG);}public static void main(String args[]) {try {SparkSession spark = SparkSession.builder().appName(conf.get(APP_NAME)).master(conf.get(MASTER)).getOrCreate();Map<String, Object> confMap = KafkaTools.kafkaConf(conf);String[] topicArr = conf.get(TOPIC).split(",");Collection<String> topics = Arrays.asList(topicArr);StreamingContext sc = new StreamingContext(spark.sparkContext(),Durations.milliseconds(5000));JavaStreamingContext jssc = new JavaStreamingContext(sc);JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String> Subscribe(topics, confMap));jssc.sparkContext().setLogLevel("ERROR");stream.context().sparkContext().setLogLevel("ERROR");// nginx日志对应字段String[] columns = conf.get(COLUMNS).split(",");Map<String, String> colimnsMap = new LinkedHashMap<String, String>();// 把字段和类型映射String[] temp;for (String column : columns) {temp = column.split(":");colimnsMap.put(temp[0], temp[1]);}// 表名String[] tables = conf.get(TABLE).split(",");// epp表额外的字段String[] eppExtColumns = { "app_name", "end", "portal_user_id","resource", "start", "username", "app_id" };KuduContext kudu = new KuduContext(conf.get("kudu.instances"),sc.sparkContext());// dstream transform// 第一层封装// 第二层切分// 第三层转换JavaDStream<LinkedHashMap<String,String>> linkMap = stream.map(record -> {logger.info("消息进来:" + record.value());LinkedHashMap<String,String> json = new LinkedHashMap<String, String>();String[] messages = record.value().split(",");int length = colimnsMap.size();int i = 0;for (Map.Entry<String, String> entry : colimnsMap.entrySet()) {if (i < length) {json.put(entry.getKey(), messages[i]);}i += 1;}// 处理http_version字段String httpVersion;if (json.containsKey("http_version")&& (httpVersion = json.get("http_version")) != null) {String[] httpVersionArry = httpVersion.split("_");if (httpVersionArry != null&& httpVersionArry.length > 1) {json.put("portal_name", httpVersionArry[0]);json.put("channel", httpVersionArry[1]);json.put("version", httpVersionArry[2]);}}logger.info("封装完数据格式:"+json.toString());return json;}).cache();//EPP表linkMap.flatMap(new FlatMapFunction<LinkedHashMap<String,String>, JSONObject>(){@Overridepublic Iterator<JSONObject> call(LinkedHashMap<String,String> json) throws Exception {// TODO Auto-generated method stubArrayList<JSONObject> jsonArray = new ArrayList<JSONObject>();String request = json.get("request");if (request.indexOf(EPP_REQUEST) > -1) {logger.info("消息拆分:" + json.toString());// 这个进epp表String requestBody = URLDecoder.decode(json.get("app_id"), "utf-8");String[] strArr;JSONArray array = JSONObject.parseArray((strArr = requestBody.split("=")).length > 1 ? strArr[1]: strArr[0]);// 根据appid拆分for (int j = 0; j < array.size(); j++) {JSONObject obj = array.getJSONObject(j);JSONObject newJson = new JSONObject(new LinkedHashMap<String,Object>());// 把原来的属性加上for (String oldColumn : json.keySet()) {newJson.put(oldColumn,json.get(oldColumn));}for (String extColumn : eppExtColumns) {newJson.put(extColumn,obj.get(extColumn));}// kudu表一定要有主键newJson.put("id", UUID.randomUUID().toString().replace("-", ""));logger.info("生成EPP主键:"+newJson.getString("id"));jsonArray.add(newJson);}return jsonArray.iterator();}return new ArrayList().iterator();}}).map(eppRowMap -> {logger.info("消息转换为epprow:" + eppRowMap.toString());List<Object> objArry = new ArrayList<Object>();eppRowMap.forEach((key, value) -> {objArry.add(NginxInfo.valueTranForm(key, value));});return RowFactory.create(objArry.toArray());}).foreachRDD(eppRdd -> {Dataset<Row> rows = spark.createDataFrame(eppRdd,DataTypes.createStructType(NginxInfo.getStructFieldList("EPP")));kudu.insertRows(rows,tables[0]);});jssc.start();jssc.awaitTermination();logger.info("完成!");} catch (Exception e) {logger.error("处理消息错误2!", e);}}private StructType contructStructType() {List<StructField> structFields = new ArrayList<StructField>();return null;}
}
  • KafkaTools类,主要获取kafka配置,代码仅供参考

public static Map<String, Object> kafkaConf(Map<String, String> conf) {if (conf == null) {return null;}// kafka配置Map<String, Object> kafkaParams = new HashMap<>();kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");kafkaParams.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");//kafka集成kerberos后的security.inter.broker.protocolkafkaParams.put("security.protocol", "SASL_PLAINTEXT");kafkaParams.put("sasl.kerberos.service.name", "kafka");kafkaParams.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,conf.get("bootStrapServers"));return kafkaParams;}
注意到,集成kerberos后,身份验证的代码并没有在项目写太多,只有kafka客户端配置加上kafkaParams.put("security.protocol", "SASL_PLAINTEXT")而已
  • 身份验证的操作分别交给spark-submit处理和调度器linux crontab 处理

  • 假设我用的是wms这个账号去跑任务

  • 新建kafka_client_jaas.conf文件

cd /usr/wms/sparkstreaming/#该文件给kafka身份验证用
[wms@node1 sparkstreaming]$ vi kafka_client_jaas.conf
KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truestoreKey=trueuseTicketCache=falseserviceName="kafka"keyTab="./wms.keytab"principal="wms@W.COM";
};#把wms.keytab也放在相应目录下,此时目录机构应该是如此
-rwxr-xr-x 1 root root 352 Jul 16 09:48 wms.keytab
[wms@node1 sparkstreaming]$ ll
总用量 114172
#conf.properties文件是spark应用的配置文件
-rwxr-xr-x 1 wms wms       897 7月  16 09:45 conf.properties
-rwxr-xr-x 1 wms wms       221 7月  16 09:45 kafka_client_jaas.conf
-rwxr-xr-x 1 wms wms       352 7月  16 09:45 wms.keytab#scp到其他目录
scp /usr/wms/sparkstreaming/* root@bi-slave1:/usr/wms/sparkstreaming/
scp /usr/wms/sparkstreaming/* root@bi-slave2:/usr/wms/sparkstreaming/
scp /usr/wms/sparkstreaming/* root@bi-slave3:/usr/wms/sparkstreaming/
  • spark启动前,先初始化driver和executor是节点票据

#该操作主要是为了保证executor节点执行kudu操作前有权限
#这里我们写了一个批处理脚本,能在所有节点执行某个命令
#我们用linux调度工具,到点初始化wms用户票据,防止票据失效
#在root权限下操作
exit
[root@node1 sparkstreaming]# crontab -e
#每五分钟,在每台机器初始化wms用户票据,防止失效
*/5 * * * * ./doCommand.sh "su wms -c 'kinit -kt /usr/wms/sparkstreaming/wms.keytab wms@W.COM'" > /usr/wms/sparkstreaming/lastupdate
  • spark-submit

# 注意需要在配置文件目录下执行spark2-submit命令
# driver节点需要配置kafka的security.auth.login.config信息
# executor节点需要配置kafka的security.auth.login.config信息
# driver根据绝对路径读取配置
# executor根据相对路径读取配置
# 通过files配置把kafka_client_jaas.conf,wms.keytab发到executor节点spark2-submit  --driver-java-options=-Djava.security.auth.login.config=/etc/wonhighconf/bi/bi-sparkstreaming/kafka_client_jaas.conf  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" --files kafka_client_jaas.conf,wms.keytab --master yarn --deploy-mode cluster  --class deng.yb.sparkStreaming.App /usr/wms/sparkstreaming/sparkStreaming-0.0.1-SNAPSHOT.jar
  • spark启动后,进入yarn查看spark日志

欢迎点赞+收藏+转发朋友圈素质三连

文章不错?点个【在看】吧! ????

这篇关于实时数仓链路分享:kafka =SparkStreaming=kudu集成kerberos的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1143061

相关文章

Ilya-AI分享的他在OpenAI学习到的15个提示工程技巧

Ilya(不是本人,claude AI)在社交媒体上分享了他在OpenAI学习到的15个Prompt撰写技巧。 以下是详细的内容: 提示精确化:在编写提示时,力求表达清晰准确。清楚地阐述任务需求和概念定义至关重要。例:不用"分析文本",而用"判断这段话的情感倾向:积极、消极还是中性"。 快速迭代:善于快速连续调整提示。熟练的提示工程师能够灵活地进行多轮优化。例:从"总结文章"到"用

【专题】2024飞行汽车技术全景报告合集PDF分享(附原数据表)

原文链接: https://tecdat.cn/?p=37628 6月16日,小鹏汇天旅航者X2在北京大兴国际机场临空经济区完成首飞,这也是小鹏汇天的产品在京津冀地区进行的首次飞行。小鹏汇天方面还表示,公司准备量产,并计划今年四季度开启预售小鹏汇天分体式飞行汽车,探索分体式飞行汽车城际通勤。阅读原文,获取专题报告合集全文,解锁文末271份飞行汽车相关行业研究报告。 据悉,业内人士对飞行汽车行业

C#实战|大乐透选号器[6]:实现实时显示已选择的红蓝球数量

哈喽,你好啊,我是雷工。 关于大乐透选号器在前面已经记录了5篇笔记,这是第6篇; 接下来实现实时显示当前选中红球数量,蓝球数量; 以下为练习笔记。 01 效果演示 当选择和取消选择红球或蓝球时,在对应的位置显示实时已选择的红球、蓝球的数量; 02 标签名称 分别设置Label标签名称为:lblRedCount、lblBlueCount

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3

【区块链 + 人才服务】区块链集成开发平台 | FISCO BCOS应用案例

随着区块链技术的快速发展,越来越多的企业开始将其应用于实际业务中。然而,区块链技术的专业性使得其集成开发成为一项挑战。针对此,广东中创智慧科技有限公司基于国产开源联盟链 FISCO BCOS 推出了区块链集成开发平台。该平台基于区块链技术,提供一套全面的区块链开发工具和开发环境,支持开发者快速开发和部署区块链应用。此外,该平台还可以提供一套全面的区块链开发教程和文档,帮助开发者快速上手区块链开发。

【Shiro】Shiro 的学习教程(三)之 SpringBoot 集成 Shiro

目录 1、环境准备2、引入 Shiro3、实现认证、退出3.1、使用死数据实现3.2、引入数据库,添加注册功能后端代码前端代码 3.3、MD5、Salt 的认证流程 4.、实现授权4.1、基于角色授权4.2、基于资源授权 5、引入缓存5.1、EhCache 实现缓存5.2、集成 Redis 实现 Shiro 缓存 1、环境准备 新建一个 SpringBoot 工程,引入依赖:

java常用面试题-基础知识分享

什么是Java? Java是一种高级编程语言,旨在提供跨平台的解决方案。它是一种面向对象的语言,具有简单、结构化、可移植、可靠、安全等特点。 Java的主要特点是什么? Java的主要特点包括: 简单性:Java的语法相对简单,易于学习和使用。面向对象:Java是一种完全面向对象的语言,支持封装、继承和多态。跨平台性:Java的程序可以在不同的操作系统上运行,称为"Write once,

系统架构师-ERP+集成

ERP   集成平台end:就懒得画新的页

分享5款免费录屏的工具,搞定网课不怕错过!

虽然现在学生们不怎么上网课, 但是对于上班族或者是没有办法到学校参加课程的人来说,网课还是很重要的,今天,我就来跟大家分享一下我用过的几款录屏软件=,看看它们在录制网课时的表现如何。 福昕录屏大师 网址:https://www.foxitsoftware.cn/REC/ 这款软件给我的第一印象就是界面简洁,操作起来很直观。它支持全屏录制,也支持区域录制,这对于我这种需要同时录制PPT和老师讲

Spring Boot集成Tess4J实现OCR

1.什么是Tess4j? Tesseract是一个开源的光学字符识别(OCR)引擎,它可以将图像中的文字转换为计算机可读的文本。支持多种语言和书面语言,并且可以在命令行中执行。它是一个流行的开源OCR工具,可以在许多不同的操作系统上运行。Tess4J是一个基于Tesseract OCR引擎的Java接口,可以用来识别图像中的文本,说白了,就是封装了它的API,让Java可以直接调用。 Tess