@@ -24,6 +24,7 @@ import {alloc} from './buf';
24
24
import { Node , Path , PathSegment , Relationship , UnboundRelationship } from '../graph-types' ;
25
25
import { newError } from './../error' ;
26
26
import ChannelConfig from './ch-config' ;
27
+ import StreamObserver from './stream-observer' ;
27
28
28
29
let Channel ;
29
30
if ( NodeChannel . available ) {
@@ -272,7 +273,7 @@ class Connection {
272
273
* failing, and the connection getting ejected from the session pool.
273
274
*
274
275
* @param err an error object, forwarded to all current and future subscribers
275
- * @private
276
+ * @protected
276
277
*/
277
278
_handleFatalError ( err ) {
278
279
this . _isBroken = true ;
@@ -289,6 +290,12 @@ class Connection {
289
290
}
290
291
291
292
_handleMessage ( msg ) {
293
+ if ( this . _isBroken ) {
294
+ // ignore all incoming messages when this connection is broken. all previously pending observers failed
295
+ // with the fatal error. all future observers will fail with same fatal error.
296
+ return ;
297
+ }
298
+
292
299
const payload = msg . fields [ 0 ] ;
293
300
294
301
switch ( msg . signature ) {
@@ -301,7 +308,7 @@ class Connection {
301
308
try {
302
309
this . _currentObserver . onCompleted ( payload ) ;
303
310
} finally {
304
- this . _currentObserver = this . _pendingObservers . shift ( ) ;
311
+ this . _updateCurrentObserver ( ) ;
305
312
}
306
313
break ;
307
314
case FAILURE :
@@ -310,7 +317,7 @@ class Connection {
310
317
this . _currentFailure = newError ( payload . message , payload . code ) ;
311
318
this . _currentObserver . onError ( this . _currentFailure ) ;
312
319
} finally {
313
- this . _currentObserver = this . _pendingObservers . shift ( ) ;
320
+ this . _updateCurrentObserver ( ) ;
314
321
// Things are now broken. Pending observers will get FAILURE messages routed until
315
322
// We are done handling this failure.
316
323
if ( ! this . _isHandlingFailure ) {
@@ -340,7 +347,7 @@ class Connection {
340
347
else if ( this . _currentObserver . onError )
341
348
this . _currentObserver . onError ( payload ) ;
342
349
} finally {
343
- this . _currentObserver = this . _pendingObservers . shift ( ) ;
350
+ this . _updateCurrentObserver ( ) ;
344
351
}
345
352
break ;
346
353
default :
@@ -351,7 +358,8 @@ class Connection {
351
358
/** Queue an INIT-message to be sent to the database */
352
359
initialize ( clientName , token , observer ) {
353
360
log ( "C" , "INIT" , clientName , token ) ;
354
- this . _queueObserver ( observer ) ;
361
+ const initObserver = new InitObserver ( this , observer ) ;
362
+ this . _queueObserver ( initObserver ) ;
355
363
this . _packer . packStruct ( INIT , [ this . _packable ( clientName ) , this . _packable ( token ) ] ,
356
364
( err ) => this . _handleFatalError ( err ) ) ;
357
365
this . _chunker . messageBoundary ( ) ;
@@ -437,6 +445,14 @@ class Connection {
437
445
}
438
446
}
439
447
448
+ /**
449
+ * Pop next pending observer form the list of observers and make it current observer.
450
+ * @protected
451
+ */
452
+ _updateCurrentObserver ( ) {
453
+ this . _currentObserver = this . _pendingObservers . shift ( ) ;
454
+ }
455
+
440
456
/**
441
457
* Synchronize - flush all queued outgoing messages and route their responses
442
458
* to their respective handlers.
@@ -489,6 +505,42 @@ function connect(url, config = {}, connectionErrorCode = null) {
489
505
return new Connection ( new Ch ( channelConfig ) , completeUrl ) ;
490
506
}
491
507
508
+ /**
509
+ * Observer that wraps user-defined observer for INIT message and handles initialization failures. Connection is
510
+ * closed by the server if processing of INIT message fails so this observer will handle initialization failure
511
+ * as a fatal error.
512
+ */
513
+ class InitObserver extends StreamObserver {
514
+
515
+ /**
516
+ * @constructor
517
+ * @param {Connection } connection the connection used to send INIT message.
518
+ * @param {StreamObserver } originalObserver the observer to wrap and delegate calls to.
519
+ */
520
+ constructor ( connection , originalObserver ) {
521
+ super ( ) ;
522
+ this . _connection = connection ;
523
+ this . _originalObserver = originalObserver || NO_OP_OBSERVER ;
524
+ }
525
+
526
+ onNext ( record ) {
527
+ this . _originalObserver . onNext ( record ) ;
528
+ }
529
+
530
+ onError ( error ) {
531
+ this . _connection . _updateCurrentObserver ( ) ; // make sure this same observer will not be called again
532
+ try {
533
+ this . _originalObserver . onError ( error ) ;
534
+ } finally {
535
+ this . _connection . _handleFatalError ( error ) ;
536
+ }
537
+ }
538
+
539
+ onCompleted ( metaData ) {
540
+ this . _originalObserver . onCompleted ( metaData ) ;
541
+ }
542
+ }
543
+
492
544
export {
493
545
connect ,
494
546
parseScheme ,
0 commit comments