Skip to content

[WIP] Add abort support #437

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,7 @@
"url-search-params": "^0.10.0",
"whatwg-url": "^5.0.0"
},
"dependencies": {}
"dependencies": {
"abort-controller": "^1.0.1"
}
}
24 changes: 17 additions & 7 deletions src/body.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,27 @@ export default function Body(body, {
this[INTERNALS] = {
body,
disturbed: false,
error: null
error: null,
rejectCurrentPromise: undefined
};
this.size = size;
this.timeout = timeout;

if (body instanceof Stream) {
// handle stream error, such as incorrect content-encoding
body.on('error', err => {
this[INTERNALS].error = new FetchError(`Invalid response body while trying to fetch ${this.url}: ${err.message}`, 'system', err);
let error;
if (err instanceof FetchError) {
error = err;
} else {
error = new FetchError(`Invalid response body while trying to fetch ${this.url}: ${err.message}`, 'system', err);
}
const { rejectCurrentPromise } = this[INTERNALS];
if (typeof rejectCurrentPromise === 'function') {
rejectCurrentPromise(error);
} else {
this[INTERNALS].error = error;
}
});
}
}
Expand Down Expand Up @@ -231,10 +244,7 @@ function consumeBody() {
}, this.timeout);
}

// handle stream error, such as incorrect content-encoding
this.body.on('error', err => {
reject(new FetchError(`Invalid response body while trying to fetch ${this.url}: ${err.message}`, 'system', err));
});
this[INTERNALS].rejectCurrentPromise = reject;

this.body.on('data', chunk => {
if (abort || chunk === null) {
Expand All @@ -251,7 +261,7 @@ function consumeBody() {
accum.push(chunk);
});

this.body.on('end', () => {
this.body.once('end', () => {
if (abort) {
return;
}
Expand Down
34 changes: 29 additions & 5 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import Body, { writeToStream, getTotalBytes } from './body';
import Response from './response';
import Headers, { createHeadersLenient } from './headers';
import Request, { getNodeRequestOptions } from './request';
import Request, { getAbortSignal, getNodeRequestOptions } from './request';
import FetchError from './fetch-error';

const http = require('http');
Expand Down Expand Up @@ -39,17 +39,35 @@ export default function fetch(url, opts) {
return new fetch.Promise((resolve, reject) => {
// build request object
const request = new Request(url, opts);
const signal = getAbortSignal(request);
if (signal.aborted) {
reject(new FetchError(`Fetch to ${request.url} has been aborted`, 'aborted'));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any way this could be more inline with the browser spec? rejecting with an AbortError? when using with isomorphic fetch this would result in having to catch two types of errors for node and browser

https://dom.spec.whatwg.org/#aborting-ongoing-activities

return;
}

const options = getNodeRequestOptions(request);

const send = (options.protocol === 'https:' ? https : http).request;

// send request
const req = send(options);
let reqTimeout;
let body;

function abortCallback() {
const error = new FetchError(`Fetch to ${request.url} has been aborted`, 'aborted');
reject(error);
if (body !== undefined) {
body.emit('error', error);
}
finalize();
}
signal.addEventListener('abort', abortCallback);

function finalize() {
req.abort();
clearTimeout(reqTimeout);
signal.removeEventListener('abort', abortCallback);
}

if (request.timeout) {
Expand All @@ -61,12 +79,17 @@ export default function fetch(url, opts) {
});
}

req.on('error', err => {
function errorHandler(err) {
reject(new FetchError(`request to ${request.url} failed, reason: ${err.message}`, 'system', err));
if (body !== undefined) {
body.emit('error', err);
}
finalize();
});
}

req.on('error', errorHandler);

req.on('response', res => {
req.once('response', res => {
clearTimeout(reqTimeout);

const headers = createHeadersLenient(res.headers);
Expand Down Expand Up @@ -138,7 +161,8 @@ export default function fetch(url, opts) {
}

// prepare response
let body = res.pipe(new PassThrough());
res.on('error', errorHandler);
body = res.pipe(new PassThrough());
const response_options = {
url: request.url,
status: res.statusCode,
Expand Down
46 changes: 45 additions & 1 deletion src/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import Headers, { exportNodeCompatibleHeaders } from './headers.js';
import Body, { clone, extractContentType, getTotalBytes } from './body';

const { format: format_url, parse: parse_url } = require('url');
const { AbortController, AbortSignal } = require('abort-controller');

const INTERNALS = Symbol('Request internals');

Expand All @@ -27,6 +28,14 @@ function isRequest(input) {
);
}

// TODO: use toString check after https://github.com/mysticatea/abort-controller/pull/5
function isAbortSignal(input) {
return (
typeof input === 'object' &&
input instanceof AbortSignal
);
}

/**
* Request class
*
Expand All @@ -37,6 +46,7 @@ function isRequest(input) {
export default class Request {
constructor(input, init = {}) {
let parsedURL;
let signal;

// normalize input
if (!isRequest(input)) {
Expand All @@ -52,6 +62,14 @@ export default class Request {
input = {};
} else {
parsedURL = parse_url(input.url);
signal = input[INTERNALS].signal;
}

if (init.signal != null) {
if (!isAbortSignal(init.signal)) {
throw new TypeError('Provided signal must be an AbortSignal object');
}
signal = init.signal;
}

let method = init.method || input.method || 'GET';
Expand Down Expand Up @@ -82,11 +100,23 @@ export default class Request {
}
}

const abortController = new AbortController();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could someone explain why we are initializing another internal AbortController?

Copy link
Collaborator Author

@TimothyGu TimothyGu Jun 17, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per spec, the AbortSignal we expose through signal must be a sanitized object, and not be the signal passed in. To create a new AbortSignal we have to create another AbortController.

if (signal !== undefined) {
if (signal.aborted) {
abortController.abort();
} else {
signal.addEventListener('abort', () => {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This eventListener is never removed, is it?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it could be of any help maybe adding {once: true} to addEventListener could help cleaning up things

Copy link
Collaborator Author

@TimothyGu TimothyGu Apr 12, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mika-fischer No, and it's very difficult to do so. We could add a { once: true } though.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But won't this cause issues, for instance in the case where I have a long running Node.js application where I have a global AbortController (to be able to abort all requests when the application shuts down) and each request in the whole lifetime of the app (let's say hundreds of thousands) adds a listener, none of which is ever removed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😔 yes. It’s also one of the reasons why this is a WIP.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't you just save a cleanup closure (which just removes the listener from the parent AbortController) and attach this closure to the appropriate events of the Node.js Request/Response so that it gets called when the request is finished?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A cleanup closure would remove the listener from the parent AbortController after fetch() terminates for any reason, right? Would that affect the ability to cancel a fetch that reuses a Request object for multiple fetches? Does that maybe mean that the listener should be added each time the fetch begins and removed when the fetch terminates?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.once() seems like it would only help if the request terminated by being aborted, but I might be missing something.

abortController.abort();
});
}
}

this[INTERNALS] = {
method,
redirect: init.redirect || input.redirect || 'follow',
headers,
parsedURL
parsedURL,
signal: abortController.signal
};

// node-fetch-only options
Expand Down Expand Up @@ -116,6 +146,10 @@ export default class Request {
return this[INTERNALS].redirect;
}

get signal() {
return this[INTERNALS].signal;
}

/**
* Clone this request
*
Expand Down Expand Up @@ -143,6 +177,16 @@ Object.defineProperties(Request.prototype, {
clone: { enumerable: true }
});

/**
* Get the AbortSignal object belonging to a Request.
*
* @param Request A Request instance
* @return AbortSignal request's signal
*/
export function getAbortSignal(request) {
return request[INTERNALS].signal;
}

/**
* Convert a Request to Node.js http request options.
*
Expand Down