Friday, January 30, 2015

MapReduce code to retrieve employee fields from employee file

I have created a file called emp. Let's try a simple MapReduce program to retrieve fields from this employee file. Below are the sample records, with a header row.

emp
empid,empname,deptid,salary
1,swetha,1,50000
2,Nikhil,2,75000
3,Varun,3,75000


First move the file to HDFS.
$ hadoop dfs -put emp emp

$hadoop dfs -ls
Found 1 item
-rw-r--r--   1 swetha supergroup         81 2015-01-30 17:31 /user/swetha/emp

Let's try to select the empname and salary fields from the emp table. Below is the MapReduce code for the same.
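Each record is a single comma-separated line, so after splitting on commas, empname sits at index 1 and salary at index 3. A quick standalone check of the split logic (purely illustrative, not part of the job):

public class SplitCheck {
    public static void main(String[] args) {
        // Sample record layout: empid,empname,deptid,salary
        String[] fields = "1,swetha,1,50000".split(",");
        System.out.println(fields[1] + "\t" + fields[3]);   // prints: swetha  50000
    }
}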

The code below is equivalent to the following SQL:

select empname,salary from emp;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;


// Displays EmpName and Salary for each employee record
public class empDetails extends Configured implements Tool {

    static class empMap extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is a comma-separated record: empid,empname,deptid,salary
            String[] fields = value.toString().split(",");
            String empName = fields[1];
            String salary = fields[3];
            // Emit empname as the key and salary as the value
            context.write(new Text(empName), new Text(salary));
        }
    }


    @Override
    public int run(String[] args) throws Exception {
        // Use the configuration prepared by ToolRunner so generic options are honoured
        Job job = new Job(getConf());
        job.setJarByClass(getClass());
        job.setJobName("Employee Details");
       
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
       
        job.setMapperClass(empMap.class);
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
       
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new empDetails(), args);
        System.exit(exitCode);
    }
}

Pack the class into an empDetails.jar file and run it on the Hadoop cluster.
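If you need to compile and pack the jar first, something like the following works (a minimal sketch assuming a Hadoop 1.x client where hadoop classpath prints the required jars; adjust paths for your setup):

$ mkdir classes
$ javac -classpath `hadoop classpath` -d classes empDetails.java
$ jar -cvf empDetails.jar -C classes .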

$ hadoop jar empDetails.jar empDetails emp out1
15/01/30 19:26:45 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/01/30 19:26:46 INFO input.FileInputFormat: Total input paths to process : 1
15/01/30 19:26:46 INFO util.NativeCodeLoader: Loaded the native-hadoop library
15/01/30 19:26:46 WARN snappy.LoadSnappy: Snappy native library not loaded
15/01/30 19:26:46 INFO mapred.JobClient: Running job: job_201501301653_0006
15/01/30 19:26:47 INFO mapred.JobClient:  map 0% reduce 0%
15/01/30 19:26:54 INFO mapred.JobClient:  map 100% reduce 0%
15/01/30 19:26:56 INFO mapred.JobClient: Job complete: job_201501301653_0006
15/01/30 19:26:56 INFO mapred.JobClient: Counters: 19
15/01/30 19:26:56 INFO mapred.JobClient:   Map-Reduce Framework
15/01/30 19:26:56 INFO mapred.JobClient:     Spilled Records=0
15/01/30 19:26:56 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=1845501952
15/01/30 19:26:56 INFO mapred.JobClient:     Map input records=4
15/01/30 19:26:56 INFO mapred.JobClient:     SPLIT_RAW_BYTES=102
15/01/30 19:26:56 INFO mapred.JobClient:     Map output records=4
15/01/30 19:26:56 INFO mapred.JobClient:     Physical memory (bytes) snapshot=75046912
15/01/30 19:26:56 INFO mapred.JobClient:     CPU time spent (ms)=150
15/01/30 19:26:56 INFO mapred.JobClient:     Total committed heap usage (bytes)=32440320
15/01/30 19:26:56 INFO mapred.JobClient:   File Input Format Counters
15/01/30 19:26:56 INFO mapred.JobClient:     Bytes Read=81
15/01/30 19:26:56 INFO mapred.JobClient:   FileSystemCounters
15/01/30 19:26:56 INFO mapred.JobClient:     HDFS_BYTES_READ=183
15/01/30 19:26:56 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=54900
15/01/30 19:26:56 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=55
15/01/30 19:26:56 INFO mapred.JobClient:   Job Counters
15/01/30 19:26:56 INFO mapred.JobClient:     Launched map tasks=1
15/01/30 19:26:56 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=0
15/01/30 19:26:56 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
15/01/30 19:26:56 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=9046
15/01/30 19:26:56 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
15/01/30 19:26:56 INFO mapred.JobClient:     Data-local map tasks=1
15/01/30 19:26:56 INFO mapred.JobClient:   File Output Format Counters
15/01/30 19:26:56 INFO mapred.JobClient:     Bytes Written=55


Now let's look at the output.

$ hadoop dfs -ls
Found 3 items
-rw-r--r--   1 swetha supergroup         38 2015-01-30 17:31 /user/swetha/dept
-rw-r--r--   1 swetha supergroup         81 2015-01-30 17:31 /user/swetha/emp
drwxr-xr-x   - swetha supergroup          0 2015-01-30 19:26 /user/swetha/out1


Since the job runs with no reducers (setNumReduceTasks(0)), the mapper output is written directly to HDFS as part-m-* files.

$ hadoop dfs -lsr out1
-rw-r--r--   1 swetha supergroup          0 2015-01-30 19:26 /user/swetha/out1/_SUCCESS
drwxr-xr-x   - swetha supergroup          0 2015-01-30 19:26 /user/swetha/out1/_logs
drwxr-xr-x   - swetha supergroup          0 2015-01-30 19:26 /user/swetha/out1/_logs/history
-rw-r--r--   1 swetha supergroup       9271 2015-01-30 19:26 /user/swetha/out1/_logs/history/job_201501301653_0006_1422664006709_swetha_Employee+Details
-rw-r--r--   1 swetha supergroup      47336 2015-01-30 19:26 /user/swetha/out1/_logs/history/job_201501301653_0006_conf.xml
-rw-r--r--   1 swetha supergroup         55 2015-01-30 19:26 /user/swetha/out1/part-m-00000


$ hadoop dfs -cat out1/p*
empname    salary
swetha    50000
Nikhil    75000
Varun    75000



The header row shows up in the output because the mapper processes every input line, including the header. To get the same output without the header, filter it out in the mapper:

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;


// Displays EmpName and Salary, skipping the header row
public class empDetailsWithOutHeaders extends Configured implements Tool {

    static class empMap extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is a comma-separated record: empid,empname,deptid,salary
            String[] fields = value.toString().split(",");
            String empName = fields[1];
            String salary = fields[3];
            // Skip the header row; emit empname as the key and salary as the value
            if (!empName.equals("empname"))
                context.write(new Text(empName), new Text(salary));
        }
    }


    @Override
    public int run(String[] args) throws Exception {
        // Use the configuration prepared by ToolRunner so generic options are honoured
        Job job = new Job(getConf());
        job.setJarByClass(getClass());
        job.setJobName("Employee Details without Headers");
       
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
       
        job.setMapperClass(empMap.class);
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
       
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new empDetailsWithOutHeaders(), args);
        System.exit(exitCode);
    }
}
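Pack and run this version the same way, writing to a new output directory. The command below mirrors the first run; empDetailsWithOutHeaders.jar is just the assumed name of the newly packed jar.

$ hadoop jar empDetailsWithOutHeaders.jar empDetailsWithOutHeaders emp out2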


The output will be the following:

$ hadoop dfs -cat out2/p*
swetha    50000
Nikhil    75000
Varun    75000
