@@ -77,9 +77,7 @@ protected void releaseResources() throws IOException {
77
77
declareQueue (TEST_QUEUE_NAME , DLX , "routing_key" , null );
78
78
}
79
79
80
- @ Test public void declareQueueWithInvalidDeadLetterExchangeArg ()
81
- throws IOException
82
- {
80
+ @ Test public void declareQueueWithInvalidDeadLetterExchangeArg () {
83
81
try {
84
82
declareQueue (133 );
85
83
fail ("x-dead-letter-exchange must be a valid exchange name" );
@@ -101,9 +99,7 @@ protected void releaseResources() throws IOException {
101
99
}
102
100
}
103
101
104
- @ Test public void declareQueueWithInvalidDeadLetterRoutingKeyArg ()
105
- throws IOException
106
- {
102
+ @ Test public void declareQueueWithInvalidDeadLetterRoutingKeyArg () {
107
103
try {
108
104
declareQueue ("foo" , "amq.direct" , 144 , null );
109
105
fail ("x-dead-letter-routing-key must be a string" );
@@ -125,11 +121,9 @@ protected void releaseResources() throws IOException {
125
121
}
126
122
}
127
123
128
- @ Test public void declareQueueWithRoutingKeyButNoDeadLetterExchange ()
129
- throws IOException
130
- {
124
+ @ Test public void declareQueueWithRoutingKeyButNoDeadLetterExchange () {
131
125
try {
132
- Map <String , Object > args = new HashMap <String , Object >();
126
+ Map <String , Object > args = new HashMap <>();
133
127
args .put (DLX_RK_ARG , "foo" );
134
128
135
129
channel .queueDeclare (randomQueueName (), false , true , false , args );
@@ -139,11 +133,10 @@ protected void releaseResources() throws IOException {
139
133
}
140
134
}
141
135
142
- @ Test public void redeclareQueueWithRoutingKeyButNoDeadLetterExchange ()
143
- throws IOException , InterruptedException {
136
+ @ Test public void redeclareQueueWithRoutingKeyButNoDeadLetterExchange () {
144
137
try {
145
138
String queueName = randomQueueName ();
146
- Map <String , Object > args = new HashMap <String , Object >();
139
+ Map <String , Object > args = new HashMap <>();
147
140
channel .queueDeclare (queueName , false , true , false , args );
148
141
149
142
args .put (DLX_RK_ARG , "foo" );
@@ -165,7 +158,7 @@ protected void releaseResources() throws IOException {
165
158
}
166
159
167
160
@ Test public void deadLetterQueueTTLPromptExpiry () throws Exception {
168
- Map <String , Object > args = new HashMap <String , Object >();
161
+ Map <String , Object > args = new HashMap <>();
169
162
args .put ("x-message-ttl" , TTL );
170
163
declareQueue (TEST_QUEUE_NAME , DLX , null , args );
171
164
channel .queueBind (TEST_QUEUE_NAME , "amq.direct" , "test" );
@@ -205,7 +198,7 @@ protected void releaseResources() throws IOException {
205
198
publishAt (start );
206
199
basicGet (TEST_QUEUE_NAME );
207
200
// publish a 2nd message and immediately fetch it in ack mode
208
- publishAt (start + TTL * 1 / 2 );
201
+ publishAt (start + TTL / 2 );
209
202
GetResponse r = channel .basicGet (TEST_QUEUE_NAME , false );
210
203
// publish a 3rd message
211
204
publishAt (start + TTL * 3 / 4 );
@@ -250,20 +243,17 @@ protected void releaseResources() throws IOException {
250
243
// the DLQ *AND* should remain there, not getting removed after a subsequent
251
244
// wait time > 100ms
252
245
sleep (500 );
253
- consumeN (DLQ , 1 , new WithResponse () {
254
- @ SuppressWarnings ("unchecked" )
255
- public void process (GetResponse getResponse ) {
256
- assertNull (getResponse .getProps ().getExpiration ());
257
- Map <String , Object > headers = getResponse .getProps ().getHeaders ();
258
- assertNotNull (headers );
259
- ArrayList <Object > death = (ArrayList <Object >)headers .get ("x-death" );
260
- assertNotNull (death );
261
- assertDeathReason (death , 0 , TEST_QUEUE_NAME , "expired" );
262
- final Map <String , Object > deathHeader =
263
- (Map <String , Object >)death .get (0 );
264
- assertEquals ("100" , deathHeader .get ("original-expiration" ).toString ());
265
- }
266
- });
246
+ consumeN (DLQ , 1 , getResponse -> {
247
+ assertNull (getResponse .getProps ().getExpiration ());
248
+ Map <String , Object > headers = getResponse .getProps ().getHeaders ();
249
+ assertNotNull (headers );
250
+ ArrayList <Object > death = (ArrayList <Object >)headers .get ("x-death" );
251
+ assertNotNull (death );
252
+ assertDeathReason (death , 0 , TEST_QUEUE_NAME , "expired" );
253
+ final Map <String , Object > deathHeader =
254
+ (Map <String , Object >)death .get (0 );
255
+ assertEquals ("100" , deathHeader .get ("original-expiration" ).toString ());
256
+ });
267
257
}
268
258
269
259
@ Test public void deadLetterOnReject () throws Exception {
@@ -315,23 +305,20 @@ public void process(GetResponse getResponse) {
315
305
316
306
// There should now be two copies of each message on DLQ2: one
317
307
// with one set of death headers, and another with two sets.
318
- consumeN (DLQ2 , MSG_COUNT *2 , new WithResponse () {
319
- @ SuppressWarnings ("unchecked" )
320
- public void process (GetResponse getResponse ) {
321
- Map <String , Object > headers = getResponse .getProps ().getHeaders ();
322
- assertNotNull (headers );
323
- ArrayList <Object > death = (ArrayList <Object >)headers .get ("x-death" );
324
- assertNotNull (death );
325
- if (death .size () == 1 ) {
326
- assertDeathReason (death , 0 , TEST_QUEUE_NAME , "expired" );
327
- } else if (death .size () == 2 ) {
328
- assertDeathReason (death , 0 , DLQ , "expired" );
329
- assertDeathReason (death , 1 , TEST_QUEUE_NAME , "expired" );
330
- } else {
331
- fail ("message was dead-lettered more times than expected" );
332
- }
333
- }
334
- });
308
+ consumeN (DLQ2 , MSG_COUNT *2 , getResponse -> {
309
+ Map <String , Object > headers = getResponse .getProps ().getHeaders ();
310
+ assertNotNull (headers );
311
+ ArrayList <Object > death = (ArrayList <Object >)headers .get ("x-death" );
312
+ assertNotNull (death );
313
+ if (death .size () == 1 ) {
314
+ assertDeathReason (death , 0 , TEST_QUEUE_NAME , "expired" );
315
+ } else if (death .size () == 2 ) {
316
+ assertDeathReason (death , 0 , DLQ , "expired" );
317
+ assertDeathReason (death , 1 , TEST_QUEUE_NAME , "expired" );
318
+ } else {
319
+ fail ("message was dead-lettered more times than expected" );
320
+ }
321
+ });
335
322
}
336
323
337
324
@ Test public void deadLetterSelf () throws Exception {
@@ -379,9 +366,9 @@ public void handleDelivery(String consumerTag, Envelope envelope,
379
366
channel .queueBind (DLQ , DLX , "test" );
380
367
channel .queueBind (DLQ2 , DLX , "test-other" );
381
368
382
- Map <String , Object > headers = new HashMap <String , Object >();
383
- headers .put ("CC" , Arrays . asList ("foo" ));
384
- headers .put ("BCC" , Arrays . asList ("bar" ));
369
+ Map <String , Object > headers = new HashMap <>();
370
+ headers .put ("CC" , Collections . singletonList ("foo" ));
371
+ headers .put ("BCC" , Collections . singletonList ("bar" ));
385
372
386
373
publishN (MSG_COUNT , (new AMQP .BasicProperties .Builder ())
387
374
.headers (headers )
@@ -390,27 +377,24 @@ public void handleDelivery(String consumerTag, Envelope envelope,
390
377
sleep (100 );
391
378
392
379
consumeN (DLQ , 0 , WithResponse .NULL );
393
- consumeN (DLQ2 , MSG_COUNT , new WithResponse () {
394
- @ SuppressWarnings ("unchecked" )
395
- public void process (GetResponse getResponse ) {
396
- Map <String , Object > headers = getResponse .getProps ().getHeaders ();
397
- assertNotNull (headers );
398
- assertNull (headers .get ("CC" ));
399
- assertNull (headers .get ("BCC" ));
400
-
401
- ArrayList <Object > death = (ArrayList <Object >)headers .get ("x-death" );
402
- assertNotNull (death );
403
- assertEquals (1 , death .size ());
404
- assertDeathReason (death , 0 , TEST_QUEUE_NAME ,
405
- "expired" , "amq.direct" ,
406
- Arrays .asList ("test" , "foo" ));
407
- }
408
- });
380
+ consumeN (DLQ2 , MSG_COUNT , getResponse -> {
381
+ Map <String , Object > headers1 = getResponse .getProps ().getHeaders ();
382
+ assertNotNull (headers1 );
383
+ assertNull (headers1 .get ("CC" ));
384
+ assertNull (headers1 .get ("BCC" ));
385
+
386
+ ArrayList <Object > death = (ArrayList <Object >) headers1 .get ("x-death" );
387
+ assertNotNull (death );
388
+ assertEquals (1 , death .size ());
389
+ assertDeathReason (death , 0 , TEST_QUEUE_NAME ,
390
+ "expired" , "amq.direct" ,
391
+ Arrays .asList ("test" , "foo" ));
392
+ });
409
393
}
410
394
411
395
@ SuppressWarnings ("unchecked" )
412
396
@ Test public void republish () throws Exception {
413
- Map <String , Object > args = new HashMap <String , Object >();
397
+ Map <String , Object > args = new HashMap <>();
414
398
args .put ("x-message-ttl" , 100 );
415
399
declareQueue (TEST_QUEUE_NAME , DLX , null , args );
416
400
channel .queueBind (TEST_QUEUE_NAME , "amq.direct" , "test" );
@@ -430,10 +414,10 @@ public void process(GetResponse getResponse) {
430
414
assertNotNull (death );
431
415
assertEquals (1 , death .size ());
432
416
assertDeathReason (death , 0 , TEST_QUEUE_NAME , "expired" , "amq.direct" ,
433
- Arrays . asList ("test" ));
417
+ Collections . singletonList ("test" ));
434
418
435
419
// Make queue zero length
436
- args = new HashMap <String , Object >();
420
+ args = new HashMap <>();
437
421
args .put ("x-max-length" , 0 );
438
422
channel .queueDelete (TEST_QUEUE_NAME );
439
423
declareQueue (TEST_QUEUE_NAME , DLX , null , args );
@@ -457,9 +441,9 @@ public void process(GetResponse getResponse) {
457
441
assertNotNull (death );
458
442
assertEquals (2 , death .size ());
459
443
assertDeathReason (death , 0 , TEST_QUEUE_NAME , "maxlen" , "amq.direct" ,
460
- Arrays . asList ("test" ));
444
+ Collections . singletonList ("test" ));
461
445
assertDeathReason (death , 1 , TEST_QUEUE_NAME , "expired" , "amq.direct" ,
462
- Arrays . asList ("test" ));
446
+ Collections . singletonList ("test" ));
463
447
464
448
//Set invalid headers
465
449
headers .put ("x-death" , "[I, am, not, array]" );
@@ -478,39 +462,35 @@ public void process(GetResponse getResponse) {
478
462
assertNotNull (death );
479
463
assertEquals (1 , death .size ());
480
464
assertDeathReason (death , 0 , TEST_QUEUE_NAME , "maxlen" , "amq.direct" ,
481
- Arrays .asList ("test" ));
482
-
483
- }
484
-
485
- public void rejectionTest (final boolean useNack ) throws Exception {
486
- deadLetterTest (new Callable <Void >() {
487
- public Void call () throws Exception {
488
- for (int x = 0 ; x < MSG_COUNT ; x ++) {
489
- GetResponse getResponse =
490
- channel .basicGet (TEST_QUEUE_NAME , false );
491
- long tag = getResponse .getEnvelope ().getDeliveryTag ();
492
- if (useNack ) {
493
- channel .basicNack (tag , false , false );
494
- } else {
495
- channel .basicReject (tag , false );
496
- }
497
- }
498
- return null ;
465
+ Collections .singletonList ("test" ));
466
+
467
+ }
468
+
469
+ private void rejectionTest (final boolean useNack ) throws Exception {
470
+ deadLetterTest ((Callable <Void >) () -> {
471
+ for (int x = 0 ; x < MSG_COUNT ; x ++) {
472
+ GetResponse getResponse =
473
+ channel .basicGet (TEST_QUEUE_NAME , false );
474
+ long tag = getResponse .getEnvelope ().getDeliveryTag ();
475
+ if (useNack ) {
476
+ channel .basicNack (tag , false , false );
477
+ } else {
478
+ channel .basicReject (tag , false );
499
479
}
500
- }, null , "rejected" );
480
+ }
481
+ return null ;
482
+ }, null , "rejected" );
501
483
}
502
484
503
485
private void deadLetterTest (final Runnable deathTrigger ,
504
486
Map <String , Object > queueDeclareArgs ,
505
487
String reason )
506
488
throws Exception
507
489
{
508
- deadLetterTest (new Callable <Object >() {
509
- public Object call () throws Exception {
510
- deathTrigger .run ();
511
- return null ;
512
- }
513
- }, queueDeclareArgs , reason );
490
+ deadLetterTest (() -> {
491
+ deathTrigger .run ();
492
+ return null ;
493
+ }, queueDeclareArgs , reason );
514
494
}
515
495
516
496
private void deadLetterTest (Callable <?> deathTrigger ,
@@ -531,35 +511,30 @@ private void deadLetterTest(Callable<?> deathTrigger,
531
511
}
532
512
533
513
public static void consume (final Channel channel , final String reason ) throws IOException {
534
- consumeN (channel , DLQ , MSG_COUNT , new WithResponse () {
535
- @ SuppressWarnings ("unchecked" )
536
- public void process (GetResponse getResponse ) {
537
- Map <String , Object > headers = getResponse .getProps ().getHeaders ();
538
- assertNotNull (headers );
539
- ArrayList <Object > death = (ArrayList <Object >) headers .get ("x-death" );
540
- assertNotNull (death );
541
- // the following assertions shouldn't be checked on version lower than 3.7
542
- // as the headers are new in 3.7
543
- // see https://github.com/rabbitmq/rabbitmq-server/issues/1332
544
- if (TestUtils .isVersion37orLater (channel .getConnection ())) {
545
- assertNotNull (headers .get ("x-first-death-queue" ));
546
- assertNotNull (headers .get ("x-first-death-reason" ));
547
- assertNotNull (headers .get ("x-first-death-exchange" ));
548
- }
549
- assertEquals (1 , death .size ());
550
- assertDeathReason (death , 0 , TEST_QUEUE_NAME , reason ,
551
- "amq.direct" ,
552
- Arrays .asList ("test" ));
514
+ consumeN (channel , DLQ , MSG_COUNT , getResponse -> {
515
+ Map <String , Object > headers = getResponse .getProps ().getHeaders ();
516
+ assertNotNull (headers );
517
+ ArrayList <Object > death = (ArrayList <Object >) headers .get ("x-death" );
518
+ assertNotNull (death );
519
+ // the following assertions shouldn't be checked on version lower than 3.7
520
+ // as the headers are new in 3.7
521
+ // see https://github.com/rabbitmq/rabbitmq-server/issues/1332
522
+ if (TestUtils .isVersion37orLater (channel .getConnection ())) {
523
+ assertNotNull (headers .get ("x-first-death-queue" ));
524
+ assertNotNull (headers .get ("x-first-death-reason" ));
525
+ assertNotNull (headers .get ("x-first-death-exchange" ));
553
526
}
527
+ assertEquals (1 , death .size ());
528
+ assertDeathReason (death , 0 , TEST_QUEUE_NAME , reason ,
529
+ "amq.direct" ,
530
+ Collections .singletonList ("test" ));
554
531
});
555
532
}
556
533
557
534
private void ttlTest (final long ttl ) throws Exception {
558
535
Map <String , Object > args = new HashMap <String , Object >();
559
536
args .put ("x-message-ttl" , ttl );
560
- deadLetterTest (new Runnable () {
561
- public void run () { sleep (ttl + 1500 ); }
562
- }, args , "expired" );
537
+ deadLetterTest (() -> sleep (ttl + 1500 ), args , "expired" );
563
538
}
564
539
565
540
private void sleep (long millis ) {
@@ -604,7 +579,7 @@ private void declareQueue(String queue, Object deadLetterExchange,
604
579
throws IOException
605
580
{
606
581
if (args == null ) {
607
- args = new HashMap <String , Object >();
582
+ args = new HashMap <>();
608
583
}
609
584
610
585
if (ttl > 0 ){
@@ -674,7 +649,7 @@ private static void assertDeathReason(List<Object> death, int num,
674
649
(Map <String , Object >)death .get (num );
675
650
assertEquals (exchange , deathHeader .get ("exchange" ).toString ());
676
651
677
- List <String > deathRKs = new ArrayList <String >();
652
+ List <String > deathRKs = new ArrayList <>();
678
653
for (Object rk : (ArrayList <?>)deathHeader .get ("routing-keys" )) {
679
654
deathRKs .add (rk .toString ());
680
655
}
@@ -694,13 +669,11 @@ private static void assertDeathReason(List<Object> death, int num,
694
669
assertEquals (reason , deathHeader .get ("reason" ).toString ());
695
670
}
696
671
697
- private static interface WithResponse {
698
- static final WithResponse NULL = new WithResponse () {
699
- public void process (GetResponse getResponse ) {
700
- }
701
- };
672
+ private interface WithResponse {
673
+ WithResponse NULL = getResponse -> {
674
+ };
702
675
703
- public void process (GetResponse response );
676
+ void process (GetResponse response );
704
677
}
705
678
706
679
private static String randomQueueName () {
@@ -709,9 +682,9 @@ private static String randomQueueName() {
709
682
710
683
class AccumulatingMessageConsumer extends DefaultConsumer {
711
684
712
- BlockingQueue <byte []> messages = new LinkedBlockingQueue <byte [] >();
685
+ BlockingQueue <byte []> messages = new LinkedBlockingQueue <>();
713
686
714
- public AccumulatingMessageConsumer (Channel channel ) {
687
+ AccumulatingMessageConsumer (Channel channel ) {
715
688
super (channel );
716
689
}
717
690
0 commit comments