Ubuntu+eclipse,进行Hadoop集群操作实例(数据去重+数据排序)

前端之家收集整理的这篇文章主要介绍了Ubuntu+eclipse,进行Hadoop集群操作实例(数据去重+数据排序)前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。


跑完WordCount程序后,想在Hadoop集群上在熟练几个程序,毕竟辛苦搭建好的集群,选择了简单的两个例子:数据去重+数据排序。这里记录下程序及运行中发生的一些问题及解决方案。


前提准备:1、Ubuntu16.0系统+eclipse;

2、Ubuntu server版本搭建的分布式集群系统(1台master,2台slaves);

3、eclipse所在的Ubuntu系统应该和集群系统配置SSH免密登录

参考例程序: 点击打开链接

实例1:数据去重

描述:在原始数据中出现次数超过一次的数据在输出文件中只出现一次

方法:哪个不能重复哪个设置成Key

原始数据:

1、原始数据

file1

2012-3-1 a

2012-3-2 b

2012-3-3 c

2012-3-4 d

2012-3-5 a

2012-3-6 b

2012-3-7 c

2012-3-3 c

file2

2012-3-1 b

2012-3-2 a

2012-3-3 b

2012-3-4 d

2012-3-5 a

2012-3-6 c

2012-3-7 d

2012-3-3 c

数据输出

2012-3-1 a

2012-3-1 b

2012-3-2 a

2012-3-2 b

2012-3-3 b

2012-3-3 c

2012-3-4 d

2012-3-5 a

2012-3-6 b

2012-3-6 c

2012-3-7 c

2012-3-7 d



设计思路:Hadoop的Map类中可以自动排序,统计key值相同的数据项的数目,并且存放在value值中,交由Reduce类处理。这里可以通过Map程序处理数据,然后交由Reduce程序,输出的时候只输出Key值,value值置空即可。


package example2_re;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class SJQC {

    //map将输入中的value复制到输出数据的key上,并直接输出

    public static class Map extends Mapper<Object,Text,Text>{
        private static Text line=new Text();  //读入每行数据
        //实现map函数
        public void map(Object key,Text value,Context context) 
                       throws IOException,InterruptedException{
            line=value;
            context.write(line,new Text(""));
        }

    }

   

    //reduce将输入中的key复制到输出数据的key上,并直接输出
    public static class Reduce extends Reducer<Text,Text>{
        //实现reduce函数
        public void reduce(Text key,Iterable<Text> values,Context context)
                throws IOException,InterruptedException{
            context.write(key,new Text(""));
        }
   }

   

    public static void main(String[] args) throws Exception{
        
      Configuration conf = new Configuration();
     
     Job job = new Job(conf,"Data Deduplication");
     job.setJarByClass(SJQC.class);

     

     //设置Map、Combine和Reduce处理类

     job.setMapperClass(Map.class);
     job.setCombinerClass(Reduce.class);
     job.setReducerClass(Reduce.class);

     

     //设置输出类型

     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(Text.class);

     

     //设置输入和输出目录
     System.out.println("11111");
     FileInputFormat.addInputPaths(job,"hdfs://192.168.42.130:9000/input/example2_re");
     FileOutputFormat.setOutputPath(job,new Path("hdfs://192.168.42.130:9000/output/example2_re"));
     System.exit(job.waitForCompletion(true) ? 0 : 1);
     System.out.println("00000");
     }

}

实例2:数据排序

描述:对原始数据进行排序,利用map自带排序

原始数据:

file1

2

32

654

32

15

756

65223


file2

5956

22

650

92


file3

26

54

6


输出结果:(有重复数据)

1 2

2 6

3 15

4 22

5 26

6 32

7 32

8 54

9 92

10 650

11 654

12 756

13 5956

14 65223

设计思路:应用MapReduce进行自动处理,Reduce输出阶段将Key当做value值输出输出次数由value-list中的元素个数决定;key值用全局变量linenum来代替,从1开始每次递增,表示位次


package example3_DataSort;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;


public class DataSort {

  //map将输入中的value化成IntWritable类型,作为输出的key

   public static class Map extends Mapper<Object,IntWritable,IntWritable>{

       private static IntWritable data=new IntWritable();

       //实现map函数

       public void map(Object key,Context context)
               throws IOException,InterruptedException{

           String line=value.toString();
           data.set(Integer.parseInt(line));
           context.write(data,new IntWritable(1));
       }
   }

 
   //reduce将输入中的key复制到输出数据的key上,
   //然后根据输入的value-list中元素的个数决定key的输出次数
   //用全局linenum来代表key的位次

   public static class Reduce extends

           Reducer<IntWritable,IntWritable>{
       private static IntWritable linenum = new IntWritable(1);

       //实现reduce函数

       public void reduce(IntWritable key,Iterable<IntWritable> values,InterruptedException{

           for(IntWritable val:values){
               context.write(linenum,key);
               linenum = new IntWritable(linenum.get()+1);
           }
}

}

   public static void main(String[] args) throws Exception{

       Configuration conf = new Configuration();

      // conf.set("mapred.job.tracker","192.168.42.130:9001");

    Job job = new Job(conf,"Data Sort");
    job.setJarByClass(DataSort.class);

    //设置Map和Reduce处理类

    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    
    //设置输出类型

    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(IntWritable.class);

    //设置输入和输出目录
    FileInputFormat.addInputPaths(job,"hdfs://192.168.42.130:9000/input/example3_DataSort");
    FileOutputFormat.setOutputPath(job,new Path("hdfs://192.168.42.130:9000/output/example3_DataSort"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);

   }
}


运行中遇到的问题:

1、eclipse控制台看不到日志信息,提示log4j的warning,如下:

log4j:WARN No appenders could be foundforlogger(org.apache.Hadoop.metrics2.lib.MutableMetricsFactory).

log4j:WARN Please initialize the log4j system properly.

log4j:WARN hadoop See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.


说明没有配置log4j.properties文件。这虽然不影响程序的正常运行,但是看不到log日志,不能及时发现错误位置。 解决方法: 把Hadoop2.7.3的安装目录下面的/etc/hadoop/目录下面的log4j.properties文件拷贝放到MapReduce工程的src目录下面。但是每个工程都需要拷贝,暂时没有找到一次配置的方法

猜你在找的Ubuntu相关文章