JMS Selectors用在获取消息的时候,可以基于消息属性和Xpath语法对消息进行过滤。JMS Selectors由SQL92语义定义。以下是个Selectors的例子:

1
consumer = session.createConsumer(destination, "JMSType = 'car' AND weight > 2500");

1:JMS Selectors表达式中,可以使用IN、NOT IN、LIKE等

2:需要注意的是,JMS Selectors表达式中的日期和时间需要使用标准的long型毫秒值

3:表达式中的属性不会自动进行类型转换,例如:

1
myMessage.setStringProperty("NumberOfOrders", "2");

那么此时“NumberOfOrders > 1” 求值结果会是false

4:Message Groups虽然可以保证具有相同message group的消息被唯一的consumer顺序处理,但是却不能确定被哪个consumer处理。在某些情况下,Message Groups可以和JMS Selector一起工作,

例如:

设想有三个consumers分别是A、B和C。你可以在producer中为消息设置三个message groups分别是“A”、“B”和“C”。然后令consumer A使用“JMXGroupID = ‘A’”作为selector。B和C也同理。这样就可以保证message group A的消息只被consumer A处理。需要注意的是,这种做法有以下缺点:

(1)producer必须知道当前正在运行的consumers,也就是说producer和consumer被耦合到一起。

(2)如果某个consumer失效,那么应该被这个consumer消费的消息将会一直被积压在broker上。

下面我们使用JMS Selectors来实现消息分组功能

  • 消息生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    MessageProducer producer = session.createProducer(destination);
    for (int i=0;i<10;i++) {
    TextMessage msg = session.createTextMessage("hello " + i);
    if (i % 2 == 0)
    msg.setStringProperty("JMSXGroupID","groupA");
    else
    msg.setStringProperty("JMSXGroupID","groupB");
    producer.send(msg);
    }
  • 消费者1

    1
    2
    3
    4
    5
    6
    7
    8
    MessageConsumer consumer = session.createConsumer(destination,"JMSXGroupID='groupA'");
    consumer.setMessageListener(msg -> {
    try {
    System.out.println("Received msg:" + ((TextMessage)msg).getText());
    } catch (JMSException e) {
    e.printStackTrace();
    }
    });
  • 消费者2

    1
    2
    3
    4
    5
    6
    7
    8
    MessageConsumer consumer = session.createConsumer(destination,"JMSXGroupID='groupB'");
    consumer.setMessageListener(msg -> {
    try {
    System.out.println("Received msg:" + ((TextMessage)msg).getText());
    } catch (JMSException e) {
    e.printStackTrace();
    }
    });

消费者1控制台输出:

1
2
3
4
5
Received msg:hello 0
Received msg:hello 2
Received msg:hello 4
Received msg:hello 6
Received msg:hello 8

消费者2控制台输出:

1
2
3
4
5
Received msg:hello 1
Received msg:hello 3
Received msg:hello 5
Received msg:hello 7
Received msg:hello 9

参考:《ActiveMQ in Action》、ActiveMQ(23):Consumer高级特性