From fbdd7f2d4dce0d8df1b71515855c2bbbdf511290 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 11 Dec 2024 15:08:43 +0100 Subject: [PATCH 1/3] Use dynamic batch publishing by default And set initial credits to 10 by default, as dynamic batching creates smaller chunks for low ingress. --- src/docs/asciidoc/api.adoc | 6 +++--- .../com/rabbitmq/stream/ConsumerBuilder.java | 8 ++++++-- .../rabbitmq/stream/ConsumerFlowStrategy.java | 8 +++++--- .../com/rabbitmq/stream/ProducerBuilder.java | 16 ++++++++++------ .../stream/impl/StreamConsumerBuilder.java | 4 ++-- .../stream/impl/StreamProducerBuilder.java | 2 +- .../stream/impl/ConsumersCoordinatorTest.java | 2 +- .../MessageCountConsumerFlowStrategyTest.java | 4 ++-- 8 files changed, 30 insertions(+), 20 deletions(-) diff --git a/src/docs/asciidoc/api.adoc b/src/docs/asciidoc/api.adoc index c075f91453..d490a5d12b 100644 --- a/src/docs/asciidoc/api.adoc +++ b/src/docs/asciidoc/api.adoc @@ -470,7 +470,7 @@ blocking when the limit is reached. |`dynamicBatch` |Adapt batch size depending on ingress rate. -|false +|true |`confirmTimeout` |[[producer-confirm-timeout-configuration-entry]]Time before the client calls the confirm callback to signal @@ -897,11 +897,11 @@ Useful when using an external store for offset tracking. |`flow#initialCredits` |Number of credits when the subscription is created. Increase for higher throughput at the expense of memory usage. -|1 +|10 |`flow#strategy` |The `ConsumerFlowStrategy` to use. -|`ConsumerFlowStrategy#creditOnChunkArrival(1)` +|`ConsumerFlowStrategy#creditOnChunkArrival(10)` |=== [NOTE] diff --git a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java index 9e391d525c..2579548c01 100644 --- a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -245,10 +245,14 @@ interface FlowConfiguration { /** * The number of initial credits for the subscription. * - *

Default is 1. + *

Default is 10. * *

This calls uses {@link ConsumerFlowStrategy#creditOnChunkArrival(int)}. * + *

Use a small value like 1 for streams with large chunks (several hundreds of messages per + * chunk) and higher values (5 or more) for streams with small chunks (1 or a few messages per + * chunk). + * * @param initialCredits the number of initial credits * @return this configuration instance * @see ConsumerFlowStrategy#creditOnChunkArrival(int) diff --git a/src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java b/src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java index 3d6b6e94b1..9f32a2f9fd 100644 --- a/src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java +++ b/src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java @@ -1,4 +1,4 @@ -// Copyright (c) 2023 Broadcom. All Rights Reserved. +// Copyright (c) 2023-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -120,13 +120,14 @@ static ConsumerFlowStrategy creditOnChunkArrival() { * * @param initialCredits number of initial credits * @return flow strategy + * @see com.rabbitmq.stream.ConsumerBuilder.FlowConfiguration#initialCredits(int) */ static ConsumerFlowStrategy creditOnChunkArrival(int initialCredits) { return new CreditOnChunkArrivalConsumerFlowStrategy(initialCredits); } /** - * Strategy that provides 1 initial credit and a credit when half of the chunk messages are + * Strategy that provides 10 initial credits and a credit when half of the chunk messages are * processed. * *

Make sure to call {@link MessageHandler.Context#processed()} on every message when using @@ -135,7 +136,7 @@ static ConsumerFlowStrategy creditOnChunkArrival(int initialCredits) { * @return flow strategy */ static ConsumerFlowStrategy creditWhenHalfMessagesProcessed() { - return creditOnProcessedMessageCount(1, 0.5); + return creditOnProcessedMessageCount(10, 0.5); } /** @@ -147,6 +148,7 @@ static ConsumerFlowStrategy creditWhenHalfMessagesProcessed() { * * @param initialCredits number of initial credits * @return flow strategy + * @see com.rabbitmq.stream.ConsumerBuilder.FlowConfiguration#initialCredits(int) */ static ConsumerFlowStrategy creditWhenHalfMessagesProcessed(int initialCredits) { return creditOnProcessedMessageCount(initialCredits, 0.5); diff --git a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java index dca01baf9f..a445dc2fb4 100644 --- a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java @@ -106,22 +106,26 @@ public interface ProducerBuilder { /** * Adapt batch size depending on ingress rate. * - *

A dynamic-batch approach improves latency for low ingress rates. It can be counterproductive - * for sustained high ingress rates. + *

A dynamic-batch approach improves latency for low ingress rates. * *

Set this flag to true if you want as little delay as possible between calling * {@link Producer#send(Message, ConfirmationHandler)} and the message being sent to the broker. + * Consumers should provide enough initial credits (between 5 and 10, depending on the workload), + * see {@link ConsumerBuilder#flow()} and {@link + * ConsumerBuilder.FlowConfiguration#initialCredits(int)}. * *

Set this flag to false if latency is not critical for your use case and you - * want the highest throughput possible for both publishing and consuming. + * want the highest throughput possible for both publishing and consuming. Consumers can provide 1 + * initial credit (depending on the workload), see {@link ConsumerBuilder#flow()} and {@link + * ConsumerBuilder.FlowConfiguration#initialCredits(int)}. * - *

Dynamic batch is not activated by default (dynamicBatch = false). - * - *

Dynamic batch is experimental. + *

Dynamic batch is activated by default (dynamicBatch = true). * * @param dynamicBatch * @return this builder instance * @since 0.20.0 + * @see ConsumerBuilder#flow() + * @see com.rabbitmq.stream.ConsumerBuilder.FlowConfiguration#initialCredits(int) */ ProducerBuilder dynamicBatch(boolean dynamicBatch); diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java index c6364d1aa3..ce8660c6a5 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -428,7 +428,7 @@ private DefaultFlowConfiguration(ConsumerBuilder consumerBuilder) { this.consumerBuilder = consumerBuilder; } - private ConsumerFlowStrategy strategy = ConsumerFlowStrategy.creditOnChunkArrival(); + private ConsumerFlowStrategy strategy = ConsumerFlowStrategy.creditOnChunkArrival(10); @Override public FlowConfiguration initialCredits(int initialCredits) { diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java index 54807489e2..244b2b1746 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java @@ -29,7 +29,7 @@ class StreamProducerBuilder implements ProducerBuilder { static final boolean DEFAULT_DYNAMIC_BATCH = - Boolean.parseBoolean(System.getProperty("rabbitmq.stream.producer.dynamic.batch", "false")); + Boolean.parseBoolean(System.getProperty("rabbitmq.stream.producer.dynamic.batch", "true")); private final StreamEnvironment environment; diff --git a/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java b/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java index 3777d7e7d9..7dd5e8da48 100644 --- a/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java @@ -665,7 +665,7 @@ void ignoredMessageShouldTriggerMessageProcessing() { new ConsumerFlowStrategy() { @Override public int initialCredits() { - return 1; + return 10; } @Override diff --git a/src/test/java/com/rabbitmq/stream/impl/MessageCountConsumerFlowStrategyTest.java b/src/test/java/com/rabbitmq/stream/impl/MessageCountConsumerFlowStrategyTest.java index 34166a8f2e..90ec80ae1c 100644 --- a/src/test/java/com/rabbitmq/stream/impl/MessageCountConsumerFlowStrategyTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/MessageCountConsumerFlowStrategyTest.java @@ -1,4 +1,4 @@ -// Copyright (c) 2023 Broadcom. All Rights Reserved. +// Copyright (c) 2023-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -55,7 +55,7 @@ void smallChunksAndSmallRatiosShouldCredit() { } ConsumerFlowStrategy build(double ratio) { - return creditOnProcessedMessageCount(1, ratio); + return creditOnProcessedMessageCount(10, ratio); } ConsumerFlowStrategy.Context context(long messageCount) { From 3d6b1dc1d573c219c30b15c645d842a90c6ab05e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 11 Dec 2024 15:34:35 +0100 Subject: [PATCH 2/3] Use 1 initial credit in test --- .../com/rabbitmq/stream/impl/StreamConsumerTest.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java index be2dc4a8c3..b97ba4ba69 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java @@ -203,7 +203,7 @@ void consumeWithAsyncConsumerFlowControl() throws Exception { environment.consumerBuilder().stream(stream) .offset(OffsetSpecification.first()) .flow() - .strategy(creditWhenHalfMessagesProcessed()) + .strategy(creditWhenHalfMessagesProcessed(1)) .builder(); List messageContexts = synchronizedList(new ArrayList<>()); @@ -244,14 +244,13 @@ void asynchronousProcessingWithFlowControl() { int messageCount = 100_000; publishAndWaitForConfirms(cf, messageCount, stream); - ExecutorService executorService = - Executors.newFixedThreadPool(getRuntime().availableProcessors()); - try { + try (ExecutorService executorService = + Executors.newFixedThreadPool(getRuntime().availableProcessors())) { CountDownLatch latch = new CountDownLatch(messageCount); environment.consumerBuilder().stream(stream) .offset(OffsetSpecification.first()) .flow() - .strategy(creditWhenHalfMessagesProcessed()) + .strategy(creditWhenHalfMessagesProcessed(1)) .builder() .messageHandler( (ctx, message) -> @@ -262,8 +261,6 @@ void asynchronousProcessingWithFlowControl() { })) .build(); assertThat(latch).is(completed()); - } finally { - executorService.shutdownNow(); } } From dfd94df11017b76218af74497bf20b4678412ed0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 11 Dec 2024 15:42:51 +0100 Subject: [PATCH 3/3] Fix test --- .../java/com/rabbitmq/stream/impl/StreamConsumerTest.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java index b97ba4ba69..0f439df0b8 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java @@ -243,9 +243,9 @@ void consumeWithAsyncConsumerFlowControl() throws Exception { void asynchronousProcessingWithFlowControl() { int messageCount = 100_000; publishAndWaitForConfirms(cf, messageCount, stream); - - try (ExecutorService executorService = - Executors.newFixedThreadPool(getRuntime().availableProcessors())) { + ExecutorService executorService = + Executors.newFixedThreadPool(getRuntime().availableProcessors()); + try { CountDownLatch latch = new CountDownLatch(messageCount); environment.consumerBuilder().stream(stream) .offset(OffsetSpecification.first()) @@ -261,6 +261,8 @@ void asynchronousProcessingWithFlowControl() { })) .build(); assertThat(latch).is(completed()); + } finally { + executorService.shutdownNow(); } }