I have created a file named emp. Let's try a simple MapReduce program to retrieve fields from this employee file. Below is the file, with its header row and sample records.
emp
empid,empname,deptid,salary
1,swetha,1,50000
2,Nikhil,2,75000
3,Varun,3,75000
First, copy the file to HDFS.
$ hadoop dfs -put emp emp
$ hadoop dfs -ls
Found 1 item
-rw-r--r-- 1 swetha supergroup 81 2015-01-30 17:31 /user/swetha/emp
Let's select the empname and salary fields from the emp file with a map-only MapReduce job.
The code below is equivalent to the following SQL:
select empname,salary from emp;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import java.io.*;
// Displays EmpName and Salary
public class empDetails extends Configured implements Tool {

    static class empMap extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input value is one comma-separated line of the emp file.
            String line = value.toString();
            String[] fields = line.split(",");
            String empName = fields[1];
            String salary = fields[3];
            context.write(new Text(empName), new Text(salary));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job();
        job.setJarByClass(getClass());
        job.setJobName("Employee Details");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(empMap.class);
        // Map-only job: no reducers.
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitcode = ToolRunner.run(new empDetails(), args);
        System.exit(exitcode);
    }
}
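The per-line logic of the mapper above can be tried without a cluster. The following is only a minimal sketch in plain Java: the class EmpProjection and its project method are hypothetical names introduced for illustration, and the tab between the two fields assumes the default key/value separator of the text output format.

```java
// Hypothetical helper for local experiments; not part of the Hadoop job above.
class EmpProjection {

    // Mirrors the map logic: split the line on commas, keep empname and salary.
    // The tab separator is an assumption matching the default text output format.
    static String project(String line) {
        String[] fields = line.split(",");
        return fields[1] + "\t" + fields[3];
    }

    public static void main(String[] args) {
        String[] lines = {
            "empid,empname,deptid,salary",
            "1,swetha,1,50000",
            "2,Nikhil,2,75000",
            "3,Varun,3,75000"
        };
        for (String line : lines) {
            System.out.println(project(line));
        }
    }
}
```

Note that the header row is projected like any other record, which is why it also shows up in the job output.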
Package the empDetails class into empDetails.jar and run it on the Hadoop environment.
$ hadoop jar empDetails.jar empDetails emp out1
15/01/30 19:26:45 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/01/30 19:26:46 INFO input.FileInputFormat: Total input paths to process : 1
15/01/30 19:26:46 INFO util.NativeCodeLoader: Loaded the native-hadoop library
15/01/30 19:26:46 WARN snappy.LoadSnappy: Snappy native library not loaded
15/01/30 19:26:46 INFO mapred.JobClient: Running job: job_201501301653_0006
15/01/30 19:26:47 INFO mapred.JobClient: map 0% reduce 0%
15/01/30 19:26:54 INFO mapred.JobClient: map 100% reduce 0%
15/01/30 19:26:56 INFO mapred.JobClient: Job complete: job_201501301653_0006
15/01/30 19:26:56 INFO mapred.JobClient: Counters: 19
15/01/30 19:26:56 INFO mapred.JobClient: Map-Reduce Framework
15/01/30 19:26:56 INFO mapred.JobClient: Spilled Records=0
15/01/30 19:26:56 INFO mapred.JobClient: Virtual memory (bytes) snapshot=1845501952
15/01/30 19:26:56 INFO mapred.JobClient: Map input records=4
15/01/30 19:26:56 INFO mapred.JobClient: SPLIT_RAW_BYTES=102
15/01/30 19:26:56 INFO mapred.JobClient: Map output records=4
15/01/30 19:26:56 INFO mapred.JobClient: Physical memory (bytes) snapshot=75046912
15/01/30 19:26:56 INFO mapred.JobClient: CPU time spent (ms)=150
15/01/30 19:26:56 INFO mapred.JobClient: Total committed heap usage (bytes)=32440320
15/01/30 19:26:56 INFO mapred.JobClient: File Input Format Counters
15/01/30 19:26:56 INFO mapred.JobClient: Bytes Read=81
15/01/30 19:26:56 INFO mapred.JobClient: FileSystemCounters
15/01/30 19:26:56 INFO mapred.JobClient: HDFS_BYTES_READ=183
15/01/30 19:26:56 INFO mapred.JobClient: FILE_BYTES_WRITTEN=54900
15/01/30 19:26:56 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=55
15/01/30 19:26:56 INFO mapred.JobClient: Job Counters
15/01/30 19:26:56 INFO mapred.JobClient: Launched map tasks=1
15/01/30 19:26:56 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=0
15/01/30 19:26:56 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
15/01/30 19:26:56 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=9046
15/01/30 19:26:56 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
15/01/30 19:26:56 INFO mapred.JobClient: Data-local map tasks=1
15/01/30 19:26:56 INFO mapred.JobClient: File Output Format Counters
15/01/30 19:26:56 INFO mapred.JobClient: Bytes Written=55
Now let's look at the output.
$ hadoop dfs -ls
Found 3 items
-rw-r--r-- 1 swetha supergroup 38 2015-01-30 17:31 /user/swetha/dept
-rw-r--r-- 1 swetha supergroup 81 2015-01-30 17:31 /user/swetha/emp
drwxr-xr-x - swetha supergroup 0 2015-01-30 19:26 /user/swetha/out1
Since there are no reducers, the mapper output is written directly to HDFS.
$ hadoop dfs -lsr out1
-rw-r--r-- 1 swetha supergroup 0 2015-01-30 19:26 /user/swetha/out1/_SUCCESS
drwxr-xr-x - swetha supergroup 0 2015-01-30 19:26 /user/swetha/out1/_logs
drwxr-xr-x - swetha supergroup 0 2015-01-30 19:26 /user/swetha/out1/_logs/history
-rw-r--r-- 1 swetha supergroup 9271 2015-01-30 19:26 /user/swetha/out1/_logs/history/job_201501301653_0006_1422664006709_swetha_Employee+Details
-rw-r--r-- 1 swetha supergroup 47336 2015-01-30 19:26 /user/swetha/out1/_logs/history/job_201501301653_0006_conf.xml
-rw-r--r-- 1 swetha supergroup 55 2015-01-30 19:26 /user/swetha/out1/part-m-00000
$ hadoop dfs -cat out1/p*
empname salary
swetha 50000
Nikhil 75000
Varun 75000
To get the same output without the header row, use the following program:
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import java.io.*;
// Displays EmpName and Salary without the header row
public class empDetailsWithOutHeaders extends Configured implements Tool {

    static class empMap extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input value is one comma-separated line of the emp file.
            String line = value.toString();
            String[] fields = line.split(",");
            String empName = fields[1];
            String salary = fields[3];
            // Skip the header row, whose second field is the literal "empname".
            if (!empName.equals("empname")) {
                context.write(new Text(empName), new Text(salary));
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job();
        job.setJarByClass(getClass());
        job.setJobName("Employee Details without Headers");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(empMap.class);
        // Map-only job: no reducers.
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitcode = ToolRunner.run(new empDetailsWithOutHeaders(), args);
        System.exit(exitcode);
    }
}
With out2 as the output directory, the output will be the following:
$ hadoop dfs -cat out2/p*
swetha 50000
Nikhil 75000
Varun 75000
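Comparing against the literal "empname" works for this file, but it depends on the header text. A possible alternative, sketched here and not taken from the program above: with the default text input format the map key is the byte offset of the line within the file, so the header is the record whose key is 0 (in the mapper that would be a check on key.get()). The plain Java sketch below simulates those offsets so it can run without Hadoop. The class EmpHeaderFilter and its methods are hypothetical, the input is assumed to be ASCII with "\n" line endings, and rows with fewer than four fields are dropped as an extra precaution.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for local experiments; not part of the Hadoop job above.
class EmpHeaderFilter {

    // Returns "empname<TAB>salary", or null for the header (offset 0) or a short row.
    static String project(long offset, String line) {
        if (offset == 0L) {
            return null; // assumption: the header is the first line of the file
        }
        String[] fields = line.split(",");
        if (fields.length < 4) {
            return null; // malformed row: not enough fields
        }
        return fields[1] + "\t" + fields[3];
    }

    // Walks the file contents line by line, tracking each line's starting offset.
    static List<String> run(String fileContents) {
        List<String> out = new ArrayList<>();
        long offset = 0L;
        for (String line : fileContents.split("\n")) {
            String row = project(offset, line);
            if (row != null) {
                out.add(row);
            }
            offset += line.length() + 1; // +1 for the newline; ASCII assumed
        }
        return out;
    }

    public static void main(String[] args) {
        String emp = "empid,empname,deptid,salary\n"
                + "1,swetha,1,50000\n"
                + "2,Nikhil,2,75000\n"
                + "3,Varun,3,75000\n";
        for (String row : run(emp)) {
            System.out.println(row);
        }
    }
}
```

The offset check only makes sense when every input file starts with a header row; for files without one, the string comparison used in the program above is the safer choice.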