diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts
index 3297616866..5a9deb7ae4 100644
--- a/apps/webapp/app/env.server.ts
+++ b/apps/webapp/app/env.server.ts
@@ -429,6 +429,15 @@ const EnvironmentSchema = z.object({
   RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS: z.coerce.number().int().default(10),
   RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS: z.coerce.number().int().default(500),
+  RUN_ENGINE_RUN_LOCK_DURATION: z.coerce.number().int().default(5000),
+  RUN_ENGINE_RUN_LOCK_AUTOMATIC_EXTENSION_THRESHOLD: z.coerce.number().int().default(1000),
+  RUN_ENGINE_RUN_LOCK_MAX_RETRIES: z.coerce.number().int().default(10),
+  RUN_ENGINE_RUN_LOCK_BASE_DELAY: z.coerce.number().int().default(100),
+  RUN_ENGINE_RUN_LOCK_MAX_DELAY: z.coerce.number().int().default(3000),
+  RUN_ENGINE_RUN_LOCK_BACKOFF_MULTIPLIER: z.coerce.number().default(1.8),
+  RUN_ENGINE_RUN_LOCK_JITTER_FACTOR: z.coerce.number().default(0.15),
+  RUN_ENGINE_RUN_LOCK_MAX_TOTAL_WAIT_TIME: z.coerce.number().int().default(15000),
+
 
   RUN_ENGINE_WORKER_REDIS_HOST: z
     .string()
     .optional()
diff --git a/apps/webapp/app/v3/runEngine.server.ts b/apps/webapp/app/v3/runEngine.server.ts
index de83c92d20..559c363988 100644
--- a/apps/webapp/app/v3/runEngine.server.ts
+++ b/apps/webapp/app/v3/runEngine.server.ts
@@ -75,6 +75,16 @@ function createRunEngine() {
         enableAutoPipelining: true,
         ...(env.RUN_ENGINE_RUN_LOCK_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
       },
+      duration: env.RUN_ENGINE_RUN_LOCK_DURATION,
+      automaticExtensionThreshold: env.RUN_ENGINE_RUN_LOCK_AUTOMATIC_EXTENSION_THRESHOLD,
+      retryConfig: {
+        maxAttempts: env.RUN_ENGINE_RUN_LOCK_MAX_RETRIES,
+        baseDelay: env.RUN_ENGINE_RUN_LOCK_BASE_DELAY,
+        maxDelay: env.RUN_ENGINE_RUN_LOCK_MAX_DELAY,
+        backoffMultiplier: env.RUN_ENGINE_RUN_LOCK_BACKOFF_MULTIPLIER,
+        jitterFactor: env.RUN_ENGINE_RUN_LOCK_JITTER_FACTOR,
+        maxTotalWaitTime: env.RUN_ENGINE_RUN_LOCK_MAX_TOTAL_WAIT_TIME,
+      },
     },
     tracer,
     meter,
diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts
index 620eac3f64..f5fcdca179 100644
--- a/internal-packages/run-engine/src/engine/index.ts
+++ b/internal-packages/run-engine/src/engine/index.ts
@@ -98,6 +98,17 @@ export class RunEngine {
       logger: this.logger,
       tracer: trace.getTracer("RunLocker"),
       meter: options.meter,
+      duration: options.runLock.duration ?? 5000,
+      automaticExtensionThreshold: options.runLock.automaticExtensionThreshold ??
1000, + retryConfig: { + maxAttempts: 10, + baseDelay: 100, + maxDelay: 3000, + backoffMultiplier: 1.8, + jitterFactor: 0.15, + maxTotalWaitTime: 15000, + ...options.runLock.retryConfig, + }, }); const keys = new RunQueueFullKeyProducer(); @@ -486,56 +497,52 @@ export class RunEngine { span.setAttribute("runId", taskRun.id); - await this.runLock.lock("trigger", [taskRun.id], 5000, async (signal) => { - //create associated waitpoint (this completes when the run completes) - const associatedWaitpoint = await this.waitpointSystem.createRunAssociatedWaitpoint( - prisma, - { - projectId: environment.project.id, - environmentId: environment.id, - completedByTaskRunId: taskRun.id, - } - ); - - //triggerAndWait or batchTriggerAndWait - if (resumeParentOnCompletion && parentTaskRunId) { - //this will block the parent run from continuing until this waitpoint is completed (and removed) - await this.waitpointSystem.blockRunWithWaitpoint({ - runId: parentTaskRunId, - waitpoints: associatedWaitpoint.id, - projectId: associatedWaitpoint.projectId, - organizationId: environment.organization.id, - batch, - workerId, - runnerId, - tx: prisma, - releaseConcurrency, - }); + //create associated waitpoint (this completes when the run completes) + const associatedWaitpoint = await this.waitpointSystem.createRunAssociatedWaitpoint( + prisma, + { + projectId: environment.project.id, + environmentId: environment.id, + completedByTaskRunId: taskRun.id, } + ); - //Make sure lock extension succeeded - signal.throwIfAborted(); - - if (taskRun.delayUntil) { - // Schedule the run to be enqueued at the delayUntil time - await this.delayedRunSystem.scheduleDelayedRunEnqueuing({ - runId: taskRun.id, - delayUntil: taskRun.delayUntil, - }); - } else { - await this.enqueueSystem.enqueueRun({ - run: taskRun, - env: environment, - workerId, - runnerId, - tx: prisma, - }); + //triggerAndWait or batchTriggerAndWait + if (resumeParentOnCompletion && parentTaskRunId) { + //this will block the parent run from continuing until this waitpoint is completed (and removed) + await this.waitpointSystem.blockRunWithWaitpoint({ + runId: parentTaskRunId, + waitpoints: associatedWaitpoint.id, + projectId: associatedWaitpoint.projectId, + organizationId: environment.organization.id, + batch, + workerId, + runnerId, + tx: prisma, + releaseConcurrency, + }); + } - if (taskRun.ttl) { - await this.ttlSystem.scheduleExpireRun({ runId: taskRun.id, ttl: taskRun.ttl }); - } + if (taskRun.delayUntil) { + // Schedule the run to be enqueued at the delayUntil time + await this.delayedRunSystem.scheduleDelayedRunEnqueuing({ + runId: taskRun.id, + delayUntil: taskRun.delayUntil, + }); + } else { + if (taskRun.ttl) { + await this.ttlSystem.scheduleExpireRun({ runId: taskRun.id, ttl: taskRun.ttl }); } - }); + + await this.enqueueSystem.enqueueRun({ + run: taskRun, + env: environment, + workerId, + runnerId, + tx: prisma, + skipRunLock: true, + }); + } this.eventBus.emit("runCreated", { time: new Date(), @@ -1155,7 +1162,7 @@ export class RunEngine { tx?: PrismaClientOrTransaction; }) { const prisma = tx ?? 
this.prisma; - return await this.runLock.lock("handleStalledSnapshot", [runId], 5_000, async () => { + return await this.runLock.lock("handleStalledSnapshot", [runId], async () => { const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId); if (latestSnapshot.id !== snapshotId) { this.logger.log( diff --git a/internal-packages/run-engine/src/engine/locking.ts b/internal-packages/run-engine/src/engine/locking.ts index 1e120a3feb..a58c0dcfdc 100644 --- a/internal-packages/run-engine/src/engine/locking.ts +++ b/internal-packages/run-engine/src/engine/locking.ts @@ -22,12 +22,50 @@ const SemanticAttributes = { LOCK_SUCCESS: "run_engine.lock.success", }; +export class LockAcquisitionTimeoutError extends Error { + constructor( + public readonly resources: string[], + public readonly totalWaitTime: number, + public readonly attempts: number, + message?: string + ) { + super( + message || + `Failed to acquire lock on resources [${resources.join( + ", " + )}] after ${totalWaitTime}ms and ${attempts} attempts` + ); + this.name = "LockAcquisitionTimeoutError"; + } +} + interface LockContext { resources: string; signal: redlock.RedlockAbortSignal; lockType: string; } +interface ManualLockContext { + lock: redlock.Lock; + timeout: NodeJS.Timeout | null | undefined; + extension: Promise | undefined; +} + +export interface LockRetryConfig { + /** Maximum number of locking attempts (default: 10) */ + maxAttempts?: number; + /** Initial delay in milliseconds (default: 200) */ + baseDelay?: number; + /** Maximum delay cap in milliseconds (default: 5000) */ + maxDelay?: number; + /** Exponential backoff multiplier (default: 1.5) */ + backoffMultiplier?: number; + /** Jitter factor as percentage (default: 0.1 for 10%) */ + jitterFactor?: number; + /** Maximum total wait time in milliseconds (default: 30000) */ + maxTotalWaitTime?: number; +} + export class RunLocker { private redlock: InstanceType; private asyncLocalStorage: AsyncLocalStorage; @@ -35,21 +73,43 @@ export class RunLocker { private tracer: Tracer; private meter: Meter; private activeLocks: Map = new Map(); + private activeManualContexts: Map = new Map(); private lockDurationHistogram: Histogram; + private retryConfig: Required; + private duration: number; + private automaticExtensionThreshold: number; + + constructor(options: { + redis: Redis; + logger: Logger; + tracer: Tracer; + meter?: Meter; + duration?: number; + automaticExtensionThreshold?: number; + retryConfig?: LockRetryConfig; + }) { + // Initialize configuration values + this.duration = options.duration ?? 5000; + this.automaticExtensionThreshold = options.automaticExtensionThreshold ?? 500; - constructor(options: { redis: Redis; logger: Logger; tracer: Tracer; meter?: Meter }) { this.redlock = new Redlock([options.redis], { - driftFactor: 0.01, - retryCount: 10, - retryDelay: 200, // time in ms - retryJitter: 200, // time in ms - automaticExtensionThreshold: 500, // time in ms + retryCount: 0, // Disable Redlock's internal retrying - we handle retries ourselves }); this.asyncLocalStorage = new AsyncLocalStorage(); this.logger = options.logger; this.tracer = options.tracer; this.meter = options.meter ?? getMeter("run-engine"); + // Initialize retry configuration with defaults + this.retryConfig = { + maxAttempts: options.retryConfig?.maxAttempts ?? 10, + baseDelay: options.retryConfig?.baseDelay ?? 200, + maxDelay: options.retryConfig?.maxDelay ?? 5000, + backoffMultiplier: options.retryConfig?.backoffMultiplier ?? 
1.5, + jitterFactor: options.retryConfig?.jitterFactor ?? 0.1, + maxTotalWaitTime: options.retryConfig?.maxTotalWaitTime ?? 30000, + }; + const activeLocksObservableGauge = this.meter.createObservableGauge("run_engine.locks.active", { description: "The number of active locks by type", unit: "locks", @@ -84,14 +144,9 @@ export class RunLocker { } /** Locks resources using RedLock. It won't lock again if we're already inside a lock with the same resources. */ - async lock( - name: string, - resources: string[], - duration: number, - routine: (signal: redlock.RedlockAbortSignal) => Promise - ): Promise { + async lock(name: string, resources: string[], routine: () => Promise): Promise { const currentContext = this.asyncLocalStorage.getStore(); - const joinedResources = resources.sort().join(","); + const joinedResources = [...resources].sort().join(","); return startSpan( this.tracer, @@ -100,7 +155,7 @@ export class RunLocker { if (currentContext && currentContext.resources === joinedResources) { span.setAttribute("nested", true); // We're already inside a lock with the same resources, just run the routine - return routine(currentContext.signal); + return routine(); } span.setAttribute("nested", false); @@ -110,39 +165,7 @@ export class RunLocker { const lockStartTime = performance.now(); const [error, result] = await tryCatch( - this.redlock.using(resources, duration, async (signal) => { - const newContext: LockContext = { - resources: joinedResources, - signal, - lockType: name, - }; - - // Track active lock - this.activeLocks.set(lockId, { - lockType: name, - resources: resources, - }); - - let lockSuccess = true; - try { - return this.asyncLocalStorage.run(newContext, async () => { - return routine(signal); - }); - } catch (lockError) { - lockSuccess = false; - throw lockError; - } finally { - // Record lock duration - const lockDuration = performance.now() - lockStartTime; - this.lockDurationHistogram.record(lockDuration, { - [SemanticAttributes.LOCK_TYPE]: name, - [SemanticAttributes.LOCK_SUCCESS]: lockSuccess.toString(), - }); - - // Remove from active locks when done - this.activeLocks.delete(lockId); - } - }) + this.#acquireAndExecute(name, resources, this.duration, routine, lockId, lockStartTime) ); if (error) { @@ -153,18 +176,340 @@ export class RunLocker { [SemanticAttributes.LOCK_SUCCESS]: "false", }); - this.logger.error("[RunLocker] Error locking resources", { error, resources, duration }); + this.logger.error("[RunLocker] Error locking resources", { + error, + resources, + duration: this.duration, + }); throw error; } return result; }, { - attributes: { name, resources, timeout: duration }, + attributes: { name, resources, timeout: this.duration }, } ); } + /** Manual lock acquisition with exponential backoff retry logic */ + async #acquireAndExecute( + name: string, + resources: string[], + duration: number, + routine: () => Promise, + lockId: string, + lockStartTime: number + ): Promise { + // Sort resources to ensure consistent lock acquisition order and prevent deadlocks + const sortedResources = [...resources].sort(); + const joinedResources = sortedResources.join(","); + + // Use configured retry settings with exponential backoff + const { maxAttempts, baseDelay, maxDelay, backoffMultiplier, jitterFactor, maxTotalWaitTime } = + this.retryConfig; + + // Track timing for total wait time limit + let totalWaitTime = 0; + + // Retry the lock acquisition with exponential backoff + let lock: redlock.Lock | undefined; + let lastError: Error | undefined; + + for (let attempt = 
0; attempt <= maxAttempts; attempt++) { + const [error, acquiredLock] = await tryCatch(this.redlock.acquire(sortedResources, duration)); + + if (!error && acquiredLock) { + lock = acquiredLock; + if (attempt > 0) { + this.logger.debug("[RunLocker] Lock acquired after retries", { + name, + resources: sortedResources, + attempts: attempt + 1, + totalWaitTime: Math.round(totalWaitTime), + }); + } + break; + } + + lastError = error instanceof Error ? error : new Error(String(error)); + + // Check if we've exceeded total wait time limit + if (totalWaitTime >= maxTotalWaitTime) { + this.logger.warn("[RunLocker] Lock acquisition exceeded total wait time limit", { + name, + resources: sortedResources, + attempts: attempt + 1, + totalWaitTime: Math.round(totalWaitTime), + maxTotalWaitTime, + }); + throw new LockAcquisitionTimeoutError( + sortedResources, + Math.round(totalWaitTime), + attempt + 1, + `Lock acquisition on resources [${sortedResources.join( + ", " + )}] exceeded total wait time limit of ${maxTotalWaitTime}ms` + ); + } + + // If this is the last attempt, throw timeout error + if (attempt === maxAttempts) { + this.logger.warn("[RunLocker] Lock acquisition exhausted all retries", { + name, + resources: sortedResources, + attempts: attempt + 1, + totalWaitTime: Math.round(totalWaitTime), + lastError: lastError.message, + }); + throw new LockAcquisitionTimeoutError( + sortedResources, + Math.round(totalWaitTime), + attempt + 1, + `Lock acquisition on resources [${sortedResources.join(", ")}] failed after ${ + attempt + 1 + } attempts` + ); + } + + // Check if it's a retryable error (lock contention) + // ExecutionError: General redlock failure (including lock contention) + // ResourceLockedError: Specific lock contention error (if thrown) + const isRetryableError = + error && (error.name === "ResourceLockedError" || error.name === "ExecutionError"); + + if (isRetryableError) { + // Calculate exponential backoff delay with jitter and cap + const exponentialDelay = Math.min( + baseDelay * Math.pow(backoffMultiplier, attempt), + maxDelay + ); + const jitter = exponentialDelay * jitterFactor * (Math.random() * 2 - 1); // ±jitterFactor% jitter + const delay = Math.min(maxDelay, Math.max(0, Math.round(exponentialDelay + jitter))); + + // Update total wait time before delay + totalWaitTime += delay; + + this.logger.debug("[RunLocker] Lock acquisition failed, retrying with backoff", { + name, + resources: sortedResources, + attempt: attempt + 1, + delay, + totalWaitTime: Math.round(totalWaitTime), + error: error.message, + errorName: error.name, + }); + + await new Promise((resolve) => setTimeout(resolve, delay)); + continue; + } + + // For other errors (non-retryable), throw immediately + this.logger.error("[RunLocker] Lock acquisition failed with non-retryable error", { + name, + resources: sortedResources, + attempt: attempt + 1, + error: lastError.message, + errorName: lastError.name, + }); + throw lastError; + } + + // Safety guard: ensure lock was successfully acquired + if (!lock) { + this.logger.error("[RunLocker] Lock was not acquired despite completing retry loop", { + name, + resources: sortedResources, + maxAttempts, + totalWaitTime: Math.round(totalWaitTime), + lastError: lastError?.message, + }); + throw new LockAcquisitionTimeoutError( + sortedResources, + Math.round(totalWaitTime), + maxAttempts + 1, + `Lock acquisition on resources [${sortedResources.join(", ")}] failed unexpectedly` + ); + } + + // Create an AbortController for our signal + const controller = new 
AbortController(); + const signal = controller.signal as redlock.RedlockAbortSignal; + + const manualContext: ManualLockContext = { + lock, + timeout: undefined, + extension: undefined, + }; + + // Track the manual context for cleanup + this.activeManualContexts.set(lockId, manualContext); + + // Set up auto-extension starting from when lock was actually acquired + this.#setupAutoExtension(manualContext, duration, signal, controller); + + try { + const newContext: LockContext = { + resources: joinedResources, + signal, + lockType: name, + }; + + // Track active lock + this.activeLocks.set(lockId, { + lockType: name, + resources: sortedResources, + }); + + let lockSuccess = true; + try { + const result = await this.asyncLocalStorage.run(newContext, async () => { + return routine(); + }); + + return result; + } catch (lockError) { + lockSuccess = false; + throw lockError; + } finally { + // Record lock duration + const lockDuration = performance.now() - lockStartTime; + this.lockDurationHistogram.record(lockDuration, { + [SemanticAttributes.LOCK_TYPE]: name, + [SemanticAttributes.LOCK_SUCCESS]: lockSuccess.toString(), + }); + + // Remove from active locks when done + this.activeLocks.delete(lockId); + } + } finally { + // Remove from active manual contexts + this.activeManualContexts.delete(lockId); + + // Clean up extension mechanism - this ensures auto extension stops after routine finishes + this.#cleanupExtension(manualContext); + + // Release the lock using tryCatch + const [releaseError] = await tryCatch(lock.release()); + if (releaseError) { + this.logger.warn("[RunLocker] Error releasing lock", { + error: releaseError, + resources: sortedResources, + lockValue: lock.value, + }); + } + } + } + + /** Set up automatic lock extension */ + #setupAutoExtension( + context: ManualLockContext, + duration: number, + signal: redlock.RedlockAbortSignal, + controller: AbortController + ): void { + if (this.automaticExtensionThreshold > duration - 100) { + // Don't set up auto-extension if duration is too short + return; + } + + const scheduleExtension = (): void => { + const timeUntilExtension = + context.lock.expiration - Date.now() - this.automaticExtensionThreshold; + + if (timeUntilExtension > 0) { + // Check for cleanup immediately before scheduling to prevent race condition + if (context.timeout !== null) { + context.timeout = setTimeout(() => { + context.extension = this.#extendLock( + context, + duration, + signal, + controller, + scheduleExtension + ); + }, timeUntilExtension); + } + } + }; + + scheduleExtension(); + } + + /** Extend a lock */ + async #extendLock( + context: ManualLockContext, + duration: number, + signal: redlock.RedlockAbortSignal, + controller: AbortController, + scheduleNext: () => void + ): Promise { + // Check if cleanup has started before proceeding + if (context.timeout === null) { + return; + } + + context.timeout = undefined; + + const [error, newLock] = await tryCatch(context.lock.extend(duration)); + + if (!error && newLock) { + context.lock = newLock; + // Schedule next extension (cleanup check is now inside scheduleNext) + scheduleNext(); + } else { + if (context.lock.expiration > Date.now()) { + // If lock hasn't expired yet, schedule a retry instead of recursing + // This prevents stack overflow from repeated extension failures + if (context.timeout !== null) { + const retryDelay = 100; // Short delay before retry + context.timeout = setTimeout(() => { + context.extension = this.#extendLock( + context, + duration, + signal, + controller, + scheduleNext + 
); + }, retryDelay); + } + } else { + // Lock has expired, abort the signal + signal.error = error instanceof Error ? error : new Error(String(error)); + controller.abort(); + } + } + } + + /** Clean up extension mechanism */ + #cleanupExtension(context: ManualLockContext): void { + // Signal that we're cleaning up by setting timeout to null + if (context.timeout) { + clearTimeout(context.timeout); + } + context.timeout = null; + + // Wait for any in-flight extension to complete + if (context.extension) { + context.extension.catch(() => { + // Ignore errors during cleanup + }); + } + } + + async lockIf( + condition: boolean, + name: string, + resources: string[], + routine: () => Promise + ): Promise { + if (condition) { + return this.lock(name, resources, routine); + } else { + return routine(); + } + } + isInsideLock(): boolean { return !!this.asyncLocalStorage.getStore(); } @@ -173,7 +518,37 @@ export class RunLocker { return this.asyncLocalStorage.getStore()?.resources; } + getRetryConfig(): Readonly> { + return { ...this.retryConfig }; + } + + getDuration(): number { + return this.duration; + } + + getAutomaticExtensionThreshold(): number { + return this.automaticExtensionThreshold; + } + async quit() { + // Clean up all active manual contexts + for (const [lockId, context] of this.activeManualContexts) { + this.#cleanupExtension(context); + + // Try to release any remaining locks + const [releaseError] = await tryCatch(context.lock.release()); + if (releaseError) { + this.logger.warn("[RunLocker] Error releasing lock during quit", { + error: releaseError, + lockId, + lockValue: context.lock.value, + }); + } + } + + this.activeManualContexts.clear(); + this.activeLocks.clear(); + await this.redlock.quit(); } } diff --git a/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts b/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts index 3fe96ea238..1220ce43d2 100644 --- a/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts @@ -53,7 +53,7 @@ export class CheckpointSystem { }): Promise { const prisma = tx ?? this.$.prisma; - return await this.$.runLock.lock("createCheckpoint", [runId], 5_000, async () => { + return await this.$.runLock.lock("createCheckpoint", [runId], async () => { const snapshot = await getLatestExecutionSnapshot(prisma, runId); const isValidSnapshot = @@ -267,7 +267,7 @@ export class CheckpointSystem { }): Promise { const prisma = tx ?? 
this.$.prisma; - return await this.$.runLock.lock("continueRunExecution", [runId], 5_000, async () => { + return await this.$.runLock.lock("continueRunExecution", [runId], async () => { const snapshot = await getLatestExecutionSnapshot(prisma, runId); if (snapshot.id !== snapshotId) { diff --git a/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts b/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts index 9a45977442..e3dca4b544 100644 --- a/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts @@ -37,7 +37,7 @@ export class DelayedRunSystem { this.$.tracer, "rescheduleDelayedRun", async () => { - return await this.$.runLock.lock("rescheduleDelayedRun", [runId], 5_000, async () => { + return await this.$.runLock.lock("rescheduleDelayedRun", [runId], async () => { const snapshot = await getLatestExecutionSnapshot(prisma, runId); //if the run isn't just created then we can't reschedule it diff --git a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts index 0bfcf44c62..bff9ec9cd6 100644 --- a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts @@ -80,8 +80,7 @@ export class DequeueSystem { const dequeuedRun = await this.$.runLock.lock( "dequeueFromWorkerQueue", [runId], - 5000, - async (signal) => { + async () => { const snapshot = await getLatestExecutionSnapshot(prisma, runId); if (!isDequeueableExecutionStatus(snapshot.executionStatus)) { @@ -548,7 +547,7 @@ export class DequeueSystem { statusReason, }); - return this.$.runLock.lock("pendingVersion", [runId], 5_000, async (signal) => { + return this.$.runLock.lock("pendingVersion", [runId], async () => { this.$.logger.debug("RunEngine.dequeueFromWorkerQueue(): Pending version lock acquired", { runId, reason, diff --git a/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts b/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts index 1f5383fe0a..395e44727c 100644 --- a/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts @@ -33,6 +33,7 @@ export class EnqueueSystem { completedWaitpoints, workerId, runnerId, + skipRunLock, }: { run: TaskRun; env: MinimalAuthenticatedEnvironment; @@ -51,10 +52,11 @@ export class EnqueueSystem { }[]; workerId?: string; runnerId?: string; + skipRunLock?: boolean; }) { const prisma = tx ?? 
this.$.prisma; - return await this.$.runLock.lock("enqueueRun", [run.id], 5000, async () => { + return await this.$.runLock.lockIf(!skipRunLock, "enqueueRun", [run.id], async () => { const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(prisma, { run: run, snapshot: { diff --git a/internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts b/internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts index 0f9583df4f..c89c1fe709 100644 --- a/internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts +++ b/internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts @@ -263,7 +263,6 @@ export class ReleaseConcurrencySystem { return await this.$.runLock.lock( "executeReleaseConcurrencyForSnapshot", [snapshot.runId], - 5_000, async () => { const latestSnapshot = await getLatestExecutionSnapshot(this.$.prisma, snapshot.runId); diff --git a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts index e7aec0202d..1a45c4108f 100644 --- a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts @@ -85,7 +85,7 @@ export class RunAttemptSystem { this.$.tracer, "startRunAttempt", async (span) => { - return this.$.runLock.lock("startRunAttempt", [runId], 5000, async () => { + return this.$.runLock.lock("startRunAttempt", [runId], async () => { const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId); if (latestSnapshot.id !== snapshotId) { @@ -441,7 +441,7 @@ export class RunAttemptSystem { this.$.tracer, "#completeRunAttemptSuccess", async (span) => { - return this.$.runLock.lock("attemptSucceeded", [runId], 5_000, async (signal) => { + return this.$.runLock.lock("attemptSucceeded", [runId], async () => { const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId); if (latestSnapshot.id !== snapshotId) { @@ -594,7 +594,7 @@ export class RunAttemptSystem { this.$.tracer, "completeRunAttemptFailure", async (span) => { - return this.$.runLock.lock("attemptFailed", [runId], 5_000, async (signal) => { + return this.$.runLock.lock("attemptFailed", [runId], async () => { const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId); if (latestSnapshot.id !== snapshotId) { @@ -905,7 +905,7 @@ export class RunAttemptSystem { }): Promise<{ wasRequeued: boolean } & ExecutionResult> { const prisma = tx ?? this.$.prisma; - return await this.$.runLock.lock("tryNackAndRequeue", [run.id], 5000, async (signal) => { + return await this.$.runLock.lock("tryNackAndRequeue", [run.id], async () => { //we nack the message, this allows another work to pick up the run const gotRequeued = await this.$.runQueue.nackMessage({ orgId, @@ -982,7 +982,7 @@ export class RunAttemptSystem { reason = reason ?? 
"Cancelled by user"; return startSpan(this.$.tracer, "cancelRun", async (span) => { - return this.$.runLock.lock("cancelRun", [runId], 5_000, async (signal) => { + return this.$.runLock.lock("cancelRun", [runId], async () => { const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId); //already finished, do nothing diff --git a/internal-packages/run-engine/src/engine/systems/ttlSystem.ts b/internal-packages/run-engine/src/engine/systems/ttlSystem.ts index 5ab957c989..cbed7b98ad 100644 --- a/internal-packages/run-engine/src/engine/systems/ttlSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/ttlSystem.ts @@ -23,7 +23,7 @@ export class TtlSystem { async expireRun({ runId, tx }: { runId: string; tx?: PrismaClientOrTransaction }) { const prisma = tx ?? this.$.prisma; - await this.$.runLock.lock("expireRun", [runId], 5_000, async () => { + await this.$.runLock.lock("expireRun", [runId], async () => { const snapshot = await getLatestExecutionSnapshot(prisma, runId); //if we're executing then we won't expire the run diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts index 92f0885c07..52a8094e4b 100644 --- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts @@ -380,7 +380,7 @@ export class WaitpointSystem { let $waitpoints = typeof waitpoints === "string" ? [waitpoints] : waitpoints; - return await this.$.runLock.lock("blockRunWithWaitpoint", [runId], 5000, async () => { + return await this.$.runLock.lock("blockRunWithWaitpoint", [runId], async () => { let snapshot: TaskRunExecutionSnapshot = await getLatestExecutionSnapshot(prisma, runId); //block the run with the waitpoints, returning how many waitpoints are pending @@ -549,7 +549,7 @@ export class WaitpointSystem { } //4. 
Continue the run whether it's executing or not - await this.$.runLock.lock("continueRunIfUnblocked", [runId], 5000, async () => { + await this.$.runLock.lock("continueRunIfUnblocked", [runId], async () => { const snapshot = await getLatestExecutionSnapshot(this.$.prisma, runId); if (isFinishedOrPendingFinished(snapshot.executionStatus)) { diff --git a/internal-packages/run-engine/src/engine/tests/locking.test.ts b/internal-packages/run-engine/src/engine/tests/locking.test.ts index bdb5f61242..04b43fd19a 100644 --- a/internal-packages/run-engine/src/engine/tests/locking.test.ts +++ b/internal-packages/run-engine/src/engine/tests/locking.test.ts @@ -1,56 +1,53 @@ import { createRedisClient } from "@internal/redis"; import { redisTest } from "@internal/testcontainers"; import { expect } from "vitest"; -import { RunLocker } from "../locking.js"; +import { RunLocker, LockAcquisitionTimeoutError } from "../locking.js"; import { trace } from "@internal/tracing"; import { Logger } from "@trigger.dev/core/logger"; describe("RunLocker", () => { redisTest("Test acquiring a lock works", { timeout: 15_000 }, async ({ redisOptions }) => { const redis = createRedisClient(redisOptions); - try { - const logger = new Logger("RunLockTest", "debug"); - const runLock = new RunLocker({ - redis, - logger, - tracer: trace.getTracer("RunLockTest"), - }); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + }); + try { expect(runLock.isInsideLock()).toBe(false); - await runLock.lock("test-lock", ["test-1"], 5000, async (signal) => { - expect(signal).toBeDefined(); + await runLock.lock("test-lock", ["test-1"], async () => { expect(runLock.isInsideLock()).toBe(true); }); expect(runLock.isInsideLock()).toBe(false); } finally { - await redis.quit(); + await runLock.quit(); } }); redisTest("Test double locking works", { timeout: 15_000 }, async ({ redisOptions }) => { const redis = createRedisClient(redisOptions); - try { - const logger = new Logger("RunLockTest", "debug"); - const runLock = new RunLocker({ redis, logger, tracer: trace.getTracer("RunLockTest") }); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ redis, logger, tracer: trace.getTracer("RunLockTest") }); + try { expect(runLock.isInsideLock()).toBe(false); - await runLock.lock("test-lock", ["test-1"], 5000, async (signal) => { - expect(signal).toBeDefined(); + await runLock.lock("test-lock", ["test-1"], async () => { expect(runLock.isInsideLock()).toBe(true); //should be able to "lock it again" - await runLock.lock("test-lock", ["test-1"], 5000, async (signal) => { - expect(signal).toBeDefined(); + await runLock.lock("test-lock", ["test-1"], async () => { expect(runLock.isInsideLock()).toBe(true); }); }); expect(runLock.isInsideLock()).toBe(false); } finally { - await redis.quit(); + await runLock.quit(); } }); @@ -59,14 +56,14 @@ describe("RunLocker", () => { { timeout: 15_000 }, async ({ redisOptions }) => { const redis = createRedisClient(redisOptions); - try { - const logger = new Logger("RunLockTest", "debug"); - const runLock = new RunLocker({ redis, logger, tracer: trace.getTracer("RunLockTest") }); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ redis, logger, tracer: trace.getTracer("RunLockTest") }); + try { expect(runLock.isInsideLock()).toBe(false); await expect( - runLock.lock("test-lock", ["test-1"], 5000, async () => { + runLock.lock("test-lock", ["test-1"], async () => 
{ throw new Error("Test error"); }) ).rejects.toThrow("Test error"); @@ -74,7 +71,7 @@ describe("RunLocker", () => { // Verify the lock was released expect(runLock.isInsideLock()).toBe(false); } finally { - await redis.quit(); + await runLock.quit(); } } ); @@ -84,18 +81,18 @@ describe("RunLocker", () => { { timeout: 15_000 }, async ({ redisOptions }) => { const redis = createRedisClient(redisOptions); - try { - const logger = new Logger("RunLockTest", "debug"); - const runLock = new RunLocker({ redis, logger, tracer: trace.getTracer("RunLockTest") }); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ redis, logger, tracer: trace.getTracer("RunLockTest") }); + try { expect(runLock.isInsideLock()).toBe(false); await expect( - runLock.lock("test-lock", ["test-1"], 5000, async () => { + runLock.lock("test-lock", ["test-1"], async () => { expect(runLock.isInsideLock()).toBe(true); // Nested lock with same resource - await runLock.lock("test-lock", ["test-1"], 5000, async () => { + await runLock.lock("test-lock", ["test-1"], async () => { expect(runLock.isInsideLock()).toBe(true); throw new Error("Inner lock error"); }); @@ -105,20 +102,29 @@ describe("RunLocker", () => { // Verify all locks were released expect(runLock.isInsideLock()).toBe(false); } finally { - await redis.quit(); + await runLock.quit(); } } ); - redisTest("Test lock throws when it times out", { timeout: 15_000 }, async ({ redisOptions }) => { + redisTest("Test lock throws when it times out", { timeout: 45_000 }, async ({ redisOptions }) => { const redis = createRedisClient(redisOptions); - try { - const logger = new Logger("RunLockTest", "debug"); - const runLock = new RunLocker({ redis, logger, tracer: trace.getTracer("RunLockTest") }); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + retryConfig: { + maxAttempts: 3, + baseDelay: 100, + maxTotalWaitTime: 2000, // 2 second timeout for faster test + }, + }); + try { // First, ensure we can acquire the lock normally let firstLockAcquired = false; - await runLock.lock("test-lock", ["test-1"], 5000, async () => { + await runLock.lock("test-lock", ["test-1"], async () => { firstLockAcquired = true; }); //wait for 20ms @@ -127,19 +133,18 @@ describe("RunLocker", () => { expect(firstLockAcquired).toBe(true); // Now create a long-running lock - const lockPromise1 = runLock.lock("test-lock", ["test-1"], 5000, async () => { - // Hold the lock longer than all possible retry attempts - // (10 retries * (200ms delay + 200ms max jitter) = ~4000ms max) - await new Promise((resolve) => setTimeout(resolve, 5000)); + const lockPromise1 = runLock.lock("test-lock", ["test-1"], async () => { + // Hold the lock longer than the retry timeout + await new Promise((resolve) => setTimeout(resolve, 10000)); }); - // Try to acquire same lock immediately + // Try to acquire same lock immediately - should timeout with LockAcquisitionTimeoutError await expect( - runLock.lock("test-lock", ["test-1"], 5000, async () => { + runLock.lock("test-lock", ["test-1"], async () => { // This should never execute expect(true).toBe(false); }) - ).rejects.toThrow("unable to achieve a quorum"); + ).rejects.toThrow(LockAcquisitionTimeoutError); // Complete the first lock await lockPromise1; @@ -147,7 +152,7 @@ describe("RunLocker", () => { // Verify final state expect(runLock.isInsideLock()).toBe(false); } finally { - await redis.quit(); + await runLock.quit(); } }); @@ -156,17 +161,17 @@ 
describe("RunLocker", () => { { timeout: 15_000 }, async ({ redisOptions }) => { const redis = createRedisClient(redisOptions); - try { - const logger = new Logger("RunLockTest", "debug"); - const runLock = new RunLocker({ redis, logger, tracer: trace.getTracer("RunLockTest") }); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ redis, logger, tracer: trace.getTracer("RunLockTest") }); - await runLock.lock("test-lock", ["test-1"], 5000, async () => { + try { + await runLock.lock("test-lock", ["test-1"], async () => { // First lock acquired expect(runLock.isInsideLock()).toBe(true); // Try to acquire the same resource with a very short timeout // This should work because we already hold the lock - await runLock.lock("test-lock", ["test-1"], 100, async () => { + await runLock.lock("test-lock", ["test-1"], async () => { expect(runLock.isInsideLock()).toBe(true); // Wait longer than the timeout to prove it doesn't matter await new Promise((resolve) => setTimeout(resolve, 500)); @@ -176,7 +181,7 @@ describe("RunLocker", () => { // Verify final state expect(runLock.isInsideLock()).toBe(false); } finally { - await redis.quit(); + await runLock.quit(); } } ); @@ -186,13 +191,13 @@ describe("RunLocker", () => { { timeout: 15_000 }, async ({ redisOptions }) => { const redis = createRedisClient(redisOptions); - try { - const logger = new Logger("RunLockTest", "debug"); - const runLock = new RunLocker({ redis, logger, tracer: trace.getTracer("RunLockTest") }); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ redis, logger, tracer: trace.getTracer("RunLockTest") }); + try { // First verify we can acquire the lock normally let firstLockAcquired = false; - await runLock.lock("test-lock", ["test-1"], 5000, async () => { + await runLock.lock("test-lock", ["test-1"], async () => { firstLockAcquired = true; }); expect(firstLockAcquired).toBe(true); @@ -201,7 +206,7 @@ describe("RunLocker", () => { let outerLockExecuted = false; let innerLockExecuted = false; - await runLock.lock("test-lock", ["test-1"], 5000, async () => { + await runLock.lock("test-lock", ["test-1"], async () => { outerLockExecuted = true; expect(runLock.isInsideLock()).toBe(true); expect(runLock.getCurrentResources()).toBe("test-1"); @@ -209,13 +214,13 @@ describe("RunLocker", () => { // Try to acquire the same resource in a nested lock // This should work immediately without any retries // because we already hold the lock - await runLock.lock("test-lock", ["test-1"], 5000, async () => { + await runLock.lock("test-lock", ["test-1"], async () => { innerLockExecuted = true; expect(runLock.isInsideLock()).toBe(true); expect(runLock.getCurrentResources()).toBe("test-1"); - // Sleep longer than retry attempts would take - // (10 retries * (200ms delay + 200ms max jitter) = ~4000ms max) + // Sleep longer than retry attempts would normally take + // This proves the nested lock doesn't go through the retry logic await new Promise((resolve) => setTimeout(resolve, 5000)); }); }); @@ -225,7 +230,762 @@ describe("RunLocker", () => { expect(innerLockExecuted).toBe(true); expect(runLock.isInsideLock()).toBe(false); } finally { - await redis.quit(); + await runLock.quit(); + } + } + ); + + redisTest( + "Test configurable retry settings work", + { timeout: 15_000 }, + async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ + redis, + logger, + tracer: 
trace.getTracer("RunLockTest"), + retryConfig: { + maxAttempts: 2, + baseDelay: 50, + maxDelay: 200, + backoffMultiplier: 2.0, + jitterFactor: 0.1, + maxTotalWaitTime: 1000, + }, + }); + + try { + // Verify configuration is set correctly + const config = runLock.getRetryConfig(); + expect(config.maxAttempts).toBe(2); + expect(config.baseDelay).toBe(50); + expect(config.maxDelay).toBe(200); + expect(config.backoffMultiplier).toBe(2.0); + expect(config.jitterFactor).toBe(0.1); + expect(config.maxTotalWaitTime).toBe(1000); + + // Test that the lock still works normally + await runLock.lock("test-lock", ["test-config"], async () => { + expect(runLock.isInsideLock()).toBe(true); + }); + + expect(runLock.isInsideLock()).toBe(false); + } finally { + await runLock.quit(); + } + } + ); + + redisTest( + "Test LockAcquisitionTimeoutError contains correct information", + { timeout: 25_000 }, + async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + retryConfig: { + maxAttempts: 2, + baseDelay: 50, + maxTotalWaitTime: 500, // Shorter timeout to ensure failure + }, + }); + + try { + // Create a long-running lock that will definitely outlast the retry timeout + const lockPromise = runLock.lock("test-lock", ["test-error"], async () => { + await new Promise((resolve) => setTimeout(resolve, 15000)); // Hold for 15 seconds + }); + + // Wait a bit to ensure the first lock is acquired + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Try to acquire same lock and capture the timeout error + try { + await runLock.lock("test-lock", ["test-error"], async () => { + expect(true).toBe(false); // Should never execute + }); + expect(true).toBe(false); // Should not reach here + } catch (error) { + expect(error).toBeInstanceOf(LockAcquisitionTimeoutError); + + if (error instanceof LockAcquisitionTimeoutError) { + expect(error.resources).toEqual(["test-error"]); + expect(error.attempts).toBeGreaterThan(0); + expect(error.attempts).toBeLessThanOrEqual(3); // maxAttempts + 1 + expect(error.totalWaitTime).toBeGreaterThan(0); + expect(error.totalWaitTime).toBeLessThanOrEqual(800); // Some tolerance + expect(error.name).toBe("LockAcquisitionTimeoutError"); + expect(error.message).toContain("test-error"); + expect(error.message).toContain(`${error.attempts} attempts`); + } + } + + // Complete the first lock + await lockPromise; + } finally { + await runLock.quit(); + } + } + ); + + redisTest("Test default configuration values", { timeout: 15_000 }, async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + // No retryConfig provided - should use defaults + }); + + try { + const config = runLock.getRetryConfig(); + expect(config.maxAttempts).toBe(10); + expect(config.baseDelay).toBe(200); + expect(config.maxDelay).toBe(5000); + expect(config.backoffMultiplier).toBe(1.5); + expect(config.jitterFactor).toBe(0.1); + expect(config.maxTotalWaitTime).toBe(30000); + + // Test that it still works + await runLock.lock("test-lock", ["test-default"], async () => { + expect(runLock.isInsideLock()).toBe(true); + }); + } finally { + await runLock.quit(); + } + }); + + redisTest( + "Test partial configuration override", + { timeout: 15_000 }, + async ({ redisOptions }) => { + const redis = 
createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + retryConfig: { + maxAttempts: 5, + maxTotalWaitTime: 10000, + // Other values should use defaults + }, + }); + + try { + const config = runLock.getRetryConfig(); + expect(config.maxAttempts).toBe(5); // Overridden + expect(config.maxTotalWaitTime).toBe(10000); // Overridden + expect(config.baseDelay).toBe(200); // Default + expect(config.maxDelay).toBe(5000); // Default + expect(config.backoffMultiplier).toBe(1.5); // Default + expect(config.jitterFactor).toBe(0.1); // Default + } finally { + await runLock.quit(); + } + } + ); + + redisTest("Test lockIf functionality", { timeout: 15_000 }, async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + }); + + try { + let executedWithLock = false; + let executedWithoutLock = false; + + // Test with condition = true (should acquire lock) + await runLock.lockIf(true, "test-lock", ["test-lockif"], async () => { + executedWithLock = true; + expect(runLock.isInsideLock()).toBe(true); + expect(runLock.getCurrentResources()).toBe("test-lockif"); + }); + + expect(executedWithLock).toBe(true); + expect(runLock.isInsideLock()).toBe(false); + + // Test with condition = false (should not acquire lock) + await runLock.lockIf(false, "test-lock", ["test-lockif"], async () => { + executedWithoutLock = true; + expect(runLock.isInsideLock()).toBe(false); + expect(runLock.getCurrentResources()).toBeUndefined(); + }); + + expect(executedWithoutLock).toBe(true); + } finally { + await runLock.quit(); + } + }); + + redisTest( + "Test concurrent locks on different resources", + { timeout: 15_000 }, + async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + }); + + try { + const results: string[] = []; + + // Start multiple concurrent locks on different resources + const lock1Promise = runLock.lock("test-lock", ["resource-1"], async () => { + results.push("lock1-start"); + await new Promise((resolve) => setTimeout(resolve, 100)); + results.push("lock1-end"); + return "result1"; + }); + + const lock2Promise = runLock.lock("test-lock", ["resource-2"], async () => { + results.push("lock2-start"); + await new Promise((resolve) => setTimeout(resolve, 100)); + results.push("lock2-end"); + return "result2"; + }); + + const lock3Promise = runLock.lock("test-lock", ["resource-3"], async () => { + results.push("lock3-start"); + await new Promise((resolve) => setTimeout(resolve, 100)); + results.push("lock3-end"); + return "result3"; + }); + + const [result1, result2, result3] = await Promise.all([ + lock1Promise, + lock2Promise, + lock3Promise, + ]); + + expect(result1).toBe("result1"); + expect(result2).toBe("result2"); + expect(result3).toBe("result3"); + + // All locks should have started (concurrent execution) + expect(results).toContain("lock1-start"); + expect(results).toContain("lock2-start"); + expect(results).toContain("lock3-start"); + expect(results).toContain("lock1-end"); + expect(results).toContain("lock2-end"); + expect(results).toContain("lock3-end"); + } finally { + await runLock.quit(); + } + } + ); + + redisTest( + "Test multiple resources in 
single lock", + { timeout: 15_000 }, + async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + }); + + try { + await runLock.lock("test-lock", ["resource-a", "resource-b", "resource-c"], async () => { + expect(runLock.isInsideLock()).toBe(true); + // Resources should be sorted and joined + expect(runLock.getCurrentResources()).toBe("resource-a,resource-b,resource-c"); + }); + + // Test that resource order doesn't matter (should be normalized) + await runLock.lock("test-lock", ["resource-c", "resource-a", "resource-b"], async () => { + expect(runLock.getCurrentResources()).toBe("resource-a,resource-b,resource-c"); + }); + } finally { + await runLock.quit(); + } + } + ); + + redisTest( + "Test different lock names on same resources don't interfere", + { timeout: 15_000 }, + async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + }); + + try { + const results: string[] = []; + + // These should be able to run concurrently despite same resources + // because they have different lock names + const promise1 = runLock.lock("lock-type-1", ["shared-resource"], async () => { + results.push("type1-start"); + await new Promise((resolve) => setTimeout(resolve, 200)); + results.push("type1-end"); + }); + + const promise2 = runLock.lock("lock-type-2", ["shared-resource"], async () => { + results.push("type2-start"); + await new Promise((resolve) => setTimeout(resolve, 200)); + results.push("type2-end"); + }); + + await Promise.all([promise1, promise2]); + + // Both should have executed (different lock names don't block each other) + expect(results).toContain("type1-start"); + expect(results).toContain("type1-end"); + expect(results).toContain("type2-start"); + expect(results).toContain("type2-end"); + } finally { + await runLock.quit(); + } + } + ); + + redisTest( + "Test default duration configuration", + { timeout: 15_000 }, + async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + + // Test with custom default duration + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + duration: 8000, + }); + + try { + // Test that the default duration is set correctly + expect(runLock.getDuration()).toBe(8000); + + // Test lock without specifying duration (should use default) + const startTime = Date.now(); + await runLock.lock("test-lock", ["default-duration-test"], async () => { + expect(runLock.isInsideLock()).toBe(true); + // Sleep for a bit to ensure the lock is working + await new Promise((resolve) => setTimeout(resolve, 100)); + }); + const elapsed = Date.now() - startTime; + expect(elapsed).toBeGreaterThan(90); // Should have completed successfully + + // Test lockIf without duration (should use default) + await runLock.lockIf(true, "test-lock", ["lockif-default"], async () => { + expect(runLock.isInsideLock()).toBe(true); + }); + } finally { + await runLock.quit(); + } + } + ); + + redisTest( + "Test automatic extension threshold configuration", + { timeout: 15_000 }, + async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + + // Test with custom automatic extension 
threshold + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + automaticExtensionThreshold: 200, // Custom threshold + duration: 800, + }); + + try { + // Test that the threshold is set correctly + expect(runLock.getAutomaticExtensionThreshold()).toBe(200); + expect(runLock.getDuration()).toBe(800); // Should use configured value + + // Test lock extension with custom threshold + // Use a short lock duration but longer operation to trigger extension + await runLock.lock("test-lock", ["extension-threshold-test"], async () => { + expect(runLock.isInsideLock()).toBe(true); + // Sleep longer than lock duration to ensure extension works + await new Promise((resolve) => setTimeout(resolve, 1200)); + }); + } finally { + await runLock.quit(); + } + } + ); + + redisTest("Test Redlock retry configuration", { timeout: 10_000 }, async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + + // Test that we can configure all settings + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + duration: 3000, + automaticExtensionThreshold: 300, + retryConfig: { + maxAttempts: 5, + baseDelay: 150, + }, + }); + + try { + // Verify all configurations are set + expect(runLock.getDuration()).toBe(3000); + expect(runLock.getAutomaticExtensionThreshold()).toBe(300); + + const retryConfig = runLock.getRetryConfig(); + expect(retryConfig.maxAttempts).toBe(5); + expect(retryConfig.baseDelay).toBe(150); + + // Test basic functionality with all custom configs + await runLock.lock("test-lock", ["all-config-test"], async () => { + expect(runLock.isInsideLock()).toBe(true); + }); + } finally { + await runLock.quit(); + } + }); + + redisTest( + "Test production-optimized configuration", + { timeout: 15_000 }, + async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + + // Test with production-optimized settings (similar to RunEngine) + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + duration: 10000, + automaticExtensionThreshold: 2000, + retryConfig: { + maxAttempts: 15, + baseDelay: 100, + maxDelay: 3000, + backoffMultiplier: 1.8, + jitterFactor: 0.15, + maxTotalWaitTime: 25000, + }, + }); + + try { + // Verify production configuration + expect(runLock.getDuration()).toBe(10000); + expect(runLock.getAutomaticExtensionThreshold()).toBe(2000); + + const retryConfig = runLock.getRetryConfig(); + expect(retryConfig.maxAttempts).toBe(15); + expect(retryConfig.baseDelay).toBe(100); + expect(retryConfig.maxDelay).toBe(3000); + expect(retryConfig.backoffMultiplier).toBe(1.8); + expect(retryConfig.jitterFactor).toBe(0.15); + expect(retryConfig.maxTotalWaitTime).toBe(25000); + + // Test lock with default duration (should use 10 seconds) + const startTime = Date.now(); + await runLock.lock("test-lock", ["production-config"], async () => { + expect(runLock.isInsideLock()).toBe(true); + // Simulate a typical operation duration + await new Promise((resolve) => setTimeout(resolve, 200)); + }); + const elapsed = Date.now() - startTime; + expect(elapsed).toBeGreaterThan(190); + expect(elapsed).toBeLessThan(1000); // Should complete quickly for successful operation + } finally { + await runLock.quit(); + } + } + ); + + redisTest("Test configuration edge cases", { timeout: 15_000 }, async ({ redisOptions }) => { + const logger = new Logger("RunLockTest", "debug"); 
+ + // Test with maxAttempts = 0 + const redis1 = createRedisClient(redisOptions); + const runLock1 = new RunLocker({ + redis: redis1, + logger, + tracer: trace.getTracer("RunLockTest"), + retryConfig: { + maxAttempts: 0, + baseDelay: 100, + maxTotalWaitTime: 1000, + }, + }); + + try { + const config = runLock1.getRetryConfig(); + expect(config.maxAttempts).toBe(0); + + // Should work for successful acquisitions + await runLock1.lock("test-lock", ["test-edge"], async () => { + expect(runLock1.isInsideLock()).toBe(true); + }); + } finally { + await runLock1.quit(); + } + + // Test with very small delays + const redis2 = createRedisClient(redisOptions); + const runLock2 = new RunLocker({ + redis: redis2, + logger, + tracer: trace.getTracer("RunLockTest"), + retryConfig: { + maxAttempts: 2, + baseDelay: 1, + maxDelay: 10, + backoffMultiplier: 2.0, + jitterFactor: 0.5, + maxTotalWaitTime: 100, + }, + }); + + try { + const config = runLock2.getRetryConfig(); + expect(config.baseDelay).toBe(1); + expect(config.maxDelay).toBe(10); + expect(config.jitterFactor).toBe(0.5); + + await runLock2.lock("test-lock", ["test-small"], async () => { + expect(runLock2.isInsideLock()).toBe(true); + }); + } finally { + await runLock2.quit(); + } + }); + + redisTest("Test total wait time configuration", { timeout: 10_000 }, async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + retryConfig: { + maxAttempts: 100, // High retry count + baseDelay: 100, + maxTotalWaitTime: 500, // But low total wait time + }, + }); + + try { + // Test that total wait time configuration is properly applied + const config = runLock.getRetryConfig(); + expect(config.maxAttempts).toBe(100); + expect(config.maxTotalWaitTime).toBe(500); + expect(config.baseDelay).toBe(100); + + // Basic functionality test with the configuration + await runLock.lock("test-lock", ["test-timing-config"], async () => { + expect(runLock.isInsideLock()).toBe(true); + }); + + expect(runLock.isInsideLock()).toBe(false); + } finally { + await runLock.quit(); + } + }); + + redisTest( + "Test quit functionality and cleanup", + { timeout: 15_000 }, + async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + }); + + // Acquire some locks to create state + await runLock.lock("test-lock", ["quit-test-1"], async () => { + expect(runLock.isInsideLock()).toBe(true); + }); + + // Verify we can still acquire locks + await runLock.lock("test-lock", ["quit-test-2"], async () => { + expect(runLock.isInsideLock()).toBe(true); + }); + + // Now quit should clean everything up + await runLock.quit(); + + // After quit, should be able to create new instance and acquire locks + const newRedis = createRedisClient(redisOptions); + const newRunLock = new RunLocker({ + redis: newRedis, + logger, + tracer: trace.getTracer("RunLockTest"), + }); + + try { + await newRunLock.lock("test-lock", ["quit-test-1"], async () => { + expect(newRunLock.isInsideLock()).toBe(true); + }); + } finally { + await newRunLock.quit(); + } + } + ); + + redisTest( + "Test lock extension during long operations", + { timeout: 20_000 }, + async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + const 
runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + duration: 1000, + }); + + try { + let lockExtended = false; + const startTime = Date.now(); + + // Acquire lock with short duration but long operation + await runLock.lock("test-lock", ["extension-test"], async () => { + // Operation longer than lock duration - should trigger extension + await new Promise((resolve) => setTimeout(resolve, 2500)); + + const elapsed = Date.now() - startTime; + expect(elapsed).toBeGreaterThan(2000); + + // If we get here, extension must have worked + lockExtended = true; + }); + + expect(lockExtended).toBe(true); + } finally { + await runLock.quit(); + } + } + ); + + redisTest( + "Test getCurrentResources in various states", + { timeout: 15_000 }, + async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + const runLock = new RunLocker({ + redis, + logger, + tracer: trace.getTracer("RunLockTest"), + }); + + try { + // Outside any lock + expect(runLock.getCurrentResources()).toBeUndefined(); + expect(runLock.isInsideLock()).toBe(false); + + await runLock.lock("test-lock", ["resource-x", "resource-y"], async () => { + // Inside lock + expect(runLock.getCurrentResources()).toBe("resource-x,resource-y"); + expect(runLock.isInsideLock()).toBe(true); + + await runLock.lock("test-lock", ["resource-x", "resource-y"], async () => { + // Nested lock with same resources + expect(runLock.getCurrentResources()).toBe("resource-x,resource-y"); + expect(runLock.isInsideLock()).toBe(true); + }); + }); + + // Outside lock again + expect(runLock.getCurrentResources()).toBeUndefined(); + expect(runLock.isInsideLock()).toBe(false); + } finally { + await runLock.quit(); + } + } + ); + + redisTest( + "Test retry behavior with exact timing", + { timeout: 25_000 }, + async ({ redisOptions }) => { + const redis1 = createRedisClient(redisOptions); + const redis2 = createRedisClient(redisOptions); + const logger = new Logger("RunLockTest", "debug"); + + const runLock1 = new RunLocker({ + redis: redis1, + logger, + tracer: trace.getTracer("RunLockTest"), + }); + + const runLock2 = new RunLocker({ + redis: redis2, + logger, + tracer: trace.getTracer("RunLockTest"), + duration: 30000, + retryConfig: { + maxAttempts: 3, + baseDelay: 100, + maxDelay: 500, + backoffMultiplier: 2.0, + jitterFactor: 0, // No jitter for predictable timing + maxTotalWaitTime: 10000, + }, + }); + + try { + // Create blocking lock with first instance - make it last much longer than retry logic + const blockingPromise = runLock1.lock("test-lock", ["timing-test"], async () => { + await new Promise((resolve) => setTimeout(resolve, 15000)); + }); + + await new Promise((resolve) => setTimeout(resolve, 200)); + + const startTime = Date.now(); + try { + await runLock2.lock("test-lock", ["timing-test"], async () => { + expect(true).toBe(false); + }); + expect(true).toBe(false); // Should not reach here + } catch (error) { + const elapsed = Date.now() - startTime; + expect(error).toBeInstanceOf(LockAcquisitionTimeoutError); + + if (error instanceof LockAcquisitionTimeoutError) { + expect(error.attempts).toBe(4); // 0 + 3 retries + // With backoff: 100ms + 200ms + 400ms = 700ms total wait time + expect(error.totalWaitTime).toBeGreaterThan(600); + expect(error.totalWaitTime).toBeLessThan(800); + expect(elapsed).toBeGreaterThan(600); + } + } + + await blockingPromise; + } finally { + await runLock1.quit(); + await runLock2.quit(); } } ); diff --git 
a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts
index f07dd703ab..50e7c8da4e 100644
--- a/internal-packages/run-engine/src/engine/types.ts
+++ b/internal-packages/run-engine/src/engine/types.ts
@@ -13,6 +13,7 @@ import { FairQueueSelectionStrategyOptions } from "../run-queue/fairQueueSelecti
 import { MinimalAuthenticatedEnvironment } from "../shared/index.js";
 import { workerCatalog } from "./workerCatalog.js";
 import { Logger, LogLevel } from "@trigger.dev/core/logger";
+import { LockRetryConfig } from "./locking.js";
 
 export type RunEngineOptions = {
   prisma: PrismaClient;
@@ -45,6 +46,9 @@ export type RunEngineOptions = {
   };
   runLock: {
     redis: RedisOptions;
+    duration?: number;
+    automaticExtensionThreshold?: number;
+    retryConfig?: LockRetryConfig;
   };
   /** If not set then checkpoints won't ever be used */
   retryWarmStartThresholdMs?: number;
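
Reviewer notes (illustrative sketches, not part of the patch):

The retry loop added in RunLocker.#acquireAndExecute computes each backoff delay as in the sketch below. The function name and shape are assumptions for illustration; the arithmetic mirrors the added code, which makes the timing assertions in "Test retry behavior with exact timing" easy to check by hand.

// Sketch only: mirrors the delay calculation in #acquireAndExecute.
function retryDelay(
  attempt: number, // zero-based attempt index
  config: { baseDelay: number; maxDelay: number; backoffMultiplier: number; jitterFactor: number }
): number {
  const exponentialDelay = Math.min(
    config.baseDelay * Math.pow(config.backoffMultiplier, attempt),
    config.maxDelay
  );
  // Jitter is uniform in ±jitterFactor of the exponential delay.
  const jitter = exponentialDelay * config.jitterFactor * (Math.random() * 2 - 1);
  return Math.min(config.maxDelay, Math.max(0, Math.round(exponentialDelay + jitter)));
}

// With the timing test's config (baseDelay 100, backoffMultiplier 2.0, maxDelay 500, jitterFactor 0),
// the three retries wait 100ms, 200ms and 400ms, so a failed acquisition reports attempts = 4 and
// totalWaitTime ≈ 700ms, which is what the test asserts.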
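
The duration/automaticExtensionThreshold pair drives auto-extension; the guard and scheduling added in #setupAutoExtension reduce to the arithmetic below (again a sketch, not code from the patch). The worked examples use the durations that appear in the extension tests.

// Sketch only: mirrors the guard and first-extension scheduling in #setupAutoExtension.
function msUntilFirstExtension(duration: number, automaticExtensionThreshold: number): number | null {
  // Auto-extension is skipped when the lock is too short to be worth extending.
  if (automaticExtensionThreshold > duration - 100) {
    return null;
  }
  // The lock expires roughly `duration` ms after acquisition, so the first extension
  // fires `automaticExtensionThreshold` ms before that expiry.
  return duration - automaticExtensionThreshold;
}

// duration 800,  threshold 200           -> first extension ~600ms after acquisition
// duration 1000, threshold 500 (default) -> first extension ~500ms after acquisition,
// which is how the 2.5s routine in "Test lock extension during long operations" keeps
// its 1s lock alive.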
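
Finally, a sketch of the call-site change: the per-call duration argument and the signal parameter are gone, lock settings live on the RunLocker instance, and lockIf/skipRunLock let the trigger path enqueue a run while it already holds the run lock. Imports match the test file; the Redis connection options and run id below are placeholders, not values from this change.

import { createRedisClient } from "@internal/redis";
import { trace } from "@internal/tracing";
import { Logger } from "@trigger.dev/core/logger";
import { RunLocker } from "../locking.js";

const runLock = new RunLocker({
  redis: createRedisClient({ host: "localhost", port: 6379 }), // placeholder connection options
  logger: new Logger("RunLockExample", "debug"),
  tracer: trace.getTracer("RunLockExample"),
  duration: 5000, // replaces the old per-call `5_000` argument
  automaticExtensionThreshold: 1000,
  retryConfig: { maxAttempts: 10, baseDelay: 100, maxDelay: 3000 },
});

const runId = "run_123"; // placeholder

// Before (typical call site): runLock.lock("enqueueRun", [runId], 5000, async (signal) => { ... })
// After: no duration argument and no signal parameter; extension is handled internally.
await runLock.lock("enqueueRun", [runId], async () => {
  // critical section
});

// lockIf(condition, ...) runs the routine without acquiring the lock when the condition
// is false, which is how enqueueRun honours skipRunLock.
await runLock.lockIf(false, "enqueueRun", [runId], async () => {
  // runs without taking the lock
});

await runLock.quit();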