This repository was archived by the owner on Jul 20, 2022. It is now read-only.

Serverless-CF-Analysis #83

Open · wants to merge 4 commits into master
37 changes: 37 additions & 0 deletions aws-blog-serverless-cf-analysis/Config
@@ -0,0 +1,37 @@
# -*-perl-*-

package.Aws-lambda-athena = {
interfaces = (1.0);

deploy = {
generic = true;
};

build-environment = {
chroot = basic;
network-access = blocked;
};

# Use NoOpBuild. See https://w.amazon.com/index.php/BrazilBuildSystem/NoOpBuild
build-system = no-op;
build-tools = {
1.0 = {
NoOpBuild = 1.0;
};
};

# Use runtime-dependencies when you want to bring in additional
# packages at deployment time.
# Use dependencies instead if you intend for these dependencies to
# be exported to other packages that build against you.
dependencies = {
1.0 = {
};
};

runtime-dependencies = {
1.0 = {
};
};

};
94 changes: 94 additions & 0 deletions aws-blog-serverless-cf-analysis/pom.xml
@@ -0,0 +1,94 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.amazonaws.services.lambda</groupId>
<artifactId>aws-lambda-athena</artifactId>
<version>1.0.0</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-core</artifactId>
<version>1.1.0</version>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-events</artifactId>
<version>1.3.0</version>
</dependency>

<!--
From the lib directory:
mvn install:install-file -Dfile=./AthenaJDBC41-1.0.1.jar -DgroupId=com.amazonaws -DartifactId=athena.jdbc41 -Dversion=1.0.0 -Dpackaging=jar -DgeneratePom=true
-->

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>athena.jdbc41</artifactId>
<version>1.0.0</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.7.5</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</plugin>
</plugins>
</build>


</project>
83 changes: 83 additions & 0 deletions aws-blog-serverless-cf-analysis/readme.md
@@ -0,0 +1,83 @@
Description
-----------

Creates partitions in Athena in response to files added to S3 whose keys use a `/year/month/day/hour/` prefix.
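
For example, assuming a hypothetical bucket and prefix (both illustrative), an object such as:

```
s3://my-bucket/vpc-flow-logs/2017/01/15/04/data.gz
```

would map to year 2017, month 01, day 15, hour 04; the granularity of the partition actually created depends on the `PARTITION_TYPE` environment variable described below.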

Build
-----

As a one-off operation, you'll need to download the Athena JDBC driver into a `lib` folder and install it in your local Maven repository so that it can be incorporated into the final jar:

```
mkdir lib
aws s3 cp s3://athena-downloads/drivers/AthenaJDBC41-1.0.1.jar lib/
mvn install:install-file -Dfile=lib/AthenaJDBC41-1.0.1.jar -DgroupId=com.amazonaws -DartifactId=athena.jdbc41 -Dversion=1.0.0 -Dpackaging=jar -DgeneratePom=true
```

And then, to build:

```
mvn clean compile verify
```
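
The `maven-shade-plugin` configured in the pom bundles all dependencies into a single jar during the `package` phase; given the `artifactId` and `version` declared above, the deployable artifact should be written to `target/aws-lambda-athena-1.0.0.jar`.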

Create an IAM Role
------------------

Before you create a Lambda function, you will need to create an IAM role that allows Lambda to execute queries in Athena. Create a role named `lambda_athena_exec_role` and attach the following managed policies to the role: `AmazonS3FullAccess` and `AmazonAthenaFullAccess`.

Add this inline access policy:

```
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "arn:aws:logs:*:*:*"
}
]
}
```

And attach the following trust relationship to enable Lambda to assume the role:

```
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
```
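
If you prefer to script this step, the following AWS CLI sketch creates the role and attaches the policies. It assumes the trust relationship above is saved as `trust-policy.json` and the inline policy as `logging-policy.json`; both file names are illustrative:

```
# Create the role with the Lambda trust relationship
aws iam create-role \
  --role-name lambda_athena_exec_role \
  --assume-role-policy-document file://trust-policy.json

# Attach the managed policies
aws iam attach-role-policy \
  --role-name lambda_athena_exec_role \
  --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess

aws iam attach-role-policy \
  --role-name lambda_athena_exec_role \
  --policy-arn arn:aws:iam::aws:policy/AmazonAthenaFullAccess

# Add the inline logging policy
aws iam put-role-policy \
  --role-name lambda_athena_exec_role \
  --policy-name lambda-logging \
  --policy-document file://logging-policy.json
```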

Create a Lambda Function to Add Partitions to Athena
----------------------------------------------------

Create a Lambda function that can be associated with S3 new object event notifications. When creating the function, you'll need to set several environment variables:

- `PARTITION_TYPE` Supply one of the following values: `Month`, `Day`, or `Hour`. This environment variable is optional: if you omit it, the function defaults to `Day`.
- `TABLE_NAME` Use the format `<database>.<table_name>`. For example, `sampledb.vpc_flow_logs`.
- `S3_STAGING_DIR` An Amazon S3 location to which your query output will be written. (Although the Lambda function only executes DDL statements, Athena still writes an output file to S3.)
- `ATHENA_REGION` The region in which Athena is located (e.g., `us-east-1`).
- `DDB_TABLE_NAME` The name of the DynamoDB table holding partition information. (Required only by the `CreateAthenaPartitionsBasedOnS3EventWithDDB` handler.)

Specify the handler and an existing role:

- *Handler:* `com.amazonaws.services.lambda.CreateAthenaPartitionsBasedOnS3Event::handleRequest`
- *Existing role:* `lambda_athena_exec_role`

Set the timeout to one minute.
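
Putting the pieces together, here is a sketch of creating the function with the AWS CLI; the function name, account ID placeholder, memory size, and environment variable values are illustrative, and the jar path assumes the build output described above:

```
aws lambda create-function \
  --function-name create-athena-partitions \
  --runtime java8 \
  --role arn:aws:iam::<account-id>:role/lambda_athena_exec_role \
  --handler com.amazonaws.services.lambda.CreateAthenaPartitionsBasedOnS3Event::handleRequest \
  --zip-file fileb://target/aws-lambda-athena-1.0.0.jar \
  --timeout 60 \
  --memory-size 512 \
  --environment 'Variables={PARTITION_TYPE=Day,TABLE_NAME=sampledb.vpc_flow_logs,S3_STAGING_DIR=s3://my-staging-bucket/athena-output/,ATHENA_REGION=us-east-1}'
```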



70 changes: 70 additions & 0 deletions aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/CreateAthenaPartitionsBasedOnS3Event.java
@@ -0,0 +1,70 @@
package com.amazonaws.services.lambda;


import com.amazonaws.services.lambda.model.Partition;
import com.amazonaws.services.lambda.model.PartitionConfig;
import com.amazonaws.services.lambda.model.S3Object;
import com.amazonaws.services.lambda.model.TableService;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.amazonaws.services.s3.event.S3EventNotification;

import java.util.Collection;
import java.util.HashSet;
import java.util.stream.Collectors;

/**
 * Lambda handler that, for each object in the incoming S3 event whose key carries a
 * date/time prefix, derives the corresponding Athena partition and creates any
 * partitions that the table does not already have.
 */
public class CreateAthenaPartitionsBasedOnS3Event implements RequestHandler<S3Event, Void> {

private final PartitionConfig partitionConfig;

public CreateAthenaPartitionsBasedOnS3Event() {
this(PartitionConfig.fromEnv());
}

CreateAthenaPartitionsBasedOnS3Event(PartitionConfig partitionConfig) {
this.partitionConfig = partitionConfig;
}

@Override
public Void handleRequest(S3Event s3Event, Context context) {

Collection<Partition> requiredPartitions = new HashSet<>();
TableService tableService = new TableService();

for (S3EventNotification.S3EventNotificationRecord record : s3Event.getRecords()) {

String bucket = record.getS3().getBucket().getName();
String key = record.getS3().getObject().getKey();

System.out.printf("S3 event [Event: %s, Bucket: %s, Key: %s]%n", record.getEventName(), bucket, key);

S3Object s3Object = new S3Object(bucket, key);

if (s3Object.hasDateTimeKey()) {
requiredPartitions.add(partitionConfig.createPartitionFor(s3Object));
}
}

if (!requiredPartitions.isEmpty()) {
Collection<Partition> missingPartitions = determineMissingPartitions(
partitionConfig.tableName(),
requiredPartitions,
tableService);
tableService.addPartitions(partitionConfig.tableName(), missingPartitions);
}

return null;
}

// We could use DynamoDB to store a list of existing partitions, making it quick
// to check which of the required partitions already exist.
private Collection<Partition> determineMissingPartitions(String tableName, Collection<Partition> requiredPartitions, TableService tableService) {

Collection<String> existingPartitions = tableService.getExistingPartitions(tableName);

return requiredPartitions.stream()
.filter(p -> !existingPartitions.contains(p.spec()))
.collect(Collectors.toList());
}
}
98 changes: 98 additions & 0 deletions aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/CreateAthenaPartitionsBasedOnS3EventWithDDB.java
@@ -0,0 +1,98 @@
package com.amazonaws.services.lambda;

import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.document.spec.PutItemSpec;
import com.amazonaws.services.dynamodbv2.document.utils.NameMap;
import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException;
import com.amazonaws.services.lambda.model.Partition;
import com.amazonaws.services.lambda.model.PartitionConfig;
import com.amazonaws.services.lambda.model.S3Object;
import com.amazonaws.services.lambda.model.TableService;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.amazonaws.services.s3.event.S3EventNotification;

import java.util.Collection;
import java.util.HashSet;

/**
 * Variant of CreateAthenaPartitionsBasedOnS3Event that tracks known partitions in a
 * DynamoDB table, so that Athena is only asked to create partitions that have not
 * been seen before.
 */
public class CreateAthenaPartitionsBasedOnS3EventWithDDB implements RequestHandler<S3Event, Void> {

    private final PartitionConfig partitionConfig;

    public CreateAthenaPartitionsBasedOnS3EventWithDDB() {
        this(PartitionConfig.fromEnv());
    }

    CreateAthenaPartitionsBasedOnS3EventWithDDB(PartitionConfig partitionConfig) {
        this.partitionConfig = partitionConfig;
    }

    @Override
    public Void handleRequest(S3Event s3Event, Context context) {

        Collection<Partition> requiredPartitions = new HashSet<>();
        TableService tableService = new TableService();
        DynamoDB dynamoDBClient = new DynamoDB(new AmazonDynamoDBClient(new EnvironmentVariableCredentialsProvider()));

        for (S3EventNotification.S3EventNotificationRecord record : s3Event.getRecords()) {

            String bucket = record.getS3().getBucket().getName();
            String key = record.getS3().getObject().getKey();

            System.out.printf("S3 event [Event: %s, Bucket: %s, Key: %s]%n", record.getEventName(), bucket, key);

            S3Object s3Object = new S3Object(bucket, key);

            if (s3Object.hasDateTimeKey()) {
                Partition partition = partitionConfig.createPartitionFor(s3Object);

                // If the partition is not yet recorded in the DynamoDB table, record it and
                // queue it for creation in Athena; otherwise skip it.
                if (tryAddMissingPartition(partitionConfig.dynamoDBTableName(), dynamoDBClient, partition)) {
                    requiredPartitions.add(partition);
                }
            }
        }

        if (!requiredPartitions.isEmpty()) {
            tableService.addPartitions(partitionConfig.tableName(), requiredPartitions, true);
        }

        return null;
    }

    // Adds the partition to DynamoDB and returns true if it was not already present;
    // returns false if the conditional put fails because the partition already exists.
    private boolean tryAddMissingPartition(String dynamoDBTableName, DynamoDB dynamoDBClient, Partition partition) {

        Table ddbTable = dynamoDBClient.getTable(dynamoDBTableName);

        Item item = new Item()
                .withPrimaryKey("PartitionSpec", partition.spec())
                .withString("PartitionPath", partition.path())
                .withString("PartitionName", partition.name());

        PutItemSpec itemSpec = new PutItemSpec()
                .withItem(item)
                .withConditionExpression("attribute_not_exists(#ps)")
                .withNameMap(new NameMap().with("#ps", "PartitionSpec"));

        try {
            ddbTable.putItem(itemSpec);
            System.out.println("Item was added to the table. PartitionSpec=" + partition.spec() + "; Path=" + partition.path());
            return true;
        } catch (ConditionalCheckFailedException e) {
            System.out.println(e.toString());
            System.out.println("Item already exists. PartitionSpec=" + partition.spec() + "; Path=" + partition.path());
            return false;
        }
    }
}