This repository was archived by the owner on Jul 20, 2022. It is now read-only.

Code samples for "Power your Redshift Analytics With Apache Spark and… #57

Open · wants to merge 1 commit into base: master
12 changes: 12 additions & 0 deletions aws-blog-power-redshift-analytics-with-amazonml-spark/README.md
@@ -0,0 +1,12 @@
# Power your Amazon Redshift Analytics with Apache Spark and Amazon ML
This repository contains the code samples used in the accompanying AWS Big Data blog post.


## Prerequisites
- Amazon Web Services account
- [AWS Command Line Interface (CLI)](http://aws.amazon.com/cli/)
- [sbt](http://www.scala-sbt.org/)
- [sbt-assembly](https://github.com/sbt/sbt-assembly)

## Running the code samples
- Follow the instructions in the blog post to run the code snippets in Hive, Spark, Amazon Redshift, and Amazon ML.
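
The blog post builds the Spark job into a single assembly JAR with sbt-assembly. A minimal sketch of the plugin wiring, assuming an sbt 0.13-era build (the plugin version is illustrative, not taken from the blog post):

// project/plugins.sbt -- version is an assumption; match it to your sbt release
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

Running `sbt assembly` then produces a fat JAR under target/scala-2.11/ that can be handed to spark-submit.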
10 changes: 10 additions & 0 deletions aws-blog-power-redshift-analytics-with-amazonml-spark/build.sbt
@@ -0,0 +1,10 @@
name := "aws-blog-power-redshift-analytics-with-amazonml-spark"

version := "1.0"

scalaVersion := "2.11.5"

libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "2.0.0",
"com.amazonaws" % "aws-java-sdk" % "1.10.46" % "provided",
"com.amazonaws" % "aws-java-sdk-core" % "1.10.46" % "provided")

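Note that build.sbt as committed covers only spark-core and the AWS SDK, while the Scala job later in this change also uses Spark SQL, Hive table access, and the spark-redshift connector. A sketch of the likely additional dependencies (versions are assumptions; match them to your Spark distribution):

// Assumed extra dependencies -- not part of the committed build.sbt
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % "2.0.0" % "provided",
"org.apache.spark" %% "spark-hive" % "2.0.0" % "provided",
"com.databricks" %% "spark-redshift" % "2.0.1")

The Amazon Redshift JDBC driver is not published to Maven Central; download it from AWS and add it to the classpath (for example via spark-submit --jars).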
@@ -0,0 +1,29 @@
-- Create an external table over the raw weather data in S3
CREATE EXTERNAL TABLE IF NOT EXISTS w (

[Review comment] Suggest using a more descriptive name than 'w'.

station string,
station_name string,
elevation string,
latitude string,
longitude string,
wdate string,
prcp decimal(5,1),
snow int,
tmax string,
tmin string,
awnd string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 's3://<path-to-weather-dataset>/'
TBLPROPERTIES("skip.header.line.count"="1");

-- Cast wdate to a DATE type; this helps when joining weather with other tables on date
CREATE TABLE weather AS SELECT station, station_name, elevation, latitude, longitude,
cast(concat(
substr(wdate,1,4), '-',
substr(wdate,5,2), '-',
substr(wdate,7,2)
) AS date) AS dt, prcp, snow, tmax, tmin, awnd FROM w;

set hive.cli.print.header=true;

select * from weather limit 10;
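
For example, a raw wdate value of 20131225 becomes the string 2013-12-25, which then casts cleanly to a date; the resulting dt column can later be compared directly against the flights table's fl_date.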
@@ -0,0 +1,122 @@
-- Make sure the table does not exist
drop table if exists all_flights;

-- Create a database table with data for all flights in December 2013
create table all_flights
(
ORD_DELAY_ID bigint identity(0,1),

[Review comment] ord_delay_id as primary key is confusing; consider renaming it.

YEAR smallint,
QUARTER smallint,
MONTH smallint,
DAY_OF_MONTH smallint,
DAY_OF_WEEK smallint,
FL_DATE date,
UNIQUE_CARRIER varchar(10),
AIRLINE_ID int,
CARRIER varchar(4),
TAIL_NUM varchar(8),
FL_NUM varchar(4),
ORIGIN_AIRPORT_ID smallint,
ORIGIN varchar(5),
ORIGIN_CITY_NAME varchar(35),
ORIGIN_STATE_ABR varchar(2),
ORIGIN_STATE_NM varchar(50),
ORIGIN_WAC varchar(2),
DEST_AIRPORT_ID smallint,
DEST varchar(5),
DEST_CITY_NAME varchar(35),
DEST_STATE_ABR varchar(2),
DEST_STATE_NM varchar(50),
DEST_WAC varchar(2),
CRS_DEP_TIME smallint,
DEP_TIME varchar(6),
DEP_DELAY numeric(22,6),
DEP_DELAY_NEW numeric(22,6),
DEP_DEL15 numeric(22,6),
DEP_DELAY_GROUP smallint,
DEP_TIME_BLK varchar(15),
TAXI_OUT numeric(22,6),
TAXI_IN numeric(22,6),
CRS_ARR_TIME numeric(22,6),
ARR_TIME varchar(6),
ARR_DELAY numeric(22,6),
ARR_DELAY_NEW numeric(22,6),
ARR_DEL15 numeric(22,6),
ARR_DELAY_GROUP smallint,
ARR_TIME_BLK varchar(15),
CANCELLED numeric(22,6),
DIVERTED numeric(22,6),
CRS_ELAPSED_TIME numeric(22,6),
ACTUAL_ELAPSED_TIME numeric(22,6),
AIR_TIME numeric(22,6),
FLIGHTS numeric(22,6),
DISTANCE numeric(22,6),
DISTANCE_GROUP numeric(22,6),
CARRIER_DELAY numeric(22,6),
WEATHER_DELAY numeric(22,6),
NAS_DELAY numeric(22,6),
SECURITY_DELAY numeric(22,6),
LATE_AIRCRAFT_DELAY numeric(22,6),
primary key (ord_delay_id)
);

-- Copy all flights data for December 2013 and 2014 from the S3 bucket
copy all_flights
FROM 's3://<path-to-flights-data>/<flight-data>.csv'
credentials 'aws_iam_role=arn:aws:iam::<your-aws-account-number>:role/<role-name>' csv IGNOREHEADER 1;

-- Count and preview the records in all_flights
select count(*) from all_flights;
select top 50 * from all_flights;

drop function f_days_from_holiday (year int, month int, day int);

-- Create Python UDF to compute number of days before/after the nearest holiday
create or replace function f_days_from_holiday (year int, month int, day int)
returns int
stable
as $$
from datetime import date
from calendar import monthrange

# The date passed in, and the first/last days of its month
fdate = date(year, month, day)
last_day_of_month = monthrange(year, month)[1]

fmt = '%Y-%m-%d'
s_date = date(year, month, 1)
e_date = date(year, month, last_day_of_month)
start_date = s_date.strftime(fmt)
end_date = e_date.strftime(fmt)

# Compute the list of US federal holidays in this month
from pandas.tseries.holiday import USFederalHolidayCalendar
calendar = USFederalHolidayCalendar()
holidays = calendar.holidays(start_date, end_date)

[Review comment] Why only look for holidays in the present month? If the day passed in is the last day of the month, the nearest holiday could also be in the next month.

days_from_closest_holiday = [(abs(fdate - hdate)).days for hdate in holidays.date.tolist()]
return min(days_from_closest_holiday)
$$ language plpythonu;
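
As a quick sanity check after creating the function: select f_days_from_holiday(2013, 12, 25); should return 0, since Christmas Day is itself a federal holiday, and select f_days_from_holiday(2013, 12, 20); should return 5.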

--
-- IMPORTANT NOTES:
-- 1. ord_flights is a table that will be copied to your Redshift cluster from your Spark environment
-- 2. Make sure the table exists before issuing the following commands
--
select count(*) from ord_flights;

-- Derived table: training data table for Amazon ML predictive model
create table train_ord_flights
as
(select * from ord_flights where year = 2013);

select count(*) from train_ord_flights;

-- Derived table: test data table for Amazon ML's batch predictions
-- (uses year 2014 so the test set is disjoint from the 2013 training set)
create table test_ord_flights
as
(select * from ord_flights where year = 2014);

select count(*) from test_ord_flights;

@@ -0,0 +1,110 @@
/**
* Demonstrates combining tables in Hive and Redshift for data enrichment.
*/

import org.apache.spark.sql._
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SQLContext
import com.amazonaws.auth._
import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.auth.AWSSessionCredentials
import com.amazonaws.auth.InstanceProfileCredentialsProvider
import com.amazonaws.services.redshift.AmazonRedshiftClient
import _root_.com.amazon.redshift.jdbc42.Driver


object SparkRedshiftDemo {

val sc = new SparkContext(new SparkConf().setAppName("Spark Redshift Demo"))

// Instance Profile for authentication to AWS resources
val provider = new InstanceProfileCredentialsProvider()
val credentials: AWSSessionCredentials = provider.getCredentials.asInstanceOf[AWSSessionCredentials]

[Review comment] How can I use this in PySpark? Thanks.

val token = credentials.getSessionToken
val awsAccessKey = credentials.getAWSAccessKeyId
val awsSecretKey = credentials.getAWSSecretKey

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._


def transformData (jdbcURL: String, s3TempDir: String, redshiftIAMRole: String): Unit = {

// Read the weather table from Hive
val rawWeatherDF = sqlContext.table("weather")

// Retrieve the header
val header = rawWeatherDF.first()

// Remove the header from the dataframe
val noHeaderWeatherDF = rawWeatherDF.filter(row => row != header)

// UDF to convert the air temperature from Celsius to Fahrenheit
val toFahrenheit = udf { (c: Double) => c * 9 / 5 + 32 }

// Apply the UDF to maximum and minimum air temperature
val weatherDF = noHeaderWeatherDF.withColumn("new_tmin", toFahrenheit(noHeaderWeatherDF("tmin")))
.withColumn("new_tmax", toFahrenheit(noHeaderWeatherDF("tmax")))
.drop("tmax")
.drop("tmin")
.withColumnRenamed("new_tmax", "tmax")
.withColumnRenamed("new_tmin", "tmin")

// Query against the all_flights table in Redshift (the table loaded via COPY);
// the joined result is written back as ord_flights below
val flightsQuery =
"""
select ORD_DELAY_ID, DAY_OF_MONTH, DAY_OF_WEEK, FL_DATE,
f_days_from_holiday(year, month, day_of_month) as DAYS_TO_HOLIDAY,
UNIQUE_CARRIER, FL_NUM, substring(DEP_TIME, 1, 2) as DEP_HOUR,
cast(DEP_DEL15 as smallint), cast(AIR_TIME as integer),
cast(FLIGHTS as smallint), cast(DISTANCE as smallint)
from all_flights where origin='ORD' and cancelled = 0
"""

// Create a Dataframe to hold the results of the above query
val flightsDF = sqlContext.read.format("com.databricks.spark.redshift")
.option("url", jdbcURL)
.option("tempdir", s3TempDir)
.option("query", flightsQuery)
.option("temporary_aws_access_key_id", awsAccessKey)
.option("temporary_aws_secret_access_key", awsSecretKey)
.option("temporary_aws_session_token", token).load()

// Join the two dataframes on flight date
val joinedDF = flightsDF.join(weatherDF, flightsDF("fl_date") === weatherDF("dt"))

// Write the joined data back to a Redshift table
joinedDF.write
.format("com.databricks.spark.redshift")
.option("temporary_aws_access_key_id", awsAccessKey)
.option("temporary_aws_secret_access_key", awsSecretKey)
.option("temporary_aws_session_token", token)
.option("url", jdbcURL)
.option("dbtable", "ord_flights")
.option("aws_iam_role", redshiftIAMRole)
.option("tempdir", s3TempDir)
.mode("error")
.save()
}

def main (args: Array[String]): Unit = {

val usage = """
Usage: SparkRedshiftDemo jdbcURL s3TempDir redshiftIAMRole
"""
if (args.length < 3) {
println(usage)
sys.exit(1)
}

// JDBC URL for the Redshift cluster
val jdbcURL = args(0)
// S3 bucket where the temporary files are written
val s3TempDir = args(1)
// Redshift IAM role
val redshiftIAMRole = args(2)

transformData(jdbcURL, s3TempDir, redshiftIAMRole)
}
}
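
Once the assembly JAR is built, the job would be launched along these lines (all three arguments are the placeholders described in main):

spark-submit --class SparkRedshiftDemo <assembly-jar> <jdbcURL> <s3TempDir> <redshiftIAMRole>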