卡夫卡 启动生产者_如何使卡夫卡生产者消费品生产做好准备

2023-10-19 10:40

本文主要是介绍卡夫卡 启动生产者_如何使卡夫卡生产者消费品生产做好准备,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

卡夫卡 启动生产者

Kafka is an open-source stream processing platform. It is developed to provide high throughput and low latency to handle real-time data.

Kafka是一个开源流处理平台。 开发它是为了提供高吞吐量和低延迟来处理实时数据。

Before we read about how to make our Kafka producer/consumer production-ready, Let’s first understand the basic terminologies of Kafka.

在我们了解如何使我们的Kafka生产者/消费者生产就绪之前,让我们首先了解Kafka的基本术语。

Kafka topic: It refers to a family or name to store a specific kind of message or a particular stream of data. A topic contains messages of a similar kind.

Kafka主题:它是指用于存储特定类型的消息或特定数据流的族或名称。 主题包含类似的消息。

Kafka partition: A topic can be partitioned into one or more partitions. It is again to segregate the messages from one topic into multiple buckets. Each partition may reside on the same/different Kafka brokers.

Kafka分区:一个主题可以分为一个或多个分区。 再次将消息从一个主题隔离到多个存储桶中。 每个分区可以驻留在相同/不同的Kafka代理上。

Kafka producer: It is a component of the Kafka ecosystem which is used to publish messages onto a Kafka topic.

Kafka生产者:它是Kafka生态系统的组成部分,用于将消息发布到Kafka主题上。

Kafka consumer: It is another component of the Kafka ecosystem which consumes messages produced by a Kafka producer on a topic.

卡夫卡消费者:这是卡夫卡生态系统的另一个组成部分,使用卡夫卡生产者产生的有关主题的消息。

Kafka生产者配置 (Kafka producer configurations)

Having knowledge of these producer configurations becomes critical for us in order to get optimal performance and to leverage the capabilities of Kafka in your streaming application in the best possible way. There are multiple configurations. Let's look at each of them in detail below:

了解这些生产者配置对于我们至关重要,以便获得最佳性能并以最佳方式在您的流式应用程序中利用Kafka的功能。 有多种配置。 让我们在下面详细查看它们:

  1. Retries: This configuration is used to set the maximum number of retry attempts to be done by the producer in case of any message publish failure. If your application can not afford to miss any data publish, we increase this count to guarantee to publish our data.

    重试:此配置用于设置在任何消息发布失败的情况下,生产方要进行的最大重试次数。 如果您的应用程序不能错过任何发布的数据,我们将增加此计数以保证发布我们的数据。

  2. Batch size: It is set to have high throughput in our producer application by combining multiple network calls into one. batch.size measures batch size in total bytes, how many bytes to be collected before sending to Kafka broker. It makes sense to increase its value when your producer is publishing data all the time to have the best throughput otherwise, it will have to wait till the given size is collected. The default value is 16384.

    批处理大小:通过将多个网络调用组合为一个,可以在我们的生产者应用程序中实现高吞吐量。 batch.size以总字节batch.size衡量批量大小,即在发送到Kafka经纪人之前要收集多少字节。 当您的生产者一直在发布数据以具有最佳吞吐量时,增加其价值是有意义的,否则,它将不得不等到收集给定的大小。 默认值为16384

  3. Linger time: This configuration collaborates with batch size to have high throughput. linger.mssets the maximum time to buffer data in asynchronous mode. Suppose we set 300kb in batch.size and 10 ms in linger.ms, then the producer waits till at least one of them is breached. By default, the producer does not wait. It sends the buffer any time data is available. This would reduce the number of requests sent, but would add up to n milliseconds of latency to records sent.

    延迟时间:此配置与批处理大小协作以实现高吞吐量。 linger.ms设置在异步模式下缓冲数据的最长时间。 假设在我们设定的300KB batch.size10毫秒linger.ms ,则生产者等待,直到它们中的至少一个被破坏。 默认情况下,生产者不等待。 只要有数据,它就会发送缓冲区。 这将减少发送的请求数,但会给发送的记录增加最多n毫秒的延迟。

  4. Send() method: There are 3 ways to publish messages onto a Kafka topic.

    Send()方法:有3种方法可以将消息发布到Kafka主题上。

    Send() method: There are 3 ways to publish messages onto a Kafka topic.A. Fire and forget — It is the fastest way to publish messages, but messages could be lost here.

    Send()方法:有3种方法可以将消息发布到Kafka主题上。 A.忘却了-这是发布消息的最快方法,但是消息可能会在此处丢失。

    Send() method: There are 3 ways to publish messages onto a Kafka topic.A. Fire and forget — It is the fastest way to publish messages, but messages could be lost here. RecordMetadata rm = producer.send(record);
    B. Synchronous — It is the slowest method, we use it when we can not afford to lose any message. It blocks the thread to produce a new message until an acknowledgment is received for the last published message. RecordMetadata rm = producer.send(record).get();
    C. Asynchronous — It gives us better throughput comparing to synchronous since we don’t wait for acknowledgments. producer.send(record, new Callback(){
    @Override
    onComplete(RecordMetadata rm, Exception ex){...}
    })

    Send()方法:有3种方法可以将消息发布到Kafka主题上。 A.忘却了-这是发布消息的最快方法,但是消息可能会在此处丢失。 RecordMetadata rm = producer.send(record);
    B.同步-这是最慢的方法,当我们无法承受丢失任何消息时,我们会使用它。 它阻塞线程以产生新消息,直到收到最后发布的消息的确认为止。 RecordMetadata rm = producer.send(record).get();
    C.异步-与同步相比,它为我们提供了更好的吞吐量,因为我们不等待确认。 producer.send(record, new Callback(){
    @Override
    onComplete(RecordMetadata rm, Exception ex){...}
    })

  5. Acks: It is used to set the number of replicas for which the coordinator node will wait before sending a successful acknowledgment to the producer application. There are 3 values possible for this configuration — 0,1 and -1(all). 0 means no acknowledgment is required. It is set when low latency is required. -1 (or all) means response from all the replicas for the given partition is inevitable and it is set when data consistency is most crucial.

    Acks:用于设置协调器节点向生产者应用程序发送成功确认之前等待的副本数。 此配置可能有3个值0,1 and -1(all) 。 0表示不需要确认。 需要低延迟时设置。 -1(或全部)表示来自给定分区的所有副本的响应是不可避免的,并且在数据一致性最为关键时进行设置。

    There is one more important property

    还有一个重要的属性

    min.insync.replicasthat works with acks=all property. For any publish request with acks=all to execute, there should be at least these many in-sync replicas online, otherwise the producer will get exceptions.

    acks=all属性一起使用的min.insync.replicas 。 对于要执行的acks=all带有acks=all发布请求,至少应在线上有这么多个同步副本,否则生产者将获得异常。

Image for post

卡夫卡消费者配置 (Kafka consumer configuration)

Let’s understand how to tune our Kafka consumer to consume messages from Kafka efficiently. Like a Kafka producer, a consumer also has a number of configurations.

让我们了解如何调整我们的Kafka用户以有效地使用来自Kafka的消息。 像卡夫卡生产者一样,消费者也有许多配置。

  1. Consumer group: It plays a significant role for the consumer as you can have multiple instances of the consumer application within the same consumer group to consume messages from a Kafka topic. The number of consumers should be less than or equal to the number of partitions in the Kafka topic as a partition can be assigned to only one consumer in a consumer group.

    使用者组:它对使用者起着重要的作用,因为您可以在同一使用者组中拥有多个使用者应用程序实例,以使用来自Kafka主题的消息。 使用者数量应小于或等于Kafka主题中的分区数量,因为一个分区只能分配给一个使用者组中的一个使用者。

    Best performance is achieved when

    当达到最佳性能时

    #consumers = #partitions

    #consumers = #partitions

  2. Auto vs Manual commit: A consumer has to commit the offset after consuming a message from a topic. This commit can be made automatically or manually.

    自动提交与手动提交:使用者在消费了主题消息后必须提交偏移量。 可以自动或手动进行此提交。

    Auto vs Manual commit: A consumer has to commit the offset after consuming a message from a topic. This commit can be made automatically or manually. enable.auto.commit is the property which needs to be true to enable auto-commit. It is enabled by default.

    自动提交与手动提交:使用者在消费了主题消息后必须提交偏移量。 可以自动或手动进行此提交。 enable.auto.commit是必须为true的属性,以启用自动提交。 默认情况下启用。

    Auto vs Manual commit: A consumer has to commit the offset after consuming a message from a topic. This commit can be made automatically or manually. enable.auto.commit is the property which needs to be true to enable auto-commit. It is enabled by default. auto.commit.interval.ms is the dependent property which is taken into account when auto-commit is enabled. It is used to set the max time in ms to wait to make a commit for the consumed messages.

    自动提交与手动提交:使用者在消费了主题消息后必须提交偏移量。 可以自动或手动进行此提交。 enable.auto.commit是必须为true的属性,以启用自动提交。 默认情况下启用。 auto.commit.interval.ms是从属属性,启用自动提交时会考虑该属性。 它用于设置等待提交消耗消息的最长时间(以毫秒为单位)。

    Auto vs Manual commit: A consumer has to commit the offset after consuming a message from a topic. This commit can be made automatically or manually. enable.auto.commit is the property which needs to be true to enable auto-commit. It is enabled by default. auto.commit.interval.ms is the dependent property which is taken into account when auto-commit is enabled. It is used to set the max time in ms to wait to make a commit for the consumed messages. There could be a situation where messages are read, but not processed yet and the application crashes for a reason, then the last read messages will be lost when we restore the application. Manual commit comes into the picture to prevent this situation where the consumer makes commit once it processes the messages completely.

    自动提交与手动提交:使用者在消费了主题消息后必须提交偏移量。 可以自动或手动进行此提交。 enable.auto.commit是必须为true的属性,以启用自动提交。 默认情况下启用。 auto.commit.interval.ms是从属属性,启用自动提交时会考虑该属性。 它用于设置等待提交消费消息的最长时间(以毫秒为单位)。 可能存在读取消息但尚未处理消息并且应用程序由于某种原因崩溃的情况,那么当我们还原应用程序时,最后读取的消息将丢失。 手动提交会出现在图片中,以防止这种情况,即消费者在完全处理完消息后即进行提交。

  3. Heartbeat interval: This property is used by the consumers to send their heartbeat to the Kafka broker. It tells Kafka that the given consumer is still alive and consuming messages from it.

    心跳间隔:消费者使用此属性将心跳发送到Kafka代理。 它告诉Kafka,给定的使用者仍在运行,并且正在使用该使用者的消息

    Heartbeat interval: This property is used by the consumers to send their heartbeat to the Kafka broker. It tells Kafka that the given consumer is still alive and consuming messages from it.heartbeat.interval.ms = 10ms the consumer sends its heartbeat to the Kafka broker at every 10 milliseconds.

    心跳间隔:消费者使用此属性将心跳发送到Kafka代理。 它告诉Kafka,给定的使用者仍在运行,并且正在使用该使用者的消息heartbeat.interval.ms = 10ms ,消费者每10毫秒将其心跳发送到Kafka代理。

  4. Session timeout: It is the time when the broker decides that the consumer is died and no longer available to consume.

    会话超时:这是经纪人确定消费者死亡并且不再可以消费的时间。

    Session timeout: It is the time when the broker decides that the consumer is died and no longer available to consume.session.timeout.ms = 50 ms Suppose the consumer is down and it is not sending heartbeat to Kafka broker, then the broker waits until 50 ms time is over, to decide the consumer is dead.

    会话超时:这是经纪人确定消费者死亡并且不再可以消费的时间。 session.timeout.ms = 50 ms假设使用者关闭并且没有向Kafka经纪人发送心跳,然后该经纪人等待直到50毫秒时间结束,以决定使用者死亡。

  5. Auto offset reset strategy: It is used to define the behavior of the consumer when there is no committed offset. This would be the case when the consumer is started for the first time or when an offset is out of range. There are 3 possible values for the property auto.offset.reset = earliest, latest, none . The default value is latest.

    自动偏移量重置策略:用于定义没有提交偏移量时使用者的行为。 当使用者第一次启动或偏移量超出范围时,就会出现这种情况。 属性auto.offset.reset = earliest, latest, none 3种可能的值。 默认值为最新

    Auto offset reset strategy: It is used to define the behavior of the consumer when there is no committed offset. This would be the case when the consumer is started for the first time or when an offset is out of range. There are 3 possible values for the property auto.offset.reset = earliest, latest, none . The default value is latest.Note: This configuration is applicable only when no offset commit is found.

    自动偏移量重置策略:用于定义没有提交偏移量时使用者的行为。 当使用者第一次启动或偏移量超出范围时,就会出现这种情况。 属性auto.offset.reset = earliest, latest, none 3种可能的值。 默认值为最新注意:仅当未找到偏移提交时,此配置才适用。

Image for post
The auto offset reset strategy is explained how does it work when offset commit is not found for a consumer.
解释了自动偏移量重置策略如何在找不到使用者的偏移量提交时如何工作。

One more important thing that affects what offset value will correspond to the earliest and latest configs is log retention policy. Imagine we have a topic with retention configured to 1 hour. We produce 10 messages, and then an hour later we publish 5 more messages. The latest offset will still remain the same as explained above, but the earliest one won’t be able to be 0 because Kafka will have already removed these messages and thus the earliest available offset will be 10.

更重要的一点是,日志保留策略会影响什么偏移值将对应最早和最新的配置。 想象一下,我们有一个保留时间配置为1小时的主题。 我们产生10条消息,然后一个小时后,我们又发布5条消息。 最新的偏移量仍将保持与上面说明的相同,但是最早的偏移量将不能为0,因为Kafka将已经删除了这些消息,因此最早的可用偏移量将为10。

Thanks for reading!

谢谢阅读!

翻译自: https://medium.com/swlh/how-to-make-kafka-producer-consumer-production-ready-d40915b78c9

卡夫卡 启动生产者


http://www.taodudu.cc/news/show-7999617.html

相关文章:

  • 卡夫卡-速成班
  • 从卡夫卡消费
  • 卡夫卡详解_如何宣传卡夫卡
  • 5.5卡夫卡入门
  • 卡夫卡服务器自动清理,3服务器卡夫卡集群写入Logstash
  • 卡夫卡详解_外行卡夫卡指南
  • 5.9spring整合卡夫卡
  • java卡夫卡_java – 如何在卡夫卡使用多个消费者?
  • 卡夫卡与mysql_zookeeper与卡夫卡集群搭建
  • 读道德经的心得
  • 编译原理部分知识点总结
  • 【学习周报】深度学习笔记第七周
  • 离散数学学习笔记——集合论基础
  • 西电网课雨课堂《书法鉴赏》全部课后答案
  • Webrtc音视频会议之Webrtc“不求甚解”
  • python学习forth day之不求甚解
  • sphinx python mysql_不求甚解的使用sphinx生成Python文档
  • 不求甚解1:什么是elf
  • 对新概念应当”不求甚解“
  • equals ==(不求甚解) 十六
  • 图像特征之不求甚解
  • 不求甚解系列之tailwindcss
  • 设计模式学习--不求甚解
  • 编程之不求甚解(从python中来看)
  • 不求甚解-luence
  • 不求甚解-zookeeper
  • 不求甚解-SpringBoot
  • 【不求甚解】知识点
  • Springboot事件监听+@Async注解
  • python学习first day之不求甚解
  • 这篇关于卡夫卡 启动生产者_如何使卡夫卡生产者消费品生产做好准备的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



    http://www.chinasem.cn/article/239238

    相关文章

    NameNode内存生产配置

    Hadoop2.x 系列,配置 NameNode 内存 NameNode 内存默认 2000m ,如果服务器内存 4G , NameNode 内存可以配置 3g 。在 hadoop-env.sh 文件中配置如下。 HADOOP_NAMENODE_OPTS=-Xmx3072m Hadoop3.x 系列,配置 Nam

    MySQL数据库宕机,启动不起来,教你一招搞定!

    作者介绍:老苏,10余年DBA工作运维经验,擅长Oracle、MySQL、PG、Mongodb数据库运维(如安装迁移,性能优化、故障应急处理等)公众号:老苏畅谈运维欢迎关注本人公众号,更多精彩与您分享。 MySQL数据库宕机,数据页损坏问题,启动不起来,该如何排查和解决,本文将为你说明具体的排查过程。 查看MySQL error日志 查看 MySQL error日志,排查哪个表(表空间

    springboot3打包成war包,用tomcat8启动

    1、在pom中,将打包类型改为war <packaging>war</packaging> 2、pom中排除SpringBoot内置的Tomcat容器并添加Tomcat依赖,用于编译和测试,         *依赖时一定设置 scope 为 provided (相当于 tomcat 依赖只在本地运行和测试的时候有效,         打包的时候会排除这个依赖)<scope>provided

    内核启动时减少log的方式

    内核引导选项 内核引导选项大体上可以分为两类:一类与设备无关、另一类与设备有关。与设备有关的引导选项多如牛毛,需要你自己阅读内核中的相应驱动程序源码以获取其能够接受的引导选项。比如,如果你想知道可以向 AHA1542 SCSI 驱动程序传递哪些引导选项,那么就查看 drivers/scsi/aha1542.c 文件,一般在前面 100 行注释里就可以找到所接受的引导选项说明。大多数选项是通过"_

    用命令行的方式启动.netcore webapi

    用命令行的方式启动.netcore web项目 进入指定的项目文件夹,比如我发布后的代码放在下面文件夹中 在此地址栏中输入“cmd”,打开命令提示符,进入到发布代码目录 命令行启动.netcore项目的命令为:  dotnet 项目启动文件.dll --urls="http://*:对外端口" --ip="本机ip" --port=项目内部端口 例: dotnet Imagine.M

    Linux服务器Java启动脚本

    Linux服务器Java启动脚本 1、初版2、优化版本3、常用脚本仓库 本文章介绍了如何在Linux服务器上执行Java并启动jar包, 通常我们会使用nohup直接启动,但是还是需要手动停止然后再次启动, 那如何更优雅的在服务器上启动jar包呢,让我们一起探讨一下吧。 1、初版 第一个版本是常用的做法,直接使用nohup后台启动jar包, 并将日志输出到当前文件夹n

    衡石分析平台使用手册-单机安装及启动

    单机安装及启动​ 本文讲述如何在单机环境下进行 HENGSHI SENSE 安装的操作过程。 在安装前请确认网络环境,如果是隔离环境,无法连接互联网时,请先按照 离线环境安装依赖的指导进行依赖包的安装,然后按照本文的指导继续操作。如果网络环境可以连接互联网,请直接按照本文的指导进行安装。 准备工作​ 请参考安装环境文档准备安装环境。 配置用户与安装目录。 在操作前请检查您是否有 sud

    SpringBoot项目是如何启动

    启动步骤 概念 运行main方法,初始化SpringApplication 从spring.factories读取listener ApplicationContentInitializer运行run方法读取环境变量,配置信息创建SpringApplication上下文预初始化上下文,将启动类作为配置类进行读取调用 refresh 加载 IOC容器,加载所有的自动配置类,创建容器在这个过程

    嵌入式Openharmony系统构建与启动详解

    大家好,今天主要给大家分享一下,如何构建Openharmony子系统以及系统的启动过程分解。 第一:OpenHarmony系统构建      首先熟悉一下,构建系统是一种自动化处理工具的集合,通过将源代码文件进行一系列处理,最终生成和用户可以使用的目标文件。这里的目标文件包括静态链接库文件、动态链接库文件、可执行文件、脚本文件、配置文件等。      我们在编写hellowor

    如何做好网络安全

    随着互联网技术的飞速发展,网站已成为企业对外展示、交流和服务的重要窗口。然而,随之而来的网站安全问题也日益凸显,给企业的业务发展和用户数据安全带来了巨大威胁。因此,高度重视网站安全已成为网络安全的首要任务。今天我们就来详细探讨网站安全的重要性、面临的挑战以及有什么应对方案。 一、网站安全的重要性 1. 数据安全与用户隐私 网站是企业存储和传输数据的关键平台,包括用户个人信息、