diff --git a/aws-blog-querying-kinesis-with-spark-and-hive/README.md b/aws-blog-querying-kinesis-with-spark-and-hive/README.md
new file mode 100644
index 00000000..81334f67
--- /dev/null
+++ b/aws-blog-querying-kinesis-with-spark-and-hive/README.md
@@ -0,0 +1,5 @@
+# Project Name
+Querying Amazon Kinesis Streams Directly with SQL and Spark Streaming
+
+## Introduction
+This project is derived from the blog post https://blogs.aws.amazon.com/bigdata/post/Tx3916WCIUPVA3T/Querying-Amazon-Kinesis-Streams-Directly-with-SQL-and-Spark-Streaming
\ No newline at end of file
diff --git a/aws-blog-querying-kinesis-with-spark-and-hive/pom.xml b/aws-blog-querying-kinesis-with-spark-and-hive/pom.xml
new file mode 100644
index 00000000..08e34540
--- /dev/null
+++ b/aws-blog-querying-kinesis-with-spark-and-hive/pom.xml
@@ -0,0 +1,163 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>aws-blog-querying-kinesis-with-spark-and-hive</artifactId>
+  <groupId>aws</groupId>
+  <version>1.0.0</version>
+
+  <properties>
+    <scala.version>2.10.5</scala.version>
+    <spark.version>1.6.1</spark.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>2.4.4</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_2.10</artifactId>
+      <version>${spark.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-kinesis-asl_2.10</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-hive_2.10</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_2.10</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-hive-thriftserver_2.10</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>1.7.21</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>2.7.2</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.scala-tools</groupId>
+        <artifactId>maven-scala-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+              <goal>testCompile</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <scalaVersion>${scala.version}</scalaVersion>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>2.3</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>META-INF/*.SF</exclude>
+                    <exclude>META-INF/*.DSA</exclude>
+                    <exclude>META-INF/*.RSA</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+              <transformers>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                  <mainClass>org.apache.spark.examples.streaming.KinesisWatch</mainClass>
+                </transformer>
+              </transformers>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.7</version>
+        <executions>
+          <execution>
+            <id>add-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>src/main/scala</source>
+                <source>src/main/scala-2.10</source>
+                <source>src/test/scala</source>
+                <source>src/test/scala-2.10</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+          <encoding>UTF-8</encoding>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <repositories>
+    <repository>
+      <id>scala-tools.org</id>
+      <name>Scala-tools Maven2 Repository</name>
+      <url>http://scala-tools.org/repo-releases</url>
+    </repository>
+  </repositories>
+</project>
diff --git a/aws-blog-querying-kinesis-with-spark-and-hive/scripts/create-emr.sh b/aws-blog-querying-kinesis-with-spark-and-hive/scripts/create-emr.sh
new file mode 100755
index 00000000..b97ee541
--- /dev/null
+++ b/aws-blog-querying-kinesis-with-spark-and-hive/scripts/create-emr.sh
@@ -0,0 +1,30 @@
+#!/bin/bash
+function usage_and_exit {
+  echo "usage: $0 [key name]  OR  $0 [release label] [key name] [instance type]  (defaults: emr-4.5.0, m3.xlarge)"
+  exit 1
+}
+
+if [ "$#" -lt 1 ]; then
+  usage_and_exit
+fi
+
+RELEASE_LABEL=emr-4.5.0
+APPLICATIONS="Name=Spark Name=Hive"
+KEY_NAME=
+INSTANCE_TYPE=m3.xlarge
+
+if [ "$#" -eq 1 ]; then
+  KEY_NAME=$1
+elif [ "$#" -ne 3 ]; then
+  usage_and_exit
+else
+  RELEASE_LABEL=$1
+  KEY_NAME=$2
+  INSTANCE_TYPE=$3
+fi
+
+INSTANCE_GROUPS="InstanceGroupType=MASTER,InstanceCount=1,BidPrice=0.08,InstanceType=$INSTANCE_TYPE InstanceGroupType=CORE,InstanceCount=2,BidPrice=0.08,InstanceType=$INSTANCE_TYPE"
+BOOTSTRAP_ACTIONS="Path=s3://aws-bigdata-blog/artifacts/Querying_Amazon_Kinesis/DownloadKCLtoEMR400.sh,Name=InstallKCLLibs"
+
+aws emr create-cluster --release-label $RELEASE_LABEL --applications $APPLICATIONS --ec2-attributes KeyName=$KEY_NAME --use-default-roles --instance-groups $INSTANCE_GROUPS --bootstrap-actions $BOOTSTRAP_ACTIONS --configurations https://s3-ap-southeast-1.amazonaws.com/helix-public/spark-defaults.json
+
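A possible invocation of the cluster-creation script above, shown for reference; the key pair name is a placeholder, and the three-argument form simply overrides the defaults:

```bash
# Use the defaults (emr-4.5.0, m3.xlarge) and supply only an EC2 key pair name
./scripts/create-emr.sh my-ec2-keypair

# Or override release label, key pair, and instance type explicitly
./scripts/create-emr.sh emr-4.5.0 my-ec2-keypair m3.xlarge
```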
diff --git a/aws-blog-querying-kinesis-with-spark-and-hive/scripts/create-kinesis.sh b/aws-blog-querying-kinesis-with-spark-and-hive/scripts/create-kinesis.sh
new file mode 100755
index 00000000..ae7449ce
--- /dev/null
+++ b/aws-blog-querying-kinesis-with-spark-and-hive/scripts/create-kinesis.sh
@@ -0,0 +1,15 @@
+#!/bin/bash
+function usage_and_exit {
+  echo "usage: $0 [stream name] [shard count]"
+  exit 1
+}
+
+if [ "$#" -ne 2 ]; then
+  usage_and_exit
+fi
+
+STREAM_NAME=$1
+SHARD_COUNT=$2
+
+aws kinesis create-stream --stream-name $STREAM_NAME --shard-count $SHARD_COUNT
+
diff --git a/aws-blog-querying-kinesis-with-spark-and-hive/src/main/python/webaccesstraffic_generator.py b/aws-blog-querying-kinesis-with-spark-and-hive/src/main/python/webaccesstraffic_generator.py
new file mode 100644
index 00000000..f5a9a74d
--- /dev/null
+++ b/aws-blog-querying-kinesis-with-spark-and-hive/src/main/python/webaccesstraffic_generator.py
@@ -0,0 +1,55 @@
+__author__ = 'Amo Abeyaratne'
+
+import string
+import random
+import time
+from datetime import datetime
+
+from boto import kinesis
+
+
+def random_generator(size=6, chars=string.ascii_lowercase + string.digits):
+    """Return a random string of the given size drawn from the given characters."""
+    return ''.join(random.choice(chars) for x in range(size))
+
+
+# Connect to the Kinesis stream.
+# region = 'us-east-1'
+region = 'ap-southeast-1'
+kinesisStreamName = 'js-development-stream'
+
+kinesis = kinesis.connect_to_region(region)
+
+# Generate data and feed the Kinesis stream.
+while True:
+
+    # Random path suffix built from the characters of "techsummit2015".
+    y = random_generator(10, "techsummit2015")
+
+    urls = ['foo.com', 'amazon.com', 'testing.com', 'google.com', 'sydney.com']
+    x = random.randint(0, 4)
+    userid = random.randint(25, 35) + 1200
+
+    now = datetime.now()
+    timeformatted = str(now.month) + "/" + str(now.day) + "/" + str(now.year) + " " + str(now.hour) + ":" + str(now.minute) + ":" + str(now.second)
+
+    # Build the payload for the Kinesis put.
+    # Schema of the input string: userid,url,timestamp
+    putString = str(userid) + ',' + 'www.' + urls[x] + '/' + y + ',' + timeformatted
+    partitionKey = random.choice('abcdefghij')
+
+    print putString
+
+    result = kinesis.put_record(kinesisStreamName, putString, partitionKey)
+
+    print result
+
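One possible way to exercise the generator above; the stream name must match the kinesisStreamName hard-coded in the script, boto credentials are assumed to be already configured, and the sample output line is illustrative only:

```bash
# Create the stream the generator writes to (shard count is an example)
./scripts/create-kinesis.sh js-development-stream 2

# Run the generator (Python 2 with boto installed)
python src/main/python/webaccesstraffic_generator.py
# Each record is a CSV line of the form userid,url,timestamp, e.g.
# 1227,www.amazon.com/tech5umm1t,9/10/2015 14:3:27
```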
diff --git a/aws-blog-querying-kinesis-with-spark-and-hive/src/main/scala-2.10/org/apache/spark/examples/streaming/KinesisWatch.scala b/aws-blog-querying-kinesis-with-spark-and-hive/src/main/scala-2.10/org/apache/spark/examples/streaming/KinesisWatch.scala
new file mode 100644
index 00000000..628a93ba
--- /dev/null
+++ b/aws-blog-querying-kinesis-with-spark-and-hive/src/main/scala-2.10/org/apache/spark/examples/streaming/KinesisWatch.scala
@@ -0,0 +1,171 @@
+// scalastyle:off println
+// AUTHOR: Amo Abeyaratne
+// DATE Updated: 10-Sep-2015
+// Company: AWS Sydney
+// This is built as a proof of concept to show how Spark Streaming can be used to represent
+// micro-batched data as temporary tables queryable via JDBC.
+
+package org.apache.spark.examples.streaming
+
+import java.nio.ByteBuffer
+
+import scala.util.Random
+
+import com.amazonaws.auth.{DefaultAWSCredentialsProviderChain, BasicAWSCredentials}
+import com.amazonaws.regions.RegionUtils
+import com.amazonaws.services.kinesis.AmazonKinesisClient
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.model.PutRecordRequest
+import org.apache.log4j.{Level, Logger}
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext, Time}
+import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
+import org.apache.spark.streaming.kinesis.KinesisUtils
+import org.apache.spark.util.IntParam
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SaveMode.Append
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
+import org.apache.spark.sql.types.{StructType, StructField, StringType}
+
+
+object KinesisWatch extends Logging {
+  def main(args: Array[String]) {
+    // Check that all required args were passed in.
+    if (args.length != 7) {
+      System.err.println(
+        """
+          |Usage: KinesisWatch <app-name> <stream-name> <endpoint-url> <batch-length-ms> <table-schema> <delimiter> <temp-table-name>
+          |
+          |    <app-name> is the name of the consumer app, used to track the read data in DynamoDB
+          |    <stream-name> is the name of the Kinesis stream
+          |    <endpoint-url> is the endpoint of the Kinesis service
+          |                   (e.g. https://kinesis.us-east-1.amazonaws.com)
+          |    <batch-length-ms> is the length, in milliseconds, of each batch exposed as the temp table view of the stream
+          |    <table-schema> is the schema for the CSV input arriving via Kinesis, as a space-separated list in double quotes,
+          |                   e.g. "custid date value1 value2" for a 4-column DataFrame. It has to match the input data.
+          |    <delimiter> is how to split the data in each record of the stream to generate fields, e.g. ","
+          |    <temp-table-name> is the name of the temp table accessible via the JDBC HiveThriftServer2 for the duration of the batch
+        """.stripMargin)
+      System.exit(1)
+    }
+
+    // StreamingExamples.setStreamingLogLevels()
+
+    // Populate the appropriate variables from the given args
+    val Array(appName, streamName, endpointUrl, batchLength, schemaString, inputDelimiter, tempTableName) = args
+
+    // Determine the number of shards from the stream using the low-level Kinesis Client
+    // from the AWS Java SDK.
+    val credentials = new DefaultAWSCredentialsProviderChain().getCredentials()
+    require(credentials != null,
+      "No AWS credentials found. Please specify credentials using one of the methods specified " +
+        "in http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html")
+    val kinesisClient = new AmazonKinesisClient(credentials)
+    kinesisClient.setEndpoint(endpointUrl)
+    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size
+
+    // In this example, we're going to create one Kinesis receiver/input DStream for each shard.
+    // This is not a necessity; if there are fewer receivers/DStreams than the number of shards,
+    // then the shards will be automatically distributed among the receivers and each receiver
+    // will receive data from multiple shards.
+    val numStreams = numShards
+
+    // Spark Streaming batch interval
+    val batchInterval = Milliseconds(batchLength.toLong)
+
+    // The Kinesis checkpoint interval is the interval at which DynamoDB is updated with the
+    // sequence numbers of the records that have been received. Same as batchInterval for this
+    // example.
+    val kinesisCheckpointInterval = batchInterval
+
+    // Get the region name from the endpoint URL so the Kinesis Client Library metadata is saved
+    // in DynamoDB in the same region as the Kinesis stream.
+    val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
+
+    // Set up the SparkConf and StreamingContext
+    val sparkConfig = new SparkConf().setAppName("KinesisWatch")
+    val ssc = new StreamingContext(sparkConfig, batchInterval)
+
+    // Create the Kinesis DStreams
+    val kinesisStreams = (0 until numStreams).map { i =>
+      KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName,
+        InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
+    }
+
+    // Union all the streams
+    val unionStreams = ssc.union(kinesisStreams)
+
+    val hiveContext = new HiveContext(ssc.sparkContext)
+
+    // Define the schema for the input data structure.
+    // Example: val schemaString = "datetimeid customerid grosssolar netsolar load controlledload"
+    // The schemaString is passed as one of the arguments, with field names separated by spaces.
+    val tableSchema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
+
+    // Loop through the DStream (from Kinesis).
+    unionStreams.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
+
+      // Split each record in this batch's RDD by the given delimiter to build rows.
+      val rowRDD = rdd.map(w => Row.fromSeq(new String(w).split(inputDelimiter)))
+
+      // Create a DataFrame for each DStream -> RDD of the batch window. This basically converts a
+      // "batchInterval"-sized block of data on Kinesis into a DataFrame.
+      val wordsDataFrame = hiveContext.createDataFrame(rowRDD, tableSchema)
+
+      // Register the current DataFrame in the loop as a temp table.
+      wordsDataFrame.registerTempTable(tempTableName)
+
+      // Also save the batch into a persistent Hive table, appending to previous batches.
+      wordsDataFrame.saveAsTable("permKinesisTable", Append)
+      // wordsDataFrame.write.mode(Append).saveAsTable("permKinesisTable")
+
+      // Do a count on the temp table using SQL and print it for each batch timestamp on the console.
+      val sqlString = "select count(*) from " + tempTableName
+      val wordCountsDataFrame = hiveContext.sql(sqlString)
+
+      println(s"========= $time =========")
+      wordCountsDataFrame.show()
+    })
+
+    // Expose the HiveContext via JDBC - connect via JDBC/Beeline to test it.
+    HiveThriftServer2.startWithContext(hiveContext)
+
+    // Start the streaming context and await termination - CTRL+C to terminate on the console.
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
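One way to build and submit the job above, shown for reference; the jar name follows the Maven coordinates in the pom.xml, while the stream name, endpoint, batch length, and table names are illustrative placeholders (the schema matches the userid,url,timestamp records produced by the traffic generator). Depending on the EMR release, the KCL jars installed by the bootstrap action may also need to be added to the classpath:

```bash
# Build the shaded jar; the shade plugin sets the main class to
# org.apache.spark.examples.streaming.KinesisWatch
mvn clean package

# Submit on the EMR master node; all seven arguments are required
spark-submit --class org.apache.spark.examples.streaming.KinesisWatch \
  target/aws-blog-querying-kinesis-with-spark-and-hive-1.0.0.jar \
  kinesis-watch-app js-development-stream https://kinesis.ap-southeast-1.amazonaws.com \
  10000 "userid url timestamp" "," webtraffic
```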
diff --git a/aws-blog-querying-kinesis-with-spark-and-hive/src/main/spark-config/spark-defaults.json b/aws-blog-querying-kinesis-with-spark-and-hive/src/main/spark-config/spark-defaults.json
new file mode 100644
index 00000000..98165fc7
--- /dev/null
+++ b/aws-blog-querying-kinesis-with-spark-and-hive/src/main/spark-config/spark-defaults.json
@@ -0,0 +1,8 @@
+[
+  {
+    "Classification": "spark-defaults",
+    "Properties": {
+      "spark.sql.hive.thriftServer.singleSession": "true"
+    }
+  }
+]
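The spark.sql.hive.thriftServer.singleSession setting above lets JDBC clients share the session of the Thrift server started by KinesisWatch, so the temp table registered for each batch is visible to them. A possible check from the EMR master node, assuming the default Thrift port 10000 and the illustrative table name used earlier:

```bash
# Query the per-batch temp table over JDBC (the persistent table permKinesisTable can be queried the same way)
beeline -u jdbc:hive2://localhost:10000 -e "SELECT COUNT(*) FROM webtraffic"
```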