Kafka 实战演练:创建、配置与测试 Kafka全面教程

2024-09-07 12:36

本文主要是介绍Kafka 实战演练:创建、配置与测试 Kafka全面教程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 1.配置文件
  • 2.消费者
    • 1.注解方式
    • 2.KafkaConsumer
  • 3.依赖
    • 1.注解依赖
    • 2.KafkaConsumer依赖

本文档只是为了留档方便以后工作运维,或者给同事分享文档内容比较简陋命令也不是特别全,不适合小白观看,如有不懂可以私信,上班期间都是在得

1.配置文件

  1. Yml配置
spring:kafka:bootstrap-servers: consumer:group-id: iot-testaaaaaaaaaa11aaaaaaaaaauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:security:protocol: SASL_SSLsasl:mechanism: PLAINjaas:config: org.apache.kafka.common.security.plain.PlainLoginModule required username="" password="";ssl:truststore:type: JKSlocation: src/main/resources/client.truststore.jkspassword: endpoint.identification.algorithm:

yaml配置

2.消费者

1.注解方式

    @KafkaListener(topics = {"abcd"})public void listen(ConsumerRecord<?, ?> record){Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {Object message = kafkaMessage.get();System.out.println("---->"+record);System.out.println("---->"+message);}}

注解方式

2.KafkaConsumer

/*** @author XHao*/
public class MqsConsumer {public static final String CONFIG_CONSUMER_FILE_NAME = "mqs.sdk.consumer.properties";private KafkaConsumer<Object, Object> consumer;MqsConsumer(String path) {Properties props = new Properties();try {InputStream in = new BufferedInputStream(new FileInputStream(path));props.load(in);} catch (IOException e) {e.printStackTrace();return;}consumer = new KafkaConsumer<Object, Object>(props);}public MqsConsumer() {Properties props = new Properties();try {props = loadFromClasspath(CONFIG_CONSUMER_FILE_NAME);} catch (IOException e) {e.printStackTrace();return;}consumer = new KafkaConsumer<Object, Object>(props);}public void consume(List topics) {consumer.subscribe(topics);}public ConsumerRecords<Object, Object> poll(long timeout) {return consumer.poll(timeout);}public void close() {consumer.close();}/*** get classloader from thread context if no classloader found in thread* context return the classloader which has loaded this class** @return classloader*/public static ClassLoader getCurrentClassLoader() {ClassLoader classLoader = Thread.currentThread().getContextClassLoader();if (classLoader == null) {classLoader = MqsConsumer.class.getClassLoader();}return classLoader;}/*** 从classpath 加载配置信息** @param configFileName 配置文件名称* @return 配置信息* @throws IOException*/public static Properties loadFromClasspath(String configFileName) throws IOException {ClassLoader classLoader = getCurrentClassLoader();Properties config = new Properties();List<URL> properties = new ArrayList<URL>();Enumeration<URL> propertyResources = classLoader.getResources(configFileName);while (propertyResources.hasMoreElements()) {properties.add(propertyResources.nextElement());}for (URL url : properties) {InputStream is = null;try {is = url.openStream();config.load(is);} finally {if (is != null) {is.close();is = null;}}}return config;}
}

在这里插入图片描述

3.依赖

1.注解依赖

      <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

2.KafkaConsumer依赖

        <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.1.0</version></dependency>

可视化大屏项目经常用到消息转换,实时状态等 记录一下吧

如果点赞多,评论多会更新详细教程,待补充。

这篇关于Kafka 实战演练:创建、配置与测试 Kafka全面教程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1145071

相关文章

windos server2022的配置故障转移服务的图文教程

《windosserver2022的配置故障转移服务的图文教程》本文主要介绍了windosserver2022的配置故障转移服务的图文教程,以确保服务和应用程序的连续性和可用性,文中通过图文介绍的非... 目录准备环境:步骤故障转移群集是 Windows Server 2022 中提供的一种功能,用于在多个

windos server2022里的DFS配置的实现

《windosserver2022里的DFS配置的实现》DFS是WindowsServer操作系统提供的一种功能,用于在多台服务器上集中管理共享文件夹和文件的分布式存储解决方案,本文就来介绍一下wi... 目录什么是DFS?优势:应用场景:DFS配置步骤什么是DFS?DFS指的是分布式文件系统(Distr

Golang操作DuckDB实战案例分享

《Golang操作DuckDB实战案例分享》DuckDB是一个嵌入式SQL数据库引擎,它与众所周知的SQLite非常相似,但它是为olap风格的工作负载设计的,DuckDB支持各种数据类型和SQL特性... 目录DuckDB的主要优点环境准备初始化表和数据查询单行或多行错误处理和事务完整代码最后总结Duck

Window Server2016 AD域的创建的方法步骤

《WindowServer2016AD域的创建的方法步骤》本文主要介绍了WindowServer2016AD域的创建的方法步骤,文中通过图文介绍的非常详细,对大家的学习或者工作具有一定的参考学习价... 目录一、准备条件二、在ServerA服务器中常见AD域管理器:三、创建AD域,域地址为“test.ly”

Kafka拦截器的神奇操作方法

《Kafka拦截器的神奇操作方法》Kafka拦截器是一种强大的机制,用于在消息发送和接收过程中插入自定义逻辑,它们可以用于消息定制、日志记录、监控、业务逻辑集成、性能统计和异常处理等,本文介绍Kafk... 目录前言拦截器的基本概念Kafka 拦截器的定义和基本原理:拦截器是 Kafka 消息传递的不可或缺

关于Maven中pom.xml文件配置详解

《关于Maven中pom.xml文件配置详解》pom.xml是Maven项目的核心配置文件,它描述了项目的结构、依赖关系、构建配置等信息,通过合理配置pom.xml,可以提高项目的可维护性和构建效率... 目录1. POM文件的基本结构1.1 项目基本信息2. 项目属性2.1 引用属性3. 项目依赖4. 构

龙蜥操作系统Anolis OS-23.x安装配置图解教程(保姆级)

《龙蜥操作系统AnolisOS-23.x安装配置图解教程(保姆级)》:本文主要介绍了安装和配置AnolisOS23.2系统,包括分区、软件选择、设置root密码、网络配置、主机名设置和禁用SELinux的步骤,详细内容请阅读本文,希望能对你有所帮助... ‌AnolisOS‌是由阿里云推出的开源操作系统,旨

PyTorch使用教程之Tensor包详解

《PyTorch使用教程之Tensor包详解》这篇文章介绍了PyTorch中的张量(Tensor)数据结构,包括张量的数据类型、初始化、常用操作、属性等,张量是PyTorch框架中的核心数据结构,支持... 目录1、张量Tensor2、数据类型3、初始化(构造张量)4、常用操作5、常用属性5.1 存储(st

Java操作PDF文件实现签订电子合同详细教程

《Java操作PDF文件实现签订电子合同详细教程》:本文主要介绍如何在PDF中加入电子签章与电子签名的过程,包括编写Word文件、生成PDF、为PDF格式做表单、为表单赋值、生成文档以及上传到OB... 目录前言:先看效果:1.编写word文件1.2然后生成PDF格式进行保存1.3我这里是将文件保存到本地后

windows系统下shutdown重启关机命令超详细教程

《windows系统下shutdown重启关机命令超详细教程》shutdown命令是一个强大的工具,允许你通过命令行快速完成关机、重启或注销操作,本文将为你详细解析shutdown命令的使用方法,并提... 目录一、shutdown 命令简介二、shutdown 命令的基本用法三、远程关机与重启四、实际应用