本文主要是介绍sring coud 2集成kafka,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
安装zookeeper
docker run --privileged=true --name zookeeper -p 2181:2181 -d zookeeper
安装kafka
192.168.0.33为外网访问地址
docker run --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.0.33:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.0.33:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -d wurstmeister/kafka
maven
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
application.yml
Spring Cloud 2 中 zk-nodes不用设置
spring:cloud:stream:bindings:shop_input:binder: kafka1consumer:headerMode: rawproducer:headerMode: raw#绑定的kafka topic名称destination: shop-topiccontent-type: text/plainshop_output:binder: kafka1consumer:headerMode: rawproducer:headerMode: rawdestination: shop-topiccontent-type: text/plainbinders:#可以配置多个kafkakafka1:type: kafkaenvironment:spring:cloud:stream:kafka:binder:#kafka地址brokers: http://kafka:9092auto-add-partitions: trueauto-create-topics: truemin-partition-count: 1
Source
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;/*** @author yl*/
public interface MySource {String SHOP_OUTPUT = "shop_output";@Output(MySource.SHOP_OUTPUT)MessageChannel output();}
Sink
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;/*** @author yl*/
public interface MySink {String SHOP_INPUT = "shop_input";@Input(MySink.SHOP_INPUT)SubscribableChannel input();}
KafkaSendTemplate
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;/*** kafka消息发送模板** @author yl*/
@EnableBinding(MySource.class)
public class KafkaSendTemplate {@Autowiredprivate MySource source;public void sendMessage(String msg) {try {source.output().send(MessageBuilder.withPayload(msg).build());} catch (Exception e) {e.printStackTrace();}}
}
KafkaConsumer
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;/*** kafka消息监听** @author yl*/
@EnableBinding(MySink.class)
@Slf4j
public class KafkaConsumer {@AutowiredSpCartService spCartService;@StreamListener(MySink.SHOP_INPUT)public void onReceive(String shopJson) {log.info(shopJson);ShopKafkaDTO shopKafkaDTO = JSONObject.parseObject(shopJson, ShopKafkaDTO.class);log.info("get Kafka message:{}", shopKafkaDTO);}
}
MyController
定义一个controller发送消息
@RestController
public class MyController {@Autowiredprivate KafkaSendTemplate kafkaSendTemplate;@GetMapping("/send")public void sendMessage(@RequestParam("message") String message) {kafkaSendTemplate.sendMessage(message);}
}
这篇关于sring coud 2集成kafka的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!