简介

慢消费者在非持久性主题上会出现问题,因为它们会强制broker为它们在内存保留旧消息。一旦填满,就会导致broker放慢生产者的速度,导致快的消费者也会变慢。

目前,有一个策略可以让你配置broker除了prefect bufer之外还将为消费者保留的最大匹配的消息数。在达到此最大值后,当新消息进入时,旧消息将被丢弃。这将允许你在内存中保留当前消息并继续向慢消费者发送消息,但会丢弃旧消息。

Pending Message Limit Strategy

你可以在Destination map配置PendingMessageLimitStrategy的实现类,以便不同topic有不用的策略来处理慢速消费者。例如,你可能希望将此策略引用于价格非常高的,但对于交易和订单而言,你可能不希望丢弃旧的消息。

该策略计算消费者在内存中保留的最大待处理消息数(高于prefetch size)。值为0意味着除了prefect size外不保留任何消息。大于0的值将保留该数量的消息,在新消息进入时丢弃旧的消息。值为-1表示禁止丢弃消息。

目前有两种不同的策略实现:

  • ConstantPendingMessageLimitStrategy
  • PrefetchRatePendingMessageLimitStrategy

ConstantPendingMessageLimitStrategy

此策略对所有使用者使用常量限制(高于其预取大小)。

示例:

1
<constantPendingMessageLimitStrategy limit="50"/>

PrefetchRatePendingMessageLimitStrategy

此种策略是将prefect size 乘以一个你配置的数来计算待处理消息的最大数量。比如,你可以为每个消费者保留大约2.5倍的prefect size的消息。

示例:

1
<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>

使用Prefect策略来配置限制

在JMS客户端,您可以为持久性的和非持久性的queue和topic配置prefect策略。prefect策略还允许为每个连接/消费者指定最大的预处理消息的数量。

prefect 策略参考:http://activemq.apache.org/what-is-the-prefetch-limit-for.html

配置驱逐策略

ActiveMQ有一个MessageEvictionStrategy,用于决定哪个消息应该在慢速消费者身上被驱逐。 默认实现是:

1
<oldestMessageEvictionStrategy/>

这表示丢弃最旧的消息。

你还可以根据JMS消息属性来丢弃给定属性的消息。

示例:

1
<uniquePropertyMessageEvictionStrategy propertyName="STOCK"/>

propertyName是为特定的价格指定的JMS消息属性。上面的示例表示移除有STOCK属性且最旧的消息。

另外,还可以删除最低优先级且最旧的消息。

1
<oldestMessageWithLowestPriorityEvictionStrategy/>

示例

下面的例子展示了ActiveMQ borker的配置文件。对于PRICES.>通配符范围的topic,pendingMessageLimitStrategy属性设置为仅为每个消费者保留除Prefetch size外,再保留10条消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<?xml version="1.0" encoding="utf-8"?>

<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<broker xmlns="http://activemq.apache.org/schema/core" persistent="false" brokerName="${brokername}">
<!-- lets define the dispatch policy -->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic="FOO.>">
<!--消息分发策略:使用轮询策略-->
<dispatchPolicy>
<roundRobinDispatchPolicy/>
</dispatchPolicy>
<!--恢复策略:只恢复最后1个消息-->
<subscriptionRecoveryPolicy>
<lastImageSubscriptionRecoveryPolicy/>
</subscriptionRecoveryPolicy>
</policyEntry>
<!--对于ORDERS.开头的topic,消息分发策略为:按顺序分发-->
<policyEntry topic="ORDERS.>">
<dispatchPolicy>
<strictOrderDispatchPolicy/>
</dispatchPolicy>
<!-- 恢复最近1分钟内的消息 -->
<subscriptionRecoveryPolicy>
<timedSubscriptionRecoveryPolicy recoverDuration="60000"/>
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry topic="PRICES.>">
<!-- lets force old messages to be discarded for slow consumers -->
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="10"/>
</pendingMessageLimitStrategy>
<!-- 10 seconds worth -->
<subscriptionRecoveryPolicy>
<timedSubscriptionRecoveryPolicy recoverDuration="10000"/>
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry tempTopic="true" advisoryForConsumed="true"/>
<policyEntry tempQueue="true" advisoryForConsumed="true"/>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
</beans>

使用技巧

如果你知道某个特定的消费者会变慢,那么设置它的prefect size小于快速消费者的大小。

例如,如果您知道特定服务器速度很慢并且您的消息速率非常高并且您有一些非常快的消费者,那么您可能希望启用此功能并将慢速服务器上的预取设置为略低于 快速服务器。

监测慢消费者的状况

您还可以使用JMX控制台查看活动订阅的统计信息。 这允许您在TopicSubscriptionViewMBean上查看以下统计信息:

统计信息 说明
discarded 在成为慢速消费者后,在订阅的生命周期中丢弃了多少条消息
matched 当前匹配的消息数量,只要预取缓冲区中有一些容量可用,就会立即发送到订阅。因此,非零值意味着此订阅的预取缓冲区已满

参考:http://activemq.apache.org/slow-consumer-handling.html