From a804f501f8b4ff249e8fe6edfbcea44d2bd1a4d4 Mon Sep 17 00:00:00 2001 From: "Ilya.Usov" Date: Mon, 2 Sep 2024 14:22:32 +0200 Subject: [PATCH 1/3] Isolate`onWireReceived` to avoid completely broken protocol if something went wrong (failed deserialization) --- .../com/jetbrains/rd/framework/MessageBroker.kt | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/MessageBroker.kt b/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/MessageBroker.kt index 56f9626f3..650fa23ee 100644 --- a/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/MessageBroker.kt +++ b/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/MessageBroker.kt @@ -7,6 +7,7 @@ import com.jetbrains.rd.framework.impl.ProtocolContexts import com.jetbrains.rd.util.Queue import com.jetbrains.rd.util.Sync import com.jetbrains.rd.util.blockingPutUnique +import com.jetbrains.rd.util.error import com.jetbrains.rd.util.lifetime.Lifetime import com.jetbrains.rd.util.lifetime.intersect import com.jetbrains.rd.util.lifetime.isAlive @@ -87,11 +88,14 @@ class MessageBroker(queueMessages: Boolean = false) : IPrintable { return } - - AllowBindingCookie.allowBind { - val messageContext = protocol.contexts.readContext(buffer) - val helper = RdWireableDispatchHelper(entry.lifetime, id, protocol, messageContext) - entry.subscription.onWireReceived(buffer, helper) + try { + AllowBindingCookie.allowBind { + val messageContext = protocol.contexts.readContext(buffer) + val helper = RdWireableDispatchHelper(entry.lifetime, id, protocol, messageContext) + entry.subscription.onWireReceived(buffer, helper) + } + } catch (e: Throwable) { + log.error("Unexpected exception happened during processing a protocol event", e) } } From c85561e0e33b580925c74f692eed18c81a6db664 Mon Sep 17 00:00:00 2001 From: "Ilya.Usov" Date: Fri, 25 Oct 2024 15:47:29 +0200 Subject: [PATCH 2/3] Deprecate asCoroutineDispatcher that allows inlining because isDispatchNeeded() = false can leads to a deadlock --- .../threading/coroutines/ISchedulerCoroutineUtil.kt | 2 ++ .../util/threading/coroutines/ISourceCoroutineUtil.kt | 10 +++++++++- .../rd/framework/util/IRdEndpointCoroutineUtil.kt | 6 ++++-- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/rd-kt/rd-core/src/main/kotlin/com/jetbrains/rd/util/threading/coroutines/ISchedulerCoroutineUtil.kt b/rd-kt/rd-core/src/main/kotlin/com/jetbrains/rd/util/threading/coroutines/ISchedulerCoroutineUtil.kt index 3712fba75..b866e3fc2 100644 --- a/rd-kt/rd-core/src/main/kotlin/com/jetbrains/rd/util/threading/coroutines/ISchedulerCoroutineUtil.kt +++ b/rd-kt/rd-core/src/main/kotlin/com/jetbrains/rd/util/threading/coroutines/ISchedulerCoroutineUtil.kt @@ -10,4 +10,6 @@ private class SchedulerCoroutineDispatcher(private val scheduler: IScheduler, pr } val IScheduler.asCoroutineDispatcher get() = (this as? CoroutineDispatcher) ?: asCoroutineDispatcher(false) + +@Deprecated("Use asCoroutineDispatcher that doesn't allow inlining because isDispatchNeeded()=false can lead to deadlocks") fun IScheduler.asCoroutineDispatcher(allowInlining: Boolean): CoroutineDispatcher = SchedulerCoroutineDispatcher(this, allowInlining) diff --git a/rd-kt/rd-core/src/main/kotlin/com/jetbrains/rd/util/threading/coroutines/ISourceCoroutineUtil.kt b/rd-kt/rd-core/src/main/kotlin/com/jetbrains/rd/util/threading/coroutines/ISourceCoroutineUtil.kt index 4fefceeff..f702e7858 100644 --- a/rd-kt/rd-core/src/main/kotlin/com/jetbrains/rd/util/threading/coroutines/ISourceCoroutineUtil.kt +++ b/rd-kt/rd-core/src/main/kotlin/com/jetbrains/rd/util/threading/coroutines/ISourceCoroutineUtil.kt @@ -4,6 +4,7 @@ import com.jetbrains.rd.util.lifetime.Lifetime import com.jetbrains.rd.util.reactive.IScheduler import com.jetbrains.rd.util.reactive.ISource import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineStart import kotlinx.coroutines.Deferred import kotlin.coroutines.CoroutineContext @@ -52,7 +53,14 @@ suspend fun ISource.nextNotNullValue(): T = } fun ISource.adviseSuspend(lifetime: Lifetime, scheduler: IScheduler, handler: suspend (T) -> Unit) { - adviseSuspend(lifetime, scheduler.asCoroutineDispatcher(allowInlining = true), handler) + val context = scheduler.asCoroutineDispatcher + advise(lifetime) { + val start = if (scheduler.isActive) CoroutineStart.UNDISPATCHED else CoroutineStart.DEFAULT + lifetime.launch(context, start) { + handler(it) + } + } + } fun ISource.adviseSuspend(lifetime: Lifetime, context: CoroutineContext, handler: suspend (T) -> Unit) { diff --git a/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/util/IRdEndpointCoroutineUtil.kt b/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/util/IRdEndpointCoroutineUtil.kt index c342d931d..b44c7606a 100644 --- a/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/util/IRdEndpointCoroutineUtil.kt +++ b/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/util/IRdEndpointCoroutineUtil.kt @@ -5,7 +5,9 @@ import com.jetbrains.rd.framework.impl.RdEndpoint import com.jetbrains.rd.util.lifetime.Lifetime import com.jetbrains.rd.util.reactive.IScheduler import com.jetbrains.rd.util.threading.SynchronousScheduler +import com.jetbrains.rd.util.threading.coroutines.asCoroutineDispatcher import kotlinx.coroutines.CoroutineStart +import kotlinx.coroutines.async fun IRdEndpoint.setSuspend( cancellationScheduler: IScheduler? = null, @@ -15,10 +17,10 @@ fun IRdEndpoint.setSuspend( // wireScheduler is not be available if RdEndpoint is not bound val coroutineDispatcher by lazy { val scheduler = handlerScheduler ?: (this as RdEndpoint).protocol?.scheduler ?: SynchronousScheduler - scheduler.asCoroutineDispatcher(allowInlining = true) + scheduler.asCoroutineDispatcher } set(cancellationScheduler, handlerScheduler) { lt, req -> - lt.startAsync(coroutineDispatcher) { handler(lt, req) }.toRdTask() + lt.coroutineScope.async(coroutineDispatcher, CoroutineStart.UNDISPATCHED) { handler(lt, req) }.toRdTask() } } \ No newline at end of file From 53544d2a133ff910b60ffcaed15eb95f31f3a29d Mon Sep 17 00:00:00 2001 From: "Ilya.Usov" Date: Fri, 25 Oct 2024 15:48:22 +0200 Subject: [PATCH 3/3] Fire signal even if we are unbound --- .../com/jetbrains/rd/framework/impl/RdSignal.kt | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/impl/RdSignal.kt b/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/impl/RdSignal.kt index 977469216..11db787c6 100644 --- a/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/impl/RdSignal.kt +++ b/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/impl/RdSignal.kt @@ -48,15 +48,15 @@ class RdSignal(val valueSerializer: ISerializer = Polymorphic()) : RdRe val proto = protocol val ctx = serializationContext - if (proto == null || ctx == null) - return + if (proto != null && ctx != null) { + val wire = proto.wire - val wire = proto.wire - - wire.send(rdid) { buffer -> - logSend.trace { "signal `$location` ($rdid):: value = ${value.printToString()}" } - valueSerializer.write(ctx, buffer, value) + wire.send(rdid) { buffer -> + logSend.trace { "signal `$location` ($rdid):: value = ${value.printToString()}" } + valueSerializer.write(ctx, buffer, value) + } } + signal.fire(value) }