I have a MapReduce job:
My Map class code:
- public static class MapClass extends Mapper<Text, Text, Text, LongWritable> {
-     @Override
-     public void map(Text key, Text value, Context context)
-             throws IOException, InterruptedException {
-     }
- }
I want to use ChainMapper:
- 1. Job job = new Job(conf, "Job with chained tasks");
- 2. job.setJarByClass(MapReduce.class);
- 3. job.setInputFormatClass(TextInputFormat.class);
- 4. job.setOutputFormatClass(TextOutputFormat.class);
- 5. FileInputFormat.setInputPaths(job, new Path(InputFile));
- 6. FileOutputFormat.setOutputPath(job, new Path(OutputFile));
- 7. JobConf map1 = new JobConf(false);
- 8. ChainMapper.addMapper(
-        job, MapClass.class, Text.class, Text.class, Text.class, LongWritable.class, true, map1
-    );
But it reports an error at line 8:
Multiple markers at this line
– Occurrence of 'addMapper'
– The method addMapper(JobConf, Class<? extends Mapper>, Class, Class, Class, Class, boolean, JobConf) in the
  type ChainMapper is not applicable for the arguments (Job, Class<MapClass>, Class<Text>, Class<Text>,
  Class<Text>, Class<LongWritable>, boolean, JobConf)
– Debug Current Instruction Pointer
Solution
After a great deal of "kung fu" (trial and error), I was able to get ChainMapper/ChainReducer working. Thanks to user864846 for the last comment.
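The root cause of the compile error above, for anyone who hits the same thing: ChainMapper exists in both Hadoop APIs. The old org.apache.hadoop.mapred.lib.ChainMapper chains stages on a JobConf, so handing it a new-API Job cannot compile. A minimal sketch of the contrast (the solution below sticks with the old API):

- // Old ("mapred") API – used by the working program below; chains via JobConf:
- import org.apache.hadoop.mapred.lib.ChainMapper;               // addMapper(JobConf, ..., boolean, JobConf)
- // New ("mapreduce") API – chains via Job and a plain Configuration instead:
- // import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;   // addMapper(Job, ..., Configuration)

The complete working program: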
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package myPKG;
- /*
-  * Ajitsen: Sample program for ChainMapper/ChainReducer. This program is a modified version of the
-  * WordCount example shipped with Hadoop 0.18.0. ChainMapper/ChainReducer were added, and it was made
-  * to work with Hadoop 1.0.2.
-  */
- import java.io.IOException;
- import java.util.Iterator;
- import java.util.StringTokenizer;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapred.*;
- import org.apache.hadoop.mapred.lib.ChainMapper;
- import org.apache.hadoop.mapred.lib.ChainReducer;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
- public class ChainWordCount extends Configured implements Tool {
-     public static class Tokenizer extends MapReduceBase
-             implements Mapper<LongWritable, Text, Text, IntWritable> {
-         private final static IntWritable one = new IntWritable(1);
-         private Text word = new Text();
-         public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output,
-                 Reporter reporter) throws IOException {
-             String line = value.toString();
-             System.out.println("Line:" + line);
-             StringTokenizer itr = new StringTokenizer(line);
-             while (itr.hasMoreTokens()) {
-                 word.set(itr.nextToken());
-                 output.collect(word, one);
-             }
-         }
-     }
-     public static class UpperCaser extends MapReduceBase
-             implements Mapper<Text, IntWritable, Text, IntWritable> {
-         public void map(Text key, IntWritable value, OutputCollector<Text, IntWritable> output,
-                 Reporter reporter) throws IOException {
-             String word = key.toString().toUpperCase();
-             System.out.println("Upper Case:" + word);
-             output.collect(new Text(word), value);
-         }
-     }
-     public static class Reduce extends MapReduceBase
-             implements Reducer<Text, IntWritable, Text, IntWritable> {
-         public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output,
-                 Reporter reporter) throws IOException {
-             int sum = 0;
-             while (values.hasNext()) {
-                 sum += values.next().get();
-             }
-             System.out.println("Word:" + key.toString() + "\tCount:" + sum);
-             output.collect(key, new IntWritable(sum));
-         }
-     }
-     static int printUsage() {
-         System.out.println("wordcount <input> <output>");
-         ToolRunner.printGenericCommandUsage(System.out);
-         return -1;
-     }
-     public int run(String[] args) throws Exception {
-         JobConf conf = new JobConf(getConf(), ChainWordCount.class);
-         conf.setJobName("wordcount");
-         if (args.length != 2) {
-             System.out.println("ERROR: Wrong number of parameters: " +
-                     args.length + " instead of 2.");
-             return printUsage();
-         }
-         FileInputFormat.setInputPaths(conf, args[0]);
-         FileOutputFormat.setOutputPath(conf, new Path(args[1]));
-         conf.setInputFormat(TextInputFormat.class);
-         conf.setOutputFormat(TextOutputFormat.class);
-         JobConf mapAConf = new JobConf(false);
-         ChainMapper.addMapper(conf, Tokenizer.class, LongWritable.class, Text.class,
-                 Text.class, IntWritable.class, true, mapAConf);
-         JobConf mapBConf = new JobConf(false);
-         ChainMapper.addMapper(conf, UpperCaser.class, Text.class, IntWritable.class,
-                 Text.class, IntWritable.class, true, mapBConf);
-         JobConf reduceConf = new JobConf(false);
-         ChainReducer.setReducer(conf, Reduce.class, Text.class, IntWritable.class,
-                 Text.class, IntWritable.class, true, reduceConf);
-         JobClient.runJob(conf);
-         return 0;
-     }
-     public static void main(String[] args) throws Exception {
-         int res = ToolRunner.run(new Configuration(), new ChainWordCount(), args);
-         System.exit(res);
-     }
- }
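A note on the repeated new JobConf(false) calls: passing false creates an empty configuration that skips loading the default resources, so each JobConf holds settings local to one stage of the chain, visible only to that mapper or reducer. For example (the property name here is made up purely for illustration):

- JobConf mapBConf = new JobConf(false);            // empty, stage-local configuration
- mapBConf.set("uppercaser.example.flag", "on");    // hypothetical property, seen only by UpperCaser
- ChainMapper.addMapper(conf, UpperCaser.class, Text.class, IntWritable.class,
-         Text.class, IntWritable.class, true, mapBConf);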
EDIT: In recent versions (at least since Hadoop 2.6), the true flag in addMapper is no longer needed. (In fact, the signature has changed and drops it.)
So it becomes just:
- JobConf mapAConf = new JobConf(false);
- ChainMapper.addMapper(conf, Tokenizer.class, LongWritable.class, Text.class,
-         Text.class, IntWritable.class, mapAConf);
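For reference, a minimal sketch of the same chain against the new API, assuming Hadoop 2.x, where org.apache.hadoop.mapreduce.lib.chain.ChainMapper/ChainReducer take a Job and a plain Configuration and have no by-value flag. NewApiTokenizer, NewApiUpperCaser and NewApiReduce are hypothetical ports of the classes above to org.apache.hadoop.mapreduce.Mapper/Reducer; input/output formats and paths are omitted for brevity:

- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
- import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
-
- public class NewApiChainWordCount {
-     public static void main(String[] args) throws Exception {
-         Job job = Job.getInstance(new Configuration(), "chained wordcount");
-         job.setJarByClass(NewApiChainWordCount.class);
-         // Each stage still gets its own empty, stage-local Configuration;
-         // note there is no boolean by-value argument in the new signatures.
-         ChainMapper.addMapper(job, NewApiTokenizer.class,      // hypothetical new-API Tokenizer
-                 LongWritable.class, Text.class, Text.class, IntWritable.class,
-                 new Configuration(false));
-         ChainMapper.addMapper(job, NewApiUpperCaser.class,     // hypothetical new-API UpperCaser
-                 Text.class, IntWritable.class, Text.class, IntWritable.class,
-                 new Configuration(false));
-         ChainReducer.setReducer(job, NewApiReduce.class,       // hypothetical new-API Reduce
-                 Text.class, IntWritable.class, Text.class, IntWritable.class,
-                 new Configuration(false));
-         System.exit(job.waitForCompletion(true) ? 0 : 1);
-     }
- }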