`docs/utilities/batch.md` (+49, −4)
@@ -3,12 +3,12 @@ title: Batch Processing
description: Utility
---
-The batch processing utility handles partial failures when processing batches from Amazon SQS, Amazon Kinesis Data Streams, and Amazon DynamoDB Streams.
+The batch processing utility handles partial failures when processing batches from Amazon SQS, Amazon Kinesis Data Streams, Amazon DynamoDB Streams, and Amazon MSK/self-managed Apache Kafka.
-When using SQS, Kinesis Data Streams, or DynamoDB Streams as a Lambda event source, your Lambda functions are triggered with a batch of messages.
+When using SQS, Kinesis Data Streams, DynamoDB Streams, or Amazon MSK/Apache Kafka as a Lambda event source, your Lambda functions are triggered with a batch of messages.
If your function fails to process any message from the batch, the entire batch returns to your queue or stream. This same batch is then retried until either condition happens first: **a)** your Lambda function returns a successful response, **b)** record reaches maximum retry attempts, or **c)** records expire.
@@ -55,13 +55,14 @@ This behavior changes when you enable Report Batch Item Failures feature in your
<!-- markdownlint-disable MD013 -->
* [**SQS queues**](#sqs-standard). Only messages reported as failure will return to the queue for a retry, while successful ones will be deleted.
* [**Kinesis data streams**](#kinesis-and-dynamodb-streams) and [**DynamoDB streams**](#kinesis-and-dynamodb-streams). Single reported failure will use its sequence number as the stream checkpoint. Multiple reported failures will use the lowest sequence number as checkpoint.
+* [**Kafka (MSK and self-managed)**](#processing-messages-from-kafka). Failed records are identified by topic-partition and offset. Only failed records will be retried.
<!-- HTML tags are required in admonition content thus increasing line length beyond our limits -->
<!-- markdownlint-disable MD013 -->
???+ warning "Warning: This utility lowers the chance of processing records more than once; it does not guarantee it"
    We recommend implementing processing logic in an [idempotent manner](idempotency.md){target="_blank"} wherever possible.
-You can find more details on how Lambda works with either [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html){target="_blank"}, [Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html){target="_blank"}, or [DynamoDB](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html){target="_blank"} in the AWS Documentation.
+You can find more details on how Lambda works with either [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html){target="_blank"}, [Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html){target="_blank"}, [DynamoDB](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html){target="_blank"}, or [MSK/Kafka](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html){target="_blank"} in the AWS Documentation.
## Getting started
@@ -93,6 +94,12 @@ The remaining sections of the documentation will rely on these samples. For comp
+Processing batches from Amazon MSK or self-managed Apache Kafka works in three stages:
+
+1. Instantiate **`BatchProcessor`** and choose **`EventType.Kafka`** for the event type
+2. Define your function to handle each batch record, and use [`KafkaEventRecord`](data_classes.md#kafka){target="_blank"} type annotation for autocompletion
+3. Use **`process_partial_response`** to kick off processing
+!!! info "This works with both MSK and self-managed Apache Kafka"
+    The batch processor automatically handles the different event structures from MSK and self-managed Kafka clusters.
+
+Use `record.json_value` to get the deserialized JSON body from the Kafka record. For raw bytes access, use `record.decoded_value`.
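As a sketch of the semantics (an assumption mirroring the text above: Kafka event values arrive base64-encoded, `decoded_value` yields the raw bytes, and `json_value` parses those bytes as JSON):

```python
import base64
import json

# A Kafka event carries each record value base64-encoded
encoded = base64.b64encode(json.dumps({"order_id": 123}).encode())

decoded_value = base64.b64decode(encoded)  # raw bytes, like record.decoded_value
json_value = json.loads(decoded_value)     # parsed dict, like record.json_value

print(json_value["order_id"])  # 123
```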
+For advanced deserialization (Avro, Protobuf), see the [Kafka Consumer utility](kafka.md){target="_blank"} which can be used alongside the batch processor.
### Error handling
By default, we catch any exception raised by your record handler function. This allows us to **(1)** continue processing the batch, **(2)** collect each batch item that failed processing, and **(3)** return the appropriate response correctly without failing your Lambda function execution.
`docs/utilities/kafka.md` (+15, −0)
@@ -325,6 +325,21 @@ The [idempotency utility](idempotency.md){target="_blank"} automatically stores
TIP: By using the Kafka record's unique coordinates (topic, partition, offset) as the idempotency key, you ensure that even if a batch fails and Lambda retries the messages, each message will be processed exactly once.
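A hedged sketch of deriving such a key (the field names mirror the Lambda Kafka event payload; the exact separator and helper name are illustrative, not part of the library):

```python
def idempotency_key(record: dict) -> str:
    # topic + partition + offset uniquely identify a Kafka record
    return f"{record['topic']}-{record['partition']}-{record['offset']}"


key = idempotency_key({"topic": "orders", "partition": 0, "offset": 4517})
print(key)  # orders-0-4517
```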
+### Handling partial batch failures
+
+When processing Kafka messages, individual records may fail while others succeed. By default, Lambda retries the entire batch when any record fails. To retry only the failed records, use the [Batch Processing utility](batch.md#processing-messages-from-kafka){target="_blank"} with `EventType.Kafka`.
+
+This feature allows Lambda to checkpoint successful records and retry only the failed ones, significantly improving processing efficiency and reducing duplicate processing.
+
+The Batch Processing utility uses the basic `KafkaEventRecord` data class. For advanced deserialization (Avro, Protobuf), you can use the Kafka Consumer's deserialization utilities inside your record handler function.
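For illustration, a sketch of the partial-failure response this relies on. The `batchItemFailures` key is Lambda's standard report-batch-item-failures contract; the exact `itemIdentifier` encoding of topic, partition, and offset shown here is an assumption:

```python
# Failed records, identified by topic, partition, and offset
failed = [("orders", 0, 4517), ("orders", 2, 112)]

# Records listed here are retried; everything else is checkpointed as processed
response = {
    "batchItemFailures": [
        {"itemIdentifier": f"{topic}-{partition}-{offset}"}
        for topic, partition, offset in failed
    ]
}
print(response["batchItemFailures"][0]["itemIdentifier"])  # orders-0-4517
```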