Skip to content

improv(idempotency): consolidate internal implementation #1642

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
283 changes: 211 additions & 72 deletions packages/idempotency/src/IdempotencyHandler.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import type { JSONValue } from '@aws-lambda-powertools/commons';
import type {
JSONValue,
MiddyLikeRequest,
} from '@aws-lambda-powertools/commons';
import type { AnyFunction, IdempotencyHandlerOptions } from './types';
import { IdempotencyRecordStatus } from './types';
import {
Expand Down Expand Up @@ -34,7 +37,7 @@ export class IdempotencyHandler<Func extends AnyFunction> {
*
* This is the argument that is used for the idempotency.
*/
readonly #functionPayloadToBeHashed: JSONValue;
#functionPayloadToBeHashed: JSONValue;
/**
* Reference to the function to be made idempotent.
*/
Expand Down Expand Up @@ -68,9 +71,17 @@ export class IdempotencyHandler<Func extends AnyFunction> {
});
}

/**
* Takes an idempotency key and returns the idempotency record from the persistence layer.
*
* If the idempotency record is not COMPLETE, then it will throw an error based on the status of the record.
*
* @param idempotencyRecord The idempotency record stored in the persistence layer
* @returns The result of the function if the idempotency record is in a terminal state
*/
public static determineResultFromIdempotencyRecord(
idempotencyRecord: IdempotencyRecord
): Promise<unknown> | unknown {
): JSONValue {
if (idempotencyRecord.getStatus() === IdempotencyRecordStatus.EXPIRED) {
throw new IdempotencyInconsistentStateError(
'Item has expired during processing and may not longer be valid.'
Expand All @@ -96,50 +107,55 @@ export class IdempotencyHandler<Func extends AnyFunction> {
return idempotencyRecord.getResponse();
}

/**
* Execute the handler and return the result.
*
* If the handler fails, the idempotency record will be deleted.
* If it succeeds, the idempotency record will be updated with the result.
*
* @returns The result of the function execution
*/
public async getFunctionResult(): Promise<ReturnType<Func>> {
let result;
try {
result = await this.#functionToMakeIdempotent(...this.#functionArguments);
} catch (e) {
try {
await this.#persistenceStore.deleteRecord(
this.#functionPayloadToBeHashed
);
} catch (e) {
throw new IdempotencyPersistenceLayerError(
'Failed to delete record from idempotency store',
e as Error
);
}
throw e;
}
try {
await this.#persistenceStore.saveSuccess(
this.#functionPayloadToBeHashed,
result
);
} catch (e) {
throw new IdempotencyPersistenceLayerError(
'Failed to update success record to idempotency store',
e as Error
);
} catch (error) {
await this.#deleteInProgressRecord();
throw error;
}
await this.#saveSuccessfullResult(result);

return result;
}

/**
* Main entry point for the handler
* Entry point to handle the idempotency logic.
*
* Before the handler is executed, we need to check if there is already an
* execution in progress for the given idempotency key. If there is, we
* need to determine its status and return the appropriate response or
* throw an error.
*
* If there is no execution in progress, we need to save a record to the
* idempotency store to indicate that an execution is in progress.
*
* In some rare cases, when the persistent state changes in small time
* window, we might get an `IdempotencyInconsistentStateError`. In such
* cases we can safely retry the handling a few times.
*/
public async handle(): Promise<ReturnType<Func>> {
// early return if we should skip idempotency completely
if (this.shouldSkipIdempotency()) {
return await this.#functionToMakeIdempotent(...this.#functionArguments);
}

let e;
for (let retryNo = 0; retryNo <= MAX_RETRIES; retryNo++) {
try {
return await this.processIdempotency();
const result = await this.#saveInProgressOrReturnExistingResult();
if (result) return result as ReturnType<Func>;

return await this.getFunctionResult();
} catch (error) {
if (
error instanceof IdempotencyInconsistentStateError &&
Expand All @@ -156,60 +172,183 @@ export class IdempotencyHandler<Func extends AnyFunction> {
throw e;
}

public async processIdempotency(): Promise<ReturnType<Func>> {
// early return if we should skip idempotency completely
/**
* Handle the idempotency operations needed after the handler has returned.
*
* When the handler returns successfully, we need to update the record in the
* idempotency store to indicate that the execution has completed and
* store its result.
*
* To avoid duplication of code, we expose this method so that it can be
* called from the `after` phase of the Middy middleware.
*
* @param response The response returned by the handler.
*/
public async handleMiddyAfter(response: unknown): Promise<void> {
await this.#saveSuccessfullResult(response as ReturnType<Func>);
}

/**
* Handle the idempotency operations needed after the handler has returned.
*
* Before the handler is executed, we need to check if there is already an
* execution in progress for the given idempotency key. If there is, we
* need to determine its status and return the appropriate response or
* throw an error.
*
* If there is no execution in progress, we need to save a record to the
* idempotency store to indicate that an execution is in progress.
*
* In some rare cases, when the persistent state changes in small time
* window, we might get an `IdempotencyInconsistentStateError`. In such
* cases we can safely retry the handling a few times.
*
* @param request The request object passed to the handler.
* @param callback Callback function to cleanup pending middlewares when returning early.
*/
public async handleMiddyBefore(
request: MiddyLikeRequest,
callback: (request: MiddyLikeRequest) => Promise<void>
): Promise<ReturnType<Func> | void> {
for (let retryNo = 0; retryNo <= MAX_RETRIES; retryNo++) {
try {
const result = await this.#saveInProgressOrReturnExistingResult();
if (result) {
await callback(request);

return result as ReturnType<Func>;
}
break;
} catch (error) {
if (
error instanceof IdempotencyInconsistentStateError &&
retryNo < MAX_RETRIES
) {
// Retry
continue;
}
// Retries exhausted or other error
throw error;
}
}
}

/**
* Handle the idempotency operations needed when an error is thrown in the handler.
*
* When an error is thrown in the handler, we need to delete the record from the
* idempotency store.
*
* To avoid duplication of code, we expose this method so that it can be
* called from the `onError` phase of the Middy middleware.
*/
public async handleMiddyOnError(): Promise<void> {
await this.#deleteInProgressRecord();
}

/**
* Setter for the payload to be hashed to generate the idempotency key.
*
* This is useful if you want to use a different payload than the one
* used to instantiate the `IdempotencyHandler`, for example when using
* it within a Middy middleware.
*
* @param functionPayloadToBeHashed The payload to be hashed to generate the idempotency key
*/
public setFunctionPayloadToBeHashed(
functionPayloadToBeHashed: JSONValue
): void {
this.#functionPayloadToBeHashed = functionPayloadToBeHashed;
}

/**
* Avoid idempotency if the eventKeyJmesPath is not present in the payload and throwOnNoIdempotencyKey is false
*/
public shouldSkipIdempotency(): boolean {
if (!this.#idempotencyConfig.isEnabled()) return true;

if (
IdempotencyHandler.shouldSkipIdempotency(
this.#idempotencyConfig.eventKeyJmesPath,
this.#idempotencyConfig.throwOnNoIdempotencyKey,
this.#functionPayloadToBeHashed
)
this.#idempotencyConfig.eventKeyJmesPath !== '' &&
!this.#idempotencyConfig.throwOnNoIdempotencyKey
) {
return await this.#functionToMakeIdempotent(...this.#functionArguments);
const selection = search(
this.#functionPayloadToBeHashed,
this.#idempotencyConfig.eventKeyJmesPath
);

return selection === undefined || selection === null;
} else {
return false;
}
}

/**
* Delete an in progress record from the idempotency store.
*
* This is called when the handler throws an error.
*/
#deleteInProgressRecord = async (): Promise<void> => {
try {
await this.#persistenceStore.saveInProgress(
this.#functionPayloadToBeHashed,
this.#idempotencyConfig.lambdaContext?.getRemainingTimeInMillis()
await this.#persistenceStore.deleteRecord(
this.#functionPayloadToBeHashed
);
} catch (e) {
if (e instanceof IdempotencyItemAlreadyExistsError) {
const idempotencyRecord: IdempotencyRecord =
await this.#persistenceStore.getRecord(
this.#functionPayloadToBeHashed
);
throw new IdempotencyPersistenceLayerError(
'Failed to delete record from idempotency store',
e as Error
);
}
};

return IdempotencyHandler.determineResultFromIdempotencyRecord(
idempotencyRecord
) as ReturnType<Func>;
} else {
throw new IdempotencyPersistenceLayerError(
'Failed to save in progress record to idempotency store',
e as Error
/**
* Save an in progress record to the idempotency store or return an existing result.
*
* If the record already exists, return the result from the record.
*/
#saveInProgressOrReturnExistingResult =
async (): Promise<JSONValue | void> => {
try {
await this.#persistenceStore.saveInProgress(
this.#functionPayloadToBeHashed,
this.#idempotencyConfig.lambdaContext?.getRemainingTimeInMillis()
);
}
}
} catch (e) {
if (e instanceof IdempotencyItemAlreadyExistsError) {
const idempotencyRecord: IdempotencyRecord =
await this.#persistenceStore.getRecord(
this.#functionPayloadToBeHashed
);

return this.getFunctionResult();
}
return IdempotencyHandler.determineResultFromIdempotencyRecord(
idempotencyRecord
);
} else {
throw new IdempotencyPersistenceLayerError(
'Failed to save in progress record to idempotency store',
e as Error
);
}
}
};

/**
* avoid idempotency if the eventKeyJmesPath is not present in the payload and throwOnNoIdempotencyKey is false
* static so {@link makeHandlerIdempotent} middleware can use it
* TOOD: refactor so middy uses IdempotencyHandler internally wihtout reimplementing the logic
* @param eventKeyJmesPath
* @param throwOnNoIdempotencyKey
* @param fullFunctionPayload
* @private
*/
public static shouldSkipIdempotency(
eventKeyJmesPath: string,
throwOnNoIdempotencyKey: boolean,
fullFunctionPayload: JSONValue
): boolean {
return (eventKeyJmesPath &&
!throwOnNoIdempotencyKey &&
!search(fullFunctionPayload, eventKeyJmesPath)) as boolean;
}
* Save a successful result to the idempotency store.
*
* This is called when the handler returns successfully.
*
* @param result The result returned by the handler.
*/
#saveSuccessfullResult = async (result: ReturnType<Func>): Promise<void> => {
try {
await this.#persistenceStore.saveSuccess(
this.#functionPayloadToBeHashed,
result
);
} catch (e) {
throw new IdempotencyPersistenceLayerError(
'Failed to update success record to idempotency store',
e as Error
);
}
};
}
Loading