Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
594 changes: 509 additions & 85 deletions lib/lib-storage/src/s3-transfer-manager/README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,8 @@ describe(S3TransferManager.name, () => {
}
});

describe("(SEP) download single object tests", () => {
async function sepTests(
describe("Required compliance download single object tests", () => {
async function complianceTests(
objectType: "single" | "multipart",
multipartType: "PART" | "RANGE",
range: string | undefined,
Expand Down Expand Up @@ -301,16 +301,16 @@ describe(S3TransferManager.name, () => {
}

it("single object: multipartDownloadType = PART, range = 0-12MB, partNumber = null", async () => {
await sepTests("single", "PART", `bytes=0-${12 * 1024 * 1024}`, undefined);
await complianceTests("single", "PART", `bytes=0-${12 * 1024 * 1024}`, undefined);
}, 60_000);
it("multipart object: multipartDownloadType = RANGE, range = 0-12MB, partNumber = null", async () => {
await sepTests("multipart", "RANGE", `bytes=0-${12 * 1024 * 1024}`, undefined);
await complianceTests("multipart", "RANGE", `bytes=0-${12 * 1024 * 1024}`, undefined);
}, 60_000);
it("single object: multipartDownloadType = PART, range = null, partNumber = null", async () => {
await sepTests("single", "PART", undefined, undefined);
await complianceTests("single", "PART", undefined, undefined);
}, 60_000);
it("single object: multipartDownloadType = RANGE, range = null, partNumber = null", async () => {
await sepTests("single", "RANGE", undefined, undefined);
await complianceTests("single", "RANGE", undefined, undefined);
}, 60_000);
});
});
167 changes: 152 additions & 15 deletions lib/lib-storage/src/s3-transfer-manager/S3TransferManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,16 @@ import type {
} from "./types";

/**
* Describe what this is
* TODO: Switch all @public to @alpha
* TODO: tag internal for itneral functions
* Client for efficient transfer of objects to and from Amazon S3.
* Provides methods to optimize uploading and downloading individual objects
* as well as entire directories, with support for multipart operations,
* concurrency control, and request cancellation.
* Implements an eventTarget-based progress tracking system with methods to register,
* dispatch, and remove listeners for transfer lifecycle events.
*
* @alpha
*/

export class S3TransferManager implements IS3TransferManager {
private static MIN_PART_SIZE = 5 * 1024 * 1024; // 5MB
private static DEFAULT_PART_SIZE = 8 * 1024 * 1024; // 8MB
Expand Down Expand Up @@ -69,6 +74,16 @@ export class S3TransferManager implements IS3TransferManager {
this.validateConfig();
}

/**
* Registers a callback function to be executed when a specific transfer event occurs.
* Supports monitoring the full lifecycle of transfers.
*
* @param type - The type of event to listen for.
* @param callback - Function to execute when the specified event occurs.
* @param options - Optional configuration for event listener behavior.
*
* @alpha
*/
public addEventListener(
type: "transferInitiated",
callback: EventListener<TransferEvent>,
Expand Down Expand Up @@ -130,13 +145,13 @@ export class S3TransferManager implements IS3TransferManager {
}

/**
* todo: what does the return boolean mean?
* Dispatches an event to the registered event listeners.
* Triggers callbacks registered via addEventListener with matching event types.
*
* it returns false if the event is cancellable, and at least oneo the handlers which received event called
* Event.preventDefault(). Otherwise true.
* The use cases of preventDefault() does not apply to transfermanager but we should still keep the boolean
* and continue to return true to stay consistent with EventTarget.
* @param event - The event object to dispatch.
* @returns whether the event ran to completion
*
* @alpha
*/
public dispatchEvent(event: Event & TransferEvent): boolean;
public dispatchEvent(event: Event & TransferCompleteEvent): boolean;
Expand All @@ -157,6 +172,16 @@ export class S3TransferManager implements IS3TransferManager {
return true;
}

/**
* Removes a previously registered event listener from the specified event type.
* Stops the callback from being invoked when the event occurs.
*
* @param type - The type of event to stop listening for.
* @param callback - The function that was previously registered.
* @param options - Optional configuration for the event listener.
*
* @alpha
*/
public removeEventListener(
type: "transferInitiated",
callback: EventListener<TransferEvent>,
Expand Down Expand Up @@ -209,10 +234,32 @@ export class S3TransferManager implements IS3TransferManager {
}
}

/**
* Uploads objects to S3 with automatic multipart upload handling.
* Automatically chooses between single object upload or multipart upload based on content length threshold.
*
* @param request - PutObjectCommandInput and CreateMultipartUploadCommandInput parameters for single or multipart uploads.
* @param transferOptions - Optional abort signal and event listeners for transfer lifecycle monitoring.
*
* @returns S3 PutObject or CompleteMultipartUpload response with transfer event dispatching.
*
* @alpha
*/
public upload(request: UploadRequest, transferOptions?: TransferOptions): Promise<UploadResponse> {
throw new Error("Method not implemented.");
}

/**
* Downloads single objects from S3 with automatic multipart handling.
* Automatically chooses between PART or RANGE download strategies and joins streams into a single response.
*
* @param request - GetObjectCommandInput parameters. PartNumber is not supported - use GetObjectCommand directly for specific parts.
* @param transferOptions - Optional abort signal and event listeners for transfer lifecycle monitoring.
*
* @returns S3 GetObject response with joined Body stream and transfer event dispatching.
*
* @alpha
*/
public async download(request: DownloadRequest, transferOptions?: TransferOptions): Promise<DownloadResponse> {
const partNumber = request.PartNumber;
if (typeof partNumber === "number") {
Expand Down Expand Up @@ -248,12 +295,6 @@ export class S3TransferManager implements IS3TransferManager {
}
};

// TODO:
// after completing SEP requirements:
// - acquire lock on webstreams in the same
// - synchronous frame as they are opened or else
// - the connection might be closed too early.

const response = {
...metadata,
Body: await joinStreams(streams, {
Expand Down Expand Up @@ -299,6 +340,16 @@ export class S3TransferManager implements IS3TransferManager {
return response;
}

/**
* Uploads all files in a directory recursively to an S3 bucket.
* Automatically maps local file paths to S3 object keys using prefix and delimiter configuration.
*
* @param options - Configuration including bucket, source directory, filtering, failure handling, and transfer settings.
*
* @returns the number of objects that have been uploaded and the number of objects that have failed.
*
* @alpha
*/
public uploadAll(options: {
bucket: string;
source: string;
Expand All @@ -314,6 +365,16 @@ export class S3TransferManager implements IS3TransferManager {
throw new Error("Method not implemented.");
}

/**
* Downloads all objects in a bucket to a local directory.
* Uses ListObjectsV2 to retrieve objects and automatically maps S3 object keys to local file paths.
*
* @param options - Configuration including bucket, destination directory, filtering, failure handling, and transfer settings.
*
* @returns The number of objects that have been downloaded and the number of objects that have failed.
*
* @alpha
*/
public downloadAll(options: {
bucket: string;
destination: string;
Expand All @@ -328,6 +389,11 @@ export class S3TransferManager implements IS3TransferManager {
throw new Error("Method not implemented.");
}

/**
* Downloads object using part-based strategy with concurrent part requests.
*
* @internal
*/
protected async downloadByPart(
request: DownloadRequest,
transferOptions: TransferOptions,
Expand Down Expand Up @@ -432,6 +498,11 @@ export class S3TransferManager implements IS3TransferManager {
};
}

/**
* Downloads object using range-based strategy with concurrent range requests.
*
* @internal
*/
protected async downloadByRange(
request: DownloadRequest,
transferOptions: TransferOptions,
Expand Down Expand Up @@ -564,6 +635,11 @@ export class S3TransferManager implements IS3TransferManager {
};
}

/**
* Adds all event listeners from provided collection to the transfer manager.
*
* @internal
*/
private addEventListeners(eventListeners?: TransferEventListeners): void {
for (const listeners of this.iterateListeners(eventListeners)) {
for (const listener of listeners) {
Expand All @@ -572,6 +648,11 @@ export class S3TransferManager implements IS3TransferManager {
}
}

/**
* Removes event listeners from provided collection from the transfer manager.
*
* @internal
*/
private removeEventListeners(eventListeners?: TransferEventListeners): void {
for (const listeners of this.iterateListeners(eventListeners)) {
for (const listener of listeners) {
Expand All @@ -580,6 +661,11 @@ export class S3TransferManager implements IS3TransferManager {
}
}

/**
* Copies all response properties except Body to the container object.
*
* @internal
*/
private assignMetadata(container: any, response: any) {
for (const key in response) {
if (key === "Body") {
Expand All @@ -589,13 +675,23 @@ export class S3TransferManager implements IS3TransferManager {
}
}

/**
* Updates response ContentLength and ContentRange based on total object size.
*
* @internal
*/
private updateResponseLengthAndRange(response: DownloadResponse, totalSize: number | undefined): void {
if (totalSize !== undefined) {
response.ContentLength = totalSize;
response.ContentRange = `bytes 0-${totalSize - 1}/${totalSize}`;
}
}

/**
* Clears checksum values for composite multipart downloads.
*
* @internal
*/
private updateChecksumValues(initialPart: DownloadResponse, metadata: Omit<DownloadResponse, "Body">) {
if (initialPart.ChecksumType === "COMPOSITE") {
metadata.ChecksumCRC32 = undefined;
Expand All @@ -605,6 +701,11 @@ export class S3TransferManager implements IS3TransferManager {
}
}

/**
* Processes response metadata by updating length, copying properties, and handling checksums.
*
* @internal
*/
private processResponseMetadata(
response: DownloadResponse,
metadata: Omit<DownloadResponse, "Body">,
Expand All @@ -615,18 +716,33 @@ export class S3TransferManager implements IS3TransferManager {
this.updateChecksumValues(response, metadata);
}

/**
* Throws AbortError if transfer has been aborted via signal.
*
* @internal
*/
private checkAborted(transferOptions?: TransferOptions): void {
if (transferOptions?.abortSignal?.aborted) {
throw Object.assign(new Error("Download aborted."), { name: "AbortError" });
}
}

/**
* Validates if configuration parameters meets minimum requirements.
*
* @internal
*/
private validateConfig(): void {
if (this.targetPartSizeBytes < S3TransferManager.MIN_PART_SIZE) {
throw new Error(`targetPartSizeBytes must be at least ${S3TransferManager.MIN_PART_SIZE} bytes`);
}
}

/**
* Dispatches transferInitiated event with initial progress snapshot.
*
* @internal
*/
private dispatchTransferInitiatedEvent(request: DownloadRequest | UploadRequest, totalSize?: number): boolean {
this.dispatchEvent(
Object.assign(new Event("transferInitiated"), {
Expand All @@ -640,6 +756,11 @@ export class S3TransferManager implements IS3TransferManager {
return true;
}

/**
* Dispatches transferFailed event with error details and progress snapshot.
*
* @internal
*/
private dispatchTransferFailedEvent(
request: DownloadRequest | UploadRequest,
totalSize?: number,
Expand All @@ -658,6 +779,11 @@ export class S3TransferManager implements IS3TransferManager {
return true;
}

/**
* Generator that yields event listeners from the provided collection for iteration.
*
* @internal
*/
private *iterateListeners(eventListeners: TransferEventListeners = {}) {
for (const key in eventListeners) {
const eventType = key as keyof TransferEventListeners;
Expand All @@ -675,6 +801,11 @@ export class S3TransferManager implements IS3TransferManager {
}
}

/**
* Validates part download ContentRange matches expected part boundaries.
*
* @internal
*/
private validatePartDownload(contentRange: string | undefined, partNumber: number, partSize: number) {
if (!contentRange) {
throw new Error(`Missing ContentRange for part ${partNumber}.`);
Expand All @@ -699,6 +830,11 @@ export class S3TransferManager implements IS3TransferManager {
}
}

/**
* Validates range download ContentRange matches requested byte range.
*
* @internal
*/
private validateRangeDownload(requestRange: string, responseRange: string | undefined) {
if (!responseRange) {
throw new Error(`Missing ContentRange for range ${requestRange}.`);
Expand Down Expand Up @@ -732,8 +868,9 @@ export class S3TransferManager implements IS3TransferManager {
throw new Error(`Expected range to end at ${expectedEnd} but got ${end}`);
}
}

/**
*
* Internal event handler for download lifecycle hooks.
*
* @internal
*/
Expand Down
Loading