From 2bc88edf1cb8dda500479d6946c2ff3dca949a82 Mon Sep 17 00:00:00 2001 From: Natasha Murashkina Date: Fri, 25 Jul 2025 15:20:30 +0100 Subject: [PATCH 1/3] launch ChannelFlow.collect CoroutineStart.UNDISPATCHED when dispatcher didn't change --- .../common/src/flow/internal/ChannelFlow.kt | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt index c676eedf62..449720964c 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt @@ -109,14 +109,23 @@ public abstract class ChannelFlow( * For non-atomic start it is possible to observe the situation, * where the pipeline after the [flowOn] call successfully executes (mostly, its `onCompletion`) * handlers, while the pipeline before does not, because it was cancelled during its dispatch. - * Thus `onCompletion` and `finally` blocks won't be executed and it may lead to a different kinds of memory leaks. + * Thus `onCompletion` and `finally` blocks won't be executed, and it may lead to a different kind of memory leaks. */ public open fun produceImpl(scope: CoroutineScope): ReceiveChannel = - scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun) + produceImplInternal(scope, CoroutineStart.ATOMIC) + + internal open fun produceImplInternal(scope: CoroutineScope, start: CoroutineStart): ReceiveChannel = scope.produce(context, produceCapacity, onBufferOverflow, start = start, block = collectToFun) override suspend fun collect(collector: FlowCollector): Unit = coroutineScope { - collector.emitAll(produceImpl(this)) + // If upstream and collect have the same dispatcher, launch the `produce` coroutine undispatched. + // This allows the collector to reliably subscribe to the flow before it starts emitting. + val current = currentCoroutineContext()[ContinuationInterceptor] + val desired = context[ContinuationInterceptor] + val start = if (desired == null || desired == current) { + CoroutineStart.UNDISPATCHED + } else CoroutineStart.ATOMIC + collector.emitAll(produceImplInternal(this, start)) } protected open fun additionalToStringProps(): String? = null From 32ffab8238128f15cf978b57d5b539f440d5cc71 Mon Sep 17 00:00:00 2001 From: Natasha Murashkina Date: Fri, 25 Jul 2025 15:35:39 +0100 Subject: [PATCH 2/3] remove subscribed assert + reword --- .../common/test/flow/channels/ChannelFlowTest.kt | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt index 26c596da07..1d7ce160f2 100644 --- a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt @@ -206,13 +206,11 @@ class ChannelFlowTest : TestBase() { } @Test - fun testDoesntDispatchWhenUnnecessarilyWhenCollected() = runTest { + fun testDoesntDispatchUnnecessarilyWhenCollected() = runTest { expect(1) - var subscribed = false val myFlow = flow { expect(3) - subscribed = true - yield() + yield() // In other words, testing that this will be the first suspension point in `collectLatest`. expect(5) } launch(start = CoroutineStart.UNDISPATCHED) { @@ -223,6 +221,5 @@ class ChannelFlowTest : TestBase() { finish(6) } expect(4) - assertTrue(subscribed) } } From c6cf8146b7f544e8116e6b5233a586968ee4050c Mon Sep 17 00:00:00 2001 From: Natasha Murashkina Date: Mon, 28 Jul 2025 18:21:12 +0100 Subject: [PATCH 3/3] add test for old behavior (dispatches when needed) --- .../common/test/flow/channels/ChannelFlowTest.kt | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt index 1d7ce160f2..8a36d3358b 100644 --- a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt @@ -222,4 +222,19 @@ class ChannelFlowTest : TestBase() { } expect(4) } + + @Test + fun testDispatchesToDifferentDispatcherWhenCollected() = runTest { + expect(1) + val myFlow = flow { + finish(4) + }.flowOn(wrapperDispatcher()) + launch(start = CoroutineStart.UNDISPATCHED) { + expect(2) + myFlow.collectLatest { + expectUnreached() + } + } + expect(3) + } }