简介

ActiveMQ的设计目标是成为一个高性能的消息总线。这意味着使用SEDA架构可以异步执行尽可能多的工作。 为了有效利用网络资源,Broker利用“推送”模型向消费者发送消息。 这可确保消费者始终拥有准备处理的消息的本地缓冲区。 替代方案是让消费者明确地从Broker那里提取消息。 单独提取消息不是非常有效,并且会增加每个消息的延迟。

但是,在不限制推送给消费者的消息数量的情况下,客户端的资源可能会被耗尽。因为消息消费通常比消息发送慢的多。为了避免这种情况,ActiveMQ使用预取限制(Prefetch Limit)来限制可以一次发送给单个消费者的最大消息数。消费者依次使用预取限制来调整其预取消息缓冲区的大小。

一旦broker向消费者发送了预取数量的消息,它将不会再向该消费者发送任何消息,直到消费者至少确认了50%的预取消息,比如prefetch size / 2。当broker收到确认后,它将向消费者发送prefect size一半的消息,以填充消费者的本地缓冲区。注意,可以为每个消费者指定预取消息数量。

在数据量较大的情况下,建议使用较大的预取值。但是,对于数据量较小的情况,每个消息需要很长时间处理,预取值应该设置为1.这可以保证消费者一次只处理一条消息。但是,将预取值设置为0将导致消费者每次轮询一条消息,而不是将消息推送给消费者。

  • 何为慢速消费者?

    一个消费者当前待处理的消息数量达到配置的prefetch size的2倍以上,那么该消费者就是慢速消费者。

指定Prefect策略

你可以在ActiveMQConnectionFactory或ActiveMQConnection上指定ActiveMQPrefectPolicy。这允许你配置单独的预取值。
比如:

如果是在AciveMQConnectionFactory上配置PrefetchPolicy,那么就是所有的Connection都使用此策略;如果是在某个ActiveMQConnection上配置,则是对此Connection起作用。

  • 持久化的queue(默认值:1000)
  • 非持久化的queue(默认值:1000)
  • 持久化的topic(默认值:100)
  • 非持久化的topic(默认值:Short.MAX_VALUE-1)

还可以在用来和broker建立连接的URI上配置prefect限制。

1
tcp://localhost:61616?jms.prefetchPolicy.all=50

上面的配置表示所有的Destination的预取值限制为50.

1
tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1

上面的配置表示所有的queue的预取值为1,即每次只推送1条消息给queue的消费者。

也可以针对某个consumer配置,比如:

1
2
queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
consumer = session.createConsumer(queue);

消费者池与预取

使用消费者池消费消息时,预取将会是一个问题。未被消费的预取消息只有在消费者关闭时才会释放,但是为了池中的消费者可以被重用,关闭会被延迟到消费者池关闭。这将导致预取的消息直到消费者被重用时才会被消费。从性能角度来看,这个特性是可以被接受的。但是,当池中有多个消费者存在时,这将导致消息投递顺序错乱。 因此,org.apache.activemq.pool.PooledConnectionFactory不会对消费者进行池化。

Springs CachingConnectionFactory支持池化消费者(默认情况下关闭)。如果您在Spring的 DefaultMessageListenerContainer(DMLC)中配置了一个包含多个Consumer线程一起使用的CachingConnectionFactory,那么您要么关闭CachingConnectionFactory中的消费者池(默认情况下是关闭的),要么在使用消费者池时将预取值设置为0.这与每次调用receive(timeout)方法,消费者将使用pool的方式来获取消息。通常建议关闭Spring的CachingConnectionFactory以及任何其他允许池化JMS消费者框架中的缓存。

需要注意的是,Spring的DefaultMessageListenerContainer(DMLC)及其CACHE_CONSUMER缓存级别不受此问题的影响!Spring的DMLC在某种意义上并不池化消费者(pool consumers),即其内部并不使用由多个消费者实例组成的消费者池。相反,它会缓存消费者,也就是说,在DMLC实例的整个生命周期中,都会重用同一个JMS 消费者对象来接收所有消息。其表现很像单纯的手写JMS代码-创建JMS连接,会话,消费者,然后用这个消费者实例接收所有消息。

因此在Spring的DMLC中使用CACHE_CONSUMER是没问题的,即使是使用多个消费者线程,除非你在使用XA事务。CACHE_CONSUMER对XA事务并不生效。然而本地JMS事务和非事务性消费者在Spring的DMLC中使用CACHE_CONSUMER是没问题的。

另请注意,Camel的JMS或ActiveMQ组件在内部使用Springs DMLC。 所以关于Springs DMLC和CACHE_CONSUMER的所有内容都适用于这两个Camel组件。

内存与性能的权衡

设置相对较高的预取值可以提高性能。因此,默认值通常大于1000,并且对于对于topic会更高。预取值的大小决定了客户端RAM中保存的消息个数。因此如果RAM有限,你可能需要设置一个较低的值,比如1或10等。

参考:http://activemq.apache.org/what-is-the-prefetch-limit-for.html
参考:https://blog.csdn.net/a19881029/article/details/85730150