Skip to content

Commit 75b67e5

Browse files
committed
Improve closing behavior for cache/informer.
1 parent d95e847 commit 75b67e5

File tree

4 files changed

+27
-5
lines changed

4 files changed

+27
-5
lines changed

examples/typescript/watch/watch-example.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import * as k8s from '@kubernetes/client-node';
33
const kc = new k8s.KubeConfig();
44
kc.loadFromDefault();
55

6+
let request: k8s.RequestResult = null;
67
const watch = new k8s.Watch(kc);
78
watch.watch('/api/v1/namespaces',
89
// optional query parameters can go here.
@@ -34,8 +35,12 @@ watch.watch('/api/v1/namespaces',
3435
(err) => {
3536
// tslint:disable-next-line:no-console
3637
console.log(err);
38+
if (request != null) {
39+
request!.destroy();
40+
}
3741
})
3842
.then((req) => {
43+
request = req;
3944
// watch returns a request object which you can use to abort the watch.
40-
setTimeout(() => { req.abort(); }, 10 * 1000);
45+
setTimeout(() => { req.abort(); req.destroy(); }, 10 * 1000);
4146
});

src/cache.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { ADD, DELETE, ERROR, Informer, ListPromise, ObjectCallback, UPDATE } from './informer';
22
import { KubernetesObject } from './types';
3-
import { Watch } from './watch';
3+
import { RequestResult, Watch } from './watch';
44

55
export interface ObjectCache<T> {
66
get(name: string, namespace?: string): T | undefined;
@@ -12,6 +12,7 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
1212
private resourceVersion: string;
1313
private readonly indexCache: { [key: string]: T[] } = {};
1414
private readonly callbackCache: { [key: string]: Array<ObjectCallback<T>> } = {};
15+
private request: RequestResult | null;
1516

1617
public constructor(
1718
private readonly path: string,
@@ -23,6 +24,7 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
2324
this.callbackCache[UPDATE] = [];
2425
this.callbackCache[DELETE] = [];
2526
this.callbackCache[ERROR] = [];
27+
this.request = null;
2628
this.resourceVersion = '';
2729
if (autoStart) {
2830
this.doneHandler(null);
@@ -73,6 +75,10 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
7375
}
7476

7577
private async doneHandler(err: any) {
78+
if (this.request != null) {
79+
this.request.destroy();
80+
this.request = null;
81+
}
7682
if (err) {
7783
this.callbackCache[ERROR].forEach((elt: ObjectCallback<T>) => elt(err));
7884
return;
@@ -85,7 +91,7 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
8591
const list = result.body;
8692
deleteItems(this.objects, list.items, this.callbackCache[DELETE].slice());
8793
this.addOrUpdateItems(list.items);
88-
await this.watch.watch(
94+
this.request = await this.watch.watch(
8995
this.path,
9096
{ resourceVersion: list.metadata!.resourceVersion },
9197
this.watchHandler.bind(this),

src/watch.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,24 @@
11
import byline = require('byline');
22
import request = require('request');
3+
import { Duplex } from 'stream';
34
import { KubeConfig } from './config';
45

56
export interface WatchUpdate {
67
type: string;
78
object: object;
89
}
910

11+
export interface RequestResult {
12+
pipe(stream: Duplex);
13+
destroy();
14+
}
15+
1016
export interface RequestInterface {
11-
webRequest(opts: request.Options, callback: (err, response, body) => void): any;
17+
webRequest(opts: request.Options, callback: (err, response, body) => void): RequestResult;
1218
}
1319

1420
export class DefaultRequest implements RequestInterface {
15-
public webRequest(opts: request.Options, callback: (err, response, body) => void): any {
21+
public webRequest(opts: request.Options, callback: (err, response, body) => void): RequestResult {
1622
return request(opts, callback);
1723
}
1824
}

src/watch_test.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ describe('Watch', () => {
4747

4848
const fakeRequest = {
4949
pipe: (stream) => {},
50+
destroy: () => {},
5051
};
5152

5253
when(fakeRequestor.webRequest(anything(), anyFunction())).thenReturn(fakeRequest);
@@ -112,6 +113,7 @@ describe('Watch', () => {
112113
stream.write(JSON.stringify(obj1) + '\n');
113114
stream.write(JSON.stringify(obj2) + '\n');
114115
},
116+
destroy: () => {},
115117
};
116118

117119
when(fakeRequestor.webRequest(anything(), anyFunction())).thenReturn(fakeRequest);
@@ -180,6 +182,7 @@ describe('Watch', () => {
180182
stream.emit('error', errIn);
181183
stream.emit('close');
182184
},
185+
destroy: () => {},
183186
};
184187

185188
when(fakeRequestor.webRequest(anything(), anyFunction())).thenReturn(fakeRequest);
@@ -240,6 +243,7 @@ describe('Watch', () => {
240243
stream.write(JSON.stringify(obj1) + '\n');
241244
stream.emit('close');
242245
},
246+
destroy: () => {},
243247
};
244248

245249
when(fakeRequestor.webRequest(anything(), anyFunction())).thenReturn(fakeRequest);
@@ -298,6 +302,7 @@ describe('Watch', () => {
298302
stream.write(JSON.stringify(obj) + '\n');
299303
stream.write('{"truncated json\n');
300304
},
305+
destroy: () => {},
301306
};
302307

303308
when(fakeRequestor.webRequest(anything(), anyFunction())).thenReturn(fakeRequest);

0 commit comments

Comments
 (0)