fix(stdio): #573, #574 customize dispatchers & scope for stdio server transport configuration #584
…port Scope lifecycle is fixed (#574 resolved), processing dispatcher defaults to `Dispatchers.Default` (#573 resolved), scope context is clean (no spurious dispatcher stacking), and `@Volatile` on the three job vars addresses the visibility race.
- Introduce a `Configuration` class for `StdioServerTransport` to improve API flexibility and readability.
- Updated transport initialization to use a builder block for configuring parameters such as I/O streams, buffer sizes, dispatchers, and parent coroutine scope.
Test: Added integration test validating the builder functionality.
… baselines - Added `InjectDispatcher` rule to detekt exclusions for test folders in `detekt.yml`. - Removed resolved `InjectDispatcher` issues from baseline files.
c4cf6ac to
570184d
| * val transport = StdioServerTransport { | ||
| * source = System.`in`.asInput(), | ||
| * sink = System.out.asSink(), | ||
| * } |
There was a problem hiding this comment.
This example looks incorrect to me: as far as I remember, `buffered()` needs to be called.
There was a problem hiding this comment.
The buffered instance is created internally here.
There was a problem hiding this comment.
I also just noticed that the assignments in this example end with commas.
| */ | ||
| @OptIn(ExperimentalAtomicApi::class) | ||
| public class StdioServerTransport(private val inputStream: Source, outputStream: Sink) : AbstractTransport() { | ||
| public class StdioServerTransport(block: Configuration.() -> Unit) : AbstractTransport() { |
There was a problem hiding this comment.
Isn't a `Configuration` class passed to the streamable transport's constructor?
Why is a builder used here, then?
There was a problem hiding this comment.
What do you mean? `StreamableHttpServerTransport` uses a different configuration class. This is `StdioServerTransport.Configuration`.
There was a problem hiding this comment.
You are using two different API approaches to create transports.
There was a problem hiding this comment.
Accordingly, we need to settle on a single approach: either update/add a builder for streamable, or make the changes here for stdio
| public class Configuration internal constructor( | ||
| public var source: Source? = null, | ||
| public var sink: Sink? = null, | ||
| public var readBufferSize: Long = READ_BUFFER_SIZE, |
There was a problem hiding this comment.
This doesn’t need to be made public
| public var readChannelBufferSize: Int = Channel.UNLIMITED, | ||
| public var writeChannelBufferSize: Int = Channel.UNLIMITED, |
There was a problem hiding this comment.
This also shouldn’t be public
There was a problem hiding this comment.
I disagree with this. Buffer sizes should be configurable for production systems. The default value of UNLIMITED is a clear performance issue (OOM risk). I am not changing it at this time to avoid unexpected changes for users but this is a defect for sure.
There was a problem hiding this comment.
> The default value of UNLIMITED is a clear performance issue

That makes sense.
At the same time, I'd still suggest keeping it private and changing this to BUFFERED. Leaving it as UNLIMITED seems unreasonable to me; sooner or later we'll likely have to change it anyway.
Later on, we can add the ability to configure this parameter, which will be easier to do within an already agreed-upon API.
I'd also suggest renaming it, since it's essentially a capacity.
There was a problem hiding this comment.
If we want to configure backpressure, let's pass the entire Channel as a parameter.
| public var readingJobDispatcher: CoroutineDispatcher = IODispatcher, | ||
| public var writingJobDispatcher: CoroutineDispatcher = IODispatcher, | ||
| public var processingJobDispatcher: CoroutineDispatcher = Dispatchers.Default, |
There was a problem hiding this comment.
Why are there three dispatchers if one would be enough?
There was a problem hiding this comment.
One dispatcher is not enough. Here is why:
- `processingJobDispatcher` is for issue StdioServerTransport: Request handlers should not run on Dispatchers.IO #573.
- `readingJobDispatcher` and `writingJobDispatcher` should be configurable to run IO jobs on virtual threads. `Dispatchers.IO` uses platform threads by default.
- I have considered having only one dispatcher for IO, but rejected this option for the sake of flexibility to implement the bulkhead pattern. This is crucial for high-load applications where read and write thread pools should be isolated.
There was a problem hiding this comment.
@kpavlov it would be better to do `val stdioDispatcher = ioDispatcher.limitedParallelism(2)` to have dedicated threads for your jobs. They don't consume the global IO dispatcher pool in this case, so you don't need to pass a dispatcher for each job (the parameters should specify a kind of dispatcher for IO, not a separate dispatcher for every single job).
There was a problem hiding this comment.
Or you pass an IO dispatcher as a parameter and document that at least 2 threads must be allocated in the dispatcher. Then a user either passes a plain `Dispatchers.IO` or manually creates `Dispatchers.IO.limitedParallelism(2)`.
There was a problem hiding this comment.
The approach you implemented does not follow from either the attached issue or the original one
Splitting into io and default, along with the ability to pass a value for io, fully addresses the request
agree with @nerzhulart
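For illustration, the single-IO-dispatcher suggestion from this thread could look roughly like the sketch below. It is not the SDK's code: `relayOnSharedIoView` and its channel are hypothetical stand-ins for the transport's read and write jobs, and the only real API assumed is `CoroutineDispatcher.limitedParallelism` from kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: relays `count` messages from a "reader" coroutine to a
// "writer" coroutine, both running on one view of the supplied IO dispatcher.
@OptIn(ExperimentalCoroutinesApi::class) // needed only on older kotlinx.coroutines releases
fun relayOnSharedIoView(
    count: Int,
    ioDispatcher: CoroutineDispatcher = Dispatchers.IO,
): List<String> = runBlocking {
    // One view with room for two concurrently running coroutines.
    val stdioDispatcher = ioDispatcher.limitedParallelism(2)
    val messages = Channel<String>(Channel.BUFFERED)
    val written = mutableListOf<String>()

    coroutineScope {
        launch(stdioDispatcher) { // stands in for the reading job
            repeat(count) { messages.send("message-$it") }
            messages.close()
        }
        launch(stdioDispatcher) { // stands in for the writing job
            for (message in messages) written += message
        }
    }
    written
}
```

With this shape the caller supplies one dispatcher and the component derives what it needs, which is the trade-off being debated above against exposing one dispatcher per job.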
| public var processingJobDispatcher: CoroutineDispatcher = Dispatchers.Default, | ||
| public var readChannelBufferSize: Int = Channel.UNLIMITED, | ||
| public var writeChannelBufferSize: Int = Channel.UNLIMITED, | ||
| public var coroutineScope: CoroutineScope? = null, |
There was a problem hiding this comment.
Why does coroutineScope need to be public?
There was a problem hiding this comment.
I'd pass a scope as a constructor parameter
There was a problem hiding this comment.
The coroutine scope that's used in this transport is completely detached from the coroutine tree, which may cause leaks. Please add an optional CoroutineScope parameter
agree
There was a problem hiding this comment.
It is public because it is a field of the builder. It is copied to a private `val` in the constructor, and the builder is then discarded.
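As a rough sketch of what attaching a component to the caller's coroutine tree can mean (the concern behind #574), assuming an optional parent scope is accepted: `ScopedTransportSketch` below is a hypothetical class, not the SDK's transport. It creates a `SupervisorJob` that is a child of the parent's `Job`, so cancelling the parent also cancels the component's scope.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.isActive
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical stand-in for a transport; not the SDK's class.
class ScopedTransportSketch(parentScope: CoroutineScope? = null) {
    // A SupervisorJob whose parent is the caller's Job, if one was supplied.
    private val supervisor = SupervisorJob(parentScope?.coroutineContext?.get(Job))

    // Inherit the parent's context elements, but use our own child job.
    val scope: CoroutineScope =
        CoroutineScope((parentScope?.coroutineContext ?: EmptyCoroutineContext) + supervisor)

    // Cancels only this component's coroutines, not the parent's.
    fun close() = scope.cancel()
}

// Returns true when cancelling the parent also deactivates the component's scope.
fun parentCancellationReachesTransport(): Boolean {
    val parent = CoroutineScope(Job() + Dispatchers.Default)
    val transport = ScopedTransportSketch(parent)
    parent.cancel()
    return !transport.scope.isActive
}
```

When no parent is passed, the sketch falls back to a detached supervisor job, which mirrors the behaviour the issue describes as leak-prone.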
| @Volatile | ||
| private var readingJob: Job? = null | ||
| @Volatile | ||
| private var sendingJob: Job? = null | ||
| private var processingJob: Job? = null | ||
| private val coroutineContext: CoroutineContext = IODispatcher + SupervisorJob() | ||
| private val scope = CoroutineScope(coroutineContext) | ||
| private val readChannel = Channel<ByteArray>(Channel.UNLIMITED) | ||
| private val writeChannel = Channel<JSONRPCMessage>(Channel.UNLIMITED) | ||
| private val outputSink = outputStream.buffered() | ||
| @Volatile | ||
| private var processingJob: Job? = null |
There was a problem hiding this comment.
Can we get rid of storing the Job fields and use a scope instead?
It seems to me that this would eliminate potential race conditions
There was a problem hiding this comment.
Let us address this issue within the scope of #518.
The close() method already contains a race condition which this PR does not exacerbate. The test is passing.
We should retain the current algorithm and pattern for the time being.
There was a problem hiding this comment.
You are already making changes here, so instead of adding `@Volatile`, the job fields could simply be removed.
There was a problem hiding this comment.
I don't quite follow the reasoning here. If you're already doing a refactor, why not address this as well? Is there some constraint I'm missing?
| printOutput = output.asSink().buffered() | ||
| } | ||
| @OptIn(InternalAPI::class) |
There was a problem hiding this comment.
I think it’s better not to use InternalAPI from other libraries unless there’s a clear necessity
There was a problem hiding this comment.
I agree with you in general. This is for clientDispatcher and is not causing any issues here.
kpavlov left a comment
There was a problem hiding this comment.
@devcrocod, thanks for your review! I’ve replied to your comments.
@e5l, I’d be grateful if you could also take a look.
| public class Configuration internal constructor( | ||
| public var source: Source? = null, | ||
| public var sink: Sink? = null, | ||
| public var readBufferSize: Long = READ_BUFFER_SIZE, |
There was a problem hiding this comment.
This is your opinion. There is a similar request for StreamableHttpTransport (#521). So, making buffer size customizable makes sense to me. Might be useful for performance tuning.
There was a problem hiding this comment.
Of course, this is just my opinion; I'm not claiming it's the truth.
And I'm not saying that this is a bad change.
At the same time, it locks in the API. As I mentioned in another comment, we can always add something later without trouble, but removing or changing it afterwards would be more problematic.
If we want to pass such things through the main config, then that's OK.
cc: @i1bro

This PR introduces quite a few changes beyond addressing the specific issue.
I think it would be better not to add many things to the public API without a request for them. We can always add them later without breaking compatibility, but removing or changing them afterwards would be much harder.

@devcrocod, where exactly is this PR worsening the existing design?
| * @property coroutineScope The [CoroutineScope] used for managing coroutines. | ||
| */ | ||
| @Suppress("LongParameterList") | ||
| public class Configuration internal constructor( |
There was a problem hiding this comment.
I wouldn't use this kind of builder. IMHO they look fancy, but at the use site it's usually overengineering. I'd prefer to pass the necessary parameters as plain parameters: source and sink, as well as the parent scope and the IO dispatcher (which isn't present in KMP). The default dispatcher can be overridden by passing a specific scope with the necessary dispatcher, but I'm sure the default one always fits. All the remaining optional/default parameters, like buffer size, I'd hide under a `StdioOptions` data class with a default instance if possible.
| public var readingJobDispatcher: CoroutineDispatcher = IODispatcher, | ||
| public var writingJobDispatcher: CoroutineDispatcher = IODispatcher, | ||
| public var processingJobDispatcher: CoroutineDispatcher = Dispatchers.Default, | ||
| public var readChannelBufferSize: Int = Channel.UNLIMITED, |
There was a problem hiding this comment.
When making channel sizes configurable, always specify where backpressure occurs (if it occurs), and add tests for the limited cases.
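A minimal sketch of what a test for the limited case could probe, assuming plain kotlinx.coroutines channels rather than the transport itself: `acceptedWithoutConsumer` is a hypothetical helper that counts how many elements a channel takes before a producer would have to suspend.

```kotlin
import kotlinx.coroutines.channels.Channel

// Counts how many elements a channel accepts without any consumer before
// trySend starts failing, probing at most `attempts` times.
fun acceptedWithoutConsumer(capacity: Int, attempts: Int): Int {
    val channel = Channel<Int>(capacity)
    var accepted = 0
    repeat(attempts) { value ->
        // trySend never suspends: it fails once the buffer is full,
        // which is the point where send() would apply backpressure.
        if (channel.trySend(value).isSuccess) accepted++
    }
    channel.close()
    return accepted
}
```

Calling it with a capacity of 2 and with `Channel.UNLIMITED` shows the difference discussed in this thread: the bounded channel stops accepting after its buffer fills, while the unlimited one keeps accepting and only memory bounds it.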
| private val readBuffer = ReadBuffer() | ||
| private val initialized: AtomicBoolean = AtomicBoolean(false) | ||
| @Volatile |
There was a problem hiding this comment.
I don't think you even need these fields for the jobs. Just run an orchestration job in `init` and, inside it, run all the necessary jobs (read, write, processing) together; then you can join them. That way you get structured concurrency that is joined through a single job. Less state (especially something like volatile) means fewer problems.
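The orchestration-job idea could be sketched as follows. This is an illustration under assumptions, not the transport's implementation: `PipelineSketch`, its channels, and the uppercase "processing" step are all hypothetical, and shutdown is driven by closing channels rather than cancelling jobs.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical pipeline: one orchestration job owns the read, process, and
// write coroutines, so no per-job fields (or @Volatile) are needed.
class PipelineSketch(scope: CoroutineScope, input: List<String>) {
    private val inbound = Channel<String>(Channel.BUFFERED)
    private val outbound = Channel<String>(Channel.BUFFERED)
    val written = mutableListOf<String>()

    // The parent job completes only after all three children complete.
    val orchestration: Job = scope.launch {
        launch { // read
            input.forEach { inbound.send(it) }
            inbound.close() // signals "no more input" downstream
        }
        launch { // process
            for (line in inbound) outbound.send(line.uppercase())
            outbound.close()
        }
        launch { // write
            for (line in outbound) written += line
        }
    }
}

// Runs the pipeline to completion and returns what the write stage collected.
fun runPipelineSketch(input: List<String>): List<String> = runBlocking {
    val pipeline = PipelineSketch(CoroutineScope(Dispatchers.Default), input)
    pipeline.orchestration.join()
    pipeline.written
}
```

Joining or cancelling `orchestration` affects all three stages at once, which is the reduction in state the comment is pointing at.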
| }.apply { | ||
| invokeOnCompletion { cause -> | ||
| logJobCompletion("Message sending", cause) | ||
| if (cause is CancellationException) { |
There was a problem hiding this comment.
I'd try to control cancellation via channels
| source.close() | ||
| }.onFailure { logger.warn(it) { "Failed to close stdin" } } | ||
| readingJob?.cancel() |
There was a problem hiding this comment.
When you hard-cancel the jobs, they don't get a chance to terminate gracefully. I think it's better to close the channels and streams and handle the semantic exceptions from them.
@kpavlov I replied to each comment again in case additional details were needed.

@e5l @devcrocod @nerzhulart I would appreciate it if you could share a link to the official Kotlin guide on writing idiomatic APIs. Until then, I am confused, as the answers appear to be based on personal preferences.
132ebd9 to
ac552f7
…ove `Configuration` class - Replaced the builder-based setup with a streamlined primary constructor for `StdioServerTransport`. - Removed the `Configuration` class for reduced complexity and enhanced readability. - Updated `StdioServerTransportTest` to reflect the refactored initialization. - Refactor cleanup.
ac552f7 to
4693529
Add a customizable builder to StdioServerTransport
Scope lifecycle is fixed (StdioServerTransport: The coroutine scope that's used in this transport is completely detached from the coroutine tree #574 resolved), processing dispatcher defaults to `Dispatchers.Default` (StdioServerTransport: Request handlers should not run on Dispatchers.IO #573 resolved), scope context is clean (no spurious dispatcher stacking), and `@Volatile` on the three job vars addresses the visibility race.
Introduce a `Configuration` class for `StdioServerTransport` to improve API flexibility and readability. Now one can provide a dispatcher for IO running on virtual threads.
Updated transport initialization to use a builder block for configuring parameters such as I/O streams, buffer sizes, dispatchers, and parent coroutine scope.
Added integration test validating the builder functionality.
Added `InjectDispatcher` rule to detekt exclusions for test folders in `detekt.yml`. Removed `InjectDispatcher` issues from detekt baseline files.
NB! This PR doesn't fully resolve race conditions on close(). It will be resolved in the course of #518.
Motivation and Context
#573, #574
How Has This Been Tested?
Integration test / regression tests
Breaking Changes
No, the API is only extended.
Types of changes
Checklist
Additional context