diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java index 01d1f44d060..df22f12ba6d 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java @@ -135,7 +135,7 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageP private boolean sequenceAware; - private LockRegistry lockRegistry = new DefaultLockRegistry(); + private LockRegistry lockRegistry = new DefaultLockRegistry(); private boolean lockRegistrySet = false; @@ -198,7 +198,7 @@ public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor) { this(processor, new SimpleMessageStore(0), null, null); } - public void setLockRegistry(LockRegistry lockRegistry) { + public void setLockRegistry(LockRegistry lockRegistry) { Assert.isTrue(!this.lockRegistrySet, "'this.lockRegistry' can not be reset once its been set"); Assert.notNull(lockRegistry, "'lockRegistry' must not be null"); this.lockRegistry = lockRegistry; @@ -516,7 +516,7 @@ protected boolean isSequenceAware() { return this.sequenceAware; } - protected LockRegistry getLockRegistry() { + protected LockRegistry getLockRegistry() { return this.lockRegistry; } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/AggregatorFactoryBean.java b/spring-integration-core/src/main/java/org/springframework/integration/config/AggregatorFactoryBean.java index 72ec5b296d0..1a066f2f7b2 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/AggregatorFactoryBean.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/AggregatorFactoryBean.java @@ -66,7 +66,7 @@ public class AggregatorFactoryBean extends AbstractSimpleMessageHandlerFactoryBe private String outputChannelName; - private LockRegistry lockRegistry; + private LockRegistry lockRegistry; private MessageGroupStore messageStore; @@ -125,7 +125,7 @@ public void setOutputChannelName(String outputChannelName) { this.outputChannelName = outputChannelName; } - public void setLockRegistry(LockRegistry lockRegistry) { + public void setLockRegistry(LockRegistry lockRegistry) { this.lockRegistry = lockRegistry; } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/CorrelationHandlerSpec.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/CorrelationHandlerSpec.java index a531fba07b7..c677d3bcffd 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/CorrelationHandlerSpec.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/CorrelationHandlerSpec.java @@ -312,7 +312,7 @@ public S forceReleaseAdvice(Advice... advice) { * @param lockRegistry the {@link LockRegistry} to use. * @return the endpoint spec. */ - public S lockRegistry(LockRegistry lockRegistry) { + public S lockRegistry(LockRegistry lockRegistry) { Assert.notNull(lockRegistry, "'lockRegistry' must not be null."); this.handler.setLockRegistry(lockRegistry); return _this(); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/handler/advice/LockRequestHandlerAdvice.java b/spring-integration-core/src/main/java/org/springframework/integration/handler/advice/LockRequestHandlerAdvice.java index 82520cd7288..01af17b893a 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/handler/advice/LockRequestHandlerAdvice.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/handler/advice/LockRequestHandlerAdvice.java @@ -48,7 +48,7 @@ */ public class LockRequestHandlerAdvice extends AbstractRequestHandlerAdvice { - private final LockRegistry lockRegistry; + private final LockRegistry lockRegistry; private final Expression lockKeyExpression; @@ -65,7 +65,7 @@ public class LockRequestHandlerAdvice extends AbstractRequestHandlerAdvice { * @param lockRegistry the {@link LockRegistry} to use. * @param lockKey the static (shared) lock key for all the calls. */ - public LockRequestHandlerAdvice(LockRegistry lockRegistry, Object lockKey) { + public LockRequestHandlerAdvice(LockRegistry lockRegistry, Object lockKey) { this(lockRegistry, new ValueExpression<>(lockKey)); } @@ -75,7 +75,7 @@ public LockRequestHandlerAdvice(LockRegistry lockRegistry, Object lockKey) { * @param lockRegistry the {@link LockRegistry} to use. * @param lockKeyExpression the SpEL expression to evaluate a lock key against request message. */ - public LockRequestHandlerAdvice(LockRegistry lockRegistry, Expression lockKeyExpression) { + public LockRequestHandlerAdvice(LockRegistry lockRegistry, Expression lockKeyExpression) { Assert.notNull(lockRegistry, "'lockRegistry' must not be null"); Assert.notNull(lockKeyExpression, "'lockKeyExpression' must not be null"); this.lockRegistry = lockRegistry; @@ -88,7 +88,7 @@ public LockRequestHandlerAdvice(LockRegistry lockRegistry, Expression lockKeyExp * @param lockRegistry the {@link LockRegistry} to use. * @param lockKeyFunction the function to evaluate a lock key against request message. */ - public LockRequestHandlerAdvice(LockRegistry lockRegistry, Function, Object> lockKeyFunction) { + public LockRequestHandlerAdvice(LockRegistry lockRegistry, Function, Object> lockKeyFunction) { Assert.notNull(lockRegistry, "'lockRegistry' must not be null"); Assert.notNull(lockKeyFunction, "'lockKeyFunction' must not be null"); this.lockRegistry = lockRegistry; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/metadata/PropertiesPersistingMetadataStore.java b/spring-integration-core/src/main/java/org/springframework/integration/metadata/PropertiesPersistingMetadataStore.java index f7d2c996875..e37af6b4966 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/metadata/PropertiesPersistingMetadataStore.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/metadata/PropertiesPersistingMetadataStore.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -65,7 +65,7 @@ public class PropertiesPersistingMetadataStore implements ConcurrentMetadataStor private final DefaultPropertiesPersister persister = new DefaultPropertiesPersister(); - private final LockRegistry lockRegistry = new DefaultLockRegistry(); + private final LockRegistry lockRegistry = new DefaultLockRegistry(); private String baseDirectory = System.getProperty("java.io.tmpdir") + "/spring-integration/"; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractMessageGroupStore.java b/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractMessageGroupStore.java index 89c8bdb05fb..e389428b9d4 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractMessageGroupStore.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractMessageGroupStore.java @@ -67,7 +67,7 @@ public abstract class AbstractMessageGroupStore extends AbstractBatchingMessageG private boolean timeoutOnIdle; - private LockRegistry lockRegistry = new DefaultLockRegistry(); + private LockRegistry lockRegistry = new DefaultLockRegistry(); protected AbstractMessageGroupStore() { } @@ -127,12 +127,12 @@ public void setLazyLoadMessageGroups(boolean lazyLoadMessageGroups) { * @param lockRegistry lockRegistryType * @since 6.5 */ - public final void setLockRegistry(LockRegistry lockRegistry) { + public final void setLockRegistry(LockRegistry lockRegistry) { Assert.notNull(lockRegistry, "The LockRegistry cannot be null"); this.lockRegistry = lockRegistry; } - protected LockRegistry getLockRegistry() { + protected LockRegistry getLockRegistry() { return this.lockRegistry; } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageStore.java b/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageStore.java index ded03f5c95f..c79860df806 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageStore.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageStore.java @@ -106,7 +106,7 @@ public SimpleMessageStore(int individualCapacity, int groupCapacity, long upperB * @param lockRegistry The lock registry. * @see #SimpleMessageStore(int, int, long, LockRegistry) */ - public SimpleMessageStore(int individualCapacity, int groupCapacity, LockRegistry lockRegistry) { + public SimpleMessageStore(int individualCapacity, int groupCapacity, LockRegistry lockRegistry) { this(individualCapacity, groupCapacity, 0, lockRegistry); } @@ -122,7 +122,7 @@ public SimpleMessageStore(int individualCapacity, int groupCapacity, LockRegistr */ @SuppressWarnings("this-escape") public SimpleMessageStore(int individualCapacity, int groupCapacity, long upperBoundTimeout, - LockRegistry lockRegistry) { + LockRegistry lockRegistry) { super(false); Assert.notNull(lockRegistry, "The LockRegistry cannot be null"); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java b/spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java index d1ac3092e5d..e0224fc59fd 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2024 the original author or authors. + * Copyright 2016-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -74,7 +74,7 @@ public class LockRegistryLeaderInitiator implements SmartLifecycle, DisposableBe * A lock registry. The locks it manages should be global (whatever that means for the * system) and expiring, in case the holder dies without notifying anyone. */ - private final LockRegistry locks; + private final LockRegistry locks; /** * Candidate for leader election. User injects this to receive callbacks on leadership @@ -162,7 +162,7 @@ public String getRole() { * candidate (which just logs the leadership events). * @param locks lock registry */ - public LockRegistryLeaderInitiator(LockRegistry locks) { + public LockRegistryLeaderInitiator(LockRegistry locks) { this(locks, new DefaultCandidate()); } @@ -172,7 +172,7 @@ public LockRegistryLeaderInitiator(LockRegistry locks) { * @param locks lock registry * @param candidate leadership election candidate */ - public LockRegistryLeaderInitiator(LockRegistry locks, Candidate candidate) { + public LockRegistryLeaderInitiator(LockRegistry locks, Candidate candidate) { Assert.notNull(locks, "'locks' must not be null"); Assert.notNull(candidate, "'candidate' must not be null"); this.locks = locks; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/locks/DefaultLockRegistry.java b/spring-integration-core/src/main/java/org/springframework/integration/support/locks/DefaultLockRegistry.java index ee64b89ffb4..1a95e86e98b 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/support/locks/DefaultLockRegistry.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/locks/DefaultLockRegistry.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,7 +34,7 @@ * @since 2.1.1 * */ -public final class DefaultLockRegistry implements LockRegistry { +public final class DefaultLockRegistry implements LockRegistry { private final Lock[] lockTable; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/locks/DistributedLock.java b/spring-integration-core/src/main/java/org/springframework/integration/support/locks/DistributedLock.java new file mode 100644 index 00000000000..4943e5b6d44 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/locks/DistributedLock.java @@ -0,0 +1,48 @@ +/* + * Copyright 2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.support.locks; + +import java.time.Duration; +import java.util.concurrent.locks.Lock; + +/** + * A distributed {@link Lock} extension. + * + * @author Eddie Cho + * + * @since 7.0 + */ +public interface DistributedLock extends Lock { + + /** + * Attempt to acquire a lock with a specific time-to-live + * @param ttl the specific time-to-live for the lock status data + */ + void lock(Duration ttl); + + /** + * Attempt to acquire a lock with a specific time-to-live + * @param waitTime the maximum time to wait for the lock + * @param ttl the specific time-to-live for the lock status data + * @return {@code true} if the lock was acquired and {@code false} + * if the waiting time elapsed before the lock was acquired + * @throws InterruptedException if the current thread is interrupted + * while acquiring the lock (and interruption of lock + * acquisition is supported) + */ + boolean tryLock(Duration waitTime, Duration ttl) throws InterruptedException; +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/locks/ExpirableLockRegistry.java b/spring-integration-core/src/main/java/org/springframework/integration/support/locks/ExpirableLockRegistry.java index 7bf62a04dc6..f3fbfc593d3 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/support/locks/ExpirableLockRegistry.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/locks/ExpirableLockRegistry.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2019 the original author or authors. + * Copyright 2015-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,15 +16,18 @@ package org.springframework.integration.support.locks; +import java.util.concurrent.locks.Lock; + /** * A {@link LockRegistry} implementing this interface supports the removal of aged locks * that are not currently locked. + * @param The expected class of the lock implementation * * @author Gary Russell * @since 4.2 * */ -public interface ExpirableLockRegistry extends LockRegistry { +public interface ExpirableLockRegistry extends LockRegistry { /** * Remove locks last acquired more than 'age' ago that are not currently locked. diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/locks/LockRegistry.java b/spring-integration-core/src/main/java/org/springframework/integration/support/locks/LockRegistry.java index 74d3cb80bbd..c2d834326aa 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/support/locks/LockRegistry.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/locks/LockRegistry.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,7 @@ /** * Strategy for maintaining a registry of shared locks. + * @param The expected class of the lock implementation * * @author Oleg Zhurakousky * @author Gary Russell @@ -34,14 +35,14 @@ * @since 2.1.1 */ @FunctionalInterface -public interface LockRegistry { +public interface LockRegistry { /** * Obtain the lock associated with the parameter object. * @param lockKey The object with which the lock is associated. * @return The associated lock. */ - Lock obtain(Object lockKey); + L obtain(Object lockKey); /** * Perform the provided task when the lock for the key is locked. diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/locks/PassThruLockRegistry.java b/spring-integration-core/src/main/java/org/springframework/integration/support/locks/PassThruLockRegistry.java index 9292f02c60f..cad2184ea6b 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/support/locks/PassThruLockRegistry.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/locks/PassThruLockRegistry.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,7 +33,7 @@ * @since 2.2 * */ -public final class PassThruLockRegistry implements LockRegistry { +public final class PassThruLockRegistry implements LockRegistry { @Override public Lock obtain(Object lockKey) { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/locks/RenewableLockRegistry.java b/spring-integration-core/src/main/java/org/springframework/integration/support/locks/RenewableLockRegistry.java index db3fa4b5567..d3eea07586b 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/support/locks/RenewableLockRegistry.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/locks/RenewableLockRegistry.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2024 the original author or authors. + * Copyright 2020-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,19 +16,24 @@ package org.springframework.integration.support.locks; +import java.time.Duration; +import java.util.concurrent.locks.Lock; + import org.springframework.scheduling.TaskScheduler; /** * A {@link LockRegistry} implementing this interface supports the renewal * of the time to live of a lock. + * @param The expected class of the lock implementation * * @author Alexandre Strubel * @author Artem Bilan * @author Youbin Wu + * @author Eddie Cho * * @since 5.4 */ -public interface RenewableLockRegistry extends LockRegistry { +public interface RenewableLockRegistry extends LockRegistry { /** * Renew the time to live of the lock is associated with the parameter object. @@ -37,6 +42,15 @@ public interface RenewableLockRegistry extends LockRegistry { */ void renewLock(Object lockKey); + /** + * Renew the time to live of the lock is associated with the parameter object with a specific value. + * The lock must be held by the current thread + * @param lockKey The object with which the lock is associated. + * @param ttl the specific time-to-live for the lock status data + * @since 7.0 + */ + void renewLock(Object lockKey, Duration ttl); + /** * Set the {@link TaskScheduler} to use for the renewal task. * When renewalTaskScheduler is set, it will be used to periodically renew the lock to ensure that diff --git a/spring-integration-core/src/test/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiatorTests.java b/spring-integration-core/src/test/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiatorTests.java index f47155fd488..5400732930f 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiatorTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiatorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2024 the original author or authors. + * Copyright 2012-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -61,7 +61,7 @@ public class LockRegistryLeaderInitiatorTests { private CountDownLatch revoked = new CountDownLatch(1); - private final LockRegistry registry = new DefaultLockRegistry(); + private final LockRegistry registry = new DefaultLockRegistry(); private final LockRegistryLeaderInitiator initiator = new LockRegistryLeaderInitiator(this.registry, new DefaultCandidate()); @@ -159,6 +159,7 @@ public void publishOnGranted(Object source, Context context, String role) { } @Test + @SuppressWarnings("rawtypes") public void competingWithLock() throws Exception { // switch used to toggle which registry obtains lock AtomicBoolean firstLocked = new AtomicBoolean(true); @@ -220,6 +221,7 @@ public void competingWithLock() throws Exception { } @Test + @SuppressWarnings("rawtypes") public void testGracefulLeaderSelectorExit() throws Exception { AtomicReference throwableAtomicReference = new AtomicReference<>(); @@ -275,7 +277,7 @@ public void testExceptionFromLock() throws Exception { } }).given(mockLock).tryLock(anyLong(), any(TimeUnit.class)); - LockRegistry registry = lockKey -> mockLock; + LockRegistry registry = lockKey -> mockLock; CountDownLatch onGranted = new CountDownLatch(1); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/support/locks/DefaultLockRegistryTests.java b/spring-integration-core/src/test/java/org/springframework/integration/support/locks/DefaultLockRegistryTests.java index c6420e500b5..6d5584fc5e8 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/support/locks/DefaultLockRegistryTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/support/locks/DefaultLockRegistryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -56,7 +56,7 @@ public void testBadMaskOutOfRange() { // 32bits @Test public void testSingleLockCreation() { - LockRegistry registry = new DefaultLockRegistry(0); + LockRegistry registry = new DefaultLockRegistry(0); Lock a = registry.obtain(23); Lock b = registry.obtain(new Object()); Lock c = registry.obtain("hello"); @@ -67,7 +67,7 @@ public void testSingleLockCreation() { @Test public void testSame() { - LockRegistry registry = new DefaultLockRegistry(); + LockRegistry registry = new DefaultLockRegistry(); Lock lock1 = registry.obtain(new Object() { @Override @@ -87,7 +87,7 @@ public int hashCode() { @Test public void testDifferent() { - LockRegistry registry = new DefaultLockRegistry(); + LockRegistry registry = new DefaultLockRegistry(); Lock lock1 = registry.obtain(new Object() { @Override @@ -107,7 +107,7 @@ public int hashCode() { @Test public void testAllDifferentAndSame() { - LockRegistry registry = new DefaultLockRegistry(3); + LockRegistry registry = new DefaultLockRegistry(3); Lock[] locks = new Lock[4]; locks[0] = registry.obtain(new Object() { @@ -213,7 +213,7 @@ public int hashCode() { @Test public void cyclicBarrierIsBrokenWhenExecutedConcurrentlyInLock() throws Exception { - LockRegistry registry = new DefaultLockRegistry(1); + LockRegistry registry = new DefaultLockRegistry(1); CyclicBarrier cyclicBarrier = new CyclicBarrier(2); CountDownLatch brokenBarrierLatch = new CountDownLatch(2); @@ -245,7 +245,7 @@ public void cyclicBarrierIsBrokenWhenExecutedConcurrentlyInLock() throws Excepti @Test public void executeLockedIsTimedOutInOtherThread() throws Exception { - LockRegistry registry = new DefaultLockRegistry(1); + LockRegistry registry = new DefaultLockRegistry(1); String lockKey = "lockKey"; Duration waitLockDuration = Duration.ofMillis(100); diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/FileWritingMessageHandler.java b/spring-integration-file/src/main/java/org/springframework/integration/file/FileWritingMessageHandler.java index caf5bc82acc..b620785ea5e 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/FileWritingMessageHandler.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/FileWritingMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -162,7 +162,7 @@ public class FileWritingMessageHandler extends AbstractReplyProducingMessageHand private boolean appendNewLine = false; - private LockRegistry lockRegistry = new PassThruLockRegistry(); + private LockRegistry lockRegistry = new PassThruLockRegistry(); private int bufferSize = DEFAULT_BUFFER_SIZE; diff --git a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/lock/HazelcastLockRegistry.java b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/lock/HazelcastLockRegistry.java index 0f1d70d4f86..f74627f432f 100644 --- a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/lock/HazelcastLockRegistry.java +++ b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/lock/HazelcastLockRegistry.java @@ -32,7 +32,7 @@ * which is not Open Source anymore since Hazelcast 5.5. */ @Deprecated(forRemoval = true, since = "6.5") -public class HazelcastLockRegistry implements LockRegistry { +public class HazelcastLockRegistry implements LockRegistry { private final HazelcastInstance client; diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/DefaultLockRepository.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/DefaultLockRepository.java index a04cbcbfcdd..41f094cac39 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/DefaultLockRepository.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/DefaultLockRepository.java @@ -101,7 +101,7 @@ public class DefaultLockRepository private String deleteExpiredQuery = """ DELETE FROM %sLOCK - WHERE REGION=? AND CREATED_DATE=? + WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=? AND EXPIRED_AFTER>=? """; private String renewQuery = """ UPDATE %sLOCK - SET CREATED_DATE=? + SET EXPIRED_AFTER=? WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=? """; @@ -192,7 +192,9 @@ public void setPrefix(String prefix) { /** * Specify the time (in milliseconds) to expire deadlocks. * @param timeToLive the time to expire deadlocks. + * @deprecated since 7.0, the default time-to-live can be set by the constructor of {@link JdbcLockRegistry} */ + @Deprecated(since = "7.0") public void setTimeToLive(int timeToLive) { this.ttl = Duration.ofMillis(timeToLive); } @@ -220,8 +222,8 @@ public void setApplicationContext(ApplicationContext applicationContext) throws *
 	 * {@code
 	 *  UPDATE %sLOCK
-	 * 			SET CLIENT_ID=?, CREATED_DATE=?
-	 * 			WHERE REGION=? AND LOCK_KEY=? AND (CLIENT_ID=? OR CREATED_DATE
 	 * @param updateQuery the query to update a lock record.
@@ -248,7 +250,7 @@ public String getUpdateQuery() {
 	 * Set a custom {@code INSERT} query for a lock record.
 	 * The {@link #getInsertQuery()} can be used as a template for customization.
 	 * The default query is
-	 * {@code INSERT INTO %sLOCK (REGION, LOCK_KEY, CLIENT_ID, CREATED_DATE) VALUES (?, ?, ?, ?)}.
+	 * {@code INSERT INTO %sLOCK (REGION, LOCK_KEY, CLIENT_ID, CREATED_DATE, EXPIRED_AFTER) VALUES (?, ?, ?, ?, ?)}.
 	 * For example a PostgreSQL {@code ON CONFLICT DO NOTHING} hint can be provided like this:
 	 * 
 	 * {@code
@@ -282,7 +284,7 @@ public String getInsertQuery() {
 	 * 
 	 * {@code
 	 *  UPDATE %sLOCK
-	 * 			SET CREATED_DATE=?
+	 * 			SET EXPIRED_AFTER=?
 	 * 			WHERE REGION=? AND LOCK_KEY=? AND CLIENT_ID=?
 	 * }
 	 * 
@@ -396,17 +398,23 @@ public boolean delete(String lock) { } @Override + @Deprecated(since = "7.0") public boolean acquire(String lock) { + return this.acquire(lock, this.ttl); + } + + @Override + public boolean acquire(String lock, Duration ttlDuration) { Boolean result = this.readCommittedTransactionTemplate.execute( transactionStatus -> { - if (this.template.update(this.updateQuery, this.id, epochMillis(), - this.region, lock, this.id, ttlEpochMillis()) > 0) { + if (this.template.update(this.updateQuery, this.id, ttlEpochMillis(ttlDuration), + this.region, lock, this.id, epochMillis()) > 0) { return true; } try { return this.template.update(this.insertQuery, this.region, lock, this.id, - epochMillis()) > 0; + epochMillis(), ttlEpochMillis(ttlDuration)) > 0; } catch (DataIntegrityViolationException ex) { return false; @@ -421,7 +429,7 @@ public boolean isAcquired(String lock) { transactionStatus -> Integer.valueOf(1).equals( this.template.queryForObject(this.countQuery, - Integer.class, this.region, lock, this.id, ttlEpochMillis()))); + Integer.class, this.region, lock, this.id, epochMillis()))); return Boolean.TRUE.equals(result); } @@ -429,19 +437,25 @@ public boolean isAcquired(String lock) { public void deleteExpired() { this.defaultTransactionTemplate.executeWithoutResult( transactionStatus -> - this.template.update(this.deleteExpiredQuery, this.region, ttlEpochMillis())); + this.template.update(this.deleteExpiredQuery, this.region, epochMillis())); } @Override + @Deprecated(since = "7.0") public boolean renew(String lock) { + return this.renew(lock, this.ttl); + } + + @Override + public boolean renew(String lock, Duration ttlDuration) { final Boolean result = this.defaultTransactionTemplate.execute( transactionStatus -> - this.template.update(this.renewQuery, epochMillis(), this.region, lock, this.id) == 1); + this.template.update(this.renewQuery, ttlEpochMillis(ttlDuration), this.region, lock, this.id) == 1); return Boolean.TRUE.equals(result); } - private Timestamp ttlEpochMillis() { - return Timestamp.valueOf(currentTime().minus(this.ttl)); + private Timestamp ttlEpochMillis(Duration ttl) { + return Timestamp.valueOf(currentTime().plus(ttl)); } private static Timestamp epochMillis() { diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/JdbcLockRegistry.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/JdbcLockRegistry.java index e966d6bb428..d86bf0f298a 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/JdbcLockRegistry.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/JdbcLockRegistry.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2024 the original author or authors. + * Copyright 2016-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,7 @@ import org.springframework.dao.CannotAcquireLockException; import org.springframework.dao.DataAccessResourceFailureException; import org.springframework.dao.TransientDataAccessException; +import org.springframework.integration.support.locks.DistributedLock; import org.springframework.integration.support.locks.ExpirableLockRegistry; import org.springframework.integration.support.locks.RenewableLockRegistry; import org.springframework.integration.util.UUIDConverter; @@ -61,7 +62,7 @@ * * @since 4.3 */ -public class JdbcLockRegistry implements ExpirableLockRegistry, RenewableLockRegistry { +public class JdbcLockRegistry implements ExpirableLockRegistry, RenewableLockRegistry { private static final int DEFAULT_IDLE = 100; @@ -85,16 +86,36 @@ protected boolean removeEldestEntry(Entry eldest) { private int cacheCapacity = DEFAULT_CAPACITY; + /** + * Default value for the time-to-live property. + * @since 7.0 + */ + public static final Duration DEFAULT_TTL = Duration.ofSeconds(10); + + private final Duration ttl; + /** * Construct an instance based on the provided {@link LockRepository}. * @param client the {@link LockRepository} to rely on. */ public JdbcLockRegistry(LockRepository client) { this.client = client; + this.ttl = DEFAULT_TTL; + } + + /** + * Create a lock registry with the supplied lock expiration. + * @param client the {@link LockRepository} to rely on. + * @param expireAfter The expiration in {@link Duration}. + * @since 7.0 + */ + public JdbcLockRegistry(LockRepository client, Duration expireAfter) { + this.client = client; + this.ttl = expireAfter; } /** - * Specify a @link Duration} to sleep between lock record insert/update attempts. + * Specify a {@link Duration} to sleep between lock record insert/update attempts. * Defaults to 100 milliseconds. * @param idleBetweenTries the {@link Duration} to sleep between insert/update attempts. * @since 5.1.8 @@ -114,7 +135,7 @@ public void setCacheCapacity(int cacheCapacity) { } @Override - public Lock obtain(Object lockKey) { + public DistributedLock obtain(Object lockKey) { Assert.isInstanceOf(String.class, lockKey); String path = pathFor((String) lockKey); this.lock.lock(); @@ -148,6 +169,11 @@ public void expireUnusedOlderThan(long age) { @Override public void renewLock(Object lockKey) { + this.renewLock(lockKey, this.ttl); + } + + @Override + public void renewLock(Object lockKey, Duration customTtl) { Assert.isInstanceOf(String.class, lockKey); String path = pathFor((String) lockKey); JdbcLock jdbcLock; @@ -162,12 +188,17 @@ public void renewLock(Object lockKey) { if (jdbcLock == null) { throw new IllegalStateException("Could not found mutex at " + path); } - if (!jdbcLock.renew()) { + if (!jdbcLock.renew(customTtl)) { throw new IllegalStateException("Could not renew mutex at " + path); } } - private static final class JdbcLock implements Lock { + private static Duration convertToDuration(long time, TimeUnit timeUnit) { + long timeInMilliseconds = TimeUnit.MILLISECONDS.convert(time, timeUnit); + return Duration.ofMillis(timeInMilliseconds); + } + + private final class JdbcLock implements DistributedLock { private final LockRepository mutex; @@ -191,10 +222,15 @@ public long getLastUsed() { @Override public void lock() { + lock(JdbcLockRegistry.this.ttl); + } + + @Override + public void lock(Duration ttl) { this.delegate.lock(); while (true) { try { - while (!doLock()) { + while (!doLock(ttl)) { Thread.sleep(this.idleBetweenTries.toMillis()); } break; @@ -225,7 +261,7 @@ public void lockInterruptibly() throws InterruptedException { this.delegate.lockInterruptibly(); while (true) { try { - while (!doLock()) { + while (!doLock(JdbcLockRegistry.this.ttl)) { Thread.sleep(this.idleBetweenTries.toMillis()); if (Thread.currentThread().isInterrupted()) { throw new InterruptedException(); @@ -261,15 +297,20 @@ public boolean tryLock() { @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { + return tryLock(Duration.of(time, unit.toChronoUnit()), JdbcLockRegistry.this.ttl); + } + + @Override + public boolean tryLock(Duration waitTime, Duration ttl) throws InterruptedException { long now = System.currentTimeMillis(); - if (!this.delegate.tryLock(time, unit)) { + if (!this.delegate.tryLock(waitTime.toMillis(), TimeUnit.MILLISECONDS)) { return false; } - long expire = now + TimeUnit.MILLISECONDS.convert(time, unit); + long expire = now + waitTime.toMillis(); boolean acquired; while (true) { try { - while (!(acquired = doLock()) && System.currentTimeMillis() < expire) { //NOSONAR + while (!(acquired = doLock(ttl)) && System.currentTimeMillis() < expire) { //NOSONAR Thread.sleep(this.idleBetweenTries.toMillis()); } if (!acquired) { @@ -287,8 +328,8 @@ public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { } } - private boolean doLock() { - boolean acquired = this.mutex.acquire(this.path); + private boolean doLock(Duration ttl) { + boolean acquired = this.mutex.acquire(this.path, ttl); if (acquired) { this.lastUsed = System.currentTimeMillis(); } @@ -340,13 +381,29 @@ public boolean isAcquiredInThisProcess() { return this.delegate.isLocked(); } + /** + * Renew the time-to-live of the distributed lock + * @return {@code true} if the lock's time-to-live was successfully renewed; + * {@code false} if the time-to-live could not be renewed + */ public boolean renew() { + return renew(JdbcLockRegistry.this.ttl); + } + + /** + * Renew the time-to-live of the distributed lock + * @param ttl the new time-to-live value for the lock status data + * @return {@code true} if the lock's time-to-live was successfully renewed; + * {@code false} if the time-to-live could not be renewed + * @since 7.0 + */ + public boolean renew(Duration ttl) { if (!this.delegate.isHeldByCurrentThread()) { throw new IllegalMonitorStateException("The current thread doesn't own mutex at " + this.path); } while (true) { try { - boolean renewed = this.mutex.renew(this.path); + boolean renewed = this.mutex.renew(this.path, ttl); if (renewed) { this.lastUsed = System.currentTimeMillis(); } diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/LockRepository.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/LockRepository.java index 247e6fccb5e..95b986f9dd0 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/LockRepository.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/LockRepository.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2024 the original author or authors. + * Copyright 2016-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.springframework.integration.jdbc.lock; import java.io.Closeable; +import java.time.Duration; /** * Encapsulation of the SQL shunting that is needed for locks. A {@link JdbcLockRegistry} @@ -55,16 +56,36 @@ public interface LockRepository extends Closeable { * Acquire a lock for a key. * @param lock the key for lock to acquire. * @return acquired or not. + * @deprecated since 7.0, we allow custom time-to-live value */ + @Deprecated(since = "7.0") boolean acquire(String lock); + /** + * Acquire a lock for a key with specific time-to-live value + * @param lock the key for lock to acquire. + * @param ttl the custom time-to-live value + * @return acquired or not. + */ + boolean acquire(String lock, Duration ttl); + /** * Renew the lease for a lock. * @param lock the lock to renew. * @return renewed or not. + * @deprecated since 7.0, we allow custom time-to-live value */ + @Deprecated(since = "7.0") boolean renew(String lock); + /** + * Renew the lease for a lock with specific time-to-live value + * @param lock the key for lock to acquire. + * @param ttl the custom time-to-live value + * @return renewed or not. + */ + boolean renew(String lock, Duration ttl); + @Override void close(); diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-db2.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-db2.sql index 82dcc727305..6a582fb5879 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-db2.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-db2.sql @@ -30,7 +30,8 @@ CREATE TABLE INT_LOCK ( LOCK_KEY CHAR(36) NOT NULL, REGION VARCHAR(100) NOT NULL, CLIENT_ID CHAR(36), - CREATED_DATE TIMESTAMP NOT NULL, + CREATED_DATE TIMESTAMP NOT NULL, + EXPIRED_AFTER TIMESTAMP NOT NULL, constraint INT_LOCK_PK primary key (LOCK_KEY, REGION) ); diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-derby.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-derby.sql index 7914556a41f..eaacbb4ca0b 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-derby.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-derby.sql @@ -30,7 +30,8 @@ CREATE TABLE INT_LOCK ( LOCK_KEY CHAR(36) NOT NULL, REGION VARCHAR(100) NOT NULL, CLIENT_ID CHAR(36), - CREATED_DATE TIMESTAMP NOT NULL, + CREATED_DATE TIMESTAMP NOT NULL, + EXPIRED_AFTER TIMESTAMP NOT NULL, constraint INT_LOCK_PK primary key (LOCK_KEY, REGION) ); diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-h2.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-h2.sql index 1751ab4cbcb..767d6eac31b 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-h2.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-h2.sql @@ -30,7 +30,8 @@ CREATE TABLE INT_LOCK ( LOCK_KEY CHAR(36) NOT NULL, REGION VARCHAR(100) NOT NULL, CLIENT_ID CHAR(36), - CREATED_DATE TIMESTAMP NOT NULL, + CREATED_DATE TIMESTAMP NOT NULL, + EXPIRED_AFTER TIMESTAMP NOT NULL, constraint INT_LOCK_PK primary key (LOCK_KEY, REGION) ); diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-hsqldb.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-hsqldb.sql index 22592c1a875..4842ccbcf05 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-hsqldb.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-hsqldb.sql @@ -30,7 +30,8 @@ CREATE TABLE INT_LOCK ( LOCK_KEY CHAR(36) NOT NULL, REGION VARCHAR(100) NOT NULL, CLIENT_ID CHAR(36), - CREATED_DATE TIMESTAMP NOT NULL, + CREATED_DATE TIMESTAMP NOT NULL, + EXPIRED_AFTER TIMESTAMP NOT NULL, constraint INT_LOCK_PK primary key (LOCK_KEY, REGION) ); diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-mysql.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-mysql.sql index 655688a8913..75d320d59d9 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-mysql.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-mysql.sql @@ -30,7 +30,8 @@ CREATE TABLE INT_LOCK ( LOCK_KEY CHAR(36) NOT NULL, REGION VARCHAR(100) NOT NULL, CLIENT_ID CHAR(36), - CREATED_DATE DATETIME(6) NOT NULL, + CREATED_DATE DATETIME(6) NOT NULL, + EXPIRED_AFTER DATETIME(6) NOT NULL, constraint INT_LOCK_PK primary key (LOCK_KEY, REGION) ) ENGINE=InnoDB; diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-oracle.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-oracle.sql index e465da9993f..a037343a13d 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-oracle.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-oracle.sql @@ -30,7 +30,8 @@ CREATE TABLE INT_LOCK ( LOCK_KEY VARCHAR2(36) NOT NULL, REGION VARCHAR2(100) NOT NULL, CLIENT_ID VARCHAR2(36), - CREATED_DATE TIMESTAMP NOT NULL, + CREATED_DATE TIMESTAMP NOT NULL, + EXPIRED_AFTER TIMESTAMP NOT NULL, constraint INT_LOCK_PK primary key (LOCK_KEY, REGION) ); diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-postgresql.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-postgresql.sql index 45894c57ea9..18cc90d129b 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-postgresql.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-postgresql.sql @@ -30,7 +30,8 @@ CREATE TABLE INT_LOCK ( LOCK_KEY CHAR(36) NOT NULL, REGION VARCHAR(100) NOT NULL, CLIENT_ID CHAR(36), - CREATED_DATE TIMESTAMP NOT NULL, + CREATED_DATE TIMESTAMP NOT NULL, + EXPIRED_AFTER TIMESTAMP NOT NULL, constraint INT_LOCK_PK primary key (LOCK_KEY, REGION) ); diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-sqlserver.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-sqlserver.sql index 6c1c091ef9f..60057608c60 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-sqlserver.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-sqlserver.sql @@ -30,7 +30,8 @@ CREATE TABLE INT_LOCK ( LOCK_KEY CHAR(36) NOT NULL, REGION VARCHAR(100) NOT NULL, CLIENT_ID CHAR(36), - CREATED_DATE DATETIME NOT NULL, + CREATED_DATE DATETIME NOT NULL, + EXPIRED_AFTER DATETIME NOT NULL, constraint INT_LOCK_PK primary key (LOCK_KEY, REGION) ); diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-sybase.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-sybase.sql index 7aa89388770..2f28160b189 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-sybase.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-sybase.sql @@ -30,7 +30,8 @@ CREATE TABLE INT_LOCK ( LOCK_KEY CHAR(36) NOT NULL, REGION VARCHAR(100) NOT NULL, CLIENT_ID CHAR(36), - CREATED_DATE DATETIME NOT NULL, + CREATED_DATE DATETIME NOT NULL, + EXPIRED_AFTER DATETIME NOT NULL, constraint INT_LOCK_PK primary key (LOCK_KEY, REGION) ) LOCK DATAROWS; diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/DefaultLockRepositoryTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/DefaultLockRepositoryTests.java index c8cd38b55bf..468cc277425 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/DefaultLockRepositoryTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/DefaultLockRepositoryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2022 the original author or authors. + * Copyright 2021-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,11 +17,15 @@ package org.springframework.integration.jdbc.lock; import java.sql.Connection; +import java.time.Duration; + +import javax.sql.DataSource; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import org.springframework.transaction.annotation.Isolation; @@ -41,27 +45,33 @@ */ @SpringJUnitConfig(locations = "JdbcLockRegistryTests-context.xml") @DirtiesContext -public class DefaultLockRepositoryTests { +class DefaultLockRepositoryTests { + + @Autowired + private DataSource dataSource; + + @Autowired + private DataSourceTransactionManager transactionManager; @Autowired private LockRepository client; @BeforeEach - public void clear() { + void clear() { this.client.close(); } @Transactional @Test - public void testNewTransactionIsStartedWhenTransactionIsAlreadyActive() { + void testNewTransactionIsStartedWhenTransactionIsAlreadyActive() { // Make sure a transaction is active assertThat(TransactionSynchronizationManager.isActualTransactionActive()).isTrue(); TransactionSynchronization transactionSynchronization = spy(TransactionSynchronization.class); TransactionSynchronizationManager.registerSynchronization(transactionSynchronization); - this.client.acquire("foo"); // 1 - this.client.renew("foo"); // 2 + this.client.acquire("foo", Duration.ofSeconds(10)); // 1 + this.client.renew("foo", Duration.ofSeconds(10)); // 2 this.client.delete("foo"); // 3 this.client.isAcquired("foo"); // 4 this.client.deleteExpired(); // 5 @@ -76,17 +86,147 @@ public void testNewTransactionIsStartedWhenTransactionIsAlreadyActive() { @Transactional(isolation = Isolation.REPEATABLE_READ) @Test - public void testIsAcquiredFromRepeatableReadTransaction() { + void testIsAcquiredFromRepeatableReadTransaction() { // Make sure a transaction with REPEATABLE_READ isolation level is active assertThat(TransactionSynchronizationManager.isActualTransactionActive()).isTrue(); assertThat(TransactionSynchronizationManager.getCurrentTransactionIsolationLevel()) .isEqualTo(Connection.TRANSACTION_REPEATABLE_READ); - this.client.acquire("foo"); + this.client.acquire("foo", Duration.ofSeconds(10)); assertThat(this.client.isAcquired("foo")).isTrue(); this.client.delete("foo"); assertThat(this.client.isAcquired("foo")).isFalse(); } + @Test + void testAcquired() { + client.acquire("foo", Duration.ofMillis(100)); + assertThat(this.client.isAcquired("foo")).isTrue(); + } + + @Test + void testAcquireSameLockTwiceAndTtlWillBeUpdated() throws InterruptedException { + client.acquire("foo", Duration.ofMillis(150)); + Thread.sleep(100); + client.acquire("foo", Duration.ofMillis(150)); + Thread.sleep(60); + assertThat(this.client.isAcquired("foo")).isTrue(); + } + + @Test + void testAcquiredLockIsAcquiredByAnotherProcess() { + DefaultLockRepository lockRepositoryOfAnotherProcess = new DefaultLockRepository(dataSource); + lockRepositoryOfAnotherProcess.setTransactionManager(transactionManager); + lockRepositoryOfAnotherProcess.afterSingletonsInstantiated(); + lockRepositoryOfAnotherProcess.afterPropertiesSet(); + + lockRepositoryOfAnotherProcess.acquire("foo", Duration.ofMillis(100)); + assertThat(this.client.acquire("foo", Duration.ofMillis(100))).isFalse(); + + lockRepositoryOfAnotherProcess.close(); + } + + @Test + void testAcquiredLockIsAcquiredByAnotherProcessButExpired() throws InterruptedException { + DefaultLockRepository lockRepositoryOfAnotherProcess = new DefaultLockRepository(dataSource); + lockRepositoryOfAnotherProcess.setTransactionManager(transactionManager); + lockRepositoryOfAnotherProcess.afterSingletonsInstantiated(); + lockRepositoryOfAnotherProcess.afterPropertiesSet(); + + lockRepositoryOfAnotherProcess.acquire("foo", Duration.ofMillis(100)); + Thread.sleep(110); + assertThat(this.client.acquire("foo", Duration.ofMillis(100))).isTrue(); + + lockRepositoryOfAnotherProcess.close(); + } + + @Test + void testIsAcquiredLockIsAcquiredByAnotherProcess() { + DefaultLockRepository lockRepositoryOfAnotherProcess = new DefaultLockRepository(dataSource); + lockRepositoryOfAnotherProcess.setTransactionManager(transactionManager); + lockRepositoryOfAnotherProcess.afterSingletonsInstantiated(); + lockRepositoryOfAnotherProcess.afterPropertiesSet(); + + lockRepositoryOfAnotherProcess.acquire("foo", Duration.ofMillis(100)); + assertThat(this.client.isAcquired("foo")).isFalse(); + + lockRepositoryOfAnotherProcess.close(); + } + + @Test + void testIsAcquiredLockIsExpired() throws InterruptedException { + client.acquire("foo", Duration.ofMillis(100)); + Thread.sleep(110); + assertThat(this.client.isAcquired("foo")).isFalse(); + } + + @Test + void testRenew() throws InterruptedException { + client.acquire("foo", Duration.ofMillis(150)); + Thread.sleep(100); + assertThat(client.renew("foo", Duration.ofMillis(150))).isTrue(); + Thread.sleep(60); + assertThat(this.client.isAcquired("foo")).isTrue(); + } + + @Test + void testRenewLockIsExpiredAndLockStatusHasBeenCleanUp() throws InterruptedException { + client.acquire("foo", Duration.ofMillis(100)); + Thread.sleep(110); + client.deleteExpired(); + + assertThat(this.client.renew("foo", Duration.ofMillis(100))).isFalse(); + } + + @Test + void testRenewLockIsAcquiredByAnotherProcess() { + DefaultLockRepository lockRepositoryOfAnotherProcess = new DefaultLockRepository(dataSource); + lockRepositoryOfAnotherProcess.setTransactionManager(transactionManager); + lockRepositoryOfAnotherProcess.afterSingletonsInstantiated(); + lockRepositoryOfAnotherProcess.afterPropertiesSet(); + + lockRepositoryOfAnotherProcess.acquire("foo", Duration.ofMillis(100)); + assertThat(this.client.renew("foo", Duration.ofMillis(100))).isFalse(); + + lockRepositoryOfAnotherProcess.close(); + } + + @Test + void testDelete() { + this.client.acquire("foo", Duration.ofSeconds(10)); + assertThat(this.client.delete("foo")).isTrue(); + assertThat(this.client.isAcquired("foo")).isFalse(); + } + + @Test + void testDeleteLockIsAcquiredByAnotherProcess() { + DefaultLockRepository lockRepositoryOfAnotherProcess = new DefaultLockRepository(dataSource); + lockRepositoryOfAnotherProcess.setTransactionManager(transactionManager); + lockRepositoryOfAnotherProcess.afterSingletonsInstantiated(); + lockRepositoryOfAnotherProcess.afterPropertiesSet(); + + lockRepositoryOfAnotherProcess.acquire("foo", Duration.ofSeconds(10)); + assertThat(this.client.delete("foo")).isFalse(); + + lockRepositoryOfAnotherProcess.close(); + } + + @Test + void testDeleteAfterLockIsExpiredAndLockStatusHasBeenCleanUp() throws InterruptedException { + client.acquire("foo", Duration.ofMillis(100)); + Thread.sleep(200); + client.deleteExpired(); + + assertThat(this.client.delete("foo")).isFalse(); + } + + @Test + void testDeleteExpired() throws InterruptedException { + client.acquire("foo", Duration.ofMillis(100)); + Thread.sleep(200); + client.deleteExpired(); + + assertThat(this.client.renew("foo", Duration.ofMillis(100))).isFalse(); + } } diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDelegateTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDelegateTests.java index 28f5db6ffff..997677d36d4 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDelegateTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDelegateTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2024 the original author or authors. + * Copyright 2020-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,7 @@ import org.springframework.transaction.TransactionTimedOutException; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -51,7 +52,7 @@ public void clear() { repository = mock(LockRepository.class); registry = new JdbcLockRegistry(repository); - when(repository.acquire(anyString())).thenReturn(true); + when(repository.acquire(anyString(), any())).thenReturn(true); } @Test diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDifferentClientTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDifferentClientTests.java index 394bf935e24..e5f0d921f8d 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDifferentClientTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryDifferentClientTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2024 the original author or authors. + * Copyright 2016-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package org.springframework.integration.jdbc.lock; +import java.time.Duration; import java.util.ArrayList; import java.util.ConcurrentModificationException; import java.util.List; @@ -285,24 +286,23 @@ void testExclusiveAccess() throws Exception { @Test void testOutOfDateLockTaken() throws Exception { + Duration ttl = Duration.ofMillis(100); DefaultLockRepository client1 = new DefaultLockRepository(dataSource); - client1.setTimeToLive(100); client1.setApplicationContext(this.context); client1.afterPropertiesSet(); client1.afterSingletonsInstantiated(); DefaultLockRepository client2 = new DefaultLockRepository(dataSource); - client2.setTimeToLive(100); client2.setApplicationContext(this.context); client2.afterPropertiesSet(); client2.afterSingletonsInstantiated(); - Lock lock1 = new JdbcLockRegistry(client1).obtain("foo"); + Lock lock1 = new JdbcLockRegistry(client1, ttl).obtain("foo"); final BlockingQueue data = new LinkedBlockingQueue<>(); final CountDownLatch latch = new CountDownLatch(1); lock1.lockInterruptibly(); Thread.sleep(500); new SimpleAsyncTaskExecutor() .execute(() -> { - Lock lock2 = new JdbcLockRegistry(client2).obtain("foo"); + Lock lock2 = new JdbcLockRegistry(client2, ttl).obtain("foo"); try { lock2.lockInterruptibly(); data.add(1); @@ -327,17 +327,16 @@ void testOutOfDateLockTaken() throws Exception { @Test void testRenewLock() throws Exception { + Duration ttl = Duration.ofMillis(500); DefaultLockRepository client1 = new DefaultLockRepository(dataSource); - client1.setTimeToLive(500); client1.setApplicationContext(this.context); client1.afterPropertiesSet(); client1.afterSingletonsInstantiated(); DefaultLockRepository client2 = new DefaultLockRepository(dataSource); - client2.setTimeToLive(500); client2.setApplicationContext(this.context); client2.afterPropertiesSet(); client2.afterSingletonsInstantiated(); - JdbcLockRegistry registry = new JdbcLockRegistry(client1); + JdbcLockRegistry registry = new JdbcLockRegistry(client1, ttl); Lock lock1 = registry.obtain("foo"); final BlockingQueue data = new LinkedBlockingQueue<>(); final CountDownLatch latch1 = new CountDownLatch(2); @@ -345,7 +344,7 @@ void testRenewLock() throws Exception { lock1.lockInterruptibly(); new SimpleAsyncTaskExecutor() .execute(() -> { - Lock lock2 = new JdbcLockRegistry(client2).obtain("foo"); + Lock lock2 = new JdbcLockRegistry(client2, ttl).obtain("foo"); try { latch1.countDown(); lock2.lockInterruptibly(); diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests-context.xml b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests-context.xml index 0dc9eab475e..cf89eb86022 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests-context.xml +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests-context.xml @@ -25,7 +25,7 @@ + value="INSERT INTO INT_LOCK (REGION, LOCK_KEY, CLIENT_ID, CREATED_DATE, EXPIRED_AFTER) VALUES (?, ?, ?, ?, ?)"/> diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests.java index e4190770f96..2ac71f31864 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2024 the original author or authors. + * Copyright 2016-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package org.springframework.integration.jdbc.lock; +import java.time.Duration; import java.util.ConcurrentModificationException; import java.util.Map; import java.util.Queue; @@ -31,6 +32,7 @@ import javax.sql.DataSource; import org.h2.jdbc.JdbcSQLSyntaxErrorException; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -39,6 +41,7 @@ import org.springframework.context.ApplicationContextException; import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.core.task.SimpleAsyncTaskExecutor; +import org.springframework.integration.support.locks.DistributedLock; import org.springframework.integration.test.util.TestUtils; import org.springframework.integration.util.UUIDConverter; import org.springframework.test.annotation.DirtiesContext; @@ -117,6 +120,46 @@ void testLockInterruptibly() throws Exception { } } + @Test + void testLockWithCustomTtl() throws Exception { + JdbcLockRegistry lockRegistry = new JdbcLockRegistry(client, Duration.ofMillis(100)); + long sleepTimeLongerThanDefaultTTL = 110; + for (int i = 0; i < 10; i++) { + DistributedLock lock = lockRegistry.obtain("foo"); + lock.lock(Duration.ofMillis(200)); + try { + assertThat(TestUtils.getPropertyValue(lockRegistry, "locks", Map.class)).hasSize(1); + Thread.sleep(sleepTimeLongerThanDefaultTTL); + } + finally { + lock.unlock(); + } + } + + lockRegistry.expireUnusedOlderThan(0); + assertThat(TestUtils.getPropertyValue(lockRegistry, "locks", Map.class)).isEmpty(); + } + + @Test + void testTryLockWithCustomTtl() throws Exception { + JdbcLockRegistry lockRegistry = new JdbcLockRegistry(client, Duration.ofMillis(100)); + long sleepTimeLongerThanDefaultTTL = 110; + for (int i = 0; i < 10; i++) { + DistributedLock lock = lockRegistry.obtain("foo"); + lock.tryLock(Duration.ofMillis(100), Duration.ofMillis(200)); + try { + assertThat(TestUtils.getPropertyValue(lockRegistry, "locks", Map.class)).hasSize(1); + Thread.sleep(sleepTimeLongerThanDefaultTTL); + } + finally { + lock.unlock(); + } + } + + lockRegistry.expireUnusedOlderThan(0); + assertThat(TestUtils.getPropertyValue(lockRegistry, "locks", Map.class)).isEmpty(); + } + @Test void testReentrantLock() { for (int i = 0; i < 10; i++) { @@ -154,11 +197,10 @@ void testReentrantLockInterruptibly() throws Exception { @Test void testReentrantLockAfterExpiration() throws Exception { DefaultLockRepository client = new DefaultLockRepository(dataSource); - client.setTimeToLive(1); client.setApplicationContext(this.context); client.afterPropertiesSet(); client.afterSingletonsInstantiated(); - JdbcLockRegistry registry = new JdbcLockRegistry(client); + JdbcLockRegistry registry = new JdbcLockRegistry(client, Duration.ofMillis(1)); Lock lock1 = registry.obtain("foo"); assertThat(lock1.tryLock()).isTrue(); Thread.sleep(100); @@ -336,6 +378,27 @@ void testLockRenewLockNotOwned() { .isThrownBy(() -> registry.renewLock("foo")); } + @Test + void testLockRenewWithCustomTtl() throws InterruptedException { + DefaultLockRepository clientOfAnotherProcess = new DefaultLockRepository(dataSource); + clientOfAnotherProcess.setApplicationContext(this.context); + clientOfAnotherProcess.afterPropertiesSet(); + clientOfAnotherProcess.afterSingletonsInstantiated(); + JdbcLockRegistry registryOfAnotherProcess = new JdbcLockRegistry(clientOfAnotherProcess, Duration.ofMillis(100)); + final DistributedLock lock = this.registry.obtain("foo"); + final Lock lockOfAnotherProcess = registryOfAnotherProcess.obtain("foo"); + + assertThat(lock.tryLock(Duration.ofMillis(100), Duration.ofMillis(100))).isTrue(); + try { + registry.renewLock("foo", Duration.ofSeconds(2)); + Thread.sleep(110); + assertThat(lockOfAnotherProcess.tryLock(100, TimeUnit.MILLISECONDS)).isFalse(); + } + finally { + Assertions.assertDoesNotThrow(lock::unlock); + } + } + @Test void concurrentObtainCapacityTest() throws InterruptedException { final int KEY_CNT = 500; @@ -510,19 +573,17 @@ void noTableThrowsExceptionOnStart() { @Test void testUnlockAfterLockStatusHasBeenExpiredAndLockHasBeenAcquiredByAnotherProcess() throws Exception { - int ttl = 100; + Duration ttl = Duration.ofMillis(100); DefaultLockRepository client1 = new DefaultLockRepository(dataSource); client1.setApplicationContext(this.context); - client1.setTimeToLive(ttl); client1.afterPropertiesSet(); client1.afterSingletonsInstantiated(); DefaultLockRepository client2 = new DefaultLockRepository(dataSource); client2.setApplicationContext(this.context); - client2.setTimeToLive(ttl); client2.afterPropertiesSet(); client2.afterSingletonsInstantiated(); - JdbcLockRegistry process1Registry = new JdbcLockRegistry(client1); - JdbcLockRegistry process2Registry = new JdbcLockRegistry(client2); + JdbcLockRegistry process1Registry = new JdbcLockRegistry(client1, ttl); + JdbcLockRegistry process2Registry = new JdbcLockRegistry(client2, ttl); Lock lock1 = process1Registry.obtain("foo"); Lock lock2 = process2Registry.obtain("foo"); @@ -539,10 +600,9 @@ void testUnlockAfterLockStatusHasBeenExpiredAndLockHasBeenAcquiredByAnotherProce void testUnlockAfterLockStatusHasBeenExpiredAndDeleted() throws Exception { DefaultLockRepository client = new DefaultLockRepository(dataSource); client.setApplicationContext(this.context); - client.setTimeToLive(100); client.afterPropertiesSet(); client.afterSingletonsInstantiated(); - JdbcLockRegistry registry = new JdbcLockRegistry(client); + JdbcLockRegistry registry = new JdbcLockRegistry(client, Duration.ofMillis(100)); Lock lock = registry.obtain("foo"); lock.lock(); diff --git a/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java b/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java index 67968141500..a1b798ae139 100644 --- a/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java +++ b/spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2024 the original author or authors. + * Copyright 2014-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -55,6 +55,7 @@ import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.Topic; +import org.springframework.integration.support.locks.DistributedLock; import org.springframework.integration.support.locks.ExpirableLockRegistry; import org.springframework.integration.support.locks.RenewableLockRegistry; import org.springframework.scheduling.TaskScheduler; @@ -98,7 +99,7 @@ * @since 4.0 * */ -public final class RedisLockRegistry implements ExpirableLockRegistry, DisposableBean, RenewableLockRegistry { +public final class RedisLockRegistry implements ExpirableLockRegistry, DisposableBean, RenewableLockRegistry { private static final Log LOGGER = LogFactory.getLog(RedisLockRegistry.class); @@ -133,7 +134,7 @@ protected boolean removeEldestEntry(Entry eldest) { private final StringRedisTemplate redisTemplate; - private final long expireAfter; + private final Duration expireAfter; private int cacheCapacity = DEFAULT_CAPACITY; @@ -169,7 +170,7 @@ protected boolean removeEldestEntry(Entry eldest) { private volatile RedisMessageListenerContainer redisMessageListenerContainer; /** - * Constructs a lock registry with the default (60 second) lock expiration. + * Create a lock registry with the default (60 second) lock expiration. * @param connectionFactory The connection factory. * @param registryKey The key prefix for locks. */ @@ -178,12 +179,23 @@ public RedisLockRegistry(RedisConnectionFactory connectionFactory, String regist } /** - * Constructs a lock registry with the supplied lock expiration. + * Create a lock registry with the supplied lock expiration. * @param connectionFactory The connection factory. * @param registryKey The key prefix for locks. * @param expireAfter The expiration in milliseconds. */ public RedisLockRegistry(RedisConnectionFactory connectionFactory, String registryKey, long expireAfter) { + this(connectionFactory, registryKey, Duration.ofMillis(expireAfter)); + } + + /** + * Create a lock registry with the supplied lock expiration. + * @param connectionFactory The connection factory. + * @param registryKey The key prefix for locks. + * @param expireAfter The expiration in {@link Duration}. + * @since 7.0 + */ + public RedisLockRegistry(RedisConnectionFactory connectionFactory, String registryKey, Duration expireAfter) { Assert.notNull(connectionFactory, "'connectionFactory' cannot be null"); Assert.notNull(registryKey, "'registryKey' cannot be null"); this.redisTemplate = new StringRedisTemplate(connectionFactory); @@ -233,7 +245,7 @@ public void setCacheCapacity(int cacheCapacity) { } /** - * Specify a @link Duration} to sleep between obtainLock attempts. + * Specify a {@link Duration} to sleep between obtainLock attempts. * Defaults to 100 milliseconds. * @param idleBetweenTries the {@link Duration} to sleep between obtainLock attempts. * @since 6.4.0 @@ -258,7 +270,7 @@ public void setRedisLockType(RedisLockType redisLockType) { } @Override - public Lock obtain(Object lockKey) { + public DistributedLock obtain(Object lockKey) { Assert.isInstanceOf(String.class, lockKey); String path = (String) lockKey; this.lock.lock(); @@ -309,6 +321,11 @@ public void destroy() { @Override public void renewLock(Object lockKey) { + this.renewLock(lockKey, RedisLockRegistry.this.expireAfter); + } + + @Override + public void renewLock(Object lockKey, Duration ttl) { String path = (String) lockKey; RedisLock redisLock; this.lock.lock(); @@ -322,7 +339,7 @@ public void renewLock(Object lockKey) { throw new IllegalStateException("Could not renew mutex at " + path); } - if (!redisLock.renew()) { + if (!redisLock.renew(ttl.toMillis())) { throw new IllegalStateException("Could not renew mutex at " + path); } } @@ -350,7 +367,7 @@ private Function getRedisLockConstructor(RedisLockType redisL }; } - private abstract class RedisLock implements Lock { + private abstract class RedisLock implements DistributedLock { private static final String OBTAIN_LOCK_SCRIPT = """ local lockClientId = redis.call('GET', KEYS[1]) @@ -401,11 +418,12 @@ public long getLockedAt() { /** * Attempt to acquire a lock in redis. * @param time the maximum time(milliseconds) to wait for the lock, -1 infinity + * @param expireAfter the time-to-live(milliseconds) for the lock status data * @return true if the lock was acquired and false if the waiting time elapsed before the lock was acquired * @throws InterruptedException – * if the current thread is interrupted while acquiring the lock (and interruption of lock acquisition is supported) */ - protected abstract boolean tryRedisLockInner(long time) throws ExecutionException, InterruptedException; + protected abstract boolean tryRedisLockInner(long time, long expireAfter) throws ExecutionException, InterruptedException; /** * Unlock the lock using the unlink method in redis. @@ -419,10 +437,15 @@ public long getLockedAt() { @Override public final void lock() { + this.lock(RedisLockRegistry.this.expireAfter); + } + + @Override + public void lock(Duration ttl) { this.localLock.lock(); while (true) { try { - if (tryRedisLock(-1L)) { + if (tryRedisLock(-1L, ttl.toMillis())) { return; } } @@ -449,7 +472,7 @@ public final void lockInterruptibly() throws InterruptedException { this.localLock.lockInterruptibly(); while (true) { try { - if (tryRedisLock(-1L)) { + if (tryRedisLock(-1L, RedisLockRegistry.this.expireAfter.toMillis())) { return; } } @@ -478,12 +501,16 @@ public final boolean tryLock() { @Override public final boolean tryLock(long time, TimeUnit unit) throws InterruptedException { - if (!this.localLock.tryLock(time, unit)) { + return this.tryLock(Duration.of(time, unit.toChronoUnit()), RedisLockRegistry.this.expireAfter); + } + + @Override + public boolean tryLock(Duration waitTime, Duration ttl) throws InterruptedException { + if (!this.localLock.tryLock(waitTime.toMillis(), TimeUnit.MILLISECONDS)) { return false; } try { - long waitTime = TimeUnit.MILLISECONDS.convert(time, unit); - boolean acquired = tryRedisLock(waitTime); + boolean acquired = tryRedisLock(waitTime.toMillis(), ttl.toMillis()); if (!acquired) { this.localLock.unlock(); } @@ -496,27 +523,27 @@ public final boolean tryLock(long time, TimeUnit unit) throws InterruptedExcepti return false; } - private boolean tryRedisLock(long time) throws ExecutionException, InterruptedException { - final boolean acquired = tryRedisLockInner(time); + private boolean tryRedisLock(long time, long expireAfter) throws ExecutionException, InterruptedException { + final boolean acquired = tryRedisLockInner(time, expireAfter); if (acquired) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Acquired lock; " + this); } this.lockedAt = System.currentTimeMillis(); if (RedisLockRegistry.this.renewalTaskScheduler != null) { - Duration delay = Duration.ofMillis(RedisLockRegistry.this.expireAfter / 3); + Duration delay = Duration.ofMillis(expireAfter / 3); this.renewFuture = - RedisLockRegistry.this.renewalTaskScheduler.scheduleWithFixedDelay(this::renew, delay); + RedisLockRegistry.this.renewalTaskScheduler.scheduleWithFixedDelay(() -> this.renew(expireAfter), delay); } } return acquired; } - protected final Boolean obtainLock() { + protected final Boolean obtainLock(long expireAfter) { return RedisLockRegistry.this.redisTemplate .execute(OBTAIN_LOCK_REDIS_SCRIPT, Collections.singletonList(this.lockKey), RedisLockRegistry.this.clientId, - String.valueOf(RedisLockRegistry.this.expireAfter)); + String.valueOf(expireAfter)); } @Override @@ -586,10 +613,10 @@ else if (Boolean.FALSE.equals(unlinkResult)) { } } - protected final boolean renew() { + protected final boolean renew(long expireAfter) { boolean res = Boolean.TRUE.equals(RedisLockRegistry.this.redisTemplate.execute( RENEW_REDIS_SCRIPT, Collections.singletonList(this.lockKey), - RedisLockRegistry.this.clientId, String.valueOf(RedisLockRegistry.this.expireAfter))); + RedisLockRegistry.this.clientId, String.valueOf(expireAfter))); if (!res) { stopRenew(); } @@ -691,8 +718,8 @@ private RedisPubSubLock(String path) { } @Override - protected boolean tryRedisLockInner(long time) throws ExecutionException, InterruptedException { - return subscribeLock(time); + protected boolean tryRedisLockInner(long time, long expireAfter) throws ExecutionException, InterruptedException { + return subscribeLock(time, expireAfter); } @Override @@ -711,9 +738,9 @@ private boolean removeLockKeyWithScript(RedisScript redisScript) { RedisLockRegistry.this.clientId, RedisLockRegistry.this.unLockChannelKey)); } - private boolean subscribeLock(long time) throws ExecutionException, InterruptedException { + private boolean subscribeLock(long time, long expireAfter) throws ExecutionException, InterruptedException { final long expiredTime = System.currentTimeMillis() + time; - if (obtainLock()) { + if (obtainLock(expireAfter)) { return true; } @@ -728,17 +755,17 @@ private boolean subscribeLock(long time) throws ExecutionException, InterruptedE Future future = RedisLockRegistry.this.unlockNotifyMessageListener.subscribeLock(this.lockKey); //DCL - if (obtainLock()) { + if (obtainLock(expireAfter)) { return true; } try { //if short expireAfter key expire for ttl, no receive unlock msg - long waitTime = time >= 0 ? time : RedisLockRegistry.this.expireAfter; + long waitTime = time >= 0 ? time : RedisLockRegistry.this.expireAfter.toMillis(); future.get(waitTime, TimeUnit.MILLISECONDS); } catch (TimeoutException ignore) { } - if (obtainLock()) { + if (obtainLock(expireAfter)) { return true; } } @@ -830,10 +857,10 @@ private RedisSpinLock(String path) { } @Override - protected boolean tryRedisLockInner(long time) throws InterruptedException { + protected boolean tryRedisLockInner(long time, long expireAfter) throws InterruptedException { long now = System.currentTimeMillis(); if (time == -1L) { - while (!obtainLock()) { + while (!obtainLock(expireAfter)) { Thread.sleep(RedisLockRegistry.this.idleBetweenTries.toMillis()); //NOSONAR } return true; @@ -841,7 +868,7 @@ protected boolean tryRedisLockInner(long time) throws InterruptedException { else { long expire = now + TimeUnit.MILLISECONDS.convert(time, TimeUnit.MILLISECONDS); boolean acquired; - while (!(acquired = obtainLock()) && System.currentTimeMillis() < expire) { //NOSONAR + while (!(acquired = obtainLock(expireAfter)) && System.currentTimeMillis() < expire) { //NOSONAR Thread.sleep(RedisLockRegistry.this.idleBetweenTries.toMillis()); //NOSONAR } return acquired; diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java index bb0313a1f15..dc73c0b7c92 100644 --- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/RedisLockRegistryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2024 the original author or authors. + * Copyright 2014-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package org.springframework.integration.redis.util; +import java.time.Duration; import java.util.ConcurrentModificationException; import java.util.List; import java.util.Map; @@ -50,6 +51,7 @@ import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.integration.redis.RedisContainerTest; import org.springframework.integration.redis.util.RedisLockRegistry.RedisLockType; +import org.springframework.integration.support.locks.DistributedLock; import org.springframework.integration.test.util.TestUtils; import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler; @@ -120,6 +122,48 @@ void testLock(RedisLockType testRedisLockType) { registry.destroy(); } + @ParameterizedTest + @EnumSource(RedisLockType.class) + void testLockWithCustomTtl(RedisLockType testRedisLockType) throws InterruptedException { + RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey, 100); + long sleepTimeLongerThanDefaultTTL = 200; + registry.setRedisLockType(testRedisLockType); + for (int i = 0; i < 3; i++) { + DistributedLock lock = registry.obtain("foo"); + lock.lock(Duration.ofMillis(500)); + try { + assertThat(getRedisLockRegistryLocks(registry)).hasSize(1); + Thread.sleep(sleepTimeLongerThanDefaultTTL); + } + finally { + assertThatNoException().isThrownBy(lock::unlock); + } + } + registry.expireUnusedOlderThan(-1000); + assertThat(getRedisLockRegistryLocks(registry)).isEmpty(); + } + + @ParameterizedTest + @EnumSource(RedisLockType.class) + void testTryLockWithCustomTtl(RedisLockType testRedisLockType) throws InterruptedException { + RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey, 100); + long sleepTimeLongerThanDefaultTTL = 200; + registry.setRedisLockType(testRedisLockType); + for (int i = 0; i < 3; i++) { + DistributedLock lock = registry.obtain("foo"); + lock.tryLock(Duration.ofMillis(100), Duration.ofMillis(500)); + try { + assertThat(getRedisLockRegistryLocks(registry)).hasSize(1); + Thread.sleep(sleepTimeLongerThanDefaultTTL); + } + finally { + assertThatNoException().isThrownBy(lock::unlock); + } + } + registry.expireUnusedOlderThan(-1000); + assertThat(getRedisLockRegistryLocks(registry)).isEmpty(); + } + @ParameterizedTest @EnumSource(RedisLockType.class) void testUnlockAfterLockStatusHasBeenExpired(RedisLockType testRedisLockType) throws InterruptedException { @@ -944,6 +988,29 @@ void testLockRenewLockNotOwned(RedisLockType redisLockType) { .isThrownBy(() -> registry.renewLock("foo")); } + @ParameterizedTest + @EnumSource(RedisLockType.class) + void testLockRenewWithCustomTtl(RedisLockType redisLockType) throws InterruptedException { + final RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey); + final RedisLockRegistry registryOfAnotherProcess = new RedisLockRegistry(redisConnectionFactory, this.registryKey); + registry.setRedisLockType(redisLockType); + registryOfAnotherProcess.setRedisLockType(redisLockType); + final DistributedLock lock = registry.obtain("foo"); + final Lock lockOfAnotherProcess = registryOfAnotherProcess.obtain("foo"); + long ttl = 100; + long sleepTimeLongerThanTtl = 110; + assertThat(lock.tryLock(Duration.ofMillis(100), Duration.ofMillis(ttl))).isTrue(); + try { + registry.renewLock("foo", Duration.ofSeconds(2)); + Thread.sleep(sleepTimeLongerThanTtl); + assertThat(lockOfAnotherProcess.tryLock(100, TimeUnit.MILLISECONDS)).isFalse(); + } + finally { + lock.unlock(); + } + registryOfAnotherProcess.destroy(); + } + @Test void testInitialiseWithCustomExecutor() { RedisLockRegistry redisLockRegistry = new RedisLockRegistry(redisConnectionFactory, "registryKey"); diff --git a/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/lock/ZookeeperLockRegistry.java b/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/lock/ZookeeperLockRegistry.java index fece7d598f0..03704238e85 100644 --- a/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/lock/ZookeeperLockRegistry.java +++ b/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/lock/ZookeeperLockRegistry.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2023 the original author or authors. + * Copyright 2015-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -50,7 +50,7 @@ * @since 4.2 * */ -public class ZookeeperLockRegistry implements ExpirableLockRegistry, DisposableBean { +public class ZookeeperLockRegistry implements ExpirableLockRegistry, DisposableBean { private static final String DEFAULT_ROOT = "/SpringIntegration-LockRegistry"; diff --git a/src/reference/antora/modules/ROOT/pages/distributed-locks.adoc b/src/reference/antora/modules/ROOT/pages/distributed-locks.adoc index 09d9022e3ab..d0184aaf48b 100644 --- a/src/reference/antora/modules/ROOT/pages/distributed-locks.adoc +++ b/src/reference/antora/modules/ROOT/pages/distributed-locks.adoc @@ -35,3 +35,26 @@ Spring Integration provides these `LockRegistry` implementations for distributed * xref:zookeeper.adoc#zk-lock-registry[Zookeeper] https://github.com/spring-projects/spring-integration-aws[Spring Integration AWS] extension also implements a `DynamoDbLockRegistry`. + +Starting with version 7.0, the `DistributedLock` interface has been introduced, providing new methods, `lock(Duration ttl`) and `tryLock(long time, TimeUnit unit, Duration ttl)`, to acquire a lock with a custom time-to-live (TTL). +Both `JdbcLock` and `RedisLock` implement `DistributedLock` interface to support the feature of customized time-to-live. +The `LockRegistry` is now a generic interface for types that extend `Lock`. +The `RenewableLockRegistry` interface now provides new `renewLock(Object lockKey, Duration ttl)` method, allowing you to renew the lock with a custom time-to-live value. +Both `JdbcLockRegistry` and `RedisLockRegistry` implement `LockRegistry` and `RenewableLockRegistry` interfaces with the type parameter `DistributedLock`. + +Below is an example of how to obtain a `DistributedLock` from a registry and acquire it with a specific time-to-live value: +[source,java] +---- +DistributedLock lock = registry.obtain("foo"); +Duration timeToLive = Duration.ofMillis(500); + +if(lock.tryLock(100, TimeUnit.MILLISECONDS, timeToLive)){ + try { + // do something + } catch (Exception e) { + // handle exception + } finally{ + lock. unlock(); + } +} +---- \ No newline at end of file diff --git a/src/reference/antora/modules/ROOT/pages/jdbc/lock-registry.adoc b/src/reference/antora/modules/ROOT/pages/jdbc/lock-registry.adoc index 067a3e6c66e..cf935d91c17 100644 --- a/src/reference/antora/modules/ROOT/pages/jdbc/lock-registry.adoc +++ b/src/reference/antora/modules/ROOT/pages/jdbc/lock-registry.adoc @@ -58,4 +58,20 @@ lockRepository.setInsertQuery(lockRepository.getInsertQuery() + " ON CONFLICT DO ---- Starting with version 6.4, the `LockRepository.delete()` method return the result of removing ownership of a distributed lock. -And the `JdbcLockRegistry.JdbcLock.unlock()` method throws `ConcurrentModificationException` if the ownership of the lock is expired. \ No newline at end of file +And the `JdbcLockRegistry.JdbcLock.unlock()` method throws `ConcurrentModificationException` if the ownership of the lock is expired. + +Starting with version 7.0, the `JdbcLock` implements `DistributedLock` interface to support the feature of customized time-to-live (TTL) for the lock status data. +A `JdbcLock` can now be acquired using the `lock(Duration ttl)` or `tryLock(long time, TimeUnit unit, Duration ttl)` method, with a specified time-to-live (TTL) value. +The `JdbcLockRegistry` now provides new `renewLock(Object lockKey, Duration ttl)` method, allowing you to renew the lock with a custom time-to-live value. +The default time-to-live for all `JdbcLock` instances stored in the same `JdbcLockRegistry` can now be set by the new constructor `JdbcLockRegistry(LockRepository client, Duration expireAfter)`. +The APIs of `LockRepository` and `DefaultLockRepository` are also modified to support the feature. +[IMPORTANT] +==== +If you're already using an earlier version of `JdbcLockRegistry` or `DefaultLockRepository`, please execute the necessary DDL to modify the `INT_LOCK` table before upgrading to this version. + +Here is an example of the Postgres DDL for adding the new column to the lock table: +[source,sql] +---- +ALTER TABLE INT_LOCK ADD EXPIRED_AFTER TIMESTAMP NOT NULL; +---- +==== \ No newline at end of file diff --git a/src/reference/antora/modules/ROOT/pages/redis.adoc b/src/reference/antora/modules/ROOT/pages/redis.adoc index 98302aa17d5..cf62221ce72 100644 --- a/src/reference/antora/modules/ROOT/pages/redis.adoc +++ b/src/reference/antora/modules/ROOT/pages/redis.adoc @@ -862,3 +862,7 @@ Starting with version 6.4, instead of throwing `IllegalStateException`, the `Red Starting with version 6.4, a `RedisLockRegistry.setRenewalTaskScheduler()` is added to configure the scheduler for periodic renewal of locks. When it is set, the lock will be automatically renewed every `1/3` of the expiration time after the lock is successfully acquired, until unlocked or the redis key is removed. + +Starting with version 7.0, the `RedisLock` implements `DistributedLock` interface to support the feature of customized time-to-live (TTL) for the lock status data. +A `RedisLock` can now be acquired using the `lock(Duration ttl)` or `tryLock(long time, TimeUnit unit, Duration ttl)` method, with a specified time-to-live (TTL) value. +The `RedisLockRegistry` now provides new `renewLock(Object lockKey, Duration ttl)` method, allowing you to renew the lock with a custom time-to-live value. \ No newline at end of file diff --git a/src/reference/antora/modules/ROOT/pages/whats-new.adoc b/src/reference/antora/modules/ROOT/pages/whats-new.adoc index 01ac532be57..81423547eaa 100644 --- a/src/reference/antora/modules/ROOT/pages/whats-new.adoc +++ b/src/reference/antora/modules/ROOT/pages/whats-new.adoc @@ -23,4 +23,19 @@ Junit 4 Based Support Components are deprecated. == New Components The JDBC module now provides a Java DSL API via its dedicated `org.springframework.integration.jdbc.dsl.Jdbc` factory. -The xref:jdbc/dsl.adoc[] chapter provides more details. \ No newline at end of file +The xref:jdbc/dsl.adoc[] chapter provides more details. + +A new `DistributedLock` interface has been introduced, providing new methods, `lock(Duration ttl`) and `tryLock(long time, TimeUnit unit, Duration ttl)`, to acquire a lock with a custom time-to-live (TTL). +See xref:distributed-locks.adoc[] for more information. + +[[x7.0-jdbc-changes]] +=== JDBC Changes + +The `JdbcLock` now supports the feature of customized time-to-live for the lock status data. +See xref:jdbc/lock-registry.adoc[] for more information. + +[[x7.0-redis-changes]] +=== Redis Changes + +The `RedisLock` now supports the feature of customized time-to-live for the lock status data. +See xref:redis.adoc#redis-lock-registry[Redis Lock Registry] for more information. \ No newline at end of file