Skip to content

Commit 1a2a4d7

Browse files
committed
Redesign streams to avoid pipeline issues #10, #9
1 parent 65ae091 commit 1a2a4d7

File tree

3 files changed

+40
-44
lines changed

3 files changed

+40
-44
lines changed

built/lib/BulkDataClient.js

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,8 @@ class BulkDataClient extends events_1.EventEmitter {
433433
const downloads = downloadJobs.map(j => j.status);
434434
this.emit("allDownloadsComplete", downloads);
435435
if (this.options.saveManifest) {
436-
this.writeToDestination("manifest.json", stream_1.Readable.from(JSON.stringify(manifest, null, 4))).then(() => {
436+
const readable = stream_1.Readable.from(JSON.stringify(manifest, null, 4));
437+
(0, promises_1.pipeline)(readable, this.createDestinationStream("manifest.json")).then(() => {
437438
resolve(downloads);
438439
});
439440
}
@@ -537,7 +538,7 @@ class BulkDataClient extends events_1.EventEmitter {
537538
pdfToText: this.options.pdfToText,
538539
baseUrl: this.options.fhirUrl,
539540
save: (name, stream, subFolder) => {
540-
return this.writeToDestination(name, stream, subFolder);
541+
return (0, promises_1.pipeline)(stream, this.createDestinationStream(name, subFolder));
541542
},
542543
});
543544
docRefProcessor.on("attachment", () => _state.attachments += 1);
@@ -555,6 +556,10 @@ class BulkDataClient extends events_1.EventEmitter {
555556
// ---------------------------------------------------------------------
556557
// Write the file to the configured destination
557558
// ---------------------------------------------------------------------
559+
streams.push(this.createDestinationStream(fileName, subFolder));
560+
// ---------------------------------------------------------------------
561+
// Run the pipeline
562+
// ---------------------------------------------------------------------
558563
try {
559564
await (0, promises_1.pipeline)(streams);
560565
}
@@ -572,27 +577,21 @@ class BulkDataClient extends events_1.EventEmitter {
572577
fileSize: _state.uncompressedBytes,
573578
resourceCount: _state.resources
574579
});
575-
await this.writeToDestination(fileName, stringify, subFolder);
576580
}
577581
/**
578-
* Given a readable stream as input sends the data to the destination. The
579-
* actual actions taken are different depending on the destination:
582+
* Creates and returns a writable stream to the destination.
580583
* - For file system destination the files are written to the given location
581584
* - For S3 destinations the files are uploaded to S3
582585
* - For HTTP destinations the files are posted to the given URL
583586
* - If the destination is "" or "none" no action is taken (files are discarded)
584587
* @param fileName The desired fileName at destination
585-
* @param inputStream The input readable stream
586-
* @param subFolder
587-
* @returns
588+
* @param subFolder Optional subfolder
588589
*/
589-
writeToDestination(fileName, inputStream, subFolder = "") {
590+
createDestinationStream(fileName, subFolder = "") {
590591
const destination = String(this.options.destination || "none").trim();
591592
// No destination ------------------------------------------------------
592593
if (!destination || destination.toLowerCase() == "none") {
593-
return (0, promises_1.pipeline)(inputStream, new stream_1.Writable({
594-
write(chunk, encoding, cb) { cb(); }
595-
}));
594+
return new stream_1.Writable({ write(chunk, encoding, cb) { cb(); } });
596595
}
597596
// S3 ------------------------------------------------------------------
598597
if (destination.startsWith("s3://")) {
@@ -609,18 +608,20 @@ class BulkDataClient extends events_1.EventEmitter {
609608
if (subFolder) {
610609
bucket = (0, path_1.join)(bucket, subFolder);
611610
}
611+
const stream = new stream_1.PassThrough();
612612
const upload = new aws_sdk_1.default.S3.ManagedUpload({
613613
params: {
614614
Bucket: bucket,
615615
Key: fileName,
616-
Body: inputStream
616+
Body: stream
617617
}
618618
});
619-
return upload.promise();
619+
upload.promise().catch(console.error);
620+
return stream;
620621
}
621622
// HTTP ----------------------------------------------------------------
622623
if (destination.match(/^https?\:\/\//)) {
623-
return (0, promises_1.pipeline)(inputStream, request_1.default.stream.post((0, path_1.join)(destination, fileName) + "?folder=" + subFolder), new stream_1.PassThrough());
624+
return request_1.default.stream.post((0, path_1.join)(destination, fileName) + "?folder=" + subFolder);
624625
}
625626
// local filesystem destinations ---------------------------------------
626627
let path = destination.startsWith("file://") ?
@@ -636,7 +637,7 @@ class BulkDataClient extends events_1.EventEmitter {
636637
(0, fs_1.mkdirSync)(path);
637638
}
638639
}
639-
return (0, promises_1.pipeline)(inputStream, fs_1.default.createWriteStream((0, path_1.join)(path, fileName)));
640+
return fs_1.default.createWriteStream((0, path_1.join)(path, fileName));
640641
}
641642
/**
642643
* Given an URL query as URLSearchParams object, appends all the

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "bulk-data-client",
3-
"version": "1.1.0",
3+
"version": "1.1.1",
44
"description": "",
55
"main": "built/app.js",
66
"engines": {

src/lib/BulkDataClient.ts

Lines changed: 22 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -614,10 +614,8 @@ class BulkDataClient extends EventEmitter
614614
const downloads = downloadJobs.map(j => j.status)
615615
this.emit("allDownloadsComplete", downloads)
616616
if (this.options.saveManifest) {
617-
this.writeToDestination(
618-
"manifest.json",
619-
Readable.from(JSON.stringify(manifest, null, 4))
620-
).then(() => {
617+
const readable = Readable.from(JSON.stringify(manifest, null, 4));
618+
pipeline(readable, this.createDestinationStream("manifest.json")).then(() => {
621619
resolve(downloads)
622620
});
623621
} else {
@@ -743,7 +741,7 @@ class BulkDataClient extends EventEmitter
743741
pdfToText : this.options.pdfToText,
744742
baseUrl : this.options.fhirUrl,
745743
save: (name: string, stream: Readable, subFolder: string) => {
746-
return this.writeToDestination(name, stream, subFolder)
744+
return pipeline(stream, this.createDestinationStream(name, subFolder))
747745
},
748746
})
749747

@@ -767,17 +765,21 @@ class BulkDataClient extends EventEmitter
767765
// ---------------------------------------------------------------------
768766
// Write the file to the configured destination
769767
// ---------------------------------------------------------------------
768+
streams.push(this.createDestinationStream(fileName, subFolder))
769+
770+
// ---------------------------------------------------------------------
771+
// Run the pipeline
772+
// ---------------------------------------------------------------------
770773
try {
771774
await pipeline(streams)
772-
773-
} catch (e: any) {
775+
}
776+
catch (e: any) {
774777
this.emit("downloadError", {
775778
body : null,
776779
code : e.code || null,
777780
fileUrl: e.fileUrl || file.url,
778781
message: String(e.message || "Downloading failed")
779782
})
780-
781783
throw e
782784
}
783785

@@ -786,30 +788,23 @@ class BulkDataClient extends EventEmitter
786788
fileSize : _state.uncompressedBytes,
787789
resourceCount: _state.resources!
788790
})
789-
790-
await this.writeToDestination(fileName, stringify, subFolder)
791791
}
792792

793793
/**
794-
* Given a readable stream as input sends the data to the destination. The
795-
* actual actions taken are different depending on the destination:
794+
* Creates and returns a writable stream to the destination.
796795
* - For file system destination the files are written to the given location
797796
* - For S3 destinations the files are uploaded to S3
798797
* - For HTTP destinations the files are posted to the given URL
799798
* - If the destination is "" or "none" no action is taken (files are discarded)
800799
* @param fileName The desired fileName at destination
801-
* @param inputStream The input readable stream
802-
* @param subFolder
803-
* @returns
800+
* @param subFolder Optional subfolder
804801
*/
805-
private writeToDestination(fileName: string, inputStream: Readable, subFolder = "") {
802+
private createDestinationStream(fileName: string, subFolder = ""): Writable {
806803
const destination = String(this.options.destination || "none").trim();
807804

808805
// No destination ------------------------------------------------------
809806
if (!destination || destination.toLowerCase() == "none") {
810-
return pipeline(inputStream, new Writable({
811-
write(chunk, encoding, cb) { cb() }
812-
}))
807+
return new Writable({ write(chunk, encoding, cb) { cb() } })
813808
}
814809

815810
// S3 ------------------------------------------------------------------
@@ -831,24 +826,24 @@ class BulkDataClient extends EventEmitter
831826
bucket = join(bucket, subFolder)
832827
}
833828

829+
const stream = new PassThrough()
830+
834831
const upload = new aws.S3.ManagedUpload({
835832
params: {
836833
Bucket: bucket,
837834
Key : fileName,
838-
Body : inputStream
835+
Body : stream
839836
}
840837
});
841838

842-
return upload.promise()
839+
upload.promise().catch(console.error)
840+
841+
return stream;
843842
}
844843

845844
// HTTP ----------------------------------------------------------------
846845
if (destination.match(/^https?\:\/\//)) {
847-
return pipeline(
848-
inputStream,
849-
request.stream.post(join(destination, fileName) + "?folder=" + subFolder),
850-
new PassThrough()
851-
);
846+
return request.stream.post(join(destination, fileName) + "?folder=" + subFolder)
852847
}
853848

854849
// local filesystem destinations ---------------------------------------
@@ -868,7 +863,7 @@ class BulkDataClient extends EventEmitter
868863
}
869864
}
870865

871-
return pipeline(inputStream, FS.createWriteStream(join(path, fileName)));
866+
return FS.createWriteStream(join(path, fileName));
872867
}
873868

874869
/**

0 commit comments

Comments
 (0)