Sunday, February 1, 2015

CombineFileInputFormat - MapReduce code to handle many small files

CombineFileInputFormat is an abstract class that, unlike FileInputFormat, ships with no concrete subclasses in the Hadoop release used here. To get a CombineFileInputFormat equivalent of TextInputFormat, we need to create a concrete subclass of CombineFileInputFormat and implement the createRecordReader() method.

CombineFileInputFormat packs many small files into a single input split, so each map task gets more data to work on. It also takes data locality (node and rack) into account when grouping files, so performance is not compromised.
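
(Newer Hadoop 2.x releases do ship a ready-made concrete subclass, CombineTextInputFormat. A minimal driver sketch assuming such a release is below; CombineTextDriver is just an illustrative name, and the rest of this post builds its own subclass instead.)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CombineTextDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "combine small files");
        job.setJarByClass(CombineTextDriver.class);

        // Ready-made CombineFileInputFormat subclass that reads lines like TextInputFormat.
        job.setInputFormatClass(CombineTextInputFormat.class);
        // Cap each combined split at 1 MB.
        CombineTextInputFormat.setMaxInputSplitSize(job, 1048576);

        // A mapper and reducer (like the ones in the program below) would be set here as usual.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}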

Let's try a MapReduce program that combines many small emp files using CombineFileInputFormat.

Sample data in the input files would look like this

Name,Position Title,Department,Employee Annual Salary
"AARON,  ELVIA J",WATER RATE TAKER,WATER MGMNT,$87228.00
"AARON,  JEFFERY M",POLICE OFFICER,POLICE,$75372.00
"AARON,  KARINA",POLICE OFFICER,POLICE,$75372.00
"AARON,  KIMBERLEI R",CHIEF CONTRACT EXPEDITER,GENERAL SERVICES,$80916.00
"ABAD JR,  VICENTE M",CIVIL ENGINEER IV,WATER MGMNT,$99648.00
"ABARCA,  ANABEL",ASST TO THE ALDERMAN,CITY COUNCIL,$70764.00
"ABARCA,  EMMANUEL",GENERAL LABORER - DSS,STREETS & SAN,$40560.00
"ABBATACOLA,  ROBERT J",ELECTRICAL MECHANIC,AVIATION,$89440.00
"ABBATEMARCO,  JAMES J",FIRE ENGINEER,FIRE,$84396.00
"ABBATE,  TERRY M",POLICE OFFICER,POLICE,$80724.00
"ABBOTT,  BETTY L",FOSTER GRANDPARENT,FAMILY & SUPPORT,$2756.00
"ABBOTT,  LYNISE M",CLERK III,POLICE,$41784.00


I have created five small emp files and placed them under an emp directory in HDFS. With the default TextInputFormat each of these files would get its own map task; with CombineFileInputFormat they can all be processed by a single map task, as the job counters below confirm.

 $ hadoop dfs -lsr emp
-rw-r--r--   1 swetha supergroup        643 2015-02-01 14:29 /user/swetha/emp/emp1
-rw-r--r--   1 swetha supergroup        542 2015-02-01 14:29 /user/swetha/emp/emp2
-rw-r--r--   1 swetha supergroup        629 2015-02-01 14:29 /user/swetha/emp/emp3
-rw-r--r--   1 swetha supergroup        596 2015-02-01 14:29 /user/swetha/emp/emp4
-rw-r--r--   1 swetha supergroup    1866851 2015-02-01 22:51 /user/swetha/emp/emp5


In the MapReduce code we need to set mapred.max.split.size. This property caps the size of the combined input split that each mapper has to handle.
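
(As a side note, the same cap can also be hard-wired inside the input format subclass through the protected setMaxSplitSize() helper; a minimal sketch of that variant is below, where CappedCombineInputFormat is a made-up name and this is not what the program in this post does. On Hadoop 2.x the equivalent configuration key is mapreduce.input.fileinputformat.split.maxsize.)

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;

// Left abstract so that no record reader needs to be supplied in this sketch.
public abstract class CappedCombineInputFormat extends CombineFileInputFormat<LongWritable, Text> {
    public CappedCombineInputFormat() {
        setMaxSplitSize(1048576); // combine small files into splits of at most 1 MB
    }
}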

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.*;

// Driver plus mapper, reducer and input format for combining many small emp files into larger splits.
public class combileSmallEmpFiles extends Configured implements Tool{

    public static class combineMapper extends Mapper<LongWritable, Text, Text, Text>{
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
            String a = value.toString();
            // Split on commas that fall outside double quotes, so quoted names like "AARON,  ELVIA J" stay intact.
            String[] b = a.split(",(?=([^\"]*\"[^\"]*\")*[^\"]*$)");

            // Skip the header line and emit (Name, Department).
            if (!(b[0].equals("Name")))
                context.write(new Text(b[0]), new Text(b[2]));
        }
    }
   
    // Identity-style reducer: writes every (Name, Department) pair straight through to the output.
    public static class combineReducer extends Reducer<Text, Text, Text, Text>{
        @Override
        public void reduce(Text key, Iterable<Text> value, Context context) throws IOException, InterruptedException{
            for (Text values : value){
                context.write(key, values);
            }
        }
    }
   
    // Concrete CombineFileInputFormat: each combined split is handed to a CombineFileRecordReader,
    // which creates one myCombineFileRecordReader per file chunk inside the split.
    public static class combineEmpInputFormat extends CombineFileInputFormat<LongWritable,Text>{

        @Override
        public RecordReader<LongWritable,Text> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException{
            CombineFileRecordReader<LongWritable,Text> reader = new CombineFileRecordReader<LongWritable,Text>(
                    (CombineFileSplit) split, context, myCombineFileRecordReader.class);
            return reader;
        }
    }
   
    // Per-file record reader: wraps a LineRecordReader over the single file chunk (identified by index)
    // that this reader is responsible for within the combined split.
    public static class myCombineFileRecordReader extends RecordReader<LongWritable,Text>{
        private LineRecordReader lineRecordReader = new LineRecordReader();

        public myCombineFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index)
            throws IOException{
            FileSplit fileSplit = new FileSplit(split.getPath(index),split.getOffset(index),
                    split.getLength(index), split.getLocations());
            lineRecordReader.initialize(fileSplit, context);
        }
       
       
        @Override
        public void initialize(InputSplit split,TaskAttemptContext context)
            throws IOException, InterruptedException{
            // Nothing to do here; the wrapped LineRecordReader is initialized in the constructor.
        }
       
        @Override
        public void close() throws IOException{
            lineRecordReader.close();
        }
       
        @Override
        public float getProgress() throws IOException{
            return lineRecordReader.getProgress();
        }
       
        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException{
            return lineRecordReader.getCurrentKey();
        }
       
        @Override
        public Text getCurrentValue() throws IOException,InterruptedException{
            return lineRecordReader.getCurrentValue();
        }
       
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException{
            return lineRecordReader.nextKeyValue();
        }
       
    }
    public int run(String[] args) throws Exception{
        Configuration conf = new Configuration();
        conf.set("mapred.max.split.size", "1048576"); //1MB
       
        Job job = new Job(conf,"Combine small files");
        job.setJarByClass(combileSmallEmpFiles.class);

        job.setMapperClass(combineMapper.class);
        job.setReducerClass(combineReducer.class);
        job.setInputFormatClass(combineEmpInputFormat.class);
       
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
       
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
       
        return job.waitForCompletion(true) ? 0:1;
    }
   
    public static void main(String[] args) throws Exception{
        int exitcode = ToolRunner.run(new combileSmallEmpFiles(), args);
        System.exit(exitcode);
    }
}
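
To try it out, the class can be compiled against the Hadoop jars and packed into combineSmallEmpFiles.jar. One possible way, assuming the hadoop classpath command is available on the client:

$ mkdir classes
$ javac -classpath `hadoop classpath` -d classes combileSmallEmpFiles.java
$ jar cvf combineSmallEmpFiles.jar -C classes .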


Output would look like this

$ hadoop jar combineSmallEmpFiles.jar combileSmallEmpFiles emp out1
15/02/01 23:12:37 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/02/01 23:12:38 INFO input.FileInputFormat: Total input paths to process : 5
15/02/01 23:12:38 INFO util.NativeCodeLoader: Loaded the native-hadoop library
15/02/01 23:12:38 WARN snappy.LoadSnappy: Snappy native library not loaded
15/02/01 23:12:38 INFO mapred.JobClient: Running job: job_201501301653_0077
15/02/01 23:12:39 INFO mapred.JobClient:  map 0% reduce 0%
15/02/01 23:12:52 INFO mapred.JobClient:  map 100% reduce 0%
15/02/01 23:13:03 INFO mapred.JobClient:  map 100% reduce 33%
15/02/01 23:13:05 INFO mapred.JobClient:  map 100% reduce 100%
15/02/01 23:13:09 INFO mapred.JobClient: Job complete: job_201501301653_0077
15/02/01 23:13:09 INFO mapred.JobClient: Counters: 28
15/02/01 23:13:09 INFO mapred.JobClient:   Map-Reduce Framework
15/02/01 23:13:09 INFO mapred.JobClient:     Spilled Records=64398
15/02/01 23:13:09 INFO mapred.JobClient:     Map output materialized bytes=996759
15/02/01 23:13:09 INFO mapred.JobClient:     Reduce input records=32199
15/02/01 23:13:09 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=3702489088
15/02/01 23:13:09 INFO mapred.JobClient:     Map input records=32201
15/02/01 23:13:09 INFO mapred.JobClient:     SPLIT_RAW_BYTES=370
15/02/01 23:13:09 INFO mapred.JobClient:     Map output bytes=932355
15/02/01 23:13:09 INFO mapred.JobClient:     Reduce shuffle bytes=996759
15/02/01 23:13:09 INFO mapred.JobClient:     Physical memory (bytes) snapshot=262676480
15/02/01 23:13:09 INFO mapred.JobClient:     Reduce input groups=31890
15/02/01 23:13:09 INFO mapred.JobClient:     Combine output records=0
15/02/01 23:13:09 INFO mapred.JobClient:     Reduce output records=32199
15/02/01 23:13:09 INFO mapred.JobClient:     Map output records=32199
15/02/01 23:13:09 INFO mapred.JobClient:     Combine input records=0
15/02/01 23:13:09 INFO mapred.JobClient:     CPU time spent (ms)=2890
15/02/01 23:13:09 INFO mapred.JobClient:     Total committed heap usage (bytes)=164630528
15/02/01 23:13:09 INFO mapred.JobClient:   File Input Format Counters
15/02/01 23:13:09 INFO mapred.JobClient:     Bytes Read=0
15/02/01 23:13:09 INFO mapred.JobClient:   FileSystemCounters
15/02/01 23:13:09 INFO mapred.JobClient:     HDFS_BYTES_READ=1869631
15/02/01 23:13:09 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=2104561
15/02/01 23:13:09 INFO mapred.JobClient:     FILE_BYTES_READ=996759
15/02/01 23:13:09 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=932355
15/02/01 23:13:09 INFO mapred.JobClient:   Job Counters
15/02/01 23:13:09 INFO mapred.JobClient:     Launched map tasks=1
15/02/01 23:13:09 INFO mapred.JobClient:     Launched reduce tasks=1
15/02/01 23:13:09 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=13302
15/02/01 23:13:09 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
15/02/01 23:13:09 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=16469
15/02/01 23:13:09 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
15/02/01 23:13:09 INFO mapred.JobClient:   File Output Format Counters
15/02/01 23:13:09 INFO mapred.JobClient:     Bytes Written=932355
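
The job writes its results to the output directory given on the command line (out1 here). With the single reducer, the (Name, Department) records land in one part file, which can be inspected with something like the following (the part-r-00000 name assumes the usual output file naming):

$ hadoop dfs -cat out1/part-r-00000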

