diff --git a/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java index 37494535b6..3c288d5428 100644 --- a/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java +++ b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java @@ -28,17 +28,22 @@ final class DynamicBatch implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatch.class); - private static final int MIN_BATCH_SIZE = 32; - private static final int MAX_BATCH_SIZE = 8192; + private static final int MIN_BATCH_SIZE = 16; private final BlockingQueue requests = new LinkedBlockingQueue<>(); private final BatchConsumer consumer; - private final int configuredBatchSize; + private final int configuredBatchSize, minBatchSize, maxBatchSize; private final Thread thread; - DynamicBatch(BatchConsumer consumer, int batchSize) { + DynamicBatch(BatchConsumer consumer, int batchSize, int maxUnconfirmed) { this.consumer = consumer; - this.configuredBatchSize = min(max(batchSize, MIN_BATCH_SIZE), MAX_BATCH_SIZE); + if (batchSize < maxUnconfirmed) { + this.minBatchSize = min(MIN_BATCH_SIZE, batchSize / 2); + } else { + this.minBatchSize = min(1, maxUnconfirmed / 2); + } + this.configuredBatchSize = batchSize; + this.maxBatchSize = batchSize * 2; this.thread = ConcurrencyUtils.defaultThreadFactory().newThread(this::loop); this.thread.start(); } @@ -104,9 +109,9 @@ private void maybeCompleteBatch(State state, boolean increaseIfCompleted) { boolean completed = this.consumer.process(state.items); if (completed) { if (increaseIfCompleted) { - state.batchSize = min(state.batchSize * 2, MAX_BATCH_SIZE); + state.batchSize = min(state.batchSize * 2, this.maxBatchSize); } else { - state.batchSize = max(state.batchSize / 2, MIN_BATCH_SIZE); + state.batchSize = max(state.batchSize / 2, this.minBatchSize); } state.items = new ArrayList<>(state.batchSize); } diff --git a/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java index ee8c397e13..8c763cde86 100644 --- a/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java @@ -38,6 +38,7 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator { DynamicBatchMessageAccumulator( int subEntrySize, int batchSize, + int maxUnconfirmedMessages, Codec codec, int maxFrameSize, ToLongFunction publishSequenceFunction, @@ -75,7 +76,8 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator { } return result; }, - batchSize); + batchSize, + maxUnconfirmedMessages); } else { byte compressionCode = compressionCodec == null ? Compression.NONE.code() : compressionCodec.code(); @@ -124,7 +126,8 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator { } return result; }, - batchSize * subEntrySize); + batchSize * subEntrySize, + maxUnconfirmedMessages); } } diff --git a/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java b/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java index 5ae8faa7dd..691fc65c57 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java +++ b/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java @@ -30,6 +30,7 @@ static MessageAccumulator createMessageAccumulator( boolean dynamicBatch, int subEntrySize, int batchSize, + int maxUnconfirmedMessages, CompressionCodec compressionCodec, Codec codec, ByteBufAllocator byteBufAllocator, @@ -44,6 +45,7 @@ static MessageAccumulator createMessageAccumulator( return new DynamicBatchMessageAccumulator( subEntrySize, batchSize, + maxUnconfirmedMessages, codec, maxFrameSize, publishSequenceFunction, diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java index 27552512c8..fd3cb8f25d 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java @@ -180,6 +180,7 @@ public int fragmentLength(Object entity) { dynamicBatch, subEntrySize, batchSize, + maxUnconfirmedMessages, compressionCodec, environment.codec(), environment.byteBufAllocator(), diff --git a/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java b/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java index 50698320f8..07a2877385 100644 --- a/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java @@ -71,7 +71,7 @@ void itemAreProcessed() { sync.down(items.size()); return true; }; - try (DynamicBatch batch = new DynamicBatch<>(action, 100)) { + try (DynamicBatch batch = new DynamicBatch<>(action, 100, 10_000)) { RateLimiter rateLimiter = RateLimiter.create(10000); IntStream.range(0, itemCount) .forEach( @@ -102,7 +102,7 @@ void failedProcessingIsReplayed() throws Exception { } return result; }; - try (DynamicBatch batch = new DynamicBatch<>(action, 100)) { + try (DynamicBatch batch = new DynamicBatch<>(action, 100, 10_000)) { int firstRoundCount = itemCount / 5; IntStream.range(0, firstRoundCount) .forEach( @@ -132,7 +132,7 @@ void lowThrottlingValueShouldStillHighPublishingRate() throws Exception { return true; }; - try (DynamicBatch batch = new DynamicBatch<>(action, batchSize)) { + try (DynamicBatch batch = new DynamicBatch<>(action, batchSize, 10_000)) { MetricRegistry metrics = new MetricRegistry(); Meter rate = metrics.meter("publishing-rate"); AtomicBoolean keepGoing = new AtomicBoolean(true);