Skip to content

[v11.x] zlib: split JS code as prep for non-zlib-backed streams #25228

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 128 additions & 111 deletions lib/zlib.js
Original file line number Diff line number Diff line change
Expand Up @@ -206,21 +206,10 @@ function checkRangesOrGetDefault(number, name, lower, upper, def) {
return number;
}

// the Zlib class they all inherit from
// This thing manages the queue of requests, and returns
// true or false if there is anything in the queue when
// you call the .write() method.
function Zlib(opts, mode) {
// The base class for all Zlib-style streams.
function ZlibBase(opts, mode, handle, { flush, finishFlush, fullFlush }) {
var chunkSize = Z_DEFAULT_CHUNK;
var flush = Z_NO_FLUSH;
var finishFlush = Z_FINISH;
var windowBits = Z_DEFAULT_WINDOWBITS;
var level = Z_DEFAULT_COMPRESSION;
var memLevel = Z_DEFAULT_MEMLEVEL;
var strategy = Z_DEFAULT_STRATEGY;
var dictionary;

// The Zlib class is not exported to user land, the mode should only be
// The ZlibBase class is not exported to user land, the mode should only be
// passed in by us.
assert(typeof mode === 'number');
assert(mode >= DEFLATE && mode <= UNZIP);
Expand All @@ -236,50 +225,11 @@ function Zlib(opts, mode) {

flush = checkRangesOrGetDefault(
opts.flush, 'options.flush',
Z_NO_FLUSH, Z_BLOCK, Z_NO_FLUSH);
Z_NO_FLUSH, Z_BLOCK, flush);

finishFlush = checkRangesOrGetDefault(
opts.finishFlush, 'options.finishFlush',
Z_NO_FLUSH, Z_BLOCK, Z_FINISH);

// windowBits is special. On the compression side, 0 is an invalid value.
// But on the decompression side, a value of 0 for windowBits tells zlib
// to use the window size in the zlib header of the compressed stream.
if ((opts.windowBits == null || opts.windowBits === 0) &&
(mode === INFLATE ||
mode === GUNZIP ||
mode === UNZIP)) {
windowBits = 0;
} else {
windowBits = checkRangesOrGetDefault(
opts.windowBits, 'options.windowBits',
Z_MIN_WINDOWBITS, Z_MAX_WINDOWBITS, Z_DEFAULT_WINDOWBITS);
}

level = checkRangesOrGetDefault(
opts.level, 'options.level',
Z_MIN_LEVEL, Z_MAX_LEVEL, Z_DEFAULT_COMPRESSION);

memLevel = checkRangesOrGetDefault(
opts.memLevel, 'options.memLevel',
Z_MIN_MEMLEVEL, Z_MAX_MEMLEVEL, Z_DEFAULT_MEMLEVEL);

strategy = checkRangesOrGetDefault(
opts.strategy, 'options.strategy',
Z_DEFAULT_STRATEGY, Z_FIXED, Z_DEFAULT_STRATEGY);

dictionary = opts.dictionary;
if (dictionary !== undefined && !isArrayBufferView(dictionary)) {
if (isAnyArrayBuffer(dictionary)) {
dictionary = Buffer.from(dictionary);
} else {
throw new ERR_INVALID_ARG_TYPE(
'options.dictionary',
['Buffer', 'TypedArray', 'DataView', 'ArrayBuffer'],
dictionary
);
}
}
Z_NO_FLUSH, Z_BLOCK, finishFlush);

if (opts.encoding || opts.objectMode || opts.writableObjectMode) {
opts = _extend({}, opts);
Expand All @@ -288,39 +238,29 @@ function Zlib(opts, mode) {
opts.writableObjectMode = false;
}
}

Transform.call(this, opts);
this._hadError = false;
this.bytesWritten = 0;
this._handle = new binding.Zlib(mode);
this._handle = handle;
handle[owner_symbol] = this;
// Used by processCallback() and zlibOnError()
this._handle[owner_symbol] = this;
this._handle.onerror = zlibOnError;
this._hadError = false;
this._writeState = new Uint32Array(2);

if (!this._handle.init(windowBits,
level,
memLevel,
strategy,
this._writeState,
processCallback,
dictionary)) {
throw new ERR_ZLIB_INITIALIZATION_FAILED();
}

handle.onerror = zlibOnError;
this._outBuffer = Buffer.allocUnsafe(chunkSize);
this._outOffset = 0;
this._level = level;
this._strategy = strategy;

this._chunkSize = chunkSize;
this._defaultFlushFlag = flush;
this._finishFlushFlag = finishFlush;
this._nextFlush = -1;
this._info = opts && opts.info;
this._defaultFullFlushFlag = fullFlush;
this.once('end', this.close);
this._info = opts && opts.info;
}
inherits(Zlib, Transform);

Object.defineProperty(Zlib.prototype, '_closed', {
inherits(ZlibBase, Transform);

Object.defineProperty(ZlibBase.prototype, '_closed', {
configurable: true,
enumerable: true,
get() {
Expand All @@ -332,7 +272,7 @@ Object.defineProperty(Zlib.prototype, '_closed', {
// perspective, but it is inconsistent with all other streams exposed by Node.js
// that have this concept, where it stands for the number of bytes read
// *from* the stream (that is, net.Socket/tls.Socket & file system streams).
Object.defineProperty(Zlib.prototype, 'bytesRead', {
Object.defineProperty(ZlibBase.prototype, 'bytesRead', {
configurable: true,
enumerable: true,
get: deprecate(function() {
Expand All @@ -345,41 +285,15 @@ Object.defineProperty(Zlib.prototype, 'bytesRead', {
'This feature will be removed in the future.', 'DEP0108')
});

// This callback is used by `.params()` to wait until a full flush happened
// before adjusting the parameters. In particular, the call to the native
// `params()` function should not happen while a write is currently in progress
// on the threadpool.
function paramsAfterFlushCallback(level, strategy, callback) {
assert(this._handle, 'zlib binding closed');
this._handle.params(level, strategy);
if (!this._hadError) {
this._level = level;
this._strategy = strategy;
if (callback) callback();
}
}

Zlib.prototype.params = function params(level, strategy, callback) {
checkRangesOrGetDefault(level, 'level', Z_MIN_LEVEL, Z_MAX_LEVEL);
checkRangesOrGetDefault(strategy, 'strategy', Z_DEFAULT_STRATEGY, Z_FIXED);

if (this._level !== level || this._strategy !== strategy) {
this.flush(Z_SYNC_FLUSH,
paramsAfterFlushCallback.bind(this, level, strategy, callback));
} else {
process.nextTick(callback);
}
};

Zlib.prototype.reset = function reset() {
ZlibBase.prototype.reset = function() {
if (!this._handle)
assert(false, 'zlib binding closed');
return this._handle.reset();
};

// This is the _flush function called by the transform class,
// internally, when the last chunk has been written.
Zlib.prototype._flush = function _flush(callback) {
ZlibBase.prototype._flush = function(callback) {
this._transform(Buffer.alloc(0), '', callback);
};

Expand All @@ -400,12 +314,12 @@ function maxFlush(a, b) {
}

const flushBuffer = Buffer.alloc(0);
Zlib.prototype.flush = function flush(kind, callback) {
ZlibBase.prototype.flush = function(kind, callback) {
var ws = this._writableState;

if (typeof kind === 'function' || (kind === undefined && !callback)) {
callback = kind;
kind = Z_FULL_FLUSH;
kind = this._defaultFullFlushFlag;
}

if (ws.ended) {
Expand All @@ -424,17 +338,17 @@ Zlib.prototype.flush = function flush(kind, callback) {
}
};

Zlib.prototype.close = function close(callback) {
ZlibBase.prototype.close = function(callback) {
_close(this, callback);
this.destroy();
};

Zlib.prototype._destroy = function _destroy(err, callback) {
ZlibBase.prototype._destroy = function(err, callback) {
_close(this);
callback(err);
};

Zlib.prototype._transform = function _transform(chunk, encoding, cb) {
ZlibBase.prototype._transform = function(chunk, encoding, cb) {
var flushFlag = this._defaultFlushFlag;
// We use a 'fake' zero-length chunk to carry information about flushes from
// the public API to the actual stream implementation.
Expand All @@ -451,7 +365,7 @@ Zlib.prototype._transform = function _transform(chunk, encoding, cb) {
processChunk(this, chunk, flushFlag, cb);
};

Zlib.prototype._processChunk = function _processChunk(chunk, flushFlag, cb) {
ZlibBase.prototype._processChunk = function(chunk, flushFlag, cb) {
// _processChunk() is left for backwards compatibility
if (typeof cb === 'function')
processChunk(this, chunk, flushFlag, cb);
Expand Down Expand Up @@ -641,6 +555,109 @@ function _close(engine, callback) {
engine._handle = null;
}

const zlibDefaultOpts = {
flush: Z_NO_FLUSH,
finishFlush: Z_FINISH,
fullFlush: Z_FULL_FLUSH
};
// Base class for all streams actually backed by zlib and using zlib-specific
// parameters.
function Zlib(opts, mode) {
var windowBits = Z_DEFAULT_WINDOWBITS;
var level = Z_DEFAULT_COMPRESSION;
var memLevel = Z_DEFAULT_MEMLEVEL;
var strategy = Z_DEFAULT_STRATEGY;
var dictionary;

if (opts) {
// windowBits is special. On the compression side, 0 is an invalid value.
// But on the decompression side, a value of 0 for windowBits tells zlib
// to use the window size in the zlib header of the compressed stream.
if ((opts.windowBits == null || opts.windowBits === 0) &&
(mode === INFLATE ||
mode === GUNZIP ||
mode === UNZIP)) {
windowBits = 0;
} else {
windowBits = checkRangesOrGetDefault(
opts.windowBits, 'options.windowBits',
Z_MIN_WINDOWBITS, Z_MAX_WINDOWBITS, Z_DEFAULT_WINDOWBITS);
}

level = checkRangesOrGetDefault(
opts.level, 'options.level',
Z_MIN_LEVEL, Z_MAX_LEVEL, Z_DEFAULT_COMPRESSION);

memLevel = checkRangesOrGetDefault(
opts.memLevel, 'options.memLevel',
Z_MIN_MEMLEVEL, Z_MAX_MEMLEVEL, Z_DEFAULT_MEMLEVEL);

strategy = checkRangesOrGetDefault(
opts.strategy, 'options.strategy',
Z_DEFAULT_STRATEGY, Z_FIXED, Z_DEFAULT_STRATEGY);

dictionary = opts.dictionary;
if (dictionary !== undefined && !isArrayBufferView(dictionary)) {
if (isAnyArrayBuffer(dictionary)) {
dictionary = Buffer.from(dictionary);
} else {
throw new ERR_INVALID_ARG_TYPE(
'options.dictionary',
['Buffer', 'TypedArray', 'DataView', 'ArrayBuffer'],
dictionary
);
}
}
}

const handle = new binding.Zlib(mode);
// Ideally, we could let ZlibBase() set up _writeState. I haven't been able
// to come up with a good solution that doesn't break our internal API,
// and with it all supported npm versions at the time of writing.
this._writeState = new Uint32Array(2);
if (!handle.init(windowBits,
level,
memLevel,
strategy,
this._writeState,
processCallback,
dictionary)) {
throw new ERR_ZLIB_INITIALIZATION_FAILED();
}

ZlibBase.call(this, opts, mode, handle, zlibDefaultOpts);

this._level = level;
this._strategy = strategy;
}
inherits(Zlib, ZlibBase);

// This callback is used by `.params()` to wait until a full flush happened
// before adjusting the parameters. In particular, the call to the native
// `params()` function should not happen while a write is currently in progress
// on the threadpool.
function paramsAfterFlushCallback(level, strategy, callback) {
assert(this._handle, 'zlib binding closed');
this._handle.params(level, strategy);
if (!this._hadError) {
this._level = level;
this._strategy = strategy;
if (callback) callback();
}
}

Zlib.prototype.params = function params(level, strategy, callback) {
checkRangesOrGetDefault(level, 'level', Z_MIN_LEVEL, Z_MAX_LEVEL);
checkRangesOrGetDefault(strategy, 'strategy', Z_DEFAULT_STRATEGY, Z_FIXED);

if (this._level !== level || this._strategy !== strategy) {
this.flush(Z_SYNC_FLUSH,
paramsAfterFlushCallback.bind(this, level, strategy, callback));
} else {
process.nextTick(callback);
}
};

// generic zlib
// minimal 2-byte header
function Deflate(opts) {
Expand Down