From 589e75e1af4cb3c72e1e2606e1c7193f85daac5a Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Fri, 8 Nov 2024 19:09:45 +0100 Subject: [PATCH 1/4] Properly rename test that is responsible for fast-path failing for undispatched mode --- ...imentalDispatchModeTest.kt => UnconfinedCancellationTest.kt} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename kotlinx-coroutines-core/common/test/{ExperimentalDispatchModeTest.kt => UnconfinedCancellationTest.kt} (97%) diff --git a/kotlinx-coroutines-core/common/test/ExperimentalDispatchModeTest.kt b/kotlinx-coroutines-core/common/test/UnconfinedCancellationTest.kt similarity index 97% rename from kotlinx-coroutines-core/common/test/ExperimentalDispatchModeTest.kt rename to kotlinx-coroutines-core/common/test/UnconfinedCancellationTest.kt index 98c5e66727..59845d841b 100644 --- a/kotlinx-coroutines-core/common/test/ExperimentalDispatchModeTest.kt +++ b/kotlinx-coroutines-core/common/test/UnconfinedCancellationTest.kt @@ -3,7 +3,7 @@ package kotlinx.coroutines import kotlinx.coroutines.testing.* import kotlin.test.* -class ExperimentalDispatchModeTest : TestBase() { +class UnconfinedCancellationTest : TestBase() { @Test fun testUnconfinedCancellation() = runTest { val parent = Job() From b96c9a249ebf4c0872de733f928b462b931896b7 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Sun, 10 Nov 2024 20:28:06 +0100 Subject: [PATCH 2/4] Simplify, document and reduce the binary size of Undispatched.kt --- .../common/src/intrinsics/Undispatched.kt | 53 ++++++++++--------- 1 file changed, 28 insertions(+), 25 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt b/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt index 511199701a..0392817e13 100644 --- a/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt +++ b/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt @@ -38,43 +38,42 @@ internal fun (suspend (R) -> T).startCoroutineUndispatched(receiver: R, c * * It starts the coroutine using [startCoroutineUninterceptedOrReturn]. */ -internal fun ScopeCoroutine.startUndispatchedOrReturn(receiver: R, block: suspend R.() -> T): Any? { - return undispatchedResult({ true }) { - block.startCoroutineUninterceptedOrReturn(receiver, this) - } -} +internal fun ScopeCoroutine.startUndispatchedOrReturn( + receiver: R, block: suspend R.() -> T +): Any? = startUndspatched(alwaysRethrow = true, receiver, block) /** * Same as [startUndispatchedOrReturn], but ignores [TimeoutCancellationException] on fast-path. */ internal fun ScopeCoroutine.startUndispatchedOrReturnIgnoreTimeout( receiver: R, block: suspend R.() -> T -): Any? { - return undispatchedResult({ e -> !(e is TimeoutCancellationException && e.coroutine === this) }) { - block.startCoroutineUninterceptedOrReturn(receiver, this) - } -} +): Any? = startUndspatched(alwaysRethrow = false, receiver, block) -private inline fun ScopeCoroutine.undispatchedResult( - shouldThrow: (Throwable) -> Boolean, - startBlock: () -> Any? +/** + * Starts and handles the result of an undispatched coroutine, potentially with children. + * For example, it handles `coroutineScope { ...suspend of throw, maybe start children... }` + * and `launch(start = UNDISPATCHED) { ... }` + * + * @param alwaysRethrow specifies whether an exception should be unconditioanlly rethrown. + * It is a tweak for 'withTimeout' in order to successfully return values when the block was cancelled: + * i.e. `withTimeout(1ms) { Thread.sleep(1000); 42 }` should not fail. + */ +private fun ScopeCoroutine.startUndspatched( + alwaysRethrow: Boolean, + receiver: R, block: suspend R.() -> T ): Any? { val result = try { - startBlock() + block.startCoroutineUninterceptedOrReturn(receiver, this) } catch (e: Throwable) { CompletedExceptionally(e) } + /* - * We're trying to complete our undispatched block here and have three code-paths: - * (1) Coroutine is suspended. - * Otherwise, coroutine had returned result, so we are completing our block (and its job). - * (2) If we can't complete it or started waiting for children, we suspend. - * (3) If we have successfully completed the coroutine state machine here, - * then we take the actual final state of the coroutine from makeCompletingOnce and return it. - * - * shouldThrow parameter is a special code path for timeout coroutine: - * If timeout is exceeded, but withTimeout() block was not suspended, we would like to return block value, - * not a timeout exception. + * We are trying to complete our undispatched block with the following possible codepaths: + * 1) The coroutine just suspended. I.e. `coroutineScope { .. suspend here }`. + * Then just suspend + * 2) The coroutine completed with something, but has active children. Wait for them, also suspend + * 3) The coroutine succesfully completed. Return or rethrow its result. */ if (result === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED // (1) val state = makeCompletingOnce(result) @@ -82,7 +81,7 @@ private inline fun ScopeCoroutine.undispatchedResult( afterCompletionUndispatched() return if (state is CompletedExceptionally) { // (3) when { - shouldThrow(state.cause) -> throw recoverStackTrace(state.cause, uCont) + alwaysRethrow || notOwnTimeout(state.cause) -> throw recoverStackTrace(state.cause, uCont) result is CompletedExceptionally -> throw recoverStackTrace(result.cause, uCont) else -> result } @@ -90,3 +89,7 @@ private inline fun ScopeCoroutine.undispatchedResult( state.unboxState() } } + +private fun ScopeCoroutine<*>.notOwnTimeout(cause: Throwable): Boolean { + return cause !is TimeoutCancellationException || cause.coroutine !== this +} From 83897686e20b83d76077b7d5789928b8873085fa Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Mon, 11 Nov 2024 12:58:36 +0100 Subject: [PATCH 3/4] Rethrow dispatcher exceptions during the undispatched coroutines start immediately Otherwise, in the face of malfunctioning dispatcher, there might be no trace of any error, making it extremely hard to debug. The root cause should be addressed by #4209 Fixes #4142 --- .../common/src/intrinsics/Undispatched.kt | 9 ++++++ .../jvm/test/FailFastOnStartTest.kt | 28 +++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt b/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt index 0392817e13..0d2c3625fb 100644 --- a/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt +++ b/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt @@ -64,6 +64,10 @@ private fun ScopeCoroutine.startUndspatched( ): Any? { val result = try { block.startCoroutineUninterceptedOrReturn(receiver, this) + } catch (e: DispatchException) { + // Special codepath for failing CoroutineDispatcher: rethrow an exception + // immediately without waiting for children to indicate something is wrong + dispatchException(e) } catch (e: Throwable) { CompletedExceptionally(e) } @@ -93,3 +97,8 @@ private fun ScopeCoroutine.startUndspatched( private fun ScopeCoroutine<*>.notOwnTimeout(cause: Throwable): Boolean { return cause !is TimeoutCancellationException || cause.coroutine !== this } + +private fun ScopeCoroutine<*>.dispatchException(e: DispatchException) { + makeCompleting(CompletedExceptionally(e.cause)) + throw throw recoverStackTrace(e.cause, uCont) +} diff --git a/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt b/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt index ae3c503d60..b637453f1f 100644 --- a/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt +++ b/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt @@ -4,9 +4,17 @@ package kotlinx.coroutines import kotlinx.coroutines.testing.* import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.emptyFlow +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOn import org.junit.* import org.junit.Test import org.junit.rules.* +import org.junit.runners.model.TestTimedOutException +import java.util.concurrent.TimeoutException +import kotlin.coroutines.Continuation +import kotlin.coroutines.EmptyCoroutineContext +import kotlin.coroutines.startCoroutine import kotlin.test.* class FailFastOnStartTest : TestBase() { @@ -81,4 +89,24 @@ class FailFastOnStartTest : TestBase() { fun testAsyncNonChild() = runTest(expected = ::mainException) { async(Job() + Dispatchers.Main) { fail() } } + + @Test + fun testFlowOn() { + // See #4142, this test ensures that `coroutineScope { produce(failingDispatcher, ATOMIC) }` + // rethrows an exception. It does not help with the completion of such a coroutine though. + // Hack to avoid waiting for test completion + expect(1) + val caller = suspend { + try { + emptyFlow().flowOn(Dispatchers.Main).collect { fail() } + } catch (e: Throwable) { + assertTrue(mainException(e)) + expect(2) + } + } + + caller.startCoroutine(Continuation(EmptyCoroutineContext) { + finish(3) + }) + } } From f7acf1e0a47af4b47ee55f0f401f72ec147b4feb Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 18 Feb 2025 12:17:33 +0100 Subject: [PATCH 4/4] ~cleanup --- .../common/src/intrinsics/Undispatched.kt | 6 +++--- kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt | 6 ++---- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt b/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt index 0d2c3625fb..bb81d442d4 100644 --- a/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt +++ b/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt @@ -67,7 +67,7 @@ private fun ScopeCoroutine.startUndspatched( } catch (e: DispatchException) { // Special codepath for failing CoroutineDispatcher: rethrow an exception // immediately without waiting for children to indicate something is wrong - dispatchException(e) + dispatchExceptionAndMakeCompleting(e) } catch (e: Throwable) { CompletedExceptionally(e) } @@ -98,7 +98,7 @@ private fun ScopeCoroutine<*>.notOwnTimeout(cause: Throwable): Boolean { return cause !is TimeoutCancellationException || cause.coroutine !== this } -private fun ScopeCoroutine<*>.dispatchException(e: DispatchException) { +private fun ScopeCoroutine<*>.dispatchExceptionAndMakeCompleting(e: DispatchException): Nothing { makeCompleting(CompletedExceptionally(e.cause)) - throw throw recoverStackTrace(e.cause, uCont) + throw recoverStackTrace(e.cause, uCont) } diff --git a/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt b/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt index b637453f1f..022fb6b630 100644 --- a/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt +++ b/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt @@ -10,8 +10,6 @@ import kotlinx.coroutines.flow.flowOn import org.junit.* import org.junit.Test import org.junit.rules.* -import org.junit.runners.model.TestTimedOutException -import java.util.concurrent.TimeoutException import kotlin.coroutines.Continuation import kotlin.coroutines.EmptyCoroutineContext import kotlin.coroutines.startCoroutine @@ -91,10 +89,10 @@ class FailFastOnStartTest : TestBase() { } @Test - fun testFlowOn() { + fun testFlowOn() { // See #4142, this test ensures that `coroutineScope { produce(failingDispatcher, ATOMIC) }` // rethrows an exception. It does not help with the completion of such a coroutine though. - // Hack to avoid waiting for test completion + // `suspend {}` + start coroutine with custom `completion` to avoid waiting for test completion expect(1) val caller = suspend { try {