@@ -21,7 +21,6 @@ import ResultSummary from './result-summary'
21
21
import Record from './record'
22
22
import { Query } from './types'
23
23
import { observer , util , connectionHolder } from './internal'
24
- import { CompletedObserver , FailedObserver } from './internal/observers'
25
24
26
25
const { EMPTY_CONNECTION_HOLDER } = connectionHolder
27
26
@@ -130,12 +129,14 @@ class Result implements Promise<QueryResult> {
130
129
*/
131
130
keys ( ) : Promise < string [ ] > {
132
131
return new Promise ( ( resolve , reject ) => {
133
- this . _streamObserverPromise . then ( observer =>
134
- observer . subscribe ( {
135
- onKeys : keys => resolve ( keys ) ,
136
- onError : err => reject ( err )
137
- } )
138
- )
132
+ this . _streamObserverPromise
133
+ . then ( observer =>
134
+ observer . subscribe ( {
135
+ onKeys : keys => resolve ( keys ) ,
136
+ onError : err => reject ( err )
137
+ } )
138
+ )
139
+ . catch ( reject )
139
140
} )
140
141
}
141
142
@@ -150,13 +151,16 @@ class Result implements Promise<QueryResult> {
150
151
*/
151
152
summary ( ) : Promise < ResultSummary > {
152
153
return new Promise ( ( resolve , reject ) => {
153
- this . _streamObserverPromise . then ( o => {
154
- o . cancel ( )
155
- o . subscribe ( {
156
- onCompleted : metadata => resolve ( metadata ) ,
157
- onError : err => reject ( err )
154
+ this . _streamObserverPromise
155
+ . then ( o => {
156
+ o . cancel ( )
157
+ o . subscribe ( {
158
+ onCompleted : metadata =>
159
+ this . _createSummary ( metadata ) . then ( resolve , reject ) ,
160
+ onError : err => reject ( err )
161
+ } )
158
162
} )
159
- } )
163
+ . catch ( reject )
160
164
} )
161
165
}
162
166
@@ -247,43 +251,8 @@ class Result implements Promise<QueryResult> {
247
251
subscribe ( observer : ResultObserver ) : void {
248
252
const onCompletedOriginal = observer . onCompleted || DEFAULT_ON_COMPLETED
249
253
const onCompletedWrapper = ( metadata : any ) => {
250
- const connectionHolder = this . _connectionHolder
251
- const {
252
- validatedQuery : query ,
253
- params : parameters
254
- } = util . validateQueryAndParameters ( this . _query , this . _parameters , {
255
- skipAsserts : true
256
- } )
257
-
258
- function complete ( protocolVersion ?: number ) {
259
- onCompletedOriginal . call (
260
- observer ,
261
- new ResultSummary ( query , parameters , metadata , protocolVersion )
262
- )
263
- }
264
-
265
- function release ( ) {
266
- // notify connection holder that the used connection is not needed any more because result has
267
- // been fully consumed; call the original onCompleted callback after that
268
- return connectionHolder . releaseConnection ( )
269
- }
270
-
271
- connectionHolder . getConnection ( ) . then (
272
- // onFulfilled:
273
- connection => {
274
- release ( ) . then ( ( ) =>
275
- complete (
276
- connection !== undefined
277
- ? connection . protocol ( ) . version
278
- : undefined
279
- )
280
- )
281
- } ,
282
-
283
- // onRejected:
284
- _ => {
285
- complete ( )
286
- }
254
+ this . _createSummary ( metadata ) . then ( summary =>
255
+ onCompletedOriginal . call ( observer , summary )
287
256
)
288
257
}
289
258
@@ -300,9 +269,11 @@ class Result implements Promise<QueryResult> {
300
269
}
301
270
observer . onError = onErrorWrapper
302
271
303
- this . _streamObserverPromise . then ( o => {
304
- return o . subscribe ( observer )
305
- } )
272
+ this . _streamObserverPromise
273
+ . then ( o => {
274
+ return o . subscribe ( observer )
275
+ } )
276
+ . catch ( error => observer . onError ! ( error ) )
306
277
}
307
278
308
279
/**
@@ -315,6 +286,34 @@ class Result implements Promise<QueryResult> {
315
286
_cancel ( ) : void {
316
287
this . _streamObserverPromise . then ( o => o . cancel ( ) )
317
288
}
289
+
290
+ private _createSummary ( metadata : any ) : Promise < ResultSummary > {
291
+ const {
292
+ validatedQuery : query ,
293
+ params : parameters
294
+ } = util . validateQueryAndParameters ( this . _query , this . _parameters , {
295
+ skipAsserts : true
296
+ } )
297
+ const connectionHolder = this . _connectionHolder
298
+
299
+ return connectionHolder
300
+ . getConnection ( )
301
+ . then (
302
+ // onFulfilled:
303
+ connection =>
304
+ connectionHolder
305
+ . releaseConnection ( )
306
+ . then ( ( ) =>
307
+ connection ? connection . protocol ( ) . version : undefined
308
+ ) ,
309
+ // onRejected:
310
+ _ => undefined
311
+ )
312
+ . then (
313
+ protocolVersion =>
314
+ new ResultSummary ( query , parameters , metadata , protocolVersion )
315
+ )
316
+ }
318
317
}
319
318
320
319
function captureStacktrace ( ) : string | null {
0 commit comments