Skip to content
This repository was archived by the owner on Jul 20, 2022. It is now read-only.

firehose use environment variables, use JDK 8 language features #88

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions aws-blog-firehose-lambda/kinesisFirehose/README
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,9 @@ The project is set up with a generic mvn archetype and the build occurs with Mav

Java 8 is the prescribed JDK to compile to.

Run mvn clean install to create the Lambda jar distribution (Java 8!!!)

Things to look for:
Package com.amazonaws.proserv.lambda houses the lambda functions
- at the top of each class in the is package are instance variables. YOU MUST EDIT these instance variables. At the time of this writing, there was not a way to pass parameters into Java-based
Lambda functions - and since a jar is used there is no need to extract them to propery files (but you very well can).

Run `mvn clean install` to create the Lambda jar distribution (Java 8!!!)

To use a newer AWS SDK for Java run `mvn clean install -Daws-sdk.version=1.23.456`

Helper classes are provided:
com.amazonaws.proserv.PopulateKinesisData
Expand All @@ -23,4 +18,12 @@ com.amazonaws.proserv.SampleAWSCredentialProvider
Configs if running locally:
- resources/AwsCredentials.properties - put your creds here if you are running on your local NOTE: make sure this file IS NOT EXCLUDED in your Pom.xml if running locally. It is excluded so as not to be packaged and distributed (a non-no).


Configuration for AWS Lambda or running locally to added to the environment:
- FIREHOSE_ENDPOINT_URL eg. `https://firehose.eu-west-1.amazonaws.com`
- FIREHOSE_SIGNING_REGION eg. `us-west-1`
- DELIVERY_STREAM_NAME eg. `DevBlogEvents`
- DELIVERY_STREAM_ROLE_ARN eg. `arn:aws:iam::<AWS Acct Id>:role/firehose_blog_role`
- TARGET_BUCKET_ARN eg. `arn:aws:s3:::dev-firebase-events`
- TARGET_PREFIX eg. `events/`
- INTERVAL_SEC eg. `60`
- BUFFER_SIZE_MB eg. `2`
8 changes: 4 additions & 4 deletions aws-blog-firehose-lambda/kinesisFirehose/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@

<groupId>com.amazonaws.proserv</groupId>
<artifactId>firehoseLambda</artifactId>
<version>1.0</version>
<version>1.1</version>
<packaging>jar</packaging>

<name>Kinesis-Firehose</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<aws-sdk.version>1.10.24</aws-sdk.version>
<aws-sdk.version>1.11.184</aws-sdk.version>

<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,77 +1,81 @@
package com.amazonaws.proserv.lambda;


import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehose;
import com.amazonaws.services.kinesisfirehose.model.*;
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;

import java.nio.ByteBuffer;
import java.util.Date;
import java.util.stream.IntStream;

/**
* Created by dgraeber on 6/18/2015.
* Updated by oliver.carr on 08/30/2017.
*/
public class KinesisToFirehose {
private String firehoseEndpointURL = "https://firehose.us-east-1.amazonaws.com";
private String deliveryStreamName = "blogfirehose";
private String deliveryStreamRoleARN = "arn:aws:iam::<AWS Acct Id>:role/firehose_blog_role";
private String targetBucketARN = "arn:aws:s3:::dgraeberaws-blogs";
private String targetPrefix = "blogoutput/";
private int intervalInSec = 60;
private int buffSizeInMB = 2;

private AmazonKinesisFirehoseClient firehoseClient = new AmazonKinesisFirehoseClient();
private final String firehoseEndpointURL = System.getenv("FIREHOSE_ENDPOINT_URL");
private final String firehoseSigningRegion = System.getenv("FIREHOSE_SIGNING_REGION");
private final String deliveryStreamName = System.getenv("DELIVERY_STREAM_NAME");
private final String deliveryStreamRoleARN = System.getenv("DELIVERY_STREAM_ROLE_ARN");
private final String targetBucketARN = System.getenv("TARGET_BUCKET_ARN");
private final String targetPrefix = System.getenv("TARGET_PREFIX");
private final int intervalInSec = Integer.valueOf(System.getenv("INTERVAL_SEC"));
private final int buffSizeInMB = Integer.valueOf(System.getenv("BUFFER_SIZE_MB"));

private AmazonKinesisFirehose firehoseClient;
private LambdaLogger logger;

public void kinesisHandler(KinesisEvent event, Context context){
public void kinesisHandler(final KinesisEvent event, final Context context) {
logger = context.getLogger();
setup();
for(KinesisEvent.KinesisEventRecord rec : event.getRecords()) {
event.getRecords().forEach(rec -> {
logger.log("Got message ");
String msg = new String(rec.getKinesis().getData().array())+"\n";
Record deliveryStreamRecord = new Record().withData(ByteBuffer.wrap(msg.getBytes()));
final String msg = new String(rec.getKinesis().getData().array()) + "\n";
final Record deliveryStreamRecord = new Record().withData(ByteBuffer.wrap(msg.getBytes()));

PutRecordRequest putRecordRequest = new PutRecordRequest()
final PutRecordRequest putRecordRequest = new PutRecordRequest()
.withDeliveryStreamName(deliveryStreamName)
.withRecord(deliveryStreamRecord);

logger.log("Putting message");
firehoseClient.putRecord(putRecordRequest);
logger.log("Successful Put");
}
});
}

private void setup(){
firehoseClient = new AmazonKinesisFirehoseClient();
firehoseClient.setEndpoint(firehoseEndpointURL);
private void setup() {
firehoseClient = AmazonKinesisFirehoseClient.builder().withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration(firehoseEndpointURL, firehoseSigningRegion)).build();
checkHoseStatus();
}

private void checkHoseStatus(){
DescribeDeliveryStreamRequest describeHoseRequest = new DescribeDeliveryStreamRequest()
private void checkHoseStatus() {
final DescribeDeliveryStreamRequest describeHoseRequest = new DescribeDeliveryStreamRequest()
.withDeliveryStreamName(deliveryStreamName);
DescribeDeliveryStreamResult describeHoseResult = null;
DescribeDeliveryStreamResult describeHoseResult;
String status = "UNDEFINED";
try {
describeHoseResult = firehoseClient.describeDeliveryStream(describeHoseRequest);
status = describeHoseResult.getDeliveryStreamDescription().getDeliveryStreamStatus();
} catch (Exception e) {
} catch (final Exception e) {
System.out.println(e.getLocalizedMessage());
logIt("Firehose Not Existent...will create");
createFirehose();
checkHoseStatus();
}
if(status.equalsIgnoreCase("ACTIVE")){
if (status.equalsIgnoreCase("ACTIVE")) {
logIt("Firehose ACTIVE");
//return;
}
else if(status.equalsIgnoreCase("CREATING")){
else if (status.equalsIgnoreCase("CREATING")) {
logIt("Firehose CREATING");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
} catch (final InterruptedException e) {
e.printStackTrace();
}
checkHoseStatus();
Expand All @@ -81,73 +85,69 @@ else if(status.equalsIgnoreCase("CREATING")){
}
}

private void createFirehose(){
BufferingHints buffHints = new BufferingHints()
private void createFirehose() {
final BufferingHints buffHints = new BufferingHints()
.withIntervalInSeconds(intervalInSec)
.withSizeInMBs(buffSizeInMB);

S3DestinationConfiguration s3DestConf = new S3DestinationConfiguration()
final ExtendedS3DestinationConfiguration s3DestConf = new ExtendedS3DestinationConfiguration()
.withBucketARN(targetBucketARN)
.withCompressionFormat(CompressionFormat.UNCOMPRESSED)
.withPrefix(targetPrefix)
.withBufferingHints(buffHints)
.withRoleARN(deliveryStreamRoleARN);

CreateDeliveryStreamRequest createHoseRequest = new CreateDeliveryStreamRequest()
final CreateDeliveryStreamRequest createHoseRequest = new CreateDeliveryStreamRequest()
.withDeliveryStreamName(deliveryStreamName)
.withS3DestinationConfiguration(s3DestConf);
.withExtendedS3DestinationConfiguration(s3DestConf);

logIt("Sending create firehose request");
firehoseClient.createDeliveryStream(createHoseRequest);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
} catch (final InterruptedException e) {
e.printStackTrace();
}
}


private void logIt(String message){
if(logger!=null)
private void logIt(final String message) {
if (logger != null) {
logger.log(message);
else
}
else {
System.out.println(message);
}

private void listFirehose(){
ListDeliveryStreamsRequest listHosesRequest = new ListDeliveryStreamsRequest();
ListDeliveryStreamsResult lhr = firehoseClient.listDeliveryStreams(listHosesRequest);

for(String name:lhr.getDeliveryStreamNames()){
logIt(name);
}
}

private void listFirehose() {
final ListDeliveryStreamsRequest listHosesRequest = new ListDeliveryStreamsRequest();
final ListDeliveryStreamsResult lhr = firehoseClient.listDeliveryStreams(listHosesRequest);
lhr.getDeliveryStreamNames().forEach(this::logIt);
}

private void deleteFirehose(){
private void deleteFirehose() {
deleteFirehose(deliveryStreamName);
}

private void deleteFirehose(String delivStreamName){
DeleteDeliveryStreamRequest deleteHoseRequest= new DeleteDeliveryStreamRequest();
deleteHoseRequest.setDeliveryStreamName(delivStreamName);
private void deleteFirehose(final String deliveryStreamName) {
final DeleteDeliveryStreamRequest deleteHoseRequest = new DeleteDeliveryStreamRequest();
deleteHoseRequest.setDeliveryStreamName(deliveryStreamName);
firehoseClient.deleteDeliveryStream(deleteHoseRequest);
}


private void putSampleMessages(){
private void putSampleMessages() {
setup();
for(int i = 0; i<20000; i++) {
String message = "{\"timestamp\":\"" + new Date().getTime() + "\"}";
Record record = new Record()
IntStream.range(0, 20000).forEach(idx -> {
final String message = "{\"timestamp\":\"" + System.currentTimeMillis() + "\"}";
final Record record = new Record()
.withData(ByteBuffer.wrap(message.getBytes()));
PutRecordRequest putRecordInHoseRequest = new PutRecordRequest()
final PutRecordRequest putRecordInHoseRequest = new PutRecordRequest()
.withDeliveryStreamName(deliveryStreamName)
.withRecord(record);

PutRecordResult res = firehoseClient.putRecord(putRecordInHoseRequest);
final PutRecordResult res = firehoseClient.putRecord(putRecordInHoseRequest);
logIt(res.toString());
}
});
}


Expand All @@ -159,9 +159,4 @@ private void putSampleMessages(){
//kinesisToFirehose.setup();
//kinesisToFirehose.putSampleMessages();
// }





}