diff --git a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java index 19f027f5fc4..917fa271302 100644 --- a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java +++ b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java @@ -109,6 +109,7 @@ final class CachingRlsLbClient { // LRU cache based on access order (BACKOFF and actual data will be here) @GuardedBy("lock") private final RlsAsyncLruCache linkedHashLruCache; + private final Future periodicCleaner; // any RPC on the fly will cached in this map @GuardedBy("lock") private final Map pendingCallCache = new HashMap<>(); @@ -177,10 +178,10 @@ private CachingRlsLbClient(Builder builder) { new RlsAsyncLruCache( rlsConfig.cacheSizeBytes(), new AutoCleaningEvictionListener(builder.evictionListener), - scheduledExecutorService, ticker, - lock, helper); + periodicCleaner = + scheduledExecutorService.scheduleAtFixedRate(this::periodicClean, 1, 1, TimeUnit.MINUTES); logger = helper.getChannelLogger(); String serverHost = null; try { @@ -261,6 +262,12 @@ static Status convertRlsServerStatus(Status status, String serverName) { serverName, status.getCode(), status.getDescription())); } + private void periodicClean() { + synchronized (lock) { + linkedHashLruCache.cleanupExpiredEntries(); + } + } + /** Populates async cache entry for new request. */ @GuardedBy("lock") private CachedRouteLookupResponse asyncRlsCall( @@ -343,6 +350,7 @@ final CachedRouteLookupResponse get(final RouteLookupRequest request) { void close() { logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient closed"); synchronized (lock) { + periodicCleaner.cancel(false); // all childPolicyWrapper will be returned via AutoCleaningEvictionListener linkedHashLruCache.close(); // TODO(creamsoup) maybe cancel all pending requests @@ -892,15 +900,8 @@ private static final class RlsAsyncLruCache RlsAsyncLruCache(long maxEstimatedSizeBytes, @Nullable EvictionListener evictionListener, - ScheduledExecutorService ses, Ticker ticker, Object lock, RlsLbHelper helper) { - super( - maxEstimatedSizeBytes, - evictionListener, - 1, - TimeUnit.MINUTES, - ses, - ticker, - lock); + Ticker ticker, RlsLbHelper helper) { + super(maxEstimatedSizeBytes, evictionListener, ticker); this.helper = checkNotNull(helper, "helper"); } diff --git a/rls/src/main/java/io/grpc/rls/LinkedHashLruCache.java b/rls/src/main/java/io/grpc/rls/LinkedHashLruCache.java index c1cbb28f29e..ba0575efa57 100644 --- a/rls/src/main/java/io/grpc/rls/LinkedHashLruCache.java +++ b/rls/src/main/java/io/grpc/rls/LinkedHashLruCache.java @@ -29,46 +29,30 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import javax.annotation.CheckReturnValue; import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; -import javax.annotation.concurrent.ThreadSafe; /** * A LinkedHashLruCache implements least recently used caching where it supports access order lru * cache eviction while allowing entry level expiration time. When the cache reaches max capacity, * LruCache try to remove up to one already expired entries. If it doesn't find any expired entries, - * it will remove based on access order of entry. On top of this, LruCache also proactively removes - * expired entries based on configured time interval. + * it will remove based on access order of entry. To proactively clean up expired entries, call + * {@link #cleanupExpiredEntries()} (e.g., via a recurring timer). */ -@ThreadSafe abstract class LinkedHashLruCache implements LruCache { - private final Object lock; - - @GuardedBy("lock") private final LinkedHashMap delegate; - private final PeriodicCleaner periodicCleaner; private final Ticker ticker; private final EvictionListener evictionListener; - private final AtomicLong estimatedSizeBytes = new AtomicLong(); + private long estimatedSizeBytes; private long estimatedMaxSizeBytes; LinkedHashLruCache( final long estimatedMaxSizeBytes, @Nullable final EvictionListener evictionListener, - int cleaningInterval, - TimeUnit cleaningIntervalUnit, - ScheduledExecutorService ses, - final Ticker ticker, - Object lock) { + final Ticker ticker) { checkState(estimatedMaxSizeBytes > 0, "max estimated cache size should be positive"); this.estimatedMaxSizeBytes = estimatedMaxSizeBytes; - this.lock = checkNotNull(lock, "lock"); this.evictionListener = new SizeHandlingEvictionListener(evictionListener); this.ticker = checkNotNull(ticker, "ticker"); delegate = new LinkedHashMap( @@ -78,7 +62,7 @@ abstract class LinkedHashLruCache implements LruCache { /* accessOrder= */ true) { @Override protected boolean removeEldestEntry(Map.Entry eldest) { - if (estimatedSizeBytes.get() <= LinkedHashLruCache.this.estimatedMaxSizeBytes) { + if (estimatedSizeBytes <= LinkedHashLruCache.this.estimatedMaxSizeBytes) { return false; } @@ -94,7 +78,6 @@ protected boolean removeEldestEntry(Map.Entry eldest) { return false; } }; - periodicCleaner = new PeriodicCleaner(ses, cleaningInterval, cleaningIntervalUnit).start(); } /** @@ -124,16 +107,14 @@ protected long estimatedMaxSizeBytes() { /** Updates size for given key if entry exists. It is useful if the cache value is mutated. */ public void updateEntrySize(K key) { - synchronized (lock) { - SizedValue entry = readInternal(key); - if (entry == null) { - return; - } - int prevSize = entry.size; - int newSize = estimateSizeOf(key, entry.value); - entry.size = newSize; - estimatedSizeBytes.addAndGet(newSize - prevSize); + SizedValue entry = readInternal(key); + if (entry == null) { + return; } + int prevSize = entry.size; + int newSize = estimateSizeOf(key, entry.value); + entry.size = newSize; + estimatedSizeBytes += newSize - prevSize; } /** @@ -141,7 +122,7 @@ public void updateEntrySize(K key) { * #estimateSizeOf(java.lang.Object, java.lang.Object)}. */ public long estimatedSizeBytes() { - return estimatedSizeBytes.get(); + return estimatedSizeBytes; } @Override @@ -151,12 +132,10 @@ public final V cache(K key, V value) { checkNotNull(value, "value"); SizedValue existing; int size = estimateSizeOf(key, value); - synchronized (lock) { - estimatedSizeBytes.addAndGet(size); - existing = delegate.put(key, new SizedValue(size, value)); - if (existing != null) { - evictionListener.onEviction(key, existing, EvictionType.REPLACED); - } + estimatedSizeBytes += size; + existing = delegate.put(key, new SizedValue(size, value)); + if (existing != null) { + evictionListener.onEviction(key, existing, EvictionType.REPLACED); } return existing == null ? null : existing.value; } @@ -176,13 +155,11 @@ public final V read(K key) { @CheckReturnValue private SizedValue readInternal(K key) { checkNotNull(key, "key"); - synchronized (lock) { - SizedValue existing = delegate.get(key); - if (existing != null && isExpired(key, existing.value, ticker.read())) { - return null; - } - return existing; + SizedValue existing = delegate.get(key); + if (existing != null && isExpired(key, existing.value, ticker.read())) { + return null; } + return existing; } @Override @@ -195,26 +172,22 @@ public final V invalidate(K key) { private V invalidate(K key, EvictionType cause) { checkNotNull(key, "key"); checkNotNull(cause, "cause"); - synchronized (lock) { - SizedValue existing = delegate.remove(key); - if (existing != null) { - evictionListener.onEviction(key, existing, cause); - } - return existing == null ? null : existing.value; + SizedValue existing = delegate.remove(key); + if (existing != null) { + evictionListener.onEviction(key, existing, cause); } + return existing == null ? null : existing.value; } @Override public final void invalidateAll() { - synchronized (lock) { - Iterator> iterator = delegate.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - if (entry.getValue() != null) { - evictionListener.onEviction(entry.getKey(), entry.getValue(), EvictionType.EXPLICIT); - } - iterator.remove(); + Iterator> iterator = delegate.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (entry.getValue() != null) { + evictionListener.onEviction(entry.getKey(), entry.getValue(), EvictionType.EXPLICIT); } + iterator.remove(); } } @@ -227,13 +200,11 @@ public final boolean hasCacheEntry(K key) { /** Returns shallow copied values in the cache. */ public final List values() { - synchronized (lock) { - List list = new ArrayList<>(delegate.size()); - for (SizedValue value : delegate.values()) { - list.add(value.value); - } - return Collections.unmodifiableList(list); + List list = new ArrayList<>(delegate.size()); + for (SizedValue value : delegate.values()) { + list.add(value.value); } + return Collections.unmodifiableList(list); } /** @@ -243,27 +214,25 @@ public final List values() { */ protected final boolean fitToLimit() { boolean removedAnyUnexpired = false; - synchronized (lock) { - if (estimatedSizeBytes.get() <= estimatedMaxSizeBytes) { - // new size is larger no need to do cleanup - return false; - } - // cleanup expired entries - long now = ticker.read(); - cleanupExpiredEntries(now); - - // cleanup eldest entry until new size limit - Iterator> lruIter = delegate.entrySet().iterator(); - while (lruIter.hasNext() && estimatedMaxSizeBytes < this.estimatedSizeBytes.get()) { - Map.Entry entry = lruIter.next(); - if (!shouldInvalidateEldestEntry(entry.getKey(), entry.getValue().value, now)) { - break; // Violates some constraint like minimum age so stop our cleanup - } - lruIter.remove(); - // eviction listener will update the estimatedSizeBytes - evictionListener.onEviction(entry.getKey(), entry.getValue(), EvictionType.SIZE); - removedAnyUnexpired = true; + if (estimatedSizeBytes <= estimatedMaxSizeBytes) { + // new size is larger no need to do cleanup + return false; + } + // cleanup expired entries + long now = ticker.read(); + cleanupExpiredEntries(now); + + // cleanup eldest entry until new size limit + Iterator> lruIter = delegate.entrySet().iterator(); + while (lruIter.hasNext() && estimatedMaxSizeBytes < this.estimatedSizeBytes) { + Map.Entry entry = lruIter.next(); + if (!shouldInvalidateEldestEntry(entry.getKey(), entry.getValue().value, now)) { + break; // Violates some constraint like minimum age so stop our cleanup } + lruIter.remove(); + // eviction listener will update the estimatedSizeBytes + evictionListener.onEviction(entry.getKey(), entry.getValue(), EvictionType.SIZE); + removedAnyUnexpired = true; } return removedAnyUnexpired; } @@ -273,18 +242,19 @@ protected final boolean fitToLimit() { * removing expired entries and removing oldest entries by LRU order. */ public final void resize(long newSizeBytes) { - synchronized (lock) { - this.estimatedMaxSizeBytes = newSizeBytes; - fitToLimit(); - } + this.estimatedMaxSizeBytes = newSizeBytes; + fitToLimit(); } @Override @CheckReturnValue public final int estimatedSize() { - synchronized (lock) { - return delegate.size(); - } + return delegate.size(); + } + + /** Returns {@code true} if any entries were removed. */ + public final boolean cleanupExpiredEntries() { + return cleanupExpiredEntries(ticker.read()); } private boolean cleanupExpiredEntries(long now) { @@ -295,16 +265,14 @@ private boolean cleanupExpiredEntries(long now) { private boolean cleanupExpiredEntries(int maxExpiredEntries, long now) { checkArgument(maxExpiredEntries > 0, "maxExpiredEntries must be positive"); boolean removedAny = false; - synchronized (lock) { - Iterator> lruIter = delegate.entrySet().iterator(); - while (lruIter.hasNext() && maxExpiredEntries > 0) { - Map.Entry entry = lruIter.next(); - if (isExpired(entry.getKey(), entry.getValue().value, now)) { - lruIter.remove(); - evictionListener.onEviction(entry.getKey(), entry.getValue(), EvictionType.EXPIRED); - removedAny = true; - maxExpiredEntries--; - } + Iterator> lruIter = delegate.entrySet().iterator(); + while (lruIter.hasNext() && maxExpiredEntries > 0) { + Map.Entry entry = lruIter.next(); + if (isExpired(entry.getKey(), entry.getValue().value, now)) { + lruIter.remove(); + evictionListener.onEviction(entry.getKey(), entry.getValue(), EvictionType.EXPIRED); + removedAny = true; + maxExpiredEntries--; } } return removedAny; @@ -312,48 +280,7 @@ private boolean cleanupExpiredEntries(int maxExpiredEntries, long now) { @Override public final void close() { - synchronized (lock) { - periodicCleaner.stop(); - invalidateAll(); - } - } - - /** Periodically cleans up the AsyncRequestCache. */ - private final class PeriodicCleaner { - - private final ScheduledExecutorService ses; - private final int interval; - private final TimeUnit intervalUnit; - private ScheduledFuture scheduledFuture; - - PeriodicCleaner(ScheduledExecutorService ses, int interval, TimeUnit intervalUnit) { - this.ses = checkNotNull(ses, "ses"); - checkState(interval > 0, "interval must be positive"); - this.interval = interval; - this.intervalUnit = checkNotNull(intervalUnit, "intervalUnit"); - } - - PeriodicCleaner start() { - checkState(scheduledFuture == null, "cleaning task can be started only once"); - this.scheduledFuture = - ses.scheduleAtFixedRate(new CleaningTask(), interval, interval, intervalUnit); - return this; - } - - void stop() { - if (scheduledFuture != null) { - scheduledFuture.cancel(false); - scheduledFuture = null; - } - } - - private class CleaningTask implements Runnable { - - @Override - public void run() { - cleanupExpiredEntries(ticker.read()); - } - } + invalidateAll(); } /** A {@link EvictionListener} keeps track of size. */ @@ -367,7 +294,7 @@ private final class SizeHandlingEvictionListener implements EvictionListener( MAX_SIZE, evictionListener, - 10, - TimeUnit.NANOSECONDS, - fakeClock.getScheduledExecutorService(), - fakeClock.getTicker(), - new Object()) { + fakeClock.getTicker()) { @Override protected boolean isExpired(Integer key, Entry value, long nowNanos) { - return value.expireTime <= nowNanos; + return value.expireTime - nowNanos <= 0; } @Override @@ -107,9 +103,11 @@ public void eviction_expire() { cache.cache(1, survivor); fakeClock.forwardTime(10, TimeUnit.NANOSECONDS); + cache.cleanupExpiredEntries(); verify(evictionListener).onEviction(0, toBeEvicted, EvictionType.EXPIRED); fakeClock.forwardTime(10, TimeUnit.NANOSECONDS); + cache.cleanupExpiredEntries(); verify(evictionListener).onEviction(1, survivor, EvictionType.EXPIRED); } @@ -160,6 +158,7 @@ public void eviction_cleanupShouldRemoveAlreadyExpired() { assertThat(cache.estimatedSize()).isEqualTo(MAX_SIZE); fakeClock.forwardTime(1, TimeUnit.MINUTES); + cache.cleanupExpiredEntries(); assertThat(cache.read(MAX_SIZE)).isNull(); assertThat(cache.estimatedSize()).isEqualTo(MAX_SIZE - 1); verify(evictionListener).onEviction(eq(MAX_SIZE), any(Entry.class), eq(EvictionType.EXPIRED));