JMS支持Request/Reply模式,在消息生产者发送消息时设置回复消息的目的地,在Consumer接收到消息后,可以回复消息。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/*
* jms请求回复模式。producer发送消息到某个Destination,在发送时设置回复的目的地,consumer在接收到消息后可以在回复的目的地回复消息。
* @author Donny
* @email luckystar88@aliyun.com
* @date 2019/03/16 17:39
*/
public class ReplyToMessage {

    public static void main(String[] args) {
        final String sendQueue = "testQueue";
        final String replyQueue = "replyQueue";
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = null;
        Session session = null;
        try {
            connection = factory.createConnection();
            connection.start();
            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
            // 发送目的地
            Queue sendQ = session.createQueue(sendQueue);
            Queue replyQ = session.createQueue(replyQueue);
            MessageProducer producer = session.createProducer(sendQ);
            TextMessage msg = session.createTextMessage("hello,this is a message.");
            // 设置消息回复的目的地
            msg.setJMSReplyTo(replyQ);
            producer.send(msg);

            // 消息消费者,监听sendQ
            MessageConsumer consumer = session.createConsumer(sendQ);
            Session _session = session;
            consumer.setMessageListener(message -> {
                try {
                    System.out.println("Consumer received msg:" + ((TextMessage)(message)).getText());
                    // 回复消息
                    MessageProducer mp = _session.createProducer(replyQ);
                    TextMessage replyMsg = _session.createTextMessage("hello,message is received.");
                    mp.send(replyMsg);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            });

            // 监听回复的消息目的地
            MessageConsumer replyMC = session.createConsumer(replyQ);
            replyMC.setMessageListener(message -> {
                try {
                    System.out.println("Consumer2 received reply msg:" +((TextMessage)(message)).getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            });

            System.in.read();
        } catch (JMSException | IOException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

控制打印如下:

1
2
Consumer received msg:hello,this is a message.
Consumer2 received reply msg:hello,message is received.