Skip to content
This repository was archived by the owner on Aug 4, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 87 additions & 28 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const util = require('util')
const os = require('os')
const parseUrl = require('url').parse
const zlib = require('zlib')
const Writable = require('readable-stream').Writable
const {Writable, PassThrough} = require('readable-stream')
const pump = require('pump')
const eos = require('end-of-stream')
const streamToBuffer = require('fast-stream-to-buffer')
Expand Down Expand Up @@ -54,6 +54,21 @@ function Client (opts) {
Writable.call(this, opts)

const errorproxy = (err) => {
this._errors++

const retryIndex = this._errors === 0 ? 0 : this._errors - 1
const jitter = Math.random() * 0.2 + 0.9 // ±10% jitter
const backoff = (Math.min(retryIndex, 6) ** 2) * 1000 * jitter

if (backoff > 0) {
this._chopper.resetTimer(-1) // disable timer to prepare for back-off mode
this._backoffTimer = setTimeout(() => {
this._backoffTimer = null
this._chopper.resetTimer(this._chopperTime)
if (this._backoffCallback) this._backoffCallback()
}, backoff)
}

if (this.destroyed === false) this.emit('request-error', err)
}

Expand All @@ -64,6 +79,11 @@ function Client (opts) {
this._corkTimer = null
this._received = 0 // number of events given to the client for reporting
this.sent = 0 // number of events written to the socket
this._errors = 0 // number of requests that resulted in an error (dropped connection, non-2xx etc)
this._chopperSize = opts.size // needed to set highWatermark on buffer if we enter back-off mode
this._chopperTime = opts.time // needed to restore the normal time if we enter back-off mode
this._backoffTimer = null
this._backoffCallback = null
this._active = false
this._onflushed = null
this._transport = require(opts.serverUrl.protocol.slice(0, -1)) // 'http:' => 'http'
Expand Down Expand Up @@ -246,36 +266,12 @@ function onStream (opts, client, onerror) {
const requestOpts = getRequestOptions(opts, client._agent)

return function (stream, next) {
const onerrorproxy = (err) => {
stream.removeListener('error', onerrorproxy)
req.removeListener('error', onerrorproxy)
destroyStream(stream)
onerror(err)
}

let buffer, req
client._active = true

const req = client._transport.request(requestOpts, onResult(onerror))

// Mointor streams for errors so that we can make sure to destory the
// output stream as soon as that occurs
stream.on('error', onerrorproxy)
req.on('error', onerrorproxy)

req.on('socket', function (socket) {
// Sockets will automatically be unreffed by the HTTP agent when they are
// not in use by an HTTP request, but as we're keeping the HTTP request
// open, we need to unref the socket manually
socket.unref()
})

if (Number.isFinite(serverTimeout)) {
req.setTimeout(serverTimeout, function () {
req.abort()
})
}

pump(stream, req, function () {
pump(stream, requestProxy(requestOpts, onResult(client, onerror)), function () {
// This function is technically called with an error, but because we
// manually attach error listeners on all the streams in the pipeline
// above, we can safely ignore it.
Expand Down Expand Up @@ -316,10 +312,70 @@ function onStream (opts, client, onerror) {

// All requests to the APM Server must start with a metadata object
stream.write(client._encode({metadata: getMetadata(opts)}, Client.encoding.METADATA))

// Under normal opperation, just make a request and return it. If
// instructed to back off, make a temporary buffer to hold data until the
// request can be made
function requestProxy (opts, onresponse) {
if (client._backoffTimer) {
buffer = new PassThrough({highWaterMark: client._chopperSize * 2}) // twice as large to allow overflow
buffer.on('error', onerrorproxy)

eos(stream, function () {
client._backoffCallback = null
if (client._backoffTimer) {
// drop all data - back-off still in effect
buffer.destroy()
}
})

client._backoffCallback = function () {
client._backoffCallback = null
req = makeRequest(opts, onresponse)
buffer.pipe(req)
}

return buffer
} else {
return makeRequest(opts, onresponse)
}
}

function makeRequest (opts, onresponse) {
const req = client._transport.request(opts, onresponse)

req.on('error', onerrorproxy)

req.on('socket', function (socket) {
// Sockets will automatically be unreffed by the HTTP agent when they are
// not in use by an HTTP request, but as we're keeping the HTTP request
// open, we need to unref the socket manually
socket.unref()
})

if (Number.isFinite(serverTimeout)) {
req.setTimeout(serverTimeout, function () {
req.abort()
})
}

return req
}

// This function is attached to the error event of the different streams so
// that we can make sure to destroy the output stream as soon as an error
// occurs
function onerrorproxy (err) {
stream.removeListener('error', onerrorproxy)
if (buffer) buffer.removeListener('error', onerrorproxy)
if (req) req.removeListener('error', onerrorproxy)
destroyStream(stream)
onerror(err)
}
}
}

function onResult (onerror) {
function onResult (client, onerror) {
return streamToBuffer.onStream(function (err, buf, res) {
if (err) return onerror(err)
if (res.statusCode < 200 || res.statusCode > 299) {
Expand All @@ -345,6 +401,9 @@ function onResult (onerror) {
}

onerror(err)
} else {
client._errors = 0
client._chopper.resetTimer(client._chopperTime)
}
})
}
Expand Down
193 changes: 193 additions & 0 deletions test/backoff.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
'use strict'

const crypto = require('crypto')
const test = require('tape')
const utils = require('./lib/utils')

const APMServer = utils.APMServer
const processReq = utils.processReq
const assertReq = utils.assertReq
const assertMetadata = utils.assertMetadata
const assertEvent = utils.assertEvent

const testCases = [
{
name: 'only one error',
expectedData: [
assertMetadata,
assertEvent({span: {req: 2}}),
assertMetadata,
assertEvent({span: {req: 3}}),
assertMetadata,
assertEvent({span: {req: 4}})
],
requests: [
[0, true], // no back-off in effect, request fails
[0, false], // back-off in effect, request succeeds
[0, false], // no back-off in effect, request succeeds
[0, false] // no back-off in effect, request succeeds
]
},
{
name: 'only two errors',
expectedData: [
assertMetadata,
assertEvent({span: {req: 3}}),
assertMetadata,
assertEvent({span: {req: 4}}),
assertMetadata,
assertEvent({span: {req: 5}})
],
requests: [
[0, true], // no back-off in effect, request fails
[0, true], // back-off in effect, request fails
[1, false], // back-off in effect, request succeeds
[0, false], // no back-off in effect, request succeeds
[0, false] // no back-off in effect, request succeeds
]
},
{
name: 'top out at full back-off',
expectedData: [
assertMetadata,
assertEvent({span: {req: 9}}),
assertMetadata,
assertEvent({span: {req: 10}}),
assertMetadata,
assertEvent({span: {req: 11}})
],
requests: [
[0, true], // no back-off in effect, request fails
[0, true], // back-off in effect, request fails
[1, true], // back-off in effect, request fails
[4, true], // back-off in effect, request fails
[9, true], // back-off in effect, request fails
[16, true], // back-off in effect, request fails
[25, true], // back-off in effect, request fails
[36, true], // back-off in effect, request fails
[36, false], // back-off in effect, request succeeds
[0, false], // no back-off in effect, request succeeds
[0, false] // no back-off in effect, request succeeds
]
}
]

testCases.forEach(function ({name, expectedData, requests}) {
test('backoff delays - ' + name, function (t) {
let reqNo = 0
let start, client

const server = APMServer(function (req, res) {
const diff = Date.now() - start
const backoffTime = requests[reqNo - 1][0] * 1000
t.ok(diff > backoffTime && diff < backoffTime + 200, `should delay request between ${backoffTime} and ${backoffTime + 200}ms (was delayed ${diff}ms)`)

if (requests[reqNo - 1][1] === true) {
res.writeHead(500)
res.end()
} else {
assertReq(t, req)
req = processReq(req)
req.on('data', function (obj) {
expectedData.shift()(t, obj)
})
req.on('end', function () {
res.end()
if (reqNo < 4) {
setTimeout(makeReq, 10)
} else {
t.equal(expectedData.length, 0, 'should have seen all expected data')
server.close()
t.end()
}
})
}
}).client({time: 1000}, function (_client) {
client = _client
let emittedErrors = 0

client.on('error', function (err) {
emittedErrors++
if (requests[reqNo - 1][1] === true) {
t.equal(err.message, 'Unexpected response code from APM Server: 500', 'client should emit error')
t.equal(client._errors, emittedErrors, 'client error count should have been incremented to ' + emittedErrors)
makeReq()
} else {
t.error(err)
}
})

makeReq()
})

function makeReq () {
client.sendSpan({req: ++reqNo})
start = Date.now()
client.flush()
}
})
})

test('backoff - dropping data', function (t) {
let start, timer
let reqNo = 0
const backoffTimes = [0, 0, 1, 0]

const server = APMServer(function (req, res) {
const diff = Date.now() - start
const backoffTime = backoffTimes.shift() * 1000
t.ok(diff > backoffTime && diff < backoffTime + 200, `should delay request between ${backoffTime} and ${backoffTime + 200}ms (was delayed ${diff}ms)`)

req = processReq(req)
req.on('data', function (obj) {
if ('metadata' in obj) return
t.equal(obj.span.req, reqNo, 'event belongs to expected request no ' + reqNo)
t.equal(obj.span.ok, true, 'expected the event to get sent')
})
req.on('end', function () {
if (reqNo <= 2) {
res.writeHead(500)
res.end()
} else {
clearTimeout(timer)
res.end()
server.close()
t.end()
}
})
}).client({size: 256, time: 500}, function (client) {
client.on('error', function (err) {
if (reqNo === 1) {
t.equal(err.message, 'Unexpected response code from APM Server: 500', 'client should emit error')
t.equal(client._errors, 1, 'client error count should have been incremented to 1')

client.sendSpan({req: ++reqNo, ok: true, filler: crypto.randomBytes(32).toString('hex')})
start = Date.now()
client.flush()
} else if (reqNo === 2) {
t.equal(err.message, 'Unexpected response code from APM Server: 500', 'client should emit error')
t.equal(client._errors, 2, 'client error count should have been incremented to 2')

reqNo++
start = Date.now()

// these will be dropped because they are too big to be cached before the backoff
client.sendSpan({req: reqNo, ok: false, filler: crypto.randomBytes(32).toString('hex')}) // will not overflow
client.sendSpan({req: reqNo, ok: false, filler: crypto.randomBytes(32).toString('hex')}) // will trigger overflow

// this will be the first to get through after the backoff
client.sendSpan({req: reqNo, ok: true, filler: crypto.randomBytes(32).toString('hex')})

timer = setTimeout(function () {
t.fail('took too long')
}, 2000)
} else {
t.error(err)
}
})

client.sendSpan({req: ++reqNo, ok: true, filler: crypto.randomBytes(32).toString('hex')})
start = Date.now()
client.flush()
})
})
4 changes: 2 additions & 2 deletions test/lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ function assertReq (t, req) {
assertReq.asserts = 7

function assertMetadata (t, obj) {
t.deepEqual(Object.keys(obj), ['metadata'])
t.deepEqual(Object.keys(obj), ['metadata'], 'should receive metadata')
const metadata = obj.metadata
t.deepEqual(Object.keys(metadata), ['service', 'process', 'system'])
const service = metadata.service
Expand All @@ -92,7 +92,7 @@ function assertMetadata (t, obj) {
t.ok(Array.isArray(_process.argv), 'process.title should be an array')
t.ok(_process.argv.length >= 2, 'process.title should contain at least two elements')
t.ok(/\/node$/.test(_process.argv[0]), `process.argv[0] should match /\\/node$/ (was: ${_process.argv[0]})`)
const regex = /(\/test\/(test|truncate|lib\/unref-client)\.js|node_modules\/\.bin\/tape)$/
const regex = /(\/test\/(test|backoff|truncate|lib\/unref-client)\.js|node_modules\/\.bin\/tape)$/
t.ok(regex.test(_process.argv[1]), `process.argv[1] should match ${regex} (was: ${_process.argv[1]})"`)
const system = metadata.system
t.ok(typeof system.hostname, 'string')
Expand Down