本文主要是介绍Sprint Boot集成Kafka,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
Sprint Boot集成Kafka
pom.xml
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>1.1.1.RELEASE</version></dependency>
application.properties
......#============== kafka ===================
kafka.consumer.zookeeper.connect=192.168.2.10:2181
kafka.consumer.servers=192.168.2.10:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=test
kafka.consumer.group.id=test
kafka.consumer.concurrency=10kafka.producer.servers=192.168.2.10:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960......
KafkaProducerConfig
package com.youfan.kafka;import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;@Configuration
@EnableKafka
public class KafkaProducerConfig {@Value("${kafka.producer.servers}")private String servers;@Value("${kafka.producer.retries}")private int retries;@Value("${kafka.producer.batch.size}")private int batchSize;@Value("${kafka.producer.linger}")private int linger;@Value("${kafka.producer.buffer.memory}")private int bufferMemory;public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ProducerConfig.RETRIES_CONFIG, retries);props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);props.put(ProducerConfig.LINGER_MS_CONFIG, linger);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}public ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<String, String>(producerFactory());}
}
@Autowired
private KafkaTemplate kafkaTemplate;
/*** 埋点收集日志,频道数量少,该模块的频道id固定,id为1,代表生鲜*/long pindaoid = 1;//频道id//productytpeid//产品类别id//producid//产品id//用户idlong userid = 0;//游客//ip地址String ipaddress = IpUtil.getIpAddress(request);System.out.println(ipaddress);HttpSession sesion = request.getSession();Object userobject = sesion.getAttribute("user");if(userobject!=null){User user = (User)userobject;userid = user.getId();}//获取浏览器信息以及操作系统信息String osandbrowser = BrowserInfoUtil.getOsAndBrowserInfo(request);System.out.println(osandbrowser);String[] temps = osandbrowser.split("---");String os = temps[0].trim();String browser = temps[1].trim();System.out.println(os);System.out.println(browser);Productscanlog productscanlog = new Productscanlog();//根据ip获取地区和运营商try {AreaAndnetwork areaAndnetwork = AreaAndNetworkUtil.getAddressByIp(ipaddress);productscanlog.setCounty(areaAndnetwork.getCounty());productscanlog.setProvice(areaAndnetwork.getProvice());productscanlog.setCity(areaAndnetwork.getCity());productscanlog.setCounty(areaAndnetwork.getCounty());productscanlog.setNetwork(areaAndnetwork.getNetwork());}catch (Exception e){e.printStackTrace();}productscanlog.setPindaoid(pindaoid);productscanlog.setProductytpeid(productytpeid);productscanlog.setProducid(Long.valueOf(producid+""));productscanlog.setUserid(userid);productscanlog.setIp(ipaddress);productscanlog.setBrowser(browser);productscanlog.setOs(os);productscanlog.setTimestamp(new Date().getTime());String productscanlogstring = JSONObject.toJSONString(productscanlog);System.out.println(productscanlogstring);kafkaTemplate.send("productscanlog", "key", productscanlogstring);String productflume = userid +"\t" + pindaoid+"\t"+productscanlog.getTimestamp();kafkaTemplate.send("productscanlogflume","key",productflume);
==============================
QQ群:143522604
群里有相关资源
欢迎和大家一起学习、交流、提升!
==============================
这篇关于Sprint Boot集成Kafka的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!