Sunday, February 1, 2015

Secondary sort technique in MapReduce: list all departments with employee names in ascending order and salaries in descending order

Here is the employee table and the sample data:

Emp
Name,Position Title,Department,Employee Annual Salary
"AARON,  ELVIA J",WATER RATE TAKER,WATER MGMNT,$87228.00
"AARON,  JEFFERY M",POLICE OFFICER,POLICE,$75372.00
"AARON,  KARINA",POLICE OFFICER,POLICE,$75372.00
"AARON,  KIMBERLEI R",CHIEF CONTRACT EXPEDITER,GENERAL SERVICES,$80916.00
"ABAD JR,  VICENTE M",CIVIL ENGINEER IV,WATER MGMNT,$99648.00
"ABARCA,  ANABEL",ASST TO THE ALDERMAN,CITY COUNCIL,$70764.00
"ABARCA,  EMMANUEL",GENERAL LABORER - DSS,STREETS & SAN,$40560.00
"ABBATACOLA,  ROBERT J",ELECTRICAL MECHANIC,AVIATION,$89440.00
"ABBATEMARCO,  JAMES J",FIRE ENGINEER,FIRE,$84396.00


The input to the MapReduce job is the employee data. The output is a list of departments with employee names in ascending order and salaries in descending order.

To achieve this, we need to do the following:
1. The natural key is the department; the natural values are the employee name and salary.
2. To get the natural values in sorted order, we make the natural value part of the key. So we need a composite key, made up of the natural key and the natural value.
3. We implement a custom partitioner on the natural key alone, because all values belonging to the same natural key must go to the same reducer (partition).
4. The sort comparator sorts on the full composite key: natural key first, then natural value.
5. The grouping comparator groups on the natural key only, so that when multiple natural keys land in the same partition, the reducer treats all records with the same natural key as one group.

Here is the MapReduce code that achieves this with the secondary sort technique.

The code below is equivalent to this SQL:

select dept, empname, salary from emp order by dept, empname, salary desc;


import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

import java.io.*;


//For each department display employee name and salary in order..
public class secondarySortEmpDetails extends Configured implements Tool {
 

 //Create a composite key from the natural key and natural value; it implements WritableComparable
  
public static class compositeKey implements WritableComparable<compositeKey>{
       
        private String dept;
        private String empName;
        private String salary;
       
        public  compositeKey(){
            this.dept = null;
            this.empName = null;
            this.salary = null;
        }
       
        public  compositeKey(String dept, String empName, String salary){
            this.dept = dept;
            this.empName = empName;
            this.salary = salary;
        }
       
        @Override
        public void write(DataOutput out) throws IOException{
            out.writeUTF(dept);
            out.writeUTF(empName);
            out.writeUTF(salary);
        }
       
        @Override
        public void readFields(DataInput in) throws IOException{
            dept = in.readUTF();
            empName = in.readUTF();
            salary = in.readUTF();
        }
       
        @Override
        public String toString(){
            return dept + "-------->" + empName +","+salary;
        }
       
        @Override
        public int compareTo(compositeKey other){
            int result = dept.compareTo(other.dept);
            if (result == 0){
                result = empName.compareTo(other.empName);
                if (result == 0){
                    result = salary.compareTo(other.salary);
                }
            }
            return result;
        }
       
        public void setempName(String empName){
            this.empName = empName;
        }
       
        public void setSalary(String salary){
            this.salary = salary;
        }
       
        public void setDept(String dept){
            this.dept = dept;
        }
       
        public String getempName(){
            return empName;
        }
       
        public String getSalary(){
            return salary;
        }
       
        public String getDept(){
            return dept;
        }
       
        @Override
        public int hashCode(){
            return  dept.hashCode();
        }
       
        @Override
        public boolean equals(Object obj){
            if (!(obj instanceof compositeKey))
                return false;
            compositeKey other = (compositeKey) obj;
            return dept.equals(other.dept) && empName.equals(other.empName) && salary.equals(other.salary);
        }
    }

    static class empMap extends Mapper<LongWritable, Text, compositeKey, NullWritable> {
       
        @Override
        public void map( LongWritable key,  Text value, Context context)
                throws IOException,InterruptedException{
        String line = value.toString();
        //Split on commas that are outside double quotes (quote-aware CSV split)
        String[] fields = line.split(",(?=([^\"]*\"[^\"]*\")*[^\"]*$)");
        String empName = fields[0];
        String dept = fields[2];
        String salary = fields[3];
        //Skip the header row
        if (!empName.equals("Name"))
            context.write(new compositeKey(dept, empName, salary), NullWritable.get());
        }
    }
   
    static class empReduce extends Reducer<compositeKey, NullWritable, compositeKey, NullWritable> {
       
        @Override
        public void reduce(compositeKey key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {

            for(NullWritable value : values)
                context.write(key,NullWritable.get());
           
        }
    }

    //Secondary sort partitioner based on natural key
    public static class SecondarySortPartitioner
        extends Partitioner<compositeKey,NullWritable>{
       
        @Override
        public int getPartition(compositeKey key, NullWritable value, int numPartitions){
            return ((key.getDept().hashCode() & Integer.MAX_VALUE) % numPartitions);   
            }
    }
   
    //Sort Comparator based on natural key and natural value
    public static class CompkeySortComparator extends WritableComparator {
        protected CompkeySortComparator(){
            super(compositeKey.class,true);
        }
       
        @Override
        public int compare(WritableComparable w1,WritableComparable w2){
            compositeKey ip1 = (compositeKey) w1;
            compositeKey ip2 = (compositeKey) w2;
           
           
            int cmp = ip1.getDept().compareTo(ip2.getDept());
            if (cmp == 0){
                cmp = ip1.getempName().compareTo(ip2.getempName()); //Ascending Order
                if (cmp == 0){
                    return -ip1.getSalary().compareTo(ip2.getSalary()); //Descending order (remove the minus for ascending)
                    //Note: this is a string compare; it orders correctly only while all salaries have the same number of digits
                }
            }
            return cmp;
        }
    }
       
    // Grouping Comparator based on natural key
    public static class groupingComparator extends WritableComparator {
        protected groupingComparator(){
            super(compositeKey.class,true);
        }
       
        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            compositeKey key1 = (compositeKey) w1;
            compositeKey key2 = (compositeKey) w2;
           
            return key1.getDept().compareTo(key2.getDept());
        }
    }
   
    @Override
    public int run(String[] args) throws Exception {
       
        Job job = new Job(getConf());
        job.setJarByClass(getClass());
        job.setJobName("Employee Details .. Sort values in order");
       
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
       
        job.setMapperClass(empMap.class);
        job.setPartitionerClass(SecondarySortPartitioner.class);
        job.setGroupingComparatorClass(groupingComparator.class);
        job.setSortComparatorClass(CompkeySortComparator.class);
        job.setReducerClass(empReduce.class);

       
        job.setOutputKeyClass(compositeKey.class);
        job.setOutputValueClass(NullWritable.class);
       
       
        return job.waitForCompletion(true)?0 : 1;
    }
   

    public static void main(String[] args) throws Exception{
        int exitcode = ToolRunner.run(new secondarySortEmpDetails() , args);
        System.exit(exitcode);
    }
}
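The quote-aware split regex used in the mapper can be exercised on its own. Here is a minimal standalone check (the class name `CsvSplitCheck` is just for illustration):

```java
import java.util.regex.Pattern;

// Standalone check of the quote-aware CSV split used in the mapper.
// The lookahead allows a split only when the comma is followed by an
// even number of double quotes, i.e. commas inside "..." are kept.
public class CsvSplitCheck {
    static final Pattern CSV = Pattern.compile(",(?=([^\"]*\"[^\"]*\")*[^\"]*$)");

    public static String[] split(String line) {
        return CSV.split(line);
    }

    public static void main(String[] args) {
        String[] b = split("\"AARON,  ELVIA J\",WATER RATE TAKER,WATER MGMNT,$87228.00");
        // The comma inside the quoted name is not treated as a field separator,
        // so the record splits into exactly four fields.
        System.out.println(b.length + " fields: " + b[0] + " | " + b[2] + " | " + b[3]);
    }
}
```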

Command to run the job:

$ hadoop jar secondarySortEmpDetails.jar secondarySortEmpDetails emp out11

15/02/01 12:12:45 INFO input.FileInputFormat: Total input paths to process : 1
15/02/01 12:12:45 INFO util.NativeCodeLoader: Loaded the native-hadoop library
15/02/01 12:12:45 WARN snappy.LoadSnappy: Snappy native library not loaded
15/02/01 12:12:45 INFO mapred.JobClient: Running job: job_201501301653_0072
15/02/01 12:12:46 INFO mapred.JobClient:  map 0% reduce 0%
15/02/01 12:12:58 INFO mapred.JobClient:  map 100% reduce 0%
15/02/01 12:13:11 INFO mapred.JobClient:  map 100% reduce 100%
15/02/01 12:13:16 INFO mapred.JobClient: Job complete: job_201501301653_0072
15/02/01 12:13:16 INFO mapred.JobClient: Counters: 29
15/02/01 12:13:16 INFO mapred.JobClient:   Map-Reduce Framework
15/02/01 12:13:16 INFO mapred.JobClient:     Spilled Records=18
15/02/01 12:13:16 INFO mapred.JobClient:     Map output materialized bytes=422
15/02/01 12:13:16 INFO mapred.JobClient:     Reduce input records=9
15/02/01 12:13:16 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=3700391936
15/02/01 12:13:16 INFO mapred.JobClient:     Map input records=10
15/02/01 12:13:16 INFO mapred.JobClient:     SPLIT_RAW_BYTES=102
15/02/01 12:13:16 INFO mapred.JobClient:     Map output bytes=398
15/02/01 12:13:16 INFO mapred.JobClient:     Reduce shuffle bytes=422
15/02/01 12:13:16 INFO mapred.JobClient:     Physical memory (bytes) snapshot=260874240
15/02/01 12:13:16 INFO mapred.JobClient:     Reduce input groups=7
15/02/01 12:13:16 INFO mapred.JobClient:     Combine output records=0
15/02/01 12:13:16 INFO mapred.JobClient:     Reduce output records=9
15/02/01 12:13:16 INFO mapred.JobClient:     Map output records=9
15/02/01 12:13:16 INFO mapred.JobClient:     Combine input records=0
15/02/01 12:13:16 INFO mapred.JobClient:     CPU time spent (ms)=1190
15/02/01 12:13:16 INFO mapred.JobClient:     Total committed heap usage (bytes)=164630528
15/02/01 12:13:16 INFO mapred.JobClient:   File Input Format Counters
15/02/01 12:13:16 INFO mapred.JobClient:     Bytes Read=592
15/02/01 12:13:16 INFO mapred.JobClient:   FileSystemCounters
15/02/01 12:13:16 INFO mapred.JobClient:     HDFS_BYTES_READ=694
15/02/01 12:13:16 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=113305
15/02/01 12:13:16 INFO mapred.JobClient:     FILE_BYTES_READ=422
15/02/01 12:13:16 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=443
15/02/01 12:13:16 INFO mapred.JobClient:   Job Counters
15/02/01 12:13:16 INFO mapred.JobClient:     Launched map tasks=1
15/02/01 12:13:16 INFO mapred.JobClient:     Launched reduce tasks=1
15/02/01 12:13:16 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=13160
15/02/01 12:13:16 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
15/02/01 12:13:16 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=15115
15/02/01 12:13:16 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
15/02/01 12:13:16 INFO mapred.JobClient:     Data-local map tasks=1
15/02/01 12:13:16 INFO mapred.JobClient:   File Output Format Counters
15/02/01 12:13:16 INFO mapred.JobClient:     Bytes Written=443


The output looks like this:

$ hadoop dfs -cat out11/p*
AVIATION-------->"ABBATACOLA,  ROBERT J",$89440.00
CITY COUNCIL-------->"ABARCA,  ANABEL",$70764.00
FIRE-------->"ABBATEMARCO,  JAMES J",$84396.00
GENERAL SERVICES-------->"AARON,  KIMBERLEI R",$80916.00
POLICE-------->"AARON,  JEFFERY M",$75372.00
POLICE-------->"AARON,  KARINA",$75372.00
STREETS & SAN-------->"ABARCA,  EMMANUEL",$40560.00
WATER MGMNT-------->"AARON,  ELVIA J",$87228.00
WATER MGMNT-------->"ABAD JR,  VICENTE M",$99648.00
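One caveat worth noting: both `compareTo` and the sort comparator compare the salary field lexicographically, as a string such as `$87228.00`. That gives the intended order only while every salary has the same number of digits, which happens to hold for this sample; `$100000.00` would sort below `$99648.00` as a string. A sketch of a numeric comparison that holds for any magnitude (class and method names here are illustrative, not part of the job above):

```java
// Sketch: compare "$..." salary strings numerically instead of
// lexicographically, so "$100000.00" outranks "$99648.00".
public class SalaryCompare {
    public static double parseSalary(String s) {
        // Strip the leading "$" (and any thousands separators) before parsing.
        return Double.parseDouble(s.replace("$", "").replace(",", ""));
    }

    // Descending numeric order, as the sort comparator intends.
    public static int compareDesc(String a, String b) {
        return Double.compare(parseSalary(b), parseSalary(a));
    }

    public static void main(String[] args) {
        // Negative result means the first argument sorts first.
        System.out.println(compareDesc("$100000.00", "$99648.00"));
    }
}
```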

