Showing posts with label MapReduce. Show all posts

Thursday, May 4, 2017

Hadoop Streaming: Specifying Map-Only Jobs

Often, you may want to process input data using a map function only. To do this, set mapred.reduce.tasks to zero (in Hadoop 2.x the property is named mapreduce.job.reduces; the old name is deprecated but still accepted). The MapReduce framework then creates no reducer tasks, and the output of the mapper tasks becomes the final output of the job.

    -D mapred.reduce.tasks=0

Hadoop Streaming supports map-only jobs through the same option. Note that generic options such as -D must come before the streaming options:

hadoop jar hadoop-streaming-2.7.1.2.4.0.0-169.jar \
    -D mapred.reduce.tasks=0 \
    -input /user/root/wordcount \
    -output /user/root/out \
    -mapper /bin/cat


We can also get a map-only job by specifying "-numReduceTasks 0":

hadoop jar hadoop-streaming-2.7.1.2.4.0.0-169.jar \
    -input /user/root/wordcount \
    -output /user/root/out \
    -mapper /bin/cat \
    -numReduceTasks 0
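
To make the behaviour concrete, here is a minimal plain-Java sketch (no Hadoop classes involved) of what a map-only job does: every input split is passed through the map function and the mapper output is kept as-is, with no sort, shuffle, or grouping by key. The class MapOnlySketch and the method runMapOnly are hypothetical names used only for this illustration; they are not part of Hadoop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class MapOnlySketch {

    // Hypothetical helper: applies the map function to every record of every split.
    // Each inner list of the result stands for the output of one mapper task.
    static List<List<String>> runMapOnly(List<List<String>> splits,
                                         Function<String, String> mapper) {
        List<List<String>> outputs = new ArrayList<>();
        for (List<String> split : splits) {
            List<String> part = new ArrayList<>();
            for (String record : split) {
                part.add(mapper.apply(record));
            }
            outputs.add(part); // no sort, no shuffle, no reduce
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<List<String>> splits = Arrays.asList(
                Arrays.asList("b 1", "a 2"),
                Arrays.asList("c 3"));
        // Identity mapper, standing in for /bin/cat in the streaming command.
        System.out.println(runMapOnly(splits, Function.identity()));
    }
}
```

Note that the records of the first split keep their input order ("b 1" before "a 2"); sorting by key only happens when there is a reduce phase.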

Hadoop MapReduce job on Hortonworks Sandbox to find maximum temperature on NCDC weather dataset


To set up the Hortonworks Sandbox, follow this tutorial:
http://hortonworks.com/wp-content/uploads/2012/03/Tutorial_Hadoop_HDFS_MapReduce.pdf
See my other post on how to download the weather dataset from NCDC.


I downloaded a sample weather file for the year 2017 to the local directory /home/pluralsight/mapreduce/weather. You can download any files you like, and as many as you like.
[root@sandbox weather]# pwd
/home/pluralsight/mapreduce/weather
[root@sandbox weather]# ls -ltr
total 72
-rw-r--r-- 1 root root 70234 2017-05-04 16:59 007026-99999-2017.gz

Upload this file to HDFS:
[root@sandbox weather]# hadoop dfs -put /home/pluralsight/mapreduce/weather/007026-99999-2017.gz /home/pluralsight/weather
[root@sandbox weather]# hadoop dfs -ls /home/pluralsight/weather/
Found 1 items
-rw-r--r--   3 root hdfs      70234 2017-05-04 17:16 /home/pluralsight/weather/007026-99999-2017.gz

Now the file is in HDFS. Below is a MapReduce program in Java that finds the maximum temperature.

Application to find maximum temperature, maxTempDriver.java
------------------------------------------------------------------------------------------------------------------------
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.LongWritable;
import java.io.IOException;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class maxTempDriver extends Configured implements Tool{

        public static class MaxTempMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
                @Override
                public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
                        String line = value.toString();
                        String year = line.substring(15,19);
                        int airTemp = Integer.parseInt(line.substring(87,92));
                        context.write(new Text(year), new IntWritable(airTemp));
                }
        }


        public static class MaxTempReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
                @Override
                public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                        int maxValue = Integer.MIN_VALUE;
                        for (IntWritable value: values) {
                                maxValue = Math.max(maxValue, value.get());
                        }
                        context.write(key, new IntWritable(maxValue));
                }
        }

        public int run(String[] args) throws Exception {

                // Job.getInstance() replaces the deprecated Job constructor
                Job job = Job.getInstance(getConf(), "Max Temp");
                job.setJarByClass(maxTempDriver.class);

                job.setMapperClass(MaxTempMapper.class);
                job.setReducerClass(MaxTempReducer.class);

                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(IntWritable.class);

                FileInputFormat.addInputPath(job, new Path(args[0]));
                FileOutputFormat.setOutputPath(job,new Path(args[1]));

                return job.waitForCompletion(true) ? 0 : 1;
        }

        public static void main(String[] args) throws Exception{
                int exitCode = ToolRunner.run(new maxTempDriver(),args);
                System.exit(exitCode);
        }
}

-------------------------------------------------------------------------------------------------------------------

From the command line, compile the .java program. First, check whether the CLASSPATH variable is set in your environment.

[root@sandbox maxTemp]# echo $CLASSPATH

[root@sandbox maxTemp]#

If your CLASSPATH does not include the Hadoop jar files required to compile the program, you can pass them directly in the javac command. We need the hadoop-common*.jar and hadoop-mapreduce-client-core*.jar files.
You can use the find command to get the absolute paths of these jar files in your sandbox. Quote the pattern so that the shell doesn't expand it.

[root@sandbox maxTemp]# find / -name 'hadoop-common*.jar'
[root@sandbox maxTemp]# find / -name 'hadoop-mapreduce*.jar'

For example, in my environment the hadoop-common jar is at /usr/hdp/2.4.0.0-169/hadoop/hadoop-common.jar. Include the absolute paths of these jar files in your javac command, as per your find results.
 
[root@sandbox maxTemp]# javac -classpath /usr/hdp/2.4.0.0-169/hadoop/hadoop-common.jar:/usr/hdp/2.4.0.0-169/hadoop/client/hadoop-mapreduce-client-core.jar maxTempDriver.java

This command creates three class files: one each for the driver, the mapper, and the reducer.

[root@sandbox maxTemp]# ls -ltr
total 20
-rw-r--r-- 1 root root 2254 2017-05-04 15:55 maxTempDriver.java
-rw-r--r-- 1 root root 1665 2017-05-04 15:58 maxTempDriver$MaxTempMapper.class
-rw-r--r-- 1 root root 1707 2017-05-04 15:58 maxTempDriver$MaxTempReducer.class
-rw-r--r-- 1 root root 1689 2017-05-04 15:58 maxTempDriver.class


To run in a distributed setting, the job's classes must be packaged into a job JAR file that is sent to the cluster. The JAR file can be created with

[root@sandbox maxTemp]# jar -cvf maxTempDriver.jar *.class

This command packages all the .class files into maxTempDriver.jar.

[root@sandbox maxTemp]# ls -ltr
total 20
-rw-r--r-- 1 root root 2254 2017-05-04 15:55 maxTempDriver.java
-rw-r--r-- 1 root root 1665 2017-05-04 15:58 maxTempDriver$MaxTempMapper.class
-rw-r--r-- 1 root root 1707 2017-05-04 15:58 maxTempDriver$MaxTempReducer.class
-rw-r--r-- 1 root root 1689 2017-05-04 15:58 maxTempDriver.class
-rw-r--r-- 1 root root 3038 2017-05-04 16:02 maxTempDriver.jar

To launch the job, run the driver. By default, the driver picks up the cluster information from the conf/core-site.xml file.

[root@sandbox wordCount]# hadoop jar maxTempDriver.jar maxTempDriver --libjars maxTempDriver.jar /home/pluralsight/weather /home/pluralsight/out

Here is the counter summary for the map and reduce tasks:

17/05/04 17:46:41 INFO mapreduce.Job:  map 100% reduce 100%
17/05/04 17:46:41 INFO mapreduce.Job: Job job_1493910685734_0010 completed successfully
17/05/04 17:46:41 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=43863
                FILE: Number of bytes written=354815
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=70380
                HDFS: Number of bytes written=9
                HDFS: Number of read operations=6
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters
                Launched map tasks=1
                Launched reduce tasks=1
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=4220
                Total time spent by all reduces in occupied slots (ms)=4377
                Total time spent by all map tasks (ms)=4220
                Total time spent by all reduce tasks (ms)=4377
                Total vcore-seconds taken by all map tasks=4220
                Total vcore-seconds taken by all reduce tasks=4377
                Total megabyte-seconds taken by all map tasks=1055000
                Total megabyte-seconds taken by all reduce tasks=1094250
        Map-Reduce Framework
                Map input records=3987
                Map output records=3987
                Map output bytes=35883
                Map output materialized bytes=43863
                Input split bytes=146
                Combine input records=0
                Combine output records=0
                Reduce input groups=1
                Reduce shuffle bytes=43863
                Reduce input records=3987
                Reduce output records=1
                Spilled Records=7974
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=93
                CPU time spent (ms)=2300
                Physical memory (bytes) snapshot=361009152
                Virtual memory (bytes) snapshot=1722482688
                Total committed heap usage (bytes)=265814016
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=70234
        File Output Format Counters
                Bytes Written=9

After the MapReduce job completes successfully, the OutputCommitter saves the output on HDFS in the directory /home/pluralsight/out. Let's have a look at the output:

[root@sandbox wordCount]# hadoop dfs -cat /home/pluralsight/out/*

2017    290

Since we analyzed the weather data for 2017 only, the output file contains a single record. You can include files from several years to get the maximum temperature for each year. Note that the dataset stores air temperature in tenths of a degree Celsius, so 290 means 29.0 °C. Also note that the mapper does not check whether a reading is valid; adding logic that skips missing or suspect readings would make the result more robust.
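
As a rough illustration of such a validity check, here is a plain-Java sketch (no Hadoop classes) that uses the same field offsets as the mapper above. The missing-value marker 9999 and the accepted quality codes [01459] are assumptions taken from the commonly used version of this example; check them against the dataset documentation before relying on them. The class name MaxTempSketch and the record() helper, which builds synthetic fixed-width lines for the demo, are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MaxTempSketch {

    private static final int MISSING = 9999; // assumed marker for a missing reading

    // Returns the temperature in tenths of a degree Celsius, or null if the
    // reading is missing or its quality code is not one of the accepted ones.
    static Integer parseValidTemp(String line) {
        // Skip a leading '+' so the parse does not depend on the Java version.
        int start = (line.charAt(87) == '+') ? 88 : 87;
        int airTemp = Integer.parseInt(line.substring(start, 92));
        String quality = line.substring(92, 93);
        if (airTemp == MISSING || !quality.matches("[01459]")) {
            return null;
        }
        return airTemp;
    }

    // Groups the valid readings by year and keeps the maximum, like the reducer above.
    static Map<String, Integer> maxByYear(List<String> lines) {
        Map<String, Integer> max = new TreeMap<>();
        for (String line : lines) {
            Integer temp = parseValidTemp(line);
            if (temp != null) {
                max.merge(line.substring(15, 19), temp, Integer::max);
            }
        }
        return max;
    }

    // Hypothetical helper: builds a synthetic 93-character record for the demo.
    static String record(String year, String signedTemp, char quality) {
        char[] chars = new char[93];
        Arrays.fill(chars, '0');
        year.getChars(0, 4, chars, 15);       // year at offsets 15..18
        signedTemp.getChars(0, 5, chars, 87); // signed temperature at offsets 87..91
        chars[92] = quality;                  // quality code at offset 92
        return new String(chars);
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                record("2017", "+0290", '1'),  // valid: 29.0 C
                record("2017", "+9999", '9'),  // missing reading, skipped
                record("2017", "-0050", '1')); // valid: -5.0 C
        System.out.println(maxByYear(lines));
    }
}
```

In the real job, the same check would go into the map() method, so that invalid readings are never written to the context.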

Hope this helps.

Download National Climatic Data Center (NCDC) weather dataset for Hadoop MapReduce


To practice the examples in the book "Hadoop: The Definitive Guide" by Tom White (http://www.amazon.com/Hadoop-Definitive-Guide-Tom-White/dp/1449311520), we need the weather dataset from NCDC. The site https://www.ncdc.noaa.gov/ mentioned in the book doesn't make it easy to find the download link for the dataset.

I spent a lot of time looking for these datasets, so here are the steps to download them.

1. The datasets are available for download on the NCDC FTP server, under the directory used in the command in step 2. The data is partitioned by year, from 1901 to date.

2. We can download these datasets directly on Unix machines with wget:

wget ftp://ftp.ncdc.noaa.gov:21/pub/data/noaa/2017/007026-99999-2017.gz

This command downloads the file 007026-99999-2017.gz for the year 2017 into your current working directory.
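
If you want files for several years, the URLs follow the same pattern as the one in the wget command. Below is a small sketch that builds them. It assumes that the directory and file naming seen in that single URL (year directory, then station-year.gz) holds for other years as well, and a given station may not have a file for every year. The class name NcdcUrlSketch and the method fileUrl are hypothetical.

```java
public class NcdcUrlSketch {

    // Base directory taken from the wget command above (default FTP port omitted).
    static final String BASE = "ftp://ftp.ncdc.noaa.gov/pub/data/noaa";

    // Builds the URL of one station file for one year.
    static String fileUrl(String station, int year) {
        return String.format("%s/%d/%s-%d.gz", BASE, year, station, year);
    }

    public static void main(String[] args) {
        // Print one URL per year; each line can be passed to wget.
        for (int year = 2015; year <= 2017; year++) {
            System.out.println(fileUrl("007026-99999", year));
        }
    }
}
```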

Hope this helps.

Tuesday, April 11, 2017

Hadoop MapReduce: Basic concepts


  • Design of HDFS:
    • Suitable for Very Large Files
    • Streaming data access with write-once and read-many-times pattern.
    • Runs on commodity hardware; expensive, highly reliable hardware is not required.
  • HDFS is not good for
    • Low latency data access like OLTP systems
    • Lots of small files because namenode memory is limited
    • Multiple writes and arbitrary file modifications
  • HDFS has two types of nodes
    • Namenode
    • Datanode
  • The namenode manages the filesystem namespace: the filesystem tree and the metadata for all the files and directories in the tree.
  • The namenode stores this information persistently on local disk in two files: the namespace image and the edit log.
  • The namenode also knows the datanodes on which all the blocks of a file are located. It does not store block locations persistently, though; they are rebuilt from the block reports the datanodes send when the system starts.
  • Secondary Namenode: 
    • It doesn't act as a namenode.
    • Its main role is to periodically merge the namespace image with the edit log, to prevent the edit log from becoming too large.
    • The secondary namenode keeps a copy of the merged namespace image, which can be used if the primary namenode fails.
    • The state of the secondary namenode lags that of the primary, so if the primary fails completely, some data loss is almost certain.
    • The secondary namenode usually runs on a separate physical machine, because the merge needs plenty of CPU and as much memory as the namenode.
  • If the namenode's metadata is lost, all files on the filesystem are lost, because there is no way to reconstruct the files from the blocks on the datanodes.
  • HDFS Federation:
    • As the number of files grows, the metadata can exceed the namenode's memory, and memory becomes the limit on scaling. To avoid this, HDFS Federation, introduced in the 2.x release series, allows a cluster to scale by adding namenodes.
    • Each namenode in the cluster manages a portion of filesystem namespace. For example, one namenode can manage all the files under /root and second namenode manages all files under /home.
    • Each namenode manages a namespace volume and a block pool. Namespace volume is made up of the metadata for the namespace and block pool contains all the blocks for the files in the namespace.
    • These namenodes under federation do not communicate with each other. The failure of one namenode doesn't affect the availability of another.  
  • High Availability of Namenode in Hadoop 1.x:
    • One method is to replicate the persistent state of the namenode on multiple filesystems. These writes are synchronous and atomic. The usual choice is to write to local disk as well as to a remote NFS mount.
    • The secondary namenode doesn't act as the primary namenode. But if the primary fails, the metadata on NFS can be copied to the secondary, which is then run as the new primary. Performing all these steps can mean a downtime of up to 30 minutes.
    • Since the state of the secondary always lags that of the primary, recovering from the secondary's own checkpoint means some data loss.
  • HDFS High Availability:
    • HDFS High Availability in Hadoop 2.x addresses the namenode availability problem of Hadoop 1.x.
    • In this implementation, a pair of namenodes runs in an active-standby configuration. If the active namenode fails, the standby takes over as the new active. Datanodes must send block reports to both namenodes, and the two namenodes share the edit log, so the standby always has the latest state available in memory.
    • The observed failover time in this case is around a minute.
  • Failover and fencing:
    • The transition from the active namenode to the standby is managed by a failover controller. The default implementation uses ZooKeeper to ensure that only one namenode is active.
    • There are two types of failovers:
      • Graceful failover
      • Ungraceful failover
    • Graceful failover is when admin manually initiates failover as part of maintenance. Failover controller arranges an orderly transition.
    • Ungraceful failover can be triggered by
      • Slow network
      • Failed active namenode
    • With a slow network, the standby namenode takes over because it assumes the active namenode is down, while the previously active namenode is in fact still running. So the HA implementation (ZooKeeper) makes sure that the previously active namenode is stopped, a method known as fencing.
    • The different fencing methods are
      • Killing the previously active namenode process
      • Revoking the namenode's access to the shared storage directory
      • Disabling the network ports
      • STONITH - Shoot The Other Node In The Head
    • Failover is transparent to the user.
  • Distcp:
    • Distcp is an efficient replacement for  "hadoop fs -cp".
    • Distcp copies the files to and from filesystems in parallel.
    • Distcp is implemented as a MapReduce job in which the copying is done by the mappers running in parallel; there are no reducers.
    • By default, up to 20 mappers are used.
    • Examples:
      • hadoop distcp dir1 dir2  -> Copies dir1 from HDFS file system to dir2 on the same HDFS file system
      • hadoop distcp file:///dir1 dir2 -> Copies dir1 from Local file system to dir2 on HDFS file system
      • hadoop distcp webhdfs:///dir1 webhdfs:///dir2  -> Copies dir1 from HDFS file system to dir2 on another HDFS file system. When two clusters are running incompatible versions of HDFS, we can use webhdfs protocol to distcp between them.
  • Balancer:
    • When multiple files are being copied by distcp, the first replica of each block in each file would reside on the node running the map taking network topology into account. The second and third replicas are spread across the cluster.
    • But the node running the map would be unbalanced.
    • Balancer tool can be used to even out the block distribution across the cluster.
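
The checkpoint performed by the secondary namenode (merging the namespace image with the edit log) can be pictured with a toy model in plain Java. This is only a sketch under strong simplifications: the namespace image is reduced to a set of paths, the edit log to a list of CREATE and DELETE operations, and all names here (CheckpointSketch, Edit, checkpoint) are hypothetical and unrelated to the real HDFS classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class CheckpointSketch {

    // Toy edit-log entry: an operation name and the path it applies to.
    static final class Edit {
        final String op;
        final String path;

        Edit(String op, String path) {
            this.op = op;
            this.path = path;
        }
    }

    // Replays the edit log on top of the old image and returns the merged image.
    static Set<String> checkpoint(Set<String> image, List<Edit> editLog) {
        Set<String> merged = new TreeSet<>(image);
        for (Edit e : editLog) {
            if (e.op.equals("CREATE")) {
                merged.add(e.path);
            } else if (e.op.equals("DELETE")) {
                merged.remove(e.path);
            } else {
                throw new IllegalArgumentException("unknown op: " + e.op);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Set<String> image = new TreeSet<>(Arrays.asList("/a", "/b"));
        List<Edit> editLog = new ArrayList<>(Arrays.asList(
                new Edit("CREATE", "/c"),
                new Edit("DELETE", "/a")));
        Set<String> newImage = checkpoint(image, editLog);
        editLog.clear(); // the merged edits no longer need to be kept
        System.out.println(newImage);
    }
}
```

The point of the model is that the edit log can be emptied after each merge, which is what keeps it from growing without bound.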

Monday, February 2, 2015

Packaging small files into SequenceFile by using WholeFileInputFormat and WholeFileRecordReader

Sometimes we want to process a whole file as a single input record in MapReduce. This can be achieved by

1. Not splitting the file
2. Having a RecordReader that delivers the file contents as the value of the record.

Let's demonstrate this with a MapReduce job that packages small files into a sequence file, where the key is the name of the file and the value is the contents of the whole file.

The "wholeFileInputFormat" class in the code below shows a way of doing this. It uses NullWritable as the key and BytesWritable as the value, where the value is the contents of the file. The isSplitable() method returns false, which specifies that an input file should not be split. The second overridden method, createRecordReader(), returns a custom implementation of RecordReader.

The name of the file is derived from the context object in the mapper, by casting the InputSplit from the context to a FileSplit and then calling its getPath() method. The output format is SequenceFileOutputFormat, since we want to store the result as a sequence file.

The number of map tasks launched equals the number of input files, because no file is split. In our example we package 5 small employee files, so 5 mappers are launched even though the files are smaller than the block size.

Let's look at the input files:

$ hadoop dfs -lsr emp
-rw-r--r--   1 swetha supergroup        643 2015-02-01 14:29 /user/swetha/emp/emp1
-rw-r--r--   1 swetha supergroup        542 2015-02-01 14:29 /user/swetha/emp/emp2
-rw-r--r--   1 swetha supergroup        629 2015-02-01 14:29 /user/swetha/emp/emp3
-rw-r--r--   1 swetha supergroup        596 2015-02-01 14:29 /user/swetha/emp/emp4
-rw-r--r--   1 swetha supergroup    1866851 2015-02-01 22:51 /user/swetha/emp/emp5


Code:

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class wholeFileReader extends Configured implements Tool{
   
    public static class wholeFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable>{
       
        private Text fileName;
       
        @Override
        protected void setup(Context context) throws IOException, InterruptedException{
            InputSplit split = context.getInputSplit();
            Path path = ((FileSplit) split).getPath();
            fileName = new Text(path.toString());
        }
       
        @Override
        protected void map(NullWritable key, BytesWritable value,Context context)
                throws IOException, InterruptedException{
            context.write(fileName, value);
        }
    }

    public static class wholeFileInputFormat extends FileInputFormat<NullWritable,BytesWritable>{
       
        @Override
        protected boolean isSplitable(JobContext context,Path file){
            return false;
        }
       
        @Override
        public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
                throws IOException,InterruptedException{
            wholeFileRecordReader reader = new wholeFileRecordReader();
            reader.initialize(split,context);
            return reader;
        }
    }
   
    public static class wholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
        private FileSplit fileSplit;
        private Configuration conf;
        private BytesWritable value = new BytesWritable();
        private boolean processed = false;
       
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException,InterruptedException{
            this.fileSplit = (FileSplit) split;
            this.conf = context.getConfiguration();
        }
       
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException{
            if (!processed){
                byte[] contents = new byte[(int) fileSplit.getLength()];
                Path file = fileSplit.getPath();
                FileSystem fs = file.getFileSystem(conf);
                FSDataInputStream in = null;
                try{
                    in = fs.open(file);
                    IOUtils.readFully(in, contents, 0, contents.length);
                    value.set(contents,0,contents.length);
                }finally{
                    IOUtils.closeStream(in);
                }
                processed = true;
                return true;
            }
            return false;
        }
       
        @Override
        public NullWritable getCurrentKey() throws IOException, InterruptedException{
            return NullWritable.get();
        }
       
        @Override
        public BytesWritable getCurrentValue() throws IOException, InterruptedException{
            return value;
        }
       
        @Override
        public float getProgress() throws IOException{
            return processed ? 1.0f : 0.0f;
        }
       
        @Override
        public void close() throws IOException{
            // do nothing
        }
    }
   
    public int run(String[] args) throws Exception{
        // Use the configuration prepared by ToolRunner so that generic options such as -D are honored
        Configuration conf = getConf();
       
        Job job = new Job(conf,"Whole File Input Format");
        job.setJarByClass(wholeFileReader.class);

        job.setMapperClass(wholeFileMapper.class);
        job.setInputFormatClass(wholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
       
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
       
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
       
        return job.waitForCompletion(true) ? 0:1;
    }

    public static void main(String[] args) throws Exception{
        int exitcode = ToolRunner.run(new wholeFileReader(), args);
        System.exit(exitcode);
    }
}


Output :

$ hadoop jar wholeFileReader.jar wholeFileReader emp sequencefile
15/02/02 19:04:07 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/02/02 19:04:08 INFO input.FileInputFormat: Total input paths to process : 5
15/02/02 19:04:09 INFO mapred.JobClient: Running job: job_201501301653_0084
15/02/02 19:04:10 INFO mapred.JobClient:  map 0% reduce 0%
15/02/02 19:04:22 INFO mapred.JobClient:  map 40% reduce 0%
15/02/02 19:04:35 INFO mapred.JobClient:  map 80% reduce 0%
15/02/02 19:04:42 INFO mapred.JobClient:  map 100% reduce 0%
15/02/02 19:04:44 INFO mapred.JobClient:  map 100% reduce 33%
15/02/02 19:04:46 INFO mapred.JobClient:  map 100% reduce 100%
15/02/02 19:04:50 INFO mapred.JobClient: Job complete: job_201501301653_0084
15/02/02 19:04:50 INFO mapred.JobClient: Counters: 29
15/02/02 19:04:50 INFO mapred.JobClient:   Map-Reduce Framework
15/02/02 19:04:50 INFO mapred.JobClient:     Spilled Records=10
15/02/02 19:04:50 INFO mapred.JobClient:     Map output materialized bytes=1869547
15/02/02 19:04:50 INFO mapred.JobClient:     Reduce input records=5
15/02/02 19:04:50 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=11063402496
15/02/02 19:04:50 INFO mapred.JobClient:     Map input records=5
15/02/02 19:04:50 INFO mapred.JobClient:     SPLIT_RAW_BYTES=535
15/02/02 19:04:50 INFO mapred.JobClient:     Map output bytes=1869496
15/02/02 19:04:50 INFO mapred.JobClient:     Reduce shuffle bytes=1869547
15/02/02 19:04:50 INFO mapred.JobClient:     Physical memory (bytes) snapshot=930562048
15/02/02 19:04:50 INFO mapred.JobClient:     Reduce input groups=5
15/02/02 19:04:50 INFO mapred.JobClient:     Combine output records=0
15/02/02 19:04:50 INFO mapred.JobClient:     Reduce output records=5
15/02/02 19:04:50 INFO mapred.JobClient:     Map output records=5
15/02/02 19:04:50 INFO mapred.JobClient:     Combine input records=0
15/02/02 19:04:50 INFO mapred.JobClient:     CPU time spent (ms)=2820
15/02/02 19:04:50 INFO mapred.JobClient:     Total committed heap usage (bytes)=693391360
15/02/02 19:04:50 INFO mapred.JobClient:   File Input Format Counters
15/02/02 19:04:50 INFO mapred.JobClient:     Bytes Read=1869261
15/02/02 19:04:50 INFO mapred.JobClient:   FileSystemCounters
15/02/02 19:04:50 INFO mapred.JobClient:     HDFS_BYTES_READ=1869796
15/02/02 19:04:50 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=4072689
15/02/02 19:04:50 INFO mapred.JobClient:     FILE_BYTES_READ=1869523
15/02/02 19:04:50 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=1869643
15/02/02 19:04:50 INFO mapred.JobClient:   Job Counters
15/02/02 19:04:50 INFO mapred.JobClient:     Launched map tasks=5
15/02/02 19:04:50 INFO mapred.JobClient:     Launched reduce tasks=1
15/02/02 19:04:50 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=24378
15/02/02 19:04:50 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
15/02/02 19:04:50 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=55834
15/02/02 19:04:50 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
15/02/02 19:04:50 INFO mapred.JobClient:     Data-local map tasks=5
15/02/02 19:04:50 INFO mapred.JobClient:   File Output Format Counters
15/02/02 19:04:50 INFO mapred.JobClient:     Bytes Written=1869643


The Launched map tasks counter shows 5, which means one mapper was launched for each input file. Launching one mapper per small file is inefficient, so this process could be improved by using CombineFileInputFormat instead of FileInputFormat. See my other post on how to use CombineFileInputFormat:

http://hadoopsupport.blogspot.com/2015/02/combinefileinputformat-mapreduce-code.html

The data in the output file can be inspected with the -text option, which prints each BytesWritable value as hex bytes. Sample output looks like this:

$ hadoop dfs -text sequencefile/part*
hdfs://localhost:9000/user/swetha/emp/emp1      4e 61 6d 65 2c 50 6f 73 69 74 69 6f 6e 20 54 69 74 6c 65 2c 44 65 70 61 72 74 6d 65 6e 74 2c 45 6d 70 6c 6f 79 65 65 20 41 6e 6e 75 61 6c 20 53 61 6c 61 72 79 0a 22 41 41 52 4f 4e 2c 20 20 45 4c 56 49 41 20 4a 22 2c 57 41 54 45 52 20 52 41 54 45 20 54 41 4b 45 52 2c 57 41 54 45 52 20 4d 47 4d 4e 54 2c 24 38 37 32 32 38 2e 30 30 0a 22 41 41 52 4f 4e 2c 20 20 4a 45 46 46 
hdfs://localhost:9000/user/swetha/emp/emp2      22 41 42 42 4f 54 54 2c 20 20 42 45 54 54 59 20 4c 22 2c 46 4f 53 54 45 52 20 47 52 41 4e 44 50 41 52 45 4e 54 2c 46 41 4d 49 4c 59 20 26 20 53 55 50 50 4f 52 54 2c 24 32 37 35 36 2e 30 30 0a 22 41 42 42 4f 54 54 2c 20 20 4c 59 4e 49 53 45 20 4d 22 2c 43 4c 45 52 4b 20 49 49 49 2c 50 4f 4c 49 43 45 2c 24 34 31 37 38 34 2e 30 30 0a 22 41 42 42 52 55 5a 5a 45 53 45 2c 20 20 57 49
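
To read these values as text again, the hex pairs can be converted back to characters. Here is a small plain-Java sketch for that; it assumes the file contents are UTF-8 (or plain ASCII) text, and the class name HexValueDecoder is hypothetical.

```java
import java.nio.charset.StandardCharsets;

public class HexValueDecoder {

    // Converts space-separated hex byte pairs (as printed above) back to text.
    static String decode(String hex) {
        String trimmed = hex.trim();
        if (trimmed.isEmpty()) {
            return "";
        }
        String[] pairs = trimmed.split("\\s+");
        byte[] bytes = new byte[pairs.length];
        for (int i = 0; i < pairs.length; i++) {
            bytes[i] = (byte) Integer.parseInt(pairs[i], 16);
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // First bytes of the emp1 value shown above.
        System.out.println(decode("4e 61 6d 65 2c 50 6f 73 69 74 69 6f 6e"));
    }
}
```

The first bytes of the emp1 value decode to the start of the CSV header line of that file.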

Sunday, February 1, 2015

CombineFileInputFormat - Mapreduce code to handle many small files

CombineFileInputFormat is an abstract class without any concrete subclasses, unlike FileInputFormat. To get a CombineFileInputFormat equivalent of TextInputFormat, we need to create a concrete subclass of CombineFileInputFormat and implement the createRecordReader() method.

CombineFileInputFormat combines many small files into a split, providing more data for each map task. It also considers data locality so performance is not compromised.
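
The idea of packing several small files into one split can be sketched in plain Java as below. This is a deliberately simplified model, not the algorithm Hadoop uses: the real class works at block level and takes node and rack locality into account, which this sketch ignores. The file sizes are those of the five emp files used in this post; the limit of 4096 bytes and the names CombineSplitSketch and pack are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CombineSplitSketch {

    // Greedily packs file sizes (in bytes) into splits of at most maxSplitSize bytes.
    // In this toy model a file larger than the limit gets a split of its own.
    static List<List<Long>> pack(List<Long> fileSizes, long maxSplitSize) {
        List<List<Long>> splits = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentSize = 0;
        for (long size : fileSizes) {
            // Close the current split if adding this file would exceed the limit.
            if (!current.isEmpty() && currentSize + size > maxSplitSize) {
                splits.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(size);
            currentSize += size;
        }
        if (!current.isEmpty()) {
            splits.add(current);
        }
        return splits;
    }

    public static void main(String[] args) {
        List<Long> sizes = Arrays.asList(643L, 542L, 629L, 596L, 1866851L);
        // Each inner list is one split, that is, the input of one map task.
        System.out.println(pack(sizes, 4096L));
    }
}
```

With these numbers the four small files share one split and the large file gets its own, so the model needs two map tasks instead of five.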

Let's try a MapReduce program that combines many small emp files by using CombineFileInputFormat.

Sample data in the input files looks like this:

Name,Position Title,Department,Employee Annual Salary
"AARON,  ELVIA J",WATER RATE TAKER,WATER MGMNT,$87228.00
"AARON,  JEFFERY M",POLICE OFFICER,POLICE,$75372.00
"AARON,  KARINA",POLICE OFFICER,POLICE,$75372.00
"AARON,  KIMBERLEI R",CHIEF CONTRACT EXPEDITER,GENERAL SERVICES,$80916.00
"ABAD JR,  VICENTE M",CIVIL ENGINEER IV,WATER MGMNT,$99648.00
"ABARCA,  ANABEL",ASST TO THE ALDERMAN,CITY COUNCIL,$70764.00
"ABARCA,  EMMANUEL",GENERAL LABORER - DSS,STREETS & SAN,$40560.00
"ABBATACOLA,  ROBERT J",ELECTRICAL MECHANIC,AVIATION,$89440.00
"ABBATEMARCO,  JAMES J",FIRE ENGINEER,FIRE,$84396.00
"ABBATE,  TERRY M",POLICE OFFICER,POLICE,$80724.00
"ABBOTT,  BETTY L",FOSTER GRANDPARENT,FAMILY & SUPPORT,$2756.00
"ABBOTT,  LYNISE M",CLERK III,POLICE,$41784.00

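Note that the Name field contains a comma inside double quotes, so a plain split on "," would cut it in two. The mapper later in this post splits each line with a regular expression that only matches commas outside quotes. Here is a small standalone sketch of that split using the same expression; the class name QuotedCsvSplit is hypothetical.

```java
public class QuotedCsvSplit {

    // Same regular expression as in the mapper: a comma that is followed by an
    // even number of double quotes, which means the comma is outside any quoted field.
    static final String COMMA_OUTSIDE_QUOTES = ",(?=([^\"]*\"[^\"]*\")*[^\"]*$)";

    static String[] split(String line) {
        return line.split(COMMA_OUTSIDE_QUOTES);
    }

    public static void main(String[] args) {
        String line = "\"AARON,  ELVIA J\",WATER RATE TAKER,WATER MGMNT,$87228.00";
        // Print one field per line; the quotes around the name are kept.
        for (String field : split(line)) {
            System.out.println(field);
        }
    }
}
```

The name stays in one piece (including its quotes), and the department ends up at index 2, which is the field the mapper writes as its value.
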

I have created five small emp files and placed them under emp directory in HDFS.

 $ hadoop dfs -lsr emp
-rw-r--r--   1 swetha supergroup        643 2015-02-01 14:29 /user/swetha/emp/emp1
-rw-r--r--   1 swetha supergroup        542 2015-02-01 14:29 /user/swetha/emp/emp2
-rw-r--r--   1 swetha supergroup        629 2015-02-01 14:29 /user/swetha/emp/emp3
-rw-r--r--   1 swetha supergroup        596 2015-02-01 14:29 /user/swetha/emp/emp4
-rw-r--r--   1 swetha supergroup    1866851 2015-02-01 22:51 /user/swetha/emp/emp5


In the MapReduce code we need to set mapred.max.split.size. This sets the maximum size of the input split that each mapper handles.

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.*;

public class combileSmallEmpFiles extends Configured implements Tool{

    public static class combineMapper extends Mapper<LongWritable, Text, Text, Text>{
        @Override
        public void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException{
            String a = value.toString();
            String[] b = a.split(",(?=([^\"]*\"[^\"]*\")*[^\"]*$)");
           
            if (!(b[0].equals("Name")))
                context.write(new Text(b[0]), new Text(b[2]));
        }
    }
   
    public static class combineReducer extends Reducer<Text, Text, Text, Text>{
        @Override
        public void reduce(Text key, Iterable<Text> value, Context context) throws IOException, InterruptedException{
            for (Text values : value){
                context.write(key, values);
            }
        }
    }
   
    public static class combineEmpInputFormat extends CombineFileInputFormat<LongWritable,Text>{
       
        @Override
        public RecordReader<LongWritable,Text> createRecordReader(InputSplit split,TaskAttemptContext context)
            throws IOException{
            CombineFileRecordReader<LongWritable,Text> reader = new CombineFileRecordReader<LongWritable,Text>(
                    (CombineFileSplit) split, context, myCombineFileRecordReader.class);
            return reader;
        }
    }
   
    public static class myCombineFileRecordReader extends RecordReader<LongWritable,Text>{
        private LineRecordReader lineRecordReader = new LineRecordReader();
       
        public myCombineFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index)
            throws IOException{
            FileSplit fileSplit = new FileSplit(split.getPath(index),split.getOffset(index),
                    split.getLength(index), split.getLocations());
            lineRecordReader.initialize(fileSplit, context);
        }
       
       
        @Override
        public void initialize(InputSplit split,TaskAttemptContext context)
            throws IOException, InterruptedException{
            //
        }
       
        @Override
        public void close() throws IOException{
            lineRecordReader.close();
        }
       
        @Override
        public float getProgress() throws IOException{
            return lineRecordReader.getProgress();
        }
       
        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException{
            return lineRecordReader.getCurrentKey();
        }
       
        @Override
        public Text getCurrentValue() throws IOException,InterruptedException{
            return lineRecordReader.getCurrentValue();
        }
       
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException{
            return lineRecordReader.nextKeyValue();
        }
       
    }
    public int run(String[] args) throws Exception{
        Configuration conf = new Configuration();
        conf.set("mapred.max.split.size", "1048576"); //1MB
       
        Job job = new Job(conf,"Combine small files");
        job.setJarByClass(combileSmallEmpFiles.class);

        job.setMapperClass(combineMapper.class);
        job.setReducerClass(combineReducer.class);
        job.setInputFormatClass(combineEmpInputFormat.class);
       
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
       
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
       
        return job.waitForCompletion(true) ? 0:1;
    }
   
    public static void main(String[] args) throws Exception{
        int exitcode = ToolRunner.run(new combileSmallEmpFiles(), args);
        System.exit(exitcode);
    }
}
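
The mapper above does not use a plain split(",") because the Name field contains a comma inside double quotes. Its pattern treats a comma as a separator only when an even number of quotes follows it. Here is a minimal standalone sketch of that behaviour, outside Hadoop; the class name CsvSplitDemo and the helper splitCsv are hypothetical names used only for illustration.

```java
import java.util.Arrays;

class CsvSplitDemo {

    // Same pattern as the mapper: a comma is a separator only if the rest of
    // the line contains an even number of double quotes.
    static final String COMMA_OUTSIDE_QUOTES = ",(?=([^\"]*\"[^\"]*\")*[^\"]*$)";

    static String[] splitCsv(String line) {
        return line.split(COMMA_OUTSIDE_QUOTES);
    }

    public static void main(String[] args) {
        String line = "\"ABBOTT,  BETTY L\",FOSTER GRANDPARENT,FAMILY & SUPPORT,$2756.00";

        // Naive split: the quoted name is broken into two pieces (5 fields)
        System.out.println(Arrays.toString(line.split(",")));

        // Quote-aware split: the name stays in one piece (4 fields)
        System.out.println(Arrays.toString(splitCsv(line)));
    }
}
```

Note that the quotes themselves stay in the first field, so b[0] is "ABBOTT,  BETTY L" including the surrounding double quotes.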


Running the job prints the following:

$ hadoop jar combineSmallEmpFiles.jar combileSmallEmpFiles emp out1
15/02/01 23:12:37 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/02/01 23:12:38 INFO input.FileInputFormat: Total input paths to process : 5
15/02/01 23:12:38 INFO util.NativeCodeLoader: Loaded the native-hadoop library
15/02/01 23:12:38 WARN snappy.LoadSnappy: Snappy native library not loaded
15/02/01 23:12:38 INFO mapred.JobClient: Running job: job_201501301653_0077
15/02/01 23:12:39 INFO mapred.JobClient:  map 0% reduce 0%
15/02/01 23:12:52 INFO mapred.JobClient:  map 100% reduce 0%
15/02/01 23:13:03 INFO mapred.JobClient:  map 100% reduce 33%
15/02/01 23:13:05 INFO mapred.JobClient:  map 100% reduce 100%
15/02/01 23:13:09 INFO mapred.JobClient: Job complete: job_201501301653_0077
15/02/01 23:13:09 INFO mapred.JobClient: Counters: 28
15/02/01 23:13:09 INFO mapred.JobClient:   Map-Reduce Framework
15/02/01 23:13:09 INFO mapred.JobClient:     Spilled Records=64398
15/02/01 23:13:09 INFO mapred.JobClient:     Map output materialized bytes=996759
15/02/01 23:13:09 INFO mapred.JobClient:     Reduce input records=32199
15/02/01 23:13:09 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=3702489088
15/02/01 23:13:09 INFO mapred.JobClient:     Map input records=32201
15/02/01 23:13:09 INFO mapred.JobClient:     SPLIT_RAW_BYTES=370
15/02/01 23:13:09 INFO mapred.JobClient:     Map output bytes=932355
15/02/01 23:13:09 INFO mapred.JobClient:     Reduce shuffle bytes=996759
15/02/01 23:13:09 INFO mapred.JobClient:     Physical memory (bytes) snapshot=262676480
15/02/01 23:13:09 INFO mapred.JobClient:     Reduce input groups=31890
15/02/01 23:13:09 INFO mapred.JobClient:     Combine output records=0
15/02/01 23:13:09 INFO mapred.JobClient:     Reduce output records=32199
15/02/01 23:13:09 INFO mapred.JobClient:     Map output records=32199
15/02/01 23:13:09 INFO mapred.JobClient:     Combine input records=0
15/02/01 23:13:09 INFO mapred.JobClient:     CPU time spent (ms)=2890
15/02/01 23:13:09 INFO mapred.JobClient:     Total committed heap usage (bytes)=164630528
15/02/01 23:13:09 INFO mapred.JobClient:   File Input Format Counters
15/02/01 23:13:09 INFO mapred.JobClient:     Bytes Read=0
15/02/01 23:13:09 INFO mapred.JobClient:   FileSystemCounters
15/02/01 23:13:09 INFO mapred.JobClient:     HDFS_BYTES_READ=1869631
15/02/01 23:13:09 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=2104561
15/02/01 23:13:09 INFO mapred.JobClient:     FILE_BYTES_READ=996759
15/02/01 23:13:09 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=932355
15/02/01 23:13:09 INFO mapred.JobClient:   Job Counters
15/02/01 23:13:09 INFO mapred.JobClient:     Launched map tasks=1
15/02/01 23:13:09 INFO mapred.JobClient:     Launched reduce tasks=1
15/02/01 23:13:09 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=13302
15/02/01 23:13:09 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
15/02/01 23:13:09 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=16469
15/02/01 23:13:09 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
15/02/01 23:13:09 INFO mapred.JobClient:   File Output Format Counters
15/02/01 23:13:09 INFO mapred.JobClient:     Bytes Written=932355
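
To get a feel for how a maximum split size turns several small files into few splits, here is a deliberately simplified model. It is an assumption-laden sketch, not the actual CombineFileInputFormat logic: it takes files in listing order, never breaks a file up, ignores node and rack locality, and closes a split as soon as the accumulated size reaches the limit. The class name SplitPackingSketch and the method pack are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class SplitPackingSketch {

    // Greedy packing: keep adding whole files to the current split and close
    // the split once its size reaches maxSplitSize (simplified model only).
    static List<List<Long>> pack(long[] fileSizes, long maxSplitSize) {
        List<List<Long>> splits = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentSize = 0;
        for (long size : fileSizes) {
            current.add(size);
            currentSize += size;
            if (currentSize >= maxSplitSize) {
                splits.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
        }
        if (!current.isEmpty()) {
            splits.add(current); // leftover files form the last split
        }
        return splits;
    }

    public static void main(String[] args) {
        // Sizes of emp1..emp5 from the HDFS listing earlier in this post
        long[] empFiles = {643, 542, 629, 596, 1866851};
        System.out.println(pack(empFiles, 1048576).size() + " split(s)");
    }
}
```

Under this model the four small files and emp5 end up in a single split, because the limit is only reached once emp5 is added. Other Hadoop versions may also cut large blocks into pieces, which this sketch does not model.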


Secondary sort technique in MapReduce: List all departments with employee name in ascending order and salary in descending order

Here is the employee table and the sample data:

Emp
Name,Position Title,Department,Employee Annual Salary
"AARON,  ELVIA J",WATER RATE TAKER,WATER MGMNT,$87228.00
"AARON,  JEFFERY M",POLICE OFFICER,POLICE,$75372.00
"AARON,  KARINA",POLICE OFFICER,POLICE,$75372.00
"AARON,  KIMBERLEI R",CHIEF CONTRACT EXPEDITER,GENERAL SERVICES,$80916.00
"ABAD JR,  VICENTE M",CIVIL ENGINEER IV,WATER MGMNT,$99648.00
"ABARCA,  ANABEL",ASST TO THE ALDERMAN,CITY COUNCIL,$70764.00
"ABARCA,  EMMANUEL",GENERAL LABORER - DSS,STREETS & SAN,$40560.00
"ABBATACOLA,  ROBERT J",ELECTRICAL MECHANIC,AVIATION,$89440.00
"ABBATEMARCO,  JAMES J",FIRE ENGINEER,FIRE,$84396.00


The input to the MapReduce job is the employee data. The output will be the list of departments, with employee names in ascending order and, for identical names, salaries in descending order.

To achieve this, we need to do the following:
1. The natural key is dept, and the natural values are employee name and salary.
2. To get the natural values in sorted order, we have to make them part of the key. So we need a composite key made up of the natural key and the natural values.
3. We need a custom partitioner that partitions on the natural key alone, because all values belonging to the same natural key must go to the same reducer (partition).
4. The sort comparator should sort on the whole composite key: first by natural key, then by the natural values.
5. The grouping comparator should group on the natural key only. When several natural keys land in the same partition, the reducer then treats records with the same natural key as one group.
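
The steps above can be tried out in plain Java before wiring them into Hadoop. The following is a minimal sketch that imitates partitioning, sorting and grouping on an in-memory list; it is not how the framework is implemented. The names SecondarySortSketch, Key, partition, SORT, sortAndGroup and sample are hypothetical, and salaries are held as numbers here for simplicity.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SecondarySortSketch {

    // Composite key: natural key (dept) plus natural values (empName, salary)
    static final class Key {
        final String dept;
        final String empName;
        final double salary;

        Key(String dept, String empName, double salary) {
            this.dept = dept;
            this.empName = empName;
            this.salary = salary;
        }
    }

    // Step 3: partition on the natural key only, so one dept never spans reducers
    static int partition(Key k, int numPartitions) {
        return (k.dept.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    // Step 4: sort by dept, then name ascending, then salary descending
    static final Comparator<Key> SORT = Comparator
            .comparing((Key k) -> k.dept)
            .thenComparing(k -> k.empName)
            .thenComparing(Comparator.comparingDouble((Key k) -> k.salary).reversed());

    // Step 5: after sorting, keys with the same dept form one group
    static Map<String, List<Key>> sortAndGroup(List<Key> keys) {
        List<Key> sorted = new ArrayList<>(keys);
        sorted.sort(SORT);
        Map<String, List<Key>> groups = new LinkedHashMap<>();
        for (Key k : sorted) {
            groups.computeIfAbsent(k.dept, d -> new ArrayList<>()).add(k);
        }
        return groups;
    }

    // The nine sample rows from the employee data above
    static List<Key> sample() {
        List<Key> keys = new ArrayList<>();
        keys.add(new Key("WATER MGMNT", "AARON,  ELVIA J", 87228.00));
        keys.add(new Key("POLICE", "AARON,  JEFFERY M", 75372.00));
        keys.add(new Key("POLICE", "AARON,  KARINA", 75372.00));
        keys.add(new Key("GENERAL SERVICES", "AARON,  KIMBERLEI R", 80916.00));
        keys.add(new Key("WATER MGMNT", "ABAD JR,  VICENTE M", 99648.00));
        keys.add(new Key("CITY COUNCIL", "ABARCA,  ANABEL", 70764.00));
        keys.add(new Key("STREETS & SAN", "ABARCA,  EMMANUEL", 40560.00));
        keys.add(new Key("AVIATION", "ABBATACOLA,  ROBERT J", 89440.00));
        keys.add(new Key("FIRE", "ABBATEMARCO,  JAMES J", 84396.00));
        return keys;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, List<Key>> group : sortAndGroup(sample()).entrySet()) {
            for (Key k : group.getValue()) {
                System.out.println(group.getKey() + " -> " + k.empName + "," + k.salary);
            }
        }
    }
}
```

For the nine sample rows the sketch produces seven groups, one per department, which agrees with the Reduce input groups=7 counter in the job log further down.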

Here is the mapreduce code to achieve the results with secondary sort technique.

The code below is equivalent to the SQL:

select dept,empname,salary from emp order by dept, empname, salary desc;


import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

import java.io.*;


//For each department display employee name and salary in order..
public class secondarySortEmpDetails extends Configured implements Tool {
 

 //Create a composite key from the natural key and natural values; it implements WritableComparable
  
public static class compositeKey implements WritableComparable<compositeKey>{
       
        private String dept;
        private String empName;
        private String salary;
       
        public  compositeKey(){
            this.dept = null;
            this.empName = null;
            this.salary = null;
        }
       
        public  compositeKey(String dept, String empName, String salary){
            this.dept = dept;
            this.empName = empName;
            this.salary = salary;
        }
       
        @Override
        public void write(DataOutput out) throws IOException{
            out.writeUTF(dept);
            out.writeUTF(empName);
            out.writeUTF(salary);
        }
       
        @Override
        public void readFields(DataInput in) throws IOException{
            dept = in.readUTF();
            empName = in.readUTF();
            salary = in.readUTF();
        }
       
        @Override
        public String toString(){
            return dept + "-------->" + empName +","+salary;
        }
       
        @Override
        public int compareTo(compositeKey other){
            int result = dept.compareTo(other.dept);
            if (result == 0){
                result = empName.compareTo(other.empName);
                if (result == 0){
                    result = salary.compareTo(other.salary);
                }
            }
            return result;
        }
       
        public void setempName(String empName){
            this.empName = empName;
        }
       
        public void setSalary(String salary){
            this.salary = salary;
        }
       
        public void setDept(String dept){
            this.dept = dept;
        }
       
        public String getempName(){
            return empName;
        }
       
        public String getSalary(){
            return salary;
        }
       
        public String getDept(){
            return dept;
        }
       
        @Override
        public int hashCode(){
            return  dept.hashCode();
        }
       
        @Override
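        public boolean equals(Object obj){
            if (this == obj) return true;
            // Guard against null and foreign types before casting
            if (!(obj instanceof compositeKey)) return false;
            compositeKey other = (compositeKey) obj;
            return dept.equals(other.dept) && empName.equals(other.empName) && salary.equals(other.salary);
        }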
    }

    static class empMap extends Mapper<LongWritable, Text, compositeKey, NullWritable> {
       
        @Override
        public void map( LongWritable key,  Text value, Context context)
                throws IOException,InterruptedException{
            String a = value.toString();
            // Split on commas that are outside double quotes, so a quoted name stays one field
            String[] b = a.split(",(?=([^\"]*\"[^\"]*\")*[^\"]*$)");
            // b[0] = name, b[2] = department, b[3] = salary; skip the header row
            if (!(b[0].equals("Name")))
                context.write(new compositeKey(b[2], b[0], b[3]), NullWritable.get());
        }
    }
   
    static class empReduce extends Reducer<compositeKey, NullWritable, compositeKey, NullWritable> {
       
        @Override
        public void reduce(compositeKey key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {

            for(NullWritable value : values)
                context.write(key,NullWritable.get());
           
        }
    }

    //Secondary sort partitioner based on natural key
    public static class SecondarySortPartitioner
        extends Partitioner<compositeKey,NullWritable>{
       
        @Override
        public int getPartition(compositeKey key, NullWritable value, int numPartitions){
            return ((key.getDept().hashCode() & Integer.MAX_VALUE) % numPartitions);   
            }
    }
   
    //Sort Comparator based on natural key and natural value
    public static class CompkeySortComparator extends WritableComparator {
        protected CompkeySortComparator(){
            super(compositeKey.class,true);
        }
       
        @Override
        public int compare(WritableComparable w1,WritableComparable w2){
            compositeKey ip1 = (compositeKey) w1;
            compositeKey ip2 = (compositeKey) w2;
           
           
            int cmp = ip1.getDept().compareTo(ip2.getDept());
            if (cmp == 0){
                cmp = ip1.getempName().compareTo(ip2.getempName()); //Ascending Order
                if (cmp == 0){
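                    // Descending order. Salaries are text like $87228.00, so compare them as
                    // numbers; plain String comparison would rank $9000.00 above $10000.00.
                    double s1 = Double.parseDouble(ip1.getSalary().replace("$", "").trim());
                    double s2 = Double.parseDouble(ip2.getSalary().replace("$", "").trim());
                    return Double.compare(s2, s1); // use (s1, s2) for ascending order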
                }
            }
            return cmp;
        }
    }
       
    // Grouping Comparator based on natural key
    public static class groupingComparator extends WritableComparator {
        protected groupingComparator(){
            super(compositeKey.class,true);
        }
       
        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            compositeKey key1 = (compositeKey) w1;
            compositeKey key2 = (compositeKey) w2;
           
            return key1.getDept().compareTo(key2.getDept());
        }
    }
   
    @Override
    public int run(String[] args) throws Exception {
       
        Job job = new Job(getConf());
        job.setJarByClass(getClass());
        job.setJobName("Employee Details .. Sort values in order");
       
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
       
        job.setMapperClass(empMap.class);
        job.setPartitionerClass(SecondarySortPartitioner.class);
        job.setGroupingComparatorClass(groupingComparator.class);
        job.setSortComparatorClass(CompkeySortComparator.class);
        job.setReducerClass(empReduce.class);

       
        job.setOutputKeyClass(compositeKey.class);
        job.setOutputValueClass(NullWritable.class);
       
       
        return job.waitForCompletion(true)?0 : 1;
    }
   

    public static void main(String[] args) throws Exception{
        int exitcode = ToolRunner.run(new secondarySortEmpDetails() , args);
        System.exit(exitcode);
    }
}
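
The salary field in compositeKey is a String such as $87228.00, so it matters whether a comparator compares it as text or as a number. The two agree while all salaries have the same number of digits, as in the sample rows, but not otherwise. A minimal sketch of the difference follows; SalaryCompareSketch and its methods are hypothetical names, and the value $100000.00 is made up for illustration.

```java
class SalaryCompareSketch {

    // Strip the currency sign and parse the rest as a number
    static double parseSalary(String salary) {
        return Double.parseDouble(salary.replace("$", "").trim());
    }

    // Character-by-character comparison, as String.compareTo does
    static int compareAsText(String a, String b) {
        return a.compareTo(b);
    }

    // Numeric comparison of the parsed amounts
    static int compareAsNumber(String a, String b) {
        return Double.compare(parseSalary(a), parseSalary(b));
    }

    public static void main(String[] args) {
        String a = "$99648.00";   // from the sample data
        String b = "$100000.00";  // hypothetical six-digit salary

        // Positive: as text, '9' sorts after '1', so a is treated as larger
        System.out.println(compareAsText(a, b));
        // Negative: as numbers, 99648 is smaller than 100000
        System.out.println(compareAsNumber(a, b));
    }
}
```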

Command to run the job

$ hadoop jar secondarySortEmpDetails.jar secondarySortEmpDetails emp out11

15/02/01 12:12:45 INFO input.FileInputFormat: Total input paths to process : 1
15/02/01 12:12:45 INFO util.NativeCodeLoader: Loaded the native-hadoop library
15/02/01 12:12:45 WARN snappy.LoadSnappy: Snappy native library not loaded
15/02/01 12:12:45 INFO mapred.JobClient: Running job: job_201501301653_0072
15/02/01 12:12:46 INFO mapred.JobClient:  map 0% reduce 0%
15/02/01 12:12:58 INFO mapred.JobClient:  map 100% reduce 0%
15/02/01 12:13:11 INFO mapred.JobClient:  map 100% reduce 100%
15/02/01 12:13:16 INFO mapred.JobClient: Job complete: job_201501301653_0072
15/02/01 12:13:16 INFO mapred.JobClient: Counters: 29
15/02/01 12:13:16 INFO mapred.JobClient:   Map-Reduce Framework
15/02/01 12:13:16 INFO mapred.JobClient:     Spilled Records=18
15/02/01 12:13:16 INFO mapred.JobClient:     Map output materialized bytes=422
15/02/01 12:13:16 INFO mapred.JobClient:     Reduce input records=9
15/02/01 12:13:16 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=3700391936
15/02/01 12:13:16 INFO mapred.JobClient:     Map input records=10
15/02/01 12:13:16 INFO mapred.JobClient:     SPLIT_RAW_BYTES=102
15/02/01 12:13:16 INFO mapred.JobClient:     Map output bytes=398
15/02/01 12:13:16 INFO mapred.JobClient:     Reduce shuffle bytes=422
15/02/01 12:13:16 INFO mapred.JobClient:     Physical memory (bytes) snapshot=260874240
15/02/01 12:13:16 INFO mapred.JobClient:     Reduce input groups=7
15/02/01 12:13:16 INFO mapred.JobClient:     Combine output records=0
15/02/01 12:13:16 INFO mapred.JobClient:     Reduce output records=9
15/02/01 12:13:16 INFO mapred.JobClient:     Map output records=9
15/02/01 12:13:16 INFO mapred.JobClient:     Combine input records=0
15/02/01 12:13:16 INFO mapred.JobClient:     CPU time spent (ms)=1190
15/02/01 12:13:16 INFO mapred.JobClient:     Total committed heap usage (bytes)=164630528
15/02/01 12:13:16 INFO mapred.JobClient:   File Input Format Counters
15/02/01 12:13:16 INFO mapred.JobClient:     Bytes Read=592
15/02/01 12:13:16 INFO mapred.JobClient:   FileSystemCounters
15/02/01 12:13:16 INFO mapred.JobClient:     HDFS_BYTES_READ=694
15/02/01 12:13:16 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=113305
15/02/01 12:13:16 INFO mapred.JobClient:     FILE_BYTES_READ=422
15/02/01 12:13:16 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=443
15/02/01 12:13:16 INFO mapred.JobClient:   Job Counters
15/02/01 12:13:16 INFO mapred.JobClient:     Launched map tasks=1
15/02/01 12:13:16 INFO mapred.JobClient:     Launched reduce tasks=1
15/02/01 12:13:16 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=13160
15/02/01 12:13:16 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
15/02/01 12:13:16 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=15115
15/02/01 12:13:16 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
15/02/01 12:13:16 INFO mapred.JobClient:     Data-local map tasks=1
15/02/01 12:13:16 INFO mapred.JobClient:   File Output Format Counters
15/02/01 12:13:16 INFO mapred.JobClient:     Bytes Written=443
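
The Map output bytes=398 counter above can be related to the composite key. Each key is written with three writeUTF calls, and writeUTF emits a two-byte length followed by the encoded characters (one byte per ASCII character); the NullWritable value adds nothing. The sketch below repeats that serialization with plain java.io streams for the nine sample rows. The class KeyBytesSketch and its methods are hypothetical, and it assumes the input file held exactly the sample rows shown earlier.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class KeyBytesSketch {

    // dept, empName, salary for the nine sample rows
    static final String[][] SAMPLE = {
        {"WATER MGMNT", "\"AARON,  ELVIA J\"", "$87228.00"},
        {"POLICE", "\"AARON,  JEFFERY M\"", "$75372.00"},
        {"POLICE", "\"AARON,  KARINA\"", "$75372.00"},
        {"GENERAL SERVICES", "\"AARON,  KIMBERLEI R\"", "$80916.00"},
        {"WATER MGMNT", "\"ABAD JR,  VICENTE M\"", "$99648.00"},
        {"CITY COUNCIL", "\"ABARCA,  ANABEL\"", "$70764.00"},
        {"STREETS & SAN", "\"ABARCA,  EMMANUEL\"", "$40560.00"},
        {"AVIATION", "\"ABBATACOLA,  ROBERT J\"", "$89440.00"},
        {"FIRE", "\"ABBATEMARCO,  JAMES J\"", "$84396.00"},
    };

    // Mirrors compositeKey.write: three writeUTF calls in a fixed order
    static byte[] serialize(String dept, String empName, String salary) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(dept);
            out.writeUTF(empName);
            out.writeUTF(salary);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Mirrors compositeKey.readFields: read the fields back in the same order
    static String[] deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return new String[] {in.readUTF(), in.readUTF(), in.readUTF()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static int totalBytes(String[][] keys) {
        int total = 0;
        for (String[] k : keys) {
            total += serialize(k[0], k[1], k[2]).length;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(totalBytes(SAMPLE)); // 398 for the nine sample rows
    }
}
```

Reading the fields back in a different order than they were written would silently swap them, which is why write and readFields must stay in step.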


The output will look like this.

$ hadoop dfs -cat out11/p*
AVIATION-------->"ABBATACOLA,  ROBERT J",$89440.00
CITY COUNCIL-------->"ABARCA,  ANABEL",$70764.00
FIRE-------->"ABBATEMARCO,  JAMES J",$84396.00
GENERAL SERVICES-------->"AARON,  KIMBERLEI R",$80916.00
POLICE-------->"AARON,  JEFFERY M",$75372.00
POLICE-------->"AARON,  KARINA",$75372.00
STREETS & SAN-------->"ABARCA,  EMMANUEL",$40560.00
WATER MGMNT-------->"AARON,  ELVIA J",$87228.00
WATER MGMNT-------->"ABAD JR,  VICENTE M",$99648.00

Friday, January 30, 2015

MapReduce code to retrieve employee fields from employee file

I have created a file named emp. Let's try a simple MapReduce program to retrieve fields from the employee file. Below is the file with its header and sample records.

emp
empid,empname,deptid,salary
1,swetha,1,50000
2,Nikhil,2,75000
3,Varun,3,75000


First move the file to HDFS.
$ hadoop dfs -put emp emp

$ hadoop dfs -ls
Found 1 item
-rw-r--r--   1 swetha supergroup         81 2015-01-30 17:31 /user/swetha/emp

Let's select the empname and salary fields from the emp file. Below is the MapReduce code for that.

The code below is equivalent to this SQL:

select empname,salary from emp;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import java.io.*;


//Displays EmpName and Salary
public class empDetails extends Configured implements Tool {

    static class empMap extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        public void map( LongWritable key,  Text value, Context context)
                throws IOException,InterruptedException{
        String a = value.toString();
        String[] b = a.split(",");
        String empName = b[1];
        String salary = b[3];
        context.write(new Text(empName), new Text(salary));
        }
    }


    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job();
        job.setJarByClass(getClass());
        job.setJobName("Employee Details");
       
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
       
        job.setMapperClass(empMap.class);
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
       
        return job.waitForCompletion(true)?0 : 1;
    }

    public static void main(String[] args) throws Exception{
        int exitcode = ToolRunner.run(new empDetails() , args);
        System.exit(exitcode);
    }
}

Package the class into an empDetails.jar file and run it in the Hadoop environment.

$ hadoop jar empDetails.jar empDetails emp out1
15/01/30 19:26:45 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/01/30 19:26:46 INFO input.FileInputFormat: Total input paths to process : 1
15/01/30 19:26:46 INFO util.NativeCodeLoader: Loaded the native-hadoop library
15/01/30 19:26:46 WARN snappy.LoadSnappy: Snappy native library not loaded
15/01/30 19:26:46 INFO mapred.JobClient: Running job: job_201501301653_0006
15/01/30 19:26:47 INFO mapred.JobClient:  map 0% reduce 0%
15/01/30 19:26:54 INFO mapred.JobClient:  map 100% reduce 0%
15/01/30 19:26:56 INFO mapred.JobClient: Job complete: job_201501301653_0006
15/01/30 19:26:56 INFO mapred.JobClient: Counters: 19
15/01/30 19:26:56 INFO mapred.JobClient:   Map-Reduce Framework
15/01/30 19:26:56 INFO mapred.JobClient:     Spilled Records=0
15/01/30 19:26:56 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=1845501952
15/01/30 19:26:56 INFO mapred.JobClient:     Map input records=4
15/01/30 19:26:56 INFO mapred.JobClient:     SPLIT_RAW_BYTES=102
15/01/30 19:26:56 INFO mapred.JobClient:     Map output records=4
15/01/30 19:26:56 INFO mapred.JobClient:     Physical memory (bytes) snapshot=75046912
15/01/30 19:26:56 INFO mapred.JobClient:     CPU time spent (ms)=150
15/01/30 19:26:56 INFO mapred.JobClient:     Total committed heap usage (bytes)=32440320
15/01/30 19:26:56 INFO mapred.JobClient:   File Input Format Counters
15/01/30 19:26:56 INFO mapred.JobClient:     Bytes Read=81
15/01/30 19:26:56 INFO mapred.JobClient:   FileSystemCounters
15/01/30 19:26:56 INFO mapred.JobClient:     HDFS_BYTES_READ=183
15/01/30 19:26:56 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=54900
15/01/30 19:26:56 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=55
15/01/30 19:26:56 INFO mapred.JobClient:   Job Counters
15/01/30 19:26:56 INFO mapred.JobClient:     Launched map tasks=1
15/01/30 19:26:56 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=0
15/01/30 19:26:56 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
15/01/30 19:26:56 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=9046
15/01/30 19:26:56 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
15/01/30 19:26:56 INFO mapred.JobClient:     Data-local map tasks=1
15/01/30 19:26:56 INFO mapred.JobClient:   File Output Format Counters
15/01/30 19:26:56 INFO mapred.JobClient:     Bytes Written=55


Now let's look at the output.

$ hadoop dfs -ls
Found 3 items
-rw-r--r--   1 swetha supergroup         38 2015-01-30 17:31 /user/swetha/dept
-rw-r--r--   1 swetha supergroup         81 2015-01-30 17:31 /user/swetha/emp
drwxr-xr-x   - swetha supergroup          0 2015-01-30 19:26 /user/swetha/out1


Since there are no reducers, the mapper output is written directly to HDFS.

$ hadoop dfs -lsr out1
-rw-r--r--   1 swetha supergroup          0 2015-01-30 19:26 /user/swetha/out1/_SUCCESS
drwxr-xr-x   - swetha supergroup          0 2015-01-30 19:26 /user/swetha/out1/_logs
drwxr-xr-x   - swetha supergroup          0 2015-01-30 19:26 /user/swetha/out1/_logs/history
-rw-r--r--   1 swetha supergroup       9271 2015-01-30 19:26 /user/swetha/out1/_logs/history/job_201501301653_0006_1422664006709_swetha_Employee+Details
-rw-r--r--   1 swetha supergroup      47336 2015-01-30 19:26 /user/swetha/out1/_logs/history/job_201501301653_0006_conf.xml
-rw-r--r--   1 swetha supergroup         55 2015-01-30 19:26 /user/swetha/out1/part-m-00000


$ hadoop dfs -cat out1/p*
 empname     salary
swetha    50000
Nikhil    75000
Varun    75000



To get the same output without the header row, skip the header line in the mapper:

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import java.io.*;


//Displays EmpName and Salary without Headers
public class empDetailsWithOutHeaders extends Configured implements Tool {

    static class empMap extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        public void map( LongWritable key,  Text value, Context context)
                throws IOException,InterruptedException{
        String a = value.toString();
        String[] b = a.split(",");
        String empName = b[1];
        String salary = b[3];
        if (!(empName.equals("empname")))
            context.write(new Text(empName), new Text(salary));
        }
    }


    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job();
        job.setJarByClass(getClass());
        job.setJobName("Employee Details without Headers");
       
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
       
        job.setMapperClass(empMap.class);
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
       
        return job.waitForCompletion(true)?0 : 1;
    }

    public static void main(String[] args) throws Exception{
        int exitcode = ToolRunner.run(new empDetailsWithOutHeaders() , args);
        System.exit(exitcode);
    }
}


The output will be the following:

 $ hadoop dfs -cat out2/p*
swetha    50000
Nikhil    75000
Varun    75000
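
The only difference between the two jobs is the header check in the mapper. The per-line logic can be tried outside Hadoop with a small sketch like the one below. ProjectionSketch and project are hypothetical names, the length guard is an addition that the original mappers do not have, and the tab between the two fields assumes the default text output separator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ProjectionSketch {

    // Applies the mapper's logic to each line: keep empname (b[1]) and salary (b[3])
    static List<String> project(List<String> lines, boolean skipHeader) {
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            String[] b = line.split(",");
            if (b.length < 4) {
                continue; // guard added for this sketch: ignore short or blank lines
            }
            String empName = b[1];
            String salary = b[3];
            if (skipHeader && empName.equals("empname")) {
                continue; // header row
            }
            out.add(empName + "\t" + salary);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> emp = Arrays.asList(
                "empid,empname,deptid,salary",
                "1,swetha,1,50000",
                "2,Nikhil,2,75000",
                "3,Varun,3,75000");

        System.out.println(project(emp, false)); // header row included
        System.out.println(project(emp, true));  // header row skipped
    }
}
```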
