This article walks through an example of sending and receiving Kafka messages with Spring Cloud Stream.
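1. Configuration file
Bindings named input-* receive messages; bindings named output-* send them.
If sending and receiving happen in the same application, consumer: headerMode: raw is not needed. If this application only receives and consumes messages, add consumer: headerMode: raw to the input binding, as the input-order binding below does.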
spring:
  cloud:
    stream:
      bindings:
        input-collect:
          contentType: text/plain;charset=UTF-8
          destination: ACCOUNT_COLLECT_NOTIFY
          group: account-dev
        input-order:
          consumer:
            headerMode: raw
          contentType: text/plain;charset=UTF-8
          destination: ACCOUNT_ORDER_NOTIFY
          group: account-dev
        output-collect:
          contentType: text/plain;charset=UTF-8
          destination: ACCOUNT_COLLECT_NOTIFY
        output-watch:
          contentType: text/plain;charset=UTF-8
          destination: ACCOUNT_WATCH_NOTIFY
      kafka:
        binder:
          brokers: 192.168.1.158:9092,192.168.1.159:9092,192.168.1.160:9092
          zkNodes: 192.168.1.158:2181,192.168.1.159:2181,192.168.1.160:2181
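In this configuration, an output binding and an input binding are linked only through their destination (the topic name). The following plain-Java sketch is not part of the application and uses no Spring classes; the class BindingPairs and its methods are hypothetical names introduced for illustration. It copies the binding names and destinations from the configuration and shows which input bindings in this application would read what a given output binding writes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: Spring Cloud Stream performs the real wiring itself.
class BindingPairs {

    // Input binding name -> destination, copied from the configuration.
    static Map<String, String> inputs() {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("input-collect", "ACCOUNT_COLLECT_NOTIFY");
        m.put("input-order", "ACCOUNT_ORDER_NOTIFY");
        return m;
    }

    // Output binding name -> destination, copied from the configuration.
    static Map<String, String> outputs() {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("output-collect", "ACCOUNT_COLLECT_NOTIFY");
        m.put("output-watch", "ACCOUNT_WATCH_NOTIFY");
        return m;
    }

    // Returns the input bindings that share a destination with the given output.
    static List<String> receiversOf(String outputBinding) {
        List<String> result = new ArrayList<>();
        String destination = outputs().get(outputBinding);
        if (destination == null) {
            return result; // unknown output binding: nothing receives it
        }
        for (Map.Entry<String, String> e : inputs().entrySet()) {
            if (e.getValue().equals(destination)) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(receiversOf("output-collect")); // prints [input-collect]
        System.out.println(receiversOf("output-watch"));   // prints []
    }
}
```

Read this way, output-collect and input-collect form a send/receive pair inside the same application, while output-watch and input-order each rely on some other application for the opposite end.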
2. Output channel interface for sending messages
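import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface NotifyMessageChannel {

    // Binding names; they must match the keys under spring.cloud.stream.bindings.
    String COLLECT_OUTPUT = "output-collect";
    String WATCH_OUTPUT = "output-watch";

    @Output(NotifyMessageChannel.COLLECT_OUTPUT)
    MessageChannel collectOutPut();

    @Output(NotifyMessageChannel.WATCH_OUTPUT)
    MessageChannel watchOutPut();
}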
3. Service that sends messages
NotifyMessageChannel defines two outgoing MessageChannels. To send, pick the output you need and call send on it directly, for example collectOutPut().send(...).
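import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
@Slf4j
@EnableBinding(NotifyMessageChannel.class)
public class NotifyServiceImpl implements NotifyService {

    @Autowired
    private NotifyMessageChannel notifyMessageChannel;

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void sendUserCollectCourse(UserCollectCourseNotify userCollectCourseNotify) {
        try {
            // Serialize the notification to JSON and send it on the collect output.
            String json = mapper.writeValueAsString(userCollectCourseNotify);
            boolean result = notifyMessageChannel.collectOutPut()
                    .send(MessageBuilder.withPayload(json).build());
            log.info("send result:{}", result);
        } catch (Exception e) {
            log.error("Exception from create user UserCollectCourse.", e);
        }
    }
}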
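The service above always sends on collectOutPut() and only logs the boolean that send returns. As a rough sketch of choosing between several outputs and acting on that result, the plain-Java example below uses a hypothetical Channel interface as a stand-in for MessageChannel, plus a hypothetical in-memory RecordingChannel. None of these types come from Spring, and the class OutputSelection is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OutputSelection {

    // Hypothetical stand-in for MessageChannel: send returns true if accepted.
    interface Channel {
        boolean send(String payload);
    }

    // In-memory channel that records accepted payloads; it can be set to reject.
    static class RecordingChannel implements Channel {
        final List<String> sent = new ArrayList<>();
        final boolean accept;

        RecordingChannel(boolean accept) {
            this.accept = accept;
        }

        @Override
        public boolean send(String payload) {
            if (accept) {
                sent.add(payload);
            }
            return accept;
        }
    }

    private final Map<String, Channel> channels = new HashMap<>();

    void register(String bindingName, Channel channel) {
        channels.put(bindingName, channel);
    }

    // Sends on the named output; false means unknown binding or rejected send.
    boolean send(String bindingName, String payload) {
        Channel channel = channels.get(bindingName);
        return channel != null && channel.send(payload);
    }

    public static void main(String[] args) {
        OutputSelection sender = new OutputSelection();
        sender.register("output-collect", new RecordingChannel(true));
        sender.register("output-watch", new RecordingChannel(false));

        System.out.println(sender.send("output-collect", "{\"userId\":1}")); // prints true
        System.out.println(sender.send("output-watch", "{\"userId\":1}"));   // prints false
        System.out.println(sender.send("output-other", "{\"userId\":1}"));   // prints false
    }
}
```

In the real service the same choice is made by calling collectOutPut() or watchOutPut() on NotifyMessageChannel; a caller that cares about delivery can return or check the boolean instead of only logging it.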
4. Input channel interface for receiving messages
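import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface ReceiveMessageChannel {

    // Binding names; they must match the keys under spring.cloud.stream.bindings.
    String COLLECT_INPUT = "input-collect";
    String ORDER_INPUT = "input-order";

    @Input(ReceiveMessageChannel.COLLECT_INPUT)
    SubscribableChannel collectInput();

    @Input(ReceiveMessageChannel.ORDER_INPUT)
    SubscribableChannel orderInput();
}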
5. Listen for received messages and process them
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

@Service
@Slf4j
@EnableBinding(ReceiveMessageChannel.class)
public class CollectListener {

    private final UserCollectCourseClient userCollectCourseService;
    private final ObjectMapper mapper = new ObjectMapper();

    public CollectListener(UserCollectCourseClient userCollectCourseClient) {
        this.userCollectCourseService = userCollectCourseClient;
    }

    @StreamListener(ReceiveMessageChannel.COLLECT_INPUT)
    public void process(Message<String> message) {
        log.debug("Received Notify:[{}]", message);
        String content = message.getPayload();
        try {
            UserCollectCourseNotify uccn = mapper.readValue(content, UserCollectCourseNotify.class);
            // Check for null before reading any field.
            if (uccn != null) {
                log.debug("Received Notify:[userId:{},courseId:{}]", uccn.getUserId(), uccn.getCourseId());
                log.info("receive UserCollectCourseNotify:{}", uccn);
                // userCollectCourseService.saveUserCollectCourse(uccn);
                // log.debug("Save Collect to Mongo:[userId:{},courseId:{}]", uccn.getUserId(), uccn.getCourseId());
            }
        } catch (Exception e) {
            log.warn("RECEIVE Collect NOTIFY ERROR:[message_body:{},error:{}]", message, e.getLocalizedMessage());
        }
    }
}
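The listener follows a parse, validate, then handle order, and catches exceptions so that one bad message is logged rather than propagated. A minimal plain-Java sketch of that order is shown below. The class SafeHandler, the Notify type, and its long fields are hypothetical, and the JSON parser is abstracted as a Function so the example needs no Jackson or Spring.

```java
import java.util.function.Function;

class SafeHandler {

    // Hypothetical notification carrying the two fields the listener logs.
    static class Notify {
        final long userId;
        final long courseId;

        Notify(long userId, long courseId) {
            this.userId = userId;
            this.courseId = courseId;
        }
    }

    // Returns true if the payload was parsed and handled, false if it was skipped.
    static boolean handle(String payload, Function<String, Notify> parser) {
        try {
            Notify n = parser.apply(payload);
            if (n == null) {
                return false; // nothing to handle; no field is touched
            }
            System.out.println("userId=" + n.userId + ", courseId=" + n.courseId);
            return true;
        } catch (RuntimeException e) {
            // A malformed payload is reported and skipped instead of rethrown.
            System.out.println("skipped: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        // prints "userId=7, courseId=42", then "true"
        System.out.println(handle("ok", p -> new Notify(7L, 42L)));
        // prints "false"
        System.out.println(handle("empty", p -> null));
        // prints "skipped: bad payload", then "false"
        System.out.println(handle("broken", p -> {
            throw new IllegalArgumentException("bad payload");
        }));
    }
}
```

Placing the null check before any getter call matters: reading a field first would turn a null result into a NullPointerException that is then reported as a receive error.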