Monday, February 2, 2015

Packaging small files into SequenceFile by using WholeFileInputFormat and WholeFileRecordReader

Sometimes we want to process a whole file as a single record in MapReduce. This can be achieved by:

1. Not splitting the file.
2. Having a RecordReader that delivers the file contents as the value of the record.

Let's demonstrate this by creating a MapReduce job that packages small files into a SequenceFile, where the key is the name of the file and the value is the contents of the whole file.

The wholeFileInputFormat class in the code below shows one way of doing this. It takes NullWritable as the key and BytesWritable as the value, where the value is the contents of the file. The isSplitable() method returns false, which tells the framework not to split the input file. The second overridden method, createRecordReader(), returns a custom implementation of RecordReader.

The name of the file can be derived from the context object in the mapper, by casting the InputSplit obtained from the context to FileSplit and then calling its getPath() method. The output format is SequenceFileOutputFormat, since we want to store the results as SequenceFiles.

The number of map tasks launched depends on the number of input files to package. In our example we are packaging 5 small employee files, so 5 mappers are launched even though each input file is smaller than the block size.

Let's look at the input files:

$ hadoop dfs -lsr emp
-rw-r--r--   1 swetha supergroup        643 2015-02-01 14:29 /user/swetha/emp/emp1
-rw-r--r--   1 swetha supergroup        542 2015-02-01 14:29 /user/swetha/emp/emp2
-rw-r--r--   1 swetha supergroup        629 2015-02-01 14:29 /user/swetha/emp/emp3
-rw-r--r--   1 swetha supergroup        596 2015-02-01 14:29 /user/swetha/emp/emp4
-rw-r--r--   1 swetha supergroup    1866851 2015-02-01 22:51 /user/swetha/emp/emp5


Code:

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class wholeFileReader extends Configured implements Tool{
   
    public static class wholeFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable>{
       
        private Text fileName;
       
        @Override
        protected void setup(Context context) throws IOException, InterruptedException{
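            // The split handed to this mapper is the whole file, so its path gives the file name.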
            InputSplit split = context.getInputSplit();
            Path path = ((FileSplit) split).getPath();
            fileName = new Text(path.toString());
        }
       
        @Override
        protected void map(NullWritable key, BytesWritable value,Context context)
                throws IOException, InterruptedException{
            context.write(fileName, value);
        }
    }

    public static class wholeFileInputFormat extends FileInputFormat<NullWritable,BytesWritable>{
       
        @Override
        protected boolean isSplitable(JobContext context,Path file){
            return false;
        }
       
        @Override
        public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
                throws IOException,InterruptedException{
            wholeFileRecordReader reader = new wholeFileRecordReader();
            reader.initialize(split,context);
            return reader;
        }
    }
   
    public static class wholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
        private FileSplit fileSplit;
        private Configuration conf;
        private BytesWritable value = new BytesWritable();
        private boolean processed = false;
       
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException,InterruptedException{
            this.fileSplit = (FileSplit) split;
            this.conf = context.getConfiguration();
        }
       
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException{
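            // On the first call, read the entire file into 'value'; afterwards signal that there are no more records.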
            if (!processed){
                byte[] contents = new byte[(int) fileSplit.getLength()];
                Path file = fileSplit.getPath();
                FileSystem fs = file.getFileSystem(conf);
                FSDataInputStream in = null;
                try{
                    in = fs.open(file);
                    IOUtils.readFully(in, contents, 0, contents.length);
                    value.set(contents,0,contents.length);
                }finally{
                    IOUtils.closeStream(in);
                }
                processed = true;
                return true;
            }
            return false;
        }
       
        @Override
        public NullWritable getCurrentKey() throws IOException, InterruptedException{
            return NullWritable.get();
        }
       
        @Override
        public BytesWritable getCurrentValue() throws IOException, InterruptedException{
            return value;
        }
       
        @Override
        public float getProgress() throws IOException{
            return processed ? 1.0f : 0.0f;
        }
       
        @Override
        public void close() throws IOException{
            // do nothing
        }
    }
   
    public int run(String[] args) throws Exception{
        Configuration conf = new Configuration();
       
        Job job = new Job(conf,"Whole File Input Format");
        job.setJarByClass(wholeFileReader.class);

        job.setMapperClass(wholeFileMapper.class);
        job.setInputFormatClass(wholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
       
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
       
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
       
        return job.waitForCompletion(true) ? 0:1;
    }

    public static void main(String[] args) throws Exception{
        int exitcode = ToolRunner.run(new wholeFileReader(), args);
        System.exit(exitcode);
    }
}


Output:

$ hadoop jar wholeFileReader.jar wholeFileReader emp sequencefile
15/02/02 19:04:07 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/02/02 19:04:08 INFO input.FileInputFormat: Total input paths to process : 5
15/02/02 19:04:09 INFO mapred.JobClient: Running job: job_201501301653_0084
15/02/02 19:04:10 INFO mapred.JobClient:  map 0% reduce 0%
15/02/02 19:04:22 INFO mapred.JobClient:  map 40% reduce 0%
15/02/02 19:04:35 INFO mapred.JobClient:  map 80% reduce 0%
15/02/02 19:04:42 INFO mapred.JobClient:  map 100% reduce 0%
15/02/02 19:04:44 INFO mapred.JobClient:  map 100% reduce 33%
15/02/02 19:04:46 INFO mapred.JobClient:  map 100% reduce 100%
15/02/02 19:04:50 INFO mapred.JobClient: Job complete: job_201501301653_0084
15/02/02 19:04:50 INFO mapred.JobClient: Counters: 29
15/02/02 19:04:50 INFO mapred.JobClient:   Map-Reduce Framework
15/02/02 19:04:50 INFO mapred.JobClient:     Spilled Records=10
15/02/02 19:04:50 INFO mapred.JobClient:     Map output materialized bytes=1869547
15/02/02 19:04:50 INFO mapred.JobClient:     Reduce input records=5
15/02/02 19:04:50 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=11063402496
15/02/02 19:04:50 INFO mapred.JobClient:     Map input records=5
15/02/02 19:04:50 INFO mapred.JobClient:     SPLIT_RAW_BYTES=535
15/02/02 19:04:50 INFO mapred.JobClient:     Map output bytes=1869496
15/02/02 19:04:50 INFO mapred.JobClient:     Reduce shuffle bytes=1869547
15/02/02 19:04:50 INFO mapred.JobClient:     Physical memory (bytes) snapshot=930562048
15/02/02 19:04:50 INFO mapred.JobClient:     Reduce input groups=5
15/02/02 19:04:50 INFO mapred.JobClient:     Combine output records=0
15/02/02 19:04:50 INFO mapred.JobClient:     Reduce output records=5
15/02/02 19:04:50 INFO mapred.JobClient:     Map output records=5
15/02/02 19:04:50 INFO mapred.JobClient:     Combine input records=0
15/02/02 19:04:50 INFO mapred.JobClient:     CPU time spent (ms)=2820
15/02/02 19:04:50 INFO mapred.JobClient:     Total committed heap usage (bytes)=693391360
15/02/02 19:04:50 INFO mapred.JobClient:   File Input Format Counters
15/02/02 19:04:50 INFO mapred.JobClient:     Bytes Read=1869261
15/02/02 19:04:50 INFO mapred.JobClient:   FileSystemCounters
15/02/02 19:04:50 INFO mapred.JobClient:     HDFS_BYTES_READ=1869796
15/02/02 19:04:50 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=4072689
15/02/02 19:04:50 INFO mapred.JobClient:     FILE_BYTES_READ=1869523
15/02/02 19:04:50 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=1869643
15/02/02 19:04:50 INFO mapred.JobClient:   Job Counters
15/02/02 19:04:50 INFO mapred.JobClient:     Launched map tasks=5
15/02/02 19:04:50 INFO mapred.JobClient:     Launched reduce tasks=1
15/02/02 19:04:50 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=24378
15/02/02 19:04:50 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
15/02/02 19:04:50 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=55834
15/02/02 19:04:50 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
15/02/02 19:04:50 INFO mapred.JobClient:     Data-local map tasks=5
15/02/02 19:04:50 INFO mapred.JobClient:   File Output Format Counters
15/02/02 19:04:50 INFO mapred.JobClient:     Bytes Written=1869643


When we look at the Launched map tasks counter, it shows 5, which means a mapper has been launched for each input file. Having one file per mapper is inefficient, so this process could be improved by using CombineFileInputFormat instead of FileInputFormat. Please refer to my other post on how to use CombineFileInputFormat.

http://hadoopsupport.blogspot.com/2015/02/combinefileinputformat-mapreduce-code.html

Data in the output file can be inspected with the -text option. Sample output will look like this:

$ hadoop dfs -text sequencefile/part*
hdfs://localhost:9000/user/swetha/emp/emp1      4e 61 6d 65 2c 50 6f 73 69 74 69 6f 6e 20 54 69 74 6c 65 2c 44 65 70 61 72 74 6d 65 6e 74 2c 45 6d 70 6c 6f 79 65 65 20 41 6e 6e 75 61 6c 20 53 61 6c 61 72 79 0a 22 41 41 52 4f 4e 2c 20 20 45 4c 56 49 41 20 4a 22 2c 57 41 54 45 52 20 52 41 54 45 20 54 41 4b 45 52 2c 57 41 54 45 52 20 4d 47 4d 4e 54 2c 24 38 37 32 32 38 2e 30 30 0a 22 41 41 52 4f 4e 2c 20 20 4a 45 46 46 
hdfs://localhost:9000/user/swetha/emp/emp2      22 41 42 42 4f 54 54 2c 20 20 42 45 54 54 59 20 4c 22 2c 46 4f 53 54 45 52 20 47 52 41 4e 44 50 41 52 45 4e 54 2c 46 41 4d 49 4c 59 20 26 20 53 55 50 50 4f 52 54 2c 24 32 37 35 36 2e 30 30 0a 22 41 42 42 4f 54 54 2c 20 20 4c 59 4e 49 53 45 20 4d 22 2c 43 4c 45 52 4b 20 49 49 49 2c 50 4f 4c 49 43 45 2c 24 34 31 37 38 34 2e 30 30 0a 22 41 42 42 52 55 5a 5a 45 53 45 2c 20 20 57 49
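
The SequenceFile can also be read programmatically. Below is a minimal sketch (not part of the original job) of a small reader that prints each stored file name and the number of bytes kept for it. The class name readEmpSequenceFile and the idea of passing the part file path as an argument are just for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class readEmpSequenceFile {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path(args[0]);                      // e.g. sequencefile/part-r-00000
        FileSystem fs = path.getFileSystem(conf);
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        try {
            Text key = new Text();                          // file name written by the mapper
            BytesWritable value = new BytesWritable();      // whole file contents
            while (reader.next(key, value)) {
                System.out.println(key + " : " + value.getLength() + " bytes");
            }
        } finally {
            reader.close();
        }
    }
}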

NLineInputFormat in Java MapReduce - Sample code

With NLineInputFormat, each mapper receives a fixed number of lines of input, unlike TextInputFormat and KeyValueTextInputFormat, where the amount of input per mapper depends on the size of the split. The number of lines of input given to each mapper can be controlled by setting the property mapreduce.input.lineinputformat.linespermap in the new API, or mapred.line.input.format.linespermap in the old API. The default value is 1.

Alternatively, we can set it in code:
conf.setInt(NLineInputFormat.LINES_PER_MAP, 1000); // sets N value to 1000

NLineInputFormat is typically used in applications that take a small amount of input data and run an extensive (that is, CPU-intensive) computation on it before emitting their output.
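
If your Hadoop release ships the static helper on NLineInputFormat (newer versions of the new API do), the same setting can be applied through the Job instead of the raw property. A small sketch of what that would look like inside run():

// Hedged alternative: use the NLineInputFormat helper instead of conf.setInt(...),
// assuming setNumLinesPerSplit() is available in your Hadoop version.
Job job = new Job(conf, "NLine Input Format");
job.setInputFormatClass(NLineInputFormat.class);
NLineInputFormat.setNumLinesPerSplit(job, 1000);    // same effect as setting LINES_PER_MAP to 1000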

Let's try this with a simple MapReduce program that takes the Employees file (32161 records) as input. In the code, N is set to 1000, so each mapper receives at most 1000 lines and a total of 33 mappers are launched.


$ hadoop dfs -cat Employees | wc -l
32161


Data in the input file looks like this:

Name,Position Title,Department,Employee Annual Salary
"AARON,  ELVIA J",WATER RATE TAKER,WATER MGMNT,$87228.00
"AARON,  JEFFERY M",POLICE OFFICER,POLICE,$75372.00
"AARON,  KARINA",POLICE OFFICER,POLICE,$75372.00
"AARON,  KIMBERLEI R",CHIEF CONTRACT EXPEDITER,GENERAL SERVICES,$80916.00
"ABAD JR,  VICENTE M",CIVIL ENGINEER IV,WATER MGMNT,$99648.00

Here is what the program code looks like:

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class NlineEmp extends Configured implements Tool{
   
    public static class NLineMapper extends Mapper<LongWritable, Text, Text, Text>{
        @Override
        public void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException{
            String a = value.toString();
            String[] b = a.split(",(?=([^\"]*\"[^\"]*\")*[^\"]*$)");
           
            if (!(b[0].equals("Name")))
                context.write(new Text(b[0]), new Text(b[2]));
        }
    }

    public static class NLineReducer extends Reducer<Text, Text, Text, Text>{
        @Override
        public void reduce(Text key, Iterable<Text> value, Context context) throws IOException, InterruptedException{
            for (Text values : value){
                context.write(key, values);
            }
        }
    }
   
/*    public static class NLineEmpInputFormat extends FileInputFormat<LongWritable,Text>{
        public static final String  LINES_PER_MAP = "mapreduce.input.lineinputformat.linespermap";
       
        public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, TaskAttemptContext context) throws IOException{
            context.setStatus(split);
            return new LineRecordReader();
        }
    } */
   
    public int run(String[] args) throws Exception{
        Configuration conf = new Configuration();
        conf.setInt(NLineInputFormat.LINES_PER_MAP, 1000);
       
        Job job = new Job(conf,"NLine Input Format");
        job.setJarByClass(NlineEmp.class);

        job.setMapperClass(NLineMapper.class);
        job.setReducerClass(NLineReducer.class);
        job.setInputFormatClass(NLineInputFormat.class);
       
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
       
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
       
        return job.waitForCompletion(true) ? 0:1;
    }

    public static void main(String[] args) throws Exception{
        int exitcode = ToolRunner.run(new NlineEmp(), args);
        System.exit(exitcode);
    }
}


Output of the program:

$ hadoop jar NlineEmp.jar NlineEmp Employees out2
15/02/02 13:19:59 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/02/02 13:20:00 INFO input.FileInputFormat: Total input paths to process : 1
15/02/02 13:20:01 INFO mapred.JobClient: Running job: job_201501301653_0078
15/02/02 13:20:02 INFO mapred.JobClient:  map 0% reduce 0%
15/02/02 13:20:12 INFO mapred.JobClient:  map 3% reduce 0%
15/02/02 13:22:12 INFO mapred.JobClient:  map 90% reduce 29%
15/02/02 13:22:18 INFO mapred.JobClient:  map 93% reduce 30%
15/02/02 13:22:22 INFO mapred.JobClient:  map 96% reduce 30%
15/02/02 13:22:26 INFO mapred.JobClient:  map 100% reduce 30%
15/02/02 13:22:27 INFO mapred.JobClient:  map 100% reduce 31%
15/02/02 13:22:32 INFO mapred.JobClient:  map 100% reduce 100%
15/02/02 13:22:34 INFO mapred.JobClient: Job complete: job_201501301653_0078
15/02/02 13:22:34 INFO mapred.JobClient: Counters: 28
15/02/02 13:22:34 INFO mapred.JobClient:   Map-Reduce Framework
15/02/02 13:22:34 INFO mapred.JobClient:     Spilled Records=64320
15/02/02 13:22:34 INFO mapred.JobClient:     Map output materialized bytes=995670
15/02/02 13:22:34 INFO mapred.JobClient:     Reduce input records=32160
15/02/02 13:22:34 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=62696329216
15/02/02 13:22:34 INFO mapred.JobClient:     Map input records=32161
15/02/02 13:22:34 INFO mapred.JobClient:     SPLIT_RAW_BYTES=3564
15/02/02 13:22:34 INFO mapred.JobClient:     Map output bytes=931152
15/02/02 13:22:34 INFO mapred.JobClient:     Reduce shuffle bytes=995670
15/02/02 13:22:34 INFO mapred.JobClient:     Physical memory (bytes) snapshot=6016815104
15/02/02 13:22:34 INFO mapred.JobClient:     Reduce input groups=31890
15/02/02 13:22:34 INFO mapred.JobClient:     Combine output records=0
15/02/02 13:22:34 INFO mapred.JobClient:     Reduce output records=32160
15/02/02 13:22:34 INFO mapred.JobClient:     Map output records=32160
15/02/02 13:22:34 INFO mapred.JobClient:     Combine input records=0
15/02/02 13:22:34 INFO mapred.JobClient:     CPU time spent (ms)=17740
15/02/02 13:22:34 INFO mapred.JobClient:     Total committed heap usage (bytes)=4394717184
15/02/02 13:22:34 INFO mapred.JobClient:   File Input Format Counters
15/02/02 13:22:34 INFO mapred.JobClient:     Bytes Read=1967149
15/02/02 13:22:34 INFO mapred.JobClient:   FileSystemCounters
15/02/02 13:22:34 INFO mapred.JobClient:     HDFS_BYTES_READ=1970713
15/02/02 13:22:34 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=3881016
15/02/02 13:22:34 INFO mapred.JobClient:     FILE_BYTES_READ=995478
15/02/02 13:22:34 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=931152
15/02/02 13:22:34 INFO mapred.JobClient:   Job Counters
15/02/02 13:22:34 INFO mapred.JobClient:     Launched map tasks=33
15/02/02 13:22:34 INFO mapred.JobClient:     Launched reduce tasks=1
15/02/02 13:22:34 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=136007
15/02/02 13:22:34 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
15/02/02 13:22:34 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=274677
15/02/02 13:22:34 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
15/02/02 13:22:34 INFO mapred.JobClient:   File Output Format Counters
15/02/02 13:22:34 INFO mapred.JobClient:     Bytes Written=931152



Notice that the Launched map tasks = 33 in the job output.

Sunday, February 1, 2015

CombineFileInputFormat - Mapreduce code to handle many small files

CombineFileInputFormat is an abstract class without any concrete implementations, unlike FileInputFormat. To get a CombineFileInputFormat equivalent of TextInputFormat, we need to create a concrete subclass of CombineFileInputFormat and implement the createRecordReader() method (getRecordReader() in the old API).

CombineFileInputFormat packs many small files into each split, giving every map task more data to work on. It also takes data locality into account, so performance is not compromised.

Let's try a MapReduce program that combines many small emp files by using CombineFileInputFormat.

Sample data in the input files would look like this

Name,Position Title,Department,Employee Annual Salary
"AARON,  ELVIA J",WATER RATE TAKER,WATER MGMNT,$87228.00
"AARON,  JEFFERY M",POLICE OFFICER,POLICE,$75372.00
"AARON,  KARINA",POLICE OFFICER,POLICE,$75372.00
"AARON,  KIMBERLEI R",CHIEF CONTRACT EXPEDITER,GENERAL SERVICES,$80916.00
"ABAD JR,  VICENTE M",CIVIL ENGINEER IV,WATER MGMNT,$99648.00
"ABARCA,  ANABEL",ASST TO THE ALDERMAN,CITY COUNCIL,$70764.00
"ABARCA,  EMMANUEL",GENERAL LABORER - DSS,STREETS & SAN,$40560.00
"ABBATACOLA,  ROBERT J",ELECTRICAL MECHANIC,AVIATION,$89440.00
"ABBATEMARCO,  JAMES J",FIRE ENGINEER,FIRE,$84396.00
"ABBATE,  TERRY M",POLICE OFFICER,POLICE,$80724.00
"ABBOTT,  BETTY L",FOSTER GRANDPARENT,FAMILY & SUPPORT,$2756.00
"ABBOTT,  LYNISE M",CLERK III,POLICE,$41784.00


I have created five small emp files and placed them under the emp directory in HDFS.

 $ hadoop dfs -lsr emp
-rw-r--r--   1 swetha supergroup        643 2015-02-01 14:29 /user/swetha/emp/emp1
-rw-r--r--   1 swetha supergroup        542 2015-02-01 14:29 /user/swetha/emp/emp2
-rw-r--r--   1 swetha supergroup        629 2015-02-01 14:29 /user/swetha/emp/emp3
-rw-r--r--   1 swetha supergroup        596 2015-02-01 14:29 /user/swetha/emp/emp4
-rw-r--r--   1 swetha supergroup    1866851 2015-02-01 22:51 /user/swetha/emp/emp5


In the MapReduce code we need to set mapred.max.split.size. This property caps the size of the combined input split that a single mapper has to handle, and therefore how many small files get packed together.
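
As an alternative (this is only a sketch of another option, not what the program below does), the custom input format can cap the combined split size itself by calling the protected setMaxSplitSize() method it inherits from CombineFileInputFormat:

// Sketch: capping the combined split size inside the input format's constructor,
// via the protected setMaxSplitSize(long) inherited from CombineFileInputFormat.
public static class combineEmpInputFormat extends CombineFileInputFormat<LongWritable, Text> {
    public combineEmpInputFormat() {
        setMaxSplitSize(1048576);   // 1 MB per combined split
    }
    // createRecordReader() would be implemented exactly as in the full program below
}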

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.*;

public class combileSmallEmpFiles extends Configured implements Tool{

    public static class combineMapper extends Mapper<LongWritable, Text, Text, Text>{
        @Override
        public void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException{
            String a = value.toString();
            String[] b = a.split(",(?=([^\"]*\"[^\"]*\")*[^\"]*$)");
           
            if (!(b[0].equals("Name")))
                context.write(new Text(b[0]), new Text(b[2]));
        }
    }
   
    public static class combineReducer extends Reducer<Text, Text, Text, Text>{
        @Override
        public void reduce(Text key, Iterable<Text> value, Context context) throws IOException, InterruptedException{
            for (Text values : value){
                context.write(key, values);
            }
        }
    }
   
    public static class combineEmpInputFormat extends CombineFileInputFormat<LongWritable,Text>{
       
        @Override
        public RecordReader<LongWritable,Text> createRecordReader(InputSplit split,TaskAttemptContext context)
            throws IOException{
            CombineFileRecordReader<LongWritable,Text> reader = new CombineFileRecordReader<LongWritable,Text>(
                    (CombineFileSplit) split, context, myCombineFileRecordReader.class);
            return reader;
        }
    }
   
    public static class myCombineFileRecordReader extends RecordReader<LongWritable,Text>{
        private LineRecordReader lineRecordReader = new LineRecordReader();
       
        public myCombineFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index)
            throws IOException{
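            // Build a FileSplit for the index-th file inside the combined split and delegate to a plain LineRecordReader.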
            FileSplit fileSplit = new FileSplit(split.getPath(index),split.getOffset(index),
                    split.getLength(index), split.getLocations());
            lineRecordReader.initialize(fileSplit, context);
        }
       
       
        @Override
        public void initialize(InputSplit split,TaskAttemptContext context)
            throws IOException, InterruptedException{
            // Nothing to do here: the wrapped LineRecordReader was already initialized in the constructor.
        }
       
        @Override
        public void close() throws IOException{
            lineRecordReader.close();
        }
       
        @Override
        public float getProgress() throws IOException{
            return lineRecordReader.getProgress();
        }
       
        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException{
            return lineRecordReader.getCurrentKey();
        }
       
        @Override
        public Text getCurrentValue() throws IOException,InterruptedException{
            return lineRecordReader.getCurrentValue();
        }
       
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException{
            return lineRecordReader.nextKeyValue();
        }
       
    }
    public int run(String[] args) throws Exception{
        Configuration conf = new Configuration();
        conf.set("mapred.max.split.size", "1048576"); //1MB
       
        Job job = new Job(conf,"Combine small files");
        job.setJarByClass(combileSmallEmpFiles.class);

        job.setMapperClass(combineMapper.class);
        job.setReducerClass(combineReducer.class);
        job.setInputFormatClass(combineEmpInputFormat.class);
       
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
       
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
       
        return job.waitForCompletion(true) ? 0:1;
    }
   
    public static void main(String[] args) throws Exception{
        int exitcode = ToolRunner.run(new combileSmallEmpFiles(), args);
        System.exit(exitcode);
    }
}


The output would look like this:

$ hadoop jar combineSmallEmpFiles.jar combileSmallEmpFiles emp out1
15/02/01 23:12:37 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/02/01 23:12:38 INFO input.FileInputFormat: Total input paths to process : 5
15/02/01 23:12:38 INFO util.NativeCodeLoader: Loaded the native-hadoop library
15/02/01 23:12:38 WARN snappy.LoadSnappy: Snappy native library not loaded
15/02/01 23:12:38 INFO mapred.JobClient: Running job: job_201501301653_0077
15/02/01 23:12:39 INFO mapred.JobClient:  map 0% reduce 0%
15/02/01 23:12:52 INFO mapred.JobClient:  map 100% reduce 0%
15/02/01 23:13:03 INFO mapred.JobClient:  map 100% reduce 33%
15/02/01 23:13:05 INFO mapred.JobClient:  map 100% reduce 100%
15/02/01 23:13:09 INFO mapred.JobClient: Job complete: job_201501301653_0077
15/02/01 23:13:09 INFO mapred.JobClient: Counters: 28
15/02/01 23:13:09 INFO mapred.JobClient:   Map-Reduce Framework
15/02/01 23:13:09 INFO mapred.JobClient:     Spilled Records=64398
15/02/01 23:13:09 INFO mapred.JobClient:     Map output materialized bytes=996759
15/02/01 23:13:09 INFO mapred.JobClient:     Reduce input records=32199
15/02/01 23:13:09 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=3702489088
15/02/01 23:13:09 INFO mapred.JobClient:     Map input records=32201
15/02/01 23:13:09 INFO mapred.JobClient:     SPLIT_RAW_BYTES=370
15/02/01 23:13:09 INFO mapred.JobClient:     Map output bytes=932355
15/02/01 23:13:09 INFO mapred.JobClient:     Reduce shuffle bytes=996759
15/02/01 23:13:09 INFO mapred.JobClient:     Physical memory (bytes) snapshot=262676480
15/02/01 23:13:09 INFO mapred.JobClient:     Reduce input groups=31890
15/02/01 23:13:09 INFO mapred.JobClient:     Combine output records=0
15/02/01 23:13:09 INFO mapred.JobClient:     Reduce output records=32199
15/02/01 23:13:09 INFO mapred.JobClient:     Map output records=32199
15/02/01 23:13:09 INFO mapred.JobClient:     Combine input records=0
15/02/01 23:13:09 INFO mapred.JobClient:     CPU time spent (ms)=2890
15/02/01 23:13:09 INFO mapred.JobClient:     Total committed heap usage (bytes)=164630528
15/02/01 23:13:09 INFO mapred.JobClient:   File Input Format Counters
15/02/01 23:13:09 INFO mapred.JobClient:     Bytes Read=0
15/02/01 23:13:09 INFO mapred.JobClient:   FileSystemCounters
15/02/01 23:13:09 INFO mapred.JobClient:     HDFS_BYTES_READ=1869631
15/02/01 23:13:09 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=2104561
15/02/01 23:13:09 INFO mapred.JobClient:     FILE_BYTES_READ=996759
15/02/01 23:13:09 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=932355
15/02/01 23:13:09 INFO mapred.JobClient:   Job Counters
15/02/01 23:13:09 INFO mapred.JobClient:     Launched map tasks=1
15/02/01 23:13:09 INFO mapred.JobClient:     Launched reduce tasks=1
15/02/01 23:13:09 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=13302
15/02/01 23:13:09 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
15/02/01 23:13:09 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=16469
15/02/01 23:13:09 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
15/02/01 23:13:09 INFO mapred.JobClient:   File Output Format Counters
15/02/01 23:13:09 INFO mapred.JobClient:     Bytes Written=932355


Secondary sort technique in MapReduce: list all departments with employee names in ascending order and salaries in descending order

Here is the employee table and the sample data:

Emp
Name,Position Title,Department,Employee Annual Salary
"AARON,  ELVIA J",WATER RATE TAKER,WATER MGMNT,$87228.00
"AARON,  JEFFERY M",POLICE OFFICER,POLICE,$75372.00
"AARON,  KARINA",POLICE OFFICER,POLICE,$75372.00
"AARON,  KIMBERLEI R",CHIEF CONTRACT EXPEDITER,GENERAL SERVICES,$80916.00
"ABAD JR,  VICENTE M",CIVIL ENGINEER IV,WATER MGMNT,$99648.00
"ABARCA,  ANABEL",ASST TO THE ALDERMAN,CITY COUNCIL,$70764.00
"ABARCA,  EMMANUEL",GENERAL LABORER - DSS,STREETS & SAN,$40560.00
"ABBATACOLA,  ROBERT J",ELECTRICAL MECHANIC,AVIATION,$89440.00
"ABBATEMARCO,  JAMES J",FIRE ENGINEER,FIRE,$84396.00


The input to the MapReduce job is the employee data. The output will be the list of departments with employee names in ascending order and salaries in descending order.

To achieve this, we need to do the following (a small worked illustration follows this list):
1. The natural key is the department, and the natural values are the employee name and salary.
2. To get the natural values in sorted order, we have to make the natural value part of the key. So we need a composite key, which combines the natural key and the natural value.
3. We need a custom partitioner based on the natural key alone, because all values belonging to the same natural key must go to the same reducer (partition).
4. The sort comparator should sort by the full composite key: first the natural key, then the natural value.
5. The group comparator should group on the natural key only, so that when multiple natural keys end up in the same partition, the reducer treats all records with the same natural key as one group.
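
To make this concrete, here is the worked illustration mentioned above, using three of the sample rows:

Composite keys emitted by the mapper (dept, empName, salary):
    (FIRE,   "ABBATEMARCO,  JAMES J", $84396.00)
    (POLICE, "AARON,  JEFFERY M",     $75372.00)
    (POLICE, "AARON,  KARINA",        $75372.00)

The partitioner hashes on dept alone, so both POLICE records go to the same partition. The sort comparator orders records by dept, then empName ascending, then salary descending. The grouping comparator compares only dept, so the two POLICE records arrive in a single reduce() call, already sorted by name and salary.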

Here is the MapReduce code to achieve this result with the secondary sort technique.

The code below is roughly equivalent to this SQL:

select dept, empname, salary from emp order by dept, empname, salary desc;


import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

import java.io.*;


//For each department display employee name and salary in order..
public class secondarySortEmpDetails extends Configured implements Tool {

    // Composite key made of the natural key (dept) and the natural value (empName, salary);
    // implements WritableComparable so it can be used as a MapReduce key.
    public static class compositeKey implements WritableComparable<compositeKey>{
       
        private String dept;
        private String empName;
        private String salary;
       
        public  compositeKey(){
            this.dept = null;
            this.empName = null;
            this.salary = null;
        }
       
        public  compositeKey(String dept, String empName, String salary){
            this.dept = dept;
            this.empName = empName;
            this.salary = salary;
        }
       
        @Override
        public void write(DataOutput out) throws IOException{
            out.writeUTF(dept);
            out.writeUTF(empName);
            out.writeUTF(salary);
        }
       
        @Override
        public void readFields(DataInput in) throws IOException{
            dept = in.readUTF();
            empName = in.readUTF();
            salary = in.readUTF();
        }
       
        @Override
        public String toString(){
            return dept + "-------->" + empName +","+salary;
        }
       
        @Override
        public int compareTo(compositeKey other){
            int result = dept.compareTo(other.dept);
            if (result == 0){
                result = empName.compareTo(other.empName);
                if (result == 0){
                    result = salary.compareTo(other.salary);
                }
            }
            return result;
        }
       
        public void setempName(String empName){
            this.empName = empName;
        }
       
        public void setSalary(String salary){
            this.salary = salary;
        }
       
        public void setDept(String dept){
            this.dept = dept;
        }
       
        public String getempName(){
            return empName;
        }
       
        public String getSalary(){
            return salary;
        }
       
        public String getDept(){
            return dept;
        }
       
        @Override
        public int hashCode(){
            return  dept.hashCode();
        }
       
        @Override
        public boolean equals(Object obj){
            compositeKey other = (compositeKey) obj;
            return dept.equals(other.dept) && empName.equals(other.empName) && salary.equals(other.salary);
        }
    }

    static class empMap extends Mapper<LongWritable, Text, compositeKey, NullWritable> {
       
        @Override
        public void map( LongWritable key,  Text value, Context context)
                throws IOException,InterruptedException{
        String a = value.toString();
        String[] b = a.split(",(?=([^\"]*\"[^\"]*\")*[^\"]*$)");
        //String empName = b[0];
        //String dept = b[2];
        //String salary = b[3];
        if (!(b[0].equals("Name")))
            context.write(new compositeKey(b[2].toString(),b[0].toString(),b[3].toString()),NullWritable.get());
        }
    }
   
    static class empReduce extends Reducer<compositeKey, NullWritable, compositeKey, NullWritable> {
       
        @Override
        public void reduce(compositeKey key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {

            for(NullWritable value : values)
                context.write(key,NullWritable.get());
           
        }
    }

    //Secondary sort partitioner based on natural key
    public static class SecondarySortPartitioner
        extends Partitioner<compositeKey,NullWritable>{
       
        @Override
        public int getPartition(compositeKey key, NullWritable value, int numPartitions){
            return ((key.getDept().hashCode() & Integer.MAX_VALUE) % numPartitions);   
            }
    }
   
    //Sort Comparator based on natural key and natural value
    public static class CompkeySortComparator extends WritableComparator {
        protected CompkeySortComparator(){
            super(compositeKey.class,true);
        }
       
        @Override
        public int compare(WritableComparable w1,WritableComparable w2){
            compositeKey ip1 = (compositeKey) w1;
            compositeKey ip2 = (compositeKey) w2;
           
           
            int cmp = ip1.getDept().compareTo(ip2.getDept());
            if (cmp == 0){
                cmp = ip1.getempName().compareTo(ip2.getempName()); //Ascending Order
                if (cmp == 0){
                    return -ip1.getSalary().compareTo(ip2.getSalary()); //Descending Order
                    //If minus is taken out the results will be in ascending order;   
                }
            }
            return cmp;
        }
    }
       
    // Grouping Comparator based on natural key
    public static class groupingComparator extends WritableComparator {
        protected groupingComparator(){
            super(compositeKey.class,true);
        }
       
        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            compositeKey key1 = (compositeKey) w1;
            compositeKey key2 = (compositeKey) w2;
           
            return key1.getDept().compareTo(key2.getDept());
        }
    }
   
    @Override
    public int run(String[] args) throws Exception {
       
        Job job = new Job(getConf());
        job.setJarByClass(getClass());
        job.setJobName("Employee Details .. Sort values in order");
       
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
       
        job.setMapperClass(empMap.class);
        job.setPartitionerClass(SecondarySortPartitioner.class);
        job.setGroupingComparatorClass(groupingComparator.class);
        job.setSortComparatorClass(CompkeySortComparator.class);
        job.setReducerClass(empReduce.class);

       
        job.setOutputKeyClass(compositeKey.class);
        job.setOutputValueClass(NullWritable.class);
       
       
        return job.waitForCompletion(true)?0 : 1;
    }
   

    public static void main(String[] args) throws Exception{
        int exitcode = ToolRunner.run(new secondarySortEmpDetails() , args);
        System.exit(exitcode);
    }
}

Command to run the job

$ hadoop jar secondarySortEmpDetails.jar secondarySortEmpDetails emp out11

15/02/01 12:12:45 INFO input.FileInputFormat: Total input paths to process : 1
15/02/01 12:12:45 INFO util.NativeCodeLoader: Loaded the native-hadoop library
15/02/01 12:12:45 WARN snappy.LoadSnappy: Snappy native library not loaded
15/02/01 12:12:45 INFO mapred.JobClient: Running job: job_201501301653_0072
15/02/01 12:12:46 INFO mapred.JobClient:  map 0% reduce 0%
15/02/01 12:12:58 INFO mapred.JobClient:  map 100% reduce 0%
15/02/01 12:13:11 INFO mapred.JobClient:  map 100% reduce 100%
15/02/01 12:13:16 INFO mapred.JobClient: Job complete: job_201501301653_0072
15/02/01 12:13:16 INFO mapred.JobClient: Counters: 29
15/02/01 12:13:16 INFO mapred.JobClient:   Map-Reduce Framework
15/02/01 12:13:16 INFO mapred.JobClient:     Spilled Records=18
15/02/01 12:13:16 INFO mapred.JobClient:     Map output materialized bytes=422
15/02/01 12:13:16 INFO mapred.JobClient:     Reduce input records=9
15/02/01 12:13:16 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=3700391936
15/02/01 12:13:16 INFO mapred.JobClient:     Map input records=10
15/02/01 12:13:16 INFO mapred.JobClient:     SPLIT_RAW_BYTES=102
15/02/01 12:13:16 INFO mapred.JobClient:     Map output bytes=398
15/02/01 12:13:16 INFO mapred.JobClient:     Reduce shuffle bytes=422
15/02/01 12:13:16 INFO mapred.JobClient:     Physical memory (bytes) snapshot=260874240
15/02/01 12:13:16 INFO mapred.JobClient:     Reduce input groups=7
15/02/01 12:13:16 INFO mapred.JobClient:     Combine output records=0
15/02/01 12:13:16 INFO mapred.JobClient:     Reduce output records=9
15/02/01 12:13:16 INFO mapred.JobClient:     Map output records=9
15/02/01 12:13:16 INFO mapred.JobClient:     Combine input records=0
15/02/01 12:13:16 INFO mapred.JobClient:     CPU time spent (ms)=1190
15/02/01 12:13:16 INFO mapred.JobClient:     Total committed heap usage (bytes)=164630528
15/02/01 12:13:16 INFO mapred.JobClient:   File Input Format Counters
15/02/01 12:13:16 INFO mapred.JobClient:     Bytes Read=592
15/02/01 12:13:16 INFO mapred.JobClient:   FileSystemCounters
15/02/01 12:13:16 INFO mapred.JobClient:     HDFS_BYTES_READ=694
15/02/01 12:13:16 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=113305
15/02/01 12:13:16 INFO mapred.JobClient:     FILE_BYTES_READ=422
15/02/01 12:13:16 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=443
15/02/01 12:13:16 INFO mapred.JobClient:   Job Counters
15/02/01 12:13:16 INFO mapred.JobClient:     Launched map tasks=1
15/02/01 12:13:16 INFO mapred.JobClient:     Launched reduce tasks=1
15/02/01 12:13:16 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=13160
15/02/01 12:13:16 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
15/02/01 12:13:16 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=15115
15/02/01 12:13:16 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
15/02/01 12:13:16 INFO mapred.JobClient:     Data-local map tasks=1
15/02/01 12:13:16 INFO mapred.JobClient:   File Output Format Counters
15/02/01 12:13:16 INFO mapred.JobClient:     Bytes Written=443


The output will look like this:

$ hadoop dfs -cat out11/p*
AVIATION-------->"ABBATACOLA,  ROBERT J",$89440.00
CITY COUNCIL-------->"ABARCA,  ANABEL",$70764.00
FIRE-------->"ABBATEMARCO,  JAMES J",$84396.00
GENERAL SERVICES-------->"AARON,  KIMBERLEI R",$80916.00
POLICE-------->"AARON,  JEFFERY M",$75372.00
POLICE-------->"AARON,  KARINA",$75372.00
STREETS & SAN-------->"ABARCA,  EMMANUEL",$40560.00
WATER MGMNT-------->"AARON,  ELVIA J",$87228.00
WATER MGMNT-------->"ABAD JR,  VICENTE M",$99648.00

Friday, January 30, 2015

MapReduce code to retrieve employee fields from employee file

I have created a file named emp. Let's try a simple MapReduce program to retrieve fields from this employee file. Below are the file and its sample records, including the header row.

emp
empid,empname,deptid,salary
1,swetha,1,50000
2,Nikhil,2,75000
3,Varun,3,75000


First move the file to HDFS.
$ hadoop dfs -put emp emp

$ hadoop dfs -ls
Found 1 item
-rw-r--r--   1 swetha supergroup         81 2015-01-30 17:31 /user/swetha/emp

Let's select the empname and salary fields from the emp file. Below is the MapReduce code for this.

The code below is equivalent to this SQL:

select empname,salary from emp;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import java.io.*;


//Displays empname and salary
public class empDetails extends Configured implements Tool {

    static class empMap extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        public void map( LongWritable key,  Text value, Context context)
                throws IOException,InterruptedException{
        String a = value.toString();
        String[] b = a.split(",");
        String empName = b[1];
        String salary = b[3];
        context.write(new Text(empName), new Text(salary));
        }
    }


    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job();
        job.setJarByClass(getClass());
        job.setJobName("Employee Details");
       
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
       
        job.setMapperClass(empMap.class);
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
       
        return job.waitForCompletion(true)?0 : 1;
    }

    public static void main(String[] args) throws Exception{
        int exitcode = ToolRunner.run(new empDetails() , args);
        System.exit(exitcode);
    }
}

Pack the class into an empDetails.jar file and run it in the Hadoop environment.

$ hadoop jar empDetails.jar empDetails emp out1
15/01/30 19:26:45 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/01/30 19:26:46 INFO input.FileInputFormat: Total input paths to process : 1
15/01/30 19:26:46 INFO util.NativeCodeLoader: Loaded the native-hadoop library
15/01/30 19:26:46 WARN snappy.LoadSnappy: Snappy native library not loaded
15/01/30 19:26:46 INFO mapred.JobClient: Running job: job_201501301653_0006
15/01/30 19:26:47 INFO mapred.JobClient:  map 0% reduce 0%
15/01/30 19:26:54 INFO mapred.JobClient:  map 100% reduce 0%
15/01/30 19:26:56 INFO mapred.JobClient: Job complete: job_201501301653_0006
15/01/30 19:26:56 INFO mapred.JobClient: Counters: 19
15/01/30 19:26:56 INFO mapred.JobClient:   Map-Reduce Framework
15/01/30 19:26:56 INFO mapred.JobClient:     Spilled Records=0
15/01/30 19:26:56 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=1845501952
15/01/30 19:26:56 INFO mapred.JobClient:     Map input records=4
15/01/30 19:26:56 INFO mapred.JobClient:     SPLIT_RAW_BYTES=102
15/01/30 19:26:56 INFO mapred.JobClient:     Map output records=4
15/01/30 19:26:56 INFO mapred.JobClient:     Physical memory (bytes) snapshot=75046912
15/01/30 19:26:56 INFO mapred.JobClient:     CPU time spent (ms)=150
15/01/30 19:26:56 INFO mapred.JobClient:     Total committed heap usage (bytes)=32440320
15/01/30 19:26:56 INFO mapred.JobClient:   File Input Format Counters
15/01/30 19:26:56 INFO mapred.JobClient:     Bytes Read=81
15/01/30 19:26:56 INFO mapred.JobClient:   FileSystemCounters
15/01/30 19:26:56 INFO mapred.JobClient:     HDFS_BYTES_READ=183
15/01/30 19:26:56 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=54900
15/01/30 19:26:56 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=55
15/01/30 19:26:56 INFO mapred.JobClient:   Job Counters
15/01/30 19:26:56 INFO mapred.JobClient:     Launched map tasks=1
15/01/30 19:26:56 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=0
15/01/30 19:26:56 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
15/01/30 19:26:56 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=9046
15/01/30 19:26:56 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
15/01/30 19:26:56 INFO mapred.JobClient:     Data-local map tasks=1
15/01/30 19:26:56 INFO mapred.JobClient:   File Output Format Counters
15/01/30 19:26:56 INFO mapred.JobClient:     Bytes Written=55


Now let's look at the output:

$ hadoop dfs -ls
Found 3 items
-rw-r--r--   1 swetha supergroup         38 2015-01-30 17:31 /user/swetha/dept
-rw-r--r--   1 swetha supergroup         81 2015-01-30 17:31 /user/swetha/emp
drwxr-xr-x   - swetha supergroup          0 2015-01-30 19:26 /user/swetha/out1


Since the number of reduce tasks is set to 0, the mapper output is written directly to HDFS.

$ hadoop dfs -lsr out1
-rw-r--r--   1 swetha supergroup          0 2015-01-30 19:26 /user/swetha/out1/_SUCCESS
drwxr-xr-x   - swetha supergroup          0 2015-01-30 19:26 /user/swetha/out1/_logs
drwxr-xr-x   - swetha supergroup          0 2015-01-30 19:26 /user/swetha/out1/_logs/history
-rw-r--r--   1 swetha supergroup       9271 2015-01-30 19:26 /user/swetha/out1/_logs/history/job_201501301653_0006_1422664006709_swetha_Employee+Details
-rw-r--r--   1 swetha supergroup      47336 2015-01-30 19:26 /user/swetha/out1/_logs/history/job_201501301653_0006_conf.xml
-rw-r--r--   1 swetha supergroup         55 2015-01-30 19:26 /user/swetha/out1/part-m-00000


$ hadoop dfs -cat out1/p*
 empname     salary
swetha    50000
Nikhil    75000
Varun    75000



To get the same output without the header row, use the following:

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import java.io.*;


//Displays empname and salary, without the header row
public class empDetailsWithOutHeaders extends Configured implements Tool {

    static class empMap extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        public void map( LongWritable key,  Text value, Context context)
                throws IOException,InterruptedException{
        String a = value.toString();
        String[] b = a.split(",");
        String empName = b[1];
        String salary = b[3];
        if (!(empName.equals("empname")))
            context.write(new Text(empName), new Text(salary));
        }
    }


    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job();
        job.setJarByClass(getClass());
        job.setJobName("Employee Details without Headers");
       
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
       
        job.setMapperClass(empMap.class);
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
       
        return job.waitForCompletion(true)?0 : 1;
    }

    public static void main(String[] args) throws Exception{
        int exitcode = ToolRunner.run(new empDetailsWithOutHeaders() , args);
        System.exit(exitcode);
    }
}


The output will be the following:

 $ hadoop dfs -cat out2/p*
swetha    50000
Nikhil    75000
Varun    75000
