java – 在一个简单的聚合风暴拓扑中进行分组

前端之家收集整理的这篇文章主要介绍了java – 在一个简单的聚合风暴拓扑中进行分组前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
我正在尝试编写一个执行以下操作的拓扑:

>订阅一个twitter Feed(基于关键字)
>一个聚合螺栓,用于聚合收集中的一些tweets(例如N),并将它们发送给打印机螺栓
>一个简单的螺栓,将集合一次打印到控制台.

在现实中,我想对收藏进行一些更多的处理.

我在本地测试,看起来像是在工作.但是,我不知道我是否正确设置了螺栓上的分组,并且在部署在实际的风暴集群上时可以正常工作.如果有人可以帮助您查看此拓扑并提出任何错误,更改或改进,我将不胜感激.

谢谢.

这是我的拓扑结构.

builder.setSpout("spout",new TwitterFilterSpout("pittsburgh"));
   builder.setBolt("sampleaggregate",new SampleAggregatorBolt())
                .shuffleGrouping("spout");
   builder.setBolt("printaggreator",new PrinterBolt()).shuffleGrouping("sampleaggregate");

聚合螺栓

public class SampleAggregatorBolt implements IRichBolt {

    protected OutputCollector collector;
    protected Tuple currentTuple;
    protected Logger log;
    /**
     * Holds the messages in the bolt till you are ready to send them out
     */
    protected List<Status> statusCache;

    @Override
    public void prepare(Map stormConf,TopologyContext context,OutputCollector collector) {
        this.collector = collector;

        log = Logger.getLogger(getClass().getName());
        statusCache = new ArrayList<Status>();
    }

    @Override
    public void execute(Tuple tuple) {
        currentTuple = tuple;

        Status currentStatus = null;
        try {
            currentStatus = (Status) tuple.getValue(0);
        } catch (ClassCastException e) {
        }
        if (currentStatus != null) {

            //add it to the status cache
            statusCache.add(currentStatus);
            collector.ack(tuple);


            //check the size of the status cache and pass it to the next stage if you have enough messages to emit
            if (statusCache.size() > 10) {
                collector.emit(new Values(statusCache));
            }

        }
    }

    @Override
    public void cleanup() {


    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("tweets"));

    }

    @Override
    public Map<String,Object> getComponentConfiguration() {
        return null;  //To change body of implemented methods use File | Settings | File Templates.
    }


    protected void setupNonSerializableAttributes() {

    }

}

打印机螺栓

public class PrinterBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple,BasicOutputCollector collector) {
        System.out.println(tuple.size() + " "  + tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
    }

}

解决方法

从我可以看到它看起来不错.魔鬼的细节虽然如此.我不知道你的聚合器螺栓是什么,但是如果对传递给它的值做出任何假设,那么你应该考虑一个适当的字段分组.当您使用默认的并行性提示1时,这可能不会有太大的差异,但是如果您决定使用多个聚合螺栓实例进行缩放隐式逻辑假设,则可能需要非随机分组.

猜你在找的Java相关文章