我有一个用
Java编写的单线程ActiveMQ使用者.我所要做的就是从队列中接收()一个消息,尝试将其发送到Web服务,如果成功则确认()它.如果Web服务调用失败,我希望消息保留在队列中并在超时后重新发送.
它或多或少都在工作,除了重发部分:每次重新启动我的消费者时,它会为每个仍然在队列中的消息收到一条消息,但是在发送它们之后,消息永远不会被重新发送.
我的代码看起来像:
public boolean init() throws JMSException,FileNotFoundException,IOException { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url); RedeliveryPolicy policy = new RedeliveryPolicy(); policy.setInitialRedeliveryDelay(500); policy.setBackOffMultiplier(2); policy.setUseExponentialBackOff(true); connectionFactory.setRedeliveryPolicy(policy); connectionFactory.setUseRetroactiveConsumer(true); // ???? Connection connection = connectionFactory.createConnection(); connection.setExceptionListener(this); connection.start(); session = connection.createSession(transacted,ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); destination = session.createQueue(subject); //??? consumer = session.createConsumer(destination); //consumer.setMessageListener(this); // message listener had same behavIoUr } private void process() { while(true) { System.out.println("Waiting..."); try { Message message = consumer.receive(); onMessage(message); } catch (JMSException e) { e.printStackTrace(); } try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } @Override public void onMessage(Message message) { System.out.println("onMessage"); messagesReceived++; if (message instanceof TextMessage) { try { TextMessage txtMsg = (TextMessage) message; String msg = txtMsg.getText(); if(!client.sendMessage(msg)) { System.out.println("Webservice call Failed. Keeping message"); //message. } else { message.acknowledge(); } if (transacted) { if ((messagesReceived % batch) == 0) { System.out.println("Commiting transaction for last " + batch + " messages; messages so far = " + messagesReceived); session.commit(); } } } catch (JMSException e) { e.printStackTrace(); } } }
我目前没有使用交易(也许我应该这样做?).
我确定我错过了一些简单的东西,很快就会拍打我的额头,但我似乎无法弄清楚这是怎么回事.谢谢!
编辑:我自己也不能回答这个问题:
好的,经过一些实验,事实证明交易是实现这一目标的唯一方法.这是新代码:
public boolean init() throws JMSException,url); RedeliveryPolicy policy = new RedeliveryPolicy(); policy.setInitialRedeliveryDelay(1000L); policy.setMaximumRedeliveries(RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES); connectionFactory.setRedeliveryPolicy(policy); connectionFactory.setUseRetroactiveConsumer(true); Connection connection = connectionFactory.createConnection(); connection.setExceptionListener(this); connection.start(); session = connection.createSession(transacted,ActiveMQSession.CLIENT_ACKNOWLEDGE); destination = session.createQueue(subject); consumer = session.createConsumer(destination); } @Override public void onMessage(Message message) { System.out.println("onMessage"); messagesReceived++; if (message instanceof TextMessage) { try { TextMessage txtMsg = (TextMessage) message; String msg = txtMsg.getText(); if(client.sendMessage(msg)) { if(transacted) { System.out.println("Call succeeded - committing message"); session.commit(); } //message.acknowledge(); } else { if(transacted) { System.out.println("Webservice call Failed. Rolling back message"); session.rollback(); } } } catch (JMSException e) { e.printStackTrace(); } } }
现在,重新传送策略中指定的消息每1000毫秒重新发送一次.
希望这有助于其他人!