参考ActiveMQ的集群方案对比及部署

本文使用的是activemq的master slave集群,俗称高可用,并没有考虑负载均衡。使用Zookeeper来管理各个broker.

由于zookeeper选举原则是2N+1,所以至少要有3个broker。否则会提示“Not enough cluster members when using LevelDB replication”。这样就没法选择Master。

本文的例子是在Windows64位环境。

zookeeper安装

下载就不说了,我下载的是zookeeper-3.3.6版本。下载下来后解压缩,然后在conf目录新建zoo.cfg文件,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
dataDir=D:/luckystar88/soft/zookeeper-3.3.6/zookeeper-3.3.6/data/1
# the port at which the clients will connect
clientPort=2181

启动zookeeper
执行$zookeeper_home$/bin下的zkServer.cmd即可。

activemq安装

下载不多说,我这里使用的是apache-activemq-5.14.2版本。
解压缩,新建目录activemqtest,复制到该目录,并重命名为brokerA。

同样的,复制brokerA的副本,并重命名,得到brokerB,brokerC。这样就有3个broker。

activemq各个broker配置

brokerA的配置
打开brokerA/conf下的activemq.xml文件。

1.找到<broker…,将brokerName设置为一个值,比如broker1.

2.找到下面的代码,并注释掉。

1
2
3
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>

3.增加下面的代码

1
2
3
4
5
6
7
8
9
10
11
12
<persistenceAdapter>    
<replicatedLevelDB
directory="${activemq.data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:0"
zkAddress="192.168.33.87:2181"
zkPassword=""
hostname="192.168.33.87"
sync="local_disk"
zkPath="/activemq/leveldb-stores/group1"
/>
</persistenceAdapter>

zkAddress的IP即为安装和打开zookeeper的电脑的IP,port即为zoo.cfg中配置的clientPort。

4.修改transportConnector的端口为61616.

1
2
3
4
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://192.168.33.87:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

5.修改jetty.xml中的端口为8161.

1
2
3
4
5
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8161"/>
</bean>

brokerB的配置
修改同brokerA,但transportConnector的端口改为61617,jetty的端口改为8162.

brokerC的配置
修改同brokerA,但transportConnector的端口改为61618,jetty的端口改为8163.

启动activemq集群

为了启动方便,我写了一个bat脚本,即activemqtest目录下的startCluster.bat。内容如下:

1
2
3
4
5
6
7
8
9
10
d:
cd luckystar88/soft/activemqtest
cd brokerA/bin/win64
start activemq.bat

cd ../../../brokerB/bin/win64/
start activemq.bat

cd ../../../brokerC/bin/win64/
start activemq.bat

双击startCluster.bat,即可启动。(前提是zookeeper要先启动)

如上图所示,其中一个broker成为了master,其他2个成为了slave。

java client连接activemq集群

配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<bean id="jmsFactory1_node1" class="org.apache.activemq.pool.PooledConnectionFactory"  init-method="start"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<!--<property name="brokerURL" value="failover://(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618,tcp://192.168.10.237:61619,tcp://192.168.10.237:61620,tcp://192.168.10.237:61621)"></property>-->
<property name="brokerURL" value="failover:(tcp://192.168.33.87:61616,tcp://192.168.33.87:61617,tcp://192.168.33.87:61618)" ></property>
<property name="userName" value="admin"></property>
<property name="password" value="admin"></property>
<property name="useAsyncSend" value="true"></property>
<property name="alwaysSessionAsync" value="false"></property>
<property name="optimizeAcknowledge" value="true"></property>
<property name="producerWindowSize" value="1024000"></property>
</bean>
</property>
<property name="maxConnections" value="5"></property>
<property name="idleTimeout" value="0"></property>
<property name="expiryTimeout" value="600000"></property>
</bean>

消息生产者可以每隔几秒钟发送一条数据到mq,消费者一直监听消息。
在发送几条数据后,关闭master的broker,可以发现还是可以继续发送信息,并不会出错。而且http://localhost:8161,http://localhost:8162,http://localhost:8163同一时间只有一个能打开。

注意事项

3个broker的brokerName,zkPath等必须一致。