diff --git a/rd-kt/rd-core/src/main/kotlin/com/jetbrains/rd/util/threading/coroutines/ISchedulerCoroutineUtil.kt b/rd-kt/rd-core/src/main/kotlin/com/jetbrains/rd/util/threading/coroutines/ISchedulerCoroutineUtil.kt index 3712fba75..b866e3fc2 100644 --- a/rd-kt/rd-core/src/main/kotlin/com/jetbrains/rd/util/threading/coroutines/ISchedulerCoroutineUtil.kt +++ b/rd-kt/rd-core/src/main/kotlin/com/jetbrains/rd/util/threading/coroutines/ISchedulerCoroutineUtil.kt @@ -10,4 +10,6 @@ private class SchedulerCoroutineDispatcher(private val scheduler: IScheduler, pr } val IScheduler.asCoroutineDispatcher get() = (this as? CoroutineDispatcher) ?: asCoroutineDispatcher(false) + +@Deprecated("Use asCoroutineDispatcher that doesn't allow inlining because isDispatchNeeded()=false can lead to deadlocks") fun IScheduler.asCoroutineDispatcher(allowInlining: Boolean): CoroutineDispatcher = SchedulerCoroutineDispatcher(this, allowInlining) diff --git a/rd-kt/rd-core/src/main/kotlin/com/jetbrains/rd/util/threading/coroutines/ISourceCoroutineUtil.kt b/rd-kt/rd-core/src/main/kotlin/com/jetbrains/rd/util/threading/coroutines/ISourceCoroutineUtil.kt index 4fefceeff..f702e7858 100644 --- a/rd-kt/rd-core/src/main/kotlin/com/jetbrains/rd/util/threading/coroutines/ISourceCoroutineUtil.kt +++ b/rd-kt/rd-core/src/main/kotlin/com/jetbrains/rd/util/threading/coroutines/ISourceCoroutineUtil.kt @@ -4,6 +4,7 @@ import com.jetbrains.rd.util.lifetime.Lifetime import com.jetbrains.rd.util.reactive.IScheduler import com.jetbrains.rd.util.reactive.ISource import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineStart import kotlinx.coroutines.Deferred import kotlin.coroutines.CoroutineContext @@ -52,7 +53,14 @@ suspend fun ISource.nextNotNullValue(): T = } fun ISource.adviseSuspend(lifetime: Lifetime, scheduler: IScheduler, handler: suspend (T) -> Unit) { - adviseSuspend(lifetime, scheduler.asCoroutineDispatcher(allowInlining = true), handler) + val context = scheduler.asCoroutineDispatcher + advise(lifetime) { + val start = if (scheduler.isActive) CoroutineStart.UNDISPATCHED else CoroutineStart.DEFAULT + lifetime.launch(context, start) { + handler(it) + } + } + } fun ISource.adviseSuspend(lifetime: Lifetime, context: CoroutineContext, handler: suspend (T) -> Unit) { diff --git a/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/MessageBroker.kt b/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/MessageBroker.kt index 56f9626f3..650fa23ee 100644 --- a/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/MessageBroker.kt +++ b/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/MessageBroker.kt @@ -7,6 +7,7 @@ import com.jetbrains.rd.framework.impl.ProtocolContexts import com.jetbrains.rd.util.Queue import com.jetbrains.rd.util.Sync import com.jetbrains.rd.util.blockingPutUnique +import com.jetbrains.rd.util.error import com.jetbrains.rd.util.lifetime.Lifetime import com.jetbrains.rd.util.lifetime.intersect import com.jetbrains.rd.util.lifetime.isAlive @@ -87,11 +88,14 @@ class MessageBroker(queueMessages: Boolean = false) : IPrintable { return } - - AllowBindingCookie.allowBind { - val messageContext = protocol.contexts.readContext(buffer) - val helper = RdWireableDispatchHelper(entry.lifetime, id, protocol, messageContext) - entry.subscription.onWireReceived(buffer, helper) + try { + AllowBindingCookie.allowBind { + val messageContext = protocol.contexts.readContext(buffer) + val helper = RdWireableDispatchHelper(entry.lifetime, id, protocol, messageContext) + entry.subscription.onWireReceived(buffer, helper) + } + } catch (e: Throwable) { + log.error("Unexpected exception happened during processing a protocol event", e) } } diff --git a/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/impl/RdSignal.kt b/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/impl/RdSignal.kt index 977469216..11db787c6 100644 --- a/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/impl/RdSignal.kt +++ b/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/impl/RdSignal.kt @@ -48,15 +48,15 @@ class RdSignal(val valueSerializer: ISerializer = Polymorphic()) : RdRe val proto = protocol val ctx = serializationContext - if (proto == null || ctx == null) - return + if (proto != null && ctx != null) { + val wire = proto.wire - val wire = proto.wire - - wire.send(rdid) { buffer -> - logSend.trace { "signal `$location` ($rdid):: value = ${value.printToString()}" } - valueSerializer.write(ctx, buffer, value) + wire.send(rdid) { buffer -> + logSend.trace { "signal `$location` ($rdid):: value = ${value.printToString()}" } + valueSerializer.write(ctx, buffer, value) + } } + signal.fire(value) } diff --git a/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/util/IRdEndpointCoroutineUtil.kt b/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/util/IRdEndpointCoroutineUtil.kt index c342d931d..b44c7606a 100644 --- a/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/util/IRdEndpointCoroutineUtil.kt +++ b/rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/util/IRdEndpointCoroutineUtil.kt @@ -5,7 +5,9 @@ import com.jetbrains.rd.framework.impl.RdEndpoint import com.jetbrains.rd.util.lifetime.Lifetime import com.jetbrains.rd.util.reactive.IScheduler import com.jetbrains.rd.util.threading.SynchronousScheduler +import com.jetbrains.rd.util.threading.coroutines.asCoroutineDispatcher import kotlinx.coroutines.CoroutineStart +import kotlinx.coroutines.async fun IRdEndpoint.setSuspend( cancellationScheduler: IScheduler? = null, @@ -15,10 +17,10 @@ fun IRdEndpoint.setSuspend( // wireScheduler is not be available if RdEndpoint is not bound val coroutineDispatcher by lazy { val scheduler = handlerScheduler ?: (this as RdEndpoint).protocol?.scheduler ?: SynchronousScheduler - scheduler.asCoroutineDispatcher(allowInlining = true) + scheduler.asCoroutineDispatcher } set(cancellationScheduler, handlerScheduler) { lt, req -> - lt.startAsync(coroutineDispatcher) { handler(lt, req) }.toRdTask() + lt.coroutineScope.async(coroutineDispatcher, CoroutineStart.UNDISPATCHED) { handler(lt, req) }.toRdTask() } } \ No newline at end of file