18
18
19
19
import com .mongodb .MongoClientException ;
20
20
import com .mongodb .MongoIncompatibleDriverException ;
21
+ import com .mongodb .MongoInterruptedException ;
21
22
import com .mongodb .MongoTimeoutException ;
22
23
import com .mongodb .ServerAddress ;
23
24
import com .mongodb .connection .ClusterDescription ;
35
36
import com .mongodb .internal .diagnostics .logging .Logger ;
36
37
import com .mongodb .internal .diagnostics .logging .Loggers ;
37
38
import com .mongodb .internal .selector .LatencyMinimizingServerSelector ;
39
+ import com .mongodb .internal .time .StartTime ;
40
+ import com .mongodb .internal .time .Timeout ;
38
41
import com .mongodb .lang .Nullable ;
39
42
import com .mongodb .selector .CompositeServerSelector ;
40
43
import com .mongodb .selector .ServerSelector ;
59
62
import static com .mongodb .internal .VisibleForTesting .AccessModifier .PRIVATE ;
60
63
import static com .mongodb .internal .connection .EventHelper .wouldDescriptionsGenerateEquivalentEvents ;
61
64
import static com .mongodb .internal .event .EventListenerHelper .singleClusterListener ;
62
- import static com .mongodb .internal .thread .InterruptionUtil .interruptAndCreateMongoInterruptedException ;
63
65
import static java .lang .String .format ;
64
66
import static java .util .Arrays .asList ;
65
67
import static java .util .Comparator .comparingInt ;
@@ -102,47 +104,31 @@ public ClusterClock getClock() {
102
104
public ServerTuple selectServer (final ServerSelector serverSelector , final OperationContext operationContext ) {
103
105
isTrue ("open" , !isClosed ());
104
106
105
- try {
106
- CountDownLatch currentPhase = phase .get ();
107
- ClusterDescription curDescription = description ;
108
- ServerSelector compositeServerSelector = getCompositeServerSelector (serverSelector );
109
- ServerTuple serverTuple = selectServer (compositeServerSelector , curDescription );
110
-
111
- boolean selectionFailureLogged = false ;
112
-
113
- long startTimeNanos = System .nanoTime ();
114
- long curTimeNanos = startTimeNanos ;
115
- long maxWaitTimeNanos = getMaxWaitTimeNanos ();
116
-
117
- while (true ) {
118
- throwIfIncompatible (curDescription );
107
+ ServerSelector compositeServerSelector = getCompositeServerSelector (serverSelector );
108
+ boolean selectionFailureLogged = false ;
109
+ StartTime startTime = StartTime .now ();
110
+ Timeout timeout = startServerSelectionTimeout (startTime );
119
111
120
- if (serverTuple != null ) {
121
- return serverTuple ;
122
- }
123
-
124
- if (curTimeNanos - startTimeNanos > maxWaitTimeNanos ) {
125
- throw createTimeoutException (serverSelector , curDescription );
126
- }
112
+ while (true ) {
113
+ CountDownLatch currentPhaseLatch = phase .get ();
114
+ ClusterDescription currentDescription = description ;
115
+ ServerTuple serverTuple = selectServer (compositeServerSelector , currentDescription );
127
116
128
- if (!selectionFailureLogged ) {
129
- logServerSelectionFailure (serverSelector , curDescription );
130
- selectionFailureLogged = true ;
131
- }
132
-
133
- connect ();
134
-
135
- currentPhase .await (Math .min (maxWaitTimeNanos - (curTimeNanos - startTimeNanos ), getMinWaitTimeNanos ()), NANOSECONDS );
136
-
137
- curTimeNanos = System .nanoTime ();
138
-
139
- currentPhase = phase .get ();
140
- curDescription = description ;
141
- serverTuple = selectServer (compositeServerSelector , curDescription );
117
+ throwIfIncompatible (currentDescription );
118
+ if (serverTuple != null ) {
119
+ return serverTuple ;
142
120
}
143
-
144
- } catch (InterruptedException e ) {
145
- throw interruptAndCreateMongoInterruptedException (format ("Interrupted while waiting for a server that matches %s" , serverSelector ), e );
121
+ if (timeout .hasExpired ()) {
122
+ throw createTimeoutException (serverSelector , currentDescription , startTime );
123
+ }
124
+ if (!selectionFailureLogged ) {
125
+ logServerSelectionFailure (serverSelector , currentDescription , timeout );
126
+ selectionFailureLogged = true ;
127
+ }
128
+ connect ();
129
+ Timeout heartbeatLimitedTimeout = timeout .orEarlier (startMinWaitHeartbeatTimeout ());
130
+ heartbeatLimitedTimeout .awaitOn (currentPhaseLatch ,
131
+ () -> format ("waiting for a server that matches %s" , serverSelector ));
146
132
}
147
133
}
148
134
@@ -154,8 +140,10 @@ public void selectServerAsync(final ServerSelector serverSelector, final Operati
154
140
if (LOGGER .isTraceEnabled ()) {
155
141
LOGGER .trace (format ("Asynchronously selecting server with selector %s" , serverSelector ));
156
142
}
157
- ServerSelectionRequest request = new ServerSelectionRequest (serverSelector , getCompositeServerSelector (serverSelector ),
158
- getMaxWaitTimeNanos (), callback );
143
+ StartTime startTime = StartTime .now ();
144
+ Timeout timeout = startServerSelectionTimeout (startTime );
145
+ ServerSelectionRequest request = new ServerSelectionRequest (
146
+ serverSelector , getCompositeServerSelector (serverSelector ), timeout , startTime , callback );
159
147
160
148
CountDownLatch currentPhase = phase .get ();
161
149
ClusterDescription currentDescription = description ;
@@ -169,49 +157,41 @@ public void selectServerAsync(final ServerSelector serverSelector, final Operati
169
157
public ClusterDescription getDescription () {
170
158
isTrue ("open" , !isClosed ());
171
159
172
- try {
173
- CountDownLatch currentPhase = phase .get ();
174
- ClusterDescription curDescription = description ;
175
-
176
- boolean selectionFailureLogged = false ;
160
+ boolean selectionFailureLogged = false ;
177
161
178
- long startTimeNanos = System .nanoTime ();
179
- long curTimeNanos = startTimeNanos ;
180
- long maxWaitTimeNanos = getMaxWaitTimeNanos ();
162
+ StartTime startTime = StartTime .now ();
163
+ Timeout timeout = startServerSelectionTimeout (startTime );
164
+ while (true ) {
165
+ CountDownLatch currentPhaseLatch = phase .get ();
166
+ ClusterDescription currentDescription = description ;
181
167
182
- while (curDescription .getType () == ClusterType .UNKNOWN ) {
168
+ if (currentDescription .getType () != ClusterType .UNKNOWN ) {
169
+ return currentDescription ;
170
+ }
183
171
184
- if (curTimeNanos - startTimeNanos > maxWaitTimeNanos ) {
185
- throw new MongoTimeoutException (format ("Timed out after %d ms while waiting to connect. Client view of cluster state "
186
- + " is %s" ,
187
- settings . getServerSelectionTimeout ( MILLISECONDS ),
188
- curDescription .getShortDescription ()));
189
- }
172
+ if (timeout . hasExpired () ) {
173
+ throw new MongoTimeoutException (format (
174
+ "Timed out after %d ms while waiting to connect. Client view of cluster state is %s" ,
175
+ startTime . elapsed (). toMillis ( ),
176
+ currentDescription .getShortDescription ()));
177
+ }
190
178
191
- if (!selectionFailureLogged ) {
192
- if (LOGGER .isInfoEnabled ()) {
193
- if (settings .getServerSelectionTimeout (MILLISECONDS ) < 0 ) {
194
- LOGGER .info ("Cluster description not yet available. Waiting indefinitely." );
195
- } else {
196
- LOGGER .info (format ("Cluster description not yet available. Waiting for %d ms before timing out" ,
197
- settings .getServerSelectionTimeout (MILLISECONDS )));
198
- }
179
+ if (!selectionFailureLogged ) {
180
+ if (LOGGER .isInfoEnabled ()) {
181
+ if (timeout .isInfinite ()) {
182
+ LOGGER .info ("Cluster description not yet available. Waiting indefinitely." );
183
+ } else {
184
+ LOGGER .info (format ("Cluster description not yet available. Waiting for %d ms before timing out" ,
185
+ timeout .remaining (MILLISECONDS )));
199
186
}
200
- selectionFailureLogged = true ;
201
187
}
188
+ selectionFailureLogged = true ;
189
+ }
202
190
203
- connect ();
204
-
205
- currentPhase .await (Math .min (maxWaitTimeNanos - (curTimeNanos - startTimeNanos ), getMinWaitTimeNanos ()), NANOSECONDS );
206
-
207
- curTimeNanos = System .nanoTime ();
191
+ connect ();
208
192
209
- currentPhase = phase .get ();
210
- curDescription = description ;
211
- }
212
- return curDescription ;
213
- } catch (InterruptedException e ) {
214
- throw interruptAndCreateMongoInterruptedException ("Interrupted while waiting to connect" , e );
193
+ Timeout heartbeatLimitedTimeout = timeout .orEarlier (startMinWaitHeartbeatTimeout ());
194
+ heartbeatLimitedTimeout .awaitOn (currentPhaseLatch , () -> "waiting to connect" );
215
195
}
216
196
}
217
197
@@ -280,19 +260,20 @@ private void updatePhase() {
280
260
withLock (() -> phase .getAndSet (new CountDownLatch (1 )).countDown ());
281
261
}
282
262
283
- private long getMaxWaitTimeNanos () {
284
- if (settings .getServerSelectionTimeout (NANOSECONDS ) < 0 ) {
285
- return Long .MAX_VALUE ;
286
- }
287
- return settings .getServerSelectionTimeout (NANOSECONDS );
263
+ private Timeout startServerSelectionTimeout (final StartTime startTime ) {
264
+ long ms = settings .getServerSelectionTimeout (MILLISECONDS );
265
+ return startTime .timeoutAfterOrInfiniteIfNegative (ms , MILLISECONDS );
288
266
}
289
267
290
- private long getMinWaitTimeNanos () {
291
- return serverFactory .getSettings ().getMinHeartbeatFrequency (NANOSECONDS );
268
+ private Timeout startMinWaitHeartbeatTimeout () {
269
+ long minHeartbeatFrequency = serverFactory .getSettings ().getMinHeartbeatFrequency (NANOSECONDS );
270
+ minHeartbeatFrequency = Math .max (0 , minHeartbeatFrequency );
271
+ return Timeout .expiresIn (minHeartbeatFrequency , NANOSECONDS );
292
272
}
293
273
294
- private boolean handleServerSelectionRequest (final ServerSelectionRequest request , final CountDownLatch currentPhase ,
295
- final ClusterDescription description ) {
274
+ private boolean handleServerSelectionRequest (
275
+ final ServerSelectionRequest request , final CountDownLatch currentPhase ,
276
+ final ClusterDescription description ) {
296
277
try {
297
278
if (currentPhase != request .phase ) {
298
279
CountDownLatch prevPhase = request .phase ;
@@ -308,21 +289,23 @@ private boolean handleServerSelectionRequest(final ServerSelectionRequest reques
308
289
ServerTuple serverTuple = selectServer (request .compositeSelector , description );
309
290
if (serverTuple != null ) {
310
291
if (LOGGER .isTraceEnabled ()) {
311
- LOGGER .trace (format ("Asynchronously selected server %s" , serverTuple .getServerDescription ().getAddress ()));
292
+ LOGGER .trace (format ("Asynchronously selected server %s" ,
293
+ serverTuple .getServerDescription ().getAddress ()));
312
294
}
313
295
request .onResult (serverTuple , null );
314
296
return true ;
315
297
}
316
298
if (prevPhase == null ) {
317
- logServerSelectionFailure (request .originalSelector , description );
299
+ logServerSelectionFailure (request .originalSelector , description , request . getTimeout () );
318
300
}
319
301
}
320
302
321
- if (request .timedOut ()) {
303
+ if (request .getTimeout (). hasExpired ()) {
322
304
if (LOGGER .isTraceEnabled ()) {
323
305
LOGGER .trace ("Asynchronously failed server selection after timeout" );
324
306
}
325
- request .onResult (null , createTimeoutException (request .originalSelector , description ));
307
+ request .onResult (null , createTimeoutException (
308
+ request .originalSelector , description , request .getStartTime ()));
326
309
return true ;
327
310
}
328
311
@@ -333,14 +316,15 @@ private boolean handleServerSelectionRequest(final ServerSelectionRequest reques
333
316
}
334
317
}
335
318
336
- private void logServerSelectionFailure (final ServerSelector serverSelector , final ClusterDescription curDescription ) {
319
+ private void logServerSelectionFailure (final ServerSelector serverSelector ,
320
+ final ClusterDescription curDescription , final Timeout timeout ) {
337
321
if (LOGGER .isInfoEnabled ()) {
338
- if (settings . getServerSelectionTimeout ( MILLISECONDS ) < 0 ) {
322
+ if (timeout . isInfinite () ) {
339
323
LOGGER .info (format ("No server chosen by %s from cluster description %s. Waiting indefinitely." ,
340
324
serverSelector , curDescription ));
341
325
} else {
342
326
LOGGER .info (format ("No server chosen by %s from cluster description %s. Waiting for %d ms before timing out" ,
343
- serverSelector , curDescription , settings . getServerSelectionTimeout (MILLISECONDS )));
327
+ serverSelector , curDescription , timeout . remaining (MILLISECONDS )));
344
328
}
345
329
}
346
330
}
@@ -426,27 +410,29 @@ private MongoIncompatibleDriverException createIncompatibleException(final Clust
426
410
return new MongoIncompatibleDriverException (message , curDescription );
427
411
}
428
412
429
- private MongoTimeoutException createTimeoutException (final ServerSelector serverSelector , final ClusterDescription curDescription ) {
430
- return new MongoTimeoutException (format ("Timed out after %d ms while waiting for a server that matches %s. "
431
- + "Client view of cluster state is %s" ,
432
- settings .getServerSelectionTimeout (MILLISECONDS ), serverSelector ,
433
- curDescription .getShortDescription ()));
413
+ private MongoTimeoutException createTimeoutException (final ServerSelector serverSelector ,
414
+ final ClusterDescription curDescription , final StartTime startTime ) {
415
+ return new MongoTimeoutException (format (
416
+ "Timed out after %d ms while waiting for a server that matches %s. Client view of cluster state is %s" ,
417
+ startTime .elapsed ().toMillis (),
418
+ serverSelector ,
419
+ curDescription .getShortDescription ()));
434
420
}
435
421
436
422
private static final class ServerSelectionRequest {
437
423
private final ServerSelector originalSelector ;
438
424
private final ServerSelector compositeSelector ;
439
- private final long maxWaitTimeNanos ;
440
425
private final SingleResultCallback <ServerTuple > callback ;
441
- private final long startTimeNanos = System .nanoTime ();
426
+ private final Timeout timeout ;
427
+ private final StartTime startTime ;
442
428
private CountDownLatch phase ;
443
429
444
430
ServerSelectionRequest (final ServerSelector serverSelector , final ServerSelector compositeSelector ,
445
- final long maxWaitTimeNanos ,
446
- final SingleResultCallback <ServerTuple > callback ) {
431
+ final Timeout timeout , final StartTime startTime , final SingleResultCallback <ServerTuple > callback ) {
447
432
this .originalSelector = serverSelector ;
448
433
this .compositeSelector = compositeSelector ;
449
- this .maxWaitTimeNanos = maxWaitTimeNanos ;
434
+ this .timeout = timeout ;
435
+ this .startTime = startTime ;
450
436
this .callback = callback ;
451
437
}
452
438
@@ -458,12 +444,12 @@ void onResult(@Nullable final ServerTuple serverTuple, @Nullable final Throwable
458
444
}
459
445
}
460
446
461
- boolean timedOut () {
462
- return System . nanoTime () - startTimeNanos > maxWaitTimeNanos ;
447
+ Timeout getTimeout () {
448
+ return timeout ;
463
449
}
464
450
465
- long getRemainingTime () {
466
- return startTimeNanos + maxWaitTimeNanos - System . nanoTime () ;
451
+ StartTime getStartTime () {
452
+ return startTime ;
467
453
}
468
454
}
469
455
@@ -498,25 +484,28 @@ public void run() {
498
484
while (!isClosed ) {
499
485
CountDownLatch currentPhase = phase .get ();
500
486
ClusterDescription curDescription = description ;
501
- long waitTimeNanos = Long .MAX_VALUE ;
487
+
488
+ Timeout timeout = Timeout .infinite ();
502
489
503
490
for (Iterator <ServerSelectionRequest > iter = waitQueue .iterator (); iter .hasNext ();) {
504
491
ServerSelectionRequest nextRequest = iter .next ();
505
492
if (handleServerSelectionRequest (nextRequest , currentPhase , curDescription )) {
506
493
iter .remove ();
507
494
} else {
508
- waitTimeNanos = Math .min (nextRequest .getRemainingTime (), Math .min (getMinWaitTimeNanos (), waitTimeNanos ));
495
+ timeout = timeout
496
+ .orEarlier (nextRequest .getTimeout ())
497
+ .orEarlier (startMinWaitHeartbeatTimeout ());
509
498
}
510
499
}
511
500
512
501
// if there are any waiters that were not satisfied, connect
513
- if (waitTimeNanos < Long . MAX_VALUE ) {
502
+ if (! timeout . isInfinite () ) {
514
503
connect ();
515
504
}
516
505
517
506
try {
518
- currentPhase . await ( waitTimeNanos , NANOSECONDS );
519
- } catch (InterruptedException closed ) {
507
+ timeout . awaitOn ( currentPhase , () -> "ignored" );
508
+ } catch (MongoInterruptedException closed ) {
520
509
// The cluster has been closed and the while loop will exit.
521
510
}
522
511
}
0 commit comments