Skip to content

Commit 470f6f0

Browse files
committed
Update file download pipeline #8
1 parent 3f8d2e3 commit 470f6f0

File tree

2 files changed

+35
-8
lines changed

2 files changed

+35
-8
lines changed

index.d.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,13 @@ export declare namespace BulkDataClient {
222222
* useful information, for example which site imported this data.
223223
*/
224224
metadata?: Record<string, any>
225+
226+
/**
227+
* Path to log file. Absolute, or relative to process CWD. If not
228+
* provided, the file will be called log.ndjson and will be stored in
229+
* the downloads folder.
230+
*/
231+
file?: string
225232
}
226233

227234
interface CLIOptions {
@@ -448,6 +455,8 @@ export declare namespace BulkDataClient {
448455
* Only needed if `destination` points to S3
449456
*/
450457
awsSecretAccessKey: string
458+
459+
log: LoggingOptions
451460
}
452461

453462
interface JWK {

src/lib/BulkDataClient.ts

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -674,12 +674,15 @@ class BulkDataClient extends EventEmitter
674674
// Just "remember" the progress values but don't emit anything yet
675675
download.on("progress", state => Object.assign(_state, state))
676676

677+
const streams: (NodeJS.ReadableStream | NodeJS.WritableStream | NodeJS.ReadWriteStream)[] = [];
678+
677679
// Start the download (the stream will be paused though)
678-
let processPipeline: Readable = await download.run({
680+
let downloadStream: Readable = await download.run({
679681
accessToken,
680682
signal: this.abortController.signal,
681683
requestOptions: this.options.requests
682-
}).catch(e => {
684+
})
685+
.catch(e => {
683686
if (e instanceof FileDownloadError) {
684687
this.emit("downloadError", {
685688
body: null, // Buffer
@@ -691,6 +694,8 @@ class BulkDataClient extends EventEmitter
691694
throw e
692695
});
693696

697+
streams.push(downloadStream)
698+
694699
// ---------------------------------------------------------------------
695700
// Create an NDJSON parser to verify that every single line is valid
696701
// ---------------------------------------------------------------------
@@ -709,9 +714,8 @@ class BulkDataClient extends EventEmitter
709714
expectedCount: exportType == "output" ? file.count || -1 : -1,
710715
expectedResourceType
711716
})
712-
713-
processPipeline = processPipeline.pipe(parser);
714-
717+
718+
streams.push(parser)
715719

716720
// ---------------------------------------------------------------------
717721
// Download attachments
@@ -745,7 +749,7 @@ class BulkDataClient extends EventEmitter
745749

746750
docRefProcessor.on("attachment", () => _state.attachments! += 1)
747751

748-
processPipeline = processPipeline.pipe(docRefProcessor)
752+
streams.push(docRefProcessor)
749753
}
750754

751755

@@ -757,19 +761,33 @@ class BulkDataClient extends EventEmitter
757761
_state.resources! += 1
758762
onProgress(_state)
759763
});
760-
processPipeline = processPipeline.pipe(stringify);
764+
streams.push(stringify)
761765

762766

763767
// ---------------------------------------------------------------------
764768
// Write the file to the configured destination
765769
// ---------------------------------------------------------------------
766-
await this.writeToDestination(fileName, processPipeline, subFolder)
770+
try {
771+
await pipeline(streams)
772+
773+
} catch (e: any) {
774+
this.emit("downloadError", {
775+
body : null,
776+
code : e.code || null,
777+
fileUrl: e.fileUrl || file.url,
778+
message: String(e.message || "Downloading failed")
779+
})
767780

781+
throw e
782+
}
783+
768784
this.emit("downloadComplete", {
769785
fileUrl : file.url,
770786
fileSize : _state.uncompressedBytes,
771787
resourceCount: _state.resources!
772788
})
789+
790+
await this.writeToDestination(fileName, stringify, subFolder)
773791
}
774792

775793
/**

0 commit comments

Comments
 (0)