本文主要是介绍ActiveMQ-Prefetch机制和constantPendingMessageLimitStrategy,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
首先简要介绍一下prefetch机制。ActiveMQ通过prefetch机制来提高性能,这意味这 客户端的内存里可能会缓存一定数量的消息。缓存消息的数量由prefetch limit来控 制。当某个consumer的prefetch buffer已经达到上限,那么broker不会再向consumer 分发消息,直到consumer向broker发送消息的确认。可以通过在 ActiveMQConnectionFactory或者ActiveMQConnection上设置 ActiveMQPrefetchPolicy对象来配置prefetch policy。也可以通过connection options或者destination options来配置。例如:
1 2 3 4 5 6 7 8 9 10 11 | tcp: //localhost:61616?jms.prefetchPolicy.all=50 tcp: //localhost:61616?jms.prefetchPolicy.queuePrefetch=1 queue = new ActiveMQQueue( "TEST.QUEUE?consumer.prefetchSize=10" ); prefetch size 的缺省值如下: • persistent queues ( default value: 1000 ) • non-persistent queues ( default value: 1000 ) • persistent topics ( default value: 100 ) • non-persistent topics ( default value: Short.MAX_VALUE - 1 ) |
慢消费者会在非持久的 topics 和 queue上导致问题:一旦消息积压起来,会导致 broker 把大量消息保存在内存中,broker 也会因此而变慢。ActiveMQ采用一定的缓存策略来进行控制,例如
1 2 3 4 5 6 7 | <policyEntry topic= "PRICES.>" > <!-- lets force old messages to be discarded for slow consumers --> <pendingMessageLimitStrategy> <constantPendingMessageLimitStrategy limit= "50" /> // constantPendingMessageLimitStrategy 限制内存中只保留50条最新消息,其余将会被剔除或保存在temp store中 </pendingMessageLimitStrategy> </policyEntry> <prefetchRatePendingMessageLimitStrategy multiplier= "2.5" /> // prefetchRatePendingMessageLimitStrategy是倍数策略,如果prefetchSize为100,则保留2.5 * 100条消息 |
这篇关于ActiveMQ-Prefetch机制和constantPendingMessageLimitStrategy的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!