diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java index 9d0f767d01..c28074ff6b 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java @@ -63,6 +63,7 @@ import org.apache.hc.core5.pool.ManagedConnPool; import org.apache.hc.core5.pool.PoolConcurrencyPolicy; import org.apache.hc.core5.pool.PoolReusePolicy; +import org.apache.hc.core5.pool.RouteSegmentedConnPool; import org.apache.hc.core5.pool.StrictConnPool; import org.apache.hc.core5.reactor.IOEventHandlerFactory; import org.apache.hc.core5.reactor.IOReactorConfig; @@ -370,6 +371,14 @@ public H2AsyncRequester create() { new DefaultDisposalCallback<>(), connPoolListener); break; + case OFFLOCK: + connPool = new RouteSegmentedConnPool<>( + defaultMaxPerRoute > 0 ? defaultMaxPerRoute : 20, + maxTotal > 0 ? maxTotal : 50, + timeToLive, + poolReusePolicy, + new DefaultDisposalCallback<>()); + break; case STRICT: default: connPool = new StrictConnPool<>( diff --git a/httpcore5-testing/pom.xml b/httpcore5-testing/pom.xml index 7741059258..368c897a32 100644 --- a/httpcore5-testing/pom.xml +++ b/httpcore5-testing/pom.xml @@ -110,6 +110,16 @@ junit-jupiter test + + org.openjdk.jmh + jmh-core + test + + + org.openjdk.jmh + jmh-generator-annprocess + test + @@ -135,6 +145,45 @@ + + + benchmark + + true + org.apache + + + + + org.codehaus.mojo + exec-maven-plugin + + + benchmark + test + + exec + + + test + java + + -classpath + + org.openjdk.jmh.Main + -rf + json + -rff + target/jmh-result.${benchmark}.json + ${benchmark} + + + + + + + + diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/benchmark/RoutePoolsJmh.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/benchmark/RoutePoolsJmh.java new file mode 100644 index 0000000000..4a1861893b --- /dev/null +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/benchmark/RoutePoolsJmh.java @@ -0,0 +1,260 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.benchmark; + + +import java.io.IOException; +import java.util.Locale; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.io.ModalCloseable; +import org.apache.hc.core5.pool.DisposalCallback; +import org.apache.hc.core5.pool.LaxConnPool; +import org.apache.hc.core5.pool.ManagedConnPool; +import org.apache.hc.core5.pool.PoolEntry; +import org.apache.hc.core5.pool.PoolReusePolicy; +import org.apache.hc.core5.pool.PoolStats; +import org.apache.hc.core5.pool.RouteSegmentedConnPool; +import org.apache.hc.core5.pool.StrictConnPool; +import org.apache.hc.core5.util.TimeValue; +import org.apache.hc.core5.util.Timeout; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +/** + * Compare StrictConnPool, LaxConnPool, and RouteSegmentedConnPool (“OFFLOCK”) + * under different contention patterns and slow-disposal rates. + */ +@BenchmarkMode({Mode.Throughput, Mode.SampleTime}) +@Warmup(iterations = 3, time = 2, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) +@Fork(1) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +public class RoutePoolsJmh { + + /** + * Minimal connection that can simulate slow close. + */ + public static final class FakeConn implements ModalCloseable { + private final int closeDelayMs; + + public FakeConn(final int closeDelayMs) { + this.closeDelayMs = closeDelayMs; + } + + @Override + public void close(final CloseMode closeMode) { + if (closeDelayMs <= 0) { + return; + } + try { + Thread.sleep(closeDelayMs); + } catch (final InterruptedException ignore) { + Thread.currentThread().interrupt(); + } + } + + @Override + public void close() throws IOException { + + } + } + + /** + * All benchmark parameters & shared state live here (required by JMH). + */ + @State(Scope.Benchmark) + public static class BenchState { + + /** + * Which pool to benchmark. + * STRICT -> StrictConnPool + * LAX -> LaxConnPool + * OFFLOCK -> RouteSegmentedConnPool + */ + @Param({"STRICT", "LAX", "OFFLOCK"}) + public String policy; + + /** + * Number of distinct routes to spread load across. + * 1 = hot single route; 10 = multi-route scenario. + */ + @Param({"1", "10"}) + public int routes; + + /** + * Percent (0..100) of releases that will be non-reusable, + * triggering a discard (and thus a potentially slow close). + */ + @Param({"0", "5", "20"}) + public int slowClosePct; + + /** + * Sleep (ms) when a connection is discarded (slow close path). + */ + @Param({"0", "200"}) + public int closeSleepMs; + + /** + * Max total, default per-route — tuned to create contention. + */ + @Param({"32"}) + public int maxTotal; + @Param({"8"}) + public int defMaxPerRoute; + + /** + * Keep-alive on reusable releases. + */ + @Param({"5000"}) + public int keepAliveMs; + + ManagedConnPool pool; + String[] routeKeys; + DisposalCallback disposal; + + @Setup(Level.Trial) + public void setUp() { + // routes list + routeKeys = new String[routes]; + for (int i = 0; i < routes; i++) { + routeKeys[i] = "route-" + i; + } + + disposal = (c, m) -> { + if (c != null) { + c.close(m); + } + }; + + final TimeValue ttl = TimeValue.NEG_ONE_MILLISECOND; + + switch (policy.toUpperCase(Locale.ROOT)) { + case "STRICT": + pool = new StrictConnPool<>( + defMaxPerRoute, + maxTotal, + ttl, + PoolReusePolicy.LIFO, + disposal, + null); + break; + case "LAX": + pool = new LaxConnPool<>( + defMaxPerRoute, + ttl, + PoolReusePolicy.LIFO, + disposal, + null); + pool.setMaxTotal(maxTotal); + break; + case "OFFLOCK": + pool = new RouteSegmentedConnPool<>( + defMaxPerRoute, + maxTotal, + ttl, + PoolReusePolicy.LIFO, + disposal); + break; + default: + throw new IllegalArgumentException("Unknown policy: " + policy); + } + } + + @TearDown(Level.Trial) + public void tearDown() { + if (pool != null) { + pool.close(CloseMode.IMMEDIATE); + } + } + + String pickRoute() { + final int idx = ThreadLocalRandom.current().nextInt(routeKeys.length); + return routeKeys[idx]; + } + + boolean shouldDiscard() { + if (slowClosePct <= 0) return false; + return ThreadLocalRandom.current().nextInt(100) < slowClosePct; + } + } + + /** + * Lease+release on a randomly chosen route. + * Mix of reusable and non-reusable releases (to trigger discard/close). + */ + @Benchmark + @Threads(50) + public void leaseReleaseMixed(final BenchState s) throws Exception { + try { + final Future> f = s.pool.lease(s.pickRoute(), null, Timeout.ofMilliseconds(500), null); + final PoolEntry e = f.get(500, TimeUnit.MILLISECONDS); + if (!e.hasConnection()) e.assignConnection(new FakeConn(s.closeSleepMs)); + final boolean reusable = !s.shouldDiscard(); + if (reusable) { + e.updateExpiry(TimeValue.ofMilliseconds(s.keepAliveMs)); + s.pool.release(e, true); + } else { + s.pool.release(e, false); + } + } catch (final IllegalStateException ignored) { + + } + } + + + /** + * Optional stats probe to ensure the benchmark does "something". + * Not a measured benchmark; use only for sanity runs. + */ + @Benchmark + @Threads(1) + @OperationsPerInvocation(1) + @BenchmarkMode(Mode.SingleShotTime) + public void statsProbe(final BenchState s, final org.openjdk.jmh.infra.Blackhole bh) { + final PoolStats stats = s.pool.getTotalStats(); + bh.consume(stats.getAvailable()); + bh.consume(stats.getLeased()); + bh.consume(stats.getPending()); + } +} diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequesterBootstrap.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequesterBootstrap.java index 3f8c82ac32..14b8802d10 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequesterBootstrap.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequesterBootstrap.java @@ -46,6 +46,7 @@ import org.apache.hc.core5.pool.ManagedConnPool; import org.apache.hc.core5.pool.PoolConcurrencyPolicy; import org.apache.hc.core5.pool.PoolReusePolicy; +import org.apache.hc.core5.pool.RouteSegmentedConnPool; import org.apache.hc.core5.pool.StrictConnPool; import org.apache.hc.core5.reactor.IOEventHandlerFactory; import org.apache.hc.core5.reactor.IOReactorConfig; @@ -260,6 +261,14 @@ public HttpAsyncRequester create() { new DefaultDisposalCallback<>(), connPoolListener); break; + case OFFLOCK: + connPool = new RouteSegmentedConnPool<>( + defaultMaxPerRoute > 0 ? defaultMaxPerRoute : 20, + maxTotal > 0 ? maxTotal : 50, + timeToLive, + poolReusePolicy, + new DefaultDisposalCallback<>()); + break; case STRICT: default: connPool = new StrictConnPool<>( diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/RequesterBootstrap.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/RequesterBootstrap.java index bd3dfd3550..811c5cd6d5 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/RequesterBootstrap.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/RequesterBootstrap.java @@ -54,6 +54,7 @@ import org.apache.hc.core5.pool.ManagedConnPool; import org.apache.hc.core5.pool.PoolConcurrencyPolicy; import org.apache.hc.core5.pool.PoolReusePolicy; +import org.apache.hc.core5.pool.RouteSegmentedConnPool; import org.apache.hc.core5.pool.StrictConnPool; import org.apache.hc.core5.util.Timeout; @@ -213,6 +214,14 @@ public HttpRequester create() { new DefaultDisposalCallback<>(), connPoolListener); break; + case OFFLOCK: + connPool = new RouteSegmentedConnPool<>( + defaultMaxPerRoute > 0 ? defaultMaxPerRoute : 20, + maxTotal > 0 ? maxTotal : 50, + timeToLive, + poolReusePolicy, + new DefaultDisposalCallback<>()); + break; case STRICT: default: connPool = new StrictConnPool<>( diff --git a/httpcore5/src/main/java/org/apache/hc/core5/pool/PoolConcurrencyPolicy.java b/httpcore5/src/main/java/org/apache/hc/core5/pool/PoolConcurrencyPolicy.java index 01216fd5c7..0583d7b31e 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/pool/PoolConcurrencyPolicy.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/pool/PoolConcurrencyPolicy.java @@ -41,6 +41,11 @@ public enum PoolConcurrencyPolicy { /** * Strict connection max limit guarantees. */ - STRICT + STRICT, + /** + * Lock-free, route-segmented pool: avoids blocking leases during slow connection disposal. + * @since 5.4 + */ + OFFLOCK } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java b/httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java new file mode 100644 index 0000000000..38b4bb4962 --- /dev/null +++ b/httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java @@ -0,0 +1,499 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.pool; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hc.core5.annotation.Contract; +import org.apache.hc.core5.annotation.Experimental; +import org.apache.hc.core5.annotation.ThreadingBehavior; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.io.ModalCloseable; +import org.apache.hc.core5.util.Args; +import org.apache.hc.core5.util.TimeValue; +import org.apache.hc.core5.util.Timeout; + +/** + * Lock-free, route-segmented connection pool. + * + *

This implementation keeps per-route state in independent segments and avoids + * holding a global lock while disposing of connections. Under slow closes + * (for example TLS shutdown or OS-level socket stalls), threads leasing + * connections on other routes are not blocked by disposal work.

+ * + * @param route key type + * @param connection type (must be {@link org.apache.hc.core5.io.ModalCloseable}) + * @see ManagedConnPool + * @see PoolReusePolicy + * @see DisposalCallback + * @since 5.4 + */ +@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL) +@Experimental +public final class RouteSegmentedConnPool implements ManagedConnPool { + + private final PoolReusePolicy reusePolicy; + private final TimeValue timeToLive; + private final DisposalCallback disposal; + + private final AtomicInteger defaultMaxPerRoute = new AtomicInteger(5); + + private final ConcurrentHashMap segments = new ConcurrentHashMap<>(); + private final ConcurrentHashMap maxPerRoute = new ConcurrentHashMap<>(); + private final AtomicInteger totalAllocated = new AtomicInteger(0); + private final AtomicInteger maxTotal = new AtomicInteger(25); + + private final AtomicBoolean closed = new AtomicBoolean(false); + + private final ScheduledExecutorService timeouts; + + public RouteSegmentedConnPool( + final int defaultMaxPerRoute, + final int maxTotal, + final TimeValue timeToLive, + final PoolReusePolicy reusePolicy, + final DisposalCallback disposal) { + + this.defaultMaxPerRoute.set(defaultMaxPerRoute > 0 ? defaultMaxPerRoute : 5); + this.maxTotal.set(maxTotal > 0 ? maxTotal : 25); + this.timeToLive = timeToLive != null ? timeToLive : TimeValue.NEG_ONE_MILLISECOND; + this.reusePolicy = reusePolicy != null ? reusePolicy : PoolReusePolicy.LIFO; + this.disposal = Args.notNull(disposal, "disposal"); + + final ThreadFactory tf = r -> { + final Thread t = new Thread(r, "seg-pool-timeouts"); + t.setDaemon(true); + return t; + }; + this.timeouts = Executors.newSingleThreadScheduledExecutor(tf); + } + + final class Segment { + final ConcurrentLinkedDeque> available = new ConcurrentLinkedDeque<>(); + final ConcurrentLinkedQueue waiters = new ConcurrentLinkedQueue<>(); + final AtomicInteger allocated = new AtomicInteger(0); + + int limitPerRoute(final R route) { + final Integer v = maxPerRoute.get(route); + return v != null ? v : defaultMaxPerRoute.get(); + } + } + + final class Waiter extends CompletableFuture> { + final Timeout requestTimeout; + final Object state; + volatile boolean cancelled; + + Waiter(final Timeout t, final Object s) { + this.requestTimeout = t != null ? t : Timeout.DISABLED; + this.state = s; + } + } + + @Override + public Future> lease( + final R route, + final Object state, + final Timeout requestTimeout, + final FutureCallback> callback) { + + ensureOpen(); + final Segment seg = segments.computeIfAbsent(route, r -> new Segment()); + + PoolEntry hit; + for (; ; ) { + hit = pollAvailable(seg, state); + if (hit == null) { + break; + } + final long now = System.currentTimeMillis(); + if (hit.getExpiryDeadline().isBefore(now) || isPastTtl(hit)) { + discardAndDecr(hit, CloseMode.GRACEFUL); + continue; + } + break; + } + if (hit != null) { + if (callback != null) { + callback.completed(hit); + } + return CompletableFuture.completedFuture(hit); + } + + for (; ; ) { + final int tot = totalAllocated.get(); + if (tot >= maxTotal.get()) { + break; + } + if (totalAllocated.compareAndSet(tot, tot + 1)) { + for (; ; ) { + final int per = seg.allocated.get(); + if (per >= seg.limitPerRoute(route)) { + totalAllocated.decrementAndGet(); + break; + } + if (seg.allocated.compareAndSet(per, per + 1)) { + final PoolEntry entry = new PoolEntry<>(route, timeToLive, disposal); + if (callback != null) { + callback.completed(entry); + } + return CompletableFuture.completedFuture(entry); + } + } + break; + } + } + + final Waiter w = new Waiter(requestTimeout, state); + seg.waiters.add(w); + + final PoolEntry late = pollAvailable(seg, state); + if (late != null && seg.waiters.remove(w)) { + if (callback != null) { + callback.completed(late); + } + w.complete(late); + return w; + } + + scheduleTimeout(w, seg, callback); + + if (callback != null) { + w.whenComplete((pe, ex) -> { + if (ex != null) { + callback.failed(ex instanceof Exception ? (Exception) ex : new Exception(ex)); + } else { + callback.completed(pe); + } + }); + } + return w; + } + + @Override + public void release(final PoolEntry entry, final boolean reusable) { + if (entry == null) { + return; + } + final R route = entry.getRoute(); + final Segment seg = segments.get(route); + if (seg == null) { + entry.discardConnection(CloseMode.GRACEFUL); + return; + } + + final long now = System.currentTimeMillis(); + final boolean stillValid = reusable && !isPastTtl(entry) && !entry.getExpiryDeadline().isBefore(now); + + if (stillValid) { + for (; ; ) { + final Waiter w = seg.waiters.poll(); + if (w == null) { + break; + } + if (w.cancelled) { + continue; + } + if (compatible(w.state, entry.getState())) { + if (w.complete(entry)) { + return; + } + } + } + offerAvailable(seg, entry); + } else { + discardAndDecr(entry, CloseMode.GRACEFUL); + } + + maybeCleanupSegment(route, seg); + } + + @Override + public void close() throws IOException { + close(CloseMode.GRACEFUL); + } + + @Override + public void close(final CloseMode closeMode) { + if (!closed.compareAndSet(false, true)) { + return; + } + + timeouts.shutdownNow(); + + for (final Map.Entry e : segments.entrySet()) { + final Segment seg = e.getValue(); + + // cancel waiters + for (final Waiter w : seg.waiters) { + w.cancelled = true; + w.completeExceptionally(new TimeoutException("Pool closed")); + } + seg.waiters.clear(); + + for (final PoolEntry p : seg.available) { + p.discardConnection(orImmediate(closeMode)); + } + seg.available.clear(); + + final int alloc = seg.allocated.getAndSet(0); + if (alloc != 0) { + totalAllocated.addAndGet(-alloc); + } + } + segments.clear(); + } + + @Override + public void closeIdle(final TimeValue idleTime) { + final long cutoff = System.currentTimeMillis() + - Math.max(0L, idleTime != null ? idleTime.toMilliseconds() : 0L); + + for (final Map.Entry e : segments.entrySet()) { + final R route = e.getKey(); + final Segment seg = e.getValue(); + + int processed = 0; + final int cap = 64; + for (final Iterator> it = seg.available.iterator(); it.hasNext(); ) { + final PoolEntry p = it.next(); + if (p.getUpdated() <= cutoff) { + it.remove(); + discardAndDecr(p, CloseMode.GRACEFUL); + if (++processed == cap) { + break; + } + } + } + maybeCleanupSegment(route, seg); + } + } + + @Override + public void closeExpired() { + final long now = System.currentTimeMillis(); + + for (final Map.Entry e : segments.entrySet()) { + final R route = e.getKey(); + final Segment seg = e.getValue(); + + int processed = 0; + final int cap = 64; + for (final Iterator> it = seg.available.iterator(); it.hasNext(); ) { + final PoolEntry p = it.next(); + if (p.getExpiryDeadline().isBefore(now) || isPastTtl(p)) { + it.remove(); + discardAndDecr(p, CloseMode.GRACEFUL); + if (++processed == cap) { + break; + } + } + } + maybeCleanupSegment(route, seg); + } + } + + @Override + public Set getRoutes() { + final Set out = new HashSet<>(); + for (final Map.Entry e : segments.entrySet()) { + final Segment s = e.getValue(); + if (!s.available.isEmpty() || s.allocated.get() > 0 || !s.waiters.isEmpty()) { + out.add(e.getKey()); + } + } + return out; + } + + @Override + public int getMaxTotal() { + return maxTotal.get(); + } + + @Override + public void setMaxTotal(final int max) { + maxTotal.set(Math.max(1, max)); + } + + @Override + public int getDefaultMaxPerRoute() { + return defaultMaxPerRoute.get(); + } + + @Override + public void setDefaultMaxPerRoute(final int max) { + defaultMaxPerRoute.set(Math.max(1, max)); + } + + @Override + public int getMaxPerRoute(final R route) { + final Integer v = maxPerRoute.get(route); + return v != null ? v : defaultMaxPerRoute.get(); + } + + @Override + public void setMaxPerRoute(final R route, final int max) { + if (max <= 0) { + maxPerRoute.remove(route); + } else { + maxPerRoute.put(route, max); + } + } + + @Override + public PoolStats getTotalStats() { + int leased = 0, availableCount = 0, pending = 0; + for (final Segment seg : segments.values()) { + final int alloc = seg.allocated.get(); + final int avail = seg.available.size(); + leased += Math.max(0, alloc - avail); + availableCount += avail; + pending += seg.waiters.size(); + } + return new PoolStats(leased, pending, availableCount, getMaxTotal()); + } + + @Override + public PoolStats getStats(final R route) { + final Segment seg = segments.get(route); + if (seg == null) { + return new PoolStats(0, 0, 0, getMaxPerRoute(route)); + } + final int alloc = seg.allocated.get(); + final int avail = seg.available.size(); + final int leased = Math.max(0, alloc - avail); + final int pending = seg.waiters.size(); + return new PoolStats(leased, pending, avail, getMaxPerRoute(route)); + } + + private void ensureOpen() { + if (closed.get()) { + throw new IllegalStateException("Pool is closed"); + } + } + + private boolean isPastTtl(final PoolEntry p) { + if (timeToLive == null || timeToLive.getDuration() < 0) { + return false; + } + return (System.currentTimeMillis() - p.getCreated()) >= timeToLive.toMilliseconds(); + } + + private void scheduleTimeout( + final Waiter w, + final Segment seg, + final FutureCallback> cb) { + + if (!TimeValue.isPositive(w.requestTimeout)) { + return; + } + timeouts.schedule(() -> { + if (w.isDone()) { + return; + } + w.cancelled = true; + final TimeoutException tex = new TimeoutException("Lease timed out"); + w.completeExceptionally(tex); + + final PoolEntry p = pollAvailable(seg, w.state); + if (p != null) { + boolean handedOff = false; + for (Waiter other; (other = seg.waiters.poll()) != null; ) { + if (!other.cancelled && compatible(other.state, p.getState())) { + handedOff = other.complete(p); + if (handedOff) { + break; + } + } + } + if (!handedOff) { + offerAvailable(seg, p); + } + } + }, w.requestTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); + } + + private void offerAvailable(final Segment seg, final PoolEntry p) { + if (reusePolicy == PoolReusePolicy.LIFO) { + seg.available.addFirst(p); + } else { + seg.available.addLast(p); + } + } + + private PoolEntry pollAvailable(final Segment seg, final Object neededState) { + for (final Iterator> it = seg.available.iterator(); it.hasNext(); ) { + final PoolEntry p = it.next(); + if (compatible(neededState, p.getState())) { + it.remove(); + return p; + } + } + return null; + } + + private boolean compatible(final Object needed, final Object have) { + return needed == null || Objects.equals(needed, have); + } + + private void discardAndDecr(final PoolEntry p, final CloseMode mode) { + p.discardConnection(orImmediate(mode)); + totalAllocated.decrementAndGet(); + final Segment seg = segments.get(p.getRoute()); + if (seg != null) { + seg.allocated.decrementAndGet(); + } + } + + private CloseMode orImmediate(final CloseMode m) { + return m != null ? m : CloseMode.IMMEDIATE; + } + + private void maybeCleanupSegment(final R route, final Segment seg) { + if (seg.allocated.get() == 0 && seg.available.isEmpty() && seg.waiters.isEmpty()) { + segments.remove(route, seg); + } + } +} diff --git a/httpcore5/src/test/java/org/apache/hc/core5/pool/FakeConnection.java b/httpcore5/src/test/java/org/apache/hc/core5/pool/FakeConnection.java new file mode 100644 index 0000000000..524699438d --- /dev/null +++ b/httpcore5/src/test/java/org/apache/hc/core5/pool/FakeConnection.java @@ -0,0 +1,74 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.pool; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.io.ModalCloseable; + +final class FakeConnection implements ModalCloseable { + private final long closeSleepMs; + private final AtomicInteger closes = new AtomicInteger(0); + private final CountDownLatch closedLatch = new CountDownLatch(1); + + FakeConnection() { + this(0); + } + + FakeConnection(final long closeSleepMs) { + this.closeSleepMs = closeSleepMs; + } + + @Override + public void close(final CloseMode closeMode) { + if (closeSleepMs > 0) { + try { + Thread.sleep(closeSleepMs); + } catch (final InterruptedException ignore) { + Thread.currentThread().interrupt(); + } + } + closes.incrementAndGet(); + closedLatch.countDown(); + } + + @Override + public void close() { + close(CloseMode.GRACEFUL); + } + + int closeCount() { + return closes.get(); + } + + boolean awaitClosed(final long ms) throws InterruptedException { + return closedLatch.await(ms, TimeUnit.MILLISECONDS); + } +} diff --git a/httpcore5/src/test/java/org/apache/hc/core5/pool/RouteSegmentedConnPoolTest.java b/httpcore5/src/test/java/org/apache/hc/core5/pool/RouteSegmentedConnPoolTest.java new file mode 100644 index 0000000000..7d6b81b148 --- /dev/null +++ b/httpcore5/src/test/java/org/apache/hc/core5/pool/RouteSegmentedConnPoolTest.java @@ -0,0 +1,335 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.pool; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.io.ModalCloseable; +import org.apache.hc.core5.util.TimeValue; +import org.apache.hc.core5.util.Timeout; +import org.junit.jupiter.api.Test; + +public class RouteSegmentedConnPoolTest { + + private static RouteSegmentedConnPool newPool( + final int defPerRoute, final int maxTotal, final TimeValue ttl, final PoolReusePolicy reuse, + final DisposalCallback disposal) { + return new RouteSegmentedConnPool<>(defPerRoute, maxTotal, ttl, reuse, disposal); + } + + @Test + void basicLeaseReleaseAndHandoff() throws Exception { + final DisposalCallback disposal = FakeConnection::close; + final RouteSegmentedConnPool pool = + newPool(2, 2, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, disposal); + + final PoolEntry e1 = pool.lease("r1", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS); + assertNotNull(e1); + assertEquals("r1", e1.getRoute()); + assertFalse(e1.hasConnection()); + e1.assignConnection(new FakeConnection()); + e1.updateState("A"); + e1.updateExpiry(TimeValue.ofSeconds(30)); + pool.release(e1, true); + + final Future> f2 = + pool.lease("r1", "A", Timeout.ofSeconds(1), null); + final PoolEntry e2 = f2.get(1, TimeUnit.SECONDS); + assertSame(e1, e2, "Should receive same entry via direct hand-off"); + pool.release(e2, true); + pool.close(CloseMode.IMMEDIATE); + } + + @Test + void perRouteAndTotalLimits() throws Exception { + final DisposalCallback disposal = FakeConnection::close; + final RouteSegmentedConnPool pool = + newPool(1, 2, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, disposal); + + final PoolEntry r1a = pool.lease("r1", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS); + final PoolEntry r2a = pool.lease("r2", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS); + + final Future> blocked = pool.lease("r1", null, Timeout.ofMilliseconds(150), null); + final ExecutionException ex = assertThrows( + ExecutionException.class, + () -> blocked.get(400, TimeUnit.MILLISECONDS)); + assertInstanceOf(TimeoutException.class, ex.getCause()); + assertEquals("Lease timed out", ex.getCause().getMessage()); + + r1a.assignConnection(new FakeConnection()); + r1a.updateExpiry(TimeValue.ofSeconds(5)); + pool.release(r1a, true); + + final PoolEntry r1b = + pool.lease("r1", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS); + assertNotNull(r1b); + pool.release(r2a, false); // drop + pool.release(r1b, false); + pool.close(CloseMode.IMMEDIATE); + } + + @Test + void stateCompatibilityNullMatchesAnything() throws Exception { + final RouteSegmentedConnPool pool = + newPool(1, 1, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, FakeConnection::close); + + final PoolEntry e = pool.lease("r", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS); + e.assignConnection(new FakeConnection()); + e.updateState("X"); + e.updateExpiry(TimeValue.ofSeconds(30)); + pool.release(e, true); + + // waiter with null state must match + final PoolEntry got = + pool.lease("r", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS); + assertSame(e, got); + pool.release(got, false); + pool.close(CloseMode.IMMEDIATE); + } + + @Test + void closeIdleRemovesStaleAvailable() throws Exception { + final RouteSegmentedConnPool pool = + newPool(2, 2, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, FakeConnection::close); + + final PoolEntry e = pool.lease("r", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS); + e.assignConnection(new FakeConnection()); + e.updateExpiry(TimeValue.ofSeconds(30)); + pool.release(e, true); + + // sleep to make it idle + Thread.sleep(120); + pool.closeIdle(TimeValue.ofMilliseconds(50)); + + final PoolStats stats = pool.getStats("r"); + assertEquals(0, stats.getAvailable()); + pool.close(CloseMode.IMMEDIATE); + } + + @Test + void closeExpiredHonorsEntryExpiryOrTtl() throws Exception { + // TTL = 100ms, so entries become past-ttl quickly + final RouteSegmentedConnPool pool = + newPool(1, 1, TimeValue.ofMilliseconds(100), PoolReusePolicy.LIFO, FakeConnection::close); + + final PoolEntry e = pool.lease("r", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS); + e.assignConnection(new FakeConnection()); + // keep alive "far", TTL will still kill it + e.updateExpiry(TimeValue.ofSeconds(10)); + pool.release(e, true); + + Thread.sleep(150); + pool.closeExpired(); + + final PoolStats stats = pool.getStats("r"); + assertEquals(0, stats.getAvailable(), "Expired/TTL entry should be gone"); + pool.close(CloseMode.IMMEDIATE); + } + + @Test + void waiterTimesOutAndIsFailed() throws Exception { + final RouteSegmentedConnPool pool = + newPool(1, 1, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, FakeConnection::close); + + // Occupy single slot and don't release + final PoolEntry e = pool.lease("r", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS); + + final Future> waiter = + pool.lease("r", null, Timeout.ofMilliseconds(150), null); + + final ExecutionException ex = assertThrows( + ExecutionException.class, + () -> waiter.get(500, TimeUnit.MILLISECONDS)); + assertInstanceOf(TimeoutException.class, ex.getCause()); + assertEquals("Lease timed out", ex.getCause().getMessage()); + // cleanup + pool.release(e, false); + pool.close(CloseMode.IMMEDIATE); + } + + @Test + void poolCloseCancelsWaitersAndDrainsAvailable() throws Exception { + final RouteSegmentedConnPool pool = + newPool(1, 1, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, FakeConnection::close); + + // Consume the only slot so the next lease becomes a waiter + final Future> first = pool.lease("r", null, Timeout.ofSeconds(5), null); + first.get(); // allocated immediately, not released + + // Now this one queues as a waiter + final Future> waiter = + pool.lease("r", null, Timeout.ofSeconds(5), null); + + pool.close(CloseMode.IMMEDIATE); + + final ExecutionException ex = assertThrows(ExecutionException.class, waiter::get); + assertInstanceOf(TimeoutException.class, ex.getCause()); + assertEquals("Pool closed", ex.getCause().getMessage()); + } + + + @Test + void reusePolicyLifoVsFifoIsObservable() throws Exception { + final RouteSegmentedConnPool lifo = + newPool(2, 2, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, FakeConnection::close); + + final PoolEntry a = lifo.lease("r", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS); + final PoolEntry b = lifo.lease("r", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS); + a.assignConnection(new FakeConnection()); + a.updateExpiry(TimeValue.ofSeconds(10)); + lifo.release(a, true); + b.assignConnection(new FakeConnection()); + b.updateExpiry(TimeValue.ofSeconds(10)); + lifo.release(b, true); + + final PoolEntry firstLifo = + lifo.lease("r", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS); + assertSame(b, firstLifo, "LIFO should return last released"); + lifo.release(firstLifo, false); + lifo.close(CloseMode.IMMEDIATE); + + final RouteSegmentedConnPool fifo = + newPool(2, 2, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.FIFO, FakeConnection::close); + final PoolEntry a2 = fifo.lease("r", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS); + final PoolEntry b2 = fifo.lease("r", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS); + a2.assignConnection(new FakeConnection()); + a2.updateExpiry(TimeValue.ofSeconds(10)); + fifo.release(a2, true); + b2.assignConnection(new FakeConnection()); + b2.updateExpiry(TimeValue.ofSeconds(10)); + fifo.release(b2, true); + + final PoolEntry firstFifo = + fifo.lease("r", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS); + assertSame(a2, firstFifo, "FIFO should return first released"); + fifo.release(firstFifo, false); + fifo.close(CloseMode.IMMEDIATE); + } + + @Test + void disposalIsCalledOnDiscard() throws Exception { + final List closed = new ArrayList<>(); + final DisposalCallback disposal = (c, m) -> { + c.close(m); + closed.add(c); + }; + final RouteSegmentedConnPool pool = + newPool(1, 1, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, disposal); + + final PoolEntry e = pool.lease("r", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS); + final FakeConnection conn = new FakeConnection(); + e.assignConnection(conn); + pool.release(e, false); + assertEquals(1, closed.size()); + assertEquals(1, closed.get(0).closeCount()); + pool.close(CloseMode.IMMEDIATE); + } + + @Test + void slowDisposalDoesNotBlockOtherRoutes() throws Exception { + final DisposalCallback disposal = FakeConnection::close; + final RouteSegmentedConnPool pool = + newPool(2, 2, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, disposal); + + final PoolEntry e1 = pool.lease("r1", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS); + e1.assignConnection(new FakeConnection(600)); + final long startDiscard = System.nanoTime(); + pool.release(e1, false); + + final long t0 = System.nanoTime(); + final PoolEntry e2 = pool.lease("r2", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS); + final long tLeaseMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0); + assertTrue(tLeaseMs < 200, "Other route lease blocked by disposal: " + tLeaseMs + "ms"); + + pool.release(e2, false); + final long discardMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startDiscard); + assertTrue(discardMs >= 600, "Discard should reflect slow close path"); + + pool.close(CloseMode.IMMEDIATE); + } + + @Test + void getRoutesCoversAllocatedAvailableAndWaiters() throws Exception { + final RouteSegmentedConnPool pool = + newPool(1, 1, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, FakeConnection::close); + + assertTrue(pool.getRoutes().isEmpty(), "Initially there should be no routes"); + + final PoolEntry a = + pool.lease("rA", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS); + assertEquals(new HashSet(Collections.singletonList("rA")), pool.getRoutes(), + "rA must be listed because it is leased (allocated > 0)"); + + a.assignConnection(new FakeConnection()); + a.updateExpiry(TimeValue.ofSeconds(30)); + pool.release(a, true); + assertEquals(new HashSet<>(Collections.singletonList("rA")), pool.getRoutes(), + "rA must be listed because it has AVAILABLE entries"); + + final Future> waiterB = + pool.lease("rB", null, Timeout.ofMilliseconds(300), null); // enqueues immediately + final Set routesNow = pool.getRoutes(); + assertTrue(routesNow.contains("rA") && routesNow.contains("rB"), + "Both rA (available) and rB (waiter) must be listed"); + + final PoolEntry a2 = + pool.lease("rA", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS); + pool.release(a2, false); // discard + final Set afterDropA = pool.getRoutes(); + assertFalse(afterDropA.contains("rA"), "rA segment should be cleaned up"); + assertTrue(afterDropA.contains("rB"), "rB (waiter) should remain listed"); + + final ExecutionException ex = assertThrows( + ExecutionException.class, + () -> waiterB.get(600, TimeUnit.MILLISECONDS)); + assertInstanceOf(TimeoutException.class, ex.getCause()); + assertEquals("Lease timed out", ex.getCause().getMessage()); + + // Final cleanup: after close everything is cleared + pool.close(CloseMode.IMMEDIATE); + assertTrue(pool.getRoutes().isEmpty(), "All routes must be gone after close()"); + } + + +} diff --git a/pom.xml b/pom.xml index 9ea2647b85..33e59ce572 100644 --- a/pom.xml +++ b/pom.xml @@ -81,6 +81,7 @@ 3.1.10 1.21.3 5.3 + 1.37 javax.net.ssl.SSLEngine,javax.net.ssl.SSLParameters,java.nio.ByteBuffer,java.nio.CharBuffer,jdk.net.ExtendedSocketOptions,jdk.net.Sockets @@ -161,6 +162,18 @@ junit-jupiter ${testcontainers.version} + + org.openjdk.jmh + jmh-core + ${jmh.version} + test + + + org.openjdk.jmh + jmh-generator-annprocess + ${jmh.version} + test +