diff --git a/src/kinesis_lambda_es.js b/src/kinesis_lambda_es.js index c042c46..fe89cfa 100644 --- a/src/kinesis_lambda_es.js +++ b/src/kinesis_lambda_es.js @@ -17,6 +17,7 @@ /* == Imports == */ var AWS = require('aws-sdk'); var path = require('path'); +var when = require('when'); /* == Globals == */ var esDomain = { @@ -39,17 +40,21 @@ var creds = new AWS.EnvironmentCredentials('AWS'); /* Lambda "main": Execution begins here */ exports.handler = function(event, context) { console.log(JSON.stringify(event, null, ' ')); + var promises = []; event.Records.forEach(function(record) { var jsonDoc = new Buffer(record.kinesis.data, 'base64'); - postToES(jsonDoc.toString(), context); + postToES(jsonDoc.toString(), context, promises); }); + when.all(promises).then(function(res) { + context.succeed('Lambda Event Processed'); + }) } /* * Post the given document to Elasticsearch */ -function postToES(doc, context) { +function postToES(doc, context, promises) { var req = new AWS.HttpRequest(endpoint); req.method = 'POST'; @@ -63,6 +68,7 @@ function postToES(doc, context) { signer.addAuthorization(creds, new Date()); var send = new AWS.NodeHttpClient(); + var deferred = when.defer(); send.handleRequest(req, null, function(httpResp) { var respBody = ''; httpResp.on('data', function (chunk) { @@ -70,10 +76,11 @@ function postToES(doc, context) { }); httpResp.on('end', function (chunk) { console.log('Response: ' + respBody); - context.succeed('Lambda added document ' + doc); + promises.push(deferred.reject); }); }, function(err) { console.log('Error: ' + err); context.fail('Lambda failed with error ' + err); + promises.push(deferred.reject); }); }