分发策略

queue的分发策略

可插拔的分发策略只适用于topic。queue的分发策略比较固定:轮询(默认)或按照严格顺序。同时我们也应该了解prefect的意义。

ActiveMQ的prefetch缺省参数是针对处理大量消息时的高性能和高吞吐量而设置的,因此默认的prefect值很大,默认的分发策略会尽快尝试将预取缓冲区填满(prefetch buffers)。

然而在有些情况下,例如只有少量的消息而且单个消息的处理时间比较长,那么在缺省的prefetch和dispatch policies下,这些少量的消息总是倾向于被分发到个别的consumer上。这样就会因为负载的不均衡分配而导致处理时间的增加。

对于队列,你可以选择使用轮询或按严格顺序(strictOrderDispatch)。
strictOrderDispatch表示在直到当前消费者的prefetch缓冲区满了之后才选择下一个消费者进行消息的分发。

通过下面的方式来启用按严格顺序分发的策略:

1
<policyEntry queue=">" strictOrderDispatch="false" />

如果你有几个优先级不同的消费者,消息会先发送给优先级最高的消费者,直到它的prefect缓冲区满,然后再下一个,等等。

从5.14.0版开始——strictOrderDispatch=true选项将确保只有一个消费者时重新发送消息的顺序是严格的。

topic的分发策略

所有实现了org.apache.activemq.broker.region.policy.DispatchPolicy的都可以。默认实现是org.apache.activemq.broker.region.policy.SimpleDispatchPolicy,它将消息传递给所有的订阅者。一个更高级的实现示例是org.apache.activemq.broker.region.policy.PriorityNetworkDispatchPolicy,它只会分发给拥有最高优先级的网络消费者。这在循环网络拓扑结构中非常有用,因为在这种拓扑结构中,到消费者的路由不止一条。

下面是一个Destination策略配置示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<destinationPolicy> 
  <policyMap> 
    <policyEntries> 
      <policyEntry topic="FOO.>"> 
        <dispatchPolicy> 
          <roundRobinDispatchPolicy/> 
        </dispatchPolicy>  
        <subscriptionRecoveryPolicy> 
          <lastImageSubscriptionRecoveryPolicy/> 
        </subscriptionRecoveryPolicy> 
      </policyEntry>  
      <policyEntry topic="ORDERS.>"> 
        <dispatchPolicy> 
          <strictOrderDispatchPolicy/> 
        </dispatchPolicy>  
        <!-- 1 minutes worth -->  
        <subscriptionRecoveryPolicy> 
          <timedSubscriptionRecoveryPolicy recoverDuration="60000"/> 
        </subscriptionRecoveryPolicy> 
      </policyEntry>  
      <policyEntry topic="PRICES.>"> 
        <!-- lets force old messages to be discarded for slow consumers -->  
        <pendingMessageLimitStrategy> 
          <constantPendingMessageLimitStrategy limit="10"/> 
        </pendingMessageLimitStrategy>  
        <!-- 10 seconds worth -->  
        <subscriptionRecoveryPolicy> 
          <timedSubscriptionRecoveryPolicy recoverDuration="10000"/> 
        </subscriptionRecoveryPolicy> 
      </policyEntry>  
      <policyEntry tempTopic="true" advisoryForConsumed="true"/>  
      <policyEntry tempQueue="true" advisoryForConsumed="true"/> 
    </policyEntries> 
  </policyMap> 
</destinationPolicy>

参考:http://activemq.apache.org/dispatch-policies.html
参考:https://blog.51cto.com/1754966750/1923299