Thursday, May 4, 2017

Hadoop MapReduce job on Hortonworks Sandbox to find maximum temperature on NCDC weather dataset


If you want to setup Hortonworks Sandbox, follow the link
http://hortonworks.com/wp-content/uploads/2012/03/Tutorial_Hadoop_HDFS_MapReduce.pdf
Please follow my another post on how to download weather dataset from NCDC.


I have downloaded a sample weather dataset for the year 2017 on my local filesystem in the directory, /home/pluralsight/mapreduce/weather. You are free to download any number of files and any file.
[root@sandbox weather]# pwd
/home/pluralsight/mapreduce/weather
[root@sandbox weather]# ls -ltr
total 72
-rw-r--r-- 1 root root 70234 2017-05-04 16:59 007026-99999-2017.gz

Upload this file to HDFS 
[root@sandbox weather]# hadoop dfs -put /home/pluralsight/mapreduce/weather/007026-99999-2017.gz /home/pluralsight/weather
[root@sandbox weather]# hadoop dfs -ls /home/pluralsight/weather/
Found 1 items
-rw-r--r--   3 root hdfs      70234 2017-05-04 17:16 /home/pluralsight/weather/007026-99999-2017.gz

Now the file is in HDFS directory. Below is the MapReduce program in java to find the maximum temperature.

Application to find maximum temperature, maxTempDriver.java
------------------------------------------------------------------------------------------------------------------------
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.LongWritable;
import java.io.IOException;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class maxTempDriver extends Configured implements Tool{

        public static class MaxTempMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
                @Override
                public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
                        String line = value.toString();
                        String year = line.substring(15,19);
                        int airTemp = Integer.parseInt(line.substring(87,92));
                        context.write(new Text(year), new IntWritable(airTemp));
                }
        }


        public static class MaxTempReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
                @Override
                public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                        int maxValue = Integer.MIN_VALUE;
                        for (IntWritable value: values) {
                                maxValue = Math.max(maxValue, value.get());
                        }
                        context.write(key, new IntWritable(maxValue));
                }
        }

        public int run(String[] args) throws Exception {

                Job job = new Job(getConf(),"Max Temp");
                job.setJarByClass(maxTempDriver.class);

                job.setMapperClass(MaxTempMapper.class);
                job.setReducerClass(MaxTempReducer.class);

                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(IntWritable.class);

                FileInputFormat.addInputPath(job, new Path(args[0]));
                FileOutputFormat.setOutputPath(job,new Path(args[1]));

                return job.waitForCompletion(true) ? 0 : 1;
        }

        public static void main(String[] args) throws Exception{
                int exitCode = ToolRunner.run(new maxTempDriver(),args);
                System.exit(exitCode);
        }
}

-------------------------------------------------------------------------------------------------------------------

From the command line, compile .java program. First, check to see if your CLASSPATH variable is set in your environment. 

[root@sandbox maxTemp]# echo $CLASSPATH

[root@sandbox maxTemp]#

If your CLASSPATH is not set to include hadoop jar files required to compile your program, you can set them directly in the command. We need hadoop-common*.jar and hadoop-mapreduce-client-core*.jar files.
You can use find command to get the absolute path of these jar files in your sandbox.

[root@sandbox maxTemp]# find / -name hadoop-common*.jar
[root@sandbox maxTemp]# find / -name hadoop-mapreduce*.jar

For example in my environment hadoop-common*.jar file is present in /usr/hdp/2.4.0.0-169/hadoop/hadoop-common.jar. Please include the relative paths of these jar files in your javac command as per your find results.
 
[root@sandbox maxTemp]# javac -classpath /usr/hdp/2.4.0.0-169/hadoop/hadoop-common.jar:/usr/hdp/2.4.0.0-169/hadoop/client/hadoop-mapreduce-client-core.jar maxTempDriver.java

This command creates three class files, one for driver, one for mapper and one for reducer classes.

[root@sandbox maxTemp]# ls -ltr
total 20
-rw-r--r-- 1 root root 2254 2017-05-04 15:55 maxTempDriver.java
-rw-r--r-- 1 root root 1665 2017-05-04 15:58 maxTempDriver$MaxTempMapper.class
-rw-r--r-- 1 root root 1707 2017-05-04 15:58 maxTempDriver$MaxTempReducer.class
-rw-r--r-- 1 root root 1689 2017-05-04 15:58 maxTempDriver.class


For a start in distributed setting, job's classes must be packaged into a job JAR file to send to cluster. Creating a JAR file can be conveniently achieved by

[root@sandbox maxTemp]#  jar -cvf maxTempDriver.jar *.class

This command packages all .class files into maxTempDriver.jar JAR file.

[root@sandbox maxTemp]# ls -ltr
total 20
-rw-r--r-- 1 root root 2254 2017-05-04 15:55 maxTempDriver.java
-rw-r--r-- 1 root root 1665 2017-05-04 15:58 maxTempDriver$MaxTempMapper.class
-rw-r--r-- 1 root root 1707 2017-05-04 15:58 maxTempDriver$MaxTempReducer.class
-rw-r--r-- 1 root root 1689 2017-05-04 15:58 maxTempDriver.class
-rw-r--r-- 1 root root 3038 2017-05-04 16:02 maxTempDriver.jar

To launch the job, we need to run the driver. Driver picks the cluster information by default from conf/core-site.xml file.

[root@sandbox wordCount]# hadoop jar maxTempDriver.jar maxTempDriver --libjars maxTempDriver.jar /home/pluralsight/weather /home/pluralsight/out

Here is the stats summary of map and reduce jobs.

17/05/04 17:46:41 INFO mapreduce.Job:  map 100% reduce 100%
17/05/04 17:46:41 INFO mapreduce.Job: Job job_1493910685734_0010 completed successfully
17/05/04 17:46:41 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=43863
                FILE: Number of bytes written=354815
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=70380
                HDFS: Number of bytes written=9
                HDFS: Number of read operations=6
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters
                Launched map tasks=1
                Launched reduce tasks=1
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=4220
                Total time spent by all reduces in occupied slots (ms)=4377
                Total time spent by all map tasks (ms)=4220
                Total time spent by all reduce tasks (ms)=4377
                Total vcore-seconds taken by all map tasks=4220
                Total vcore-seconds taken by all reduce tasks=4377
                Total megabyte-seconds taken by all map tasks=1055000
                Total megabyte-seconds taken by all reduce tasks=1094250
        Map-Reduce Framework
                Map input records=3987
                Map output records=3987
                Map output bytes=35883
                Map output materialized bytes=43863
                Input split bytes=146
                Combine input records=0
                Combine output records=0
                Reduce input groups=1
                Reduce shuffle bytes=43863
                Reduce input records=3987
                Reduce output records=1
                Spilled Records=7974
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=93
                CPU time spent (ms)=2300
                Physical memory (bytes) snapshot=361009152
                Virtual memory (bytes) snapshot=1722482688
                Total committed heap usage (bytes)=265814016
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=70234
        File Output Format Counters
                Bytes Written=9

After successful completion of mapreduce job, output is saved by OutputCommitter on HDFS in the directory /home/pluralsight/out. Lets have a look at the output,

[root@sandbox wordCount]# hadoop dfs -cat /home/pluralsight/out2/*

2017    290

Since, we analyzed weather dataset for 2017 year we have single record in the output file. You can include multiple files from different years to get maximum temperature by year. Please note, maximum temperature for 2017 is shows as 290 degress which is unusual. If we include the logic to check the valid temperatures in our code, we can catch these scenarios.

Hope this helps.

2 comments:

  1. Very nice article post,Thank you for sharing this awesome blog.
    keep updating more big data hadoop tutorials.

    Big Data Online Training

    ReplyDelete
  2. Very nice article post,Thank you for sharing this awesome blog.
    keep updating more big data hadoop tutorials.

    Big Data Online Training

    ReplyDelete

Amazon Athena: Key highlights on Amazon Athena

Amazon Athena is a serverless interactive query service that is used to analyze data in Amazon S3 using standard SQL. Amazon Athena app...