Friday, May 12, 2017

Amazon Athena: Key highlights on Amazon Athena


Amazon Athena is a serverless interactive query service that is used to analyze data in Amazon S3 using standard SQL.

  • Amazon Athena applies schema-on-read.
  • Amazon Athena can be used to analyze structured, semi-structured and unstructured datasets.
  • Athena integrates with Amazon QuickSight for easy visualization.
  • Amazon Athena uses Presto, a distributed SQL engine, to execute queries.
  • Apache Hive DDL is used in Athena to define tables.
  • Amazon Athena can be accessed in either of two ways.
    • AWS Management Console
    • JDBC Connection
  • A user requires both Athena user permissions and Amazon S3 permissions to run queries.
    • Athena user permissions: Amazon IAM provides two managed policies for Athena: AmazonAthenaFullAccess and AWSQuicksightAthenaAccess. These policies can be attached to a user profile to grant the required permissions.
    • Athena S3 permissions: To access data in a particular S3 location, an Athena user needs appropriate permissions on the S3 buckets.
  • Amazon Athena supports encryption.
  • If a dataset on Amazon S3 is encrypted, the table DDL can include TBLPROPERTIES('has_encrypted_data'='true') to inform Athena that the data to be read is encrypted.
  • Amazon Athena stores query result sets by default in an S3 staging directory.
  • Any settings defined to store encrypted results apply to all tables and queries; configuring these settings for individual databases, tables, or queries is not possible.
  • To encrypt query results stored in Amazon S3 using the console, provide the required details in the Settings tab.
  • Tables can be created in Athena using:
    • the AWS console
    • the JDBC driver
    • the Athena create table wizard
  • Databases and Tables are simply logical objects pointing to actual files on Amazon S3.
  • The Athena catalog stores the metadata about databases and tables.
  • Athena doesn't support
    • CREATE TABLE AS SELECT
    • Transaction based operations.
    • ALTER INDEX
    • ALTER TABLE .. ARCHIVE PARTITION
    • ALTER TABLE ... CLUSTERED BY ..
    • ALTER TABLE .. EXCHANGE PARTITION
    • ALTER TABLE .. NOT CLUSTERED
    • ALTER TABLE ... NOT SKEWED
    • ALTER TABLE .. RENAME TO
    • COMMIT
    • CREATE INDEX
    • CREATE ROLE
    • CREATE TABLE LIKE
    • CREATE VIEW
    • DESCRIBE DATABASE
    • INSERT INTO 
    • ROLLBACK
    • EXPORT/IMPORT TABLE
    • DELETE FROM
    • ALTER TABLE .. ADD COLUMNS
    • ALTER TABLE..REPLACE COLUMNS
    • ALTER TABLE .. CHANGE COLUMNS
    • ALTER TABLE ..TOUCH
    • User-defined functions (UDFs)
    • Stored procedures
  • Athena limitations:
    • Operations that change table state (such as creating, updating, or deleting tables) are ACID-compliant.
    • All tables are EXTERNAL.
    • Table names are case-insensitive.
    • Table names allow underscores but no other special characters.
  • Advantages of accessing Athena using the JDBC driver:
    • Using the driver, Athena can be connected to third-party applications such as SQL Workbench.
    • We can run queries programmatically against Athena (see the JDBC sketch after this list).
  • JDBC Driver Options:
    • s3_staging_dir
    • query_results_encryption_option
    • query_results_aws_kms_key
    • aws_credentials_provider_class
    • aws_credentials_provider_arguments
    • max_error_retries
    • connection_timeout
    • socket_timeout
    • retry_base_delay
    • retry_max_backoff_time
    • log_path
    • log_level
  • JDBC commands:
    • connection.createStatement(); 
    • statement.executeQuery("<query>")
  • AWS CloudTrail is a service that records AWS API calls and events for AWS accounts. CloudTrail generates gzip-compressed log files in JSON format and stores them, encrypted by default, in Amazon S3.
  • CloudTrail SerDe is used by Athena to read log files generated by CloudTrail in JSON format.
  • Compression Formats supported:
    • Snappy
    • Zlib
    • GZIP
    • LZO
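
Below is a minimal sketch of programmatic access through the JDBC driver, as referenced in the list above. It assumes the driver class name (com.amazonaws.athena.jdbc.AthenaDriver) and the region-specific endpoint format used by the original Athena JDBC driver; the access keys, S3 buckets, database, and table names are placeholders to replace with your own.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public class AthenaJdbcSketch {
    public static void main(String[] args) throws Exception {
        // Load the Athena JDBC driver (class name from the original driver; newer drivers differ).
        Class.forName("com.amazonaws.athena.jdbc.AthenaDriver");

        Properties info = new Properties();
        info.put("user", "<AWS access key>");                           // placeholder credentials
        info.put("password", "<AWS secret key>");
        info.put("s3_staging_dir", "s3://my-athena-results/staging/");  // placeholder bucket for query results

        // Region-specific endpoint; replace us-east-1 with your region.
        String url = "jdbc:awsathena://athena.us-east-1.amazonaws.com:443";

        try (Connection connection = DriverManager.getConnection(url, info);
             Statement statement = connection.createStatement()) {

            // Define a table with Hive DDL; has_encrypted_data tells Athena whether the S3 data is encrypted.
            statement.execute(
                "CREATE EXTERNAL TABLE IF NOT EXISTS sampledb.employee (" +
                " emp_id int, emp_name string, dept string, salary int)" +
                " ROW FORMAT DELIMITED FIELDS TERMINATED BY ','" +
                " LOCATION 's3://my-athena-data/employee/'" +
                " TBLPROPERTIES ('has_encrypted_data'='false')");

            // Run a query; results are also written to the s3_staging_dir configured above.
            ResultSet rs = statement.executeQuery(
                "SELECT dept, count(*) AS cnt FROM sampledb.employee GROUP BY dept");
            while (rs.next()) {
                System.out.println(rs.getString("dept") + "\t" + rs.getLong("cnt"));
            }
        }
    }
}

The other driver options listed above (connection_timeout, log_path, and so on) can be supplied the same way, as additional entries in the Properties object.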

Thursday, May 4, 2017

Hadoop Streaming: Specifying Map-Only Jobs

Often, you may want to process input data using a map function only. To do this, simply set mapred.reduce.tasks to zero. The Map/Reduce framework will not create any reducer tasks. Rather, the outputs of the mapper tasks will be the final output of the job.

    -D mapred.reduce.tasks=0

Hadoop Streaming supports map-only jobs by specifying "-D mapred.reduce.tasks=0" on the command line. For example:

hadoop jar hadoop-streaming-2.7.1.2.4.0.0-169.jar \
-D mapred.reduce.tasks=0 \
-input /user/root/wordcount \
-output /user/root/out \
-mapper /bin/cat


We can also achieve a map-only job by specifying "-numReduceTasks 0":

hadoop jar hadoop-streaming-2.7.1.2.4.0.0-169.jar \
-input /user/root/wordcount \
-output /user/root/out \
-mapper /bin/cat \
-numReduceTasks 0

Hadoop MapReduce job on Hortonworks Sandbox to find maximum temperature on NCDC weather dataset


If you want to set up the Hortonworks Sandbox, follow this link:
http://hortonworks.com/wp-content/uploads/2012/03/Tutorial_Hadoop_HDFS_MapReduce.pdf
Please see my other post on how to download the weather dataset from NCDC.


I have downloaded a sample weather dataset for the year 2017 to my local filesystem, in the directory /home/pluralsight/mapreduce/weather. You are free to download any number of files, for any years.
[root@sandbox weather]# pwd
/home/pluralsight/mapreduce/weather
[root@sandbox weather]# ls -ltr
total 72
-rw-r--r-- 1 root root 70234 2017-05-04 16:59 007026-99999-2017.gz

Upload this file to HDFS 
[root@sandbox weather]# hadoop dfs -put /home/pluralsight/mapreduce/weather/007026-99999-2017.gz /home/pluralsight/weather
[root@sandbox weather]# hadoop dfs -ls /home/pluralsight/weather/
Found 1 items
-rw-r--r--   3 root hdfs      70234 2017-05-04 17:16 /home/pluralsight/weather/007026-99999-2017.gz

Now the file is in the HDFS directory. Below is the MapReduce program in Java to find the maximum temperature.

Application to find maximum temperature, maxTempDriver.java
------------------------------------------------------------------------------------------------------------------------
import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class maxTempDriver extends Configured implements Tool{

        public static class MaxTempMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
                @Override
                public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
                        String line = value.toString();
                        String year = line.substring(15,19);
                        int airTemp = Integer.parseInt(line.substring(87,92));
                        context.write(new Text(year), new IntWritable(airTemp));
                }
        }


        public static class MaxTempReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
                @Override
                public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                        int maxValue = Integer.MIN_VALUE;
                        for (IntWritable value: values) {
                                maxValue = Math.max(maxValue, value.get());
                        }
                        context.write(key, new IntWritable(maxValue));
                }
        }

        public int run(String[] args) throws Exception {

                Job job = new Job(getConf(),"Max Temp");
                job.setJarByClass(maxTempDriver.class);

                job.setMapperClass(MaxTempMapper.class);
                job.setReducerClass(MaxTempReducer.class);

                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(IntWritable.class);

                FileInputFormat.addInputPath(job, new Path(args[0]));
                FileOutputFormat.setOutputPath(job,new Path(args[1]));

                return job.waitForCompletion(true) ? 0 : 1;
        }

        public static void main(String[] args) throws Exception{
                int exitCode = ToolRunner.run(new maxTempDriver(),args);
                System.exit(exitCode);
        }
}

-------------------------------------------------------------------------------------------------------------------

From the command line, compile the .java program. First, check whether your CLASSPATH variable is set in your environment.

[root@sandbox maxTemp]# echo $CLASSPATH

[root@sandbox maxTemp]#

If your CLASSPATH is not set to include the Hadoop jar files required to compile your program, you can pass them directly on the command line. We need the hadoop-common*.jar and hadoop-mapreduce-client-core*.jar files.
You can use the find command to get the absolute paths of these jar files in your sandbox.

[root@sandbox maxTemp]# find / -name hadoop-common*.jar
[root@sandbox maxTemp]# find / -name hadoop-mapreduce*.jar

For example, in my environment the hadoop-common*.jar file is present at /usr/hdp/2.4.0.0-169/hadoop/hadoop-common.jar. Include the full paths of these jar files in your javac command as per your find results.
 
[root@sandbox maxTemp]# javac -classpath /usr/hdp/2.4.0.0-169/hadoop/hadoop-common.jar:/usr/hdp/2.4.0.0-169/hadoop/client/hadoop-mapreduce-client-core.jar maxTempDriver.java

This command creates three class files: one for the driver, one for the mapper, and one for the reducer class.

[root@sandbox maxTemp]# ls -ltr
total 20
-rw-r--r-- 1 root root 2254 2017-05-04 15:55 maxTempDriver.java
-rw-r--r-- 1 root root 1665 2017-05-04 15:58 maxTempDriver$MaxTempMapper.class
-rw-r--r-- 1 root root 1707 2017-05-04 15:58 maxTempDriver$MaxTempReducer.class
-rw-r--r-- 1 root root 1689 2017-05-04 15:58 maxTempDriver.class


To run in a distributed setting, the job's classes must be packaged into a job JAR file to send to the cluster. Creating the JAR file can be done conveniently with:

[root@sandbox maxTemp]#  jar -cvf maxTempDriver.jar *.class

This command packages all the .class files into the JAR file maxTempDriver.jar.

[root@sandbox maxTemp]# ls -ltr
total 20
-rw-r--r-- 1 root root 2254 2017-05-04 15:55 maxTempDriver.java
-rw-r--r-- 1 root root 1665 2017-05-04 15:58 maxTempDriver$MaxTempMapper.class
-rw-r--r-- 1 root root 1707 2017-05-04 15:58 maxTempDriver$MaxTempReducer.class
-rw-r--r-- 1 root root 1689 2017-05-04 15:58 maxTempDriver.class
-rw-r--r-- 1 root root 3038 2017-05-04 16:02 maxTempDriver.jar

To launch the job, we need to run the driver. The driver picks up the cluster information by default from the conf/core-site.xml file.

[root@sandbox wordCount]# hadoop jar maxTempDriver.jar maxTempDriver --libjars maxTempDriver.jar /home/pluralsight/weather /home/pluralsight/out

Here is the summary of counters for the map and reduce tasks:

17/05/04 17:46:41 INFO mapreduce.Job:  map 100% reduce 100%
17/05/04 17:46:41 INFO mapreduce.Job: Job job_1493910685734_0010 completed successfully
17/05/04 17:46:41 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=43863
                FILE: Number of bytes written=354815
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=70380
                HDFS: Number of bytes written=9
                HDFS: Number of read operations=6
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters
                Launched map tasks=1
                Launched reduce tasks=1
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=4220
                Total time spent by all reduces in occupied slots (ms)=4377
                Total time spent by all map tasks (ms)=4220
                Total time spent by all reduce tasks (ms)=4377
                Total vcore-seconds taken by all map tasks=4220
                Total vcore-seconds taken by all reduce tasks=4377
                Total megabyte-seconds taken by all map tasks=1055000
                Total megabyte-seconds taken by all reduce tasks=1094250
        Map-Reduce Framework
                Map input records=3987
                Map output records=3987
                Map output bytes=35883
                Map output materialized bytes=43863
                Input split bytes=146
                Combine input records=0
                Combine output records=0
                Reduce input groups=1
                Reduce shuffle bytes=43863
                Reduce input records=3987
                Reduce output records=1
                Spilled Records=7974
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=93
                CPU time spent (ms)=2300
                Physical memory (bytes) snapshot=361009152
                Virtual memory (bytes) snapshot=1722482688
                Total committed heap usage (bytes)=265814016
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=70234
        File Output Format Counters
                Bytes Written=9

After successful completion of the MapReduce job, the output is saved by the OutputCommitter on HDFS in the directory /home/pluralsight/out. Let's have a look at the output:

[root@sandbox wordCount]# hadoop dfs -cat /home/pluralsight/out/*

2017    290

Since we analyzed the weather dataset for the year 2017, we have a single record in the output file. You can include files from multiple years to get the maximum temperature per year. Note that the maximum temperature for 2017 shows up as 290, which looks unusual at first; NCDC records air temperature in tenths of a degree Celsius, so 290 corresponds to 29.0°C, and the raw data also contains missing readings and quality codes that the simple mapper above does not filter out. If we include logic to validate temperatures in our code, we can catch these cases (see the sketch below).
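
As a rough illustration, here is a variant of MaxTempMapper with the kind of validity checks used in the Hadoop: The Definitive Guide example: a leading '+' sign is stripped before parsing, readings of 9999 (missing) are dropped, and only records whose quality code (column 92) passes a basic check are emitted. The column positions are the same ones used in the mapper above; treat this as an illustrative sketch rather than a drop-in replacement.

        public static class MaxTempMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
                private static final int MISSING = 9999;

                @Override
                public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                        String line = value.toString();
                        String year = line.substring(15, 19);

                        // Column 87 holds the sign; strip a leading '+' before parsing.
                        int airTemp;
                        if (line.charAt(87) == '+') {
                                airTemp = Integer.parseInt(line.substring(88, 92));
                        } else {
                                airTemp = Integer.parseInt(line.substring(87, 92));
                        }

                        // Column 92 holds the quality code; emit only readings that are present and pass the check.
                        String quality = line.substring(92, 93);
                        if (airTemp != MISSING && quality.matches("[01459]")) {
                                context.write(new Text(year), new IntWritable(airTemp));
                        }
                }
        }

With these checks in place, the reducer still receives temperatures in tenths of a degree, so the final value can be divided by 10 to report degrees Celsius.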

Hope this helps.

Download National Climatic Data Center (NCDC) weather dataset for Hadoop MapReduce


To practice the examples in the book "Hadoop: The Definitive Guide" by Tom White (http://www.amazon.com/Hadoop-Definitive-Guide-Tom-White/dp/1449311520), we need the weather dataset from NCDC. The site https://www.ncdc.noaa.gov/ mentioned in the book doesn't make it easy to find the download link for the dataset.

I had to spend a lot of time looking for these datasets, so here are the steps to download them.

1. Datasets are available for download from the NCDC FTP server at ftp://ftp.ncdc.noaa.gov/pub/data/noaa/ (see the wget command below). Data is partitioned by year, starting from 1901 to the present.

2. We can download these datasets directly on a Unix machine with wget:

wget ftp://ftp.ncdc.noaa.gov:21/pub/data/noaa/2017/007026-99999-2017.gz

This command downloads the file 007026-99999-2017.gz for the year 2017 into your current working directory.

Hope this helps.

Saturday, April 15, 2017

Comparison of commands between Apache Hive and Amazon Redshift


Below is a comparison of SQL commands between Apache Hive and Amazon Redshift.

Create a database sqlcompare:

Hive: Create database/schema [if not exists] sqlcompare;
Redshift: Create database sqlcompare [with owner <ownername>];

Drop the database sqlcompare:

Hive: Drop database/schema [if exists] sqlcompare [cascade];
Redshift: Drop database sqlcompare;

Command to rename the column salary to sal in table employee:

Hive: Alter table employee change salary sal int;
Redshift: Alter table employee rename column salary to sal;

Adding a column mgr to a table employee:

Hive: Alter table employee add columns (mgr int);
Redshift: Alter table employee add column mgr int;

Dropping a column mgr from table employee:

Hive: Dropping a column is not directly supported by Hive. We can use REPLACE COLUMNS to get the desired result. To drop the mgr column from the employee table, the replace command below includes all columns except mgr.

Alter table employee replace columns (empId Int, empName string, dept string, salary int);

Redshift: Alter table employee drop column mgr;

Renaming a table employee to emp:

Hive: Alter table employee rename to emp;
RedShift: Alter table employee rename to emp;

Inserting a row into table employee:

Hive: Insert into employee values(1,'John','Finance',25000);
RedShift: Insert into employee values(1,'John','Finance',25000);

Insert into new table from parent table:

Hive: Insert into employee_bkp select * from employee;
RedShift: Insert into employee_bkp (select * from employee);

Create table Like:

Hive: Create table emp_bkp like employee;
RedShift: Create table emp_bkp ( like employee);

Get Current schema:

Hive: set hive.cli.print.current.db=true;
Redshift: select current_schema();

Order the result set:

Hive: Hive orders result sets in several ways. ORDER BY orders the complete result set across all reducers. SORT BY orders the result set within each reducer. DISTRIBUTE BY partitions the data across reducers, after which SORT BY can order it within each partition. CLUSTER BY is shorthand for DISTRIBUTE BY and SORT BY on the same column, producing ascending order within each partition.

select * from employee order by empname;
select * from employee sort by empname;
select * from employee distribute by empname sort by empname;
select * from employee cluster by empname;

Redshift: select * from employee order by empname;

NULL values sort first in ascending order in Hive, whereas in Redshift NULL values sort last in ascending order and first in descending order.

Referencing columns by positional notation:

Hive: To use positional notation for Hive 0.11.0 through 2.1.x,  set hive.groupby.orderby.position.alias to true. From Hive 2.2.0 and later, hive.orderby.position.alias  is true by default.

select * from employee order by 2 desc;

Redshift: select * from employee order by 2 desc;

Column aliases in where clause:

Hive: Column name aliases can't be referenced in the where clause.
Redshift: Column name aliases can be referenced in the where clause.
select deptid as dept, avg(salary) avg_sal from employee where dept in (2,4,5) group by deptid;

IN Clause sub-query:

Hive: In Hive, an IN-clause subquery is traditionally implemented using LEFT SEMI JOIN. In a left semi join, only columns from the left side (employee) of the join can be referenced in the select clause.

select e.* from employee e left semi join departments d on (e.dept_no = d.dept_no);

Redshift: select e.* from employee e where e.dept_no in (select d.dept_no from departments d);

Wednesday, April 12, 2017

Snakebite: A Python client library for accessing HDFS

Snakebite is a Python package, created by Spotify, that provides a Python client library allowing HDFS to be accessed programmatically from Python applications. The Snakebite package also comes with a command-line interface for HDFS that is based on the client library. The client library uses protobuf messages to communicate directly with the NameNode.

Snakebite requires Python 2 and python-protobuf 2.4.1 or higher; Python 3 is not currently supported. Snakebite can be installed using pip. The command below installs snakebite, python-protobuf, and other dependent packages.

$ pip install snakebite

Snakebite is distributed through PyPI. The Snakebite CLI accesses HDFS in much the same way as hdfs dfs. Here are some of the CLI commands:

$ snakebite mkdir /user/root/snakebite
OK: /user/root/snakebite

$  snakebite ls snakebite
Found 1 items
-rw-r--r--   3 root       root               46 2017-04-12 23:58 /user/root/snakebite/sample.txt

Snakebite, however, doesn't yet support copying a file from the local filesystem to HDFS. We can copy from HDFS to the local filesystem:

$ snakebite get snakebite/sample.txt ./sample_bkp.txt
OK: /root/sample_bkp.txt

The major difference between snakebite and hdfs dfs is that snakebite is a pure Python client and doesn't need to load any Java libraries to communicate with HDFS.

Now, let's see how snakebite can communicate with HDFS programmatically with a small example.

from snakebite.client import Client
client = Client('localhost',9000)
for x in client.ls(['snakebite/']):
   print x

returns

{'group': u'root', 'permission': 420, 'file_type': 'f', 'access_time': 1492041536789L, 'block_replication': 3, 'modification_time': 1492041537040L, 'length': 46L, 'blocksize': 134217728L, 'owner': u'root', 'path': '/user/root/snakebite/sample.txt'}

The Snakebite ls method takes a list of paths and returns a list of maps containing the file information. Every program that uses the client library must make a connection to the HDFS NameNode. The hostname and port number passed to Client() depend on the HDFS configuration; these values can be found in hadoop/conf/core-site.xml under the property fs.defaultFS:

    <property>
      <name>fs.defaultFS</name>
      <value>hdfs://localhost:9000</value>
      <final>true</final>
    </property>
 
For more information on Snakebite, refer to the official documentation page.



Tuesday, April 11, 2017

Hadoop 1.x Limitations


Limitations of Hadoop 1.x:

  • No horizontal scalability of namenode
  • Does not support Namenode High availability
  • Overburdened Job Tracker
  • Not possible to run Non-MapReduce Big data applications on HDFS
  • Does not support Multi-Tenancy

Problem: Namenode - No Horizontal Scalability

Hadoop 1.x supports a single NameNode and a single namespace, limited by the NameNode's RAM. Even with hundreds of DataNodes in the cluster, the NameNode keeps all of its metadata in memory, so the entire cluster is limited to a maximum of roughly 50-100 million files because of the single NameNode and single namespace. (As a rough rule of thumb, each file, directory, and block consumes on the order of 150 bytes of NameNode heap, so hundreds of millions of objects quickly exhaust a single JVM heap.)

Problem: Namenode - No High Availability

The NameNode is a single point of failure; without it the filesystem can't be used. In case of failure we need to recover manually using the Secondary NameNode, and since the secondary always lags behind the primary, some data loss is inevitable.

Problem: Job Tracker is Overburdened

The JobTracker spends a significant portion of its time and effort managing the life cycle of applications.

Problem: No Multi-Tenancy. Non-MapReduce jobs not supported

Hadoop 1.x dedicates all DataNode resources to map and reduce slots. Other workloads, such as graph processing, cannot utilize the data stored on HDFS.

Hadoop 2.x features addressing Hadoop 1.x limitations:

  • HDFS Federation 
  • HDFS High Availability
  • YARN 

HDFS Federation:

HDFS Federation solves the "Namenode - No Horizontal Scalability" problem by using multiple independent NameNodes, each of which manages a portion of the filesystem namespace.

HDFS High Availability:

HDFS High Availability in Hadoop 2.x resolves the NameNode availability issue of Hadoop 1.x. In this implementation, there is a pair of NameNodes in an active-standby configuration. If the active NameNode fails, the standby takes over as the new active. In this configuration, DataNodes must send block reports to both NameNodes, active and standby, so the standby always has the latest state available in memory.

YARN:

YARN is designed to overcome the problem of placing too much burden on the JobTracker in Hadoop 1.x. YARN also supports a multi-tenancy approach and adds a more general interface for running non-MapReduce jobs within the Hadoop framework.

