Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 58 additions & 23 deletions _data-prepper/pipelines/configuration/processors/aws-lambda.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,30 @@ You can configure the processor using the following configuration options.

Field | Type | Required | Description
-------------------- | ------- | -------- | ----------------------------------------------------------------------------
`function_name` | String | Required | The name of the AWS Lambda function to invoke.
`invocation_type` | String | Required | Specifies the invocation type, either `request-response` or `event`. Default is `request-response`.
`function_name` | String | Required | The name of the AWS Lambda function to invoke. Must be 3--500 characters.
`invocation_type` | String | Optional | Specifies the invocation type, either `request-response` or `event`. Default is `request-response`.
`aws.region` | String | Required | The AWS Region in which the Lambda function is located.
`aws.sts_role_arn` | String | Optional | The Amazon Resource Name (ARN) of the role to assume before invoking the Lambda function.
`max_retries` | Integer | Optional | The maximum number of retries for failed invocations. Default is `3`.
`aws.sts_role_arn` | String | Optional | The Amazon Resource Name (ARN) of the role to assume before invoking the Lambda function. Must be 20--2048 characters.
`aws.sts_external_id` | String | Optional | An external ID for STS role assumption. Must be 2--1224 characters.
`aws.sts_header_overrides` | Map | Optional | STS header overrides. Maximum of 5 headers supported.
`client.max_retries` | Integer | Optional | The maximum number of retries for failed invocations. Default is `3`.
`client.api_call_timeout` | Duration | Optional | The API call timeout. Default is `60s`.
`client.connection_timeout` | Duration | Optional | The SDK connection timeout. Default is `60s`.
`client.max_concurrency` | Integer | Optional | The maximum number of concurrent threads on the client. Default is `200`.
`client.base_delay` | Duration | Optional | The base delay for the exponential backoff. Default is `100ms`.
`client.max_backoff` | Duration | Optional | The maximum backoff time for the exponential backoff. Default is `20s`.
`batch` | Object | Optional | The batch settings for the Lambda invocations. Default is `key_name = "events"`. Default threshold is `event_count=100`, `maximum_size="5mb"`, and `event_collect_timeout = 10s`.
`lambda_when` | String | Optional | A conditional expression that determines when to invoke the Lambda processor.
`response_codec` | Object | Optional | A codec configuration for parsing Lambda responses. Default is `json`.
`tags_on_match_failure` | List | Optional | A list of tags to add to events when Lambda matching fails or encounters an unexpected error.
`sdk_timeout` | Duration| Optional | Configures the SDK's client connection timeout period. Default is `60s`.
`tags_on_failure` | List | Optional | A list of tags to add to events when the Lambda function fails or encounters an exception.
`response_events_match` | Boolean | Optional | Specifies how Data Prepper interprets and processes Lambda function responses. Default is `false`.
`client` | Object | Optional | The client configuration.
`api_call_timeout` | Duration | Optional | The amount of time that the SDK maintains the API call before timing out, in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations).
`base_delay` | Duration | Optional | The base delay for exponential backoff, in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations).
`connection_timeout` | Duration | Optional | The amount of time that the SDK maintains the connection to the client before timing out, in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations).
`max_backoff` | Duration | Optional | The maximum backoff time for exponential backoff, in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations).
`max_concurrency` | Integer | Optional | The maximum concurrency defined on the client side.
`max_retries` | Integer | Optional | The maximum number of retries before failing.

`response_mode` | String | Optional | The response handling mode, either `replace` or `merge`. Default is `replace`.
`keys` | List | Optional | Keys to send to the Lambda function.
`cache` | Object | Optional | The cache configuration. Only valid when `response_mode` is `merge` and `keys` are specified.
`cache.ttl` | Long | Optional | The cache time-to-live.
`cache.max_size` | Long | Optional | The maximum cache size. Must be between 1048576 and 10485760.
`circuit_breaker_retries` | Integer | Optional | The maximum number of circuit breaker checks before proceeding. Default is `0`.
`circuit_breaker_wait_interval` | Long | Optional | The amount of time, in milliseconds, between circuit breaker checks. Default is `1000ms`.

#### Example configuration

Expand All @@ -48,20 +53,31 @@ processors:
function_name: "my-lambda-function"
invocation_type: "request-response"
response_events_match: false
client:
connection_timeout: PT5M
api_call_timeout: PT5M
response_mode: "replace"
aws:
region: "us-east-1"
sts_role_arn: "arn:aws:iam::123456789012:role/my-lambda-role"
max_retries: 3
client:
max_retries: 3
api_call_timeout: "PT60S"
connection_timeout: "PT60S"
max_concurrency: 200
base_delay: "PT0.1S"
max_backoff: "PT20S"
batch:
key_name: "events"
threshold:
event_count: 100
maximum_size: "5mb"
event_collect_timeout: PT10S
lambda_when: "event['status'] == 'process'"
event_collect_timeout: "PT10S"
lambda_when: "/some_key == null"
keys: ["key1", "key2"]
cache:
ttl: 3600
max_size: 5242880
circuit_breaker_retries: 0
circuit_breaker_wait_interval: 1000
tags_on_failure: ["lambda_failed"]
```
{% include copy.html %}

Expand All @@ -71,9 +87,28 @@ The processor supports the following invocation types:

- `request-response`: The processor waits for Lambda function completion before proceeding.
- `event`: The function is triggered asynchronously without waiting for a response.
- `batch`: When enabled, events are aggregated and sent in bulk to optimize Lambda invocations. Batch thresholds control the event count, size limit, and timeout.
- `codec`: JSON is used for both request and response codecs. Lambda must return JSON array outputs.
- `tags_on_match_failure`: Custom tags can be applied to events when Lambda processing fails or encounters unexpected issues.

### Batch processing

When enabled, events are aggregated and sent in bulk to optimize Lambda invocations. Batch thresholds control the event count, size limit, and timeout.

### Response handling

The processor supports two response modes:
- `replace`: Lambda response replaces the original event data (default)
- `merge`: Lambda response is merged with the original event data

### Caching

When `response_mode` is set to `merge` and `keys` are specified, the processor can cache Lambda responses to improve performance for repeated requests.

### Circuit breaker

The processor includes circuit breaker functionality to handle memory pressure situations gracefully.

### Tags on failure

Custom tags can be applied to events when Lambda processing fails or encounters exceptions using the `tags_on_failure` configuration.

## Behavior

Expand Down