Friday, May 12, 2017

Amazon Athena: Key highlights on Amazon Athena


Amazon Athena is a serverless interactive query service that is used to analyze data in Amazon S3 using standard SQL.

  • Amazon Athena applies schema-on-read.
  • Amazon Athena can be used to analyze structured, semi-structured and unstructured datasets.
  • Athena integrates with Amazon QuickSight for easy visualization.
  • Amazon Athena uses Presto, a distributed SQL engine, to execute queries.
  • Apache Hive DDL is used in Athena to define tables.
  • Amazon Athena can be accessed in either of two ways:
    • AWS Management Console
    • JDBC Connection
  • A user requires Athena user permissions and Amazon S3 permissions to run queries.
    • Athena user permissions: Amazon IAM provides two managed policies for Athena: AmazonAthenaFullAccess and AWSQuicksightAthenaAccess. These policies can be attached to the user profile to grant the required permissions.
    • Athena S3 permissions: To access data in a particular S3 location, an Athena user needs appropriate permissions on the S3 buckets.
  • Amazon Athena supports encryption.
  • If the dataset is encrypted on Amazon S3, the table DDL can include TBLPROPERTIES('has_encrypted_data'='true') to inform Athena that the data to read is encrypted (see the example DDL after this list).
  • Amazon Athena stores query result sets by default in an S3 staging directory.
  • Any settings defined to store encrypted results apply to all tables and queries; settings cannot be configured for individual databases, tables, or queries.
  • To encrypt query results stored in Amazon S3 using the console, provide the required details in the Settings tab.
  • Tables can be created in Athena using
    • the AWS Management Console
    • the JDBC driver
    • the Athena create table wizard.
  • Databases and tables are simply logical objects pointing to actual files on Amazon S3.
  • The Athena catalog stores the metadata about databases and tables.
  • Athena doesn't support
    • CREATE TABLE AS SELECT
    • Transaction based operations.
    • ALTER INDEX
    • ALTER TABLE .. ARCHIVE PARTITION
    • ALTER TABLE ... CLUSTERED BY ..
    • ALTER TABLE .. EXCHANGE PARTITION
    • ALTER TABLE .. NOT CLUSTERED
    • ALTER TABLE ... NOT SKEWED
    • ALTER TABLE .. RENAME TO
    • COMMIT
    • CREATE INDEX
    • CREATE ROLE
    • CREATE TABLE LIKE
    • CREATE VIEW
    • DESCRIBE DATABASE
    • INSERT INTO 
    • ROLLBACK
    • EXPORT/IMPORT TABLE
    • DELETE FROM
    • ALTER TABLE .. ADD COLUMNS
    • ALTER TABLE..REPLACE COLUMNS
    • ALTER TABLE .. CHANGE COLUMNS
    • ALTER TABLE ..TOUCH
    • UDFs (user-defined functions)
    • Stored procedures
  • Athena limitations:
    • Operations that change table state, such as creating, updating, or deleting tables, are ACID compliant.
    • All tables are EXTERNAL.
    • Table names are case-insensitive.
    • Table names allow only the underscore special character; they cannot contain any other special characters.
  • Advantages of accessing Athena using the JDBC driver:
    • Using the driver, Athena can be connected to third-party applications such as SQL Workbench.
    • Queries can be run programmatically against Athena.
  • JDBC Driver Options:
    • s3_staging_dir
    • query_results_encryption_option
    • query_results_aws_kms_key
    • aws_credentials_provider_class
    • aws_credentials_provider_arguments
    • max_error_retries
    • connection_timeout
    • socket_timeout
    • retry_base_delay
    • retry_max_backoff_time
    • log_path
    • log_level
  • JDBC commands:
    • connection.createStatement(); 
    • statement.executeQuery("<query>")
  • AWS CloudTrail is a service that records AWS API calls and events for AWS accounts. CloudTrail delivers compressed (*.gz) log files to Amazon S3 in JSON format.
  • Athena uses the CloudTrail SerDe to read the JSON log files generated by CloudTrail.
  • Compression Formats supported:
    • Snappy
    • Zlib
    • GZIP
    • LZO
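
As referenced above, tables in Athena are defined with Apache Hive DDL, and the has_encrypted_data table property tells Athena that the underlying S3 data is encrypted. A minimal sketch (the bucket, table, and column names are hypothetical):

CREATE EXTERNAL TABLE IF NOT EXISTS web_logs (
  request_time string,
  user_id string,
  status_code int
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 's3://my-example-bucket/web-logs/'
TBLPROPERTIES ('has_encrypted_data'='true');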

Thursday, May 4, 2017

Hadoop Streaming: Specifying Map-Only Jobs

Often, you may want to process input data using a map function only. To do this, simply set mapred.reduce.tasks to zero. The Map/Reduce framework will not create any reducer tasks. Rather, the outputs of the mapper tasks will be the final output of the job.

    -D mapred.reduce.tasks=0

For Hadoop Streaming, a map-only job is specified by passing "-D mapred.reduce.tasks=0" on the command line:

hadoop jar hadoop-streaming-2.7.1.2.4.0.0-169.jar 
-D mapred.reduce.tasks=0 
-input /user/root/wordcount 
-output /user/root/out 
-mapper /bin/cat


We can also achieve a map-only job by specifying "-numReduceTasks 0":

hadoop jar hadoop-streaming-2.7.1.2.4.0.0-169.jar 
-input /user/root/wordcount 
-output /user/root/out 
-mapper /bin/cat 
-numReduceTasks 0

Hadoop MapReduce job on Hortonworks Sandbox to find maximum temperature on NCDC weather dataset


If you want to set up the Hortonworks Sandbox, follow this link:
http://hortonworks.com/wp-content/uploads/2012/03/Tutorial_Hadoop_HDFS_MapReduce.pdf
Please see my other post on how to download the weather dataset from NCDC.


I have downloaded a sample weather dataset for the year 2017 to my local filesystem, in the directory /home/pluralsight/mapreduce/weather. You are free to download any number of files, for any years.
[root@sandbox weather]# pwd
/home/pluralsight/mapreduce/weather
[root@sandbox weather]# ls -ltr
total 72
-rw-r--r-- 1 root root 70234 2017-05-04 16:59 007026-99999-2017.gz

Upload this file to HDFS 
[root@sandbox weather]# hadoop dfs -put /home/pluralsight/mapreduce/weather/007026-99999-2017.gz /home/pluralsight/weather
[root@sandbox weather]# hadoop dfs -ls /home/pluralsight/weather/
Found 1 items
-rw-r--r--   3 root hdfs      70234 2017-05-04 17:16 /home/pluralsight/weather/007026-99999-2017.gz

Now the file is in HDFS directory. Below is the MapReduce program in java to find the maximum temperature.

Application to find maximum temperature, maxTempDriver.java
------------------------------------------------------------------------------------------------------------------------
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.LongWritable;
import java.io.IOException;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class maxTempDriver extends Configured implements Tool{

        public static class MaxTempMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
                @Override
                public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
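                        // Extract the year (characters 15-19) and the air temperature (characters 87-92) from the NCDC fixed-width record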
                        String line = value.toString();
                        String year = line.substring(15,19);
                        int airTemp = Integer.parseInt(line.substring(87,92));
                        context.write(new Text(year), new IntWritable(airTemp));
                }
        }


        public static class MaxTempReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
                @Override
                public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                        int maxValue = Integer.MIN_VALUE;
                        for (IntWritable value: values) {
                                maxValue = Math.max(maxValue, value.get());
                        }
                        context.write(key, new IntWritable(maxValue));
                }
        }

        public int run(String[] args) throws Exception {

                Job job = Job.getInstance(getConf(), "Max Temp");
                job.setJarByClass(maxTempDriver.class);

                job.setMapperClass(MaxTempMapper.class);
                job.setReducerClass(MaxTempReducer.class);

                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(IntWritable.class);

                FileInputFormat.addInputPath(job, new Path(args[0]));
                FileOutputFormat.setOutputPath(job,new Path(args[1]));

                return job.waitForCompletion(true) ? 0 : 1;
        }

        public static void main(String[] args) throws Exception{
                int exitCode = ToolRunner.run(new maxTempDriver(),args);
                System.exit(exitCode);
        }
}

-------------------------------------------------------------------------------------------------------------------

From the command line, compile the .java program. First, check whether your CLASSPATH variable is set in your environment.

[root@sandbox maxTemp]# echo $CLASSPATH

[root@sandbox maxTemp]#

If your CLASSPATH does not include the Hadoop jar files required to compile the program, you can pass them directly on the command line. We need the hadoop-common*.jar and hadoop-mapreduce-client-core*.jar files.
You can use the find command to get the absolute paths of these jar files in your sandbox.

[root@sandbox maxTemp]# find / -name hadoop-common*.jar
[root@sandbox maxTemp]# find / -name hadoop-mapreduce*.jar

For example, in my environment the hadoop-common*.jar file is at /usr/hdp/2.4.0.0-169/hadoop/hadoop-common.jar. Include the paths of these jar files in your javac command as per your find results.
 
[root@sandbox maxTemp]# javac -classpath /usr/hdp/2.4.0.0-169/hadoop/hadoop-common.jar:/usr/hdp/2.4.0.0-169/hadoop/client/hadoop-mapreduce-client-core.jar maxTempDriver.java

This command creates three class files: one for the driver, one for the mapper, and one for the reducer.

[root@sandbox maxTemp]# ls -ltr
total 20
-rw-r--r-- 1 root root 2254 2017-05-04 15:55 maxTempDriver.java
-rw-r--r-- 1 root root 1665 2017-05-04 15:58 maxTempDriver$MaxTempMapper.class
-rw-r--r-- 1 root root 1707 2017-05-04 15:58 maxTempDriver$MaxTempReducer.class
-rw-r--r-- 1 root root 1689 2017-05-04 15:58 maxTempDriver.class


To run in a distributed setting, the job's classes must be packaged into a job JAR file to send to the cluster. The JAR file can be created with:

[root@sandbox maxTemp]#  jar -cvf maxTempDriver.jar *.class

This command packages all the .class files into the maxTempDriver.jar file.

[root@sandbox maxTemp]# ls -ltr
total 20
-rw-r--r-- 1 root root 2254 2017-05-04 15:55 maxTempDriver.java
-rw-r--r-- 1 root root 1665 2017-05-04 15:58 maxTempDriver$MaxTempMapper.class
-rw-r--r-- 1 root root 1707 2017-05-04 15:58 maxTempDriver$MaxTempReducer.class
-rw-r--r-- 1 root root 1689 2017-05-04 15:58 maxTempDriver.class
-rw-r--r-- 1 root root 3038 2017-05-04 16:02 maxTempDriver.jar

To launch the job, we run the driver. The driver picks up the cluster information by default from the conf/core-site.xml file.

[root@sandbox wordCount]# hadoop jar maxTempDriver.jar maxTempDriver --libjars maxTempDriver.jar /home/pluralsight/weather /home/pluralsight/out

Here is the stats summary of the map and reduce tasks.

17/05/04 17:46:41 INFO mapreduce.Job:  map 100% reduce 100%
17/05/04 17:46:41 INFO mapreduce.Job: Job job_1493910685734_0010 completed successfully
17/05/04 17:46:41 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=43863
                FILE: Number of bytes written=354815
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=70380
                HDFS: Number of bytes written=9
                HDFS: Number of read operations=6
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters
                Launched map tasks=1
                Launched reduce tasks=1
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=4220
                Total time spent by all reduces in occupied slots (ms)=4377
                Total time spent by all map tasks (ms)=4220
                Total time spent by all reduce tasks (ms)=4377
                Total vcore-seconds taken by all map tasks=4220
                Total vcore-seconds taken by all reduce tasks=4377
                Total megabyte-seconds taken by all map tasks=1055000
                Total megabyte-seconds taken by all reduce tasks=1094250
        Map-Reduce Framework
                Map input records=3987
                Map output records=3987
                Map output bytes=35883
                Map output materialized bytes=43863
                Input split bytes=146
                Combine input records=0
                Combine output records=0
                Reduce input groups=1
                Reduce shuffle bytes=43863
                Reduce input records=3987
                Reduce output records=1
                Spilled Records=7974
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=93
                CPU time spent (ms)=2300
                Physical memory (bytes) snapshot=361009152
                Virtual memory (bytes) snapshot=1722482688
                Total committed heap usage (bytes)=265814016
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=70234
        File Output Format Counters
                Bytes Written=9

After successful completion of the MapReduce job, the output is saved by the OutputCommitter on HDFS in the directory /home/pluralsight/out. Let's have a look at the output:

[root@sandbox wordCount]# hadoop dfs -cat /home/pluralsight/out/*

2017    290

Since we analyzed the weather dataset for the year 2017, we have a single record in the output file. You can include multiple files from different years to get the maximum temperature by year. Please note, the maximum temperature for 2017 is shown as 290 degrees, which is unusual; if we add logic to validate the temperature readings in our code, we can catch such scenarios.

Hope this helps.

Download National Climatic Data Center (NCDC) weather dataset for Hadoop MapReduce


To practice the examples in the book "Hadoop: The Definitive Guide" by Tom White (http://www.amazon.com/Hadoop-Definitive-Guide-Tom-White/dp/1449311520), we need the weather dataset from NCDC. The link https://www.ncdc.noaa.gov/ mentioned in the book doesn't make it easy to find the dataset download.

I had to spend a lot of time looking for these datasets, so here are the steps to download the weather dataset.

1. Datasets are available for download from the NCDC FTP site, ftp://ftp.ncdc.noaa.gov/pub/data/noaa/. Data is partitioned by year, from 1901 to the present.

2. We can download these datasets directly on Unix machines. The command to do that:

wget ftp://ftp.ncdc.noaa.gov:21/pub/data/noaa/2017/007026-99999-2017.gz

This command downloads the file 007026-99999-2017.gz for the year 2017 into your current working directory.

Hope this helps.

Saturday, April 15, 2017

Comparison of commands between Apache Hive, Amazon RedShift


Below is the comparison of SQL commands between Apache Hive and Amazon RedShift.

Create a database sqlcompare:

Hive: Create database/schema [if not exists] sqlcompare;
RedShift: Create database sqlcompare [with owner <ownername>];

Drop the database sqlcompare:

Hive: Drop database/schema [if exists] sqlcompare [cascade];
RedShift: Drop database sqlcompare;

Command to rename column name salary to sal in table employee :

Hive: Alter table employee change salary sal int;
RedShift: Alter table employee rename salary to sal;

Adding a column mgr to a table employee:

Hive: Alter table employee add columns (mgr int);
RedShift: Alter table employee add column mgr int;

Dropping a column mgr from table employee:

Hive: Dropping a column is not directly supported by Hive. We can use REPLACE COLUMNS to get the desired result. To drop the mgr column from the employee table, the replace command below includes all columns except mgr.

Alter table employee replace columns (empId Int, empName string, dept string, salary int);

RedShift: Alter table employee drop column mgr;

Renaming table employee to emp:

Hive: Alter table employee rename to emp;
RedShift: Alter table employee rename to emp;

Inserting a row into table employee:

Hive: Insert into employee values(1,'John','Finance',25000);
RedShift: Insert into employee values(1,'John','Finance',25000);

Insert into new table from parent table:

Hive: Insert into employee_bkp select * from employee;
RedShift: Insert into employee_bkp (select * from employee);

Create table Like:

Hive: Create table emp_bkp like employee;
RedShift: Create table emp_bkp ( like employee);

Get Current schema:

Hive: set hive.cli.print.current.db = true;
Redshift: select current_schema();

Order the result set:

Hive: Hive can order a result set in different ways. ORDER BY orders the result set across all reducers (a total order). SORT BY orders the result set within each reducer. DISTRIBUTE BY partitions the data across reducers, and the data is then sorted by the SORT BY column within each partition. CLUSTER BY partitions the data and sorts it in ascending order on the same column (a shorthand for DISTRIBUTE BY plus SORT BY on the same column).

select * from employee order by empname;
select * from employee sort by empname;
select * from employee distribute by empname sort by empname;
select * from employee cluster by empname;

Redshift: select * from employee order by empname;

NULL values sort first in ASC mode and last in DESC mode both in Redshift and Hive.

Referencing columns by positional notation:

Hive: To use positional notation in Hive 0.11.0 through 2.1.x, set hive.groupby.orderby.position.alias to true. In Hive 2.2.0 and later, hive.orderby.position.alias is true by default.

select * from employee order by 2 desc;

RedShift: select * from employee order by 2 desc;

Column aliases in where clause:

Hive: Column name aliases can't be referenced in the where clause.
RedShift: Column name aliases can be referenced in the where clause.
select deptid as dept, avg(salary) avg_sal from employee where dept in (2,4,5) group by deptid;

IN Clause sub-query:

Hive: In Hive, an IN-clause subquery is implemented using LEFT SEMI JOIN. In a left semi join, only columns from the left side (employee) of the join can be referenced in the SELECT clause of the SQL command.

select e.* from employee e left semi join departments d on (e.dept_no = d.dept_no);

Redshift: select e.* from employee e where e.dept_no in (select d.dept_no from departments d);

Wednesday, April 12, 2017

Snakebite: A Python client library for accessing HDFS

Snakebite is a Python package, created by Spotify, that provides a Python client library allowing HDFS to be accessed programmatically from Python applications. The Snakebite package also comes with a command-line interface for HDFS that is based on the client library. The client library uses protobuf messages to communicate directly with the NameNode.

Snakebite requires Python 2 and python-protobuf 2.4.1 or higher; Python 3 is not currently supported. Snakebite can be installed using pip. This command installs snakebite, python-protobuf, and other dependent packages:

$ pip install snakebite

Snakebite is distributed through PyPI. The Snakebite CLI accesses HDFS similarly to hdfs dfs. Here are some of the CLI commands:

$ snakebite mkdir /user/root/snakebite
OK: /user/root/snakebite

$  snakebite ls snakebite
Found 1 items
-rw-r--r--   3 root       root               46 2017-04-12 23:58 /user/root/snakebite/sample.txt

Snakebite, however, doesn't yet support copying a file from the local filesystem to HDFS. We can copy from HDFS to the local filesystem:

$ snakebite get snakebite/sample.txt ./sample_bkp.txt
OK: /root/sample_bkp.txt

The major difference between snakebite and hdfs dfs is that snakebite is a pure Python client and doesn't need to load any Java libraries to communicate with HDFS.

Now, let's see how snakebite can communicate with HDFS programmatically with a sample example.

from snakebite.client import Client
client = Client('localhost',9000)
for x in client.ls(['snakebite/']):
   print x

returns

{'group': u'root', 'permission': 420, 'file_type': 'f', 'access_time': 1492041536789L, 'block_replication': 3, 'modification_time': 1492041537040L, 'length': 46L, 'blocksize': 134217728L, 'owner': u'root', 'path': '/user/root/snakebite/sample.txt'}

The Snakebite ls method takes a list of paths and returns a list of maps that contain the file information. Every program that uses the client library must make a connection to the HDFS NameNode. The hostname and port number values provided to Client() depend on the HDFS configuration; these values can be found in the hadoop/conf/core-site.xml file under the property fs.defaultFS:

    <property>
      <name>fs.defaultFS</name>
      <value>hdfs://localhost:9000</value>
      <final>true</final>
    </property>
 
For more information on Snakebite, refer to the official documentation page.



Tuesday, April 11, 2017

Hadoop 1.x Limitations


Limitations of Hadoop 1.x:

  • No horizontal scalability of namenode
  • Does not support Namenode High availability
  • Overburdened Job Tracker
  • Not possible to run Non-MapReduce Big data applications on HDFS
  • Does not support Multi-Tenancy

Problem: Namenode - No Horizontal Scalability

Hadoop 1.x supports a single NameNode and a single namespace, limited by the NameNode's RAM. Even if we have hundreds of DataNodes in the cluster, the NameNode keeps all its metadata in memory, so we are limited to a maximum of roughly 50-100 million files in the entire cluster because of the single NameNode and single namespace.

Problem: Namenode - No High Availability

The NameNode is a single point of failure. Without the NameNode, the filesystem can't be used. We need to recover manually using the Secondary NameNode in case of failure, and since the secondary always lags behind the primary, data loss is inevitable.

Problem: Job Tracker is Overburdened

The JobTracker spends a significant portion of its time and effort managing the life cycle of applications.

Problem: No Multi-Tenancy. Non-MapReduce jobs not supported

Hadoop 1.x dedicates all DataNode resources to Map and Reduce slots. Other workloads, such as graph processing, cannot utilize the data on HDFS.

Hadoop 2.x features addressing Hadoop 1.x limitations:

  • HDFS Federation 
  • HDFS High Availability
  • YARN 

HDFS Federation:

HDFS Federation solves the "Namenode - No Horizontal Scalability" problem by using multiple independent NameNodes, each of which manages a portion of the filesystem namespace.

HDFS High Availability:

HDFS High Availability in Hadoop 2.x resolves the NameNode availability issue in Hadoop 1.x. In this implementation, there is a pair of NameNodes in an active-standby configuration. If the active NameNode fails, the standby takes over as the new active. In this configuration, DataNodes must send block reports to both NameNodes, active and standby, so the standby always has the latest state available in memory.

YARN:

YARN is designed to relieve the excessive burden on the JobTracker in Hadoop 1.x. YARN also supports a multi-tenancy approach and adds a more general interface that allows non-MapReduce applications to run within the Hadoop framework.


Hadoop MapReduce: Basic concepts


  • Design of HDFS:
    • Suitable for Very Large Files
    • Streaming data access with write-once and read-many-times pattern.
    • Runs on commodity hardware.
  • HDFS is not good for
    • Low latency data access like OLTP systems
    • Lots of small files because namenode memory is limited
    • Multiple writes and arbitrary file modifications
  • HDFS has two types of nodes
    • Namenode
    • Datanode
  • Namenode manages the filesystem namespace, the file system tree and the metadata for all the files and directories in the tree.
  • The namenode stores this information persistently in two files: the namespace image and the edit log.
  • The namenode knows the datanodes on which all the blocks are located, but the exact block locations are reported by the datanodes rather than stored persistently by the namenode.
  • Secondary Namenode: 
    • It doesn't act as a namenode.
    • Its main role is to periodically merge the namespace image with the edit log to prevent the edit log from becoming too large.
    • The secondary namenode keeps copies of the merged namespace image in case the primary namenode fails.
    • The secondary namenode's state lags behind the primary's, so in case of primary namenode failure some data loss is almost certain.
    • Secondary namenode runs on a separate physical machine with plenty of CPU and memory.
  • If the namenode fails, all files on the filesystem are effectively lost, since the metadata is lost.
  • HDFS Federation:
    • When the metadata exceeds the namenode's memory, memory becomes the bottleneck. To avoid this situation, the "HDFS Federation" feature introduced in the 2.x release allows the cluster to add namenodes.
    • Each namenode in the cluster manages a portion of filesystem namespace. For example, one namenode can manage all the files under /root and second namenode manages all files under /home.
    • Each namenode manages a namespace volume and a block pool. Namespace volume is made up of the metadata for the namespace and block pool contains all the blocks for the files in the namespace.
    • These namenodes under federation do not communicate with each other. The failure of one namenode doesn't affect the availability of another.  
  • High Availability of Namenode in Hadoop 1.x:
    • One method is to replicate the persistent state of the namenode on multiple filesystems. These writes are synchronous and atomic. The usual choice is to write to local disk and to a remote NFS mount.
    • The secondary namenode doesn't act as a primary namenode, but in case of primary failure the metadata on NFS can be copied to the secondary, which is then run as the new primary. Performing all these actions can mean a downtime of up to 30 minutes.
    • Since the secondary always lags behind the primary, data loss is inevitable.
  • HDFS High Availability:
    • HDFS High Availability in Hadoop 2.x resolves the namenode high availability issue of Hadoop 1.x.
    • In this implementation, there is a pair of namenodes in an active-standby configuration. If the active namenode fails, the standby takes over as the new active. In this configuration, datanodes must send block reports to both namenodes, active and standby, so the standby always has the latest state available in memory.
    • The observed failover time in this case will be around a minute.
  • Failover and fencing:
    • The transition from the active namenode to the standby is managed by the failover controller. The default implementation uses ZooKeeper to ensure that only one namenode is active.
    • There are two types of failovers:
      • Graceful failover
      • Ungraceful failover
    • Graceful failover is when an admin manually initiates failover as part of maintenance. The failover controller arranges an orderly transition.
    • Ungraceful failover can be triggered by
      • Slow network
      • Failed active namenode
    • In case of a slow network, the standby namenode takes over from the active, assuming the active namenode is down. But in this case the previously active namenode is still running, so the HA implementation (using ZooKeeper) makes sure the previously active namenode is stopped - a method known as fencing.
    • Different Fencing methods are
      • Killing the previously active namenode process
      • Revoking the namenode's access to the shared storage directory
      • Disabling the network ports
      • STONITH - Shoot The Other Node In The Head
    • Failover is transparent to the user.
  • Distcp:
    • Distcp is an efficient replacement for "hadoop fs -cp".
    • Distcp copies files to and from filesystems in parallel.
    • Distcp is implemented as a MapReduce job where the work of copying files is done by the mappers in parallel, with no reducers.
    • By default, up to 20 mappers are used.
    • Examples:
      • hadoop distcp dir1 dir2  -> Copies dir1 from HDFS file system to dir2 on the same HDFS file system
      • hadoop distcp file:///dir1 dir2 -> Copies dir1 from Local file system to dir2 on HDFS file system
      • hadoop distcp webhdfs:///dir1 webhdfs:///dir2  -> Copies dir1 from HDFS file system to dir2 on another HDFS file system. When two clusters are running incompatible versions of HDFS, we can use webhdfs protocol to distcp between them.
  • Balancer:
    • When multiple files are being copied by distcp, the first replica of each block resides on the node running the map; the second and third replicas are spread across the cluster, taking network topology into account.
    • The nodes running the maps can therefore become unbalanced.
    • Balancer tool can be used to even out the block distribution across the cluster.

Wednesday, April 5, 2017

Amazon Redshift: Key highlights on "SQL Functions" and "Queries"

A very high-level overview of Amazon Redshift SQL functions and commands is given below. This post is mainly useful for refreshing SQL concepts.

  • A column alias can be referenced in the where clause.
  • current_schema() function is a leader_node only function.
  • SEARCH_PATH:
    • show search_path; -> shows the search path 
    • set search_path to '$user','public','sql_class','sql_views'; -> adds the schemas sql_class and sql_views to the search path
    • When an object is created without a schema qualifier, it is placed in the first schema listed in the search path. If the search path is empty, an error is returned.
    • When identical objects exist in different schemas, the one found first in the search path is the one that will be used.
    • Objects that are not in any search-path schema can only be referenced by using a fully qualified name that also includes the schema name.
    • The pg_catalog system catalog schema is always searched. If it is mentioned in the path, it is searched in the specified order; if it is not specified in the search path, it is searched before any of the path items.
    • The current session's temporary schema (pg_temp_nnn) is searched first when referencing a temporary table, even before the pg_catalog schema. The temporary schema is only searched for table and view names, not for function names.
  • Neither IN nor NOT IN can search for NULLS.
  • Order of logical operators precedence: (), NOT, AND, OR.
  • BETWEEN and NOT BETWEEN statements are inclusive.  
  • LIKE command works differently on CHAR and VARCHAR.
    • The CHAR data type pads values with trailing spaces up to the defined size. For example, a CHAR(10) value is padded with spaces until it reaches 10 characters.
      • last_name like '%n' will look for values whose 10th character is n. Values shorter than 10 characters have spaces padded at the end, so such rows will not be returned.
      • To get the expected results, we can trim the spaces and then search for the pattern:
        • trim(last_name) like '%n' will return the expected results
  • ILIKE command is case insensitive, whereas LIKE command is case sensitive.
  • All integers are right justified. All integers will start from the right and move left.
  • All character data is left justified. They will start from left and move to the right.
  • Default Escape character is \\. '\\_' means _ is no longer a wildcard, but an actual literal underscore.
  • We can set any character to escape character. 
    • where name like 'ch@_%' escape '@' -> This treats _ as a literal underscore and @ as the escape character.
  • The LIKE command can be replaced with SIMILAR TO, but SIMILAR TO is computationally more expensive than LIKE.
  • DISTINCT vs GROUP BY
    • Many Duplicates - use group by
    • Few Duplicates - use distinct
    • Space exceeded - use group by
  • The TOP command returns the top rows from the result set. LIMIT and TOP return the same result set.
  • Aggregate commands ignore null values.
  • count(*) considers null values, count(<col_name>) ignores null values.
  • Redshift supports non-ANSI syntax for joins.
  • Once a table has been aliased, columns should be referenced using the alias, not the actual table name.
  • Redshift supports Oracle-style outer joins, i.e. the (+) operator. Both of these statements are valid:
    • customer as c left outer join orders as o on (c.cust_no = o.cust_no)
    • customer as c, orders as o where c.cust_no = o.cust_no(+)
  • The where clause is applied first to reduce data before performing inner joins.
  • The where clause is applied last, after performing outer joins.
  • ANSI joins must have an ON clause. Without an ON clause they don't perform a cartesian product; instead they error out.
  • In ANSI,  CROSS JOIN performs cartesian product.
  • In an INNER JOIN, a condition defined in the WHERE clause and the same condition defined as an additional AND in the ON clause return the same result.
    • customer cust inner join orders ord ON cust.cust_no = ord.cust_no where cust_name like '%Bi%';  returns the same result set as
    • customer cust inner join orders ord ON cust.cust_no = ord.cust_no and cust_name like '%Bi%'
  • In OUTER JOINs, the WHERE clause is applied after the join. The additional AND is evaluated as part of the ON condition, so a WHERE condition and the same condition in the ON clause do not return the same results.
    • customer cust left outer join orders ord ON cust.cust_no = ord.cust_no where cust_name like '%Bi%';  doesn't return the same result set as
    • customer cust left outer join orders ord ON cust.cust_no = ord.cust_no and cust_name like '%Bi%'
      • This query returns all rows from the customer table because it is a left outer join on customer. It includes the values from the orders table when cust_name matches the condition; otherwise they are null.
  • In multi-table joins, all ON clauses can be written at the end, but only if the ON clauses are placed in reverse order: the first ON clause corresponds to the last JOIN, and so on backwards.
  • The output of different Date Functions are:
    • CURRENT_DATE : Returns in YYYY-MM-DD format
    • TIMEOFDAY() : Returns weekday, date and time with microseconds.
    • SYSDATE : Date and time with microseconds.
    • TRUNC(SYSDATE) : YYYY-MM-DD
    • GETDATE() : Date and time , no microseconds included
    • TRUNC(GETDATE()) : YYYY-MM-DD
    • date_col + 60 : Adds 60 Days to date_col field.
    • date_col - 60 : subtracts 60 days from date_col.
    • ADD_MONTHS('2014-04-30',1) : Adds 1 month and, when the input is the last day of a month, returns the last day of the resulting month. Returns '2014-05-31 00:00:00'. ADD_MONTHS returns a timestamp, with the time defaulting to 00:00:00.
    • DATEADD(month,1,'2014-04-30') : Returns the corresponding day in the resulting month, not the last day of month. Returns '2014-05-30 00:00:00'. This also returns timestamp.
    • EXTRACT(hour from current_time) - Extracts hour part from the current_time. Can be used to extract DAY, MONTH, YEAR, HOUR, MINUTE, SECOND.
    • To use EXTRACT with literal values, use 
      • EXTRACT(hour from TIME '10:02:34') returns 10.
      • EXTRACT(DAY from DATE '2017-05-04') returns 04.
    • DATEDIFF(Day, order_date, current_date) : Returns number of days between order_date and current_date. This function uses the datepart (Day, Week, Month, Year etc).
    • DATE_PART(dow,order_date) : DOW stands for Day Of Week. Returns values between 0 and 6 where 0 stands for Sunday and 6 for Saturday.
    • DATE_PART(minute,order_date): Returns minutes from order_date. If this expression is used in select statement and an alias is not defined, a default alias "pgdate_part" is assigned.
    • Date1 - Date2 : Direct subtraction of two date variables results in number of days between them.
    • MONTHS_BETWEEN(date1,date2) - returns number of months between two date variables.
    • CURRENT_TIME: Returns the current time in HH:MI:SS
    • CURRENT_TIME - 55 - Subtracts 55 seconds from current_time
    • CURRENT_TIMESTAMP - Returns date and time.
    • TIMESTAMP(date_var) - TIMESTAMP function can convert date variable or combination of date and time to timestamp.
      • TIMESTAMP(DATE '2005-01-01', TIME '4:30:33') - 2005-01-01 4:30:33:204331
    • TO_TIMESTAMP - converts character string to timestamp.
    • NOW() - Returns current YYYY-MM-DD HH:MI:SS. Same as CURRENT_TIMESTAMP.
    • AGE(start_date,end_date) - Returns the interval in years, months, and days, along with the time.
    • current_date + Interval '1' Day - Adds 1 day to current_date
    • current_date + Interval '3' month, current_date + Interval '5' Year - adds month and year specified to current_date.
    • The default for all intervals is 2 digits.
    • OVERLAPS - Is a boolean function that returns true if two different date/time ranges have common days/time, else it returns false.
  • Different forms of OLAP functions:
    • ROWS UNBOUNDED PRECEDING - the calculation starts at the first row and accumulates row by row through to the last row.
      • Examples:
      • SUM(sales) over (order by sale_date rows unbounded preceding)
      • SUM(sales) over (order by product_id, sale_date rows unbounded preceding)
      • SUM(sales) over (partition by product_id order by product_id, sale_date rows unbounded preceding)
        • For each partition identified in the dataset, the SUM of sales starts over. The cumulative sum starts from the first row and continues to the last row of each partition.
    • ROWS 2 PRECEDING - Gets a moving sum over 3 rows (a moving window of 3 rows): the current row and the 2 preceding rows.
      • Examples:
      • SUM(sales) over (order by product_id,sale_date rows 2 preceding) 
    • RANK() over (order by sales) - RANK() defaults to ascending order. RANK takes no arguments inside the parentheses.
      • RANK() over (order by sales desc) - RANK() in descending order. This query orders sales by descending values and assigns ranks starting from 1.
      • If two values get the same rank, for example rank 2, then the next value skips 3 and is assigned 4.
      • If a WHERE condition is added to a SQL statement with the RANK() OLAP function, the rows are filtered first and ranks are assigned to the remaining rows; to filter on the rank value itself, wrap the query in a derived table.
    • PERCENT_RANK() over (order by daily_sales) - Gives the rank as a percentage.
    • COUNT(*) over (partition by product_id order by order_date rows unbounded preceding)
    • MAX(sales) over (order by product_id,sale_date rows unbounded preceding)
    • MIN(sales) over (order by product_id,sale_date rows unbounded preceding)
    • ROW_NUMBER() over (order by product_id,sale_date) - Generates a sequence number that increases sequentially.
    • FIRST_VALUE and LAST_VALUE : These functions let you specify sorted groups and return the first and last values in each group.
    • LEAD and LAG - LAG and LEAD allow you to compare different rows of a table by specifying an offset from the current row.
  • If there is a subquery inside a top query, then subquery runs first.
  • The IN list ignores duplicate values.
  • A subquery in IN clause is similar to LEFT SEMI JOIN in Apache Hive.
  • If we need columns from only one table, use a subquery in the IN clause. If we need columns from both tables, perform a join. Both the subquery and the join relate the two tables on a common column.
  • Correlated Subqueries:
    • The top-query is correlated to bottom query.
    • The tables in top and bottom queries are given different aliases.
    • The top query runs first.
    • For each distinct value from the top query, the bottom query is run.
  • A correlated subquery can often be rewritten as a join with a derived table (see the sketch after this list).
         

  • When using NOT IN clause, make sure to exclude NULL values else query would return zero rows.
  • NOT EXISTS is unaffected by a NULL value. Replace NOT IN with NOT EXISTS wherever possible to deal with NULL values.
  • TRIM(name) and TRIM(BOTH from name) - Both commands trims trailing and leading spaces.
  • TRIM(TRAILING 'y' from name) - Removes y letter from the end of name, if exists.
  • The TRIM command is case sensitive.
  • SUBSTRING(firstname from 2 for 3) - start from 2nd position and go for 3 positions. This is 1-based.
  • SUBSTRING(firstname from 3) - start from 3rd character till the end.
  • SUBSTRING(firstname from 0 for 6) - returns from 1st character to 5th character. 
  • SUBSTRING(firstname from -1 for 4) - returns the first 2 characters, because the starting position is 2 places before the first character.
  • POSITION('e' in last_name) - returns the position of e letter (1-based) from last name.
  • CURSOR:
    • begin;  -> This keyword begins a transaction.
    • declare c_tom cursor for select * from sales; -> This declares a cursor c_tom.
    • fetch forward 5 from c_tom; -> This fetches first 5 rows from c_tom;
    • close c_tom; -> Closes the cursor;
  • NULLIFZERO(quantity) - If quantity is zero, return null. Else return the actual value.
  • NULLIF(val1,val2) - If val1 and val2 are equal return null. Else return val1.
  • ZEROIFNULL(col) - If col is null then return zero, else return actual value.
  • When case statement is not given alias, system assigns <CASE expression> as alias.
  • CASE statements can be nested.
  • ORDER BY statements are allowed in the CREATE VIEW statements.
  • When ORDER BY is used in a select statement on a view (where the view was already created with an ORDER BY), the ORDER BY defined in the select statement is applied; the ORDER BY in the view definition is not considered in this case.
  • create view allview as select * from baseTable;
    • The view only includes the columns present when the view was created. Any new column added to the base table after view creation is not included in the view results, even though select * is present.
    • The same applies to dropping a column: if a column exists at the time of view creation and is dropped later, the view will no longer work even though select * is present in the view definition.
  • Redshift allows the user to update the base table through the views.
  • SET Operators precedence is INTERSECT, UNION and EXCEPT/MINUS.
  • EXCEPT and MINUS behave similarly in SET operators.
  • When using SET operators, the top query defines the aliases whereas the bottom query defines the ORDER BY. Here ORDER BY supports only positional notation; name notation, if used, will fail.
  • Statistical Aggregate Functions:
    • STDDEV
    • STDDEV_POP
    • STDDEV_SAMP
    • SQRT
    • VAR_SAMP
    • VAR_POP
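
As noted above, a correlated subquery can often be rewritten as a join with a derived table. A minimal sketch, assuming a hypothetical employee table with emp_id, dept_id and salary columns:

-- Correlated form: the bottom query runs for each distinct dept_id produced by the top query
select e.emp_id, e.salary
from employee e
where e.salary > (select avg(e2.salary)
                  from employee e2
                  where e2.dept_id = e.dept_id);

-- Equivalent rewrite: join to a derived table of per-department averages
select e.emp_id, e.salary
from employee e
inner join (select dept_id, avg(salary) as avg_sal
            from employee
            group by dept_id) d
on e.dept_id = d.dept_id
where e.salary > d.avg_sal;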


Monday, April 3, 2017

Amazon RedShift: Key highlights on "Explain"


Terminology used in EXPLAIN PLAN in Amazon Redshift is briefly explained in this post.

  • To get the explain plan of a query, include EXPLAIN in front of the query (see the example after this list).
  • Collecting statistics on the tables by analyzing them is important for getting correct estimates in the explain plan.
  • Terminology used in EXPLAIN:
  • STEP: Every individual operation is a step in explain plan.
  • SEGMENT: Segments are the number of steps that can be done by a single process.
  • STREAM: A collection of segments that always begins with a scan (reading of data) and ends with a materialization or blocking step.
  • LAST SEGMENT: The term last segment means the query returns the data. If the result set is aggregated or sorted, the intermediate data is sent to the leader node from all compute nodes; the leader node merges the data and sends it back to the requesting client.
  • SEQUENTIAL SCAN: Also termed as scan. Data is read sequentially from beginning to end.
  • MERGE JOIN: Also termed as mjoin. This is the fastest Redshift join. This is used for inner joins and outer joins that are both distributed and sorted on join keys.
  • HASH JOIN: Also termed as hjoin. This is based on hashing the joining columns. It's faster than a nested loop join.
  • NESTED LOOP JOIN: Also termed as nloop. It's used for cross joins and joins with inequality conditions. It's the slowest join of all.
  • AGGREGATE: Also termed as aggr. This keyword is used for scalar aggregation functions, which return only one row and one column.
  • HASHAGGREGATE: Also termed as aggr. This is used for unsorted grouped aggregate functions. 
  • GROUPAGGREGATE: Also termed as aggr. 
  • SORT: Also termed as sort. ORDER BY controls this sort. 
  • MERGE: Also termed as merge. This produces the final results based on intermediate sorted results derived from parallel operations.
  • SetOp Except: Also termed as hjoin. This is only used for EXCEPT queries.
  • HASH Intersect: Also termed as hjoin. This is used for INTERSECT queries.
  • Append: Also termed as save. This is the append used with subquery scan to implement UNION and UNION ALL queries.
  • LIMIT: Also termed as limit. This is used with LIMIT clause.
  • MATERIALIZE: Also termed as save. 
  • UNIQUE: Also termed as unique. Used mostly when DISTINCT keyword is used.
  • WINDOW: Also termed as window. This term means to compute window functions. 
  • Network (Broadcast): Also termed as bcast. This is a Broadcast that is considered an attribute of the Join Explain operators and steps.
    • DS_BCAST_INNER: In joins, it means we are broadcasting the entire inner table to all the compute nodes.
  • Network (Distribute) : Also termed as dist. This is used to distribute rows to compute nodes for parallel processing.
    • DS_DIST_NONE: No Data moves. Since joining tables have same distkey as join keys, data resides on the same node.
  • Network(Send to Leader) - Also termed as return. Sends the results to the leader node for further processing.
  • INSERT(Using Results) - Also termed as insert. Inserts data.
  • DELETE(Scan and Filter) - Also termed as delete. Deletes data.
  • UPDATE(Scan and Filter) - Also termed as delete,insert. An update is actually a delete followed by an insert.
  • COST 0.00..0.09: Costs are cumulative as we read up the plan; 0.00 is the cost to return the first row and 0.09 is the cost to return all rows.
  • ROWS: The expected number of rows to return.
  • WIDTH: The estimated average row size (width) of all the rows, in bytes.
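
As mentioned at the top of this list, prefixing a query with EXPLAIN returns its plan. A quick illustration using hypothetical customer and orders tables; the resulting plan lists operators such as those described above, with cost, rows, and width estimates that depend on the table statistics:

explain
select c.cust_name, sum(o.amount) as total_amount
from customer c
inner join orders o on c.cust_no = o.cust_no
group by c.cust_name;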

Visit my post on "Key highlights on SQL Functions, Commands, Aggregate Functions, OLAP functions,  Date/Time functions, Queries" here.

Amazon RedShift: Key highlights on "Temporary Tables" and "Derived Tables"

Important points to remember about Temporary Tables are

  • Temporary tables are visible only within the current session. They are automatically dropped at the end of the session.
  • If we begin the table name with #, it automatically creates a temporary table.
  • Operations on temporary tables
    • create table #employee_temp ...  
      • This SQL automatically creates employee_temp as a temporary table because of the # at the front of the table name.
    • insert into #employee_temp select * from employee; 
      • # is required at the front of the temporary table name while inserting.
    • create temp table employee_temp like employee;
      • This command creates a temporary table employee_temp. It inherits its columns, distkey, sortkey, and NOT NULL attributes from the employee table.
    • create temp table employee_temp as select * from employee;
      • This command creates a temporary table employee_temp. The temporary table inherits only the column names and data types, not the distkey, sortkey, or constraints.
    • create temp table employee_temp as select firstname,lastname from employee;
      • Creates temporary table employee_temp that has firstname and lastname from employee.
    • create temp table emp_temp distkey(emp_id) as select * from employee;
    • create temp table emp_temp diststyle even as select * from employee;
    • create temp table emp_temp diststyle even sortkey(emp_name) as select * from employee;
  • A derived table lasts for the life of a query, but a temporary table lasts for the life of a session.
  • A temporary table can have the same name as a permanent table, but this is not recommended.
  • All users by default have the temporary table creation privilege. To remove it, revoke the TEMP privilege from PUBLIC and grant the TEMP privilege to specific users/groups.
  • Deep Copy:
    • A deep copy operation recreates and repopulates the table using a bulk insert, and also automatically sorts the table.
    • If a table has a large unsorted region, a deep copy is a preferred (and faster) alternative to vacuum for sorting the table.
    • Four methods to perform a deep copy (see the sketch after this list):
      • Use original DDL command - CREATE TABLE command. We can specify all PK, FK, distkey, sortkey in DDL command.
      • CREATE TABLE LIKE - The new table doesn't inherit PK and FK. It only inherits distkey, sortkey and not null attributes from parent table.
      • CTAS - The new table will not inherit PK, FK, not null, distkey, sortkey from parent table.
      • Create temporary table - If we need to retain all the attributes of the original table, then we have to create temporary table using CTAS command. Then, truncate parent table and then insert into parent table from temporary table.
    • During a deep copy operation concurrent updates are not allowed, whereas they are allowed during a vacuum.
    • The v_generate_tbl_ddl script can be used to generate the table DDL.
  • Derived Tables:
    • Exists only within query.
    • Analogous to subquery concept in RDBMS world.
    • A derived table lives in memory.
    • A derived table must be given a name.
    • All columns in the derived table must have names.
    • A derived table is materialized by a select statement inside the query and exists between an opening and a closing parenthesis.
    • Different ways to define derived tables:
      • select * from (select dept,avg(salary) from employee) as salavg(dept, avgsal);
        • salavg is the name given to the derived table. dept and avgsal are aliases defined externally for the dept and avg(salary) columns.
      • select * from (select dept,avg(salary) avgsal from employee) as salavg;
        • salavg is the name given to the derived table. The dept column is not aliased because it defaults to the normal column name, and the avgsal alias is defined within the derived table query.
      • WITH salavg(avgsal) as (select avg(salary) from employee) select * from salavg;
        • Using the WITH command, we create a derived table with a column alias before running the main query.
      • WITH emp as (select * from employee) select * from emp;
        • Derived table emp selects all rows and columns from main table employee. 
      • The following query creates two derived tables using WITH command and then performs a join.
                              WITH emp(deptid,avgsal) as (select deptid, avg(sal) from employee) ,
                                      dept as (select deptid,deptname from department)
                              select emp.deptid, dept.deptname, avgsal from emp inner join dept
                              on emp.deptid = dept.deptid;          
    • The derived table is built first while executing a query.

Check my post on Key highlights on Explain.

Sunday, April 2, 2017

Amazon RedShift: Key highlights on "Compression"


This post highlights important points on Compression in Redshift.

  • Compression is at column-level in Redshift.
  • By default, Amazon Redshift stores data in raw and uncompressed format.
  • Types of compression supported by Redshift are

  • Byte-dictionary encoding stores a dictionary of unique values for each block of column values. With the 1MB disk block size, the dictionary can hold up to 256 unique values in a single block; any additional distinct values beyond 256 are stored in raw, uncompressed form. This encoding is effective when a column has fewer than 256 distinct values.
  • Delta encoding is useful mostly for date and time columns. This encoding compresses data by storing the difference between values that follow each other in the column. DELTA records differences as 1-byte values; DELTA32K records differences as 2-byte values.
  • LZO encoding works best with character strings. LZO provides a high compression ratio; compression is a little slower, but decompression is extremely fast.
  • Mostly encodings (MOSTLY8, MOSTLY16, MOSTLY32) are useful when the declared data type for a column has a larger capacity than most of the values actually stored require.
  • Run-length encoding is useful when a column has consecutively repeated data values. It replaces a run of repeated values with a token that contains the value and the number of consecutive occurrences.
  • Text255 and Text32k encodings are useful for compressing VARCHAR columns only. A separate dictionary of unique words is created for the column values, and an index value is associated with each unique word.
  • The ANALYZE COMPRESSION command performs compression analysis and makes suggestions; to implement the suggestions, the table must be recreated (see the example after this list).
    • analyze compression : analyzes all the tables in the current database
    • analyze compression table_name : analyzes the table specified. More than one table can't be specified in this command.
    • analyze compression table_name column_name : analyzes the column specified. More than one column can be specified.
    • analyze compression table_name comprows numrows : numrows is the number of rows to be used as the sample size for the compression analysis.
  • Analyze compression acquires table lock.
  • Analyze compression doesn't consider Run Length encoding for SortKey columns.
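
A quick illustration against a hypothetical employee table (see the syntax notes above); the output suggests an encoding per column, which must then be applied by recreating the table:

-- Sample 100000 rows of the employee table for compression analysis
analyze compression employee comprows 100000;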

Amazon RedShift: Key highlights on "System Tables"

This post has key points on System Tables used in Redshift.


  • Redshift has following system tables
    • STL (System Table Logging) - These system tables are generated from log files. Logging tables have an STL prefix.
    • STV (System Table Virtual) - These virtual system tables hold snapshots of the current system data. These virtual tables have an STV prefix.
    • System Views - A subset of the data found in the STL and STV tables is available in the system views, SVL (System View Logging) and SVV (System View Virtual) respectively.
    • System Catalog tables - These tables store schema metadata, such as information on tables and columns. These tables have a PG prefix.
  • For system tables to return a table's metadata, the schema of the table must be added to the search_path. The command below adds the sql_class schema to the search_path; once that is done, we can retrieve the metadata of any table in the sql_class schema.
    • set search_path to '$user','public','sql_class';
  • All system tables exist in the pg_catalog schema in Redshift.
  • Some of the Redshift system tables:
    • pg_table_def: Contains table information such as columns, data types, encoding, and key definitions (see the sample query after this list).
    • pg_aggregate 
    • svv_diskusage : Is used to find the data distribution skew in each table like number of rows stored in each slice for a given table.
    • svl_statementtext : Checks all statements that used the analyze command.
    • stv_blocklist : Is used to find how many 1MB blocks of disk space are used for each table
    • stl_load_commits: we can check the details of the COPY operation.
    • stl_query: Can find out when a table has been last analyzed
    • stv_tbl_perm: Has table IDs.
    • svl_qlog: Contains elapsed time of queries
    • svl_query_summary: We can determine whether a query is writing to disk. If the is_diskbased field is 't' for any step, then that step wrote data to disk.
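
As referenced above for pg_table_def, a quick sample query (the employee table name is hypothetical); the table's schema must already be on the search_path:

set search_path to '$user','public','sql_class';
select "column", type, encoding, distkey, sortkey
from pg_table_def
where tablename = 'employee';
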
View my next post on Key Highlights on Compression.

Amazon RedShift: Key highlights on "Introduction" and "Best Practices For Designing a Table"

Amazon Redshift is an MPP (massively parallel processing), petabyte-scale data warehouse service hosted on Amazon Web Services. Here I have provided some key highlights and important points to remember about Amazon Redshift; the information below is a useful refresher.

This post contains highlights on Amazon RedShift Introduction and Best Practices For Designing a Table.


  • Amazon Redshift is a columnar database.
  • Tables can be distributed as
    • Distribution on unique key
    • Distribution on non-unique key 
    • Distribution Style is ALL
    • Distribution Style is EVEN
    • Matching Distribution Keys for co-location of Joins
    • Big table/Small table joins
  • Defining a sort key helps improve performance; data is stored on disk in sorted order.
  • Columns that are good candidates to be defined as sort key are
    • Timestamp or date columns on which we frequently access the recent data 
    • Columns on which we perform range or equality filtering 
    • Columns frequently used for order by, group by or window functions
    • The joining column, on which two tables can be joined 
  • Columns that are good candidates to be defined as distributed key:
    • To distribute data evenly among the nodes and slices in a cluster
    • To collocate data for joins and aggregations
  • When two tables are frequently joined on a column, specify that join column as both the sort key and the distribution key (see the sketch after this list).
  • RedShift stores columnar data in 1MB disk blocks.
  • Each block comes with metadata; the min and max values for each block are stored as part of that metadata. This helps range or equality queries quickly identify the blocks that match the filter condition.
  • Analyze command collects statistics
    • Analyze; -> analyzes all tables in current database
    • Analyze verbose; -> analyzes all tables in the current database and reports progress
    • analyze table -> analyzes the table specified
    • analyze table(col1, col2) -> analyzes the columns col1 and col2 in table
  • Redshift automatically analyzes some tables created with the following commands.
    • Create table as
    • create temp table as
    • select into 
  • Vacuum:
    • Redshift doesn't automatically reclaim or reuse space that is freed by delete and update operations. Such rows are deleted logically but not physically; vacuum reclaims the space.
    • During an update operation, Redshift marks the old row as deleted and inserts a new row, so every update is a delete followed by an insert. The sort order might be disturbed by updates.
    • When a delete operation is performed, the row is marked for deletion but not removed until a vacuum.
  • The vacuum operation is time consuming, so it's preferable to run it during a maintenance window.
  • Vacuum commands:
    • vacuum;  -> reclaims space of all tables in current database
    • vacuum table -> reclaims space of specified table.
    • vacuum sort only table -> resorts rows for table
    • vacuum delete only table -> reclaims space from deletes and updates for the table
  • Best Practices for designing tables
    • Choose the best sort key
    • Choose the best distribution key
    • Consider defining primary key and foreign key constraints
    • Use the smallest possible column size
    • Use date/time data types for date columns
    • Specify redundant predicates on the sort column
  • Primary key, foreign key, and unique key constraints are not enforced by Redshift. They are informational only and are used by the query planner to generate a better query plan.
  • RedShift does enforce NOT NULL constraints.
  • Redshift stores Date and Timestamp data more efficiently than other datatypes.
  • While performing joins, specify redundant predicates so unwanted blocks are skipped.
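
A minimal sketch of these ideas, assuming a hypothetical sales table that is frequently joined to a customer table on cust_no and filtered by sale_date:

-- cust_no is the frequent join column, so it is both the distribution key and the leading sort key;
-- sale_date supports range filters on recent data
create table sales (
  sale_id   bigint        not null,
  cust_no   integer       not null,
  sale_date date          not null,
  amount    decimal(12,2)
)
distkey (cust_no)
compound sortkey (cust_no, sale_date);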

To continue, go to my next post on System Tables.
