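This post covers setting up a simple Storm stream-processing environment and running a simple demo.
The current system and the software already installed on it:
JDK 1.7.0_79, Python 2.7.6 and ZooKeeper 3.3.5.
If any of these are missing, you can install them roughly following [1].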
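Installing ZeroMQ and jzmq
Older Storm releases use the ZeroMQ message queue as their underlying transport (newer releases use a Netty-based transport instead), so for those versions ZeroMQ and its Java binding, jzmq, must also be installed. Compilation can run into problems, mostly with configuration and building; these can be solved as described in [2], and the related configuration can follow [3].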
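Installing Storm
Following [1] and [3], installation is straightforward: it mainly consists of downloading Storm and editing the corresponding configuration files.
Testing the installation
The steps above complete the configuration of a single-node Storm environment. Next we run the storm-starter example (WordCountTopology) from the examples that ship with Storm.
Starting Storm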
cd /path/to/zookeeper/bin/
./zkServer.sh start  # start ZooKeeper
cd /path/to/storm/bin
./storm nimbus &
./storm supervisor &
./storm ui &
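After startup, use jps to check the background processes:
Running the demo
Once everything starts successfully, go into storm-starter and run the WordCountTopology example.
The steps below essentially follow README.markdown in storm-starter.
Skipping the part about getting familiar with the platform and the example code, here is how to run storm-starter with Maven:
Install Maven 3.x
First, build Storm locally: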
# Must be run from the top-level directory of the Storm code repository
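$ mvn clean install -DskipTests=true
The command above builds Storm locally and installs the jars into $HOME/.m2/repository, so that later Maven commands can find the corresponding Storm version and its related artifacts.
Package storm-starter so it can be submitted and run: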
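To make "installs the jars into $HOME/.m2/repository" concrete, the sketch below computes where an artifact lands under Maven's default local-repository layout: the groupId's dots become directories, followed by artifactId/version/artifactId-version.jar. The coordinates org.apache.storm:storm-core:1.0.2 are assumed purely for illustration (substitute the version you actually built), and a custom localRepository in settings.xml would change the root directory.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class LocalRepoPath {
    // Default Maven local-repository layout:
    // <repo>/<groupId as dirs>/<artifactId>/<version>/<artifactId>-<version>.jar
    static Path jarPath(Path repo, String groupId, String artifactId, String version) {
        Path dir = repo;
        for (String part : groupId.split("\\.")) {
            dir = dir.resolve(part); // "org.apache.storm" -> org/apache/storm
        }
        return dir.resolve(artifactId)
                  .resolve(version)
                  .resolve(artifactId + "-" + version + ".jar");
    }

    public static void main(String[] args) {
        Path repo = Paths.get(System.getProperty("user.home"), ".m2", "repository");
        // Hypothetical coordinates; use the Storm version you actually built.
        System.out.println(jarPath(repo, "org.apache.storm", "storm-core", "1.0.2"));
    }
}
```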
$ mvn package
The package is written to storm-starter's target directory and named storm-starter-<version>.jar.
Use this jar when submitting topologies later.
# Example 1: Run the ExclamationTopology in local mode (LocalCluster)
$ storm jar target/storm-starter-*.jar org.apache.storm.starter.ExclamationTopology
# Example 2: Run the RollingTopWords in remote/cluster mode,
# under the name "production-topology"
$ storm jar storm-starter-*.jar org.apache.storm.starter.RollingTopWords production-topology remote
Run the WordCountTopology example:
cd /path/to/storm/bin
./storm jar path/to/generated.jar org.apache.storm.starter.WordCountTopology
The run produces a lot of output; here is one excerpt:
This is the word-frequency count for the sentence "I am at two with nature".
The source code, with comments:
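To see where such a result comes from, the sketch below replays the topology's logic in plain Java with no Storm cluster: it splits the sentence on spaces as splitsentence.py does, then keeps a running count per word as WordCount.execute does, returning each (word, count) pair the bolt would emit. It is a single-threaded simulation for illustration only; the class and method names are hypothetical, and in the real topology these steps run in parallel tasks while the counts keep growing as the spout emits the same sentences again.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WordCountSimulation {
    // Same state as the WordCount bolt: word -> running count.
    private final Map<String, Integer> counts = new HashMap<String, Integer>();

    // Mirrors splitsentence.py: split the sentence on single spaces.
    static List<String> split(String sentence) {
        return Arrays.asList(sentence.split(" "));
    }

    // Mirrors WordCount.execute: bump the count and return the
    // (word, count) pair the bolt would emit, formatted as "word:count".
    String count(String word) {
        Integer current = counts.get(word);
        if (current == null) {
            current = 0;
        }
        current++;
        counts.put(word, current);
        return word + ":" + current;
    }

    public static void main(String[] args) {
        WordCountSimulation sim = new WordCountSimulation();
        // One pass over the sentence seen in the output excerpt.
        for (String word : split("I am at two with nature")) {
            System.out.println(sim.count(word));
        }
    }
}
```

Running it prints I:1, am:1, at:1, two:1, with:1 and nature:1 on separate lines; feeding the same sentence in a second time would yield count 2 for every word, which is the growth you see in the live topology's output.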
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.starter;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.task.ShellBolt;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.starter.spout.RandomSentenceSpout;
import java.util.HashMap;
import java.util.Map;
/** * This topology demonstrates Storm's stream groupings and multilang capabilities. */
public class WordCountTopology {
// Sentence-splitting bolt
// SplitSentence extends ShellBolt, which makes it easy to implement the bolt in another language such as Python
public static class SplitSentence extends ShellBolt implements IRichBolt {
// Constructor: delegates the actual work to a bolt written in Python (splitsentence.py)
public SplitSentence() {
super("python","splitsentence.py");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String,Object> getComponentConfiguration() {
return null;
}
}
// Word-count bolt
// Implemented in Java, so extending BaseBasicBolt is sufficient
public static class WordCount extends BaseBasicBolt {
Map<String,Integer> counts = new HashMap<String,Integer>();
@Override
public void execute(Tuple tuple,BasicOutputCollector collector) {
String word = tuple.getString(0);
Integer count = counts.get(word);
if (count == null)
count = 0;
count++;
counts.put(word,count);
// Emit the word together with its current count
collector.emit(new Values(word,count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word","count"));
}
}
// Entry point: build the topology and submit it
public static void main(String[] args) throws Exception {
// Create a topology builder
TopologyBuilder builder = new TopologyBuilder();
// Spout: RandomSentenceSpout emits randomly chosen sentences
builder.setSpout("spout",new RandomSentenceSpout(),5);
// "split" bolt: receives sentences from the spout via shuffle grouping
builder.setBolt("split",new SplitSentence(),8).shuffleGrouping("spout");
// "count" bolt: fields grouping on "word", so the same word always goes to the same task
builder.setBolt("count",new WordCount(),12).fieldsGrouping("split",new Fields("word"));
// Configure and submit the topology
Config conf = new Config();
conf.setDebug(true);
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(args[0],conf,builder.createTopology());
}
else {
// Cap the maximum parallelism of each component in local mode
conf.setMaxTaskParallelism(3);
// Submit to an in-process local cluster, let it run for 10 seconds, then shut down
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count",builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();
}
}
}
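The two groupings used in main differ in how a target task is chosen. The sketch below is a simplified model, not Storm's actual grouping code: shuffle grouping may send a tuple to any task (modeled here as a random pick), while fields grouping derives the task from the value of the grouping field, so every occurrence of the same word reaches the same WordCount task and that task's HashMap holds the complete count for the word. The names GroupingSketch, shuffleTask and fieldsTask are hypothetical; 12 and 8 mirror the parallelism hints given to the "count" and "split" bolts.

```java
import java.util.Random;

public class GroupingSketch {
    private static final Random RANDOM = new Random();

    // Shuffle grouping (simplified): any task may receive the tuple.
    static int shuffleTask(int numTasks) {
        return RANDOM.nextInt(numTasks);
    }

    // Fields grouping (simplified): the target depends only on the field
    // value, so equal words always map to the same task index.
    static int fieldsTask(String word, int numTasks) {
        int h = word.hashCode() % numTasks;
        return h < 0 ? h + numTasks : h; // keep the index non-negative (Java 7 friendly)
    }

    public static void main(String[] args) {
        int countTasks = 12; // parallelism hint of the "count" bolt
        int splitTasks = 8;  // parallelism hint of the "split" bolt
        for (String w : new String[] {"nature", "two", "nature"}) {
            System.out.println(w + " -> count task " + fieldsTask(w, countTasks));
        }
        System.out.println("a sentence -> split task " + shuffleTask(splitTasks));
    }
}
```

Both "nature" lines print the same task index, which is exactly why the per-task HashMap in WordCount gives correct totals; with shuffle grouping on the count bolt, one word's counts would be scattered over several tasks.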
The Python bolt splitsentence.py invoked by SplitSentence:
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import storm

class SplitSentenceBolt(storm.BasicBolt):
    def process(self, tup):
        # Split the incoming sentence on spaces and emit one tuple per word
        words = tup.values[0].split(" ")
        for word in words:
            storm.emit([word])

SplitSentenceBolt().run()
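ShellBolt runs splitsentence.py as a child process and exchanges messages with it over stdin/stdout using Storm's multilang protocol, where each message is a JSON object followed by a line containing "end". As a rough, simplified sketch (hand-written JSON, no handshake, heartbeats or task-id replies), the Java code below prints the kind of messages a BasicBolt-style split bolt sends back for one input tuple: one "emit" per word anchored to the input tuple, then an "ack". The tuple id "42" and the helper names are hypothetical, and the exact message fields should be checked against the multilang documentation for your Storm version.

```java
import java.util.ArrayList;
import java.util.List;

public class MultilangSketch {
    // Minimal JSON string escaping for backslashes and quotes;
    // enough for plain words, not a general JSON encoder.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // One multilang message: a JSON object followed by a line "end".
    static String frame(String json) {
        return json + "\nend\n";
    }

    // Messages for one input tuple: an anchored "emit" per word, then an "ack".
    static List<String> messagesFor(String tupleId, String sentence) {
        List<String> out = new ArrayList<String>();
        for (String word : sentence.split(" ")) {
            out.add(frame("{\"command\": \"emit\", \"anchors\": [" + quote(tupleId)
                    + "], \"tuple\": [" + quote(word) + "]}"));
        }
        out.add(frame("{\"command\": \"ack\", \"id\": " + quote(tupleId) + "}"));
        return out;
    }

    public static void main(String[] args) {
        for (String msg : messagesFor("42", "I am at two with nature")) {
            System.out.print(msg);
        }
    }
}
```

This is why SplitSentence only needs a constructor and declareOutputFields on the Java side: all per-tuple work happens in the Python process, and ShellBolt translates these messages into ordinary emits and acks.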
References
[1] (original link not preserved)
[2] https://my.oschina.net/mingdongcheng/blog/43009
[3] http://blog.csdn.net/zz430581/article/details/37505707
[4] http://blog.csdn.net/nb_vol_1/article/details/46287625