spring结合activemq消息过期配置

包括queue和topic的消息过期配置。发送消息使用的spring-jms提供的JmsTemplate。

queue的配置
设置pubSubDomain为false,默认即为false。需要将explicitQosEnabled设置为true,过期时间要生效依赖它。timeToLive即为过期时间,本例中设置的是10秒过期。

topic的配置
设置pubSubDomain为true,表示是发布订阅模式。explicitQosEnabled设置为true,timeToLive设置过期时间。

spring结合ActiveMQ过期消息

完整的spring配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd" >
<bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="fileEncoding" value="UTF-8" />
<property name="locations">
<list>
<value>classpath:mq.properties</value>
</list>
</property>
</bean>
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<description>JMS连接工厂</description>
<property name="targetConnectionFactory">
<bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="${mq.brokerURL}"/>
<property name="userName" value="${mq.userName}"/>
<property name="password" value="${mq.password}"/>
<!--<property name="trustAllPackages" value="true"/>-->
<property name="useAsyncSend" value="true"/>
<property name="useDedicatedTaskRunner" value="false"/>
<property name="optimizeAcknowledge" value="true"></property>
<property name="producerWindowSize" value="1024000"></property>
</bean>
</property>
<property name="sessionCacheSize" value="${mq.sessionCacheSize}"/>
</bean>
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<description>PTP模式模型</description>
<constructor-arg ref="connectionFactory"/>
<property name="defaultDestination" ref="destinationQueue" />
<property name="pubSubDomain" value="false"/>
<property name="deliveryPersistent" value="true" />
<!-- 发送模式 DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久,可以不配置,默认就是持久-->
<property name="deliveryMode" value="2" />
<property name="explicitQosEnabled" value="true" />
<property name="timeToLive" value="10000" />
</bean>
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<description>发布/订阅模式模型</description>
<constructor-arg ref="connectionFactory"/>
<property name="defaultDestination" ref="destinationTopic" />
<property name="pubSubDomain" value="true"/>
<property name="deliveryPersistent" value="true" />
<!-- 发送模式 DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久-->
<property name="deliveryMode" value="2" />
<property name="explicitQosEnabled" value="true" />
<property name="timeToLive" value="10000" />
</bean>
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="test.topic" />
</bean>
<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="test.queue" />
</bean>
</beans>

发送代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) {
ApplicationContext ac = new ClassPathXmlApplicationContext("spring-jms-producer.xml");
JmsTemplate j = (JmsTemplate) ac.getBean("jmsTopicTemplate");
j.send(new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage("hello,world");
}
});
System.out.println("消息发送完毕");
System.exit(0);
}

queue和topic的发送一样,不再赘述。

发送消息后
对于queue,在Activemq Console的queues中可以看到Messages Enqueued数量加1

10秒后,Messages Dequeued中加1,同时ActiveMQ.DLQ(即死信队列)中加1.

对于topic,可以看到Pending Queue Size加1.

10秒后,Pending Queue Size减1.
,,同时ActiveMQ.DLQ中加1.

死信队列消息的处理

死信队列的消息来源包括2部分:
1.过期的消息,如上面的例子;
2.客户端消息处理异常,导致MQ反复重新发送,最终达到阀值(默认为6次),放入死信队列。

对于第1点
如果有大量的消息,或消费者消费的太慢,或订阅者offline,会导致死信队列存在大量的过期消息。最终可能会导致磁盘爆满。
解决方法:
1.监听死信队列
这样我们可以知道哪些数据处理失败,同时因为消费了消息,死信队列中的消息减少,这样避免了磁盘爆满。

2.过期的消息不放到死信队列。
在activemq.xml中的增加一个策略:

1
2
3
4
5
<policyEntry topic=">" expireMessagesPeriod="30000">
<deadLetterStrategy>
<sharedDeadLetterStrategy processExpired="false" />
</deadLetterStrategy>
</policyEntry>

expireMessagesPeriod=30000表示每隔30秒检查是否过期
processExpired为false表示不保存过期消息到死信队列,为true则是保留。
参考http://ask.csdn.net/questions/376817
ActiveMQ消息策略:http://blog.csdn.net/wangtaomtk/article/details/51531354

对于第2点
可以将重新投递的次数设置小一些,因为1次处理异常,多次也是一样。

更多ActiveMQ使用和优化

Spring+ActiveMQ消息持久化,Topic持久化订阅
http://blog.csdn.net/u014756827/article/details/77896930

activemq性能优化:
http://blog.csdn.net/yinwenjie/article/details/50991443

ActiveMQ讯息传送机制以及ACK机制
http://www.oschina.net/question/2854673_2190316?sort=time

消息预取限制:activeMQ 消息量限制 与 性能
http://www.360doc.com/content/11/1027/19/1542811_159668819.shtml

ActiveMQ消息策略
http://blog.csdn.net/wangtaomtk/article/details/51531354

折腾ActiveMQ时遇到的问题和解决方法
http://blog.163.com/_kid/blog/static/3040547620161634230453/

Donny wechat
欢迎关注我的个人公众号
打赏,是超越赞的一种表达。
Show comments from Gitment