持久化方式

目前ActiveMQ包含以下几种存储方式:

  • AMQ消息存储
    基于文件的存储方式,是以前的默认消息存储
  • JDBC消息存储
    使用数据库作为消息存储
  • KahaDB消息存储
    提供了容量的提升和恢复能力,是现在的默认存储方式
  • LevelDB消息存储
    基于Google的LevelDB库
  • Replicated LevelDB消息存储
    复制的LevelDB存储,需要ZK来选择master节点。

如果你不想使用activeMQ的持久存储,可以在activemq.xml中设置persistent=false来禁用它。

1
<broker persistent="false"> </broker>

这样broker会使用<memoryPersistenceAdapter>

各种存储方式介绍

AMQ消息存储

AMQ介绍

官方文档说ActiveMQ5.0及以上版本,AMQ消息存储是默认的存储方式,这个是不对的。在早期版本的ActiveMQ中默认使用的是AMQ,在5.0以后使用的是KahaDB。

AMQ消息存储是一种可嵌入的事务性消息存储解决方案,非常快速和可靠。消息以日志的形式存储在data log中,以实现持久化。同时被reference store进行索引以提高存取速度。消息索引存储在内存(cache)中,ActiveMQ会定期将索引持久化到Reference store以提高性能。data log文件是有容量限制的,默认为32MB,可以自行配置。当该data log文件中所有消息都被消费完毕的时候,data log文件会被标记为可删除或存档,在下一次消息清理时可以被删除或归档。
AMQ Store

配置

AMQ消息存储配置示例:

1
2
3
4
5
6
7
8
<broker brokerName="broker" persistent="true" useShutdownHook="false">
	<persistenceAdapter>
		<amqPersistenceAdapter directory="${activemq.base}/activemq-data" maxFileLength="32mb"/>
	</persistenceAdapter>
	<transportConnectors>
		<transportConnector uri="tcp://localhost:61616"/>
	</transportConnectors>
</broker>

AMQ存储配置选项

属性名 默认值 说明
directory activemq-data 存储消息文件和日志的目录
useNIO true 使用 NIO 特性
syncOnWrite false 同步写文件到磁盘
maxFileLength 32mb Message Data日志文件的最大 Size
persistentIndex false 持久化日志索引,如果设为 false ,则在内存中保存
maxCheckpointMessageAddSize 4kb 在自动提交前在事务中能保持的最大消息数
cleanupInterval 30000 每隔多少时间清理不再使用的消息日志(毫秒)
indexBinSize 1024 这个值是用来提升索引的性能的,值越大,索引相对性能越好
indexKeySize 96 index key的size,index key基于message id
indexPageSize 16kb 索引页的size
directoryArchive archive 消费完的Data Log存放的目录
archiveDataLogs false 设置为true的话,消费完的Data Log就放到Archive目录,而不是删除。

AMQ存储数据结构

在AMQ消息存储中有如下目录结构:
AMQ Dir

broker name
broker name用来区分消息数据的目录。默认目录是localhost。下面是它的子目录:

archive
丢弃的Data Log就放到这里,当archiveDataLogs 属性配置为true时才会存在

journal
用来保存消息数据日志

kr-store
reference store目录。

data
引用索引所在目录

state
存储的状态。如果broker没有正确关闭,那么将清理reference store indexes,并回放消息数据文件(包括消息/ack和事物边界),以重建消息存储状态。如果使用Kaha reference store(缺省值),通过删除/krstore/state目录可以强制恢复。

tmp-storage
用于保存可能存储在磁盘上的临时消息的数据文件,以减少内存消耗——例如,非持久性主题消息等待交付给活动但速度较慢的订阅者。

KahaDB消息存储

介绍

KahaDB是一个基于文件的持久性数据库,并对快速持久化做了优化。它是ActiveMQ5.4及后续版本的默认存储机制。KahaDB使用较少的文件描述符,并提供了比它的前身AMQ更快的恢复能力。

配置

要使用KahaDB作为broker的持久性适配器,你可以按如下方式配置:

1
2
3
4
5
<broker brokerName="broker">
   <persistenceAdapter>
     <kahaDB directory="activemq-data" journalMaxFileLength="32mb"/>
   </persistenceAdapter>
</broker>

配置选项

属性 默认值 描述
archiveCorruptedIndex false 是否归档错误的索引
archiveDataLogs false 当为true时,归档的消息文件被移到directoryArchive,而不是直接删除
checkForCorruptJournalFiles false 检查消息文件是否损坏,true,检查发现损坏会尝试修复
checkpointInterval 5000 索引写入到消息文件的周期,单位ms
checksumJournalFiles false 产生一个checksum,以便能够检测journal文件是否损坏。
cleanupInterval 30000 清除操作周期,单位ms
compactAcksAfterNoGC 10 从ActiveMQ5.14.0开始:当确认压缩特性启用时,此值控制必须完成多少存储GC周期,并且在触发压缩逻辑之前不清理任何其他文件,从而可能将跨越日志文件的旧确认压缩到新的日志文件中。值设置得越低,压缩可能发生得越快,如果压缩经常运行,则会影响性能。
compactAcksIgnoresStoreGrowth false 从ActiveMQ 5.14.0开始:当确认压缩特性启用时,该值控制是否在存储仍在增长时运行压缩,或者是否应该只在存储停止增长时运行压缩(由于空闲或达到存储限制)。如果启用了压缩,则无论存储是否仍然有空间或是否处于活动状态都可以运行压缩,这可能会降低总体性能,但会更快地回收空间。
concurrentStoreAndDispatchQueues true 当写入消息的时候,是否转发队列消息
concurrentStoreAndDispatchTopics false 当写入消息的时候,是否转发主题消息(不推荐启用该属性)
directory activemq-data 消息文件和日志的存储目录
directoryArchive null 存储被归档的消息文件目录
enableAckCompaction true From ActiveMQ 5.14.0:此设置控制存储是否将定期压缩只包含消息确认的旧日志文件。通过将这些较早的确认压缩到新的日志文件中,可以删除较早的文件,从而释放空间,并允许消息存储在不触及存储大小限制的情况下继续运行。
enableIndexWriteAsync false true表示异步更新索引
enableJournalDiskSyncs true 从ActiveMQ5.14.0已废弃,请参考journalDiskSyncStrategy
ignoreMissingJournalfiles false 忽略丢失的消息文件,false,当丢失了消息文件,启动异常
indexCacheSize 10000 内存中,索引的页大小
indexDirectory 从ActiveMQ 5.10.0开始:如果设置,则配置将存储KahaDB索引文件(db.data和db.redo)的位置。 如果未设置,索引文件将存储在directory属性指定的目录中。
indexWriteBatchSize 1000 批量写入的索引数量
journalDiskSyncInterval 1000 同步磁盘时间间隔(单位毫秒)
journalDiskSyncStrategy always 从ActiveMQ 5.14.0:该设置配置磁盘同步策略。可用的同步策略列表如下(按安全性递减,性能递增的顺序):always 始终确保每次写日志之后都有磁盘同步(JMS持久性要求)。这是最安全的选择,但也是最慢的选择,因为它需要在每个消息写入之后进行同步。这相当于废弃的enableJournalDiskSyncs=true属性。periodic磁盘将以设定的时间间隔(如果发生写入)而不是在每次日志写入之后同步,这将减少磁盘上的负载并且应该提高吞吐量。滚动到新的日志文件时,磁盘也将同步。默认间隔为1秒。默认间隔提供非常好的性能,同时比从不磁盘同步更安全,因为数据丢失限制为最多1秒的值。请参阅journalDiskSyncInterval以更改磁盘同步的频率。nevel 永远不会被显式调用,它将由操作系统刷新到磁盘。这相当于设置废弃的属性enableJournalDiskSyncs=false。这是最快的选择,但也是最不安全的,因为无法保证何时将数据刷新到磁盘。因此,broker失败时可能发生消息丢失。
journalMaxFileLength 32mb 一个消息文件的大小
maxAsyncJobs 10000 排队等待存储的异步消息的最大数量(应该与MessageProducer的数量一样)
preallocationScope entire_journal 从ActiveMQ 5.14.0:该设置配置如何预先分配日志数据文件。默认策略在第一次使用appender线程时预先分配日志文件。entire_journal:将在首次使用时使用appender线程预分配日志文件。entire_journal_async:将在单独的线程中提前使用预分配。none:禁用预分配。在SSD上,使用entire_journal_async可以避免在首次使用时延迟写入等待预分配。注意:在HDD上,磁盘的额外线程争用会产生负面影响。 因此使用默认值。
preallocationStrategy sparse_file 从ActiveMQ 5.12.0:此设置配置broker在需要新日志文件时尝试预分配日志文件的方式。sparse_file:设置文件长度,os_kernel_copy:委托给操作系统,zeros:不设置文件长度
purgeRecoveredXATransactionStrategy never 从ActiveMQ 5.15.5开始:此设置将在恢复期间提交或回滚所有准备好的XA事务。never:在恢复期间对准备好的XA事务不做任何操作;commit:在恢复期间提交准备好的XA事务;rollback:在恢复期间回滚准备好的XA事务。此功能将对所有准备好的XA事务执行提交或回滚,请考虑使用RecoverXATransaction MBean手动检查特定事务的提交或回滚。
storeOpenWireVersion 11 确定KahaDB日志的OpenWire命令的版本。在ActiveMQ 5.12.0之前:默认值为6。broker的某些功能取决于较新协议修订版中存储在OpenWire命令中的信息,如果将存储版本设置为较低值,这些功能可能无法正常工作。 在许多情况下,broker版本大于5.9.0的KahaDB存储仍然可以被broker读取,但会导致broker继续使用较旧的存储版本,这意味着较新的功能可能无法按预期工作。对于在ActiveMQ 5.9.0之前的版本中创建的KahaDB存储,需要手动设置storeOpenWireVersion =“6”以便启动broker而不会出现错误。
cleanupOnStop true 是否在broker关闭时执行gc/cleanup操作。

慢速文件系统访问诊断日志记录

您可以为数据库更新配置一个以毫秒为单位的非零阈值。如果数据库操作比该阈值慢(例如,如果将其设置为500),您可能会看到如下消息:

1
Slow KahaDB access: cleanup took 1277 | org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Journal Checkpoint Worker

您可以使用系统属性配置用于记录这些消息的阈值,并将其调整为磁盘速度,以便您可以轻松获取运行时异常。

1
-Dorg.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME=1500

jdbc消息存储

ActiveMQ支持一系列的SQL数据库作为消息存储。比如:

  • Apache Derby
  • Axion
  • DB2
  • HSQL
  • Informix
  • MaxDB
  • MySQL
  • Oracle
  • Postgresql
  • SQLServer
  • Sybase
    以及一些通用JDBC provider。

自动发现JDBC provider

ActiveMQ会尝试通过这些配置文件和JDBC驱动程序的返回字符串从JDBC驱动程序自动检测要使用的JDBCAdapter。

你可以在activemq.xml中使用其xbean标识符显式指定JDBC适配器…

1
<jdbcPersistenceAdapter adapter="postgresql-jdbc-adapter"/>

自定义 SQL DDL

您可以使用statements元素配置各种SQL数据类型 - 例如列大小等

1
2
3
4
5
6
7
8
9
10
11
<broker useJmx="false">
 
  <persistenceAdapter>
    <journaledJDBC useJournal="false">
      <statements>
        <statements stringIdDataType ="VARCHAR(128)"/>
      </statements>
    </journaledJDBC>
  </persistenceAdapter>
 
</broker>

有关可以在statements元素上设置哪些属性的更多信息,请参阅Statements类。 所有可设置的bean属性都可以用作元素的属性。

使用MySQL

如果您使用的是MySQL,则应将relaxAutoCommit标志设置为true。 例如

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<beans ...>
    ...
    <bean id="mysql-ds"
          class="org.apache.commons.dbcp.BasicDataSource"
          destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
        <property name="username" value="amq"/>
        <property name="password" value="amqPass"/> <property name="poolPreparedStatements" value="true"/> </bean>

    <broker ...>
        ...
        <persistenceAdapter>
			<jdbcPersistenceAdapter dataSource="#mysql-ds"/>
        </persistenceAdapter>
        ...
    </broker>

</beans>

使用Oracle

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<beans ... >
  <broker xmlns="http://activemq.apache.org/schema/core"
          brokerName="localhost">
    ...
    <persistenceAdapter>
       <jdbcPersistenceAdapter
            dataDirectory="${activemq.base}/data"
            dataSource="#oracle-ds"/>
    </persistenceAdapter>
    ...
  </broker>
 
  <!-- Oracle DataSource Sample Setup -->
  <bean id="oracle-ds"
        class="org.apache.commons.dbcp.BasicDataSource"
        destroy-method="close">
    <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>
    <property name="url" value="jdbc:oracle:thin:@localhost:1521:AMQDB"/>
    <property name="username" value="scott"/>
    <property name="password" value="tiger"/>
    <property name="maxActive" value="200"/>
    <property name="poolPreparedStatements" value="true"/>
  </bean>

</beans>

使用Spring bean元素配置JDBC驱动程序。 id属性指定配置JDBC持久性适配器时引用驱动程序的名称。 class属性指定实现用于与JDBC驱动程序接口的数据源的类。 destroy-method属性指定JDBC驱动程序关闭时要调用的方法的名称。

LevelDB消息存储

注意:LevelDB存储已被废弃,并不再推荐使用。推荐的存储是:KahaDB。

LevelDB持久性适配器使用LevelDB作为高性能的消息存储。它是一个基于文件的存储库,它使用了Google的LevelDB,将索引保存到包含消息的日志文件中。它经过优化,提供了比KahaDB更快的持久性。它类似于KahahDB,但是它没有使用自定义的b树实现来索引写前日志,而是使用基于LevelDB的索引,由于“append only”文件访问模式,这些索引具有一些很好的属性:

  • 快速更新(不需要进行随机磁盘更新)
  • 并发读取
  • 使用硬链接的快速索引快照

KahaDB和LevelDB存储都必须进行周期性的垃圾收集,以确定可以删除哪些日志文件。在KahaDB的情况下,这可能非常昂贵,因为您增加了存储的数据量,并且在收集发生时可能导致读/写停止。LevelDB存储使用一种便宜得多的算法来确定何时可以收集日志文件并避免这些停顿。

LevelDB的主要优势有:

  • 更高的持久吞吐量
  • broker重启时恢复时间更快
  • 支持并发的读访问
  • 在垃圾回收周期中不会暂停
  • 使用较少的读取IO操作来加载存储的消息
  • 支持XA事务
  • 检查重复的消息
  • 通过JMX暴露公开状态以进行监控
  • 支持复制

基础配置

通过在broker中的persistenceAdapter中加入levelDB元素来配置LevelDB存储。

1
2
3
4
5
6
7
<broker brokerName="broker" persistent="true" ... >
  ...
  <persistenceAdapter>
    <levelDB directory="activemq-data" />
  </persistenceAdapter>
  ...
</broker>

配置属性

属性名 默认值 描述
directory activemq-data 数据文件的存储目录
readThreads 10 指定允许的并发IO读取数
sync true 是否进行磁盘的同步写操作
logSize 104857600(100mb) log日志文件的最大值(以字节为单位)
verifyChecksums false 是否对从文件系统读取的数据进行强制校验
paranoidChecks false 指定LevelDB在检测到内部错误时是否尽快出错。
indexFactory org.fusesource.leveldbjni.JniDBFactory, org.iq80.leveldb.impl.Iq80DBFactory 指定broker尝试加载的LevelDB API工厂实现类的列表(用逗号分隔)。broker将使用第一个成功加载的工厂类。org.fusesource.leveldbjni.JniDBFactory:启用JNI的基本实现。org.iq80.leveldb.impl.Iq80DBFactory:启用纯Java的实现。
indexMaxOpenFiles 1000 指定索引可以使用的打开文件数。LevelDB内部使用多线程进行文件读写操作
indexBlockRestartInterval 16 Specifies the number of keys between restart points for delta encoding of keys.
indexWriteBufferSize 4194304 指定在转换为已排序的磁盘文件之前要在内存中构建的索引数据的数量(以字节为单位)。
indexBlockSize 4096 指定每个块的索引数据的大小(以字节为单位)。
indexCacheSize 268435456(256MB) 指定用于缓存索引块的内存的最大量(以字节为单位)
indexCompression snappy 指定要应用于索引块的压缩类型。snappynone
logCompression snappy 指定要应用于日志记录的压缩类型。snappynone

Replicated LevelDB消息存储

摘要

Replicated LevelDB使用Apache Zookeeper从一组配置为Replicated LevelDB存储的broker节点选择一个作为master。然后,通过复制master的所有更新来同步所有的slave的LevelDB存储,从而使slave保持最新。

Replicated LevelDB存储使用预LevelDB存储相同的数据文件,因此broker可以随时在Replicated LevelDB和LevelDB之间随时切换。

工作原理

replicated-leveldb-store.png

它使用Apache Zookeeper来协调集群中的哪个节点作为master节点。选定的master节点启动并接受客户端连接。其他节点成为slave节点,并连接master,与master保持同步。从节点不接受客户端连接。所有持久化操作都将复制到slave节点。如果master挂掉,则具有最新更新的slave节点被提升为master。然后,故障节点可以重新连接,并作为slave节点。

所有的需要同步到磁盘的消息操作都将等待法定数量的slave节点复制数据完成。因此,如果你配置了replicas=3,那么法定的节点数量为3/2+1=2个。master会在本地存储更新,并等待另外一个slave节点存储更新后才会报告成功。换一种说法,Replicated LevelDB存储将会同步复制到法定数量的slave节点,并将异步复制到任何其他节点。

当选择新的master节点时,你还需要至少一个在线的法定数量的节点才能找到具有最新更新的节点。具有最新更新的节点将成为master节点。因此,建议至少使用3个节点,以便在服务不中断的情况下关闭其中一个节点。

部署建议

客户端应该使用failover连接集群中的节点,比如:

1
failover:(tcp://broker1:61616,tcp://broker2:61616,tcp://broker3:61616)

同时为了Zookeeper的高可用,你需要至少运行3个Zookeeperjieidan。不要过渡时会用Zookeeper,过度的Zookeeper可能会认为复制的节点由于处理心跳消息的延迟而离线了。

为了获得最佳结果,确保将hostname属性配置为了集群中其他节点的主机名或ip地址。其他集群成员无法始终访问自动确定的主机名,从而导致slave节点无法与master建立复制会话。

配置

您可以配置ActiveMQ使用LevelDB作为它的持久性适配器,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<broker brokerName="broker" ... >
  ...
  <persistenceAdapter>
    <replicatedLevelDB
      directory="activemq-data"
      replicas="3"
      bind="tcp://0.0.0.0:0"
      zkAddress="zoo1.example.org:2181,zoo2.example.org:2181,zoo3.example.org:2181"
      zkPassword="password"
      zkPath="/activemq/leveldb-stores"
      hostname="broker1.example.org"
      />
  </persistenceAdapter>
  ...
</broker>

Replicated LevelDB存储属性

相同复制集中的的broker节点应该有相同的broker name,相同的复制集中的下面的属性也应该相同。

属性名 默认值 描述
replicas 3 集群节点的数量,至少(replicas/2)+1个,以避免服务中断。
securityToken 一个安全令牌,必须在所有复制节点上匹配,才能接受彼此的复制请求。
zkAddress 127.0.0.1:2181 逗号分隔的ZooKeeper服务器列表。
zkPassword 连接到Zookeeper时使用的密码
zkPath /default Zookeeper主从选举使用的Zookeeper的目录路径
zkSessionTimeout 2s Zookeeper多久检测一个节点失败。(5.11以前,有一个错字zkSessionTmeout)
sync quorum_mem 用于控制同步的区域,以“,”分割多个区域。可选项有:local_mem, local_disk, remote_mem, remote_disk, quorum_mem, quorum_disk.如果你配置多个区域,将更强的保证机制将被触发。例如:local_mem, local_disk 与 local_disk 等同quorum_mem 与local_mem, remote_mem 等同quorum_disk 与local_disk, remote_disk等同

不同的复制集可以共享相同的zkPath只要他们有不同的brokerName.

下面的配置属性每个节点特殊的配置:

属性名 默认值 描述
bind tcp://0.0.0.0:61619 当该节点成为主节点时,绑定的地址和端口,用于服务复制协议还支持使用动态端口,只需配置tcp:/ / 0.0.0.0:0
hostname 用于在此节点成为主节点时通告复制服务的主机名。 如果未设置,将自动确定。
weight 1 具有最高权重的最新更新的复制节点将成为主节点。 用于优先选择某些节点成为主节点。

该存储还支持与标准LevelDB存储相同的配置属性,但它不支持可插拔的存储锁。

参考资料

参考:http://activemq.apache.org/persistence.html
AMQ消息存储参考:http://activemq.apache.org/amq-message-store.html
KahaDB消息存储参考:http://activemq.apache.org/kahadb.html
JDBC消息存储参考:http://activemq.apache.org/jdbc-support.htmlUSING JDBC TO CONNECT TO A DATABASE STORE
LevelDB消息存储参考:http://activemq.apache.org/leveldb-store.htmlhttps://access.redhat.com/documentation/en-us/red_hat_jboss_a-mq/6.2/html/configuring_broker_persistence/leveldbconfiguration
Replicated LevelDB消息存储参考:http://activemq.apache.org/replicated-leveldb-store.htmlUSING THE REPLICATED LEVELDB PERSISTENCE ADAPTER
多KahaDB存储:https://access.redhat.com/documentation/en-us/red_hat_jboss_a-mq/6.2/html/configuring_broker_persistence/fusembmultikahadb