diff --git a/examples/typescript/watch/watch-example.ts b/examples/typescript/watch/watch-example.ts index 3a4723085e..a02f16c4be 100644 --- a/examples/typescript/watch/watch-example.ts +++ b/examples/typescript/watch/watch-example.ts @@ -3,6 +3,7 @@ import * as k8s from '@kubernetes/client-node'; const kc = new k8s.KubeConfig(); kc.loadFromDefault(); +let request: k8s.RequestResult = null; const watch = new k8s.Watch(kc); watch.watch('/api/v1/namespaces', // optional query parameters can go here. @@ -34,8 +35,12 @@ watch.watch('/api/v1/namespaces', (err) => { // tslint:disable-next-line:no-console console.log(err); + if (request != null) { + request!.destroy(); + } }) .then((req) => { + request = req; // watch returns a request object which you can use to abort the watch. - setTimeout(() => { req.abort(); }, 10 * 1000); + setTimeout(() => { req.abort(); req.destroy(); }, 10 * 1000); }); diff --git a/src/cache.ts b/src/cache.ts index 07765e3e9a..c0a84682d5 100644 --- a/src/cache.ts +++ b/src/cache.ts @@ -1,6 +1,6 @@ import { ADD, DELETE, ERROR, Informer, ListPromise, ObjectCallback, UPDATE } from './informer'; import { KubernetesObject } from './types'; -import { Watch } from './watch'; +import { RequestResult, Watch } from './watch'; export interface ObjectCache { get(name: string, namespace?: string): T | undefined; @@ -12,6 +12,8 @@ export class ListWatch implements ObjectCache, In private resourceVersion: string; private readonly indexCache: { [key: string]: T[] } = {}; private readonly callbackCache: { [key: string]: Array> } = {}; + private request: RequestResult | null; + private stopped: boolean; public constructor( private readonly path: string, @@ -23,7 +25,9 @@ export class ListWatch implements ObjectCache, In this.callbackCache[UPDATE] = []; this.callbackCache[DELETE] = []; this.callbackCache[ERROR] = []; + this.request = null; this.resourceVersion = ''; + this.stopped = false; if (autoStart) { this.doneHandler(null); } @@ -73,8 +77,17 @@ export class ListWatch implements ObjectCache, In } private async doneHandler(err: any) { + if (this.request != null) { + this.request.destroy(); + this.request = null; + } if (err) { - this.callbackCache[ERROR].forEach((elt: ObjectCallback) => elt(err)); + // On an error, the done handler is called twice with the same error, see details in + // watch.ts + if (!this.stopped) { + this.stopped = true; + this.callbackCache[ERROR].forEach((elt: ObjectCallback) => elt(err)); + } return; } // TODO: Don't always list here for efficiency @@ -85,7 +98,7 @@ export class ListWatch implements ObjectCache, In const list = result.body; deleteItems(this.objects, list.items, this.callbackCache[DELETE].slice()); this.addOrUpdateItems(list.items); - await this.watch.watch( + this.request = await this.watch.watch( this.path, { resourceVersion: list.metadata!.resourceVersion }, this.watchHandler.bind(this), diff --git a/src/watch.ts b/src/watch.ts index 9593c056d1..4fd542870e 100644 --- a/src/watch.ts +++ b/src/watch.ts @@ -1,5 +1,6 @@ import byline = require('byline'); import request = require('request'); +import { Duplex } from 'stream'; import { KubeConfig } from './config'; export interface WatchUpdate { @@ -7,12 +8,17 @@ export interface WatchUpdate { object: object; } +export interface RequestResult { + pipe(stream: Duplex); + destroy(); +} + export interface RequestInterface { - webRequest(opts: request.Options, callback: (err, response, body) => void): any; + webRequest(opts: request.Options, callback: (err, response, body) => void): RequestResult; } export class DefaultRequest implements RequestInterface { - public webRequest(opts: request.Options, callback: (err, response, body) => void): any { + public webRequest(opts: request.Options, callback: (err, response, body) => void): RequestResult { return request(opts, callback); } } @@ -53,6 +59,7 @@ export class Watch { uri: url, useQuerystring: true, json: true, + pool: false, }; await this.config.applyToRequest(requestOptions); @@ -70,6 +77,10 @@ export class Watch { errOut = err; done(err); }); + // TODO: I don't love this, because both 'error' and 'close' call the done handler with the same error + // We should probably only do one or the other, but there's challenges because of async delivery and it's + // important to know if the close event is occurring because of an error. So for now, this needs to be + // handled in the client. stream.on('close', () => done(errOut)); const req = this.requestImpl.webRequest(requestOptions, (error, response, body) => { diff --git a/src/watch_test.ts b/src/watch_test.ts index 53361ca2a5..a17cc4e8b2 100644 --- a/src/watch_test.ts +++ b/src/watch_test.ts @@ -47,6 +47,7 @@ describe('Watch', () => { const fakeRequest = { pipe: (stream) => {}, + destroy: () => {}, }; when(fakeRequestor.webRequest(anything(), anyFunction())).thenReturn(fakeRequest); @@ -112,6 +113,7 @@ describe('Watch', () => { stream.write(JSON.stringify(obj1) + '\n'); stream.write(JSON.stringify(obj2) + '\n'); }, + destroy: () => {}, }; when(fakeRequestor.webRequest(anything(), anyFunction())).thenReturn(fakeRequest); @@ -180,6 +182,7 @@ describe('Watch', () => { stream.emit('error', errIn); stream.emit('close'); }, + destroy: () => {}, }; when(fakeRequestor.webRequest(anything(), anyFunction())).thenReturn(fakeRequest); @@ -240,6 +243,7 @@ describe('Watch', () => { stream.write(JSON.stringify(obj1) + '\n'); stream.emit('close'); }, + destroy: () => {}, }; when(fakeRequestor.webRequest(anything(), anyFunction())).thenReturn(fakeRequest); @@ -298,6 +302,7 @@ describe('Watch', () => { stream.write(JSON.stringify(obj) + '\n'); stream.write('{"truncated json\n'); }, + destroy: () => {}, }; when(fakeRequestor.webRequest(anything(), anyFunction())).thenReturn(fakeRequest);