java – Hadoop mapreduce:用于在MapReduce作业中链接映射器的驱动程序

前端之家收集整理的这篇文章主要介绍了《java – Hadoop mapreduce:用于在MapReduce作业中链接映射器的驱动程序》,前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
我有mapreduce工作:
我的代码Map类:
  1. public static class MapClass extends Mapper<Text,Text,LongWritable> {
  2.  
  3. @Override
  4. public void map(Text key,Text value,Context context)
  5. throws IOException,InterruptedException {
  6. }
  7. }

我想使用ChainMapper:

  1. 1. Job job = new Job(conf,"Job with chained tasks");
  2. 2. job.setJarByClass(MapReduce.class);
  3. 3. job.setInputFormatClass(TextInputFormat.class);
  4. 4. job.setOutputFormatClass(TextOutputFormat.class);
  5.  
  6. 5. FileInputFormat.setInputPaths(job,new Path(InputFile));
  7. 6. FileOutputFormat.setOutputPath(job,new Path(OutputFile));
  8.  
  9. 7. JobConf map1 = new JobConf(false);
  10.  
  // ERROR (see compiler output below): this ChainMapper is the old-API class
  // (org.apache.hadoop.mapred.lib.ChainMapper), whose addMapper expects a
  // JobConf as its first argument — but `job` here is the new-API
  // org.apache.hadoop.mapreduce.Job. The two APIs cannot be mixed; the fix in
  // the accepted answer switches the whole driver to JobConf.
  11. 8. ChainMapper.addMapper(
  12. job,MapClass.class,Text.class,true,map1
  13. );

但它的报告在第8行有一个错误

Multiple markers at this line
– Occurrence of ‘addMapper’
– The method addMapper(JobConf, Class<? extends Mapper<K1,V1,K2,V2>>, Class<? extends K1>, Class<? extends V1>, Class<? extends K2>, Class<? extends V2>, boolean, JobConf) in the type ChainMapper is not applicable for the arguments (Job, Class<MapClass>, Class<Text>, Class<Text>, boolean, Configuration)
– Debug Current Instruction Pointer
– The method addMapper(JobConf, Class<? extends Mapper>, Class<?>, Class<?>, Class<?>, Class<?>, boolean, JobConf) in the type ChainMapper is not applicable for the arguments (JobConf, …, JobConf)
(注:原文中错误信息里的泛型尖括号内容被网页转义吞掉,此处按 ChainMapper.addMapper 的实际签名还原,请以原始编译输出为准。)

解决方法

经过大量的“功夫”,我能够使用ChainMapper / ChainReducer.感谢上次评论user864846.
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License,Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing,software
  13. * distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17.  
  18. package myPKG;
  19.  
  20. /*
  21. * Ajitsen: Sample program for ChainMapper/ChainReducer. This program is modified version of WordCount example available in Hadoop-0.18.0. Added ChainMapper/ChainReducer and made to works in Hadoop 1.0.2.
  22. */
  23.  
  24. import java.io.IOException;
  25. import java.util.Iterator;
  26. import java.util.StringTokenizer;
  27.  
  28. import org.apache.hadoop.conf.Configuration;
  29. import org.apache.hadoop.conf.Configured;
  30. import org.apache.hadoop.fs.Path;
  31. import org.apache.hadoop.io.IntWritable;
  32. import org.apache.hadoop.io.LongWritable;
  33. import org.apache.hadoop.io.Text;
  34. import org.apache.hadoop.mapred.*;
  35. import org.apache.hadoop.mapred.lib.ChainMapper;
  36. import org.apache.hadoop.mapred.lib.ChainReducer;
  37. import org.apache.hadoop.util.Tool;
  38. import org.apache.hadoop.util.ToolRunner;
  39.  
  40. public class ChainWordCount extends Configured implements Tool {
  41.  
  42. public static class Tokenizer extends MapReduceBase
  43. implements Mapper<LongWritable,IntWritable> {
  44.  
  45. private final static IntWritable one = new IntWritable(1);
  46. private Text word = new Text();
  47.  
  48. public void map(LongWritable key,OutputCollector<Text,IntWritable> output,Reporter reporter) throws IOException {
  49. String line = value.toString();
  50. System.out.println("Line:"+line);
  51. StringTokenizer itr = new StringTokenizer(line);
  52. while (itr.hasMoreTokens()) {
  53. word.set(itr.nextToken());
  54. output.collect(word,one);
  55. }
  56. }
  57. }
  58.  
  59. public static class UpperCaser extends MapReduceBase
  60. implements Mapper<Text,IntWritable,IntWritable> {
  61.  
  62. public void map(Text key,IntWritable value,Reporter reporter) throws IOException {
  63. String word = key.toString().toUpperCase();
  64. System.out.println("Upper Case:"+word);
  65. output.collect(new Text(word),value);
  66. }
  67. }
  68.  
  69. public static class Reduce extends MapReduceBase
  70. implements Reducer<Text,IntWritable> {
  71.  
  72. public void reduce(Text key,Iterator<IntWritable> values,Reporter reporter) throws IOException {
  73. int sum = 0;
  74. while (values.hasNext()) {
  75. sum += values.next().get();
  76. }
  77. System.out.println("Word:"+key.toString()+"\tCount:"+sum);
  78. output.collect(key,new IntWritable(sum));
  79. }
  80. }
  81.  
  82. static int printUsage() {
  83. System.out.println("wordcount <input> <output>");
  84. ToolRunner.printGenericCommandUsage(System.out);
  85. return -1;
  86. }
  87.  
  88. public int run(String[] args) throws Exception {
  89. JobConf conf = new JobConf(getConf(),ChainWordCount.class);
  90. conf.setJobName("wordcount");
  91.  
  92. if (args.length != 2) {
  93. System.out.println("ERROR: Wrong number of parameters: " +
  94. args.length + " instead of 2.");
  95. return printUsage();
  96. }
  97. FileInputFormat.setInputPaths(conf,args[0]);
  98. FileOutputFormat.setOutputPath(conf,new Path(args[1]));
  99.  
  100. conf.setInputFormat(TextInputFormat.class);
  101. conf.setOutputFormat(TextOutputFormat.class);
  102.  
  103. JobConf mapAConf = new JobConf(false);
  104. ChainMapper.addMapper(conf,Tokenizer.class,LongWritable.class,IntWritable.class,mapAConf);
  105.  
  106. JobConf mapBConf = new JobConf(false);
  107. ChainMapper.addMapper(conf,UpperCaser.class,mapBConf);
  108.  
  109. JobConf reduceConf = new JobConf(false);
  110. ChainReducer.setReducer(conf,Reduce.class,reduceConf);
  111.  
  112. JobClient.runJob(conf);
  113. return 0;
  114. }
  115.  
  116. public static void main(String[] args) throws Exception {
  117. int res = ToolRunner.run(new Configuration(),new ChainWordCount(),args);
  118. System.exit(res);
  119. }
  120. }

编辑:在较新的版本中(至少从 Hadoop 2.6 起),addMapper 不再需要 byValue 布尔标志(即上面传入的 true 参数)——实际上新的方法签名已经去掉了这个参数。

所以它会是公正的

  1. JobConf mapAConf = new JobConf(false);
  2. ChainMapper.addMapper(conf,mapAConf);

猜你在找的Java相关文章