Monday, February 2, 2015

NLineInputFormat in Java MapReduce - Sample code

With NLineInputFormat, each mapper receives a fixed number of lines of input, unlike with TextInputFormat and KeyValueTextInputFormat. The number of lines per mapper is controlled by the property mapreduce.input.lineinputformat.linespermap in the new API and mapred.line.input.format.linespermap in the old API. The default value is 1.

Alternatively, we can set it in code:
conf.setInt(NLineInputFormat.LINES_PER_MAP, 1000); // sets N value to 1000

NLineInputFormat is useful for applications that take a small amount of input data and run an extensive (that is, CPU-intensive) computation for each record, then emit their output.

Let's try this with a simple MapReduce job that takes as input an Employees file with 32,161 records. In the code I have set N to 1000, so each mapper processes at most 1,000 records and a total of 33 mappers are launched (32 full splits plus one with the remaining 161 records).
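The expected mapper count is just ceiling division of the record count by N. A quick standalone sketch (class and method names are my own, for illustration only):

```java
public class SplitCountDemo {
    // NLineInputFormat creates one split per N lines of a file,
    // so the number of map tasks is ceil(totalRecords / N).
    static int expectedMappers(int totalRecords, int linesPerMap) {
        return (totalRecords + linesPerMap - 1) / linesPerMap;
    }

    public static void main(String[] args) {
        // 32161 records with N = 1000: 32 full splits plus one split of 161 lines.
        System.out.println(expectedMappers(32161, 1000)); // prints 33
    }
}
```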


$ hadoop dfs -cat Employees | wc -l
32161


Data in the input file looks like this:

Name,Position Title,Department,Employee Annual Salary
"AARON,  ELVIA J",WATER RATE TAKER,WATER MGMNT,$87228.00
"AARON,  JEFFERY M",POLICE OFFICER,POLICE,$75372.00
"AARON,  KARINA",POLICE OFFICER,POLICE,$75372.00
"AARON,  KIMBERLEI R",CHIEF CONTRACT EXPEDITER,GENERAL SERVICES,$80916.00
"ABAD JR,  VICENTE M",CIVIL ENGINEER IV,WATER MGMNT,$99648.00
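Note that the Name field is quoted and contains a comma, so a plain split on "," would break it apart. The mapper below handles this with a regex that only splits on commas outside double quotes. A small standalone sketch of that split on the first data row (class and method names are my own):

```java
public class CsvSplitDemo {
    // Split on commas that are followed by an even number of double quotes,
    // i.e. commas that are NOT inside a quoted field.
    static String[] splitCsv(String line) {
        return line.split(",(?=([^\"]*\"[^\"]*\")*[^\"]*$)");
    }

    public static void main(String[] args) {
        String record = "\"AARON,  ELVIA J\",WATER RATE TAKER,WATER MGMNT,$87228.00";
        String[] fields = splitCsv(record);
        // The comma inside the quoted name is preserved; four fields result.
        System.out.println(fields.length); // prints 4
        System.out.println(fields[0]);     // prints "AARON,  ELVIA J" (quotes kept)
        System.out.println(fields[2]);     // prints WATER MGMNT
    }
}
```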

The complete program looks like this:

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class NlineEmp extends Configured implements Tool{
   
    public static class NLineMapper extends Mapper<LongWritable, Text, Text, Text>{
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
            String line = value.toString();
            // Split on commas that are not inside double-quoted fields
            String[] fields = line.split(",(?=([^\"]*\"[^\"]*\")*[^\"]*$)");

            // Skip the header row; emit (name, department)
            if (!fields[0].equals("Name"))
                context.write(new Text(fields[0]), new Text(fields[2]));
        }
    }

    public static class NLineReducer extends Reducer<Text, Text, Text, Text>{
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{
            // Identity reduce: write out every (name, department) pair as-is
            for (Text value : values){
                context.write(key, value);
            }
        }
    }
   
   
    public int run(String[] args) throws Exception{
        // Use the Configuration populated by ToolRunner/GenericOptionsParser
        Configuration conf = getConf();
        conf.setInt(NLineInputFormat.LINES_PER_MAP, 1000); // N = 1000 lines per mapper
       
        Job job = new Job(conf,"NLine Input Format");
        job.setJarByClass(NlineEmp.class);

        job.setMapperClass(NLineMapper.class);
        job.setReducerClass(NLineReducer.class);
        job.setInputFormatClass(NLineInputFormat.class);
       
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
       
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
       
        return job.waitForCompletion(true) ? 0:1;
    }

    public static void main(String[] args) throws Exception{
        int exitcode = ToolRunner.run(new NlineEmp(), args);
        System.exit(exitcode);
    }
}


Output of the program:

$ hadoop jar NlineEmp.jar NlineEmp Employees out2
15/02/02 13:19:59 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/02/02 13:20:00 INFO input.FileInputFormat: Total input paths to process : 1
15/02/02 13:20:01 INFO mapred.JobClient: Running job: job_201501301653_0078
15/02/02 13:20:02 INFO mapred.JobClient:  map 0% reduce 0%
15/02/02 13:20:12 INFO mapred.JobClient:  map 3% reduce 0%
15/02/02 13:22:12 INFO mapred.JobClient:  map 90% reduce 29%
15/02/02 13:22:18 INFO mapred.JobClient:  map 93% reduce 30%
15/02/02 13:22:22 INFO mapred.JobClient:  map 96% reduce 30%
15/02/02 13:22:26 INFO mapred.JobClient:  map 100% reduce 30%
15/02/02 13:22:27 INFO mapred.JobClient:  map 100% reduce 31%
15/02/02 13:22:32 INFO mapred.JobClient:  map 100% reduce 100%
15/02/02 13:22:34 INFO mapred.JobClient: Job complete: job_201501301653_0078
15/02/02 13:22:34 INFO mapred.JobClient: Counters: 28
15/02/02 13:22:34 INFO mapred.JobClient:   Map-Reduce Framework
15/02/02 13:22:34 INFO mapred.JobClient:     Spilled Records=64320
15/02/02 13:22:34 INFO mapred.JobClient:     Map output materialized bytes=995670
15/02/02 13:22:34 INFO mapred.JobClient:     Reduce input records=32160
15/02/02 13:22:34 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=62696329216
15/02/02 13:22:34 INFO mapred.JobClient:     Map input records=32161
15/02/02 13:22:34 INFO mapred.JobClient:     SPLIT_RAW_BYTES=3564
15/02/02 13:22:34 INFO mapred.JobClient:     Map output bytes=931152
15/02/02 13:22:34 INFO mapred.JobClient:     Reduce shuffle bytes=995670
15/02/02 13:22:34 INFO mapred.JobClient:     Physical memory (bytes) snapshot=6016815104
15/02/02 13:22:34 INFO mapred.JobClient:     Reduce input groups=31890
15/02/02 13:22:34 INFO mapred.JobClient:     Combine output records=0
15/02/02 13:22:34 INFO mapred.JobClient:     Reduce output records=32160
15/02/02 13:22:34 INFO mapred.JobClient:     Map output records=32160
15/02/02 13:22:34 INFO mapred.JobClient:     Combine input records=0
15/02/02 13:22:34 INFO mapred.JobClient:     CPU time spent (ms)=17740
15/02/02 13:22:34 INFO mapred.JobClient:     Total committed heap usage (bytes)=4394717184
15/02/02 13:22:34 INFO mapred.JobClient:   File Input Format Counters
15/02/02 13:22:34 INFO mapred.JobClient:     Bytes Read=1967149
15/02/02 13:22:34 INFO mapred.JobClient:   FileSystemCounters
15/02/02 13:22:34 INFO mapred.JobClient:     HDFS_BYTES_READ=1970713
15/02/02 13:22:34 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=3881016
15/02/02 13:22:34 INFO mapred.JobClient:     FILE_BYTES_READ=995478
15/02/02 13:22:34 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=931152
15/02/02 13:22:34 INFO mapred.JobClient:   Job Counters
15/02/02 13:22:34 INFO mapred.JobClient:     Launched map tasks=33
15/02/02 13:22:34 INFO mapred.JobClient:     Launched reduce tasks=1
15/02/02 13:22:34 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=136007
15/02/02 13:22:34 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
15/02/02 13:22:34 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=274677
15/02/02 13:22:34 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
15/02/02 13:22:34 INFO mapred.JobClient:   File Output Format Counters
15/02/02 13:22:34 INFO mapred.JobClient:     Bytes Written=931152



Notice that Launched map tasks=33 in the Job Counters, as expected.

