diff --git a/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt b/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt index 511199701a..bb81d442d4 100644 --- a/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt +++ b/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt @@ -38,43 +38,46 @@ internal fun (suspend (R) -> T).startCoroutineUndispatched(receiver: R, c * * It starts the coroutine using [startCoroutineUninterceptedOrReturn]. */ -internal fun ScopeCoroutine.startUndispatchedOrReturn(receiver: R, block: suspend R.() -> T): Any? { - return undispatchedResult({ true }) { - block.startCoroutineUninterceptedOrReturn(receiver, this) - } -} +internal fun ScopeCoroutine.startUndispatchedOrReturn( + receiver: R, block: suspend R.() -> T +): Any? = startUndspatched(alwaysRethrow = true, receiver, block) /** * Same as [startUndispatchedOrReturn], but ignores [TimeoutCancellationException] on fast-path. */ internal fun ScopeCoroutine.startUndispatchedOrReturnIgnoreTimeout( receiver: R, block: suspend R.() -> T -): Any? { - return undispatchedResult({ e -> !(e is TimeoutCancellationException && e.coroutine === this) }) { - block.startCoroutineUninterceptedOrReturn(receiver, this) - } -} +): Any? = startUndspatched(alwaysRethrow = false, receiver, block) -private inline fun ScopeCoroutine.undispatchedResult( - shouldThrow: (Throwable) -> Boolean, - startBlock: () -> Any? +/** + * Starts and handles the result of an undispatched coroutine, potentially with children. + * For example, it handles `coroutineScope { ...suspend of throw, maybe start children... }` + * and `launch(start = UNDISPATCHED) { ... }` + * + * @param alwaysRethrow specifies whether an exception should be unconditioanlly rethrown. + * It is a tweak for 'withTimeout' in order to successfully return values when the block was cancelled: + * i.e. `withTimeout(1ms) { Thread.sleep(1000); 42 }` should not fail. + */ +private fun ScopeCoroutine.startUndspatched( + alwaysRethrow: Boolean, + receiver: R, block: suspend R.() -> T ): Any? { val result = try { - startBlock() + block.startCoroutineUninterceptedOrReturn(receiver, this) + } catch (e: DispatchException) { + // Special codepath for failing CoroutineDispatcher: rethrow an exception + // immediately without waiting for children to indicate something is wrong + dispatchExceptionAndMakeCompleting(e) } catch (e: Throwable) { CompletedExceptionally(e) } + /* - * We're trying to complete our undispatched block here and have three code-paths: - * (1) Coroutine is suspended. - * Otherwise, coroutine had returned result, so we are completing our block (and its job). - * (2) If we can't complete it or started waiting for children, we suspend. - * (3) If we have successfully completed the coroutine state machine here, - * then we take the actual final state of the coroutine from makeCompletingOnce and return it. - * - * shouldThrow parameter is a special code path for timeout coroutine: - * If timeout is exceeded, but withTimeout() block was not suspended, we would like to return block value, - * not a timeout exception. + * We are trying to complete our undispatched block with the following possible codepaths: + * 1) The coroutine just suspended. I.e. `coroutineScope { .. suspend here }`. + * Then just suspend + * 2) The coroutine completed with something, but has active children. Wait for them, also suspend + * 3) The coroutine succesfully completed. Return or rethrow its result. */ if (result === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED // (1) val state = makeCompletingOnce(result) @@ -82,7 +85,7 @@ private inline fun ScopeCoroutine.undispatchedResult( afterCompletionUndispatched() return if (state is CompletedExceptionally) { // (3) when { - shouldThrow(state.cause) -> throw recoverStackTrace(state.cause, uCont) + alwaysRethrow || notOwnTimeout(state.cause) -> throw recoverStackTrace(state.cause, uCont) result is CompletedExceptionally -> throw recoverStackTrace(result.cause, uCont) else -> result } @@ -90,3 +93,12 @@ private inline fun ScopeCoroutine.undispatchedResult( state.unboxState() } } + +private fun ScopeCoroutine<*>.notOwnTimeout(cause: Throwable): Boolean { + return cause !is TimeoutCancellationException || cause.coroutine !== this +} + +private fun ScopeCoroutine<*>.dispatchExceptionAndMakeCompleting(e: DispatchException): Nothing { + makeCompleting(CompletedExceptionally(e.cause)) + throw recoverStackTrace(e.cause, uCont) +} diff --git a/kotlinx-coroutines-core/common/test/ExperimentalDispatchModeTest.kt b/kotlinx-coroutines-core/common/test/UnconfinedCancellationTest.kt similarity index 97% rename from kotlinx-coroutines-core/common/test/ExperimentalDispatchModeTest.kt rename to kotlinx-coroutines-core/common/test/UnconfinedCancellationTest.kt index 98c5e66727..59845d841b 100644 --- a/kotlinx-coroutines-core/common/test/ExperimentalDispatchModeTest.kt +++ b/kotlinx-coroutines-core/common/test/UnconfinedCancellationTest.kt @@ -3,7 +3,7 @@ package kotlinx.coroutines import kotlinx.coroutines.testing.* import kotlin.test.* -class ExperimentalDispatchModeTest : TestBase() { +class UnconfinedCancellationTest : TestBase() { @Test fun testUnconfinedCancellation() = runTest { val parent = Job() diff --git a/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt b/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt index ae3c503d60..022fb6b630 100644 --- a/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt +++ b/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt @@ -4,9 +4,15 @@ package kotlinx.coroutines import kotlinx.coroutines.testing.* import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.emptyFlow +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOn import org.junit.* import org.junit.Test import org.junit.rules.* +import kotlin.coroutines.Continuation +import kotlin.coroutines.EmptyCoroutineContext +import kotlin.coroutines.startCoroutine import kotlin.test.* class FailFastOnStartTest : TestBase() { @@ -81,4 +87,24 @@ class FailFastOnStartTest : TestBase() { fun testAsyncNonChild() = runTest(expected = ::mainException) { async(Job() + Dispatchers.Main) { fail() } } + + @Test + fun testFlowOn() { + // See #4142, this test ensures that `coroutineScope { produce(failingDispatcher, ATOMIC) }` + // rethrows an exception. It does not help with the completion of such a coroutine though. + // `suspend {}` + start coroutine with custom `completion` to avoid waiting for test completion + expect(1) + val caller = suspend { + try { + emptyFlow().flowOn(Dispatchers.Main).collect { fail() } + } catch (e: Throwable) { + assertTrue(mainException(e)) + expect(2) + } + } + + caller.startCoroutine(Continuation(EmptyCoroutineContext) { + finish(3) + }) + } }