java – QPID – Spring CachingConnectionFactory – Reconnecting

Spring configuration
<bean id="jmsQueueConnectionFactory" class="org.apache.qpid.client.AMQConnectionFactory">
    <constructor-arg index="0"
        value="amqp://guest:guest@localhost/test?brokerlist='tcp://localhost:5672'" />
</bean>

<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="jmsQueueConnectionFactory" />
    <property name="sessionCacheSize" value="1" />
    <property name="reconnectOnException" value="true" />
</bean>

<bean id="myDestination" class="org.apache.qpid.client.AMQAnyDestination">
    <constructor-arg index="0" value="ADDR:myqueue; {create: always}" />
</bean>

<bean id="myServiceBean" class="com.test.MyService" />

<bean id="myContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="cachingConnectionFactory" />
    <property name="exceptionListener" ref="cachingConnectionFactory" /> 
    <property name="messageListener" ref="myServiceBean" />
    <property name="concurrentConsumers" value="1" />
    <property name="autoStartup" value="true" />
    <property name="destination" ref="myDestination" />
    <property name="recoveryInterval" value="10000" />      
</bean>

MyService.java

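import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class MyService implements MessageListener {
    private static final Log log = LogFactory.getLog(MyService.class); // logger type assumed; not shown in the original
    public void onMessage(Message msg) {
        log.info("----On Message called :" + msg + ",:" + msg.getClass().getName());
    }
}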

When I restart QPID:

Without reconnectOnException = true, I keep getting this exception and the container never reconnects:

3203 [myContainer-1] DEBUG org.springframework.jms.connection.CachingConnectionFactory  - Creating cached JMS Session for mode 1: org.apache.qpid.client.AMQSession_0_10@1d03a4e
3312 [myContainer-1] DEBUG org.springframework.jms.connection.CachingConnectionFactory  - Creating cached JMS MessageConsumer for destination ['myqueue'/None; {
  'create': 'always'
}]: org.apache.qpid.client.BasicMessageConsumer_0_10@8a2023
99109 [myContainer-1] WARN org.springframework.jms.listener.DefaultMessageListenerContainer  - Setup of JMS message listener invoker failed for destination ''myqueue'/None; {
  'create': 'always'
}' - trying to recover. Cause: timed out waiting for session to become open (state=DETACHED)
org.apache.qpid.transport.SessionException: timed out waiting for session to become open (state=DETACHED)
    at org.apache.qpid.transport.Session.invoke(Session.java:630)
    at org.apache.qpid.transport.Session.invoke(Session.java:559)
    at org.apache.qpid.transport.SessionInvoker.executionSync(SessionInvoker.java:84)
    at org.apache.qpid.transport.Session.sync(Session.java:782)
    at org.apache.qpid.transport.Session.sync(Session.java:770)
    at org.apache.qpid.client.BasicMessageConsumer_0_10.getMessageFromQueue(BasicMessageConsumer_0_10.java:423)
    at org.apache.qpid.client.BasicMessageConsumer.receive(BasicMessageConsumer.java:407)
    at org.springframework.jms.connection.CachedMessageConsumer.receive(CachedMessageConsumer.java:74)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:429)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:310)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1058)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1050)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:947)
    at java.lang.Thread.run(Thread.java:619)
99109 [myContainer-1] INFO org.springframework.jms.listener.DefaultMessageListenerContainer  - Successfully refreshed JMS Connection

..

281125 [myContainer-3] INFO org.springframework.jms.listener.DefaultMessageListenerContainer  - Successfully refreshed JMS Connection


With reconnectOnException = true, it connects and then disconnects again:

    13015 [IoReceiver - localhost/127.0.0.1:5672] WARN org.springframework.jms.connection.CachingConnectionFactory  - Encountered a JMSException - resetting the underlying JMS Connection
javax.jms.JMSException: connection aborted
    at org.apache.qpid.client.AMQConnectionDelegate_0_10.closed(AMQConnectionDelegate_0_10.java:303)
    at org.apache.qpid.transport.Connection.closed(Connection.java:568)
    at org.apache.qpid.transport.network.Assembler.closed(Assembler.java:110)
    at org.apache.qpid.transport.network.InputHandler.closed(InputHandler.java:202)
    at org.apache.qpid.transport.network.io.IoReceiver.run(IoReceiver.java:150)
    at java.lang.Thread.run(Thread.java:619)
Caused by: org.apache.qpid.transport.ConnectionException: connection aborted
    at org.apache.qpid.transport.Connection.closed(Connection.java:541)
    ... 4 more
13031 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.springframework.jms.connection.CachingConnectionFactory  - Closing shared JMS Connection: AMQConnection:
Host: localhost
Port: 5672
Virtual Host: test
Client ID: localhost
Active session count: 1
73031 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.springframework.jms.connection.CachingConnectionFactory  - Could not close shared JMS Connection
org.apache.qpid.client.JMSAMQException: timed out waiting for session to become open (state=DETACHED)
    at org.apache.qpid.client.AMQConnection.stop(AMQConnection.java:824)
    at org.springframework.jms.connection.SingleConnectionFactory.closeConnection(SingleConnectionFactory.java:422)
    at org.springframework.jms.connection.SingleConnectionFactory.resetConnection(SingleConnectionFactory.java:321)
    at org.springframework.jms.connection.CachingConnectionFactory.resetConnection(CachingConnectionFactory.java:197)
    at org.springframework.jms.connection.SingleConnectionFactory.onException(SingleConnectionFactory.java:302)
    at org.springframework.jms.connection.ChainedExceptionListener.onException(ChainedExceptionListener.java:57)
    at org.apache.qpid.client.AMQConnectionDelegate_0_10.closed(AMQConnectionDelegate_0_10.java:306)
    ...
    at org.apache.qpid.transport.network.io.IoReceiver.run(IoReceiver.java:150)
    at java.lang.Thread.run(Thread.java:619)
Caused by: org.apache.qpid.AMQException: timed out waiting for session to become open (state=DETACHED) [error code 541: internal error]
    at org.apache.qpid.client.AMQSession_0_10.setCurrentException(AMQSession_0_10.java:1050)
    at org.apache.qpid.client.AMQSession_0_10.sync(AMQSession_0_10.java:1030)
    at org.apache.qpid.client.AMQSession_0_10.sendSuspendChannel(AMQSession_0_10.java:857)
    at org.apache.qpid.client.AMQSession.suspendChannel(AMQSession.java:3006)
    at org.apache.qpid.client.AMQSession.stop(AMQSession.java:2341)
    at org.apache.qpid.client.AMQConnection.stop(AMQConnection.java:820)
    ... 11 more

104875 [myContainer-1] INFO org.springframework.jms.connection.CachingConnectionFactory  - Established shared JMS Connection: AMQConnection:
Host: localhost
Port: 5672
Virtual Host: test
Client ID: localhost
Active session count: 0
104875 [myContainer-1] INFO org.springframework.jms.listener.DefaultMessageListenerContainer  - Successfully refreshed JMS Connection
104875 [myContainer-2] DEBUG org.springframework.jms.connection.CachingConnectionFactory  - Creating cached JMS Session for mode 1: org.apache.qpid.client.AMQSession_0_10@191e4c
104937 [IoReceiver - localhost/127.0.0.1:5672] WARN org.springframework.jms.connection.CachingConnectionFactory  - Encountered a JMSException - resetting the underlying JMS Connection
javax.jms.JMSException: 404
    at org.apache.qpid.client.AMQConnection.exceptionReceived(AMQConnection.java:1230)

    at java.lang.Thread.run(Thread.java:619)
Caused by: org.apache.qpid.AMQException: ch=0 id=0 ExecutionException(errorCode=NOT_FOUND,commandId=0,description=Queue: myqueue not found) [error code 404: not found]
    at org.apache.qpid.client.AMQSession_0_10.setCurrentException(AMQSession_0_10.java:1050)
    ... 29 more
104937 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.springframework.jms.connection.CachingConnectionFactory  - Closing shared JMS Connection: AMQConnection:
Host: localhost
Port: 5672


Solution

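Have you tried omitting the exceptionListener setting on the DMLC (DefaultMessageListenerContainer)? I don't recall ever needing to specify it, and it looks like your connection is being invalidated after connection recovery has already started.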
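
As a sketch of that first suggestion, the container definition from the question could be tried with only the exceptionListener property removed. Every bean name and value below is copied from the question's configuration; whether this change alone stops the connect/disconnect cycle against QPID has not been verified here.

```xml
<bean id="myContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="cachingConnectionFactory" />
    <!-- exceptionListener property removed; the container still retries
         on its own every recoveryInterval milliseconds -->
    <property name="messageListener" ref="myServiceBean" />
    <property name="concurrentConsumers" value="1" />
    <property name="autoStartup" value="true" />
    <property name="destination" ref="myDestination" />
    <property name="recoveryInterval" value="10000" />
</bean>
```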

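Also, assuming you are not using JBoss 4, you could try the DMLC's built-in caching mechanism (via the cacheLevel setting) instead of using a caching connection factory for your consumers; you may even get better performance, since the DMLC can cache the session and consumer as well as the connection.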
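
A minimal sketch of the cacheLevel approach, assuming the container is pointed directly at the question's jmsQueueConnectionFactory bean so that the CachingConnectionFactory is no longer involved on the consumer side. The cacheLevelName property and the CACHE_CONSUMER constant are part of DefaultMessageListenerContainer; the remaining values are copied from the question, and the combination has not been tested against a QPID broker restart.

```xml
<bean id="myContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <!-- Use the raw QPID factory; the container does the caching itself -->
    <property name="connectionFactory" ref="jmsQueueConnectionFactory" />
    <!-- Cache the connection, session and consumer inside the container -->
    <property name="cacheLevelName" value="CACHE_CONSUMER" />
    <property name="messageListener" ref="myServiceBean" />
    <property name="concurrentConsumers" value="1" />
    <property name="autoStartup" value="true" />
    <property name="destination" ref="myDestination" />
    <property name="recoveryInterval" value="10000" />
</bean>
```

With this layout the container owns the connection it caches, so when the broker goes away its recovery loop can discard and recreate the connection, session and consumer together rather than relying on a separate factory to reset them.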
