我已经开始使用风暴,所以我使用
this tutorial创建简单的拓扑
当我使用LocalCluster运行我的拓扑结构,似乎都很好,
我的问题是我没有在元组上得到ACK,这意味着我的spout ack不会被调用.
我的代码在下面 – 你知道为什么不叫Ack吗?
所以我的拓扑看起来像这样
public StormTopology build() { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(HelloWorldSpout.class.getSimpleName(),helloWorldSpout,spoutParallelism); HelloWorldBolt bolt = new HelloWorldBolt(); builder.setBolt(HelloWorldBolt.class.getSimpleName(),bolt,boltParallelism) .shuffleGrouping(HelloWorldSpout.class.getSimpleName()); }
我的水嘴看起来像这样
public class HelloWorldSpout extends BaseRichSpout implements ISpout { private SpoutOutputCollector collector; public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("int")); } public void open(Map conf,TopologyContext context,SpoutOutputCollector collector) { this.collector = collector; } private static Boolean flag = false; public void nextTuple() { Utils.sleep(5000); //emit only 1 tuple - for testing if (!flag){ this.collector.emit(new Values(6)); flag = true; } } @Override public void ack(Object msgId) { System.out.println("[HelloWorldSpout] ack on msgId" + msgId); } public void fail(Object msgId){ System.out.println("[HelloWorldSpout] fail on msgId" + msgId); } }
我的螺栓看起来像这样
@SuppressWarnings("serial") public class HelloWorldBolt extends BaseRichBolt{ private OutputCollector collector; public void prepare(Map conf,OutputCollector collector) { this.collector = collector; logger.info("preparing HelloWorldBolt"); } public void execute(Tuple tuple) { System.out.println("[HelloWorldBolt] got" + tuple.getInteger(0)); this.collector.ack(tuple); } public void cleanup() { } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub } }
解决方法
您的喷嘴中的emit()方法只有一个参数,因此元组不被锚定.这就是为什么你没有得到一个回调到ack()方法在喷口,即使你是在螺栓中的元组.
要使其正常工作,您需要修改您的喷口以发出第二个参数,即消息ID.正是这个id被传回给spout中的ack()方法:
public void nextTuple() { Utils.sleep(5000); //emit only 1 tuple - for testing if (!flag){ Object msgId = "ID 6"; // this can be any object this.collector.emit(new Values(6),msgId); flag = true; } } @Override public void ack(Object msgId) { // msgId should be "ID 6" System.out.println("[HelloWorldSpout] ack on msgId" + msgId); }