这可能看起来像一个愚蠢的问题,但我没有在我的mapreduce代码中看到我的类型中的问题为hadoop
正如问题中所述,问题是它期望IntWritable,但我在reducer的collector.collect中传递了一个Text对象.
我的作业配置有以下映射器输出类:
conf.setMapOutputKeyClass(IntWritable.class); conf.setMapOutputValueClass(IntWritable.class);
以下减速机输出类:
conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class);
我的映射类具有以下定义:
public static class Reduce extends MapReduceBase implements Reducer<IntWritable,IntWritable,Text,IntWritable>
具有所需功能:
public void reduce(IntWritable key,Iterator<IntWritable> values,OutputCollector<Text,IntWritable> output,Reporter reporter)
然后当我打电话时失败:
output.collect(new Text(),new IntWritable());
我是相当新的map reduce,但所有类型似乎都匹配,它编译但是然后在该行上失败,说它期望IntWritable作为reduce类的键.如果重要的话我使用的是0.21版本的Hadoop
这是我的地图类:
public static class Map extends MapReduceBase implements Mapper<LongWritable,IntWritable> { private IntWritable node = new IntWritable(); private IntWritable edge = new IntWritable(); public void map(LongWritable key,Text value,OutputCollector<IntWritable,Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { node.set(Integer.parseInt(tokenizer.nextToken())); edge.set(Integer.parseInt(tokenizer.nextToken())); if(node.get() < edge.get()) output.collect(node,edge); } } }
和我的减少类:
public static class Reduce extends MapReduceBase implements Reducer<IntWritable,IntWritable> { IntWritable $= new IntWritable(Integer.MAX_VALUE); Text keyText = new Text(); public void reduce(IntWritable key,Reporter reporter) throws IOException { ArrayList<IntWritable> valueList = new ArrayList<IntWritable>(); //outputs original edge pair as key and $for value while (values.hasNext()) { IntWritable value = values.next(); valueList.add(value); keyText.set(key.get() + "," + value.get()); output.collect(keyText,$); } //outputs all the 2 length pairs for(int i = 0; i < valueList.size(); i++) for(int j = i+1; i < valueList.size(); j++) output.collect(new Text(valueList.get(i).get() + "," + valueList.get(j).get()),key); } }
和我的工作配置:
JobConf conf = new JobConf(Triangles.class); conf.setJobName("mapred1"); conf.setMapOutputKeyClass(IntWritable.class); conf.setMapOutputValueClass(IntWritable.class); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(Map.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf,new Path(args[0])); FileOutputFormat.setOutputPath(conf,new Path("mapred1")); JobClient.runJob(conf);
解决方法
您的问题是您将Reduce类设置为组合器
conf.setCombinerClass(Reduce.class);
组合器在映射阶段运行,它们需要发出相同的键/值类型(在您的情况下为IntWriteable,IntWritable)删除这一行,你应该没问题