我正在尝试编写一个执行以下操作的拓扑:
>订阅一个twitter Feed(基于关键字)
>一个聚合螺栓,用于聚合收集中的一些tweets(例如N),并将它们发送给打印机螺栓
>一个简单的螺栓,将集合一次打印到控制台.
在现实中,我想对收藏进行一些更多的处理.
我在本地测试,看起来像是在工作.但是,我不知道我是否正确设置了螺栓上的分组,并且在部署在实际的风暴集群上时可以正常工作.如果有人可以帮助您查看此拓扑并提出任何错误,更改或改进,我将不胜感激.
谢谢.
这是我的拓扑结构.
builder.setSpout("spout",new TwitterFilterSpout("pittsburgh")); builder.setBolt("sampleaggregate",new SampleAggregatorBolt()) .shuffleGrouping("spout"); builder.setBolt("printaggreator",new PrinterBolt()).shuffleGrouping("sampleaggregate");
聚合螺栓
public class SampleAggregatorBolt implements IRichBolt { protected OutputCollector collector; protected Tuple currentTuple; protected Logger log; /** * Holds the messages in the bolt till you are ready to send them out */ protected List<Status> statusCache; @Override public void prepare(Map stormConf,TopologyContext context,OutputCollector collector) { this.collector = collector; log = Logger.getLogger(getClass().getName()); statusCache = new ArrayList<Status>(); } @Override public void execute(Tuple tuple) { currentTuple = tuple; Status currentStatus = null; try { currentStatus = (Status) tuple.getValue(0); } catch (ClassCastException e) { } if (currentStatus != null) { //add it to the status cache statusCache.add(currentStatus); collector.ack(tuple); //check the size of the status cache and pass it to the next stage if you have enough messages to emit if (statusCache.size() > 10) { collector.emit(new Values(statusCache)); } } } @Override public void cleanup() { } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("tweets")); } @Override public Map<String,Object> getComponentConfiguration() { return null; //To change body of implemented methods use File | Settings | File Templates. } protected void setupNonSerializableAttributes() { } }
打印机螺栓
public class PrinterBolt extends BaseBasicBolt { @Override public void execute(Tuple tuple,BasicOutputCollector collector) { System.out.println(tuple.size() + " " + tuple); } @Override public void declareOutputFields(OutputFieldsDeclarer ofd) { } }