Kafka 实战演练:创建、配置与测试 Kafka全面教程

2024-09-07 12:36

本文主要是介绍Kafka 实战演练:创建、配置与测试 Kafka全面教程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 1.配置文件
  • 2.消费者
    • 1.注解方式
    • 2.KafkaConsumer
  • 3.依赖
    • 1.注解依赖
    • 2.KafkaConsumer依赖

本文档只是为了留档方便以后工作运维,或者给同事分享文档内容比较简陋命令也不是特别全,不适合小白观看,如有不懂可以私信,上班期间都是在得

1.配置文件

  1. Yml配置
spring:kafka:bootstrap-servers: consumer:group-id: iot-testaaaaaaaaaa11aaaaaaaaaauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:security:protocol: SASL_SSLsasl:mechanism: PLAINjaas:config: org.apache.kafka.common.security.plain.PlainLoginModule required username="" password="";ssl:truststore:type: JKSlocation: src/main/resources/client.truststore.jkspassword: endpoint.identification.algorithm:

yaml配置

2.消费者

1.注解方式

    @KafkaListener(topics = {"abcd"})public void listen(ConsumerRecord<?, ?> record){Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {Object message = kafkaMessage.get();System.out.println("---->"+record);System.out.println("---->"+message);}}

注解方式

2.KafkaConsumer

/*** @author XHao*/
public class MqsConsumer {public static final String CONFIG_CONSUMER_FILE_NAME = "mqs.sdk.consumer.properties";private KafkaConsumer<Object, Object> consumer;MqsConsumer(String path) {Properties props = new Properties();try {InputStream in = new BufferedInputStream(new FileInputStream(path));props.load(in);} catch (IOException e) {e.printStackTrace();return;}consumer = new KafkaConsumer<Object, Object>(props);}public MqsConsumer() {Properties props = new Properties();try {props = loadFromClasspath(CONFIG_CONSUMER_FILE_NAME);} catch (IOException e) {e.printStackTrace();return;}consumer = new KafkaConsumer<Object, Object>(props);}public void consume(List topics) {consumer.subscribe(topics);}public ConsumerRecords<Object, Object> poll(long timeout) {return consumer.poll(timeout);}public void close() {consumer.close();}/*** get classloader from thread context if no classloader found in thread* context return the classloader which has loaded this class** @return classloader*/public static ClassLoader getCurrentClassLoader() {ClassLoader classLoader = Thread.currentThread().getContextClassLoader();if (classLoader == null) {classLoader = MqsConsumer.class.getClassLoader();}return classLoader;}/*** 从classpath 加载配置信息** @param configFileName 配置文件名称* @return 配置信息* @throws IOException*/public static Properties loadFromClasspath(String configFileName) throws IOException {ClassLoader classLoader = getCurrentClassLoader();Properties config = new Properties();List<URL> properties = new ArrayList<URL>();Enumeration<URL> propertyResources = classLoader.getResources(configFileName);while (propertyResources.hasMoreElements()) {properties.add(propertyResources.nextElement());}for (URL url : properties) {InputStream is = null;try {is = url.openStream();config.load(is);} finally {if (is != null) {is.close();is = null;}}}return config;}
}

在这里插入图片描述

3.依赖

1.注解依赖

      <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

2.KafkaConsumer依赖

        <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.1.0</version></dependency>

可视化大屏项目经常用到消息转换,实时状态等 记录一下吧

如果点赞多,评论多会更新详细教程,待补充。

这篇关于Kafka 实战演练:创建、配置与测试 Kafka全面教程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1145071

相关文章

网页解析 lxml 库--实战

lxml库使用流程 lxml 是 Python 的第三方解析库,完全使用 Python 语言编写,它对 XPath表达式提供了良好的支 持,因此能够了高效地解析 HTML/XML 文档。本节讲解如何通过 lxml 库解析 HTML 文档。 pip install lxml lxm| 库提供了一个 etree 模块,该模块专门用来解析 HTML/XML 文档,下面来介绍一下 lxml 库

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

Zookeeper安装和配置说明

一、Zookeeper的搭建方式 Zookeeper安装方式有三种,单机模式和集群模式以及伪集群模式。 ■ 单机模式:Zookeeper只运行在一台服务器上,适合测试环境; ■ 伪集群模式:就是在一台物理机上运行多个Zookeeper 实例; ■ 集群模式:Zookeeper运行于一个集群上,适合生产环境,这个计算机集群被称为一个“集合体”(ensemble) Zookeeper通过复制来实现

CentOS7安装配置mysql5.7 tar免安装版

一、CentOS7.4系统自带mariadb # 查看系统自带的Mariadb[root@localhost~]# rpm -qa|grep mariadbmariadb-libs-5.5.44-2.el7.centos.x86_64# 卸载系统自带的Mariadb[root@localhost ~]# rpm -e --nodeps mariadb-libs-5.5.44-2.el7

性能测试介绍

性能测试是一种测试方法,旨在评估系统、应用程序或组件在现实场景中的性能表现和可靠性。它通常用于衡量系统在不同负载条件下的响应时间、吞吐量、资源利用率、稳定性和可扩展性等关键指标。 为什么要进行性能测试 通过性能测试,可以确定系统是否能够满足预期的性能要求,找出性能瓶颈和潜在的问题,并进行优化和调整。 发现性能瓶颈:性能测试可以帮助发现系统的性能瓶颈,即系统在高负载或高并发情况下可能出现的问题

hadoop开启回收站配置

开启回收站功能,可以将删除的文件在不超时的情况下,恢复原数据,起到防止误删除、备份等作用。 开启回收站功能参数说明 (1)默认值fs.trash.interval = 0,0表示禁用回收站;其他值表示设置文件的存活时间。 (2)默认值fs.trash.checkpoint.interval = 0,检查回收站的间隔时间。如果该值为0,则该值设置和fs.trash.interval的参数值相等。

NameNode内存生产配置

Hadoop2.x 系列,配置 NameNode 内存 NameNode 内存默认 2000m ,如果服务器内存 4G , NameNode 内存可以配置 3g 。在 hadoop-env.sh 文件中配置如下。 HADOOP_NAMENODE_OPTS=-Xmx3072m Hadoop3.x 系列,配置 Nam

Makefile简明使用教程

文章目录 规则makefile文件的基本语法:加在命令前的特殊符号:.PHONY伪目标: Makefilev1 直观写法v2 加上中间过程v3 伪目标v4 变量 make 选项-f-n-C Make 是一种流行的构建工具,常用于将源代码转换成可执行文件或者其他形式的输出文件(如库文件、文档等)。Make 可以自动化地执行编译、链接等一系列操作。 规则 makefile文件

字节面试 | 如何测试RocketMQ、RocketMQ?

字节面试:RocketMQ是怎么测试的呢? 答: 首先保证消息的消费正确、设计逆向用例,在验证消息内容为空等情况时的消费正确性; 推送大批量MQ,通过Admin控制台查看MQ消费的情况,是否出现消费假死、TPS是否正常等等问题。(上述都是临场发挥,但是RocketMQ真正的测试点,还真的需要探讨) 01 先了解RocketMQ 作为测试也是要简单了解RocketMQ。简单来说,就是一个分

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置