Kafka-Spark Streaming exception: dead for group td_topic_advert_impress_blacklist

2024-05-03 06:18

This post walks through the Kafka-Spark Streaming exception "dead for group td_topic_advert_impress_blacklist": how it showed up, how it was diagnosed, and how it was fixed. Hopefully it is a useful reference for developers hitting the same problem.

 

Recently, while writing a Spark Streaming job, I ran into a rather strange problem. It showed up as follows:

 

Running the job from my local machine against the Kafka cluster:

18/10/31 17:42:58 INFO AbstractCoordinator: Discovered coordinator kafka1:9092 (id: 2147483574 rack: null) for group td_topic_advert_impress_blacklist.
18/10/31 17:43:00 INFO AbstractCoordinator: Marking the coordinator kafka1:9092 (id: 2147483574 rack: null) dead for group td_topic_advert_impress_blacklist


 

From this log alone it is very hard to pinpoint the cause. After a lot of searching (Baidu, GitHub, ...), I switched the Spark Streaming log level from INFO to DEBUG and finally found the root of the problem.

 

The log level is set programmatically like this:

        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        javaSparkContext.setLogLevel("DEBUG");
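Note that `setLogLevel("DEBUG")` turns on DEBUG for every logger, which floods the driver log. As an alternative sketch (assuming the default log4j 1.x setup that Spark 2.x ships with), you can scope DEBUG to just the Kafka client packages in `conf/log4j.properties`:

```properties
# Keep the rest of the driver log at INFO...
log4j.rootCategory=INFO, console
# ...but emit DEBUG output for the Kafka consumer internals only,
# which is where the coordinator/connection messages come from.
log4j.logger.org.apache.kafka=DEBUG
```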

 

With DEBUG enabled, the log revealed the problem:

18/10/31 17:40:08 DEBUG KafkaConsumer: Starting the Kafka consumer
18/10/31 17:40:08 DEBUG Metadata: Updated cluster metadata version 1 to Cluster(id = null, nodes = [10.170.0.8:9092 (id: -3 rack: null), 10.170.0.7:9092 (id: -2 rack: null), 10.170.0.6:9092 (id: -1 rack: null)], partitions = [])
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name fetch-throttle-time
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name connections-closed:
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name connections-created:
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name bytes-sent-received:
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name bytes-sent:
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name bytes-received:
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name select-time:
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name io-time:
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name heartbeat-latency
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name join-latency
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name sync-latency
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name commit-latency
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name bytes-fetched
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name records-fetched
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name fetch-latency
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name records-lag
18/10/31 17:40:08 INFO AppInfoParser: Kafka version : 0.11.0.0
18/10/31 17:40:08 INFO AppInfoParser: Kafka commitId : cb8625948210849f
18/10/31 17:40:08 DEBUG KafkaConsumer: Kafka consumer created
18/10/31 17:40:08 DEBUG KafkaConsumer: Subscribed to topic(s): topic_advert_impress
18/10/31 17:40:08 DEBUG AbstractCoordinator: Sending GroupCoordinator request for group td_topic_advert_impress_blacklist to broker 10.170.0.6:9092 (id: -1 rack: null)
18/10/31 17:40:08 DEBUG NetworkClient: Initiating connection to node -1 at 10.170.0.6:9092.
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name node--1.bytes-sent
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name node--1.bytes-received
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name node--1.latency
18/10/31 17:40:08 DEBUG Selector: Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
18/10/31 17:40:08 DEBUG NetworkClient: Completed connection to node -1.  Fetching API versions.
18/10/31 17:40:08 DEBUG NetworkClient: Initiating API versions fetch from node -1.
18/10/31 17:40:08 DEBUG NetworkClient: Initialize connection to node -3 for sending metadata request
18/10/31 17:40:08 DEBUG NetworkClient: Initiating connection to node -3 at 10.170.0.8:9092.
18/10/31 17:40:08 DEBUG NetworkClient: Recorded API versions for node -1: (Produce(0): 0 to 3 [usable: 3], Fetch(1): 0 to 5 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0])
18/10/31 17:40:08 DEBUG NetworkClient: Sending metadata request (type=MetadataRequest, topics=topic_advert_impress) to node -1
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name node--3.bytes-sent
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name node--3.bytes-received
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name node--3.latency
18/10/31 17:40:08 DEBUG Selector: Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -3
18/10/31 17:40:08 DEBUG NetworkClient: Completed connection to node -3.  Fetching API versions.
18/10/31 17:40:08 DEBUG NetworkClient: Initiating API versions fetch from node -3.
18/10/31 17:40:08 DEBUG Metadata: Updated cluster metadata version 2 to Cluster(id = -dsyi_nCTMGfFcaeYx-9Wg, nodes = [kafka3:9092 (id: 72 rack: null), kafka2:9092 (id: 71 rack: null), kafka1:9092 (id: 73 rack: null)], partitions = [Partition(topic = topic_advert_impress, partition = 3, leader = 72, replicas = [72,71,73], isr = [71,72,73]), Partition(topic = topic_advert_impress, partition = 2, leader = 71, replicas = [71,72,73], isr = [71,72,73]), Partition(topic = topic_advert_impress, partition = 5, leader = 71, replicas = [71,73,72], isr = [71,72,73]), Partition(topic = topic_advert_impress, partition = 4, leader = 73, replicas = [73,72,71], isr = [71,72,73]), Partition(topic = topic_advert_impress, partition = 1, leader = 73, replicas = [73,71,72], isr = [71,72,73]), Partition(topic = topic_advert_impress, partition = 0, leader = 72, replicas = [72,73,71], isr = [71,72,73]), Partition(topic = topic_advert_impress, partition = 7, leader = 73, replicas = [73,71,72], isr = [71,72,73]), Partition(topic = topic_advert_impress, partition = 6, leader = 72, replicas = [72,73,71], isr = [71,72,73]), Partition(topic = topic_advert_impress, partition = 8, leader = 71, replicas = [71,72,73], isr = [71,72,73])])
18/10/31 17:40:08 DEBUG NetworkClient: Recorded API versions for node -3: (Produce(0): 0 to 3 [usable: 3], Fetch(1): 0 to 5 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0])
18/10/31 17:40:08 DEBUG AbstractCoordinator: Received GroupCoordinator response ClientResponse(receivedTimeMs=1540978808909, latencyMs=125, disconnected=false, requestHeader={api_key=10,api_version=1,correlation_id=0,client_id=consumer-1}, responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=kafka1:9092 (id: 73 rack: null))) for group td_topic_advert_impress_blacklist
18/10/31 17:40:08 INFO AbstractCoordinator: Discovered coordinator kafka1:9092 (id: 2147483574 rack: null) for group td_topic_advert_impress_blacklist.
18/10/31 17:40:08 DEBUG NetworkClient: Initiating connection to node 2147483574 at kafka1:9092.
18/10/31 17:40:20 DEBUG NetworkClient: Error connecting to node 2147483574 at kafka1:9092:
java.io.IOException: Can't resolve address: kafka1:9092
    at org.apache.kafka.common.network.Selector.connect(Selector.java:195)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:762)
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:224)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:462)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$GroupCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:598)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$GroupCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:579)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:184)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:214)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:200)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:165)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
    at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
    at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.nio.channels.UnresolvedAddressException
    at sun.nio.ch.Net.checkAddress(Net.java:123)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
    at org.apache.kafka.common.network.Selector.connect(Selector.java:192)
    ... 37 more
18/10/31 17:40:20 INFO AbstractCoordinator: Marking the coordinator kafka1:9092 (id: 2147483574 rack: null) dead for group td_topic_advert_impress_blacklist
18/10/31 17:40:20 DEBUG AbstractCoordinator: Sending GroupCoordinator request for group td_topic_advert_impress_blacklist to broker kafka3:9092 (id: 72 rack: null)

 

This pinpoints the actual error:

java.io.IOException: Can't resolve address: kafka1:9092
    at org.apache.kafka.common.network.Selector.connect(Selector.java:195)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:762)
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:224)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(Co
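Because the consumer only surfaces the underlying `UnresolvedAddressException` at DEBUG level, a quick way to check for this symptom outside of Spark is to try resolving the advertised hostname yourself. A minimal sketch (the hostname `kafka1` is taken from the log above; substitute whatever your broker advertises):

```java
import java.net.InetSocketAddress;

public class BrokerResolveCheck {
    public static void main(String[] args) {
        // Hostname the coordinator advertised in the log; replace with yours.
        String host = args.length > 0 ? args[0] : "kafka1";
        InetSocketAddress addr = new InetSocketAddress(host, 9092);
        if (addr.isUnresolved()) {
            // This is exactly the condition that triggers
            // "Can't resolve address" inside the Kafka consumer.
            System.out.println(host + " does NOT resolve -- fix your hosts file or DNS");
        } else {
            System.out.println(host + " -> " + addr.getAddress().getHostAddress());
        }
    }
}
```

If this prints "does NOT resolve" on the machine running the driver, the consumer will keep discovering the coordinator and immediately marking it dead, exactly as in the log above.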

 

So this is simply a hostname resolution failure: the broker advertises itself as kafka1, which my local machine cannot resolve (the cluster's DNS names are not visible from the development box).

The fix is just to add the broker hostnames to the local hosts file (on Windows, C:\Windows\System32\drivers\etc\hosts).
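A sketch of the hosts entries, using the bootstrap IPs from the metadata log above. Note the IP-to-hostname mapping shown here is an assumption for illustration; the log shows the three IPs and the three names but not which IP carries which name, so verify against each broker's `server.properties` / `advertised.listeners`:

```
# C:\Windows\System32\drivers\etc\hosts  (or /etc/hosts on Linux/macOS)
# NOTE: verify the IP-to-hostname mapping on your own cluster.
10.170.0.6  kafka1
10.170.0.7  kafka2
10.170.0.8  kafka3
```

After saving the file, the "Marking the coordinator ... dead" loop stops because the consumer can now connect to the coordinator it discovered.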

 

That wraps up this post on the Kafka-Spark Streaming exception "dead for group td_topic_advert_impress_blacklist" -- I hope it helps anyone debugging the same symptom.



http://www.chinasem.cn/article/956012
