From 7e0ce997e8b9be39f7770775ca49554117a3142b Mon Sep 17 00:00:00 2001
From: R Ravirala
Date: Wed, 17 Aug 2016 19:34:38 -0400
Subject: [PATCH] Code samples for "Power your Redshift Analytics With Apache Spark and Amazon ML" blog

---
 .../README.md                              |  12 ++
 .../build.sbt                              |  10 ++
 .../src/main/hive/weather-hive.sql         |  29 +++++
 .../src/main/redshift/redshift-queries.sql | 122 ++++++++++++++++++
 .../src/main/scala/SparkRedshiftDemo.scala | 110 ++++++++++++++++
 5 files changed, 283 insertions(+)
 create mode 100644 aws-blog-power-redshift-analytics-with-amazonml-spark/README.md
 create mode 100644 aws-blog-power-redshift-analytics-with-amazonml-spark/build.sbt
 create mode 100644 aws-blog-power-redshift-analytics-with-amazonml-spark/src/main/hive/weather-hive.sql
 create mode 100644 aws-blog-power-redshift-analytics-with-amazonml-spark/src/main/redshift/redshift-queries.sql
 create mode 100644 aws-blog-power-redshift-analytics-with-amazonml-spark/src/main/scala/SparkRedshiftDemo.scala

diff --git a/aws-blog-power-redshift-analytics-with-amazonml-spark/README.md b/aws-blog-power-redshift-analytics-with-amazonml-spark/README.md
new file mode 100644
index 00000000..96458e9e
--- /dev/null
+++ b/aws-blog-power-redshift-analytics-with-amazonml-spark/README.md
@@ -0,0 +1,12 @@
+# Power your Amazon Redshift Analytics with Apache Spark and Amazon ML
+This repository contains the code samples used in the accompanying AWS Big Data blog post.
+
+
+## Prerequisites
+ - Amazon Web Services account
+ - [AWS Command Line Interface (CLI)](http://aws.amazon.com/cli/)
+ - [sbt](http://www.scala-sbt.org/)
+ - [sbt-assembly](https://github.com/sbt/sbt-assembly)
+
+## Running the code samples
+ - Follow the instructions in the blog post to run the code snippets in Hive, Spark, Amazon Redshift, and Amazon ML
\ No newline at end of file
diff --git a/aws-blog-power-redshift-analytics-with-amazonml-spark/build.sbt b/aws-blog-power-redshift-analytics-with-amazonml-spark/build.sbt
new file mode 100644
index 00000000..b19cfd02
--- /dev/null
+++ b/aws-blog-power-redshift-analytics-with-amazonml-spark/build.sbt
@@ -0,0 +1,10 @@
+name := "aws-blog-power-redshift-analytics-with-amazonml-spark"
+
+version := "1.0"
+
+scalaVersion := "2.11.5"
+
+// spark-sql and spark-hive are needed to compile SparkRedshiftDemo.scala
+// (org.apache.spark.sql._ and the Hive "weather" table). The Amazon Redshift
+// JDBC driver and the spark-redshift package are assumed to be supplied
+// separately, for example as unmanaged jars in lib/ or on the cluster at run time.
+libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "2.0.0",
+  "org.apache.spark" %% "spark-sql" % "2.0.0",
+  "org.apache.spark" %% "spark-hive" % "2.0.0",
+  "com.amazonaws" % "aws-java-sdk" % "1.10.46" % "provided",
+  "com.amazonaws" % "aws-java-sdk-core" % "1.10.46" % "provided")
+
diff --git a/aws-blog-power-redshift-analytics-with-amazonml-spark/src/main/hive/weather-hive.sql b/aws-blog-power-redshift-analytics-with-amazonml-spark/src/main/hive/weather-hive.sql
new file mode 100644
index 00000000..f9837480
--- /dev/null
+++ b/aws-blog-power-redshift-analytics-with-amazonml-spark/src/main/hive/weather-hive.sql
@@ -0,0 +1,29 @@
+-- Create an external staging table (w) over the raw weather data in S3
+CREATE EXTERNAL TABLE IF NOT EXISTS w (
+  station string,
+  station_name string,
+  elevation string,
+  latitude string,
+  longitude string,
+  wdate string,
+  prcp decimal(5,1),
+  snow int,
+  tmax string,
+  tmin string,
+  awnd string
+)
+ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
+LOCATION 's3:///'
+TBLPROPERTIES("skip.header.line.count"="1");
+
+-- Create the weather table, casting wdate (a YYYYMMDD string) to a DATE column;
+-- this makes it easy to join weather with other tables on date
+CREATE TABLE weather AS SELECT station, station_name, elevation, latitude, longitude,
+  cast(concat(
+    substr(wdate,1,4), '-',
+    substr(wdate,5,2), '-',
+    substr(wdate,7,2)
+  ) AS date) AS dt, prcp, snow, tmax, tmin, awnd FROM w;
+
+set hive.cli.print.header=true;
+
+select * from weather limit 10;
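+
+-- (Not part of the original script) Optional sanity check: assuming the raw wdate
+-- values are YYYYMMDD strings, the date range below should match the weather file
+-- that was loaded into S3
+select min(dt), max(dt), count(*) from weather;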
diff --git a/aws-blog-power-redshift-analytics-with-amazonml-spark/src/main/redshift/redshift-queries.sql b/aws-blog-power-redshift-analytics-with-amazonml-spark/src/main/redshift/redshift-queries.sql
new file mode 100644
index 00000000..1a75e724
--- /dev/null
+++ b/aws-blog-power-redshift-analytics-with-amazonml-spark/src/main/redshift/redshift-queries.sql
@@ -0,0 +1,122 @@
+-- Make sure the table does not already exist
+drop table if exists all_flights;
+
+-- Create a table to hold the flight data
+create table all_flights
+(
+ORD_DELAY_ID bigint identity(0,1),
+YEAR smallint,
+QUARTER smallint,
+MONTH smallint,
+DAY_OF_MONTH smallint,
+DAY_OF_WEEK smallint,
+FL_DATE date,
+UNIQUE_CARRIER varchar(10),
+AIRLINE_ID int,
+CARRIER varchar(4),
+TAIL_NUM varchar(8),
+FL_NUM varchar(4),
+ORIGIN_AIRPORT_ID smallint,
+ORIGIN varchar(5),
+ORIGIN_CITY_NAME varchar(35),
+ORIGIN_STATE_ABR varchar(2),
+ORIGIN_STATE_NM varchar(50),
+ORIGIN_WAC varchar(2),
+DEST_AIRPORT_ID smallint,
+DEST varchar(5),
+DEST_CITY_NAME varchar(35),
+DEST_STATE_ABR varchar(2),
+DEST_STATE_NM varchar(50),
+DEST_WAC varchar(2),
+CRS_DEP_TIME smallint,
+DEP_TIME varchar(6),
+DEP_DELAY numeric(22,6),
+DEP_DELAY_NEW numeric(22,6),
+DEP_DEL15 numeric(22,6),
+DEP_DELAY_GROUP smallint,
+DEP_TIME_BLK varchar(15),
+TAXI_OUT numeric(22,6),
+TAXI_IN numeric(22,6),
+CRS_ARR_TIME numeric(22,6),
+ARR_TIME varchar(6),
+ARR_DELAY numeric(22,6),
+ARR_DELAY_NEW numeric(22,6),
+ARR_DEL15 numeric(22,6),
+ARR_DELAY_GROUP smallint,
+ARR_TIME_BLK varchar(15),
+CANCELLED numeric(22,6),
+DIVERTED numeric(22,6),
+CRS_ELAPSED_TIME numeric(22,6),
+ACTUAL_ELAPSED_TIME numeric(22,6),
+AIR_TIME numeric(22,6),
+FLIGHTS numeric(22,6),
+DISTANCE numeric(22,6),
+DISTANCE_GROUP numeric(22,6),
+CARRIER_DELAY numeric(22,6),
+WEATHER_DELAY numeric(22,6),
+NAS_DELAY numeric(22,6),
+SECURITY_DELAY numeric(22,6),
+LATE_AIRCRAFT_DELAY numeric(22,6),
+primary key (ord_delay_id)
+);
+
+-- Copy the flight data for December 2013 and 2014 from the S3 bucket
+copy all_flights
+FROM 's3://.csv'
+credentials 'aws_iam_role=arn:aws:iam:::role/' csv IGNOREHEADER 1;
+
+-- Count and preview the records in all_flights
+select count(*) from all_flights;
+select top 50 * from all_flights;
+
+drop function f_days_from_holiday (year int, month int, day int);
+
+-- Create a Python UDF to compute the number of days to the nearest US federal
+-- holiday that falls in the same month
+create or replace function f_days_from_holiday (year int, month int, day int)
+  returns int
+stable
+as $$
+import datetime
+from datetime import date
+from calendar import monthrange
+
+fdate = date(year, month, day)
+last_day_of_month = monthrange(year, month)[1]
+
+fmt = '%Y-%m-%d'
+s_date = date(year, month, 1)
+e_date = date(year, month, last_day_of_month)
+start_date = s_date.strftime(fmt)
+end_date = e_date.strftime(fmt)
+
+"""
+Compute the list of US federal holidays that fall in this month
+"""
+from pandas.tseries.holiday import USFederalHolidayCalendar
+calendar = USFederalHolidayCalendar()
+holidays = calendar.holidays(start_date, end_date)
+days_from_closest_holiday = [(abs(fdate - hdate)).days for hdate in holidays.date.tolist()]
+return min(days_from_closest_holiday)
+$$ language plpythonu;
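+
+-- (Not part of the original script) Quick sanity check of the UDF against the
+-- loaded data; for example, flights with FL_DATE of December 25 should return 0
+select fl_date, f_days_from_holiday(year, month, day_of_month) as days_from_holiday
+from all_flights limit 10;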
+
+--
+-- IMPORTANT NOTES:
+-- 1. ord_flights is the table that the Spark job (SparkRedshiftDemo.scala) writes back to your Amazon Redshift cluster
+-- 2. Make sure the table exists before issuing the following commands
+--
+select count(*) from ord_flights;
+
+-- Derived table: training data for the Amazon ML predictive model (2013 flights)
+create table train_ord_flights
+as
+  (select * from ord_flights where year = 2013);
+
+select count(*) from train_ord_flights;
+
+-- Derived table: test data for Amazon ML batch predictions (2014 flights)
+create table test_ord_flights
+as
+  (select * from ord_flights where year = 2014);
+
+select count(*) from test_ord_flights;
+
diff --git a/aws-blog-power-redshift-analytics-with-amazonml-spark/src/main/scala/SparkRedshiftDemo.scala b/aws-blog-power-redshift-analytics-with-amazonml-spark/src/main/scala/SparkRedshiftDemo.scala
new file mode 100644
index 00000000..e8136ab3
--- /dev/null
+++ b/aws-blog-power-redshift-analytics-with-amazonml-spark/src/main/scala/SparkRedshiftDemo.scala
@@ -0,0 +1,110 @@
+/**
+ * Demonstrates combining tables in Hive and Redshift for data enrichment.
+ */
+
+import org.apache.spark.sql._
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.functions.udf
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.SQLContext
+import com.amazonaws.auth._
+import com.amazonaws.auth.AWSCredentialsProvider
+import com.amazonaws.auth.AWSSessionCredentials
+import com.amazonaws.auth.InstanceProfileCredentialsProvider
+import com.amazonaws.services.redshift.AmazonRedshiftClient
+import _root_.com.amazon.redshift.jdbc42.Driver
+
+
+object SparkRedshiftDemo {
+
+  val sc = new SparkContext(new SparkConf().setAppName("Spark Redshift Demo"))
+
+  // Use the EC2 instance profile to obtain temporary credentials for AWS resources
+  val provider = new InstanceProfileCredentialsProvider()
+  val credentials: AWSSessionCredentials = provider.getCredentials.asInstanceOf[AWSSessionCredentials]
+  val token = credentials.getSessionToken
+  val awsAccessKey = credentials.getAWSAccessKeyId
+  val awsSecretKey = credentials.getAWSSecretKey
+
+  val sqlContext = new SQLContext(sc)
+  import sqlContext.implicits._
+
+
+  def transformData(jdbcURL: String, s3TempDir: String, redshiftIAMRole: String): Unit = {
+
+    // Read the weather table from Hive
+    val rawWeatherDF = sqlContext.table("weather")
+
+    // Retrieve the header row
+    val header = rawWeatherDF.first()
+
+    // Remove the header row from the DataFrame
+    val noHeaderWeatherDF = rawWeatherDF.filter(row => row != header)
+
+    // UDF to convert the air temperature from Celsius to Fahrenheit
+    val toFahrenheit = udf { (c: Double) => c * 9 / 5 + 32 }
+
+    // Apply the UDF to the maximum and minimum air temperatures
+    val weatherDF = noHeaderWeatherDF.withColumn("new_tmin", toFahrenheit(noHeaderWeatherDF("tmin")))
+      .withColumn("new_tmax", toFahrenheit(noHeaderWeatherDF("tmax")))
+      .drop("tmax")
+      .drop("tmin")
+      .withColumnRenamed("new_tmax", "tmax")
+      .withColumnRenamed("new_tmin", "tmin")
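+
+    // (Not part of the original post) The Hive DDL declares tmin and tmax as strings;
+    // to avoid relying on implicit casting of the UDF inputs, the columns can be cast
+    // explicitly before the conversion, for example:
+    //   toFahrenheit(noHeaderWeatherDF("tmin").cast("double"))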
.option("temporary_aws_access_key_id", awsAccessKey) + .option("temporary_aws_secret_access_key", awsSecretKey) + .option("temporary_aws_session_token", token).load() + + // Join the two dataframes + val joinedDF = flightsDF.join(weatherDF, flightsDF("fl_date") === + + weatherDF("dt")) + + // Write the joined data back to a Redshift table + joinedDF.write + .format("com.databricks.spark.redshift") + .option("temporary_aws_access_key_id", awsAccessKey) + .option("temporary_aws_secret_access_key", awsSecretKey) + .option("temporary_aws_session_token", token) + .option("url", jdbcURL) + .option("dbtable", "ord_flights") + .option("aws_iam_role", redshiftIAMRole) + .option("tempdir", s3TempDir) + .mode("error") + .save() + } + + def main (args: Array[String]): Unit = { + + val usage = """ + Usage: SparkRedshift.scala jdbcURL s3TempDir redshiftIAMRole + """ + if (args.length < 3) { + println(usage) + } + + // jdbc url for the Redshift + val jdbcURL = args(0) + // S3 bucket where the temporary files are written + val s3TempDir = args(1) + // Redshift IAM role + val redshiftIAMRole = args(2) + + transformData(jdbcURL, s3TempDir, redshiftIAMRole) + } +} \ No newline at end of file