本文主要是介绍kafka 集成整合外部插件(springboot,flume,flink,spark),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
一 kafka集成springboot
1.工程结构
2.pom文件
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.1</version><relativePath/> <!-- lookup parent from repository --></parent>
<!--springboot 启动 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.21</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency>
3.resouce配置文件
spring.application.name=kf-demo
# 指定 kafka 的地址
spring.kafka.bootstrapservers=192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092
#指定 key 和 value 的序列化器
spring.kafka.producer.keyserializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.valueserializer=org.apache.kafka.common.serialization.StringSerializer# =========消费者配置开始=========
# 指定 kafka 的地址
#spring.kafka.bootstrapservers=hadoop102:9092,hadoop103:9092,hadoop104:9092
# 指定 key 和 value 的反序列化器
spring.kafka.consumer.keydeserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.valuedeserializer=org.apache.kafka.common.serialization.StringDeserializer
#指定消费者组的 group_id
spring.kafka.consumer.group-id=test2
4.生产者
package com.ljf.spring.boot.demo.producerspt;import com.ljf.spring.boot.demo.utils.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;
import java.util.HashMap;
import java.util.Map;/*** @ClassName: ProducerSpt* @Description: TODO* @Author: liujianfu* @Date: 2022/04/12 08:16:35* @Version: V1.0**/
@RestController
public class ProducerSpt {// Kafka 模板用来向 kafka 发送数据@AutowiredKafkaTemplate<String, String> kafka;@RequestMapping("/send")public String data(String msg) {Map map= new HashMap<>();map.put("name","beijing");map.put("time", DateUtils.dateToStr(new Date(), "yyyy-MM-dd HH:mm:ss"));map.put("msg",msg);kafka.send("kafka-ljf", map.toString());return "ok";}
}
5.配置消费者
package com.ljf.spring.boot.demo.consumerspt;import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;/*** @ClassName: KafkaConsumer* @Description: TODO* @Author: liujianfu* @Date: 2022/04/12 08:21:59* @Version: V1.0**/
@Configuration
public class KafkaConsumer {// 指定要监听的 topic@KafkaListener(topics = "kafka-ljf")public void consumeTopic(String msg) { // 参数: 收到的 valueSystem.out.println("收到的信息: " + msg);}
}
6.启动zk,kafka集群,启动程序
启动程序
7.测试
生产端:
消费端:
二 kafka集成flume
1.作为生产者
2.作为消费者
具体代码实现见百度网盘pdf
三 kafka集成flink
1.作为生产者
2.作为消费者
见百度网盘
四 集成spark
资料见百度网盘
这篇关于kafka 集成整合外部插件(springboot,flume,flink,spark)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!