简述

activemq提供了多种方式来保证activemq的可靠性。
包括:

  • 纯Master/Slave
  • Shared File System Master Slave
  • JDBC Master Slave
  • Broker clusters-静态
  • Broker clusters-动态(基于组播,动态发现brokers)

但单纯的使用上面的一种没法既达到高可用,同时有具有负载均衡的能力。

生成环境集群搭建建议

所以可以考虑Master/Slave+Broker clusters-静态来实现。Master/Slave保证了Slave复制master的数据,Broker clusters-静态实现了非消息生产者的broker拥有对外提供消费的能力。即在broker1上生产了消息,如果broker1与broker2配置了static network Connectors,那么客户端监听broker2也可以拿到broker1生产的消息。所以生产环境建议二者集合。

生产环境:

  • 如果数据量不大,可以考虑Zookeeper来搭建Master/Slave。
    由于ZK选举至少需要2N+1个节点,所以mq至少要3个节点。如果生产消息的broker挂掉,ZK会从其他的节点选择一个作为Master。客户端使用failover会自动连接到提升为Master的节点,消费挂掉的broker的消息没有问题。

  • 数据量比较大,考虑Zookeeper+静态网络连接来实现高可用与负载均衡能力。
    比如2个ZK(假定分别为zk1和zk2),每个ZK分别管理1组(3台)MQ节点。这样启动全部的MQ节点,2组MQ节点中会分别有1台Master对外提供服务。然后2组MQ之间通过static network Connectors+duplex=true来实现failover功能。

配置1:

这种配置,一个缺点就是没有保障ZK的高可用。

如果希望ZK也高可用,则每组ZK至少配置3个ZK节点(ZN+1原则)。
比如下面的配置2组ZK+2组MQ。

作用 openwire端口 admin端口 zk端口
mq1 61616 8161 zk1,2,3的2181,2182,2183 Group1
mq2 61617 8162 zk1,2,3的2181,2182,2183 Group1
mq3 61618 8163 zk1,2,3的2181,2182,2183 Group1
mq4 61616 8161 zk4,5,6的2181,2182,2183 Group2
mq5 61617 8162 zk4,5,6的2181,2182,2183 Group2
mq6 61618 8163 zk4,5,6的2181,2182,2183 Group2
zk1 / / 2181 Group1
zk2 / / 2182 Group1
zk3 / / 2183 Group1
zk4 / / 2181 Group2
zk5 / / 2182 Group2
zk6 / / 2183 Group2

这里假定Group1和Group2是2台机器,ZK1和ZK2为2台机器。其中,mq1-3为Group1,zkAddress=zk1:2181,zk2:2182,zk3:2183.mq4-6为Group2,zkAddress=zk4:2181,zk5:2182,zk6:2183.注意:6台MQ的brokerName必须全部一样。

当然了,你还可以继续扩展,比如3组ZK+3组MQ,这样同时对外提供服务的MQ就是3台。

关键配置

1.ZK集群搭建

那一组ZK来说
在该组ZK的根目录创建data目录,然后创建3个子目录,名字分别为1,2,3,在每个子目录下面创建myid文件,内容与目录名字相同(即如果属于目录1,则内容为1)。

在每个ZK节点的配置文件zoo.cfg中最后加入:

1
2
3
server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889

每个ZK节点的dataDir指向它所属的data目录。比如ZK1的dataDir=/usr/local/zookeeper/data/1。其中/usr/local/zookeeper是该组ZK集群的根目录。

2.ActiveMQ集群配置

打开activemq.xml,修改或加入如下内容:

  1. 修改brokerName;
  2. 中的内容为:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    
    <persistenceAdapter>
               <!-- <kahaDB directory="${activemq.data}/kahadb"/>-->
               <replicatedLevelDB 
                   directory="${activemq.data}/leveldb" 
                   replicas="3"
                   bind="tcp://0.0.0.0:0"
                   zkAddress="127.0.0.1:2181,127.0.0.1:2181,127.0.0.1:2183"
                   zkPassword=""
                   zkPath="/activemq-cluster/leveldb-stores/group1"
                   hostname="vm1"
                   sync="local_disk"/>
    

此处需注意:replicas是MQ节点的数量,需要为2N+1.zkAddress指定ZK集群的地址,多个地址用逗号分隔,zkPath保证不与其他ZK组的path一样(如果2组ZK在不同的机器可以忽略),否则会导致选举Master出现问题,因为各个ZK组都查找到了相同的节点。

  1. 修改transportConnector
    1
    2
    3
    4
    
    <transportConnectors>
                <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
                <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            </transportConnectors>
    

一个是修改openwrie的端口,然后把其他协议的注释掉。(这里根据需要,只保留使用的协议)。

  1. 增加static network Connector配置
    1
    2
    3
    
    <networkConnectors>
                <networkConnector uri="static:(tcp://192.168.199.199:61619,tcp://192.168.199.199:61620,tcp://192.168.199.199:61621)" duplex="true" />
            </networkConnectors>
    

这里的IP和端口是其他Group的IP和端口。因为你希望在Group1生产的数据在Group2的节点能够消费,反之亦然。所以这里配置的是其他组的IP和端口。另外,需要配置duplex=true。否则,比如Group1的mq1生产消息,然后停止mq1,此时消费者连接上Group2,是没法消费消息的。配置上duplex=true,就可以保证在Group2也能消费到Group1生产的消息。反之亦然。

  1. 修改每个mq的管理端口
    修改jetty.xml中的port即可。

  2. 应用程序配置
    程序使用failover来连接broker。
    比如:

    1
    2
    3
    4
    5
    6
    7
    
    final static String uris = 
    "failover:(tcp://192.168.199.199:61616,tcp://192.168.199.199:61617,tcp://192.168.199.199:61618," +        
    "tcp://192.168.199.199:61619,tcp://192.168.199.199:61620,tcp://192.168.199.199:61621)" +        
    "?randomize=true&initialReconnectDelay=1000&maxReconnectDelay=30000";
    
    final static String MQ_USERNAME = "admin";
    final static String MQ_PASSWORD = "admin";
    

上面配置完毕后,先启动各个ZK,没问题再启动各个MQ节点。观察日志输出,每组ZK只会有一个是Master,其他是Slave。每组MQ只会有一个是Master,其他是Slave。

配置完毕后,将MQ的openwire的端口对外开放,程序就可以访问了。

集群测试

  1. 测试某组MQ节点之间数据是否正常(测试Master/Slave功能);
    生产者和消费者都使用failover连接所有MQ节点,生产者发送消息。消息发送完毕后,将发送消息的MQ节点停掉,然后消费者连接发送消息的MQ所在组的其他MQ节点,看是否正常消费消息。
    比如Group1节点分别为mq1,mq2,mq3,假定发送消息的MQ节点是mq1,那么发送消息完毕后,停掉mq1.然后启动消费者,看消费者能否消费消息。这里生产者和消费者都使用failover连接mq1,mq2,mq3.
  1. 测试组间数据是否正常(测试Static NetWork Connector功能)。
    生产者与#1一致,发送消息后关闭该mq节点。消费者使用failover连接另外一组MQ,看是否能够消费消息。

各种MQ集群配置参考:ActiveMQ集群搭建详解