Skip to content
This repository was archived by the owner on Jul 20, 2022. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions aws-blog-querying-kinesis-with-spark-and-hive/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Querying Amazon Kinesis Streams Directly with SQL and Spark Streaming

## Introduction
This project is derived from the blog post https://blogs.aws.amazon.com/bigdata/post/Tx3916WCIUPVA3T/Querying-Amazon-Kinesis-Streams-Directly-with-SQL-and-Spark-Streaming
163 changes: 163 additions & 0 deletions aws-blog-querying-kinesis-with-spark-and-hive/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
<!-- Maven build for the "Querying Amazon Kinesis Streams Directly with SQL
     and Spark Streaming" blog example.  Builds a shaded (fat) jar whose
     entry point is org.apache.spark.examples.streaming.KinesisWatch,
     targeting Spark 1.6.1 on Scala 2.10. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <artifactId>aws-blog-querying-kinesis-with-spark-and-hive</artifactId>
    <groupId>aws</groupId>
    <version>1.0.0</version>

    <properties>
        <scala.version>2.10.5</scala.version>
        <spark.version>1.6.1</spark.version>
    </properties>

    <dependencies>

        <!-- Spark Kinesis client needs 2.4.4 -->
        <!-- Pinned explicitly so the shaded jar does not pull in a newer,
             incompatible jackson-databind via transitive dependencies. -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.4.4</version>
            <scope>provided</scope>
        </dependency>

        <!-- Provided: the EMR cluster supplies Spark core at runtime. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- NOTE(review): these Spark modules are NOT scope=provided, so they
             are bundled into the shaded jar — presumably intentional for the
             Kinesis ASL / thriftserver bits that EMR does not ship; confirm. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kinesis-asl_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive-thriftserver_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.21</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- NOTE(review): org.scala-tools:maven-scala-plugin is long
                 deprecated in favor of net.alchim31.maven:scala-maven-plugin;
                 kept as-is to avoid changing build behavior. -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                </configuration>
            </plugin>
            <!-- Shade plugin: builds the fat jar and sets the Main-Class. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <!-- Strip signature files from dependency jars;
                                         leaving them in makes the merged jar fail
                                         signature verification at load time. -->
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.apache.spark.examples.streaming.KinesisWatch</mainClass>
                                </transformer>
                            </transformers>
                            <artifactSet>
                                <excludes>
                                </excludes>
                            </artifactSet>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <!-- Register the scala source roots so Maven compiles them. -->
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <version>1.7</version>
                <executions>
                    <execution>
                        <id>add-source</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/main/scala</source>
                                <source>src/main/scala-2.10</source>
                                <source>src/test/scala</source>
                                <source>src/test/scala-2.10</source>
                            </sources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>

    </build>

    <!-- NOTE(review): scala-tools.org is defunct and plain http; this
         repository will not resolve today — consider removing once the
         scala plugin above is migrated. -->
    <pluginRepositories>
        <pluginRepository>
            <id>scala-tools.org</id>
            <name>Scala-tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </pluginRepository>
    </pluginRepositories>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/bin/bash
# Launch an EMR cluster (Spark + Hive, spot instances) for the
# Kinesis/Spark-Streaming blog example.
#
# Usage:
#   $0 <key name>                                        # defaults elsewhere
#   $0 <release label> <key name> <instance type>        # all three explicit
function usage_and_exit {
    echo "usage $0 [release label (default: emr-4.5.0)] [key name] [instance type (default: m3.xlarge)]"
    exit 1
}

if [ "$#" -lt 1 ]; then
    usage_and_exit
fi

RELEASE_LABEL=emr-4.5.0
APPLICATIONS="Name=Spark Name=Hive"
KEY_NAME=
INSTANCE_TYPE=m3.xlarge

if [ "$#" -eq 1 ]; then
    KEY_NAME=$1
# BUG FIX: original read `[ "$#" -ne 3]` — the missing space before `]`
# is a test-syntax error ("missing `]`"), so the 3-argument form never worked.
elif [ "$#" -ne 3 ]; then
    usage_and_exit
else
    RELEASE_LABEL=$1
    KEY_NAME=$2
    INSTANCE_TYPE=$3
fi

# Deliberately unquoted below: these variables carry multiple
# space-separated arguments for the AWS CLI.
INSTANCE_GROUPS="InstanceGroupType=MASTER,InstanceCount=1,BidPrice=0.08,InstanceType=$INSTANCE_TYPE InstanceGroupType=CORE,InstanceCount=2,BidPrice=0.08,InstanceType=$INSTANCE_TYPE"
BOOTSTRAP_ACTIONS="Path=s3://aws-bigdata-blog/artifacts/Querying_Amazon_Kinesis/DownloadKCLtoEMR400.sh,Name=InstallKCLLibs"

aws emr create-cluster --release-label $RELEASE_LABEL --applications $APPLICATIONS --ec2-attributes KeyName=$KEY_NAME --use-default-roles --instance-groups $INSTANCE_GROUPS --bootstrap-actions $BOOTSTRAP_ACTIONS --configurations https://s3-ap-southeast-1.amazonaws.com/helix-public/spark-defaults.json

Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/bin/bash
# Create an Amazon Kinesis stream with the requested number of shards.
usage_and_exit() {
    echo "usage $0 [stream name] [shard count]"
    exit 1
}

# Guard clause: exactly two positional arguments are required.
[ "$#" -ne 2 ] && usage_and_exit

STREAM_NAME=$1
SHARD_COUNT=$2

aws kinesis create-stream --stream-name $STREAM_NAME --shard-count $SHARD_COUNT

Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
__author__ = 'Amo Abeyaratne'

import string
import random
import time
from datetime import datetime
from boto import kinesis


def random_generator(size=6, chars=string.ascii_lowercase + string.digits):
    """Return a random string of *size* characters drawn from *chars*."""
    picks = [random.choice(chars) for _ in range(size)]
    return ''.join(picks)


# Connect to the Kinesis stream.
# NOTE(review): this script is Python 2 (print statements) and uses the
# legacy `boto` (v2) library; it produces synthetic click-stream records
# of the form "userid,url,timestamp" forever (Ctrl-C to stop).

#region = 'us-east-1'
region = 'ap-southeast-1'
kinesisStreamName = 'js-development-stream'

# NOTE(review): this rebinding shadows the imported `boto.kinesis` module
# with a connection object; works, but confusing — consider renaming.
kinesis = kinesis.connect_to_region(region)

# Generate data and feed Kinesis.

while True:

    # Random 10-char path suffix drawn from the letters of "techsummit2015".
    y = random_generator(10,"techsummit2015")

    urls = ['foo.com','amazon.com','testing.com','google.com','sydney.com']
    x = random.randint(0,4)
    # Synthetic user id in the range 1225..1235.
    userid = random.randint(25,35)+1200

    now = datetime.now()
    # Local wall-clock time formatted as M/D/YYYY H:M:S (no zero padding).
    timeformatted = str(now.month) + "/" + str(now.day) + "/" + str(now.year) + " " + str(now.hour) + ":" +str(now.minute) + ":" + str(now.second)

    # Build the payload for the Kinesis put.

    putString = str(userid)+','+'www.'+urls[x]+'/'+y+','+timeformatted
    # Random single-letter partition key spreads records across shards.
    # (sic: "patitionKey" is misspelled but used consistently below.)
    patitionKey = random.choice('abcdefghij')

    # Schema of the input string is now: userid,url,timestamp

    print putString

    result = kinesis.put_record(kinesisStreamName,putString,patitionKey)

    print result







Loading