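I have a system based on Spring JMS and ActiveMQ (5.6) with about 12 Spring default message listener containers (each with up to 20 concurrent instances), all of which connect to the same ActiveMQ destination (a queue).
Each handler (container) uses a selector to pull the messages addressed to it off the queue, does its work, and then puts the message back on the queue, until all of the work is done.
I am running a benchmark that sends 25,000 messages, each of which has to pass through 9 different handlers.
Every time I run the test, only about 11,300 messages make it through all of the handlers, and then ActiveMQ stops dispatching any more messages.
At the end of the current test run I can see the following statistics for the queue:
Enqueue count: 120359
Dequeue count: 106693
Dispatch count: 106693
Inflight count: 0
Queue size: 13666
ActiveMQ does not dispatch any more messages unless I restart the broker.
Below is my ActiveMQ configuration: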
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.5.0.xsd">
<bean id="propertyConfigurer" class="org.springframework.web.context.support.ServletContextPropertyPlaceholderConfigurer" />
<!-- The <broker> element is used to configure the ActiveMQ broker. -->
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="jmsDeployMqBroker" dataDirectory="${java.io.tmpdir}/activemq-data"
destroyApplicationContextOnStop="true" useJmx="true">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="false">
</policyEntry>
<policyEntry queue=">" producerFlowControl="false">
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<destinations>
<queue physicalName="handlersDest"/>
<topic physicalName="notificationsDest" />
<queue physicalName="ActiveMQ.DLQ" />
</destinations>
<!-- The managementContext is used to configure how ActiveMQ is exposed
in JMX. By default, ActiveMQ uses the MBean server that is started by the
JVM. For more information, see: http://activemq.apache.org/jmx.html -->
<managementContext>
<managementContext createConnector="false" />
</managementContext>
<!-- Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag). For more information, see: http://activemq.apache.org/persistence.html -->
<persistenceAdapter>
<amq:kahaPersistenceAdapter directory="${java.io.tmpdir}/activemq-data/kahadb" maxDataFileLength="1g" />
</persistenceAdapter>
<!-- The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see: http://activemq.apache.org/configuring-transports.html -->
<transportConnectors>
<transportConnector name="openwire" uri="${org.apache.activemq.brokerURL}" />
</transportConnectors>
</broker>
</beans>
Here is a sample of the Spring configuration for my handlers:
<jee:jndi-lookup id="connectionFactory" jndi-name="${jndi.jms.connfactory}">
<jee:environment>
java.naming.factory.initial = ${jndi.jms.naming.factory.initial}
java.naming.provider.url = ${jndi.jms.naming.url}
</jee:environment>
</jee:jndi-lookup>
<!-- ID must not change as it is used in autowiring the handlers -->
<jee:jndi-lookup id="handlersDest" jndi-name="${jndi.docprod.queue}">
<jee:environment>
java.naming.factory.initial = ${jndi.jms.naming.factory.initial}
java.naming.provider.url = ${jndi.jms.naming.url}
${jndi.queue.setup}
</jee:environment>
</jee:jndi-lookup>
<!-- ID must not change as it is used in autowiring the handlers -->
<jee:jndi-lookup id="notificationsDest" jndi-name="${jndi.docprod.topic}">
<jee:environment>
java.naming.factory.initial = ${jndi.jms.naming.factory.initial}
java.naming.provider.url = ${jndi.jms.naming.url}
${jndi.topic.setup}
</jee:environment>
</jee:jndi-lookup>
<bean id="dmsReadContainer" class="mydomain.DocProdnMessageListenerContainer"
p:connectionFactory-ref="connectionFactory"
p:handlerClass="mydomain.DmsReadHandler"
p:messageListener-ref="dmsReadHandler"
p:destination-ref="handlersDest" >
<property name="concurrentConsumers"><value>${dmsRead.initialInstances}</value></property>
<property name="maxConcurrentConsumers"><value>${dmsRead.maxInstances}</value></property>
<property name="idleConsumerLimit"><value>${dmsRead.idleInstances}</value></property>
</bean>
<bean id="dmsReadHandler" class="mydomain.DmsReadHandler">
</bean>
...
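The ActiveMQ log file does not show any exceptions that would indicate why it stopped dispatching.
Does anyone know why no further messages are dispatched, or have any suggestions for diagnosing the problem further?
Best answer
I would try the following to open things up...
> Increase systemUsage from the defaults...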
<systemUsage>
<systemUsage>
<memoryUsage><memoryUsage limit="4 gb"/></memoryUsage> <!--75% of avail heap-->
<storeUsage><storeUsage limit="10 gb"/></storeUsage>
<tempUsage><tempUsage limit="10 gb"/></tempUsage>
</systemUsage>
</systemUsage>
> Use a JMS connection pool
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="maxConnections" value="8" />
<property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>
> Switch to using KahaDB, like this...
<persistenceAdapter>
<kahaDB directory="activemq-data" journalMaxFileLength="32mb" enableJournalDiskSyncs="false"/>
</persistenceAdapter>
> Lower the prefetch
tcp://localhost:61616?jms.prefetchPolicy.all=10
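For orientation, here is a rough sketch of how the four suggestions above might slot into the configuration from the question. It is untested against that setup, so treat it as a starting point only. The `jmsConnectionFactory` bean definition, the `localhost:61616` address, and the `destroy-method="stop"` attribute on the pool are assumptions that do not appear in the question. The limits are the example values from above, not tuned numbers.

```xml
<!-- Broker side: based on the broker element from the question -->
<broker xmlns="http://activemq.apache.org/schema/core"
        brokerName="jmsDeployMqBroker" dataDirectory="${java.io.tmpdir}/activemq-data"
        destroyApplicationContextOnStop="true" useJmx="true">

  <!-- destinationPolicy, destinations and managementContext stay as in the question -->

  <persistenceAdapter>
    <!-- Replaces the kahaPersistenceAdapter element from the question.
         Assumption: the directory is new or empty.
         enableJournalDiskSyncs="false" trades durability for throughput. -->
    <kahaDB directory="${java.io.tmpdir}/activemq-data/kahadb"
            journalMaxFileLength="32mb" enableJournalDiskSyncs="false"/>
  </persistenceAdapter>

  <!-- Example limits from above; size them to the actual heap and disk -->
  <systemUsage>
    <systemUsage>
      <memoryUsage><memoryUsage limit="4 gb"/></memoryUsage>
      <storeUsage><storeUsage limit="10 gb"/></storeUsage>
      <tempUsage><tempUsage limit="10 gb"/></tempUsage>
    </systemUsage>
  </systemUsage>

  <transportConnectors>
    <transportConnector name="openwire" uri="${org.apache.activemq.brokerURL}" />
  </transportConnectors>
</broker>

<!-- Client side (Spring context of the handlers) -->

<!-- Hypothetical bean: the pool above references jmsConnectionFactory without defining it.
     Host and port are placeholders; the prefetch option is carried on the broker URL. -->
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616?jms.prefetchPolicy.all=10" />
</bean>

<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
      destroy-method="stop">
  <property name="maxConnections" value="8" />
  <property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>

<!-- Container from the question, pointed at the pool instead of the JNDI lookup -->
<bean id="dmsReadContainer" class="mydomain.DocProdnMessageListenerContainer"
      p:connectionFactory-ref="pooledConnectionFactory"
      p:handlerClass="mydomain.DmsReadHandler"
      p:messageListener-ref="dmsReadHandler"
      p:destination-ref="handlersDest">
  <property name="concurrentConsumers"><value>${dmsRead.initialInstances}</value></property>
  <property name="maxConcurrentConsumers"><value>${dmsRead.maxInstances}</value></property>
  <property name="idleConsumerLimit"><value>${dmsRead.idleInstances}</value></property>
</bean>
```

If the file is still validated against the 5.5.0 schema as in the question, the child elements of the broker may need to stay in alphabetical order, as far as I recall, which is why systemUsage sits between persistenceAdapter and transportConnectors here. If you keep the JNDI lookup for the connection factory instead of defining the bean directly, the prefetch option would have to go on whatever broker URL that lookup resolves to.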