Skip to content
This repository was archived by the owner on Apr 22, 2023. It is now read-only.

fs: Add DuplexStream and fs.createDuplexStream #8002

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions doc/api/fs.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,45 @@ Emitted when the WriteStream's file is opened.
The number of bytes written so far. Does not include data that is still queued
for writing.


## fs.createDuplexStream(path, [options])

Returns a new DuplexStream object (See `Duplex Stream`).

`options` is an object with the following defaults:

{ flags: 'r+',
encoding: null,
fd: null,
mode: 0666,
autoClose: true
}

For more information on `options`, see `ReadStream` and `WriteStream`.
Note that replacing a file rather than modifying it may require a `flags`
mode of `w+` rather than the default mode `r+`.

Note that if `autoClose` is set to false, the `fd` used by `DuplexStream`
will still be closed if `end` is called.

## Class: fs.DuplexStream

`DuplexStream` is a [Duplex Stream](stream.html#stream_class_stream_duplex).
It is built using parasitic inheritance from the the above ReadStream and
WriteStream classes.

### Event: 'open'

* `fd` {Integer} file descriptor used by the ReadStream.

Emitted when the DuplexStream's file is opened.

### file.bytesWritten

The number of bytes written so far. Does not include data that is still queued
for writing.


## Class: fs.FSWatcher

Objects returned from `fs.watch()` are of this type.
Expand Down
114 changes: 108 additions & 6 deletions lib/fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ var EventEmitter = require('events').EventEmitter;

var Readable = Stream.Readable;
var Writable = Stream.Writable;
var Duplex = Stream.Duplex;

var kMinPoolSpace = 128;
var kMaxLength = require('smalloc').kMaxLength;
Expand Down Expand Up @@ -1467,7 +1468,7 @@ function ReadStream(path, options) {
this.mode = options.hasOwnProperty('mode') ? options.mode : 438; /*=0666*/

this.start = options.hasOwnProperty('start') ? options.start : undefined;
this.end = options.hasOwnProperty('end') ? options.end : undefined;
this._end = options.hasOwnProperty('end') ? options.end : undefined;
this.autoClose = options.hasOwnProperty('autoClose') ?
options.autoClose : true;
this.pos = undefined;
Expand All @@ -1476,16 +1477,21 @@ function ReadStream(path, options) {
if (!util.isNumber(this.start)) {
throw TypeError('start must be a Number');
}
if (util.isUndefined(this.end)) {
this.end = Infinity;
} else if (!util.isNumber(this.end)) {
if (util.isUndefined(this._end)) {
this._end = Infinity;
} else if (!util.isNumber(this._end)) {
throw TypeError('end must be a Number');
}

if (this.start > this.end) {
if (this.start > this._end) {
throw new Error('start must be <= end');
}

if (this.start < 0) {
throw new Error('start must be >= zero');
}


this.pos = this.start;
}

Expand Down Expand Up @@ -1542,7 +1548,7 @@ ReadStream.prototype._read = function(n) {
var start = pool.used;

if (!util.isUndefined(this.pos))
toRead = Math.min(this.end - this.pos + 1, toRead);
toRead = Math.min(this._end - this.pos + 1, toRead);

// already read everything we were supposed to read!
// treat as EOF.
Expand Down Expand Up @@ -1704,6 +1710,102 @@ WriteStream.prototype.close = ReadStream.prototype.close;
WriteStream.prototype.destroySoon = WriteStream.prototype.end;




fs.createDuplexStream = function(path, options) {
return new DuplexStream(path, options);
};

util.inherits(DuplexStream, Duplex);
fs.DuplexStream = DuplexStream;
function DuplexStream(path, options) {
if (!(this instanceof DuplexStream))
return new DuplexStream(path, options);

// a little bit bigger buffer and water marks by default
options = util._extend({
highWaterMark: 64 * 1024
}, options || {});

Duplex.call(this, options);

this.path = path;

this.fd = options.hasOwnProperty('fd') ? options.fd : null;
this.flags = options.hasOwnProperty('flags') ? options.flags : 'r+';
this.mode = options.hasOwnProperty('mode') ? options.mode : 438; /*=0666*/

this.start = options.hasOwnProperty('start') ? options.start : undefined;
this._end = options.hasOwnProperty('end') ? options.end : undefined;
this.autoClose = options.hasOwnProperty('autoClose') ?
options.autoClose : true;
this.pos = undefined;
this.bytesWritten = 0;

if (!util.isUndefined(this.start)) {
if (!util.isNumber(this.start)) {
throw TypeError('start must be a Number');
}

if (util.isUndefined(this._end)) {
this._end = Infinity;
} else if (!util.isNumber(this._end)) {
throw TypeError('end must be a Number');
}

if (this.start > this._end) {
throw new Error('start must be <= end');
}

if (this.start < 0) {
throw new Error('start must be >= zero');
}

this.pos = this.start;
}

if (!util.isNumber(this.fd))
this.open();

// dispose on finish.
this.once('finish', this.close);

this.once('end', function() {
if (this.autoClose) {
this.destroy();
}
});
}

fs.FileDuplexStream = DuplexStream;


DuplexStream.prototype.open = function() {
var self = this;
fs.open(this.path, this.flags, this.mode, function(er, fd) {
if (er) {
if (self.autoClose) {
self.destroy();
}
self.emit('error', er);
return;
}

self.fd = fd;
self.emit('open', fd);
// start the flow of data.
self.read();
});
};

DuplexStream.prototype._read = ReadStream.prototype._read;
DuplexStream.prototype._write = WriteStream.prototype._write;
DuplexStream.prototype.destroy = ReadStream.prototype.destroy;
DuplexStream.prototype.close = ReadStream.prototype.close;

// There is no shutdown() for files.
DuplexStream.prototype.destroySoon = WriteStream.prototype.end;

// SyncWriteStream is internal. DO NOT USE.
// Temporary hack for process.stdout and process.stderr when piped to files.
function SyncWriteStream(fd, options) {
Expand Down
53 changes: 53 additions & 0 deletions test/simple/test-fs-duplex-stream-change-open.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.

var common = require('../common');
var assert = require('assert');

var path = require('path'),
fs = require('fs'),
Duplex = require('stream').Duplex;

var file = path.join(common.tmpDir, 'write.txt');

var stream = fs.DuplexStream(file, { flags : 'w+' }),
_fs_close = fs.close,
_fs_open = fs.open;

// change the fs.open with an identical function after the DuplexStream
// has pushed it onto its internal action queue, but before it's
// returned. This simulates AOP-style extension of the fs lib.
fs.open = function() {
return _fs_open.apply(fs, arguments);
};

fs.close = function(fd) {
assert.ok(fd, 'fs.close must not be called with an undefined fd.');
fs.close = _fs_close;
fs.open = _fs_open;
}

stream.write('foo');
stream.end();

process.on('exit', function() {
assert.equal(fs.open, _fs_open);
});
42 changes: 42 additions & 0 deletions test/simple/test-fs-duplex-stream-end.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.

var common = require('../common');
var assert = require('assert');
var path = require('path');
var fs = require('fs');

(function() {
var file = path.join(common.tmpDir, 'write-end-test0.txt');
var stream = fs.createDuplexStream(file, { flags: 'w+' });
stream.end();
stream.on('close', common.mustCall(function() { }));
})();

(function() {
var file = path.join(common.tmpDir, 'write-end-test1.txt');
var stream = fs.createDuplexStream(file, { flags: 'w+' });
stream.end('a\n', 'utf8');
stream.on('close', common.mustCall(function() {
var content = fs.readFileSync(file, 'utf8');
assert.equal(content, 'a\n');
}));
})();
63 changes: 63 additions & 0 deletions test/simple/test-fs-duplex-stream-read-err.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.

var common = require('../common');
var assert = require('assert');
var fs = require('fs');

var stream = fs.createDuplexStream(__filename, {
bufferSize: 64
});
var err = new Error('BAM');

stream.on('error', common.mustCall(function errorHandler(err_) {
console.error('error event');
process.nextTick(function() {
assert.equal(stream.fd, null);
assert.equal(err_, err);
});
}));

fs.close = common.mustCall(function(fd_, cb) {
assert.equal(fd_, stream.fd);
process.nextTick(cb);
});

var read = fs.read;
fs.read = function() {
// first time is ok.
read.apply(fs, arguments);
// then it breaks
fs.read = function() {
var cb = arguments[arguments.length - 1];
process.nextTick(function() {
cb(err);
});
// and should not be called again!
fs.read = function() {
throw new Error('BOOM!');
};
};
};

stream.on('data', function(buf) {
stream.on('data', assert.fail); // no more 'data' events should follow
});
Loading