lambda-stash is an AWS Lambda script for shipping data from S3 or other cloud
data sources to data stores, like Elasticsearch.
- Supported input formats: AWS CloudFront access logs, AWS CloudTrail API logs, AWS CloudWatch logs, AWS Config change logs, AWS Elastic Load Balancer access logs, AWS S3 access logs, and other formats by implementing a custom handler.
- Supported output formats: JSON, plain text key-value pairs.
- Supported shipping methods: Elasticsearch (includes signing for AWS domains), HTTP/S POST, TCP socket.
- Set up a Lambda deployment package that includes lambda-stash and a script with your configuration. See the included example (under example/) and the configuration documentation below to get started.
- Use the AWS Management Console to create a Lambda function using the Node.js 12.x runtime. Upload your package and configure it with event sources as desired (S3 buckets or CloudWatch logs). Be sure the Lambda function has an IAM role with any necessary permissions, such as getting data from an S3 bucket or accessing an AWS Elasticsearch domain.
- Check CloudWatch Logs, where a Log Group should be created the first time the Lambda script runs. Review the logs to make sure the script finishes successfully within the time allowed. You can set up a CloudWatch Alarm to be notified if an error occurs and to track how long the function runs. The Elasticsearch client timeout defaults to 30 seconds; you can set config.elasticsearch.requestTimeout to a value in milliseconds or to 'Infinity'.
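
As an illustration of the timeout setting mentioned in the last step, a configuration fragment might look like the following. This is only a sketch: the host value is a placeholder, and the 60000 millisecond figure is an arbitrary example rather than a recommendation.

```javascript
// Hypothetical configuration fragment; the host value is a placeholder.
var config = {
  elasticsearch: {
    host: 'https://search-example.us-east-1.es.amazonaws.com',
    region: 'us-east-1',
    useAWS: true,
    // Allow up to 60 seconds per Elasticsearch request instead of the
    // client's 30-second default.
    requestTimeout: 60000
  }
};

console.log(config.elasticsearch.requestTimeout);
```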
lambda-stash is intended to be implemented by a script that provides the
configuration, such as what processors to use for specific events.
See the included example in example/ and the handlers documentation below for
details on how to implement the handlers.
Converts an array of objects to key-value strings with the format:
prefix key1="value1" key2="value2" ... suffix
Inputs:
- config.data should be an array of data objects.
- config.string.prefix (optional) is a string to add at the beginning of each output string.
- config.string.suffix (optional) is a string to add at the end of each output string.
Outputs:
- config.data is transformed to a string with the result of the handler.
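
The key-value format described above can be sketched in plain Node.js. This is a minimal illustration, not the handler's actual source: the function name toKeyValueStrings is hypothetical, and joining records with a newline and leaving embedded double quotes unescaped are both assumptions.

```javascript
// Hypothetical helper illustrating the key="value" format;
// not lambda-stash's own code.
function toKeyValueStrings(records, prefix, suffix) {
  return records
    .map(function (record) {
      var pairs = Object.keys(record).map(function (key) {
        return key + '="' + String(record[key]) + '"';
      });
      // Drop an empty prefix or suffix so no stray spaces are emitted.
      return [prefix].concat(pairs, [suffix]).filter(Boolean).join(' ');
    })
    // Assumption: one output line per input record.
    .join('\n');
}

var sample = toKeyValueStrings([{ method: 'GET', status: 200 }], 'myapp', '');
console.log(sample);
```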
Decodes a Base64 encoded string.
Inputs:
- config.data should be a string in Base64 format.
Outputs:
- config.data is transformed to a buffer with the decoded data.
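
For reference, the same string-to-buffer transformation can be reproduced with Node's built-in Buffer API. The snippet below is a sketch of the idea only; the variable names are illustrative and it does not show how the handler itself is written.

```javascript
// Buffer.from with the 'base64' encoding returns a Buffer of decoded bytes.
var encoded = 'aGVsbG8gd29ybGQ=';
var decoded = Buffer.from(encoded, 'base64');

console.log(Buffer.isBuffer(decoded));
console.log(decoded.toString('utf8'));
```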
Decompresses gzipped data.
Inputs:
- config.data should be a buffer in gzip format.
Outputs:
- config.data is transformed to a string with the decompressed data.
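
The buffer-to-string step can be tried locally with Node's zlib module. This sketch uses the synchronous zlib calls for brevity, which is an assumption made for the example; the handler may well use the asynchronous API.

```javascript
var zlib = require('zlib');

// Build a gzipped buffer to stand in for compressed log data.
var compressed = zlib.gzipSync('line one\nline two\n');

// gunzipSync returns a Buffer; convert it to a string to match the
// handler's documented output type.
var text = zlib.gunzipSync(compressed).toString('utf8');

console.log(text);
```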
Processes parsed data from CloudFront access logs and normalizes it into key-value objects.
Raw CloudFront access log files in S3 should be processed with decompressGzip
and parseTabs before using this format handler.
Inputs:
- config.data should be an array of log records, each of which is an array of field values.
- config.dateField (optional) is the key name in the output that should contain the log's date time value (combined from the date and time fields).
Outputs:
- config.data is transformed to an array of objects with log data.
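
To make the array-to-object normalization and the combined date field concrete, here is a rough sketch. It assumes the CloudFront standard log layout, in which the first fields are date, time, x-edge-location, and sc-bytes; only those four are mapped for brevity. The helper name formatRecords and the choice of an ISO 8601 string for the combined value are assumptions, not the handler's documented behavior.

```javascript
// Only the first four CloudFront standard log fields, for brevity.
var FIELDS = ['date', 'time', 'x-edge-location', 'sc-bytes'];

// Hypothetical helper; not lambda-stash's own implementation.
function formatRecords(records, dateField) {
  return records.map(function (values) {
    var item = {};
    FIELDS.forEach(function (name, i) {
      item[name] = values[i];
    });
    if (dateField) {
      // Assumption: the combined value is stored as an ISO 8601 UTC string.
      item[dateField] = new Date(item.date + 'T' + item.time + 'Z').toISOString();
    }
    return item;
  });
}

var formatted = formatRecords(
  [['2019-12-04', '21:02:31', 'LAX1', '392']],
  'timestamp'
);
console.log(formatted[0].timestamp);
```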
Processes parsed data from CloudTrail logs and normalizes it into key-value objects.
Raw CloudTrail log files in S3 should be processed with decompressGzip
and parseJson before using this format handler.
Inputs:
- config.data should have a Records property that is an array of objects for each log item.
- config.dateField (optional) is the key name in the output that should contain the log's date time value.
Outputs:
- config.data is transformed to an array of objects with log data.
Processes parsed data from CloudWatch logs and normalizes it into key-value objects. Includes handling for Lambda function start, end, and report logs.
This handler is meant to be used with data provided through a CloudWatch Logs
subscription event, which is automatically processed with decodeBase64,
decompressGzip, and parseJson.
Inputs:
- config.data should have a logEvents property that is an array of objects for each log item.
- config.dateField (optional) is the key name in the output that should contain the log's date time value.
Outputs:
- config.data is transformed to an array of objects with log data.
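
The three preprocessing steps named above can be reproduced outside Lambda to see what the handler receives. CloudWatch Logs delivers subscription data to Lambda as a Base64-encoded, gzipped JSON document under event.awslogs.data. The sketch below builds a stand-in event and decodes it; the log group name and message are made up, and the synchronous zlib calls are used only to keep the example short.

```javascript
var zlib = require('zlib');

// Build a stand-in for a CloudWatch Logs subscription event.
var payload = {
  logGroup: '/aws/lambda/example', // hypothetical log group name
  logEvents: [
    { id: '1', timestamp: 1575493351000, message: 'START RequestId: abc' }
  ]
};
var event = {
  awslogs: {
    data: zlib.gzipSync(JSON.stringify(payload)).toString('base64')
  }
};

// The three preprocessing steps, in order.
var decodedBuffer = Buffer.from(event.awslogs.data, 'base64'); // decodeBase64
var json = zlib.gunzipSync(decodedBuffer).toString('utf8');     // decompressGzip
var parsed = JSON.parse(json);                                  // parseJson

console.log(parsed.logEvents.length);
```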
Processes parsed data from AWS Config logs and normalizes it into key-value objects.
Raw Config log files in S3 should be processed with decompressGzip
and parseJson before using this format handler.
Inputs:
- config.data should have a configurationItems property that is an array of objects for each log item.
- config.dateField (optional) is the key name in the output that should contain the log's date time value.
Outputs:
- config.data is transformed to an array of objects with log data.
Processes parsed data from AWS Elastic Load Balancer (ELB) Classic Load Balancer (ELB version 1) logs and normalizes it into key-value objects.
Raw ELB log files in S3 should be processed with parseSpaces before using this
format handler.
Inputs:
- config.data should be an array (one item per log record) of arrays (one item per record field).
- config.dateField (optional) is the key name in the output that should contain the log's date time value.
Outputs:
- config.data is transformed to an array of objects with log data.
Processes parsed data from AWS Elastic Load Balancer (ELB) Application Load Balancer (ELB version 2) logs and normalizes it into key-value objects.
Raw ELB log files in S3 should be processed with parseSpaces before using this
format handler.
Inputs:
- config.data should be an array (one item per log record) of arrays (one item per record field).
- config.dateField (optional) is the key name in the output that should contain the log's date time value.
Outputs:
- config.data is transformed to an array of objects with log data.
Processes parsed data from AWS Simple Storage Service (S3) access logs and normalizes it into key-value objects.
Raw S3 log files should be processed with parseSpaces before using this
format handler.
Inputs:
- config.data should be an array (one item per log record) of arrays (one item per record field).
- config.dateField (optional) is the key name in the output that should contain the log's date time value.
Outputs:
- config.data is transformed to an array of objects with log data.
Fetches an object from S3.
Inputs:
- config.S3.srcBucket is the S3 bucket name for the object to fetch.
- config.S3.srcKey is the S3 key name for the object to fetch.
Outputs:
- config.data is set to the buffer with the S3 object data.
Transforms output data into a series of JSON strings delimited by a newline. This output format is compatible with the Loggly HTTP/S bulk endpoint.
Inputs:
- config.data is an array of objects with log data.
Outputs:
- config.data is transformed into JSON strings delimited by newlines.
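
A newline-delimited JSON transformation of this kind can be sketched in a couple of lines. The sample records are invented, and whether the handler appends a trailing newline is not stated here, so the sketch leaves it out.

```javascript
// Sample records standing in for formatted log data.
var records = [
  { status: 200, path: '/' },
  { status: 404, path: '/missing' }
];

// Serialize each record on its own line.
var jsonLines = records
  .map(function (record) { return JSON.stringify(record); })
  .join('\n');

console.log(jsonLines);
```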
Parses CSV data into an array of records, each of which is an array of column values.
Inputs:
- config.data is a string with CSV data.
Outputs:
- config.data is transformed into an array of records, each of which is an array of strings with column values.
Parses a JSON string into its equivalent JavaScript object.
Inputs:
- config.data is a JSON string.
Outputs:
- config.data is the equivalent JavaScript object.
Parses tab separated value data into an array of records, each of which is an array of column values.
Inputs:
- config.data is a string with tab separated data.
Outputs:
- config.data is transformed into an array of records, each of which is an array of strings with column values.
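
The shape of the output is easiest to see with a small example. The helper below is a hypothetical stand-in, not the handler's source; skipping blank lines is an assumption, and any handling of comment or header lines is left out.

```javascript
// Hypothetical helper; not lambda-stash's own implementation.
function parseTabSeparated(text) {
  return text
    .split(/\r?\n/)
    // Assumption: blank lines carry no record and are skipped.
    .filter(function (line) { return line.length > 0; })
    .map(function (line) { return line.split('\t'); });
}

var rows = parseTabSeparated('a\tb\tc\n1\t2\t3\n');
console.log(rows);
```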
Ships the data to an Elasticsearch index. The Elasticsearch _index and _type
values are configurable, and support for the AWS Elasticsearch request signing
process is provided.
Inputs:
- config.data is an array of objects with log data.
- config.elasticsearch.host is an HTTP/S URL to the Elasticsearch domain to which to send data.
- config.elasticsearch.index is a string with the _index value to add to log objects.
- config.elasticsearch.type is a string with the _type value to add to log objects.
- config.elasticsearch.useAWS should be set to true to connect with an AWS Elasticsearch domain and automatically sign requests for AWS.
- config.elasticsearch.region is the AWS region where the AWS Elasticsearch domain is hosted.
- config.elasticsearch.accessKey (optional) is the AWS access key to use for signing the request. If an access key and secret key are not provided, the environment credentials are used instead (i.e. the IAM role under which the Lambda function is running).
- config.elasticsearch.secretKey (optional) is the AWS secret key to use for signing the request.
- config.elasticsearch.maxChunkSize (optional) is the maximum number of log items to ship in one request. By default, set to 1000.
- config.elasticsearch.requestTimeout (optional) is the Elasticsearch client timeout in milliseconds or can be set to Infinity for no timeout. The Elasticsearch client's default is 30000 (30 seconds).
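
The effect of maxChunkSize can be pictured as splitting the log items into batches before shipping. The following is a sketch of that idea only: the chunk helper is hypothetical, and how the handler actually builds and sends each request is not shown.

```javascript
// Hypothetical helper that splits log items into batches of at most
// maxChunkSize items.
function chunk(items, maxChunkSize) {
  var size = maxChunkSize || 1000; // 1000 mirrors the documented default
  var chunks = [];
  for (var i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// 2500 stand-in log items.
var items = [];
for (var n = 0; n < 2500; n++) {
  items.push({ id: n });
}

var batches = chunk(items);
console.log(batches.map(function (batch) { return batch.length; }));
```

With the default of 1000, 2500 items would go out as three requests; lowering maxChunkSize trades more requests for smaller payloads.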
Ships the data to an HTTP/S endpoint.
Inputs:
- config.data is the string data to ship. An alternate key can be configured.
- config.http.url is an HTTP/S URL to which to send data.
- config.http.keyData (optional) is the key name in the config object of the data to ship. By default, set to data.
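
The keyData option can be read as a simple lookup on the config object. The sketch below shows that reading; the helper name dataToShip, the payload key, and the URL are all hypothetical and serve only to illustrate the default and the override.

```javascript
// Hypothetical helper showing how an alternate data key could be resolved.
function dataToShip(config) {
  var key = (config.http && config.http.keyData) || 'data';
  return config[key];
}

// Placeholder URL; no request is made in this sketch.
var defaultConfig = {
  data: 'from data',
  http: { url: 'https://logs.example.com/bulk' }
};
var customConfig = {
  data: 'from data',
  payload: 'from payload',
  http: { url: 'https://logs.example.com/bulk', keyData: 'payload' }
};

console.log(dataToShip(defaultConfig));
console.log(dataToShip(customConfig));
```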
Ships the data to a TCP service.
Inputs:
- config.data is the string data to ship. An alternate key can be configured.
- config.tcp.host is a hostname to which to open a TCP socket.
- config.tcp.port is a port on which to open the socket.
- config.tcp.keyData (optional) is the key name in the config object of the data to ship. By default, set to data.
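The following script can be used to ship CloudTrail logs from S3 to an AWS Elasticsearch domain.

    var shipper = require('lambda-stash');

    // Lambda entry point: event, context, and callback are supplied by the
    // Lambda runtime when the function is invoked.
    exports.handler = function(event, context, callback) {
      var config = {
        // Settings shared by every mapping.
        elasticsearch: {
          host: 'https://search-test-es-abcdef...',
          region: 'us-east-1',
          useAWS: true
        },
        mappings: [
          {
            // Applies to objects arriving from this S3 bucket.
            bucket: 'my-cloudtrail-logs',
            // Processors run in the order listed.
            processors: [
              'decompressGzip',
              'parseJson',
              'formatCloudtrail',
              'shipElasticsearch'
            ],
            elasticsearch: {
              index: 'logs',
              type: 'cloudtrail'
            },
            dateField: 'date'
          }
        ]
      };
      shipper.handler(config, event, context, callback);
    };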