Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/batch/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
],
"dependencies": {
"@aws-lambda-powertools/commons": "2.28.1",
"@aws/lambda-invoke-store": "0.1.1",
"@standard-schema/spec": "^1.0.0"
},
"devDependencies": {
Expand Down
11 changes: 3 additions & 8 deletions packages/batch/src/BasePartialBatchProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,6 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
*/
public COLLECTOR_MAPPING;

/**
* Response to be returned after processing
*/
public batchResponse: PartialItemFailureResponse;

/**
* Type of event that the processor is handling
*/
Expand Down Expand Up @@ -200,9 +195,9 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
* Set up the processor with the initial state ready for processing
*/
public prepare(): void {
this.successMessages.length = 0;
this.failureMessages.length = 0;
this.errors.length = 0;
this.successMessages = [];
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've changed this because it seemed odd to be just setting the length to 0 rather than just resetting the variable as is done elsewhere.

this.failureMessages = [];
this.errors = [];
this.batchResponse = DEFAULT_RESPONSE;
}

Expand Down
85 changes: 54 additions & 31 deletions packages/batch/src/BasePartialProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { BatchProcessingStore } from './BatchProcessingStore.js';
import type {
BaseRecord,
BatchProcessingOptions,
Expand All @@ -21,42 +22,64 @@ import type {
*/
abstract class BasePartialProcessor {
/**
* List of errors that occurred during processing
* Store for managing invocation-specific state
*/
public errors: Error[];
readonly #store = new BatchProcessingStore();

/**
* List of records that failed processing
*/
public failureMessages: EventSourceDataClassTypes[];
public get errors(): Error[] {
return this.#store.getErrors();
}

/**
* Record handler provided by customers to process records
*/
public handler: CallableFunction;
protected set errors(errors: Error[]) {
this.#store.setErrors(errors);
}

/**
* Options to be used during processing (optional)
*/
public options?: BatchProcessingOptions;
public get failureMessages(): EventSourceDataClassTypes[] {
return this.#store.getFailureMessages();
}

/**
* List of records to be processed
*/
public records: BaseRecord[];
protected set failureMessages(messages: EventSourceDataClassTypes[]) {
this.#store.setFailureMessages(messages);
}

/**
* List of records that were processed successfully
*/
public successMessages: EventSourceDataClassTypes[];

public constructor() {
this.successMessages = [];
this.failureMessages = [];
this.errors = [];
this.records = [];
// No-op function to avoid null checks, will be overridden by customer when using the class
this.handler = new Function();
public get handler(): CallableFunction {
return this.#store.getHandler();
}

protected set handler(handler: CallableFunction) {
this.#store.setHandler(handler);
}

public get options(): BatchProcessingOptions | undefined {
return this.#store.getOptions();
}

protected set options(options: BatchProcessingOptions | undefined) {
this.#store.setOptions(options);
}

public get records(): BaseRecord[] {
return this.#store.getRecords();
}

protected set records(records: BaseRecord[]) {
this.#store.setRecords(records);
}

public get successMessages(): EventSourceDataClassTypes[] {
return this.#store.getSuccessMessages();
}

protected set successMessages(messages: EventSourceDataClassTypes[]) {
this.#store.setSuccessMessages(messages);
}

protected get batchResponse() {
return this.#store.getBatchResponse();
}

protected set batchResponse(response) {
this.#store.setBatchResponse(response);
}

/**
Expand Down Expand Up @@ -196,7 +219,7 @@ abstract class BasePartialProcessor {
*
* We use a separate method to do this rather than the constructor
* to allow for reusing the processor instance across multiple invocations
* by instantiating the processor outside of the Lambda function handler.
* by instantiating the processor outside the Lambda function handler.
*
* @param records - Array of records to be processed
* @param handler - CallableFunction to process each record from the batch
Expand Down
159 changes: 159 additions & 0 deletions packages/batch/src/BatchProcessingStore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import { InvokeStore } from '@aws/lambda-invoke-store';
import type {
BaseRecord,
BatchProcessingOptions,
EventSourceDataClassTypes,
PartialItemFailureResponse,
} from './types.js';

/**
* Manages storage of batch processing state with automatic context detection.
*
* This class abstracts the storage mechanism for batch processing state,
* automatically choosing between InvokeStore (when in Lambda context) and
* fallback instance variables (when outside Lambda context). The decision is
* made at runtime on every method call to support Lambda's concurrent execution
* isolation.
*/
class BatchProcessingStore {
readonly #recordsKey = Symbol('powertools.batch.records');
readonly #handlerKey = Symbol('powertools.batch.handler');
readonly #optionsKey = Symbol('powertools.batch.options');
readonly #failureMessagesKey = Symbol('powertools.batch.failureMessages');
readonly #successMessagesKey = Symbol('powertools.batch.successMessages');
readonly #batchResponseKey = Symbol('powertools.batch.batchResponse');
readonly #errorsKey = Symbol('powertools.batch.errors');

#fallbackRecords: BaseRecord[] = [];
#fallbackHandler: CallableFunction = () => {};
#fallbackOptions?: BatchProcessingOptions;
#fallbackFailureMessages: EventSourceDataClassTypes[] = [];
#fallbackSuccessMessages: EventSourceDataClassTypes[] = [];
#fallbackBatchResponse: PartialItemFailureResponse = {
batchItemFailures: [],
};
#fallbackErrors: Error[] = [];

public getRecords(): BaseRecord[] {
if (InvokeStore.getContext() === undefined) {
return this.#fallbackRecords;
}
return (InvokeStore.get(this.#recordsKey) as BaseRecord[]) ?? [];
}

public setRecords(records: BaseRecord[]): void {
if (InvokeStore.getContext() === undefined) {
this.#fallbackRecords = records;
return;
}
InvokeStore.set(this.#recordsKey, records);
}

public getHandler(): CallableFunction {
if (InvokeStore.getContext() === undefined) {
return this.#fallbackHandler;
}
return (
(InvokeStore.get(this.#handlerKey) as CallableFunction) ?? (() => {})
);
}

public setHandler(handler: CallableFunction): void {
if (InvokeStore.getContext() === undefined) {
this.#fallbackHandler = handler;
return;
}
InvokeStore.set(this.#handlerKey, handler);
}

public getOptions(): BatchProcessingOptions | undefined {
if (InvokeStore.getContext() === undefined) {
return this.#fallbackOptions;
}
return InvokeStore.get(this.#optionsKey) as
| BatchProcessingOptions
| undefined;
}

public setOptions(options: BatchProcessingOptions | undefined): void {
if (InvokeStore.getContext() === undefined) {
this.#fallbackOptions = options;
return;
}
InvokeStore.set(this.#optionsKey, options);
}

public getFailureMessages(): EventSourceDataClassTypes[] {
if (InvokeStore.getContext() === undefined) {
return this.#fallbackFailureMessages;
}
return (
(InvokeStore.get(
this.#failureMessagesKey
) as EventSourceDataClassTypes[]) ?? []
);
}

public setFailureMessages(messages: EventSourceDataClassTypes[]): void {
if (InvokeStore.getContext() === undefined) {
this.#fallbackFailureMessages = messages;
return;
}
InvokeStore.set(this.#failureMessagesKey, messages);
}

public getSuccessMessages(): EventSourceDataClassTypes[] {
if (InvokeStore.getContext() === undefined) {
return this.#fallbackSuccessMessages;
}
return (
(InvokeStore.get(
this.#successMessagesKey
) as EventSourceDataClassTypes[]) ?? []
);
}

public setSuccessMessages(messages: EventSourceDataClassTypes[]): void {
if (InvokeStore.getContext() === undefined) {
this.#fallbackSuccessMessages = messages;
return;
}
InvokeStore.set(this.#successMessagesKey, messages);
}

public getBatchResponse(): PartialItemFailureResponse {
if (InvokeStore.getContext() === undefined) {
return this.#fallbackBatchResponse;
}
return (
(InvokeStore.get(
this.#batchResponseKey
) as PartialItemFailureResponse) ?? { batchItemFailures: [] }
);
}

public setBatchResponse(response: PartialItemFailureResponse): void {
if (InvokeStore.getContext() === undefined) {
this.#fallbackBatchResponse = response;
return;
}
InvokeStore.set(this.#batchResponseKey, response);
}

public getErrors(): Error[] {
if (InvokeStore.getContext() === undefined) {
return this.#fallbackErrors;
}
return (InvokeStore.get(this.#errorsKey) as Error[]) ?? [];
}

public setErrors(errors: Error[]): void {
if (InvokeStore.getContext() === undefined) {
this.#fallbackErrors = errors;
return;
}
InvokeStore.set(this.#errorsKey, errors);
}
}

export { BatchProcessingStore };
6 changes: 5 additions & 1 deletion packages/batch/src/SqsFifoPartialProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,11 @@ class SqsFifoPartialProcessor extends BatchProcessorSync {
const remainingRecords = this.records.slice(firstFailureIndex);

for (const record of remainingRecords) {
this.#processFailRecord(record, new SqsFifoShortCircuitError());
const result = this.#processFailRecord(
record,
new SqsFifoShortCircuitError()
);
processedRecords.push(result);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we miss having this previously?

}

this.clean();
Expand Down
6 changes: 5 additions & 1 deletion packages/batch/src/SqsFifoPartialProcessorAsync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,11 @@ class SqsFifoPartialProcessorAsync extends BatchProcessor {
const remainingRecords = this.records.slice(firstFailureIndex);

for (const record of remainingRecords) {
this.#processFailRecord(record, new SqsFifoShortCircuitError());
const result = this.#processFailRecord(
record,
new SqsFifoShortCircuitError()
);
processedRecords.push(result);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same in here, was there a bug because of this?

}

this.clean();
Expand Down
Loading
Loading