RocketMQ之Producer

2024-03-11 23:18
文章标签 rocketmq producer

本文主要是介绍RocketMQ之Producer,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1

Producer有个ProducerGroup的值需要设置,因为Producer是可以分布式部署的,我们需要将逻辑上属于一个整体的producer关联起来,那就靠ProducerGroup这个值来设置的,同属一个group的producer产生的消息理论上应该是一个业务类型。构造Producer的时候可以指定:

private final DefaultMQProducer PRODUCER = new DefaultMQProducer("Producer-Group1");

2

Producer可以设置消息发送失败重试的次数和发送超时时间具体方法如下:

// 设置异步发送失败重试次数,默认2
PRODUCER.setRetryTimesWhenSendAsyncFailed(3);
// 设置同步发送失败重试次数,默认2
PRODUCER.setRetryTimesWhenSendFailed(3);
// 设置发送超时时间,包括同步和异步,默认3000ms
PRODUCER.setSendMsgTimeout(3000);

3

Producer可以设置InstanceName参数,instance代表的是Producer实例clientId,RocketMQ默认会使用ip@instanceName@unitName的方式来区分Producer实例,instanceName如果缺省就会被替换成PID,unitName缺省会为空,如果在一个JVM里启动多个Producer实例且缺省InstanceName值,那么clientId将会是一样,这时他们会使用同一个MQClientInstance,MQClientInstance是Consumer、Producer与NameServer、Broker通信的API,这样会导致消息只会发送到一个Broker

PRODUCER.setInstanceName("producer-instance1");

4

队列个数设置,同时在Broker的配置文件中也可以设置defaultTopicQueueNums=8,而且读、写队列的个数可以不一致,读队列针对Consumer,写队列针对Producer,如果写队列个数大于读队列个数,那么Producer会轮询写入各个写队列,但是多出来的队列不会被Consumer消费,也就是说有消息无法被消费,如果写队列个数小于读队列个数,那么多出来的是空的队列,没意义,所以读队列是依赖写队列的,实际存在的队列应该是以写队列

PRODUCER.setDefaultTopicQueueNums(6);

5

一个Producer启动时只与一个NameServer建立联系,然后从NameServer拉取Broker地址列表,然后与所有Broker建立联系,每个长连接都会有心跳机制保活。Producer启动的时候也会启动各种定时服务。主要有1、Producer与NameServer维护长连接,每隔30S获取其发送消息topic的最新数据,覆盖上次的MessageQueue,2、Producer每隔10min向其订阅topic所在的所有Broker发送心跳信息,心跳信息仅包含ProducerGroup。Broker为与Producer的长连接注册了ChannelEventListener,当Producer宕机时,立即删除此Producer的心跳信息。定时任务在Producer启动时开启,主要是MQClientInstance#start()方法。Producer和Consumer启动时都会调用MQClientInstance#start()方法,这里面有很多定时任务。

6

在RocketMQ中DefaultMQProducer是暴露给用户使用的,真正的实现是DefaultMQProducerImpl,这两个类没有继承关系,只是聚合关系,DefaultMQProducerImpl有自己独立的继承关系,和DefaultMQProducer没有任何关系,虽然他俩名字很暧昧。从具体设计上看到DefaultMQProducer因为面向应用,所以暴露的API和拥有的数据成员都是用户使用相关。我们主要看DefaultMQProducerImpl和MQClientInstance实现的区别和联系。一般情况下MQClientInstance和DefaultMQProducerImpl是一一对应的关系,当然你可以达到一对多的效果。MQClientInstance是客户端统一处理类,他涵盖了Producer和Consumer和mqadmin工具等的处理,这个是架构设计的表现了。也就是说Producer、Consumer、mqadmin等都会分别调用MQClientInstance的一部分功能,这里只看DefaultMQProducerImpl使用了MQClientInstance的哪些能力。

 

这篇关于RocketMQ之Producer的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/799387

相关文章

字节面试 | 如何测试RocketMQ、RocketMQ?

字节面试:RocketMQ是怎么测试的呢? 答: 首先保证消息的消费正确、设计逆向用例,在验证消息内容为空等情况时的消费正确性; 推送大批量MQ,通过Admin控制台查看MQ消费的情况,是否出现消费假死、TPS是否正常等等问题。(上述都是临场发挥,但是RocketMQ真正的测试点,还真的需要探讨) 01 先了解RocketMQ 作为测试也是要简单了解RocketMQ。简单来说,就是一个分

【Rocketmq入门-基本概念】

Rocketmq入门-基本概念 名词解释名称服务器(NameServer)消息队列(Message Queue)主题(Topic)标签(Tag)生产者(Producer)消费者(Consumer)拉取模式(Pull)推送模式(Push)消息模型(Message Model) 关键组件Broker消息存储工作流程 名词解释 名称服务器(NameServer) 定义: 名称服务器

centos7 安装rocketmq4.7.0以及RocketMQ-Console-Ng控制台

一、前置工作 1.1安装jdk8 https://blog.csdn.net/pang_ping/article/details/80570011 1.2安装maven https://www.cnblogs.com/116970u/p/11211963.html 1.3安装git https://blog.csdn.net/xwj1992930/article/details/964

RocketMQ 介绍

前言 消息队列在互联网技术存储方面使用如此广泛,几乎所有的后端技术面试官都要在消息队列的使用和原理方面对小伙伴们进行360°的刁难。 作为一个在互联网公司面一次拿一次Offer的面霸,打败了无数竞争对手,每次都只能看到无数落寞的身影失望的离开,略感愧疚(请允许我使用一下夸张的修辞手法)。 于是在一个寂寞难耐的夜晚,我痛定思痛,决定开始写《吊打面试官》系列,希望能帮助各位读者以后面试势如破竹,

基于 RocketMQ 的云原生 MQTT 消息引擎设计

作者:沁君 概述 随着智能家居、工业互联网和车联网的迅猛发展,面向 IoT(物联网)设备类的消息通讯需求正在经历前所未有的增长。在这样的背景下,高效和可靠的消息传输标准成为了枢纽。MQTT 协议作为新一代物联网场景中得到广泛认可的协议,正逐渐成为行业标准。 本次我们将介绍搭建在 RocketMQ 基础上实现的 MQTT 核心设计,本文重点分析 RocketMQ 如何适应这些变化,通过优化存储

Rocketmq源码分析(1)

此次源码分析-rocketmq-spring-boot-starter,starter众所周知入口点就是AutoConfiguration.RocketMQAutoConfiguration.class // 标识为配置类@Configuration//将RocketMQProperties识别为配置属性类,创建对象并注入到spring容器中@EnableConfigurationProp

Docker创建Rocketmq-4.8.0镜像

rocketmq包含namesrv和broker两部分,这里不使用docker-compose编排,而是将这两部分分别创建容器. 一. namesrv 1. Dockerfile编写 FROM java:8ENV ROCKETMQ_VERSION 4.8.0ENV NAMESRV_HOME /home/rocketmq/namesrv-${ROCKETMQ_VERSION}ENV JAVA_

RocketMQ广播消费消息

1、 基础概念 RocketMQ 支持两种消息模式:集群消费( Clustering )和广播消费( Broadcasting )。 集群消费模式(Cluster): 在集群消费模式下,同一个消费者组(Consumer Group)中的每个消费者都会消费消息的一个副本。消息会被分发到不同的消费者实例上,但是同一个消息只会被同一个消费者组中的一个消费者消费。 广播消费模式(Broadcast)

ActiveMQ、RocketMQ、RabbitMQ、Kafka

特点:解耦、异步、削峰 特性ActiveMQRabbitMQRocketMQkafka开发语言javaerlangjavascala单机吞吐量万级万级10万级10万级时效性ms级us级ms级ms级以内可用性高(主从架构)高(主从架构)非常高(分布式架构)非常高(分布式架构)功能特性成熟的产品,在很多公司得到应用;有较多的文档;各种协议支持较好基于erlang开发,所以并发能力很强,性能极其好,延

RocketMQ高级特性三-消费者分类

目录 前言 概述 区别 PullConsumer 定义与概述 原理机制 使用场景 优缺点 Java 代码示例 SimpleConsumer 定义与概述 原理机制 使用场景 优缺点 Java 代码示例 PushConsumer 定义与概述 原理机制 使用场景 优缺点 Java 代码示例 总结 前言 RocketMQ中的消费者分类主要包括三种类型: