Friday, May 12, 2017

Amazon Athena: Key highlights on Amazon Athena


Amazon Athena is a serverless interactive query service that is used to analyze data in Amazon S3 using standard SQL.

  • Amazon Athena applies schema-on-read.
  • Amazon Athena can be used to analyze structured, semi-structured and unstructured datasets.
  • Athena integrates with Amazon QuickSight for easy visualization.
  • Amazon Athena uses Presto, a distributed SQL engine, to execute queries.
  • Apache Hive DDL is used in Athena to define tables.
  • Amazon Athena can be accessed in either of two ways.
    • AWS Management Console
    • JDBC Connection
  • To run queries, a user needs both Athena user permissions and Amazon S3 permissions.
    • Athena user permissions: AWS Identity and Access Management (IAM) provides two managed policies for Athena, AmazonAthenaFullAccess and AWSQuicksightAthenaAccess. Attach these policies to the user to grant the required permissions.
    • Athena S3 permissions: To access data in a particular S3 location, an Athena user needs appropriate permissions on the S3 buckets.
  • Amazon Athena supports encryption.
  • If the dataset on Amazon S3 is encrypted, add TBLPROPERTIES('has_encrypted_data'='true') to the table DDL to tell Athena that the data it reads is encrypted.
  • By default, Amazon Athena stores query result sets in an S3 staging directory.
  • Settings for storing encrypted results apply to all tables and queries. They cannot be configured for individual databases, tables, or queries.
  • To encrypt query results stored in Amazon S3 from the console, provide the required details in the Settings tab.
  • Tables can be created in Athena using:
    • the AWS Management Console
    • the JDBC driver
    • the Athena create table wizard
  • Databases and Tables are simply logical objects pointing to actual files on Amazon S3.
  • Athena catalog stores the metadata about the databases and tables.
  • Athena doesn't support
    • CREATE TABLE AS SELECT
    • Transaction based operations.
    • ALTER INDEX
    • ALTER TABLE .. ARCHIVE PARTITION
    • ALTER TABLE ... CLUSTERED BY ..
    • ALTER TABLE .. EXCHANGE PARTITION
    • ALTER TABLE .. NOT CLUSTERED
    • ALTER TABLE ... NOT SKEWED
    • ALTER TABLE .. RENAME TO
    • COMMIT
    • CREATE INDEX
    • CREATE ROLE
    • CREATE TABLE LIKE
    • CREATE VIEW
    • DESCRIBE DATABASE
    • INSERT INTO 
    • ROLLBACK
    • EXPORT/IMPORT TABLE
    • DELETE FROM
    • ALTER TABLE .. ADD COLUMNS
    • ALTER TABLE..REPLACE COLUMNS
    • ALTER TABLE .. CHANGE COLUMNS
    • ALTER TABLE ..TOUCH
    • User-defined functions (UDFs)
    • Stored procedures
  • Athena considerations and limitations:
    • Operations that change table state, such as creating, updating, and deleting tables, are ACID compliant.
    • All tables are EXTERNAL.
    • Table names are case-insensitive.
    • Table names may contain the underscore character but no other special characters.
  • Advantages of accessing Athena through the JDBC driver:
    • Third-party SQL clients such as SQL Workbench can connect to Athena through the driver.
    • Queries can be run against Athena programmatically.
  • JDBC Driver Options:
    • s3_staging_dir
    • query_results_encryption_option
    • query_results_aws_kms_key
    • aws_credentials_provider_class
    • aws_credentials_provider_arguments
    • max_error_retries
    • connection_timeout
    • socket_timeout
    • retry_base_delay
    • retry_max_backoff_time
    • log_path
    • log_level
  • JDBC commands:
    • connection.createStatement();
    • statement.executeQuery("<query>");
  • AWS CloudTrail is a service that records AWS API calls and events for AWS accounts. CloudTrail delivers its log files to Amazon S3 as gzip-compressed JSON files; by default, they are encrypted with Amazon S3 server-side encryption.
  • Athena uses the CloudTrail SerDe to read the JSON log files generated by CloudTrail.
  • Supported compression formats:
    • Snappy
    • Zlib
    • GZIP
    • LZO
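The JDBC workflow above (driver options, createStatement(), executeQuery()) can be sketched in Java as follows. This is a minimal sketch assuming the 2017-era Athena JDBC driver: the driver class name com.amazonaws.athena.jdbc.AthenaDriver, the us-east-1 endpoint URL, the PropertiesFileCredentialsProvider choice, the bucket names, the credentials file path, and the hypothetical sample_table with its columns are all assumptions or placeholders, not values from this post. The DDL helper also shows an EXTERNAL table over encrypted data declared with TBLPROPERTIES('has_encrypted_data'='true').

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public class AthenaJdbcSketch {

    // Assumption: driver class and endpoint of the 2017-era Athena JDBC driver (us-east-1).
    static final String DRIVER_CLASS = "com.amazonaws.athena.jdbc.AthenaDriver";
    static final String ATHENA_URL = "jdbc:awsathena://athena.us-east-1.amazonaws.com:443";

    // Builds a few of the driver options listed above.
    static Properties connectionProperties(String stagingDir, String credentialsFile) {
        Properties info = new Properties();
        info.put("s3_staging_dir", stagingDir);                 // where Athena writes result sets
        info.put("aws_credentials_provider_class",              // assumed AWS SDK credentials provider
                 "com.amazonaws.auth.PropertiesFileCredentialsProvider");
        info.put("aws_credentials_provider_arguments", credentialsFile);
        info.put("max_error_retries", "3");
        return info;
    }

    // Hive DDL for an EXTERNAL table over encrypted S3 data (table and columns are hypothetical).
    static String encryptedTableDdl(String table, String s3Location) {
        return "CREATE EXTERNAL TABLE " + table + " (id INT, name STRING) "
                + "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' "
                + "LOCATION '" + s3Location + "' "
                + "TBLPROPERTIES ('has_encrypted_data'='true')";
    }

    public static void main(String[] args) throws Exception {
        Class.forName(DRIVER_CLASS); // fails unless the Athena JDBC JAR is on the classpath
        Properties info = connectionProperties(
                "s3://my-athena-results/staging/",             // placeholder bucket
                "/path/to/athena-credentials.properties");     // placeholder path
        try (Connection connection = DriverManager.getConnection(ATHENA_URL, info);
             Statement statement = connection.createStatement()) {
            statement.execute(encryptedTableDdl("sample_table", "s3://my-data-bucket/sample/"));
            try (ResultSet rs = statement.executeQuery("SELECT id, name FROM sample_table LIMIT 10")) {
                while (rs.next()) {
                    System.out.println(rs.getInt("id") + "\t" + rs.getString("name"));
                }
            }
        }
    }
}
```

Keeping the option and DDL construction in small static helpers means they can be checked without a live Athena connection; only main() needs the driver and AWS credentials.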

Thursday, May 4, 2017

Hadoop Streaming: Specifying Map-Only Jobs

Often, you may want to process input data using a map function only. To do this, simply set mapred.reduce.tasks to zero. The Map/Reduce framework will not create any reducer tasks. Rather, the outputs of the mapper tasks will be the final output of the job.

    -D mapred.reduce.tasks=0

Hadoop Streaming also supports map-only jobs by specifying "-D mapred.reduce.tasks=0".

To specify map-only jobs, use

hadoop jar hadoop-streaming-2.7.1.2.4.0.0-169.jar \
-D mapred.reduce.tasks=0 \
-input /user/root/wordcount \
-output /user/root/out \
-mapper /bin/cat


You can also run a map-only job by specifying "-numReduceTasks 0":

hadoop jar hadoop-streaming-2.7.1.2.4.0.0-169.jar \
-input /user/root/wordcount \
-output /user/root/out \
-mapper /bin/cat \
-numReduceTasks 0
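In a map-only job, whatever the mapper writes to standard output becomes the job's final output. As a hypothetical alternative to /bin/cat, the Java program below is a sketch of a streaming mapper that reads NCDC weather records on stdin and writes "year<TAB>temperature" lines, skipping the +9999 missing-value sentinel. The class name NcdcStreamingMapper is made up for illustration, and the column offsets assume the fixed-width NCDC layout used in the MaxTemp post.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class NcdcStreamingMapper {

    private static final int MISSING = 9999; // NCDC sentinel for a missing temperature

    // Returns "year<TAB>temperature" for one NCDC record, or null if the
    // record is too short, malformed, or the reading is missing.
    static String mapLine(String line) {
        if (line.length() < 92) {
            return null;
        }
        try {
            String year = line.substring(15, 19);
            int airTemp = Integer.parseInt(line.substring(87, 92)); // tenths of a degree Celsius
            return airTemp == MISSING ? null : year + "\t" + airTemp;
        } catch (NumberFormatException e) {
            return null;
        }
    }

    // Streaming contract: records arrive on stdin, results go to stdout.
    public static void main(String[] args) throws IOException {
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in, StandardCharsets.UTF_8));
        String line;
        while ((line = in.readLine()) != null) {
            String out = mapLine(line);
            if (out != null) {
                System.out.println(out);
            }
        }
    }
}
```

To try it, compile the class with javac, ship the class file with the generic -files option (generic options such as -D and -files must come before -input), and pass -mapper "java -cp . NcdcStreamingMapper" together with -numReduceTasks 0. This assumes a java binary is on the PATH of every node in your cluster.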

Hadoop MapReduce job on Hortonworks Sandbox to find maximum temperature on NCDC weather dataset


To set up the Hortonworks Sandbox, follow this tutorial:
http://hortonworks.com/wp-content/uploads/2012/03/Tutorial_Hadoop_HDFS_MapReduce.pdf
See my other post on how to download the weather dataset from NCDC.


I downloaded a sample weather dataset for 2017 to the local filesystem directory /home/pluralsight/mapreduce/weather. You can download as many files as you like, for any station and year.
[root@sandbox weather]# pwd
/home/pluralsight/mapreduce/weather
[root@sandbox weather]# ls -ltr
total 72
-rw-r--r-- 1 root root 70234 2017-05-04 16:59 007026-99999-2017.gz

Upload this file to HDFS:
[root@sandbox weather]# hadoop dfs -put /home/pluralsight/mapreduce/weather/007026-99999-2017.gz /home/pluralsight/weather
[root@sandbox weather]# hadoop dfs -ls /home/pluralsight/weather/
Found 1 items
-rw-r--r--   3 root hdfs      70234 2017-05-04 17:16 /home/pluralsight/weather/007026-99999-2017.gz

The file is now in HDFS. Below is the Java MapReduce program that finds the maximum temperature.

Application to find maximum temperature, maxTempDriver.java
------------------------------------------------------------------------------------------------------------------------
import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class maxTempDriver extends Configured implements Tool {

        // NCDC marks a missing air temperature reading with +9999
        private static final int MISSING = 9999;

        // Mapper: emits (year, air temperature) for each NCDC record
        public static class MaxTempMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
                @Override
                public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                        String line = value.toString();
                        // Characters 15-18 (0-based) hold the observation year
                        String year = line.substring(15, 19);
                        // Characters 87-91 hold the signed temperature in tenths of a degree Celsius, e.g. +0290;
                        // Integer.parseInt accepts the leading '+' on Java 7 and later
                        int airTemp = Integer.parseInt(line.substring(87, 92));
                        // Skip missing readings so the sentinel cannot become the maximum
                        if (airTemp != MISSING) {
                                context.write(new Text(year), new IntWritable(airTemp));
                        }
                }
        }

        // Reducer: keeps the largest temperature seen for each year
        public static class MaxTempReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
                @Override
                public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                        int maxValue = Integer.MIN_VALUE;
                        for (IntWritable value : values) {
                                maxValue = Math.max(maxValue, value.get());
                        }
                        context.write(key, new IntWritable(maxValue));
                }
        }

        @Override
        public int run(String[] args) throws Exception {
                // Job.getInstance replaces the deprecated Job(Configuration, String) constructor
                Job job = Job.getInstance(getConf(), "Max Temp");
                job.setJarByClass(maxTempDriver.class);

                job.setMapperClass(MaxTempMapper.class);
                job.setReducerClass(MaxTempReducer.class);

                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(IntWritable.class);

                // args[0] = HDFS input path, args[1] = HDFS output path (must not exist yet)
                FileInputFormat.addInputPath(job, new Path(args[0]));
                FileOutputFormat.setOutputPath(job, new Path(args[1]));

                return job.waitForCompletion(true) ? 0 : 1;
        }

        public static void main(String[] args) throws Exception {
                // ToolRunner parses generic Hadoop options before calling run()
                int exitCode = ToolRunner.run(new maxTempDriver(), args);
                System.exit(exitCode);
        }
}

-------------------------------------------------------------------------------------------------------------------

Next, compile the .java program from the command line. First, check whether the CLASSPATH variable is set in your environment:

[root@sandbox maxTemp]# echo $CLASSPATH

[root@sandbox maxTemp]#

If CLASSPATH does not include the Hadoop JAR files needed to compile the program, you can pass them directly on the javac command line. You need the hadoop-common*.jar and hadoop-mapreduce-client-core*.jar files.
Use the find command to get the absolute paths of these JAR files in your sandbox:

[root@sandbox maxTemp]# find / -name 'hadoop-common*.jar'
[root@sandbox maxTemp]# find / -name 'hadoop-mapreduce*.jar'

For example, in my environment hadoop-common.jar is at /usr/hdp/2.4.0.0-169/hadoop/hadoop-common.jar. Use the absolute paths from your own find results in the javac command:
 
[root@sandbox maxTemp]# javac -classpath /usr/hdp/2.4.0.0-169/hadoop/hadoop-common.jar:/usr/hdp/2.4.0.0-169/hadoop/client/hadoop-mapreduce-client-core.jar maxTempDriver.java

This command creates three class files: one each for the driver, mapper, and reducer classes.

[root@sandbox maxTemp]# ls -ltr
total 20
-rw-r--r-- 1 root root 2254 2017-05-04 15:55 maxTempDriver.java
-rw-r--r-- 1 root root 1665 2017-05-04 15:58 maxTempDriver$MaxTempMapper.class
-rw-r--r-- 1 root root 1707 2017-05-04 15:58 maxTempDriver$MaxTempReducer.class
-rw-r--r-- 1 root root 1689 2017-05-04 15:58 maxTempDriver.class


To run in a distributed setting, the job's classes must be packaged into a job JAR file that Hadoop ships to the cluster. Create the JAR file with:

[root@sandbox maxTemp]#  jar -cvf maxTempDriver.jar *.class

This command packages all .class files into maxTempDriver.jar JAR file.

[root@sandbox maxTemp]# ls -ltr
total 20
-rw-r--r-- 1 root root 2254 2017-05-04 15:55 maxTempDriver.java
-rw-r--r-- 1 root root 1665 2017-05-04 15:58 maxTempDriver$MaxTempMapper.class
-rw-r--r-- 1 root root 1707 2017-05-04 15:58 maxTempDriver$MaxTempReducer.class
-rw-r--r-- 1 root root 1689 2017-05-04 15:58 maxTempDriver.class
-rw-r--r-- 1 root root 3038 2017-05-04 16:02 maxTempDriver.jar

To launch the job, run the driver with the hadoop jar command. By default, the driver picks up the cluster settings from the Hadoop configuration files (such as conf/core-site.xml).

[root@sandbox maxTemp]# hadoop jar maxTempDriver.jar maxTempDriver /home/pluralsight/weather /home/pluralsight/out

Here is the counter summary for the job's map and reduce tasks:

17/05/04 17:46:41 INFO mapreduce.Job:  map 100% reduce 100%
17/05/04 17:46:41 INFO mapreduce.Job: Job job_1493910685734_0010 completed successfully
17/05/04 17:46:41 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=43863
                FILE: Number of bytes written=354815
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=70380
                HDFS: Number of bytes written=9
                HDFS: Number of read operations=6
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters
                Launched map tasks=1
                Launched reduce tasks=1
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=4220
                Total time spent by all reduces in occupied slots (ms)=4377
                Total time spent by all map tasks (ms)=4220
                Total time spent by all reduce tasks (ms)=4377
                Total vcore-seconds taken by all map tasks=4220
                Total vcore-seconds taken by all reduce tasks=4377
                Total megabyte-seconds taken by all map tasks=1055000
                Total megabyte-seconds taken by all reduce tasks=1094250
        Map-Reduce Framework
                Map input records=3987
                Map output records=3987
                Map output bytes=35883
                Map output materialized bytes=43863
                Input split bytes=146
                Combine input records=0
                Combine output records=0
                Reduce input groups=1
                Reduce shuffle bytes=43863
                Reduce input records=3987
                Reduce output records=1
                Spilled Records=7974
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=93
                CPU time spent (ms)=2300
                Physical memory (bytes) snapshot=361009152
                Virtual memory (bytes) snapshot=1722482688
                Total committed heap usage (bytes)=265814016
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=70234
        File Output Format Counters
                Bytes Written=9

After the MapReduce job completes successfully, the OutputCommitter saves its output to the HDFS directory /home/pluralsight/out. Let's have a look at the output:

[root@sandbox maxTemp]# hadoop dfs -cat /home/pluralsight/out/*

2017    290

Since we analyzed only the 2017 dataset, the output file contains a single record. You can include files from several years to get the maximum temperature for each year. Note that 290 does not mean 290 degrees: NCDC records air temperature in tenths of a degree Celsius, so 290 corresponds to 29.0 °C. The dataset can still contain invalid readings, though: a value of +9999 marks a missing temperature, and each reading carries a quality code, so a robust job should check both before computing the maximum.
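The validity checks described above can be sketched locally in plain Java, without a cluster. This sketch assumes the fixed-width NCDC layout used by the mapper (year at characters 15-18, temperature at 87-91, 0-based) plus a quality-code character at position 92; the set of accepted quality codes, [01459], follows the example in Tom White's book and is an assumption here, as is the class name NcdcMaxTemp. It also converts the stored tenths of a degree into degrees Celsius.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class NcdcMaxTemp {

    private static final int MISSING = 9999;               // NCDC sentinel for a missing reading
    private static final String VALID_QUALITY = "[01459]"; // assumption: codes accepted as valid

    // Maximum valid temperature per year, in tenths of a degree Celsius.
    static Map<String, Integer> maxByYear(List<String> records) {
        Map<String, Integer> max = new TreeMap<>();
        for (String line : records) {
            if (line.length() < 93) {
                continue; // too short to hold temperature and quality code
            }
            String year = line.substring(15, 19);
            int airTemp = Integer.parseInt(line.substring(87, 92));
            String quality = line.substring(92, 93);
            if (airTemp != MISSING && quality.matches(VALID_QUALITY)) {
                max.merge(year, airTemp, Integer::max);
            }
        }
        return max;
    }

    // NCDC stores tenths of a degree, so 290 becomes 29.0 degrees Celsius.
    static double toCelsius(int tenths) {
        return tenths / 10.0;
    }

    // Reads uncompressed NCDC records from stdin, e.g. zcat 007026-99999-2017.gz | java NcdcMaxTemp
    public static void main(String[] args) throws IOException {
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in, StandardCharsets.US_ASCII));
        List<String> records = new ArrayList<>();
        String line;
        while ((line = in.readLine()) != null) {
            records.add(line);
        }
        for (Map.Entry<String, Integer> e : maxByYear(records).entrySet()) {
            System.out.println(e.getKey() + "\t" + toCelsius(e.getValue()));
        }
    }
}
```

The same two checks (missing sentinel and quality code) could be moved into MaxTempMapper; keeping them in a plain Java helper first makes them easy to test on a handful of records before running the full job.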

Hope this helps.

Download National Climatic Data Center (NCDC) weather dataset for Hadoop MapReduce


To practice the examples in Tom White's book "Hadoop: The Definitive Guide" (http://www.amazon.com/Hadoop-Definitive-Guide-Tom-White/dp/1449311520), we need the weather dataset from NCDC. The site mentioned in the book, https://www.ncdc.noaa.gov/, doesn't make it easy to find the download link for the weather dataset.

I spent a lot of time looking for these datasets, so here are the steps to download them.

1. The datasets are available for download from the NCDC FTP server (ftp://ftp.ncdc.noaa.gov/pub/data/noaa/). The data is partitioned by year, from 1901 to the present.

2. On a Unix machine, you can download a file directly with wget:

wget ftp://ftp.ncdc.noaa.gov:21/pub/data/noaa/2017/007026-99999-2017.gz

This command downloads 007026-99999-2017.gz file for the year 2017 onto your current working directory.
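If wget is not available, the same download can be sketched in plain Java. This is a minimal sketch assuming the FTP directory and file-name pattern shown in the wget command (one directory per year, files named like 007026-99999-2017.gz); the class name NcdcDownload is hypothetical, and NOAA may have moved or retired this FTP server since 2017. After downloading, it decompresses the file with GZIPInputStream and prints the first record as a sanity check.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.zip.GZIPInputStream;

public class NcdcDownload {

    // FTP directory taken from the wget command above (port 21 is the FTP default).
    static final String BASE_URL = "ftp://ftp.ncdc.noaa.gov/pub/data/noaa/";

    // File names follow <station>-<station>-<year>.gz, e.g. 007026-99999-2017.gz
    static String fileName(String usaf, String wban, int year) {
        return usaf + "-" + wban + "-" + year + ".gz";
    }

    // Files are partitioned into one directory per year.
    static String fileUrl(String usaf, String wban, int year) {
        return BASE_URL + year + "/" + fileName(usaf, wban, year);
    }

    public static void main(String[] args) throws IOException {
        String usaf = "007026";
        String wban = "99999";
        int year = 2017;
        Path target = Paths.get(fileName(usaf, wban, year));
        // Download into the current working directory, as wget does.
        try (InputStream in = URI.create(fileUrl(usaf, wban, year)).toURL().openStream()) {
            Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
        }
        // Decompress on the fly and print the first record.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(Files.newInputStream(target)), StandardCharsets.US_ASCII))) {
            System.out.println(reader.readLine());
        }
    }
}
```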

Hope this helps.

Amazon S3: Basic Concepts

Amazon S3 is a reliable, scalable, online object storage that stores files. Bucket: A bucket is a container in Amazon S3 where the fil...