Storm-Kafka: Offset lags for kafka not supported for older versions

2024-04-16 18:48


I recently had to migrate Hadoop again; the migration itself is not covered here. I use the Storm real-time streaming component: the previous version was 1.0.2, and the latest release is 1.2.2. The old Storm-Kafka integration package is storm-kafka, while the new one is storm-kafka-client. When I deployed the topology built with the old JAR to Storm, the UI showed a warning:

Offset lags for kafka not supported for older versions


In other words, the Kafka Spouts Lag panel in the Storm UI shows no data. This does not affect message consumption, but switching to the new consumer API resolves it.

First add the dependency. Its default scope is provided, but in my tests the classes could not be found at runtime, so I changed the scope to compile so that the dependency gets packaged into the jar at build time.

    <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka-client -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka-client</artifactId>
        <version>1.2.2</version>
        <scope>compile</scope>
    </dependency>

Then modify the spout code to use the new API:

    TopologyBuilder builder = new TopologyBuilder();
    KafkaSpoutConfig<String, String> config = KafkaSpoutConfig
            .builder("datanode01-ucloud.isesol.com:9092,datanode02-ucloud.isesol.com:9092,datanode03-ucloud.isesol.com:9092,hue-ucloud.isesol.com:9092", "2001")
            .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
            .setGroupId("test-2001")
            .build();
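
One migration detail worth noting (my own observation, not part of the original warning): the old storm-kafka StringScheme spout emitted the message body as the first tuple field, while storm-kafka-client's default RecordTranslator emits the fields (topic, partition, offset, key, value). A bolt that reads the payload with input.getString(0) from the new spout may therefore pick up the topic name instead, so reading by field name is safer. A minimal sketch of such an execute() method, assuming the default translator of storm-kafka-client 1.2.x:

    public void execute(Tuple input) {
        // "value" holds the Kafka record payload under the default RecordTranslator
        // (assumed field layout: topic, partition, offset, key, value)
        String line = input.getStringByField("value");
        collector.emit(input, new Values(line));
        collector.ack(input);
    }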

The old spout code was:

    String zkConnString = "datanode01-ucloud.isesol.com,datanode02-ucloud.isesol.com,datanode03-ucloud.isesol.com";
    String topicName = "2001";
    String zkRoot = "/storm";
    BrokerHosts hosts = new ZkHosts(zkConnString);
    SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, zkRoot, "jlwang");
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

As you can see, the old version stores offsets in ZooKeeper, while the new version stores them in the __consumer_offsets topic. You can inspect __consumer_offsets with the following command:

/opt/cloudera/parcels/KAFKA/bin/kafka-console-consumer   --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config
[test-2001,2001,0]::[OffsetMetadata[783176,{"topologyId":"getkafka-70-1530781168","taskId":4,"threadName":"Thread-11-kafka-reader-executor[4 4]"}],CommitTime 1530785032518,ExpirationTime 1530871432518]
[com.isesol.Raphael.kafka-reader-5008version0.0.25,5008,0]::[OffsetMetadata[3946,{"topologyId":"Kafka2Hbase-5008-57-1530697548","taskId":7,"threadName":"Thread-5-kafka-reader-5008-executor[7 7]"}],CommitTime 1530785039407,ExpirationTime 1530871439407]
[com.isesol.Raphael.kafka-reader-2001trans0.0.36,2001,0]::[OffsetMetadata[783228,{"topologyId":"KafkaToHbase-2001-41-1530611039","taskId":6,"threadName":"Thread-7-kafka-reader-2001-executor[6 6]"}],CommitTime 1530785042006,ExpirationTime 1530871442006]
[test-2001,2001,0]::[OffsetMetadata[783262,{"topologyId":"getkafka-70-1530781168","taskId":4,"threadName":"Thread-11-kafka-reader-executor[4 4]"}],CommitTime 1530785062582,ExpirationTime 1530871462582]
[com.isesol.Raphael.kafka-reader-2001trans0.0.36,2001,0]::[OffsetMetadata[783313,{"topologyId":"KafkaToHbase-2001-41-1530611039","taskId":6,"threadName":"Thread-7-kafka-reader-2001-executor[6 6]"}],CommitTime 1530785072198,ExpirationTime 1530871472198]
After a short wait, the committed offsets show up; the leading test-2001 is the group.id I configured.
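
Besides the console consumer, the lag that the Storm UI displays can be approximated directly with the new consumer API by comparing the group's committed offset against the latest broker offset. The sketch below is illustrative only: the class name LagCheck and the single bootstrap broker are my own choices, and it assumes a kafka-clients version that provides endOffsets (0.10.1 or later); topic 2001 and group test-2001 are the ones used above.

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class LagCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "datanode01-ucloud.isesol.com:9092");
            props.put("group.id", "test-2001");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)) {
                TopicPartition tp = new TopicPartition("2001", 0);
                // Offset last committed for this group, read from __consumer_offsets
                OffsetAndMetadata committed = consumer.committed(tp);
                // Latest offset currently available on the broker for this partition
                Map<TopicPartition, Long> end = consumer.endOffsets(Collections.singletonList(tp));
                long lag = end.get(tp) - (committed == null ? 0L : committed.offset());
                System.out.println("partition 0 lag = " + lag);
            }
        }
    }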

I have posted the complete storm-kafka-hbase code before; this version only switches to the new API:

package com.isesol.storm;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.*;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.hbase.bolt.HBaseBolt;
import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;
import org.apache.storm.shade.com.google.common.collect.Maps;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.topology.InputDeclarer;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

import java.security.MessageDigest;
import java.security.acl.Group;

public class getKafka {

    public static void main(String[] args)
            throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {

        /*
        // Old storm-kafka spout (offsets stored in ZooKeeper), kept for reference:
        String zkConnString = "datanode01-ucloud.isesol.com,datanode02-ucloud.isesol.com,datanode03-ucloud.isesol.com";
        String topicName = "2001";
        String zkRoot = "/storm";
        BrokerHosts hosts = new ZkHosts(zkConnString);
        SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, zkRoot, "jlwang");
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
        */

        List<String> fieldNameList = new ArrayList<String>();
        // fieldNameList.add("rowkey");
        // for (Field field : Topic2001.class.getDeclaredFields()) {
        //     fieldNameList.add(field.getName());
        // }

        TopologyBuilder builder = new TopologyBuilder();

        // New storm-kafka-client spout configuration (offsets stored in __consumer_offsets)
        KafkaSpoutConfig<String, String> config = KafkaSpoutConfig
                .builder("datanode01-ucloud.isesol.com:9092,datanode02-ucloud.isesol.com:9092,datanode03-ucloud.isesol.com:9092,hue-ucloud.isesol.com:9092",
                        "2001")
                .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
                .setGroupId("test-2001")
                .build();

        fieldNameList.add("line");

        builder.setSpout("kafka-reader", new KafkaSpout<>(config), 1);
        // builder.setSpout("kafka-reader", kafkaSpout, 1).setNumTasks(1);
        builder.setBolt("kafkaBolt", new kafkaBolt(), 1).shuffleGrouping("kafka-reader");
        builder.setBolt("transferBolt", new transferBolt(), 1).shuffleGrouping("kafkaBolt");

        Config conf = new Config();
        Map<String, String> HBConfig = Maps.newHashMap();
        HBConfig.put("hbase.zookeeper.property.clientPort", "2181");
        HBConfig.put("hbase.zookeeper.quorum",
                "datanode01-ucloud.isesol.com:2181,datanode02-ucloud.isesol.com:2181,datanode03-ucloud.isesol.com:2181");
        HBConfig.put("zookeeper.znode.parent", "/hbase");
        conf.put("HBCONFIG", HBConfig);

        SimpleHBaseMapper mapper = new SimpleHBaseMapper();
        mapper.withColumnFamily("cf");
        mapper.withColumnFields(new Fields(fieldNameList));
        mapper.withRowKeyField("rowkey");

        HBaseBolt hBaseBolt = new HBaseBolt("test", mapper).withConfigKey("HBCONFIG");
        hBaseBolt.withFlushIntervalSecs(15);
        hBaseBolt.withBatchSize(5000);
        builder.setBolt("hbasehandler", hBaseBolt, 1).shuffleGrouping("transferBolt");

        String name = getKafka.class.getSimpleName();
        conf.setNumWorkers(2);
        // conf.setNumAckers(20);
        // conf.setNumEventLoggers(1);
        // conf.setMaxSpoutPending(20000);
        conf.setMessageTimeoutSecs(90);

        // LocalCluster localCluster = new LocalCluster();
        // localCluster.submitTopology(name, conf, builder.createTopology());
        StormSubmitter.submitTopology("getkafka", conf, builder.createTopology());
        // Utils.sleep(9999999);
    }
}

class transferBolt extends BaseRichBolt {

    private Map conf;
    private TopologyContext context;
    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.conf = stormConf;
        this.context = context;
        this.collector = collector;
    }

    public void execute(Tuple input) {
        try {
            // Prefix each line with a random UUID, used as the HBase row key downstream
            String line = input.getString(0);
            collector.emit(input, new Values(UUID.randomUUID().toString(), line));
            collector.ack(input);
        } catch (Exception ex) {
            collector.fail(input);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("rowkey", "line"));
    }
}

class kafkaBolt extends BaseRichBolt {

    private Map conf;
    private TopologyContext context;
    private OutputCollector collector;

    public void execute(Tuple input) {
        try {
            // Pass the first tuple field through unchanged
            String line = input.getString(0);
            collector.emit(input, new Values(line));
            collector.ack(input);
        } catch (Exception ex) {
            collector.fail(input);
        }
    }

    public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
        this.conf = arg0;
        this.context = arg1;
        this.collector = arg2;
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}

class HandlerBolt extends BaseRichBolt {

    ObjectMapper objectMapper;
    List<String> fieldNameList;
    private Map conf;
    private TopologyContext context;
    private OutputCollector collector;

    public HandlerBolt(List<String> fieldNameList) {
        this.fieldNameList = fieldNameList;
    }

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.conf = stormConf;
        this.context = context;
        this.collector = collector;
        this.objectMapper = new ObjectMapper();
        if (this.fieldNameList == null) {
            this.fieldNameList = new ArrayList<String>();
            fieldNameList.add("rowkey");
            // Topic2001 is the message POJO from the earlier post; its field names become HBase columns
            for (Field field : Topic2001.class.getDeclaredFields()) {
                this.fieldNameList.add(field.getName());
            }
        }
    }

    public static String getMD5(String message) {
        String md5str = "";
        try {
            // 1. Create a message-digest object initialized with the MD5 algorithm
            MessageDigest md = MessageDigest.getInstance("MD5");
            // 2. Convert the message into a byte array
            byte[] input = message.getBytes();
            // 3. Compute the digest; the resulting bytes are the 128-bit hash
            byte[] buff = md.digest(input);
            // 4. Convert every byte to hex and join them into the MD5 string
            md5str = bytesToHex(buff);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return md5str;
    }

    public static String bytesToHex(byte[] bytes) {
        StringBuffer md5str = new StringBuffer();
        // Convert every byte to hex and join them into the MD5 string
        int digital;
        for (int i = 0; i < bytes.length; i++) {
            digital = bytes[i];
            if (digital < 0) {
                digital += 256;
            }
            if (digital < 16) {
                md5str.append("0");
            }
            md5str.append(Integer.toHexString(digital));
        }
        return md5str.toString().toUpperCase();
    }

    public void execute(Tuple input) {
        try {
            String jsonStr = input.getString(0);
            Map<String, Object> objMap = null;
            objMap = objectMapper.readValue(jsonStr, Map.class);
            String rowKey = String.valueOf(objMap.get("rowKey"));
            String contentStr = String.valueOf(objMap.get("messageContent"));
            Map contentMap;
            contentMap = objectMapper.readValue(contentStr, Map.class);
            List<Object> content = new ArrayList<Object>();
            for (String fieldName : fieldNameList) {
                if ("rowkey".equals(fieldName)) {
                    content.add(rowKey);
                } else {
                    Object fieldContent = contentMap.get(fieldName);
                    content.add(fieldContent == null ? "" : String.valueOf(fieldContent));
                }
            }
            Values outPut = new Values();
            outPut.addAll(content);
            collector.emit(input, outPut);
            collector.ack(input);
        } catch (Exception e) {
            e.printStackTrace();
            collector.fail(input);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields fields = new Fields(this.fieldNameList);
        declarer.declare(fields);
    }
}

The complete pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <groupId>com.isesol</groupId>
    <artifactId>storm</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>storm</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.0.2</version>
            <scope>provided</scope>
        </dependency>
        <!-- 0.9.0-kafka-2.0.1 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.9.0-kafka-2.0.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>1.0.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.9.0-kafka-2.0.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-hbase -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-hbase</artifactId>
            <version>1.1.0</version>
            <exclusions>
                <exclusion>
                    <groupId>jdk.tools</groupId>
                    <artifactId>jdk.tools</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0-kafka-3.0.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka-client -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>1.2.2</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.isesol.storm.getKafka</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>assembly</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>



