Skip to content

Fix receiveTimeout behaviour #903

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/bolt-connection/src/bolt/response-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ export default class ResponseHandler {
this._transformMetadata = transformMetadata || NO_OP_IDENTITY
this._observer = Object.assign(
{
onPendingObserversChange: NO_OP,
onError: NO_OP,
onFailure: NO_OP,
onErrorApplyTransformation: NO_OP_IDENTITY
Expand Down Expand Up @@ -156,6 +157,7 @@ export default class ResponseHandler {
*/
_updateCurrentObserver () {
this._currentObserver = this._pendingObservers.shift()
this._observer.onPendingObserversChange(this._pendingObservers.length)
}

_queueObserver (observer) {
Expand All @@ -168,6 +170,7 @@ export default class ResponseHandler {
} else {
this._pendingObservers.push(observer)
}
this._observer.onPendingObserversChange(this._pendingObservers.length)
return true
}

Expand Down
12 changes: 12 additions & 0 deletions packages/bolt-connection/src/channel/browser/browser-channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,18 @@ export default class WebSocketChannel {
*/
setupReceiveTimeout (receiveTimeout) {}

/**
* Stops the receive timeout for the channel.
*/
stopReceiveTimeout() {
}

/**
* Start the receive timeout for the channel.
*/
startReceiveTimeout () {
}

/**
* Set connection timeout on the given WebSocket, if configured.
* @return {number} the timeout id or null.
Expand Down
24 changes: 23 additions & 1 deletion packages/bolt-connection/src/channel/node/node-channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ export default class NodeChannel {
this
)
this._connectionErrorCode = config.connectionErrorCode
this._receiveTimeout = null
this._receiveTimeoutStarted = false

this._conn = connect(
config,
Expand Down Expand Up @@ -353,7 +355,27 @@ export default class NodeChannel {
)
})

this._conn.setTimeout(receiveTimeout)
this._receiveTimeout = receiveTimeout
}

/**
* Stops the receive timeout for the channel.
*/
stopReceiveTimeout() {
if (this._receiveTimeout !== null && this._receiveTimeoutStarted) {
this._receiveTimeoutStarted = false
this._conn.setTimeout(0)
}
}

/**
* Start the receive timeout for the channel.
*/
startReceiveTimeout () {
if (this._receiveTimeout !== null && !this._receiveTimeoutStarted) {
this._receiveTimeoutStarted = true
this._conn.setTimeout(this._receiveTimeout)
}
}

/**
Expand Down
13 changes: 13 additions & 0 deletions packages/bolt-connection/src/connection/connection-channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ export function createChannelConnection (
server: conn.server,
log: conn.logger,
observer: {
onPendingObserversChange: conn._handleOngoingRequestsNumberChange.bind(conn),
onError: conn._handleFatalError.bind(conn),
onFailure: conn._resetOnFailure.bind(conn),
onProtocolError: conn._handleProtocolError.bind(conn),
Expand Down Expand Up @@ -350,6 +351,18 @@ export default class ChannelConnection extends Connection {
return !this._isBroken && this._ch._open
}

/**
* Starts and stops the receive timeout timer.
* @param {number} requestsNumber Ongoing requests number
*/
_handleOngoingRequestsNumberChange(requestsNumber) {
if (requestsNumber === 0) {
this._ch.stopReceiveTimeout()
} else {
this._ch.startReceiveTimeout()
}
}

/**
* Call close on the channel.
* @returns {Promise<void>} - A promise that will be resolved when the underlying channel is closed.
Expand Down
120 changes: 118 additions & 2 deletions packages/bolt-connection/test/channel/node/node-channel.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ describe('NodeChannel', () => {
})

describe('.setupReceiveTimeout()', () => {
it('should call socket.setTimeout(receiveTimeout)', () => {
it('should not call socket.setTimeout(receiveTimeout)', () => {
const receiveTimeout = 42
const channel = createMockedChannel(true)

channel.setupReceiveTimeout(receiveTimeout)

expect(channel._conn.getCalls().setTimeout[1]).toEqual([receiveTimeout])
expect(channel._conn.getCalls().setTimeout.length).toEqual(1)
})

it('should unsubscribe to the on connect and on timeout created on the create socket', () => {
Expand Down Expand Up @@ -108,6 +108,122 @@ describe('NodeChannel', () => {
expect(channel._conn.getCalls().off).toEqual([])
})
})

describe('.startReceiveTimeout()', () => {
describe('receive timeout is setup', () => {
it('should call socket.setTimeout(receiveTimeout) when called first', () => {
const { receiveTimeout, channel } = setup()

channel.startReceiveTimeout()

expect(channel._conn.getCalls().setTimeout.length).toEqual(2)
expect(channel._conn.getCalls().setTimeout[1]).toEqual([receiveTimeout])
})

it ('should not call socket.setTimeout(receiveTimeout) if stream already started', () => {
const { receiveTimeout, channel } = setup()

// setup
channel.startReceiveTimeout()
expect(channel._conn.getCalls().setTimeout.length).toEqual(2)
expect(channel._conn.getCalls().setTimeout[1]).toEqual([receiveTimeout])

// start again
channel.startReceiveTimeout()

expect(channel._conn.getCalls().setTimeout.length).toEqual(2)
expect(channel._conn.getCalls().setTimeout[1]).toEqual([receiveTimeout])
})

it ('should call socket.setTimeout(receiveTimeout) when after stop', () => {
const { receiveTimeout, channel } = setup()

// setup
channel.startReceiveTimeout()
expect(channel._conn.getCalls().setTimeout.length).toEqual(2)
expect(channel._conn.getCalls().setTimeout[1]).toEqual([receiveTimeout])
channel.stopReceiveTimeout()
expect(channel._conn.getCalls().setTimeout.length).toEqual(3)
expect(channel._conn.getCalls().setTimeout[2]).toEqual([0])

// start again
channel.startReceiveTimeout()

expect(channel._conn.getCalls().setTimeout.length).toEqual(4)
expect(channel._conn.getCalls().setTimeout[3]).toEqual([receiveTimeout])
})

function setup () {
const channel = createMockedChannel(true)
const receiveTimeout = 42
channel.setupReceiveTimeout(receiveTimeout)
return {channel, receiveTimeout}
}
})

describe('receive timemout is not setup', () => {
it ('should call not socket.setTimeout(receiveTimeout) when not started', () => {
const channel = createMockedChannel(true)

// start again
channel.startReceiveTimeout()

expect(channel._conn.getCalls().setTimeout.length).toEqual(1)
})
})
})

describe('.stopReceiveTimeout()', () => {
describe('when receive timeout is setup', () => {
it ('should not call socket.setTimeout(0) when not started', () => {
const { channel } = setup()

channel.stopReceiveTimeout()

expect(channel._conn.getCalls().setTimeout.length).toEqual(1)
})

it ('should call socket.setTimeout(0) when already started', () => {
const { channel } = setup()

channel.startReceiveTimeout()

channel.stopReceiveTimeout()

expect(channel._conn.getCalls().setTimeout.length).toEqual(3)
expect(channel._conn.getCalls().setTimeout[2]).toEqual([0])
})

it ('should not call socket.setTimeout(0) when already stopped', () => {
const { channel } = setup()

channel.startReceiveTimeout()
channel.stopReceiveTimeout()

channel.stopReceiveTimeout()

expect(channel._conn.getCalls().setTimeout.length).toEqual(3)
})

function setup () {
const channel = createMockedChannel(true)
const receiveTimeout = 42
channel.setupReceiveTimeout(receiveTimeout)
return {channel, receiveTimeout}
}
})

describe('when receive timeout is not setup', () => {
it ('should not call socket.setTimeout(0)', () => {
const channel = createMockedChannel(true)

channel.startReceiveTimeout()
channel.stopReceiveTimeout()

expect(channel._conn.getCalls().setTimeout.length).toEqual(1)
})
})
})
})

function createMockedChannel (connected, config = {}) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ describe('ChannelConnection', () => {
[{ hints: { 'connection.recv_timeout_seconds': 0n } }],
[{ hints: { 'connection.recv_timeout_seconds': int(0) } }]
])(
'should call not call this._ch.setupReceiveTimeout() when onComplete metadata is %o',
'should not call this._ch.setupReceiveTimeout() when onComplete metadata is %o',
async metadata => {
const channel = {
setupReceiveTimeout: jest.fn().mockName('setupReceiveTimeout')
Expand Down Expand Up @@ -286,6 +286,60 @@ describe('ChannelConnection', () => {
})
})

describe('.__handleOngoingRequestsNumberChange()', () => {
it('should call channel.stopReceiveTimeout when requets number equals to 0', () => {
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
}
const connection = spyOnConnectionChannel({ channel, protocolSupplier: () => undefined })

connection._handleOngoingRequestsNumberChange(0)

expect(channel.stopReceiveTimeout).toHaveBeenCalledTimes(1)
})

it('should not call channel.startReceiveTimeout when requets number equals to 0', () => {
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
}
const connection = spyOnConnectionChannel({ channel, protocolSupplier: () => undefined })

connection._handleOngoingRequestsNumberChange(0)

expect(channel.startReceiveTimeout).toHaveBeenCalledTimes(0)
})

it.each([
[1], [2], [3], [5], [8], [13], [3000]
])('should call channel.startReceiveTimeout when requets number equals to %d', (requests) => {
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
}
const connection = spyOnConnectionChannel({ channel, protocolSupplier: () => undefined })

connection._handleOngoingRequestsNumberChange(requests)

expect(channel.startReceiveTimeout).toHaveBeenCalledTimes(1)
})

it.each([
[1], [2], [3], [5], [8], [13], [3000]
])('should not call channel.stopReceiveTimeout when requets number equals to %d', (requests) => {
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
}
const connection = spyOnConnectionChannel({ channel, protocolSupplier: () => undefined })

connection._handleOngoingRequestsNumberChange(requests)

expect(channel.stopReceiveTimeout).toHaveBeenCalledTimes(0)
})
})

function spyOnConnectionChannel ({
channel,
errorHandler,
Expand Down
4 changes: 4 additions & 0 deletions packages/neo4j-driver/test/internal/dummy-channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ export default class DummyChannel {
this.written.push(buf)
}

stopReceiveTimeout () {}

startReceiveTimeout () {}

toHex () {
let out = ''
for (let i = 0; i < this.written.length; i++) {
Expand Down