Please see my other post on how to download the weather dataset from NCDC.
I have downloaded a sample weather dataset for the year 2017 to my local filesystem, in the directory /home/pluralsight/mapreduce/weather. You are free to download any number of files.
[root@sandbox weather]# pwd
/home/pluralsight/mapreduce/weather
[root@sandbox weather]# ls -ltr
total 72
-rw-r--r-- 1 root root 70234 2017-05-04 16:59 007026-99999-2017.gz
Upload this file to HDFS
[root@sandbox weather]# hadoop dfs -put /home/pluralsight/mapreduce/weather/007026-99999-2017.gz /home/pluralsight/weather
[root@sandbox weather]# hadoop dfs -ls /home/pluralsight/weather/
Found 1 items
-rw-r--r-- 3 root hdfs 70234 2017-05-04 17:16 /home/pluralsight/weather/007026-99999-2017.gz
Now the file is in HDFS. Below is the MapReduce program in Java to find the maximum temperature.
Application to find the maximum temperature: maxTempDriver.java
------------------------------------------------------------------------------------------------------------------------
import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class maxTempDriver extends Configured implements Tool {

    // Mapper: pulls the year (characters 15-19) and the air temperature
    // (characters 87-92, recorded in tenths of a degree Celsius) out of
    // each fixed-width NCDC record.
    public static class MaxTempMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String year = line.substring(15, 19);
            int airTemp = Integer.parseInt(line.substring(87, 92));
            context.write(new Text(year), new IntWritable(airTemp));
        }
    }

    // Reducer: keeps the maximum temperature seen for each year.
    public static class MaxTempReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int maxValue = Integer.MIN_VALUE;
            for (IntWritable value : values) {
                maxValue = Math.max(maxValue, value.get());
            }
            context.write(key, new IntWritable(maxValue));
        }
    }
    public int run(String[] args) throws Exception {
        // Job.getInstance replaces the deprecated new Job(...) constructor
        Job job = Job.getInstance(getConf(), "Max Temp");
        job.setJarByClass(maxTempDriver.class);
        job.setMapperClass(MaxTempMapper.class);
        job.setReducerClass(MaxTempReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new maxTempDriver(), args);
        System.exit(exitCode);
    }
}
-------------------------------------------------------------------------------------------------------------------
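One optional improvement worth noting before we compile: because taking a maximum is associative and commutative, the reducer can double as a combiner, pre-aggregating each map task's output before the shuffle. This is a sketch of a one-line addition to run(); with it in place, the Combine input/output records counters shown later would be non-zero and the shuffle bytes would shrink.

    // Optional: pre-aggregate per-mapper maxima before the shuffle.
    // Safe here because max() is associative and commutative.
    job.setCombinerClass(MaxTempReducer.class);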
From the command line, compile the .java program. First, check whether the CLASSPATH variable is set in your environment.
[root@sandbox maxTemp]# echo $CLASSPATH
[root@sandbox maxTemp]#
If your CLASSPATH is not set to include the Hadoop JAR files required to compile the program, you can pass them directly on the command line. We need the hadoop-common*.jar and hadoop-mapreduce-client-core*.jar files.
You can use the find command to get the absolute paths of these JAR files in your sandbox.
[root@sandbox maxTemp]# find / -name "hadoop-common*.jar"
[root@sandbox maxTemp]# find / -name "hadoop-mapreduce*.jar"
For example, in my environment the hadoop-common*.jar file is at /usr/hdp/2.4.0.0-169/hadoop/hadoop-common.jar. Include the absolute paths of these JAR files in your javac command as per your find results.
[root@sandbox maxTemp]# javac -classpath /usr/hdp/2.4.0.0-169/hadoop/hadoop-common.jar:/usr/hdp/2.4.0.0-169/hadoop/client/hadoop-mapreduce-client-core.jar maxTempDriver.java
This command creates three class files: one for the driver, one for the mapper, and one for the reducer.
[root@sandbox maxTemp]# ls -ltr
total 20
-rw-r--r-- 1 root root 2254 2017-05-04 15:55 maxTempDriver.java
-rw-r--r-- 1 root root 1665 2017-05-04 15:58 maxTempDriver$MaxTempMapper.class
-rw-r--r-- 1 root root 1707 2017-05-04 15:58 maxTempDriver$MaxTempReducer.class
-rw-r--r-- 1 root root 1689 2017-05-04 15:58 maxTempDriver.class
To run the job on a cluster, the job's classes must be packaged into a job JAR file that is sent to the cluster. Creating the JAR file is straightforward:
[root@sandbox maxTemp]# jar -cvf maxTempDriver.jar *.class
This command packages all the .class files (including the nested mapper and reducer classes) into the JAR file maxTempDriver.jar.
[root@sandbox maxTemp]# ls -ltr
total 20
-rw-r--r-- 1 root root 2254 2017-05-04 15:55 maxTempDriver.java
-rw-r--r-- 1 root root 1665 2017-05-04 15:58 maxTempDriver$MaxTempMapper.class
-rw-r--r-- 1 root root 1707 2017-05-04 15:58 maxTempDriver$MaxTempReducer.class
-rw-r--r-- 1 root root 1689 2017-05-04 15:58 maxTempDriver.class
-rw-r--r-- 1 root root 3038 2017-05-04 16:02 maxTempDriver.jar
To launch the job, we run the driver. The driver picks up the cluster information by default from the conf/core-site.xml file.
[root@sandbox maxTemp]# hadoop jar maxTempDriver.jar maxTempDriver --libjars maxTempDriver.jar /home/pluralsight/weather /home/pluralsight/out
Here is the stats summary of the map and reduce tasks:
17/05/04 17:46:41 INFO mapreduce.Job: map 100% reduce 100%
17/05/04 17:46:41 INFO mapreduce.Job: Job job_1493910685734_0010 completed successfully
17/05/04 17:46:41 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=43863
FILE: Number of bytes written=354815
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=70380
HDFS: Number of bytes written=9
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=4220
Total time spent by all reduces in occupied slots (ms)=4377
Total time spent by all map tasks (ms)=4220
Total time spent by all reduce tasks (ms)=4377
Total vcore-seconds taken by all map tasks=4220
Total vcore-seconds taken by all reduce tasks=4377
Total megabyte-seconds taken by all map tasks=1055000
Total megabyte-seconds taken by all reduce tasks=1094250
Map-Reduce Framework
Map input records=3987
Map output records=3987
Map output bytes=35883
Map output materialized bytes=43863
Input split bytes=146
Combine input records=0
Combine output records=0
Reduce input groups=1
Reduce shuffle bytes=43863
Reduce input records=3987
Reduce output records=1
Spilled Records=7974
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=93
CPU time spent (ms)=2300
Physical memory (bytes) snapshot=361009152
Virtual memory (bytes) snapshot=1722482688
Total committed heap usage (bytes)=265814016
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=70234
File Output Format Counters
Bytes Written=9
After successful completion of the MapReduce job, the output is saved by the OutputCommitter to HDFS in the directory /home/pluralsight/out. Let's have a look at the output:
[root@sandbox maxTemp]# hadoop dfs -cat /home/pluralsight/out/*
2017 290
Since we analyzed a weather dataset for the single year 2017, we have a single record in the output file. You can include files from multiple years to get the maximum temperature per year. Note that the maximum temperature for 2017 shows as 290: NCDC records air temperature in tenths of a degree Celsius, so this corresponds to 29.0°C. Even so, raw records can contain invalid readings (such as the missing-value marker 9999), and including logic to check for valid temperatures in our code catches these scenarios, as sketched below.
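As a sketch of such checks, here is a hardened version of the mapper along the lines of the classic NCDC example. It handles the leading sign character explicitly, drops the missing-value marker 9999, and keeps only readings whose quality code indicates a good measurement (I am assuming the standard ISD layout, with the sign at position 87 and the quality code at position 92).

public static class MaxTempMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final int MISSING = 9999; // NCDC marker for a missing reading

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String year = line.substring(15, 19);
        int airTemp;
        if (line.charAt(87) == '+') {
            // skip the explicit plus sign before parsing
            airTemp = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemp = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        // emit only present, good-quality readings
        if (airTemp != MISSING && quality.matches("[01459]")) {
            context.write(new Text(year), new IntWritable(airTemp));
        }
    }
}

Swap this in for the original MaxTempMapper and recompile; the rest of the driver stays the same.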
Hope this helps.