Sometimes we want to process a whole file as a single input record in MapReduce. This can be achieved by:
1. Not splitting the file.
2. Having a RecordReader that delivers the file contents as the value of the record.
Let's demonstrate this by creating a MapReduce job that packages small files into a sequence file, where the key is the path of the file and the value is the contents of the whole file.
The "wholeFileInputFormat" class in the code below shows one way of doing this. It uses NullWritable as the key type and BytesWritable as the value type, where the value is the contents of the file. Its isSplitable() method returns false, which specifies that an input file should not be split. The second overridden method, createRecordReader(), returns a custom implementation of RecordReader.
The name of the file can be derived from the context object in the mapper. The mapper casts the InputSplit from the context to a FileSplit and then calls its getPath() method. The output format is SequenceFileOutputFormat, since we want to store the records in a sequence file.
The number of map tasks launched depends on the number of input files to package. In our example, we are packaging 5 small employee files, so 5 mappers are launched even though each input file is smaller than the block size.
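The relationship between files and map tasks can be sketched with plain Java. This is only a simplified model, not Hadoop code: the class and method names are made up for illustration, the 64 MB block size is an assumption, and the split size is assumed to equal the block size. The 1.1 slop factor mirrors the rule FileInputFormat applies so that a small tail is folded into the last split.

```java
/** Simplified model of how many input splits (and so map tasks) a set of files yields. */
class SplitCountSketch {
    // The last split of a file may be up to 10% larger than the split size.
    static final double SPLIT_SLOP = 1.1;

    /** Splits produced by one file; a non-splittable file is always exactly one split. */
    static int splitsForFile(long fileLength, long splitSize, boolean splittable) {
        if (!splittable || fileLength == 0) {
            return 1; // one split for the whole file (an empty file still gets one split)
        }
        int splits = 0;
        long remaining = fileLength;
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            splits++;
            remaining -= splitSize;
        }
        if (remaining != 0) {
            splits++; // the tail, possibly slightly larger than splitSize
        }
        return splits;
    }

    /** Total splits over all input files. */
    static int totalSplits(long[] fileLengths, long splitSize, boolean splittable) {
        int total = 0;
        for (long length : fileLengths) {
            total += splitsForFile(length, splitSize, splittable);
        }
        return total;
    }

    public static void main(String[] args) {
        long blockSize = 64L * 1024 * 1024;           // assumed block size
        long[] emp = {643, 542, 629, 596, 1866851};   // sizes of emp1..emp5 in bytes
        System.out.println(totalSplits(emp, blockSize, false)); // whole-file input: 5
        System.out.println(totalSplits(emp, blockSize, true));  // splittable input: also 5
        long bigFile = 200L * 1024 * 1024;            // hypothetical 200 MB file
        System.out.println(splitsForFile(bigFile, blockSize, true));  // 4
        System.out.println(splitsForFile(bigFile, blockSize, false)); // 1
    }
}
```

For our five small files the count is the same either way, because each file already fits in one block. The difference only shows up for files larger than a block, where a non-splittable input format still hands the entire file to a single mapper.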
Let's look at the input files:
$ hadoop dfs -lsr emp
-rw-r--r-- 1 swetha supergroup 643 2015-02-01 14:29 /user/swetha/emp/emp1
-rw-r--r-- 1 swetha supergroup 542 2015-02-01 14:29 /user/swetha/emp/emp2
-rw-r--r-- 1 swetha supergroup 629 2015-02-01 14:29 /user/swetha/emp/emp3
-rw-r--r-- 1 swetha supergroup 596 2015-02-01 14:29 /user/swetha/emp/emp4
-rw-r--r-- 1 swetha supergroup 1866851 2015-02-01 22:51 /user/swetha/emp/emp5
Code:
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class wholeFileReader extends Configured implements Tool {

    // Mapper: emits (file path, whole file contents) for the single record of each split.
    public static class wholeFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
        private Text fileName;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // The split is a FileSplit because the input format extends FileInputFormat.
            InputSplit split = context.getInputSplit();
            Path path = ((FileSplit) split).getPath();
            fileName = new Text(path.toString());
        }

        @Override
        protected void map(NullWritable key, BytesWritable value, Context context)
                throws IOException, InterruptedException {
            context.write(fileName, value);
        }
    }

    // InputFormat: one split per file, read by wholeFileRecordReader.
    public static class wholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
        @Override
        protected boolean isSplitable(JobContext context, Path file) {
            return false; // never split a file, whatever its size
        }

        @Override
        public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            wholeFileRecordReader reader = new wholeFileRecordReader();
            reader.initialize(split, context);
            return reader;
        }
    }

    // RecordReader: delivers exactly one record whose value is the entire file.
    public static class wholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
        private FileSplit fileSplit;
        private Configuration conf;
        private BytesWritable value = new BytesWritable();
        private boolean processed = false;

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            this.fileSplit = (FileSplit) split;
            this.conf = context.getConfiguration();
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (!processed) {
                // The whole file is buffered in memory; the int cast limits it to files under 2 GB.
                byte[] contents = new byte[(int) fileSplit.getLength()];
                Path file = fileSplit.getPath();
                FileSystem fs = file.getFileSystem(conf);
                FSDataInputStream in = null;
                try {
                    in = fs.open(file);
                    IOUtils.readFully(in, contents, 0, contents.length);
                    value.set(contents, 0, contents.length);
                } finally {
                    IOUtils.closeStream(in);
                }
                processed = true;
                return true;
            }
            return false; // the single record has already been delivered
        }

        @Override
        public NullWritable getCurrentKey() throws IOException, InterruptedException {
            return NullWritable.get();
        }

        @Override
        public BytesWritable getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException {
            return processed ? 1.0f : 0.0f;
        }

        @Override
        public void close() throws IOException {
            // nothing to close; the stream is closed in nextKeyValue()
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // Use the Configuration prepared by ToolRunner so that generic options (-D, -conf, ...) take effect.
        Configuration conf = getConf();
        Job job = new Job(conf, "Whole File Input Format");
        job.setJarByClass(wholeFileReader.class);
        job.setMapperClass(wholeFileMapper.class);
        job.setInputFormatClass(wholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitcode = ToolRunner.run(new wholeFileReader(), args);
        System.exit(exitcode);
    }
}
Output:
$ hadoop jar wholeFileReader.jar wholeFileReader emp sequencefile
15/02/02 19:04:07 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/02/02 19:04:08 INFO input.FileInputFormat: Total input paths to process : 5
15/02/02 19:04:09 INFO mapred.JobClient: Running job: job_201501301653_0084
15/02/02 19:04:10 INFO mapred.JobClient: map 0% reduce 0%
15/02/02 19:04:22 INFO mapred.JobClient: map 40% reduce 0%
15/02/02 19:04:35 INFO mapred.JobClient: map 80% reduce 0%
15/02/02 19:04:42 INFO mapred.JobClient: map 100% reduce 0%
15/02/02 19:04:44 INFO mapred.JobClient: map 100% reduce 33%
15/02/02 19:04:46 INFO mapred.JobClient: map 100% reduce 100%
15/02/02 19:04:50 INFO mapred.JobClient: Job complete: job_201501301653_0084
15/02/02 19:04:50 INFO mapred.JobClient: Counters: 29
15/02/02 19:04:50 INFO mapred.JobClient: Map-Reduce Framework
15/02/02 19:04:50 INFO mapred.JobClient: Spilled Records=10
15/02/02 19:04:50 INFO mapred.JobClient: Map output materialized bytes=1869547
15/02/02 19:04:50 INFO mapred.JobClient: Reduce input records=5
15/02/02 19:04:50 INFO mapred.JobClient: Virtual memory (bytes) snapshot=11063402496
15/02/02 19:04:50 INFO mapred.JobClient: Map input records=5
15/02/02 19:04:50 INFO mapred.JobClient: SPLIT_RAW_BYTES=535
15/02/02 19:04:50 INFO mapred.JobClient: Map output bytes=1869496
15/02/02 19:04:50 INFO mapred.JobClient: Reduce shuffle bytes=1869547
15/02/02 19:04:50 INFO mapred.JobClient: Physical memory (bytes) snapshot=930562048
15/02/02 19:04:50 INFO mapred.JobClient: Reduce input groups=5
15/02/02 19:04:50 INFO mapred.JobClient: Combine output records=0
15/02/02 19:04:50 INFO mapred.JobClient: Reduce output records=5
15/02/02 19:04:50 INFO mapred.JobClient: Map output records=5
15/02/02 19:04:50 INFO mapred.JobClient: Combine input records=0
15/02/02 19:04:50 INFO mapred.JobClient: CPU time spent (ms)=2820
15/02/02 19:04:50 INFO mapred.JobClient: Total committed heap usage (bytes)=693391360
15/02/02 19:04:50 INFO mapred.JobClient: File Input Format Counters
15/02/02 19:04:50 INFO mapred.JobClient: Bytes Read=1869261
15/02/02 19:04:50 INFO mapred.JobClient: FileSystemCounters
15/02/02 19:04:50 INFO mapred.JobClient: HDFS_BYTES_READ=1869796
15/02/02 19:04:50 INFO mapred.JobClient: FILE_BYTES_WRITTEN=4072689
15/02/02 19:04:50 INFO mapred.JobClient: FILE_BYTES_READ=1869523
15/02/02 19:04:50 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=1869643
15/02/02 19:04:50 INFO mapred.JobClient: Job Counters
15/02/02 19:04:50 INFO mapred.JobClient: Launched map tasks=5
15/02/02 19:04:50 INFO mapred.JobClient: Launched reduce tasks=1
15/02/02 19:04:50 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=24378
15/02/02 19:04:50 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
15/02/02 19:04:50 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=55834
15/02/02 19:04:50 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
15/02/02 19:04:50 INFO mapred.JobClient: Data-local map tasks=5
15/02/02 19:04:50 INFO mapred.JobClient: File Output Format Counters
15/02/02 19:04:50 INFO mapred.JobClient: Bytes Written=1869643
The Launched map tasks counter shows 5, which means one mapper was launched for each input file. Launching a separate mapper for every small file is inefficient, so this job could be improved by using CombineFileInputFormat instead of FileInputFormat. Please refer to my other post on how to use CombineFileInputFormat:
http://hadoopsupport.blogspot.com/2015/02/combinefileinputformat-mapreduce-code.html
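To get a feel for why combining helps, here is a rough plain-Java model of packing several small files into one split. It is not the algorithm CombineFileInputFormat actually uses (which also takes node and rack locality into account); the class name, the greedy strategy and the maximum split sizes are assumptions chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model: greedily pack files, in order, into groups of at most maxSplitSize bytes. */
class CombinePackingSketch {
    static List<List<Long>> pack(long[] fileLengths, long maxSplitSize) {
        List<List<Long>> groups = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentSize = 0;
        for (long length : fileLengths) {
            // Start a new group when adding this file would exceed the limit.
            if (!current.isEmpty() && currentSize + length > maxSplitSize) {
                groups.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(length);
            currentSize += length;
        }
        if (!current.isEmpty()) {
            groups.add(current);
        }
        return groups;
    }

    public static void main(String[] args) {
        long[] emp = {643, 542, 629, 596, 1866851};   // sizes of emp1..emp5 in bytes
        // With an assumed 64 MB limit, all five files fit in one group (one mapper).
        System.out.println(pack(emp, 64L * 1024 * 1024).size());
        // With an assumed 1 MB limit, the four tiny files share a group and emp5 gets its own.
        System.out.println(pack(emp, 1024L * 1024).size());
    }
}
```

Each group stands for one map task, so under this model the five employee files would need one or two mappers instead of five.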
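Data in the output file can be inspected with the -text option. Each line shows the key (the file path) followed by the value, which is printed as space-separated hex bytes because it is a BytesWritable. Sample output looks like this: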
$ hadoop dfs -text sequencefile/part*
hdfs://localhost:9000/user/swetha/emp/emp1 4e 61 6d 65 2c 50 6f 73 69 74 69 6f 6e 20 54 69 74 6c 65 2c 44 65 70 61 72 74 6d 65 6e 74 2c 45 6d 70 6c 6f 79 65 65 20 41 6e 6e 75 61 6c 20 53 61 6c 61 72 79 0a 22 41 41 52 4f 4e 2c 20 20 45 4c 56 49 41 20 4a 22 2c 57 41 54 45 52 20 52 41 54 45 20 54 41 4b 45 52 2c 57 41 54 45 52 20 4d 47 4d 4e 54 2c 24 38 37 32 32 38 2e 30 30 0a 22 41 41 52 4f 4e 2c 20 20 4a 45 46 46
hdfs://localhost:9000/user/swetha/emp/emp2 22 41 42 42 4f 54 54 2c 20 20 42 45 54 54 59 20 4c 22 2c 46 4f 53 54 45 52 20 47 52 41 4e 44 50 41 52 45 4e 54 2c 46 41 4d 49 4c 59 20 26 20 53 55 50 50 4f 52 54 2c 24 32 37 35 36 2e 30 30 0a 22 41 42 42 4f 54 54 2c 20 20 4c 59 4e 49 53 45 20 4d 22 2c 43 4c 45 52 4b 20 49 49 49 2c 50 4f 4c 49 43 45 2c 24 34 31 37 38 34 2e 30 30 0a 22 41 42 42 52 55 5a 5a 45 53 45 2c 20 20 57 49
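To check that the hex bytes really are the original file contents, we can turn a piece of the dump back into text. The helper below is a small standalone sketch (the class and method names are made up, and it assumes the files are UTF-8 or plain ASCII text); it is not part of the MapReduce job.

```java
import java.nio.charset.StandardCharsets;

/** Decodes a space-separated hex dump, as printed for a BytesWritable, back into text. */
class HexDumpDecoder {
    static String decode(String hex) {
        String trimmed = hex.trim();
        if (trimmed.isEmpty()) {
            return "";
        }
        String[] parts = trimmed.split("\\s+");
        byte[] bytes = new byte[parts.length];
        for (int i = 0; i < parts.length; i++) {
            // Each token is one byte written as two hex digits.
            bytes[i] = (byte) Integer.parseInt(parts[i], 16);
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // First bytes of the emp1 value from the sample output above.
        System.out.println(decode("4e 61 6d 65 2c 50 6f 73 69 74 69 6f 6e 20 54 69 74 6c 65"));
        // prints: Name,Position Title
    }
}
```

The decoded text is the start of the header line of the emp1 file, which confirms that the whole file was stored as the value.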