rabbitmq单个消费者与多个消费者混合使用

吐槽

springboot对rabbitmq的集成很好,但springboot的配置只能对所有Queue统一配置,但有时候需要一个队列一个消费者,有时候一个队列多个消费者。这时就没法使用springboot的rabbitmq的相关配置了,只能通过代码来实现。

要实现上面的目的,有两种实现方式:

1.单消费者使用simple模式,多消费者根据需要选择其他工作模式;

2.通过在消费者的@RabbitListner上指定containerFactory来实现。

实现

实现方式1

关于rabbitmq的simple模式介绍,可以参考:https://zhuanlan.zhihu.com/p/24335916

我们的发送方代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Component
public class RabbitMQSender {

@Autowired
private RabbitTemplate rabbitTemplate;

public void sendMsg(String exchange,String routingKey,Object data) {
rabbitTemplate.convertAndSend(exchange,routingKey,data);
}

/**
* 简单模式,1对1,生产者将消息放入队列,消费者从队列取消息。
* @param routingKey
* @param data
*/
public void sendSimpleMsg(String routingKey,Object data) {
rabbitTemplate.convertAndSend(routingKey,data);
}

public void sendSimpleMsg() {
IntStream.rangeClosed(1,10).forEach(idx-> {
sendSimpleMsg("test","hello " + idx);
});
}
}

如上,sendSimpleMsg这里的routingkey实际是等于要发送到的queue,发送到默认的exchange。通过这种方式可以实现单个队列单个消费者,多个队列多个消费者就通过上面的sendMsg(exchange,routingKey,data)来发送,接收端的代码都一致。

接收端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
@ConditionalOnProperty(name="spring.rabbitmq.consumer.test.enable", havingValue = "true")
@Slf4j
public class RatioMqConsumer {
@RabbitHandler
@RabbitListener(queues = "test")
public void onMessage(Message message,Channel channel) throws IOException, InterruptedException {
String data = new String(message.getBody(), "UTF-8");
log.info("ratio>>>{}接收到消息:{}",Thread.currentThread().getName(),data);
Map<String,Object> headers = message.getMessageProperties().getHeaders();
Long deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
//channel.basicAck(deliverTag, false);
}
}

实现方式2

发送端:都通过sendMsg(exchange,routingKey,data)来发送;

接收端:

a.多个消费者还是使用原来的代码;

b.单个消费者@RabbitListener加上containerFactory属性,在containerFactory的配置中指定concurrentConsumers属性为1。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@RabbitHandler
@RabbitListener(queues = "test",containerFactory = "singleListenerContainer")
public void onMessage(Message message,Channel channel) throws IOException, InterruptedException {
String data = new String(message.getBody(), "UTF-8");
log.info("ratio>>>{}接收到消息:{}",Thread.currentThread().getName(),data);
Map<String,Object> headers = message.getMessageProperties().getHeaders();
Long deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
//channel.basicAck(deliverTag, false);
}

/**
* 单一消费者
* @return
*/
@Bean(name = "singleListenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainer(@Qualifier("mqConnectionFactory") ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new SimpleMessageConverter());
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(1);
factory.setPrefetchCount(1);
factory.setTxSize(1);
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
return factory;
}

注意:虽然@RabbitListener注解提供了一个concurrency参数,但是我发现配置为1还是会有多个消费者线程。

另外,如果要指定prefetch等参数,springboot的rabbitmq配置参数仍然只能全局配置,这个时候只能通过在消费端的@RabbitListener上指定containerFactory,通过ContainerFactory来指定了。

Author: Donny
Link: https://tommy88.top/2020/02/28/rabbitmq单个消费者与多个消费者混合使用/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.
微信打赏