I have created a file emp. Lets try a simple mapreduce program to retrieve the fields from employee file. Below is the file and the sample records with headers.
First move the file to HDFS.
$ hadoop dfs -put emp emp
$hadoop dfs -ls
Found 1 item
-rw-r--r-- 1 swetha supergroup 81 2015-01-30 17:31 /user/swetha/emp
Lets try to select empname and salary fields from emp table. Below is the mapreduce for the same.
The below code is synonymous to the below SQL:
select empname,salary from emp;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import java.io.*;
//Displays EmpName and EmpID
public class empDetails extends Configured implements Tool {
static class empMap extends Mapper<LongWritable, Text, Text, Text> {
public void map( LongWritable key, Text value, Context context)
throws IOException,InterruptedException{
String a = value.toString();
String[] b = a.split(",");
String empName = b[1];
String salary = b[3];
context.write(new Text(empName), new Text(salary));
public int run(String[] args) throws Exception {
Job job = new Job();
job.setJobName("Employee Details");
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true)?0 : 1;
public static void main(String[] args) throws Exception{
int exitcode = ToolRunner.run(new empDetails() , args);
Pack the file into a empDetails.jar file and run it on hadoop environment.
$ hadoop jar empDetails.jar empDetails emp out1
15/01/30 19:26:45 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/01/30 19:26:46 INFO input.FileInputFormat: Total input paths to process : 1
15/01/30 19:26:46 INFO util.NativeCodeLoader: Loaded the native-hadoop library
15/01/30 19:26:46 WARN snappy.LoadSnappy: Snappy native library not loaded
15/01/30 19:26:46 INFO mapred.JobClient: Running job: job_201501301653_0006
15/01/30 19:26:47 INFO mapred.JobClient: map 0% reduce 0%
15/01/30 19:26:54 INFO mapred.JobClient: map 100% reduce 0%
15/01/30 19:26:56 INFO mapred.JobClient: Job complete: job_201501301653_0006
15/01/30 19:26:56 INFO mapred.JobClient: Counters: 19
15/01/30 19:26:56 INFO mapred.JobClient: Map-Reduce Framework
15/01/30 19:26:56 INFO mapred.JobClient: Spilled Records=0
15/01/30 19:26:56 INFO mapred.JobClient: Virtual memory (bytes) snapshot=1845501952
15/01/30 19:26:56 INFO mapred.JobClient: Map input records=4
15/01/30 19:26:56 INFO mapred.JobClient: SPLIT_RAW_BYTES=102
15/01/30 19:26:56 INFO mapred.JobClient: Map output records=4
15/01/30 19:26:56 INFO mapred.JobClient: Physical memory (bytes) snapshot=75046912
15/01/30 19:26:56 INFO mapred.JobClient: CPU time spent (ms)=150
15/01/30 19:26:56 INFO mapred.JobClient: Total committed heap usage (bytes)=32440320
15/01/30 19:26:56 INFO mapred.JobClient: File Input Format Counters
15/01/30 19:26:56 INFO mapred.JobClient: Bytes Read=81
15/01/30 19:26:56 INFO mapred.JobClient: FileSystemCounters
15/01/30 19:26:56 INFO mapred.JobClient: HDFS_BYTES_READ=183
15/01/30 19:26:56 INFO mapred.JobClient: FILE_BYTES_WRITTEN=54900
15/01/30 19:26:56 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=55
15/01/30 19:26:56 INFO mapred.JobClient: Job Counters
15/01/30 19:26:56 INFO mapred.JobClient: Launched map tasks=1
15/01/30 19:26:56 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=0
15/01/30 19:26:56 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
15/01/30 19:26:56 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=9046
15/01/30 19:26:56 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
15/01/30 19:26:56 INFO mapred.JobClient: Data-local map tasks=1
15/01/30 19:26:56 INFO mapred.JobClient: File Output Format Counters
15/01/30 19:26:56 INFO mapred.JobClient: Bytes Written=55
Now lets look at the output..
$ hadoop dfs -ls
Found 3 items
-rw-r--r-- 1 swetha supergroup 38 2015-01-30 17:31 /user/swetha/dept
-rw-r--r-- 1 swetha supergroup 81 2015-01-30 17:31 /user/swetha/emp
drwxr-xr-x - swetha supergroup 0 2015-01-30 19:26 /user/swetha/out1
since there are no reducers the mapper output is directly written to hdfs.
$ hadoop dfs -lsr out1
-rw-r--r-- 1 swetha supergroup 0 2015-01-30 19:26 /user/swetha/out1/_SUCCESS
drwxr-xr-x - swetha supergroup 0 2015-01-30 19:26 /user/swetha/out1/_logs
drwxr-xr-x - swetha supergroup 0 2015-01-30 19:26 /user/swetha/out1/_logs/history
-rw-r--r-- 1 swetha supergroup 9271 2015-01-30 19:26 /user/swetha/out1/_logs/history/job_201501301653_0006_1422664006709_swetha_Employee+Details
-rw-r--r-- 1 swetha supergroup 47336 2015-01-30 19:26 /user/swetha/out1/_logs/history/job_201501301653_0006_conf.xml
-rw-r--r-- 1 swetha supergroup 55 2015-01-30 19:26 /user/swetha/out1/part-m-00000
$ hadoop dfs -cat out1/p*
empname salary
swetha 50000
Nikhil 75000
Varun 75000
To get the same output without headers, use
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import java.io.*;
//Displays EmpName and EmpID without Headers
public class empDetailsWithOutHeaders extends Configured implements Tool {
static class empMap extends Mapper<LongWritable, Text, Text, Text> {
public void map( LongWritable key, Text value, Context context)
throws IOException,InterruptedException{
String a = value.toString();
String[] b = a.split(",");
String empName = b[1];
String salary = b[3];
if (!(empName.equals("empname")))
context.write(new Text(empName), new Text(salary));
public int run(String[] args) throws Exception {
Job job = new Job();
job.setJobName("Employee Details without Headers");
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true)?0 : 1;
public static void main(String[] args) throws Exception{
int exitcode = ToolRunner.run(new empDetailsWithOutHeaders() , args);
The output will be the following..
$ hadoop dfs -cat out2/p*
swetha 50000
Nikhil 75000
Varun 75000
