@@ -260,27 +260,11 @@ private async Task<ErrorMap> GetErrorMap(IConnection connection, IRequestSpan sp
260
260
Span = childSpan
261
261
} ;
262
262
263
- using var ctp = CancellationTokenPairSource . FromTimeout ( _context . ClusterOptions . KvTimeout , cancellationToken ) ;
264
- try
265
- {
266
- await ExecuteOp ( connection , errorMapOp , ctp . TokenPair ) . ConfigureAwait ( false ) ;
267
-
268
- return new ErrorMap ( errorMapOp . GetValue ( ) ) ;
269
- }
270
- catch ( OperationCanceledException ex )
271
- {
272
- //Check to see if it was because of a "hung" socket which causes the token to timeout
273
- if ( ctp . IsInternalCancellation )
274
- {
275
- ThrowHelper . ThrowTimeoutException ( errorMapOp , ex , _redactor ) ;
276
- }
277
-
278
- throw ;
279
- }
280
- finally
281
- {
282
- errorMapOp . StopRecording ( ) ;
283
- }
263
+ return await ExecuteInternalOperationAsync ( connection , errorMapOp ,
264
+ ExecuteOp ,
265
+ static ( _ , op ) => new ErrorMap ( op . GetValue ( ) ) ,
266
+ cancellationToken )
267
+ . ConfigureAwait ( false ) ;
284
268
}
285
269
286
270
public async Task HelloHello ( )
@@ -364,27 +348,11 @@ private async Task<ServerFeatures[]> Hello(IConnection connection, IRequestSpan
364
348
Span = childSpan ,
365
349
} ;
366
350
367
- using var ctp = CancellationTokenPairSource . FromTimeout ( _context . ClusterOptions . KvTimeout , cancellationToken ) ;
368
- try
369
- {
370
- await ExecuteOp ( connection , heloOp , ctp . TokenPair ) . ConfigureAwait ( false ) ;
371
-
372
- return heloOp . GetValue ( ) ;
373
- }
374
- catch ( OperationCanceledException ex )
375
- {
376
- //Check to see if it was because of a "hung" socket which causes the token to timeout
377
- if ( ctp . IsInternalCancellation )
378
- {
379
- ThrowHelper . ThrowTimeoutException ( heloOp , ex , _redactor ) ;
380
- }
381
-
382
- throw ;
383
- }
384
- finally
385
- {
386
- heloOp . StopRecording ( ) ;
387
- }
351
+ return await ExecuteInternalOperationAsync ( connection , heloOp ,
352
+ ExecuteOp ,
353
+ static ( _ , op ) => op . GetValue ( ) ,
354
+ cancellationToken )
355
+ . ConfigureAwait ( false ) ;
388
356
}
389
357
390
358
public async Task < Manifest > GetManifest ( )
@@ -398,27 +366,13 @@ public async Task<Manifest> GetManifest()
398
366
Span = rootSpan ,
399
367
} ;
400
368
401
- using var ctp = CancellationTokenPairSource . FromTimeout ( _context . ClusterOptions . KvTimeout ) ;
402
- try
403
- {
404
- await ExecuteOp ( ConnectionPool , manifestOp , ctp . TokenPair ) . ConfigureAwait ( false ) ;
369
+ await ExecuteInternalOperationAsync ( ConnectionPool , manifestOp ,
370
+ ExecuteOp ,
371
+ static ( _ , op ) => op . GetValue ( ) ,
372
+ default ( CancellationToken ) )
373
+ . ConfigureAwait ( false ) ;
405
374
406
- return manifestOp . GetValue ( ) ;
407
- }
408
- catch ( OperationCanceledException ex )
409
- {
410
- //Check to see if it was because of a "hung" socket which causes the token to timeout
411
- if ( ctp . IsInternalCancellation )
412
- {
413
- ThrowHelper . ThrowTimeoutException ( manifestOp , ex , _redactor ) ;
414
- }
415
-
416
- throw ;
417
- }
418
- finally
419
- {
420
- manifestOp . StopRecording ( ) ;
421
- }
375
+ return manifestOp . GetValue ( ) ;
422
376
}
423
377
424
378
public async Task SelectBucketAsync ( string bucketName , CancellationToken cancellationToken = default )
@@ -453,46 +407,34 @@ public async Task<BucketConfig> GetClusterMap(ConfigVersion? latestVersionOnClie
453
407
configOp . Revision = latestVersionOnClient . Value . Revision ;
454
408
}
455
409
456
- using var ctp = CancellationTokenPairSource . FromTimeout ( _context . ClusterOptions . KvTimeout ,
457
- cancellationToken ) ;
458
- try
459
- {
460
- // Get config map operations are high priority and should not be queued, so use TrySendImmediatelyAsync
461
- var status = await ExecuteOpImmediatelyAsync ( ConnectionPool , configOp , ctp . TokenPair )
462
- . ConfigureAwait ( false ) ;
463
- if ( status == ResponseStatus . KeyNotFound )
410
+ var config = await ExecuteInternalOperationAsync ( ConnectionPool , configOp ,
411
+ ExecuteOpImmediatelyAsync ,
412
+ static ( status , op ) =>
464
413
{
465
- //Throw here as this will trigger bootstrapping via HTTP because CCCP not supported
466
- throw status . CreateException ( configOp , string . Empty ) ;
467
- }
414
+ if ( status == ResponseStatus . KeyNotFound )
415
+ {
416
+ //Throw here as this will trigger bootstrapping via HTTP because CCCP not supported
417
+ throw status . CreateException ( op , string . Empty ) ;
418
+ }
468
419
469
- //Return back the config and swap any $HOST placeholders
470
- var config = configOp . GetValue ( ) ;
420
+ //Return back the config and swap any $HOST placeholders
421
+ var config = op . GetValue ( ) ;
471
422
472
- // Propagate any exception that occurred during parsing, this prevents NREs later if config is null
473
- configOp . Exception ? . Throw ( ) ;
423
+ // Propagate any exception that occurred during parsing, this prevents NREs later if config is null
424
+ op . Exception ? . Throw ( ) ;
474
425
475
- if ( config is not null )
476
- {
477
- config . ReplacePlaceholderWithBootstrapHost ( EndPoint . Host ) ;
478
- config . NetworkResolution = _context . ClusterOptions . EffectiveNetworkResolution ;
479
- }
426
+ return config ;
427
+ } ,
428
+ cancellationToken )
429
+ . ConfigureAwait ( false ) ;
480
430
481
- return config ;
482
- }
483
- catch ( OperationCanceledException ex )
484
- {
485
- //Check to see if it was because of a "hung" socket which causes the token to timeout
486
- if ( ctp . IsInternalCancellation )
487
- {
488
- ThrowHelper . ThrowTimeoutException ( configOp , ex , _redactor ) ;
489
- }
490
- throw ;
491
- }
492
- finally
431
+ if ( config is not null )
493
432
{
494
- configOp . StopRecording ( ) ;
433
+ config . ReplacePlaceholderWithBootstrapHost ( EndPoint . Host ) ;
434
+ config . NetworkResolution = _context . ClusterOptions . EffectiveNetworkResolution ;
495
435
}
436
+
437
+ return config ;
496
438
}
497
439
498
440
private void BuildServiceUris ( )
@@ -591,6 +533,49 @@ private async Task<ResponseStatus> SendAsyncWithCircuitBreaker(IOperation op, Ca
591
533
}
592
534
}
593
535
536
+ /// <summary>
537
+ /// Executes an operation internal to the ClusterNode, such as a Hello or GetManifest operation.
538
+ /// Handles timeouts and metrics tracking, forwarding the actual execution to the provided executor.
539
+ /// The provided projector may process the result and throw exceptions if necessary.
540
+ /// </summary>
541
+ /// <typeparam name="TConnection">Type of connection.</typeparam>
542
+ /// <typeparam name="TOperation">Type of operation.</typeparam>
543
+ /// <typeparam name="TResult">Type of result.</typeparam>
544
+ /// <param name="connection">The <see cref="IConnectionPool"/> or <see cref="IConnection"/> to use.</param>
545
+ /// <param name="operation">The operation to execute.</param>
546
+ /// <param name="executor">One of the ExecuteOp or ExecuteOpImmediatelyAsync delegates.</param>
547
+ /// <param name="projector">Callback to perform projections on the result.</param>
548
+ /// <param name="cancellationToken">Cancellation token.</param>
549
+ /// <returns>The result returned by the <paramref name="projector" />.</returns>
550
+ private async Task < TResult > ExecuteInternalOperationAsync < TConnection , TOperation , TResult > (
551
+ TConnection connection ,
552
+ TOperation operation ,
553
+ Func < TConnection , TOperation , CancellationTokenPair , Task < ResponseStatus > > executor ,
554
+ Func < ResponseStatus , TOperation , TResult > projector ,
555
+ CancellationToken cancellationToken )
556
+ where TOperation : class , IOperation
557
+ {
558
+ using var ctp = CancellationTokenPairSource . FromTimeout (
559
+ _context . ClusterOptions . KvTimeout , cancellationToken ) ;
560
+ try
561
+ {
562
+ var status = await executor ( connection , operation , ctp . TokenPair )
563
+ . ConfigureAwait ( false ) ;
564
+
565
+ return projector ( status , operation ) ;
566
+ }
567
+ catch ( OperationCanceledException ex ) when ( ctp . IsInternalCancellation )
568
+ {
569
+ //Check to see if it was because of a "hung" socket which causes the token to timeout
570
+ ThrowHelper . ThrowTimeoutException ( operation , ex , _redactor ) ;
571
+ return default ; // unreachable
572
+ }
573
+ finally
574
+ {
575
+ operation . StopRecording ( ) ;
576
+ }
577
+ }
578
+
594
579
public Task < ResponseStatus > ExecuteOp ( IConnection connection , IOperation op , CancellationTokenPair tokenPair = default )
595
580
{
596
581
// op and connection come back via lambda parameters to prevent an extra closure heap allocation
@@ -857,28 +842,19 @@ async Task IConnectionInitializer.SelectBucketAsync(IConnection connection, stri
857
842
Span = rootSpan ,
858
843
} ;
859
844
860
- using var ctp = CancellationTokenPairSource . FromTimeout ( _context . ClusterOptions . KvTimeout , cancellationToken ) ;
861
- try
862
- {
863
- var status = await ExecuteOp ( connection , selectBucketOp , ctp . TokenPair ) . ConfigureAwait ( false ) ;
864
- if ( status != ResponseStatus . Success )
845
+ await ExecuteInternalOperationAsync ( connection , selectBucketOp ,
846
+ ExecuteOp ,
847
+ static ( status , op ) =>
865
848
{
866
- throw status . CreateException ( selectBucketOp , bucketName ) ;
867
- }
868
- }
869
- catch ( OperationCanceledException ex )
870
- {
871
- //Check to see if it was because of a "hung" socket which causes the token to timeout
872
- if ( ctp . IsInternalCancellation )
873
- {
874
- ThrowHelper . ThrowTimeoutException ( selectBucketOp , ex , _redactor ) ;
875
- }
876
- throw ;
877
- }
878
- finally
879
- {
880
- selectBucketOp . StopRecording ( ) ;
881
- }
849
+ if ( status != ResponseStatus . Success )
850
+ {
851
+ throw status . CreateException ( op , op . Key ) ;
852
+ }
853
+
854
+ return ( object ) null ; // We don't need the return value
855
+ } ,
856
+ cancellationToken )
857
+ . ConfigureAwait ( false ) ;
882
858
}
883
859
catch ( DocumentNotFoundException )
884
860
{
0 commit comments