hadoop安装部署及Eclipse安装集成,这里不赘述了。
先说下业务需求吧,有个系统日志文件,记录系统的运行信息,其中包含DEBUG、INFO、WARN、ERROR四个级别的日志,现在想要看到所有级别各有多少条记录。
创建一个map/reduce项目,项目名为mapreducetest。在src下建立一个名为mapreducetest的包,然后建一个类名叫MapReduceTest,下面是代码。
package mapreducetest;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;public class MapReduceTest extends Configuration implements Tool{ /** * 配置 */ private Configuration configuration; /** * 获取配置 */ @Override public Configuration getConf() { return this.configuration; } /** * 设置配置 */ @Override public void setConf(Configuration arg0) { this.configuration=arg0; } /** * * @ClassName: Counter * @Description: TODO(计数器) * @author scc * @date 2015年5月27日 下午2:54:39 * */ enum Counter{ TIMER } /** * * @ClassName: Map * @Description: map实现,所有的map业务都在这里进行Mapper后的四个参数分别为,输入key类型,输入value类型,输出key类型,输出value类型 * @author scc * @date 2015年5月27日 下午2:30:06 * @ */ public static class Map extends Mapper{ /** * key:输入key * value:输入value * context:map上下文对象 * 说明,hdfs生成的所有键值对都会调用此方法 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { try{ //得到日志每一行数据 String mapvalue=value.toString(); //日志具有固定格式,通过空格切分可以获得固定打下的string数组 String[] infos=mapvalue.split(" "); //时间在数组的第一列,日志级别在数据的第九列, String info=infos[10]; //调整数据格式(第一个参数为key,第二个参数为value),这里key和value都设置为日志级别 context.write(new Text(info), new Text(info)); }catch(Exception e){ //遇到错误是记录错误 context.getCounter(Counter.TIMER).increment(1); return; } } } /** * * @ClassName: Reduce * @Description: reduce处理类 ,Reducer四个参数,前两个是输入key和value的类型,必须和map一样,后两个是输出的key和value的类型 * @author scc * @date 2015年5月27日 下午3:33:06 * */ public static class Reduce extends Reducer { /** * 第一个参数输入的value,第二个参数是该key对应的所有的value集合,第三个是reducer的上下文 * 说明:与map不同的这里是对map处理后的数据进行的调用,当map处理后的key有重复时,这里传进来的key会是去重后的key,比方说在map里放进10个键值对, * 其中有五个key是key1,有五个是key2,那么在reduce的时候只会调用两次reduce,分别是key1和key2 */ @Override protected void reduce(Text key, Iterable values,Context arg2) throws IOException, InterruptedException { //获取当前遍历的key String info=key.toString(); //计数器 int count=0; //当值和key相同时计数器加1 for (Text text : values) { if(info.equals(text.toString())) count=count+1; } //将级别和对应的数据写出去 arg2.write(key, new Text(String.valueOf(count))); } } /** * run方法是一个入口 */ @Override public int run(String[] arg0) throws Exception { //建立一个job,并指定job的名称 Job job=Job.getInstance(getConf(), "maptest"); //指定job的类 job.setJarByClass(MapReduceTest.class); //设置日志文件路径(hdfs路径) FileInputFormat.setInputPaths(job, new Path(arg0[0])); //设置结果输出路径(hdfs路径) FileOutputFormat.setOutputPath(job, new Path(arg0[1])); //设置map处理类的class job.setMapperClass(Map.class); //设置reduce的class job.setReducerClass(Reduce.class); //设置输出格式化的类的class job.setOutputFormatClass(TextOutputFormat.class); //设置输出key的类型 job.setOutputKeyClass(Text.class); //设置输出value的类型 job.setOutputValueClass(Text.class); //设置等待job完成 job.waitForCompletion(true); return job.isSuccessful()?0:1; } public static void main(String[] args) throws Exception { String[] args2=new String[2]; args2[0]="hdfs://192.168.1.55:9000/test2-in/singlemaptest.log"; args2[1]="hdfs://192.168.1.55:9000/test2-out"; int res=ToolRunner.run(new Configuration(), new MapReduceTest(), args2); System.exit(res); }}
下面是生成的结果:
INFO 3800WARN 55