Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ describe(S3TransferManager.name, () => {
}
});

describe.skip("(SEP) download single object tests", () => {
describe("(SEP) download single object tests", () => {
async function sepTests(
objectType: "single" | "multipart",
multipartType: "PART" | "RANGE",
Expand Down
114 changes: 85 additions & 29 deletions lib/lib-storage/src/s3-transfer-manager/S3TransferManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,29 +211,29 @@ export class S3TransferManager implements IS3TransferManager {
/**
* What is missing from the revised SEP and this implementation currently?
* PART mode:
* - Step 5: validate GetObject response for each part
* - If validation fails at any point, cancel all ongoing requests and error out
* - (DONE) Step 5: validate GetObject response for each part
* - If validation fails at any point, cancel all ongoing requests and error out
* - Step 6: after all requests have been sent, validate that the total number of part GET requests sent matches with the
* expected `PartsCount`
* - Step 7: when creating DownloadResponse, set accordingly:
* - (DONE) `ContentLength` : total length of the object saved from Step 3
* - (DONE) `ContentRange`: based on `bytes 0-(ContentLength -1)/ContentLength`
* - If ChecksumType is `COMPOSITE`, set all checksum value members to null as
* the checksum value returned from a part GET request is not the composite
* checksum for the entire object
* - (DONE) `ContentLength` : total length of the object saved from Step 3
* - (DONE) `ContentRange`: based on `bytes 0-(ContentLength -1)/ContentLength`
* - If ChecksumType is `COMPOSITE`, set all checksum value members to null as
* the checksum value returned from a part GET request is not the composite
* checksum for the entire object
* RANGE mode:
* - Step 7: validate GetObject response for each part. If validation fails or a
* - (DONE) Step 7: validate GetObject response for each part. If validation fails or a
* request fails at any point, cancel all ongoing requests and return an error to
* the user.
* - Step 8: after all requests have sent, validate that the total number of ranged
* GET requests sent matches with the expected number saved from Step 5.
* - Step 9: create DownloadResponse. Copy the fields in GetObject response from
* Step 3 and set the following fields accordingly:
* - (DONE) `ContentLength` : total length of the object saved from Step 3
* - (DONE) `ContentRange`: based on `bytes 0-(ContentLength -1)/ContentLength`
* - If ChecksumType is `COMPOSITE`, set all checksum value members to null as
* the checksum value returned from a part GET request is not the composite
* checksum for the entire object
* - (DONE) `ContentLength` : total length of the object saved from Step 3
* - (DONE) `ContentRange`: based on `bytes 0-(ContentLength -1)/ContentLength`
* - If ChecksumType is `COMPOSITE`, set all checksum value members to null as
* the checksum value returned from a part GET request is not the composite
* checksum for the entire object
* Checksum validation notes:
* -
*
Expand Down Expand Up @@ -370,8 +370,8 @@ export class S3TransferManager implements IS3TransferManager {
};
const initialPart = await this.s3ClientInstance.send(new GetObjectCommand(initialPartRequest), transferOptions);
const initialETag = initialPart.ETag ?? undefined;
const partSize = initialPart.ContentLength;
totalSize = initialPart.ContentRange ? Number.parseInt(initialPart.ContentRange.split("/")[1]) : undefined;

this.dispatchTransferInitiatedEvent(request, totalSize);
if (initialPart.Body) {
if (initialPart.Body && typeof (initialPart.Body as any).getReader === "function") {
Expand All @@ -383,9 +383,12 @@ export class S3TransferManager implements IS3TransferManager {
streams.push(Promise.resolve(initialPart.Body));
requests.push(initialPartRequest);
}

this.updateResponseLengthAndRange(initialPart, totalSize);
this.assignMetadata(metadata, initialPart);
this.updateChecksumValues(initialPart, metadata);

let partCount = 1;
if (initialPart.PartsCount! > 1) {
for (let part = 2; part <= initialPart.PartsCount!; part++) {
this.checkAborted(transferOptions);
Expand All @@ -398,7 +401,7 @@ export class S3TransferManager implements IS3TransferManager {
const getObject = this.s3ClientInstance
.send(new GetObjectCommand(getObjectRequest), transferOptions)
.then((response) => {
// this.validatePartRange(part, response.ContentRange, this.targetPartSizeBytes);
this.validatePartDownload(part, response.ContentRange, partSize ?? 0);
if (response.Body && typeof (response.Body as any).getReader === "function") {
const reader = (response.Body as any).getReader();
(response.Body as any).getReader = function () {
Expand All @@ -410,8 +413,15 @@ export class S3TransferManager implements IS3TransferManager {

streams.push(getObject);
requests.push(getObjectRequest);
partCount++;
}
}

if (partCount !== initialPart.PartsCount) {
throw new Error(
`The number of parts downloaded (${partCount}) does not match the expected number (${initialPart.PartsCount})`
);
}
} else {
this.checkAborted(transferOptions);

Expand All @@ -428,6 +438,7 @@ export class S3TransferManager implements IS3TransferManager {
}
this.updateResponseLengthAndRange(getObject, totalSize);
this.assignMetadata(metadata, getObject);
this.updateChecksumValues(getObject, metadata);
}

return {
Expand All @@ -447,6 +458,7 @@ export class S3TransferManager implements IS3TransferManager {
let left = 0;
let right = this.targetPartSizeBytes - 1;
let maxRange = Number.POSITIVE_INFINITY;
let remainingLength = 1;

if (request.Range != null) {
const [userRangeLeft, userRangeRight] = request.Range.replace("bytes=", "").split("-").map(Number);
Expand All @@ -455,17 +467,25 @@ export class S3TransferManager implements IS3TransferManager {
left = userRangeLeft;
right = Math.min(userRangeRight, left + S3TransferManager.MIN_PART_SIZE - 1);
}

let remainingLength = 1;
const getObjectRequest: GetObjectCommandInput = {
...request,
Range: `bytes=${left}-${right}`,
};
const initialRangeGet = await this.s3ClientInstance.send(new GetObjectCommand(getObjectRequest), transferOptions);
this.validateRangeDownload(`bytes=${left}-${right}`, initialRangeGet.ContentRange);
const initialETag = initialRangeGet.ETag ?? undefined;
const totalSize = initialRangeGet.ContentRange
? Number.parseInt(initialRangeGet.ContentRange.split("/")[1])
: undefined;

let expectedRequestCount = 1;
if (totalSize) {
const contentLength = totalSize;
const remainingBytes = Math.max(0, contentLength - (right - left + 1));
const additionalRequests = Math.ceil(remainingBytes / S3TransferManager.MIN_PART_SIZE);
expectedRequestCount += additionalRequests;
}

if (initialRangeGet.Body && typeof (initialRangeGet.Body as any).getReader === "function") {
const reader = (initialRangeGet.Body as any).getReader();
(initialRangeGet.Body as any).getReader = function () {
Expand All @@ -474,16 +494,17 @@ export class S3TransferManager implements IS3TransferManager {
}

this.dispatchTransferInitiatedEvent(request, totalSize);

streams.push(Promise.resolve(initialRangeGet.Body!));
requests.push(getObjectRequest);

this.updateResponseLengthAndRange(initialRangeGet, totalSize);
this.assignMetadata(metadata, initialRangeGet);
this.updateChecksumValues(initialRangeGet, metadata);

left = right + 1;
right = Math.min(left + S3TransferManager.MIN_PART_SIZE - 1, maxRange);
remainingLength = totalSize ? Math.min(right - left + 1, Math.max(0, totalSize - left)) : 0;

let actualRequestCount = 1;
while (remainingLength > 0) {
this.checkAborted(transferOptions);

Expand All @@ -496,6 +517,7 @@ export class S3TransferManager implements IS3TransferManager {
const getObject = this.s3ClientInstance
.send(new GetObjectCommand(getObjectRequest), transferOptions)
.then((response) => {
this.validateRangeDownload(range, response.ContentRange);
if (response.Body && typeof (response.Body as any).getReader === "function") {
const reader = (response.Body as any).getReader();
(response.Body as any).getReader = function () {
Expand All @@ -507,12 +529,19 @@ export class S3TransferManager implements IS3TransferManager {

streams.push(getObject);
requests.push(getObjectRequest);
actualRequestCount++;

left = right + 1;
right = Math.min(left + S3TransferManager.MIN_PART_SIZE - 1, maxRange);
remainingLength = totalSize ? Math.min(right - left + 1, Math.max(0, totalSize - left)) : 0;
}

if (expectedRequestCount !== actualRequestCount) {
throw new Error(
`The number of ranged GET requests sent (${actualRequestCount}) does not match the expected number (${expectedRequestCount})`
);
}

return {
totalSize,
};
Expand Down Expand Up @@ -541,6 +570,15 @@ export class S3TransferManager implements IS3TransferManager {
}
}

private updateChecksumValues(initialPart: DownloadResponse, metadata: Omit<DownloadResponse, "Body">) {
if (initialPart.ChecksumType === "COMPOSITE") {
metadata.ChecksumCRC32 = undefined;
metadata.ChecksumCRC32C = undefined;
metadata.ChecksumSHA1 = undefined;
metadata.ChecksumSHA256 = undefined;
}
}

private checkAborted(transferOptions?: TransferOptions): void {
if (transferOptions?.abortSignal?.aborted) {
throw Object.assign(new Error("Download aborted."), { name: "AbortError" });
Expand Down Expand Up @@ -592,7 +630,7 @@ export class S3TransferManager implements IS3TransferManager {
}
}

private validatePartRange(partNumber: number, contentRange: string | undefined, partSize: number) {
private validatePartDownload(partNumber: number, contentRange: string | undefined, partSize: number) {
if (!contentRange) return;

const match = contentRange.match(/bytes (\d+)-(\d+)\/(\d+)/);
Expand All @@ -605,15 +643,6 @@ export class S3TransferManager implements IS3TransferManager {
const expectedStart = (partNumber - 1) * partSize;
const expectedEnd = Math.min(expectedStart + partSize - 1, total - 1);

// console.log({
// partNumber,
// start,
// end,
// total,
// expectedStart,
// expectedEnd
// });

if (start !== expectedStart) {
throw new Error(`Expected part ${partNumber} to start at ${expectedStart} but got ${start}`);
}
Expand All @@ -622,4 +651,31 @@ export class S3TransferManager implements IS3TransferManager {
throw new Error(`Expected part ${partNumber} to end at ${expectedEnd} but got ${end}`);
}
}

private validateRangeDownload(requestRange: string, responseRange: string | undefined) {
if (!responseRange) return;

const match = responseRange.match(/bytes (\d+)-(\d+)\/(\d+)/);
if (!match) throw new Error(`Invalid ContentRange format: ${responseRange}`);

const start = Number.parseInt(match[1]);
const end = Number.parseInt(match[2]);
const total = Number.parseInt(match[3]);

const rangeMatch = requestRange.match(/bytes=(\d+)-(\d+)/);
if (!rangeMatch) throw new Error(`Invalid Range format: ${requestRange}`);

const expectedStart = Number.parseInt(rangeMatch[1]);
const expectedEnd = Number.parseInt(rangeMatch[2]);

if (start !== expectedStart) {
throw new Error(`Expected range to start at ${expectedStart} but got ${start}`);
}

const isFinalPart = end + 1 === total;

if (end !== expectedEnd && !(isFinalPart && end < expectedEnd)) {
throw new Error(`Expected range to end at ${expectedEnd} but got ${end}`);
}
}
}