Sprint Boot集成Kafka

2023-12-02 09:08
文章标签 boot 集成 kafka sprint

本文主要是介绍Sprint Boot集成Kafka,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

 

Sprint Boot集成Kafka

 

 

 

pom.xml

		<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>1.1.1.RELEASE</version></dependency>

 

 

 

application.properties

......#============== kafka ===================
kafka.consumer.zookeeper.connect=192.168.2.10:2181
kafka.consumer.servers=192.168.2.10:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=test
kafka.consumer.group.id=test
kafka.consumer.concurrency=10kafka.producer.servers=192.168.2.10:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960......

 

 

KafkaProducerConfig

package com.youfan.kafka;import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;@Configuration
@EnableKafka
public class KafkaProducerConfig {@Value("${kafka.producer.servers}")private String servers;@Value("${kafka.producer.retries}")private int retries;@Value("${kafka.producer.batch.size}")private int batchSize;@Value("${kafka.producer.linger}")private int linger;@Value("${kafka.producer.buffer.memory}")private int bufferMemory;public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ProducerConfig.RETRIES_CONFIG, retries);props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);props.put(ProducerConfig.LINGER_MS_CONFIG, linger);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}public ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<String, String>(producerFactory());}
}

 

 

@Autowired    
private KafkaTemplate kafkaTemplate;

 

/*** 埋点收集日志,频道数量少,该模块的频道id固定,id为1,代表生鲜*/long pindaoid = 1;//频道id//productytpeid//产品类别id//producid//产品id//用户idlong userid = 0;//游客//ip地址String ipaddress = IpUtil.getIpAddress(request);System.out.println(ipaddress);HttpSession sesion = request.getSession();Object userobject = sesion.getAttribute("user");if(userobject!=null){User user = (User)userobject;userid =  user.getId();}//获取浏览器信息以及操作系统信息String osandbrowser = BrowserInfoUtil.getOsAndBrowserInfo(request);System.out.println(osandbrowser);String[] temps = osandbrowser.split("---");String os = temps[0].trim();String browser = temps[1].trim();System.out.println(os);System.out.println(browser);Productscanlog productscanlog = new Productscanlog();//根据ip获取地区和运营商try {AreaAndnetwork areaAndnetwork = AreaAndNetworkUtil.getAddressByIp(ipaddress);productscanlog.setCounty(areaAndnetwork.getCounty());productscanlog.setProvice(areaAndnetwork.getProvice());productscanlog.setCity(areaAndnetwork.getCity());productscanlog.setCounty(areaAndnetwork.getCounty());productscanlog.setNetwork(areaAndnetwork.getNetwork());}catch (Exception e){e.printStackTrace();}productscanlog.setPindaoid(pindaoid);productscanlog.setProductytpeid(productytpeid);productscanlog.setProducid(Long.valueOf(producid+""));productscanlog.setUserid(userid);productscanlog.setIp(ipaddress);productscanlog.setBrowser(browser);productscanlog.setOs(os);productscanlog.setTimestamp(new Date().getTime());String productscanlogstring = JSONObject.toJSONString(productscanlog);System.out.println(productscanlogstring);kafkaTemplate.send("productscanlog", "key", productscanlogstring);String productflume = userid +"\t" + pindaoid+"\t"+productscanlog.getTimestamp();kafkaTemplate.send("productscanlogflume","key",productflume);

 

 


==============================
QQ群:143522604
群里有相关资源
欢迎和大家一起学习、交流、提升!
==============================

 

 

这篇关于Sprint Boot集成Kafka的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/444728

相关文章

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3

【区块链 + 人才服务】区块链集成开发平台 | FISCO BCOS应用案例

随着区块链技术的快速发展,越来越多的企业开始将其应用于实际业务中。然而,区块链技术的专业性使得其集成开发成为一项挑战。针对此,广东中创智慧科技有限公司基于国产开源联盟链 FISCO BCOS 推出了区块链集成开发平台。该平台基于区块链技术,提供一套全面的区块链开发工具和开发环境,支持开发者快速开发和部署区块链应用。此外,该平台还可以提供一套全面的区块链开发教程和文档,帮助开发者快速上手区块链开发。

【Shiro】Shiro 的学习教程(三)之 SpringBoot 集成 Shiro

目录 1、环境准备2、引入 Shiro3、实现认证、退出3.1、使用死数据实现3.2、引入数据库,添加注册功能后端代码前端代码 3.3、MD5、Salt 的认证流程 4.、实现授权4.1、基于角色授权4.2、基于资源授权 5、引入缓存5.1、EhCache 实现缓存5.2、集成 Redis 实现 Shiro 缓存 1、环境准备 新建一个 SpringBoot 工程,引入依赖:

RabbitMQ使用及与spring boot整合

1.MQ   消息队列(Message Queue,简称MQ)——应用程序和应用程序之间的通信方法   应用:不同进程Process/线程Thread之间通信   比较流行的中间件:     ActiveMQ     RabbitMQ(非常重量级,更适合于企业级的开发)     Kafka(高吞吐量的分布式发布订阅消息系统)     RocketMQ   在高并发、可靠性、成熟度等

Spring Boot 入门篇

一、简介 Spring Boot是一款开源的Java Web应用框架,旨在简化Spring应用的初始搭建以及开发过程。它整合了Spring技术栈中的诸多关键组件,为开发者提供了一种快速、简便的Spring应用开发方式。Spring Boot遵循“约定优于配置”的原则,通过自动配置、起步依赖和内置的Servlet容器,极大地简化了传统Spring应用的配置和部署过程。 二、Spring Boot

系统架构师-ERP+集成

ERP   集成平台end:就懒得画新的页

Spring Boot集成Tess4J实现OCR

1.什么是Tess4j? Tesseract是一个开源的光学字符识别(OCR)引擎,它可以将图像中的文字转换为计算机可读的文本。支持多种语言和书面语言,并且可以在命令行中执行。它是一个流行的开源OCR工具,可以在许多不同的操作系统上运行。Tess4J是一个基于Tesseract OCR引擎的Java接口,可以用来识别图像中的文本,说白了,就是封装了它的API,让Java可以直接调用。 Tess

部署若依Spring boot项目

nohup和& nohup命令解释 nohup命令:nohup 是 no hang up 的缩写,就是不挂断的意思,但没有后台运行,终端不能标准输入。nohup :不挂断的运行,注意并没有后台运行的功能,就是指,用nohup运行命令可以使命令永久的执行下去,和用户终端没有关系,注意了nohup没有后台运行的意思;&才是后台运行在缺省情况下该作业的所有输出都被重定向到一个名为nohup.o

使用Spring Boot集成Spring Data JPA和单例模式构建库存管理系统

引言 在企业级应用开发中,数据库操作是非常重要的一环。Spring Data JPA提供了一种简化的方式来进行数据库交互,它使得开发者无需编写复杂的JPA代码就可以完成常见的CRUD操作。此外,设计模式如单例模式可以帮助我们更好地管理和控制对象的创建过程,从而提高系统的性能和可维护性。本文将展示如何结合Spring Boot、Spring Data JPA以及单例模式来构建一个基本的库存管理系统

Spring Boot集成PDFBox实现电子签章

概述 随着无纸化办公的普及,电子文档的使用越来越广泛。电子签章作为一种有效的身份验证方式,在很多场景下替代了传统的纸质文件签名。Apache PDFBox 是一个开源的Java库,可以用来渲染、生成、填写PDF文档等操作。本文将介绍如何使用Spring Boot框架结合PDFBox来实现电子签章功能。 准备工作 环境搭建:确保你的开发环境中安装了JDK 8或更高版本,并且配置好了Maven或