概述

在发生以下情形时,消息会给重发给客户端:

  1. 使用了一个事务性的会话且调用了rollback()方法。
  2. 在调用commit()方法前一个事务性的会话被关闭了。
  3. 一个会话使用CLIENT_ACKNOWLEDGE的ACK模式,且调用了Session.recover()方法。
  4. 一个客户端连接超时(可能正被执行的代码执行的时间超过配置的超时时间)。

客户端可以通过ActiveMQConnection.getRedeliveryPolicy()ActiveMQConnectionFactory.getRedeliveryPolicyMap()来覆盖策略设置。

方法:

1
2
3
4
5
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(500);
policy.setBackOffMultiplier(2);
policy.setUseExponentialBackOff(true);
policy.setMaximumRedeliveries(2);

当一个消息被redelivered超过maximumRedeliveries(缺省为6次,具体设置请参考后面的链接)次数时,会给broker发送一个”Poison ack”,这个消息被认为是a poison pill,这时broker会将这个消息发送到DLQ,以便后续处理。

缺省的死信队列是ActiveMQ.DLQ,所有不能投递的消息都会被发送到这个队列,这将会导致难以管理。你可以在activemq.xml这个配置文件的destinationPolicy配置individualDeadLetterStrategy ,它可以让给为一个给定的queue或topic指定特定的死信队列前缀。如果你希望所有的queue都拥有自己的死信队列,你可以使用通配符。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<broker>
   
  <destinationPolicy>
    <policyMap>
      <policyEntries>
        <!-- 使用>通配符为所有的queue设置下面的策略 你也可以指定具体的队列名-->
        <policyEntry queue=">">
          <deadLetterStrategy>
            <!--
              queuePrefix:指定死信队列的前缀为DLQ.
useQueueForQueueMessages=true,表示使用队列来保存死信
            -->
            <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
          </deadLetterStrategy>
        </policyEntry>
      </policyEntries>
    </policyMap>
  </destinationPolicy>
   
</broker>

自动丢弃过期消息

如果你只想丢弃过期的消息,不想发送到死信队列,你可以在一个死信队列策略中配置processExpired=false

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<broker>
   
  <destinationPolicy>
   <policyMap>
     <policyEntries>
       <!-- 使用>通配符来使所有的队列使用下面的策略 -->
       <policyEntry queue=">">
         <!--
           Tell the dead letter strategy not to process expired messages
           so that they will just be discarded instead of being sent to
           the DLQ
         -->
         <deadLetterStrategy>
           <sharedDeadLetterStrategy processExpired="false" />
         </deadLetterStrategy>
       </policyEntry>
     </policyEntries>
   </policyMap>
  </destinationPolicy>
   
</broker>

非持久消息保存到死信队列

默认情况下,ActiveMQ不会将不能投递的非持久消息放到死信队列。如果你希望将非持久消息存储到死信队列,你可以在死信队列的策略中设置processNonPersistent="true"

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<broker>
   
  <destinationPolicy>
   <policyMap>
     <policyEntries>
       <!-- Set the following policy on all queues using the '>' wildcard -->
       <policyEntry queue=">">
         <!--
           Tell the dead letter strategy to also place non-persisted messages
           onto the dead-letter queue if they can't be delivered.
         -->
         <deadLetterStrategy>
           <sharedDeadLetterStrategy processNonPersistent="true" />
         </deadLetterStrategy>
       </policyEntry>
     </policyEntries>
   </policyMap>
  </destinationPolicy>
   
</broker>

设置死信队列中消息的过期时间

默认情况下,ActiveMQ永远不会使发送到DLQ的消息失效。 但是,在ActiveMQ 5.12中,deadLetterStrategy支持到期属性,其值以毫秒为单位。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<broker>
   
  <destinationPolicy>
   <policyMap>
     <policyEntries>
       <policyEntry queue="QueueWhereItIsOkToExpireDLQEntries">
         <deadLetterStrategy>
           <.... expiration="300000"/>
         </deadLetterStrategy>
       </policyEntry>
     </policyEntries>
   </policyMap>
  </destinationPolicy>
   
</broker>

DLQ丢弃插件

从ActiveMQ5.9开始,一个Destination的policyEntry支持丢弃的deadLetterStrategy。

1
2
3
<deadLetterStrategy>
    <discarding/>
</deadLetterStrategy>

这个与丢弃插件的功能相同,但这个是基于每个Destination的。丢弃插件支持正则表达式匹配,这在某些情况下很有用。

此插件允许全部或基于Java SE的正则表达式匹配的queue和topic,丢弃发送到DLQ中的消息。这在使用ConstantPendingMessageLimit策略或其他逐出规则,但不想有一个其他的消费者来清理DLQ时很有用。

下面是一个丢弃所有内容的一个基本配置:

1
2
3
4
5
6
7
<beans>
  <broker>
    <plugins>
      <discardingDLQBrokerPlugin dropAll="true" dropTemporaryTopics="true" dropTemporaryQueues="true"/>
    </plugins>
  </broker>
</beans>

下面是一个稍微复杂点的例子:

1
2
3
4
5
6
7
<beans>
  <broker>
    <plugins>
      <discardingDLQBrokerPlugin dropOnly="MY.EXAMPLE.TOPIC.29 MY.EXAMPLE.QUEUE.87" reportInterval="1000"/>
    </plugins>
  </broker>
</beans>

这个例子中,只针对指定的queue和topic有效。各个destination用空格隔开。
reportInterval属性用于表示我们输出丢弃的消息的频率 - 使用0来禁用。

下面是一个使用正则表达式的例子:

1
2
3
4
5
6
7
<beans>
  <broker>
    <plugins>
      <discardingDLQBrokerPlugin dropOnly="MY.EXAMPLE.TOPIC.[0-9]{3} MY.EXAMPLE.QUEUE.[0-9]{3}" reportInterval="3000"/>
    </plugins>
  </broker>
</beans>

这里匹配的是以000~999结尾的目的地。

Broker消息重发插件

默认情况下,在消息重新投递次数达到配置的最大投递次数(或默认的6次),broker会将消息放入DLQ。我们可以使用broker消息重发插件来改变这一行为。即在一定延迟后,将消息重新投递给原始Destination,如果达到最大重试次数,则放入DLQ。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<broker schedulerSupport="true">

<plugins>
<!-- 重发策略,对于超过重发次数的消息将会被添加到DLQ -->
<redeliveryPlugin fallbackToDeadLetter="true"
sendToDlqIfMaxRetriesExceeded="true">
<redeliveryPolicyMap>
<redeliveryPolicyMap>
<redeliveryPolicyEntries>
<!--
重发机制,默认重发6,重发延迟基于backOff模式
-->
<redeliveryPolicy queue="SpecialQueue"
maximumRedeliveries="4"
redeliveryDelay="10000"/>
</redeliveryPolicyEntries>

<defaultEntry>
<!-- 其他Destination的默认处理策略 -->
<redeliveryPolicy maximumRedeliveries="4"
initialRedeliveryDelay="5000"
redeliveryDelay="10000"/>
</defaultEntry>
</redeliveryPolicyMap>
</redeliveryPolicyMap>
</redeliveryPlugin>
</plugins>

</broker>

sendToDlqIfMaxRetriesExceeded:如果为true,则在达到最大重试次数后,会发送到DLQ,否则会删除该消息。

参考:http://activemq.apache.org/message-redelivery-and-dlq-handling.html