Kafka(五)消费者回调 +定时重试 + 理解Rebalance

2023-11-26 23:15

本文主要是介绍Kafka(五)消费者回调 +定时重试 + 理解Rebalance,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 消费者回调
    • 如何抽象callBack消息?
      • 为什么要设置serverId?
      • 如何消费callBack消息?
  • 定时重试
    • 消息失败表的设计
    • 重试逻辑设计
  • 理解Rabalance
    • 通过日志来理解rebalance
  • 参考资料
  • 结语
  • 示例源码仓库

消费者回调

有些邮件发送成功之后,需要执行后续逻辑,例如更新数据库等。那么我们这时需要将Message Server变成生产者, 向Kafak中投递callBack消息;Business Server 此时是消费者, 消费callBack消息。

如何抽象callBack消息?

callBack的逻辑根据业务场景相关,如何在保证满足不同callBack业务逻辑的同时还满足callBack消息格式的统一呢? 我们使用反射来实现这一目的

@JsonDeserialize(builder = CallbackMetaData.CallbackMetaDataBuilder.class)
@JsonInclude(JsonInclude.Include.NON_NULL)
@Getter
@Builder
@ToString
public class CallbackMetaData implements Serializable {@JsonProperty("id")private String messageId;/*** 应该是hostName, InetAddress.getLocalHost().getHostName()*/@JsonProperty("serverId")private String serverId;@JsonProperty("className")private String className;/*** this string is the json string of the instance of the class, generated by Jackson.* for example:* className instance = new className();* objectMapper.writeValueAsString(instance);*/@JsonProperty("instanceJsonStr")private String instanceJsonStr;@JsonProperty("methodName")private String methodName;@JsonProperty("arguments")private Object[] arguments;}

上述内容,会作为邮件消息的一部分,发送给Message Server. 当Message Server发送完邮件之后, 会检查是否包含callback消息,如果包含,则将CallbackMetaData发送到相关topic

为什么要设置serverId?

有两点原因:

  1. 代码是滚动部署的,为了兼容性必须是:回调消息是由哪台Business Server携带的,就该回调到哪台Business Server。
  2. 一开始我们的思路是使用同一个topic,所有Business Server都订阅该topic,并且使用不同的消费者组,以达到广播消息的目的。这样在消费消息时通过判断当前消息的serverId是否当前server,如果是则消费,如果不是则直接提交offset。但是后来我们发现这样设计,会使得所有服务器每时每刻都在消费消息,即使该消息不是当前服务器的。改进后的设计是:每个服务器都有自己的callback topic, 只消费自己的callback topic下的消息即可。 callback topic的名字是:callback-serverId

serverId使用机器的hostName而不是IP, 因为IP有可能会变。

如何消费callBack消息?

其他消费者相关的代码我不再赘述,请参考上一篇博文的详细内容, 执行消费逻辑的代码就3行

ConsumerRecords<String, CallbackMetaData> records = consumer.poll(Duration.ofSeconds(10));
records.forEach(each -> {Class<?> destClass;try {//        核心消费代码, 通过反射调用目标方法destClass = Class.forName(each.value().getClassName());Object instance = objectMapper.readValue(each.value().getInstanceJsonStr(), destClass);MethodUtils.invokeMethod(instance, true, each.value().getMethodName(), each.value().getArguments());} catch (Exception e) {e.printStackTrace();}
});

定时重试

前两篇博文提到,无论生产者还是消费者,最终重试N次之后依旧失败的我们会把消息存储到数据库,以便后期通过定时任务进行重试。为了减轻业务服务器的负担,所有失败消息的重试都由Message Server负责。

消息失败表的设计

@Getter
@Setter
@ToString
@EqualsAndHashCode(of = {"messageId", "failedPhrases"})
public class MessageFailedEntity {/*** 主键*/private Long id;/*** 消息id*/private String messageId;/*** JSON格式的消息内容*/private String messageContentJsonFormat;/*** 消息类型* EMAIL 表示此消息为邮件* EMAIL_CALLBACK 表示此消息为邮件回调**/private MessageType messageType;/*** 消息失败的阶段:* PRODUCER 表示在生产者发送消息的时候失败* CONSUMER 表示在消费者消费消息的时候失败*/private MessageFailedPhrase messageFailedPhrase;/*** 失败时的异常堆栈信息*/private String failedReason;/*** 消息重试次数*/private Integer retryCount;/*** 消息重试状态* 0 表示重试失败* 1 表示重试成功*/private Integer retryStatus;/*** 时间戳*/private LocalDateTime lastUpdateTime;}

重试逻辑设计

重试的思路很简单:

  1. 从数据库查询消息失败表获得一批记录,每次可能100条或者10000条,根据实际场景自己确定
  2. 根据消息类型和消息的JSON格式,序列化为对应类的对象,调用不同的生产者发送消息到Kafka
  3. 如果该消息失败阶段是PRODUCER,那么重试成功之后,则更新该记录未重试成功
  4. 如果该消息失败阶段是CONSUMER,那么重试成功之后,则只更新重试次数,由对应的消费者去更新是否重试成功。因为CONSUMER只有消费成功才算重试成功。
  5. 设置最大重试次数,如果超过最大重试次数,则不再进行重试
  6. 如果是部署了多个Message Server,那么执行定时重试任务时,可以使用分布式锁以确保只有同一时刻只有一个Message Server在执行任务,这样做的目的主要是防止为了多个任务同时进行时,从数据库中查询的记录是同一批,当然也可以在表中增加一个标志位来区分该记录是否在重试中来达到相同的目的,根据实际情况选择即可

到此为止,结合前两篇博文,我们处理了在整个消息系统中可能出现所有的异常情况。

理解Rabalance

Kafka权威指南 > 第四章 第一节

Moving partition ownership from one consumer to another is called a rebalance.

一开始接触rebalance时候,我在思考一个问题,如果我的消费者还在消费消息中, 此时Kafka要进行rebalance,这对我的消费者业务逻辑有什么影响?会不会我还在消费中,然后被打断,如果是这样的话,那对我的消费业务逻辑的幂等性来说增加了不少挑战。

带着这些疑问,我搜索了一些资料,在confluent官网上发现了一篇博客,详细讲了rebalance过程, 文章链接

以下内容来自于上述文章链接

  1. Suppose we have an existing consumer group with a set assignment of topic-partitions to consumers. This consumer group consists of a number of consumers, each with a member id as well as a group leader (usually the consumer that was first to join the group). A new consumer comes along and requests to join the consumer group by sending a request of JoinGroup to the Group Coordinator along with the topics it would like to subscribe to.
  2. The Group Coordinator kicks off the rebalance by telling all current members to issue their own JoinGroup requests. This is done as part of the response to the heartbeat that consumers send to the Group Coordinator to tell it they’re still alive and well.
  3. Each consumer in the group has max.poll.interval.ms to wrap up their current processing and send their JoinGroup request, at which point the world is stopped. With all of the JoinGroup requests, the Group Coordinator knows all of the consumers in the group and which topics should be part of the consumer group. It sends JoinResponses to the members, chooses a leader from amongst the members, and leaves the leader to compute the partition assignments.
  4. All group members respond with a SyncGroup request. The group leader sends its partition assignments along with its request.
  5. At this point, the Group Coordinator can send its SyncResponse to each consumer confirming their assigned topic-partitions.
  6. Finally, consumers acknowledge their assignments and processing can resume. The world is no longer stopped

第二处高亮的地方,解决了我的疑问,在rebalance之前,会等待每个消费者把自己的消费逻辑处理完。

通过日志来理解rebalance

下面的日志是我本地的一次rebalance期间的日志,可以对照上述步骤加深理解

2023-11-08T02:23:04.180-0500 kafka-coordinator-heartbeat-thread | low-priority-email-group org.apache.kafka.clients.consumer.internals.ConsumerCoordinator WARN: [Consumer instanceId=2, clientId=consumer-low-priority-email-group-2, groupId=low-priority-email-group] consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2023-11-08T02:23:04.180-0500 kafka-coordinator-heartbeat-thread | low-priority-email-group org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=2, clientId=consumer-low-priority-email-group-2, groupId=low-priority-email-group] Resetting generation and member id due to: consumer pro-actively leaving the group
2023-11-08T02:23:04.180-0500 kafka-coordinator-heartbeat-thread | low-priority-email-group org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=2, clientId=consumer-low-priority-email-group-2, groupId=low-priority-email-group] Request joining group due to: consumer pro-actively leaving the group2023-11-08T02:32:21.607-0500 consumer-low-priority-email-pool-3 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=4, clientId=consumer-low-priority-email-group-4, groupId=low-priority-email-group] Request joining group due to: group is already rebalancing
2023-11-08T02:32:21.620-0500 consumer-low-priority-email-pool-3 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=4, clientId=consumer-low-priority-email-group-4, groupId=low-priority-email-group] Revoke previously assigned partitions low-priority-email-4
2023-11-08T02:32:21.621-0500 consumer-low-priority-email-pool-3 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=4, clientId=consumer-low-priority-email-group-4, groupId=low-priority-email-group] (Re-)joining group
2023-11-08T02:32:21.712-0500 consumer-low-priority-email-pool-8 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=9, clientId=consumer-low-priority-email-group-9, groupId=low-priority-email-group] Request joining group due to: group is already rebalancing
2023-11-08T02:32:21.712-0500 consumer-low-priority-email-pool-8 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=9, clientId=consumer-low-priority-email-group-9, groupId=low-priority-email-group] Revoke previously assigned partitions low-priority-email-9
2023-11-08T02:32:21.712-0500 consumer-low-priority-email-pool-8 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=9, clientId=consumer-low-priority-email-group-9, groupId=low-priority-email-group] (Re-)joining group
2023-11-08T02:32:21.723-0500 consumer-low-priority-email-pool-7 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=8, clientId=consumer-low-priority-email-group-8, groupId=low-priority-email-group] Request joining group due to: group is already rebalancing
2023-11-08T02:32:21.723-0500 consumer-low-priority-email-pool-7 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=8, clientId=consumer-low-priority-email-group-8, groupId=low-priority-email-group] Revoke previously assigned partitions low-priority-email-8
2023-11-08T02:32:21.723-0500 consumer-low-priority-email-pool-7 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=8, clientId=consumer-low-priority-email-group-8, groupId=low-priority-email-group] (Re-)joining group
2023-11-08T02:32:21.739-0500 consumer-low-priority-email-pool-9 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=10, clientId=consumer-low-priority-email-group-10, groupId=low-priority-email-group] Request joining group due to: group is already rebalancing
2023-11-08T02:32:21.739-0500 consumer-low-priority-email-pool-9 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=10, clientId=consumer-low-priority-email-group-10, groupId=low-priority-email-group] Revoke previously assigned partitions low-priority-email-1
2023-11-08T02:32:21.739-0500 consumer-low-priority-email-pool-9 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=10, clientId=consumer-low-priority-email-group-10, groupId=low-priority-email-group] (Re-)joining group
2023-11-08T02:32:21.741-0500 consumer-low-priority-email-pool-4 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=5, clientId=consumer-low-priority-email-group-5, groupId=low-priority-email-group] Request joining group due to: group is already rebalancing
2023-11-08T02:32:21.741-0500 consumer-low-priority-email-pool-4 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=5, clientId=consumer-low-priority-email-group-5, groupId=low-priority-email-group] Revoke previously assigned partitions low-priority-email-5
2023-11-08T02:32:21.741-0500 consumer-low-priority-email-pool-4 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=5, clientId=consumer-low-priority-email-group-5, groupId=low-priority-email-group] (Re-)joining group
2023-11-08T02:32:21.745-0500 consumer-low-priority-email-pool-5 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=6, clientId=consumer-low-priority-email-group-6, groupId=low-priority-email-group] Request joining group due to: group is already rebalancing
2023-11-08T02:32:21.745-0500 consumer-low-priority-email-pool-5 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=6, clientId=consumer-low-priority-email-group-6, groupId=low-priority-email-group] Revoke previously assigned partitions low-priority-email-6
2023-11-08T02:32:21.745-0500 consumer-low-priority-email-pool-5 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=6, clientId=consumer-low-priority-email-group-6, groupId=low-priority-email-group] (Re-)joining group
2023-11-08T02:32:21.752-0500 consumer-low-priority-email-pool-2 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=3, clientId=consumer-low-priority-email-group-3, groupId=low-priority-email-group] Request joining group due to: group is already rebalancing
2023-11-08T02:32:21.753-0500 consumer-low-priority-email-pool-2 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=3, clientId=consumer-low-priority-email-group-3, groupId=low-priority-email-group] Revoke previously assigned partitions low-priority-email-3
2023-11-08T02:32:21.753-0500 consumer-low-priority-email-pool-2 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=3, clientId=consumer-low-priority-email-group-3, groupId=low-priority-email-group] (Re-)joining group
2023-11-08T02:32:21.903-0500 consumer-low-priority-email-pool-0 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=1, clientId=consumer-low-priority-email-group-1, groupId=low-priority-email-group] Request joining group due to: group is already rebalancing
2023-11-08T02:32:21.903-0500 consumer-low-priority-email-pool-0 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=1, clientId=consumer-low-priority-email-group-1, groupId=low-priority-email-group] Revoke previously assigned partitions low-priority-email-0
2023-11-08T02:32:21.903-0500 consumer-low-priority-email-pool-0 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=1, clientId=consumer-low-priority-email-group-1, groupId=low-priority-email-group] (Re-)joining group
2023-11-08T02:32:21.919-0500 consumer-low-priority-email-pool-6 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=7, clientId=consumer-low-priority-email-group-7, groupId=low-priority-email-group] Request joining group due to: group is already rebalancing
2023-11-08T02:32:21.919-0500 consumer-low-priority-email-pool-6 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=7, clientId=consumer-low-priority-email-group-7, groupId=low-priority-email-group] Revoke previously assigned partitions low-priority-email-7
2023-11-08T02:32:21.919-0500 consumer-low-priority-email-pool-6 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=7, clientId=consumer-low-priority-email-group-7, groupId=low-priority-email-group] (Re-)joining group
2023-11-08T02:32:21.920-0500 consumer-low-priority-email-pool-0 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=1, clientId=consumer-low-priority-email-group-1, groupId=low-priority-email-group] Successfully joined group with generation Generation{generationId=12, memberId='1-f1f4f621-3bde-4e02-9621-a706320300ae', protocol='range'}
2023-11-08T02:32:21.920-0500 consumer-low-priority-email-pool-9 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=10, clientId=consumer-low-priority-email-group-10, groupId=low-priority-email-group] Successfully joined group with generation Generation{generationId=12, memberId='10-cd494fa6-1bf4-4dd8-ae28-a494b21ad247', protocol='range'}
2023-11-08T02:32:21.920-0500 consumer-low-priority-email-pool-6 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=7, clientId=consumer-low-priority-email-group-7, groupId=low-priority-email-group] Successfully joined group with generation Generation{generationId=12, memberId='7-e77860ca-7059-4a38-bfc9-7db3cc862a38', protocol='range'}
2023-11-08T02:32:21.921-0500 consumer-low-priority-email-pool-2 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=3, clientId=consumer-low-priority-email-group-3, groupId=low-priority-email-group] Successfully joined group with generation Generation{generationId=12, memberId='3-ec05c279-8059-4f03-b804-3da299f93b88', protocol='range'}
2023-11-08T02:32:21.921-0500 consumer-low-priority-email-pool-4 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=5, clientId=consumer-low-priority-email-group-5, groupId=low-priority-email-group] Successfully joined group with generation Generation{generationId=12, memberId='5-d08406a6-2bd1-4bed-aed9-5f65d7f75260', protocol='range'}
2023-11-08T02:32:21.921-0500 consumer-low-priority-email-pool-8 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=9, clientId=consumer-low-priority-email-group-9, groupId=low-priority-email-group] Successfully joined group with generation Generation{generationId=12, memberId='9-bab69ccd-a0b1-4f94-99d5-869fce905f3e', protocol='range'}
2023-11-08T02:32:21.921-0500 consumer-low-priority-email-pool-5 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=6, clientId=consumer-low-priority-email-group-6, groupId=low-priority-email-group] Successfully joined group with generation Generation{generationId=12, memberId='6-67b38de0-90e0-4f53-aaae-49c10a91a463', protocol='range'}
2023-11-08T02:32:21.921-0500 consumer-low-priority-email-pool-3 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=4, clientId=consumer-low-priority-email-group-4, groupId=low-priority-email-group] Successfully joined group with generation Generation{generationId=12, memberId='4-b24fc609-920a-4cc3-b507-2a4a1f1a568b', protocol='range'}
2023-11-08T02:32:21.921-0500 consumer-low-priority-email-pool-7 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=8, clientId=consumer-low-priority-email-group-8, groupId=low-priority-email-group] Successfully joined group with generation Generation{generationId=12, memberId='8-70cd2b5d-e17f-4346-bfcb-b170e766db39', protocol='range'}
2023-11-08T02:32:21.921-0500 consumer-low-priority-email-pool-3 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=4, clientId=consumer-low-priority-email-group-4, groupId=low-priority-email-group] Finished assignment for group at generation 12: {10-cd494fa6-1bf4-4dd8-ae28-a494b21ad247=Assignment(partitions=[low-priority-email-2]), 7-e77860ca-7059-4a38-bfc9-7db3cc862a38=Assignment(partitions=[low-priority-email-7]), 3-ec05c279-8059-4f03-b804-3da299f93b88=Assignment(partitions=[low-priority-email-3]), 6-67b38de0-90e0-4f53-aaae-49c10a91a463=Assignment(partitions=[low-priority-email-6]), 8-70cd2b5d-e17f-4346-bfcb-b170e766db39=Assignment(partitions=[low-priority-email-8]), 1-f1f4f621-3bde-4e02-9621-a706320300ae=Assignment(partitions=[low-priority-email-0, low-priority-email-1]), 4-b24fc609-920a-4cc3-b507-2a4a1f1a568b=Assignment(partitions=[low-priority-email-4]), 5-d08406a6-2bd1-4bed-aed9-5f65d7f75260=Assignment(partitions=[low-priority-email-5]), 9-bab69ccd-a0b1-4f94-99d5-869fce905f3e=Assignment(partitions=[low-priority-email-9])}
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-9 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=10, clientId=consumer-low-priority-email-group-10, groupId=low-priority-email-group] Successfully synced group in generation Generation{generationId=12, memberId='10-cd494fa6-1bf4-4dd8-ae28-a494b21ad247', protocol='range'}
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-0 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=1, clientId=consumer-low-priority-email-group-1, groupId=low-priority-email-group] Successfully synced group in generation Generation{generationId=12, memberId='1-f1f4f621-3bde-4e02-9621-a706320300ae', protocol='range'}
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-0 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=1, clientId=consumer-low-priority-email-group-1, groupId=low-priority-email-group] Notifying assignor about the new Assignment(partitions=[low-priority-email-0, low-priority-email-1])
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-9 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=10, clientId=consumer-low-priority-email-group-10, groupId=low-priority-email-group] Notifying assignor about the new Assignment(partitions=[low-priority-email-2])
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-9 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=10, clientId=consumer-low-priority-email-group-10, groupId=low-priority-email-group] Adding newly assigned partitions: low-priority-email-2
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-0 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=1, clientId=consumer-low-priority-email-group-1, groupId=low-priority-email-group] Adding newly assigned partitions: low-priority-email-0, low-priority-email-1
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-2 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=3, clientId=consumer-low-priority-email-group-3, groupId=low-priority-email-group] Successfully synced group in generation Generation{generationId=12, memberId='3-ec05c279-8059-4f03-b804-3da299f93b88', protocol='range'}
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-4 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=5, clientId=consumer-low-priority-email-group-5, groupId=low-priority-email-group] Successfully synced group in generation Generation{generationId=12, memberId='5-d08406a6-2bd1-4bed-aed9-5f65d7f75260', protocol='range'}
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-2 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=3, clientId=consumer-low-priority-email-group-3, groupId=low-priority-email-group] Notifying assignor about the new Assignment(partitions=[low-priority-email-3])
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-2 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=3, clientId=consumer-low-priority-email-group-3, groupId=low-priority-email-group] Adding newly assigned partitions: low-priority-email-3
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-6 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=7, clientId=consumer-low-priority-email-group-7, groupId=low-priority-email-group] Successfully synced group in generation Generation{generationId=12, memberId='7-e77860ca-7059-4a38-bfc9-7db3cc862a38', protocol='range'}
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-4 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=5, clientId=consumer-low-priority-email-group-5, groupId=low-priority-email-group] Notifying assignor about the new Assignment(partitions=[low-priority-email-5])
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-7 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=8, clientId=consumer-low-priority-email-group-8, groupId=low-priority-email-group] Successfully synced group in generation Generation{generationId=12, memberId='8-70cd2b5d-e17f-4346-bfcb-b170e766db39', protocol='range'}
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-6 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=7, clientId=consumer-low-priority-email-group-7, groupId=low-priority-email-group] Notifying assignor about the new Assignment(partitions=[low-priority-email-7])
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-3 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=4, clientId=consumer-low-priority-email-group-4, groupId=low-priority-email-group] Successfully synced group in generation Generation{generationId=12, memberId='4-b24fc609-920a-4cc3-b507-2a4a1f1a568b', protocol='range'}
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-6 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=7, clientId=consumer-low-priority-email-group-7, groupId=low-priority-email-group] Adding newly assigned partitions: low-priority-email-7
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-4 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=5, clientId=consumer-low-priority-email-group-5, groupId=low-priority-email-group] Adding newly assigned partitions: low-priority-email-5
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-7 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=8, clientId=consumer-low-priority-email-group-8, groupId=low-priority-email-group] Notifying assignor about the new Assignment(partitions=[low-priority-email-8])
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-8 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=9, clientId=consumer-low-priority-email-group-9, groupId=low-priority-email-group] Successfully synced group in generation Generation{generationId=12, memberId='9-bab69ccd-a0b1-4f94-99d5-869fce905f3e', protocol='range'}
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-5 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=6, clientId=consumer-low-priority-email-group-6, groupId=low-priority-email-group] Successfully synced group in generation Generation{generationId=12, memberId='6-67b38de0-90e0-4f53-aaae-49c10a91a463', protocol='range'}
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-3 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=4, clientId=consumer-low-priority-email-group-4, groupId=low-priority-email-group] Notifying assignor about the new Assignment(partitions=[low-priority-email-4])
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-7 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=8, clientId=consumer-low-priority-email-group-8, groupId=low-priority-email-group] Adding newly assigned partitions: low-priority-email-8
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-8 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=9, clientId=consumer-low-priority-email-group-9, groupId=low-priority-email-group] Notifying assignor about the new Assignment(partitions=[low-priority-email-9])
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-3 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=4, clientId=consumer-low-priority-email-group-4, groupId=low-priority-email-group] Adding newly assigned partitions: low-priority-email-4
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-5 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=6, clientId=consumer-low-priority-email-group-6, groupId=low-priority-email-group] Notifying assignor about the new Assignment(partitions=[low-priority-email-6])
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-8 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=9, clientId=consumer-low-priority-email-group-9, groupId=low-priority-email-group] Adding newly assigned partitions: low-priority-email-9
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-5 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=6, clientId=consumer-low-priority-email-group-6, groupId=low-priority-email-group] Adding newly assigned partitions: low-priority-email-6
2023-11-08T02:32:21.923-0500 consumer-low-priority-email-pool-9 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=10, clientId=consumer-low-priority-email-group-10, groupId=low-priority-email-group] Setting offset for partition low-priority-email-2 to the committed offset FetchPosition{offset=13436, offsetEpoch=Optional[8], currentLeader=LeaderAndEpoch{leader=Optional[192.168.100.60:9093 (id: 1 rack: null)], epoch=8}}
2023-11-08T02:32:21.923-0500 consumer-low-priority-email-pool-0 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=1, clientId=consumer-low-priority-email-group-1, groupId=low-priority-email-group] Setting offset for partition low-priority-email-1 to the committed offset FetchPosition{offset=13301, offsetEpoch=Optional[8], currentLeader=LeaderAndEpoch{leader=Optional[192.168.100.60:9093 (id: 1 rack: null)], epoch=8}}
2023-11-08T02:32:21.923-0500 consumer-low-priority-email-pool-0 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=1, clientId=consumer-low-priority-email-group-1, groupId=low-priority-email-group] Setting offset for partition low-priority-email-0 to the committed offset FetchPosition{offset=24087, offsetEpoch=Optional[8], currentLeader=LeaderAndEpoch{leader=Optional[192.168.100.60:9093 (id: 1 rack: null)], epoch=8}}
2023-11-08T02:32:21.923-0500 consumer-low-priority-email-pool-2 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=3, clientId=consumer-low-priority-email-group-3, groupId=low-priority-email-group] Setting offset for partition low-priority-email-3 to the committed offset FetchPosition{offset=13299, offsetEpoch=Optional[2], currentLeader=LeaderAndEpoch{leader=Optional[192.168.100.60:9093 (id: 1 rack: null)], epoch=2}}
2023-11-08T02:32:21.923-0500 consumer-low-priority-email-pool-3 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=4, clientId=consumer-low-priority-email-group-4, groupId=low-priority-email-group] Setting offset for partition low-priority-email-4 to the committed offset FetchPosition{offset=13352, offsetEpoch=Optional[2], currentLeader=LeaderAndEpoch{leader=Optional[192.168.100.60:9093 (id: 1 rack: null)], epoch=2}}
2023-11-08T02:32:21.923-0500 consumer-low-priority-email-pool-7 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=8, clientId=consumer-low-priority-email-group-8, groupId=low-priority-email-group] Setting offset for partition low-priority-email-8 to the committed offset FetchPosition{offset=13190, offsetEpoch=Optional[2], currentLeader=LeaderAndEpoch{leader=Optional[192.168.100.60:9093 (id: 1 rack: null)], epoch=2}}
2023-11-08T02:32:21.923-0500 consumer-low-priority-email-pool-8 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=9, clientId=consumer-low-priority-email-group-9, groupId=low-priority-email-group] Setting offset for partition low-priority-email-9 to the committed offset FetchPosition{offset=13151, offsetEpoch=Optional[2], currentLeader=LeaderAndEpoch{leader=Optional[192.168.100.60:9093 (id: 1 rack: null)], epoch=2}}
2023-11-08T02:32:21.923-0500 consumer-low-priority-email-pool-5 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=6, clientId=consumer-low-priority-email-group-6, groupId=low-priority-email-group] Setting offset for partition low-priority-email-6 to the committed offset FetchPosition{offset=13211, offsetEpoch=Optional[2], currentLeader=LeaderAndEpoch{leader=Optional[192.168.100.60:9093 (id: 1 rack: null)], epoch=2}}
2023-11-08T02:32:21.923-0500 consumer-low-priority-email-pool-6 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=7, clientId=consumer-low-priority-email-group-7, groupId=low-priority-email-group] Setting offset for partition low-priority-email-7 to the committed offset FetchPosition{offset=13303, offsetEpoch=Optional[2], currentLeader=LeaderAndEpoch{leader=Optional[192.168.100.60:9093 (id: 1 rack: null)], epoch=2}}
2023-11-08T02:32:21.923-0500 consumer-low-priority-email-pool-4 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=5, clientId=consumer-low-priority-email-group-5, groupId=low-priority-email-group] Setting offset for partition low-priority-email-5 to the committed offset FetchPosition{offset=13338, offsetEpoch=Optional[2], currentLeader=LeaderAndEpoch{leader=Optional[192.168.100.60:9093 (id: 1 rack: null)], epoch=2}}

参考资料

  • confluent解释rebalance的文章
  • infoq上关于rebalance的文章

结语

至此为止,利用Kafka实现一个消息系统就基本完成了,所有关键的代码都在不同的博文中并进行了详细说明,说过想要体会完整的设计、实现思路,请移步源码仓库获取完整代码。

下一篇关于Kafak的博文打算分享一下如何利用Kafka Connect将Oracle数据库的数据同步到Postgre SQL中。

示例源码仓库

  • Github地址

这篇关于Kafka(五)消费者回调 +定时重试 + 理解Rebalance的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/426467

相关文章

Spring Retry 实现乐观锁重试实践记录

《SpringRetry实现乐观锁重试实践记录》本文介绍了在秒杀商品SKU表中使用乐观锁和MybatisPlus配置乐观锁的方法,并分析了测试环境和生产环境的隔离级别对乐观锁的影响,通过简单验证,... 目录一、场景分析 二、简单验证 2.1、可重复读 2.2、读已提交 三、最佳实践 3.1、配置重试模板

Debezium 与 Apache Kafka 的集成方式步骤详解

《Debezium与ApacheKafka的集成方式步骤详解》本文详细介绍了如何将Debezium与ApacheKafka集成,包括集成概述、步骤、注意事项等,通过KafkaConnect,D... 目录一、集成概述二、集成步骤1. 准备 Kafka 环境2. 配置 Kafka Connect3. 安装 D

Spring Boot 整合 ShedLock 处理定时任务重复执行的问题小结

《SpringBoot整合ShedLock处理定时任务重复执行的问题小结》ShedLock是解决分布式系统中定时任务重复执行问题的Java库,通过在数据库中加锁,确保只有一个节点在指定时间执行... 目录前言什么是 ShedLock?ShedLock 的工作原理:定时任务重复执行China编程的问题使用 Shed

深入理解Apache Airflow 调度器(最新推荐)

《深入理解ApacheAirflow调度器(最新推荐)》ApacheAirflow调度器是数据管道管理系统的关键组件,负责编排dag中任务的执行,通过理解调度器的角色和工作方式,正确配置调度器,并... 目录什么是Airflow 调度器?Airflow 调度器工作机制配置Airflow调度器调优及优化建议最

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf

Kafka拦截器的神奇操作方法

《Kafka拦截器的神奇操作方法》Kafka拦截器是一种强大的机制,用于在消息发送和接收过程中插入自定义逻辑,它们可以用于消息定制、日志记录、监控、业务逻辑集成、性能统计和异常处理等,本文介绍Kafk... 目录前言拦截器的基本概念Kafka 拦截器的定义和基本原理:拦截器是 Kafka 消息传递的不可或缺

基于Python开发电脑定时关机工具

《基于Python开发电脑定时关机工具》这篇文章主要为大家详细介绍了如何基于Python开发一个电脑定时关机工具,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1. 简介2. 运行效果3. 相关源码1. 简介这个程序就像一个“忠实的管家”,帮你按时关掉电脑,而且全程不需要你多做

一文带你理解Python中import机制与importlib的妙用

《一文带你理解Python中import机制与importlib的妙用》在Python编程的世界里,import语句是开发者最常用的工具之一,它就像一把钥匙,打开了通往各种功能和库的大门,下面就跟随小... 目录一、python import机制概述1.1 import语句的基本用法1.2 模块缓存机制1.

深入理解C语言的void*

《深入理解C语言的void*》本文主要介绍了C语言的void*,包括它的任意性、编译器对void*的类型检查以及需要显式类型转换的规则,具有一定的参考价值,感兴趣的可以了解一下... 目录一、void* 的类型任意性二、编译器对 void* 的类型检查三、需要显式类型转换占用的字节四、总结一、void* 的

深入理解Redis大key的危害及解决方案

《深入理解Redis大key的危害及解决方案》本文主要介绍了深入理解Redis大key的危害及解决方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着... 目录一、背景二、什么是大key三、大key评价标准四、大key 产生的原因与场景五、大key影响与危