虚拟目的地运允许我们创建一个映射到一个或多个实际物理目的地的逻辑目的地,客户端可以使用逻辑目的地来发送和消费消息。

ActiveMQ支持的虚拟Destinations分为有两种,分别是

  1. 虚拟主题(Virtual Topics)
  2. 组合 Destinations(CompositeDestinations)

虚拟主题

发布订阅背后的想法非常棒。允许生产者与消费者分离,这样他们甚至不知道有多少消费者对他们发布的消息感兴趣。JMS规范定义了对持久主题的支持,但是正如我们将描述的,它们有一些限制……

JMS持久主题的限制

  1. 一个topic虽然可以有多个订阅者,但每个订阅者都是获取全量的消息,没法实现负载均衡。queue可以实现消费者端消息的负载均衡,但是broker不能讲消息发送给多个应用。所以没法实现发布订阅+消费者分组的功能。
  2. 由于只能使用单个的持久订阅在,如果该订阅者出错,应用就无法处理消息了,系统的健壮性不高。

为了解决上面的2个问题,ActiveMQ实现了虚拟topic的功能。
虚拟topic以VirtualTopic.开头,比如VirtualTopic.ORDER。

虚拟主题背后的想法是生产者以通常的JMS方式发送给主题。消费者可以继续使用JMS规范中的主题语义。但是,如果主题是虚拟的,则使用者可以从物理队列中使用逻辑主题订阅,从而允许许多使用者在许多计算机和线程上运行以对负载进行负载平衡。

为了解决这两个问题,ActiveMQ中实现了虚拟Topic的功能。使用起来非常简单。
对于消息发布者来说,就是一个正常的Topic,名称以VirtualTopic.开头。例如VirtualTopic.TEST。
对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为队列的名称,即可表明自己的身份即可实现消费端应用分组。例如Consumer.A.VirtualTopic.TEST,说明它是名称为A的消费端,同理Consumer.B.VirtualTopic.TEST说明是一个名称为B的客户端。可以在同一个应用里使用多个consumer消费此queue,则可以实现上面两个功能。又因为不同应用使用的queue名称不同(前缀不同),所以不同的应用中都可以接收到全部的消息。每个客户端相当于一个持久订阅者,而且这个客户端可以使用多个消费者共同来承担消费任务。

示例

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Producer {

public static void main(String[] args) {
String topicName = "VirtualTopic.TEST";
List<String> msgList = Lists.newArrayList();
int msgSize = 10;
IntStream.rangeClosed(1,msgSize).forEach(i-> {
msgList.add("hello " + i);
});

String[] msgs = new String[msgSize];
msgs = msgList.toArray(msgs);
MqUtil.sendTopicMsg(topicName,msgs ,false,true,60000);
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class Consumer {

public static void main(String[] args) {
String queueNameA = "Consumer.A.VirtualTopic.TEST";
String queueNameB = "Consumer.B.VirtualTopic.TEST";
MqUtil.addMessageListener(queueNameA,false,msg->{
try {
System.out.println("Consumer A1 received:" + ((TextMessage)msg).getText());
} catch (JMSException e) {
e.printStackTrace();
}
},false);
MqUtil.addMessageListener(queueNameA,false,msg->{
try {
System.out.println("Consumer A2 received:" + ((TextMessage)msg).getText());
} catch (JMSException e) {
e.printStackTrace();
}
},false);

MqUtil.addMessageListener(queueNameB,false,msg->{
try {
System.out.println("Consumer B1 received:" + ((TextMessage)msg).getText());
} catch (JMSException e) {
e.printStackTrace();
}
},false);
MqUtil.addMessageListener(queueNameB,false,msg->{
try {
System.out.println("Consumer B2 received:" + ((TextMessage)msg).getText());
} catch (JMSException e) {
e.printStackTrace();
}
},false);
}
}

MQ工具类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
public final class MqUtil {
private final static PooledConnectionFactory FACTORY;

static {
FACTORY = new org.apache.activemq.jms.pool.PooledConnectionFactory();
ConnectionFactory _factory = new ActiveMQConnectionFactory(Constant.MQ_USERNAME, Constant.MQ_PASSWORD, Constant
.uris);
FACTORY.setConnectionFactory(_factory);
FACTORY.setMaxConnections(100);
FACTORY.start();
Runtime.getRuntime().addShutdownHook(new Thread(FACTORY::stop));
}

public static void sendMsg(String DestinationName, String[] msgs, boolean isTopic, boolean isTransaction, boolean
isPersistent,
long
expiration) {
Connection connection = null;
Session session = null;
try {
connection = isTopic ? FACTORY.createTopicConnection() : FACTORY.createQueueConnection();
connection.start();
session = connection.createSession(isTransaction, Session.AUTO_ACKNOWLEDGE);
Destination destination = isTopic ? session.createTopic(DestinationName) : session.createQueue(DestinationName);
MessageProducer producer = session.createProducer(destination);
for (String msg : msgs) {
Message message = session.createTextMessage(msg);
if (expiration > 0) {
message.setJMSExpiration(expiration);
}
message.setJMSDeliveryMode(isPersistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
producer.send(message);
System.out.println("消息" + msg + "]发送成功.");
}
if (isTransaction) {
session.commit();
}
} catch (JMSException e) {
e.printStackTrace();

if (isTransaction && session != null) {
try {
session.rollback();
} catch (JMSException e1) {
e1.printStackTrace();
}
}
} finally {
closeResource(connection, session);
}
}

private static void closeResource(Connection connection, Session session) {
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}

public static void sendQueueMsg(String queueName, String msg, boolean isTransction, boolean isPersistent, long
expiration) {
sendMsg(queueName, new String[]{msg}, false, isTransction, isPersistent, expiration);
}

public static void sendTopicMsg(String topicName, String msg, boolean isTransction, boolean isPersistent, long
expiration) {
sendMsg(topicName, new String[]{msg}, true, isTransction, isPersistent, expiration);
}

public static void sendTopicMsg(String topicName, String[] msgs, boolean isTransction, boolean isPersistent, long
expiration) {
sendMsg(topicName, msgs, true, isTransction, isPersistent, expiration);
}

public static void addMessageListener(String DestinationName, boolean isTopic, MessageListener listener, boolean
isTransaction) {
Connection connection;
Session session;
try {
connection = isTopic ? FACTORY.createTopicConnection() : FACTORY.createQueueConnection();
connection.start();
session = connection.createSession(isTransaction, Session.AUTO_ACKNOWLEDGE);
Destination destination = isTopic ? session.createTopic(DestinationName) : session.createQueue(DestinationName);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(listener);
} catch (JMSException e) {
e.printStackTrace();
}
}

}

消费者控制台输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Consumer A1 received:hello 1
Consumer B1 received:hello 1
Consumer A2 received:hello 2
Consumer B2 received:hello 2
Consumer A1 received:hello 3
Consumer B1 received:hello 3
Consumer A2 received:hello 4
Consumer B2 received:hello 4
Consumer A1 received:hello 5
Consumer B1 received:hello 5
Consumer A2 received:hello 6
Consumer B2 received:hello 6
Consumer A1 received:hello 7
Consumer B1 received:hello 7
Consumer A2 received:hello 8
Consumer B2 received:hello 8
Consumer A1 received:hello 9
Consumer B1 received:hello 9
Consumer A2 received:hello 10
Consumer B2 received:hello 10

可以看到,A1和A2是一组,共同消费VirtualTopic.TEST的数据,B1和B2是一组,同时A1和A2,B1和B2依次消费。即达到了topic的功能又达到了负载均衡和failover的能力,在一组消费者中任何一个挂掉也不会影响消费。

需要注意的是:即使Queue消费了消息,VirtualTopic中的消息仍然不会删除,只有真正的topic订阅者消费消息后,VirtualTopic中的消息才会删除。当然,VirtualTopic中的消息删除,不会影响Queue中的消息,因为Queue中的消息是Copy过去的。

虚拟主题的默认配置

默认情况下,虚拟主题名称必须是VirtualTopic.>,消费者队列名称必须为Consumer.*.VirtualTopic.>,当日这个你是可以修改的。下面的示例显示如何使所有主题都成为虚拟主题。下面的示例使用名称>表示“匹配所有主题”。您可以使用此通配符在不同层级应用不同的虚拟主题策略。

1
2
3
4
5
6
7
<destinationInterceptors> 
<virtualDestinationInterceptor>
<virtualDestinations>
<virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="false"/>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>

配置选项

选项 默认值 描述
selectorAware false 如果consumer端有selector,则只有匹配selector的消息才会分派到对应的queue中去。使用此选项可防止在独占使用者使用选择器时构建不匹配的消息
local false when true, don’t fan out messages that were received over a network
concurrentSend false 如果为true,则消息将并行发送,同时允许日志批量写入以减少磁盘IO。
transactedSend false when true, use a transaction for fanout sends such that there is a single disk sync. A local broker transaction will be created if there is no client transaction (5.13)
dropOnResourceLimit false 如果为true,将忽略发送期间跑出的ResourceAllocationException
setOriginalDestination true 如果为true,则转发消息上的目标设置为使用者队列,而originalDestination消息属性将跟踪虚拟主题(5.16)

VirtualSelectorCacheBrokerPlugin

当selectorAware=true时,只有活动的消费者才被限定为选择器匹配的条件。如果消费者断开并重新连接,他们将错过消息。selectorAware=true的目的是不构建消息。virtualSelectorCacheBrokerPlugin提供了一个缓存,它可以跟踪与某个消费者的目的地相关联的选择器,这样它们就可以应用于该消费者。这样,所选的消息就会累积起来。现有的选择器集可以被持久化,以便在重启时可以恢复。插件以正常的方式应用于plugins部分。

1
2
3
<plugins>
<virtualSelectorCacheBrokerPlugin persistFile="<some path>/selectorcache.data" />
</plugins>

组合目的地

组合目的地允许在各个目的地上建立一对多的关系; 主要用于组合队列。 例如,当消息发送到队列A时,您可能还希望将其转发到队列B和C以及主题D.然后,组合目的地是从虚拟目的地到其他物理目的地集合的映射。 在这种情况下,映射是在broker端,客户端不知道目的地之间的映射。 这与客户端组合目的地不同,客户端使用URL表示法指定必须将消息发送到的实际物理目标。

以下示例显示如何在XML配置中设置元素,以便在将消息发送到MY.QUEUE时,它实际上会转发到物理队列FOO和主题BAR。

1
2
3
4
5
6
7
8
9
10
11
12
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<compositeQueue name="MY.QUEUE">
<forwardTo>
<queue physicalName="FOO" />
<topic physicalName="BAR" />
</forwardTo>
</compositeQueue>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>

默认情况下,订阅者不能直接从组合队列或主题消费消息 - 它只是一个逻辑构造。 比如上述配置,订阅者只能消费来自FOO和BAR的消息; 而不是MY.QUEUE。

1
2
3
4
5
<compositeQueue name="IncomingOrders" forwardOnly="false"> 
<forwardTo>
<topic physicalName="Notifications" />
</forwardTo>
</compositeQueue>

发送到IncomingOrders的消息将全部复制并转发到Notifications,然后再放入物理IncomingOrders队列供订阅者使用。

如果未定义forwardOnly属性或将其设置为true,则compositeQueue和compositeTopic之间没有逻辑差异 - 它们可以互换使用。 只有当通过使用forwardOnly使组合目的地成为物理目的地时,compositeTopic / compositeQueue的选择才会对行为产生影响。

使用过滤的目的地

从Apache ActiveMQ 4.2开始,您现在可以使用选择器来定义虚拟目的地。

您可能希望创建一个虚拟目的地,该目的地将消息转发到多个目的地,但首先使用选择器来确定消息是否确实必须转到特定目的地。

以下示例显示如果消息匹配指定的选择器,发送到虚拟目的地MY.QUEUE的消息将转发到FOO和BAR。

1
2
3
4
5
6
7
8
<destinationInterceptors> <virtualDestinationInterceptor> <virtualDestinations> 
<compositeQueue name="MY.QUEUE">
<forwardTo>
<filteredDestination selector="odd = 'yes'" queue="FOO"/>
<filteredDestination selector="i = 5" topic="BAR"/>
</forwardTo>
</compositeQueue>
</virtualDestinations> </virtualDestinationInterceptor> </destinationInterceptors>

避免broker网络中的重复消息

当真正的subscriber和Queue都同时存在VirtualTopic中的时候,而且你的broker架构采用了“forward-brige”结构,那么你需要增加如下配置来避免消息的重复转发问题。在forward-brige架构中,任何通道中的消息都会forward到其他network node中(其他broker上),当然这个虚拟的Queue的消息也不例外。

1
2
3
4
5
<networkConnectors> <networkConnector uri="static://(tcp://localhost:61617)">
<excludedDestinations>
<queue physicalName="Consumer.*.VirtualTopic.>"/>
</excludedDestinations>
</networkConnector> </networkConnectors>

参考:http://activemq.apache.org/virtual-destinations.html
参考:https://shift-alt-ctrl.iteye.com/blog/2065436
参考:https://blog.csdn.net/zhu_tianwei/article/details/46303419