本文主要是介绍Kafka·Producer,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
Producer发送原理
-
拦截器进行拦截
-
对key和value进行序列化
org.apache.kafka.clients.producer.KafkaProducer#doSend
-
分区选择
计算消息要发送到topic的哪个分区上
- 若指定了分区,则使用指定的值
- 没有指定的话则使用分区器计算得到
- 或者使用hash取余的方式
-
暂存消息到累加器
Producer并不会立刻发送消息到Broker中,而是把消息暂存到累加器中,当消息达到一定量后,再异步批量发送
消息被暂存到这样一个结构中,可以看到每个topic中,按照分区将消息暂存到一个Deque中
-
acks
acks,消息发送后的确认机制- acks = 0:发送即确认
- acks = 1:消息在leader落盘后确认
- acks = all/-1:默认值,leader等待其他follower(ISR中的follower)都同步后回复确认
-
重试
-
幂等性
开启幂等性配置(enable.idempotence默认true)后,生产者发送消息时,除了key-value,还会发送PID和Sequence Number。- PID:用来标识生产者
- Sequence Number:表示消息的序列号,每次发送消息都会+1产生新的序列号(连续)
也就是说PID和Sequence Number会跟随key-value一并先暂存于累加器中,最后发送到Broker中,当broker应答异常,Producer会进行重试发送
消息发送到Broker后,虽然由于某些原因造成了生产者重试发送消息,但前一条消息可能已经成功落盘到Broker了,当重试发送的消息到达broker后,会通过PID和Sequence Number来判断
- 若本次消息的Sequence Number 小于或等于 上一次消息的Sequence Number,说明消息重复,丢弃本次消息
- 若本次消息的Sequence Number 大于 (上一次消息Sequence Number+1),说明存在消息丢失现象,然后抛异常
虽然是否开启幂等性是在Producer配置的,但是幂等性是在broker中解决的(在Broker中判断PID和Sequence Number)
这篇关于Kafka·Producer的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!