Sprint Boot集成Kafka

2023-12-02 09:08
文章标签 boot 集成 kafka sprint

本文主要是介绍Sprint Boot集成Kafka,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

 

Sprint Boot集成Kafka

 

 

 

pom.xml

		<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>1.1.1.RELEASE</version></dependency>

 

 

 

application.properties

......#============== kafka ===================
kafka.consumer.zookeeper.connect=192.168.2.10:2181
kafka.consumer.servers=192.168.2.10:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=test
kafka.consumer.group.id=test
kafka.consumer.concurrency=10kafka.producer.servers=192.168.2.10:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960......

 

 

KafkaProducerConfig

package com.youfan.kafka;import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;@Configuration
@EnableKafka
public class KafkaProducerConfig {@Value("${kafka.producer.servers}")private String servers;@Value("${kafka.producer.retries}")private int retries;@Value("${kafka.producer.batch.size}")private int batchSize;@Value("${kafka.producer.linger}")private int linger;@Value("${kafka.producer.buffer.memory}")private int bufferMemory;public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ProducerConfig.RETRIES_CONFIG, retries);props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);props.put(ProducerConfig.LINGER_MS_CONFIG, linger);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}public ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<String, String>(producerFactory());}
}

 

 

@Autowired    
private KafkaTemplate kafkaTemplate;

 

/*** 埋点收集日志,频道数量少,该模块的频道id固定,id为1,代表生鲜*/long pindaoid = 1;//频道id//productytpeid//产品类别id//producid//产品id//用户idlong userid = 0;//游客//ip地址String ipaddress = IpUtil.getIpAddress(request);System.out.println(ipaddress);HttpSession sesion = request.getSession();Object userobject = sesion.getAttribute("user");if(userobject!=null){User user = (User)userobject;userid =  user.getId();}//获取浏览器信息以及操作系统信息String osandbrowser = BrowserInfoUtil.getOsAndBrowserInfo(request);System.out.println(osandbrowser);String[] temps = osandbrowser.split("---");String os = temps[0].trim();String browser = temps[1].trim();System.out.println(os);System.out.println(browser);Productscanlog productscanlog = new Productscanlog();//根据ip获取地区和运营商try {AreaAndnetwork areaAndnetwork = AreaAndNetworkUtil.getAddressByIp(ipaddress);productscanlog.setCounty(areaAndnetwork.getCounty());productscanlog.setProvice(areaAndnetwork.getProvice());productscanlog.setCity(areaAndnetwork.getCity());productscanlog.setCounty(areaAndnetwork.getCounty());productscanlog.setNetwork(areaAndnetwork.getNetwork());}catch (Exception e){e.printStackTrace();}productscanlog.setPindaoid(pindaoid);productscanlog.setProductytpeid(productytpeid);productscanlog.setProducid(Long.valueOf(producid+""));productscanlog.setUserid(userid);productscanlog.setIp(ipaddress);productscanlog.setBrowser(browser);productscanlog.setOs(os);productscanlog.setTimestamp(new Date().getTime());String productscanlogstring = JSONObject.toJSONString(productscanlog);System.out.println(productscanlogstring);kafkaTemplate.send("productscanlog", "key", productscanlogstring);String productflume = userid +"\t" + pindaoid+"\t"+productscanlog.getTimestamp();kafkaTemplate.send("productscanlogflume","key",productflume);

 

 


==============================
QQ群:143522604
群里有相关资源
欢迎和大家一起学习、交流、提升!
==============================

 

 

这篇关于Sprint Boot集成Kafka的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/444728

相关文章

Spring Boot中WebSocket常用使用方法详解

《SpringBoot中WebSocket常用使用方法详解》本文从WebSocket的基础概念出发,详细介绍了SpringBoot集成WebSocket的步骤,并重点讲解了常用的使用方法,包括简单消... 目录一、WebSocket基础概念1.1 什么是WebSocket1.2 WebSocket与HTTP

Spring Boot @RestControllerAdvice全局异常处理最佳实践

《SpringBoot@RestControllerAdvice全局异常处理最佳实践》本文详解SpringBoot中通过@RestControllerAdvice实现全局异常处理,强调代码复用、统... 目录前言一、为什么要使用全局异常处理?二、核心注解解析1. @RestControllerAdvice2

在Spring Boot中集成RabbitMQ的实战记录

《在SpringBoot中集成RabbitMQ的实战记录》本文介绍SpringBoot集成RabbitMQ的步骤,涵盖配置连接、消息发送与接收,并对比两种定义Exchange与队列的方式:手动声明(... 目录前言准备工作1. 安装 RabbitMQ2. 消息发送者(Producer)配置1. 创建 Spr

深度解析Spring Boot拦截器Interceptor与过滤器Filter的区别与实战指南

《深度解析SpringBoot拦截器Interceptor与过滤器Filter的区别与实战指南》本文深度解析SpringBoot中拦截器与过滤器的区别,涵盖执行顺序、依赖关系、异常处理等核心差异,并... 目录Spring Boot拦截器(Interceptor)与过滤器(Filter)深度解析:区别、实现

如何在Spring Boot项目中集成MQTT协议

《如何在SpringBoot项目中集成MQTT协议》本文介绍在SpringBoot中集成MQTT的步骤,包括安装Broker、添加EclipsePaho依赖、配置连接参数、实现消息发布订阅、测试接口... 目录1. 准备工作2. 引入依赖3. 配置MQTT连接4. 创建MQTT配置类5. 实现消息发布与订阅

SpringBoot集成LiteFlow工作流引擎的完整指南

《SpringBoot集成LiteFlow工作流引擎的完整指南》LiteFlow作为一款国产轻量级规则引擎/流程引擎,以其零学习成本、高可扩展性和极致性能成为微服务架构下的理想选择,本文将详细讲解Sp... 目录一、LiteFlow核心优势二、SpringBoot集成实战三、高级特性应用1. 异步并行执行2

Spring Boot 实现 IP 限流的原理、实践与利弊解析

《SpringBoot实现IP限流的原理、实践与利弊解析》在SpringBoot中实现IP限流是一种简单而有效的方式来保障系统的稳定性和可用性,本文给大家介绍SpringBoot实现IP限... 目录一、引言二、IP 限流原理2.1 令牌桶算法2.2 漏桶算法三、使用场景3.1 防止恶意攻击3.2 控制资源

SpringBoot3应用中集成和使用Spring Retry的实践记录

《SpringBoot3应用中集成和使用SpringRetry的实践记录》SpringRetry为SpringBoot3提供重试机制,支持注解和编程式两种方式,可配置重试策略与监听器,适用于临时性故... 目录1. 简介2. 环境准备3. 使用方式3.1 注解方式 基础使用自定义重试策略失败恢复机制注意事项

一文详解如何在idea中快速搭建一个Spring Boot项目

《一文详解如何在idea中快速搭建一个SpringBoot项目》IntelliJIDEA作为Java开发者的‌首选IDE‌,深度集成SpringBoot支持,可一键生成项目骨架、智能配置依赖,这篇文... 目录前言1、创建项目名称2、勾选需要的依赖3、在setting中检查maven4、编写数据源5、开启热

SpringBoot集成LiteFlow实现轻量级工作流引擎的详细过程

《SpringBoot集成LiteFlow实现轻量级工作流引擎的详细过程》LiteFlow是一款专注于逻辑驱动流程编排的轻量级框架,它以组件化方式快速构建和执行业务流程,有效解耦复杂业务逻辑,下面给大... 目录一、基础概念1.1 组件(Component)1.2 规则(Rule)1.3 上下文(Conte