第5章 Kafka,构建TB级异步消息系统【仿牛客网社区论坛项目】

2024-05-15 08:04

本文主要是介绍第5章 Kafka,构建TB级异步消息系统【仿牛客网社区论坛项目】,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

第5章 Kafka,构建TB级异步消息系统【仿牛客网社区论坛项目】

  • 前言
  • 推荐
  • 项目总结
    • 第5章 Kafka,构建TB级异步消息系统
      • 1.阻塞队列
      • 2. Kafka入门
      • 3.Spring整合Kafka
      • 4.发送系统通知
      • 5.显示系统通知
  • 最后

前言

2023-4-30 20:42:51

以下内容源自【Java面试项目】
仅供学习交流使用

推荐

仿牛客网项目【面试】

项目总结

第5章 Kafka,构建TB级异步消息系统

1.阻塞队列

2. Kafka入门

3.Spring整合Kafka

4.发送系统通知

package com.jsss.community.event;import com.alibaba.fastjson.JSONObject;
import com.jsss.community.entity.Event;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;@Component
public class EventProducer {@Autowiredprivate KafkaTemplate kafkaTemplate;// 处理事件public void fireEvent(Event event) {// 将事件发布到指定的主题kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));}}
package com.jsss.community.event;import com.alibaba.fastjson.JSONObject;
import com.jsss.community.entity.DiscussPost;
import com.jsss.community.entity.Event;
import com.jsss.community.entity.Message;
import com.jsss.community.service.DiscussPostService;import com.jsss.community.service.ElasticsearchService;
import com.jsss.community.service.MessageService;
import com.jsss.community.util.CommunityConstant;
import com.jsss.community.util.CommunityUtil;import com.qiniu.common.QiniuException;
import com.qiniu.common.Zone;
import com.qiniu.http.Response;
import com.qiniu.storage.Configuration;
import com.qiniu.storage.UploadManager;
import com.qiniu.util.Auth;
import com.qiniu.util.StringMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.KafkaListeners;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component;import java.io.File;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;@Component
public class EventConsumer implements CommunityConstant {private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);@Autowiredprivate MessageService messageService;@Autowiredprivate DiscussPostService discussPostService;@Autowiredprivate ElasticsearchService elasticsearchService;@Value("${wk.image.command}")private String wkImageCommand;@Value("${wk.image.storage}")private String wkImageStorage;//七牛云@Value("${qiniu.key.access}")private String accessKey;@Value("${qiniu.key.secret}")private String secretKey;@Value("${qiniu.bucket.share.name}")private String shareBucketName;//定时任务@Autowiredprivate ThreadPoolTaskScheduler taskScheduler;//消费通知事件@KafkaListener(topics = {TOPIC_COMMENT, TOPIC_LIKE, TOPIC_FOLLOW})public void handleCommentMessage(ConsumerRecord record) {if (record == null || record.value() == null) {logger.error("消息的内容为空!");return;}Event event = JSONObject.parseObject(record.value().toString(), Event.class);if (event == null) {logger.error("消息格式错误!");return;}// 发送站内通知Message message = new Message();message.setFromId(SYSTEM_USER_ID);message.setToId(event.getEntityUserId());//复用message conversation_id 做 topicmessage.setConversationId(event.getTopic());message.setCreateTime(new Date());//拼接语句  用户nowcoder 评论了你的 帖子 , 点击查看 !Map<String, Object> content = new HashMap<>();content.put("userId", event.getUserId());content.put("entityType", event.getEntityType());content.put("entityId", event.getEntityId());//附加信息 一律存入contentif (!event.getData().isEmpty()) {for (Map.Entry<String, Object> entry : event.getData().entrySet()) {content.put(entry.getKey(), entry.getValue());}}message.setContent(JSONObject.toJSONString(content));//复用message conversation_id 做 topic//92,1,111,like,"{""entityType"":1,""entityId"":237,""postId"":237,""userId"":112}",1,2019-04-13 22:20:58messageService.addMessage(message);}// 消费发帖事件@KafkaListener(topics = {TOPIC_PUBLISH})public void handlePublishMessage(ConsumerRecord record) {if (record == null || record.value() == null) {logger.error("消息的内容为空!");return;}Event event = JSONObject.parseObject(record.value().toString(), Event.class);if (event == null) {logger.error("消息格式错误!");return;}DiscussPost post = discussPostService.findDiscussPostById(event.getEntityId());elasticsearchService.saveDiscussPost(post);}// 消费删帖事件@KafkaListener(topics = {TOPIC_DELETE})public void handleDeleteMessage(ConsumerRecord record) {if (record == null || record.value() == null) {logger.error("消息的内容为空!");return;}Event event = JSONObject.parseObject(record.value().toString(), Event.class);if (event == null) {logger.error("消息格式错误!");return;}elasticsearchService.deleteDiscussPost(event.getEntityId());}// 消费分享事件@KafkaListener(topics = TOPIC_SHARE)public void handleShareMessage(ConsumerRecord record) {if (record == null || record.value() == null) {logger.error("消息的内容为空!");return;}Event event = JSONObject.parseObject(record.value().toString(), Event.class);if (event == null) {logger.error("消息格式错误!");return;}String htmlUrl = (String) event.getData().get("htmlUrl");String fileName = (String) event.getData().get("fileName");String suffix = (String) event.getData().get("suffix");String cmd = wkImageCommand + " --quality 75 "+ htmlUrl + " " + wkImageStorage + "/" + fileName + suffix;try {Runtime.getRuntime().exec(cmd);logger.info("生成长图成功: " + cmd);} catch (IOException e) {logger.error("生成长图失败: " + e.getMessage());}// 启用定时器,监视该图片,一旦生成了,则上传至七牛云.UploadTask task = new UploadTask(fileName, suffix);Future future = taskScheduler.scheduleAtFixedRate(task, 500);task.setFuture(future);}class UploadTask implements Runnable {// 文件名称private String fileName;// 文件后缀private String suffix;// 启动任务的返回值private Future future;// 开始时间private long startTime;// 上传次数private int uploadTimes;public UploadTask(String fileName, String suffix) {this.fileName = fileName;this.suffix = suffix;this.startTime = System.currentTimeMillis();}public void setFuture(Future future) {this.future = future;}@Overridepublic void run() {// 生成失败if (System.currentTimeMillis() - startTime > 30000) {logger.error("执行时间过长,终止任务:" + fileName);future.cancel(true);return;}// 上传失败if (uploadTimes >= 3) {logger.error("上传次数过多,终止任务:" + fileName);future.cancel(true);return;}String path = wkImageStorage + "/" + fileName + suffix;File file = new File(path);if (file.exists()) {logger.info(String.format("开始第%d次上传[%s].", ++uploadTimes, fileName));// 设置响应信息StringMap policy = new StringMap();policy.put("returnBody", CommunityUtil.getJSONString(0));// 生成上传凭证Auth auth = Auth.create(accessKey, secretKey);String uploadToken = auth.uploadToken(shareBucketName, fileName, 3600, policy);// 指定上传机房UploadManager manager = new UploadManager(new Configuration(Zone.zone1()));try {// 开始上传图片Response response = manager.put(path, fileName, uploadToken, null, "image/" + suffix, false);// 处理响应结果JSONObject json = JSONObject.parseObject(response.bodyString());if (json == null || json.get("code") == null || !json.get("code").toString().equals("0")) {logger.info(String.format("第%d次上传失败[%s].", uploadTimes, fileName));} else {logger.info(String.format("第%d次上传成功[%s].", uploadTimes, fileName));future.cancel(true);}} catch (QiniuException e) {logger.info(String.format("第%d次上传失败[%s].", uploadTimes, fileName));}} else {logger.info("等待图片生成[" + fileName + "].");}}}}

5.显示系统通知

   // 得到通知列表@RequestMapping(path = "/notice/list", method = RequestMethod.GET)public String getNoticeList(Model model) {User user = hostHolder.getUser();// 查询评论类通知Message message = messageService.findLatestNotice(user.getId(), TOPIC_COMMENT);if (message != null) {Map<String, Object> messageVO = new HashMap<>();messageVO.put("message", message);//还原commentString content = HtmlUtils.htmlUnescape(message.getContent());Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);messageVO.put("user", userService.findUserById((Integer) data.get("userId")));messageVO.put("entityType", data.get("entityType"));messageVO.put("entityId", data.get("entityId"));messageVO.put("postId", data.get("postId"));int count = messageService.findNoticeCount(user.getId(), TOPIC_COMMENT);messageVO.put("count", count);int unread = messageService.findNoticeUnreadCount(user.getId(), TOPIC_COMMENT);messageVO.put("unread", unread);model.addAttribute("commentNotice", messageVO);}// 查询点赞类通知message = messageService.findLatestNotice(user.getId(), TOPIC_LIKE);if (message != null) {Map<String, Object> messageVO = new HashMap<>();messageVO.put("message", message);String content = HtmlUtils.htmlUnescape(message.getContent());Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);messageVO.put("user", userService.findUserById((Integer) data.get("userId")));messageVO.put("entityType", data.get("entityType"));messageVO.put("entityId", data.get("entityId"));messageVO.put("postId", data.get("postId"));int count = messageService.findNoticeCount(user.getId(), TOPIC_LIKE);messageVO.put("count", count);int unread = messageService.findNoticeUnreadCount(user.getId(), TOPIC_LIKE);messageVO.put("unread", unread);model.addAttribute("likeNotice", messageVO);}// 查询关注类通知message = messageService.findLatestNotice(user.getId(), TOPIC_FOLLOW);if (message != null) {Map<String, Object> messageVO = new HashMap<>();messageVO.put("message", message);String content = HtmlUtils.htmlUnescape(message.getContent());Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);messageVO.put("user", userService.findUserById((Integer) data.get("userId")));messageVO.put("entityType", data.get("entityType"));messageVO.put("entityId", data.get("entityId"));int count = messageService.findNoticeCount(user.getId(), TOPIC_FOLLOW);messageVO.put("count", count);int unread = messageService.findNoticeUnreadCount(user.getId(), TOPIC_FOLLOW);messageVO.put("unread", unread);model.addAttribute("followNotice", messageVO);}// 查询未读消息数量int letterUnreadCount = messageService.findLetterUnreadCount(user.getId(), null);model.addAttribute("letterUnreadCount", letterUnreadCount);int noticeUnreadCount = messageService.findNoticeUnreadCount(user.getId(), null);model.addAttribute("noticeUnreadCount", noticeUnreadCount);return "site/notice";}// 得到通知详情@RequestMapping(path = "/notice/detail/{topic}", method = RequestMethod.GET)public String getNoticeDetail(@PathVariable("topic") String topic, Page page, Model model) {User user = hostHolder.getUser();//分页信息page.setLimit(5);page.setPath("/notice/detail/" + topic);page.setRows(messageService.findNoticeCount(user.getId(), topic));//通知信息List<Message> noticeList = messageService.findNotices(user.getId(), topic, page.getOffset(), page.getLimit());List<Map<String, Object>> noticeVoList = new ArrayList<>();if (noticeList != null) {for (Message notice : noticeList) {Map<String, Object> map = new HashMap<>();// 通知map.put("notice", notice);// 内容String content = HtmlUtils.htmlUnescape(notice.getContent());Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);map.put("user", userService.findUserById((Integer) data.get("userId")));map.put("entityType", data.get("entityType"));map.put("entityId", data.get("entityId"));map.put("postId", data.get("postId"));// 通知作者map.put("fromUser", userService.findUserById(notice.getFromId()));noticeVoList.add(map);}}model.addAttribute("notices", noticeVoList);// 设置已读List<Integer> ids = getLetterIds(noticeList);if (!ids.isEmpty()) {messageService.readMessage(ids);}return "site/notice-detail";}

最后

这篇博客能写好的原因是:站在巨人的肩膀上

这篇博客要写好的目的是:做别人的肩膀

开源:为爱发电

学习:为我而行

这篇关于第5章 Kafka,构建TB级异步消息系统【仿牛客网社区论坛项目】的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/991267

相关文章

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf

在C#中获取端口号与系统信息的高效实践

《在C#中获取端口号与系统信息的高效实践》在现代软件开发中,尤其是系统管理、运维、监控和性能优化等场景中,了解计算机硬件和网络的状态至关重要,C#作为一种广泛应用的编程语言,提供了丰富的API来帮助开... 目录引言1. 获取端口号信息1.1 获取活动的 TCP 和 UDP 连接说明:应用场景:2. 获取硬

JAVA系统中Spring Boot应用程序的配置文件application.yml使用详解

《JAVA系统中SpringBoot应用程序的配置文件application.yml使用详解》:本文主要介绍JAVA系统中SpringBoot应用程序的配置文件application.yml的... 目录文件路径文件内容解释1. Server 配置2. Spring 配置3. Logging 配置4. Ma

Kafka拦截器的神奇操作方法

《Kafka拦截器的神奇操作方法》Kafka拦截器是一种强大的机制,用于在消息发送和接收过程中插入自定义逻辑,它们可以用于消息定制、日志记录、监控、业务逻辑集成、性能统计和异常处理等,本文介绍Kafk... 目录前言拦截器的基本概念Kafka 拦截器的定义和基本原理:拦截器是 Kafka 消息传递的不可或缺

2.1/5.1和7.1声道系统有什么区别? 音频声道的专业知识科普

《2.1/5.1和7.1声道系统有什么区别?音频声道的专业知识科普》当设置环绕声系统时,会遇到2.1、5.1、7.1、7.1.2、9.1等数字,当一遍又一遍地看到它们时,可能想知道它们是什... 想要把智能电视自带的音响升级成专业级的家庭影院系统吗?那么你将面临一个重要的选择——使用 2.1、5.1 还是

高效管理你的Linux系统: Debian操作系统常用命令指南

《高效管理你的Linux系统:Debian操作系统常用命令指南》在Debian操作系统中,了解和掌握常用命令对于提高工作效率和系统管理至关重要,本文将详细介绍Debian的常用命令,帮助读者更好地使... Debian是一个流行的linux发行版,它以其稳定性、强大的软件包管理和丰富的社区资源而闻名。在使用

Ubuntu系统怎么安装Warp? 新一代AI 终端神器安装使用方法

《Ubuntu系统怎么安装Warp?新一代AI终端神器安装使用方法》Warp是一款使用Rust开发的现代化AI终端工具,该怎么再Ubuntu系统中安装使用呢?下面我们就来看看详细教程... Warp Terminal 是一款使用 Rust 开发的现代化「AI 终端」工具。最初它只支持 MACOS,但在 20

windows系统下shutdown重启关机命令超详细教程

《windows系统下shutdown重启关机命令超详细教程》shutdown命令是一个强大的工具,允许你通过命令行快速完成关机、重启或注销操作,本文将为你详细解析shutdown命令的使用方法,并提... 目录一、shutdown 命令简介二、shutdown 命令的基本用法三、远程关机与重启四、实际应用

Python 中 requests 与 aiohttp 在实际项目中的选择策略详解

《Python中requests与aiohttp在实际项目中的选择策略详解》本文主要介绍了Python爬虫开发中常用的两个库requests和aiohttp的使用方法及其区别,通过实际项目案... 目录一、requests 库二、aiohttp 库三、requests 和 aiohttp 的比较四、requ

Debian如何查看系统版本? 7种轻松查看Debian版本信息的实用方法

《Debian如何查看系统版本?7种轻松查看Debian版本信息的实用方法》Debian是一个广泛使用的Linux发行版,用户有时需要查看其版本信息以进行系统管理、故障排除或兼容性检查,在Debia... 作为最受欢迎的 linux 发行版之一,Debian 的版本信息在日常使用和系统维护中起着至关重要的作