Skip to content

Fixed timeout when acquiring connection from the pool #330

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 13 additions & 14 deletions src/v1/internal/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
* limitations under the License.
*/

import {promiseOrTimeout} from './util';
import PoolConfig from './pool-config';
import {newError} from '../error';

class Pool {
/**
Expand Down Expand Up @@ -65,20 +65,17 @@ class Pool {
allRequests[key] = [];
}

let request;
return new Promise((resolve, reject) => {
let request;

return promiseOrTimeout(
this._acquisitionTimeout,
new Promise(
(resolve, reject) => {
request = new PendingRequest(resolve);
const timeoutId = setTimeout(() => {
allRequests[key] = allRequests[key].filter(item => item !== request);
reject(newError(`Connection acquisition timed out in ${this._acquisitionTimeout} ms.`));
}, this._acquisitionTimeout);

allRequests[key].push(request);
}
), () => {
allRequests[key] = allRequests[key].filter(item => item !== request);
}
);
request = new PendingRequest(resolve, timeoutId);
allRequests[key].push(request);
});
}

/**
Expand Down Expand Up @@ -208,11 +205,13 @@ function resourceReleased(key, activeResourceCounts) {

class PendingRequest {

constructor(resolve) {
constructor(resolve, timeoutId) {
this._resolve = resolve;
this._timeoutId = timeoutId;
}

resolve(resource) {
clearTimeout(this._timeoutId);
this._resolve(resource);
}

Expand Down
33 changes: 0 additions & 33 deletions src/v1/internal/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
* limitations under the License.
*/

import {newError} from '../error';

const ENCRYPTION_ON = "ENCRYPTION_ON";
const ENCRYPTION_OFF = "ENCRYPTION_OFF";

Expand Down Expand Up @@ -64,42 +62,11 @@ function isString(str) {
return Object.prototype.toString.call(str) === '[object String]';
}

function promiseOrTimeout(timeout, otherPromise, onTimeout) {
let resultPromise = null;

const timeoutPromise = new Promise((resolve, reject) => {
const id = setTimeout(() => {
if (onTimeout && typeof onTimeout === 'function') {
onTimeout();
}

reject(newError(`Operation timed out in ${timeout} ms.`));
}, timeout);

// this "executor" function is executed immediately, even before the Promise constructor returns
// thus it's safe to initialize resultPromise variable here, where timeout id variable is accessible
resultPromise = otherPromise.then(result => {
clearTimeout(id);
return result;
}).catch(error => {
clearTimeout(id);
throw error;
});
});

if (resultPromise == null) {
throw new Error('Result promise not initialized');
}

return Promise.race([resultPromise, timeoutPromise]);
}

export {
isEmptyObjectOrNull,
isString,
assertString,
assertCypherStatement,
promiseOrTimeout,
ENCRYPTION_ON,
ENCRYPTION_OFF
}
59 changes: 56 additions & 3 deletions test/internal/pool.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,10 @@ describe('Pool', () => {
done();
});

setTimeout(() => r1.close(), 1000);
setTimeout(() => {
expectNumberOfAcquisitionRequests(pool, key, 1);
r1.close();
}, 1000);
});
});

Expand All @@ -445,7 +448,7 @@ describe('Pool', () => {

pool.acquire(key).catch(error => {
expect(error.message).toContain('timed out');

expectNumberOfAcquisitionRequests(pool, key, 0);
done();
});
});
Expand All @@ -472,14 +475,64 @@ describe('Pool', () => {

pool.acquire(key).then(r2 => {
expect(r2.id).toEqual(2);

expectNoPendingAcquisitionRequests(pool);
done();
});
});
});

it('should work fine when resources released together with acquisition timeout', done => {
const acquisitionTimeout = 1000;
let counter = 0;

const key = 'bolt://localhost:7687';
const pool = new Pool(
(url, release) => new Resource(url, counter++, release),
resource => {
},
() => true,
new PoolConfig(2, acquisitionTimeout)
);

pool.acquire(key).then(resource1 => {
expect(resource1.id).toEqual(0);

pool.acquire(key).then(resource2 => {
expect(resource2.id).toEqual(1);

// try to release both resources around the time acquisition fails with timeout
// double-release used to cause deletion of acquire requests in the pool and failure of the timeout
// such background failure made this test fail, not the existing assertions
setTimeout(() => {
resource1.close();
resource2.close();
}, acquisitionTimeout);

pool.acquire(key).then(someResource => {
expect(someResource).toBeDefined();
expect(someResource).not.toBeNull();
expectNoPendingAcquisitionRequests(pool);
done(); // ok, promise got resolved before the timeout
}).catch(error => {
expect(error).toBeDefined();
expect(error).not.toBeNull();
expectNoPendingAcquisitionRequests(pool);
done(); // also ok, timeout fired before promise got resolved
});
});
});
});

});

function expectNoPendingAcquisitionRequests(pool) {
expect(pool._acquireRequests).toEqual({});
}

function expectNumberOfAcquisitionRequests(pool, key, expectedNumber) {
expect(pool._acquireRequests[key].length).toEqual(expectedNumber);
}

class Resource {

constructor(key, id, release) {
Expand Down
65 changes: 0 additions & 65 deletions test/internal/util.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,71 +74,6 @@ describe('util', () => {
verifyInvalidCypherStatement(console.log);
});

it('should time out', () => {
expect(() => util.promiseOrTimeout(500, new Promise(), null)).toThrow();
});

it('should not time out', done => {
util.promiseOrTimeout(500, Promise.resolve(0), null).then((result) => {
expect(result).toEqual(0);
done();
})
});

it('should call clear action when timed out', done => {
let marker = 0;

let clearAction = () => {
marker = 1;
};

util.promiseOrTimeout(500, new Promise((resolve, reject) => { }), clearAction).catch((error) => {
expect(marker).toEqual(1);
done();
});
});

it('should not trigger both promise and timeout', done => {
const timeout = 500;

let timeoutFired = false;
let result = null;
let error = null;

const resultPromise = util.promiseOrTimeout(
timeout,
new Promise(resolve => {
setTimeout(() => {
resolve(42);
}, timeout);
}),
() => {
timeoutFired = true;
}
);

resultPromise.then(r => {
result = r;
}).catch(e => {
error = e;
});

setTimeout(() => {
if (timeoutFired) {
// timeout fired - result should not be set, error should be set
expect(result).toBeNull();
expect(error).not.toBeNull();
expect(error.message).toEqual(`Operation timed out in ${timeout} ms.`);
done();
} else {
// timeout did not fire - result should be set, error should not be set
expect(result).toEqual(42);
expect(error).toBeNull();
done();
}
}, timeout * 2);
});

function verifyValidString(str) {
expect(util.assertString(str, 'Test string')).toBe(str);
}
Expand Down