Skip to content

Commit

Permalink
Merge pull request #429 from JetBrains/Threading-usov
Browse files Browse the repository at this point in the history
Threading
  • Loading branch information
Iliya-usov authored Aug 15, 2023
2 parents 9571618 + 2c9ca7e commit 7be9f1c
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.jetbrains.rd.util.reactive

import com.jetbrains.rd.util.lifetime.Lifetime
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.consumeAsFlow

class SignalFlow<T>(private val signal: ISignal<T>) : Flow<T> {

override suspend fun collect(collector: FlowCollector<T>): Nothing {
Lifetime.using { lifetime ->
val channel = Channel<T>()
signal.advise(lifetime) {
channel.trySend(it)
}

channel.consumeAsFlow().collect(collector)
}

error("Unreachable")
}
}

fun <T> ISignal<T>.asFlow() = SignalFlow(this)
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.jetbrains.rd.framework.impl.InternRoot
import com.jetbrains.rd.framework.impl.ProtocolContexts
import com.jetbrains.rd.framework.impl.RdSignal
import com.jetbrains.rd.util.Sync
import com.jetbrains.rd.util.collections.SynchronizedSet
import com.jetbrains.rd.util.getLogger
import com.jetbrains.rd.util.lifetime.Lifetime
import com.jetbrains.rd.util.reactive.*
Expand Down Expand Up @@ -45,7 +46,7 @@ class Protocol internal constructor(
vararg initialContexts: RdContext<*>) : this(name, serializers, identity, scheduler, wire, lifetime, null, null, null, null, *initialContexts)

override val location: RName = RName(name)
override val outOfSyncModels: ViewableSet<RdExtBase> = ViewableSet()
override val outOfSyncModels: ViewableSet<RdExtBase> = ViewableSet(SynchronizedSet())

override val isMaster: Boolean = identity.dynamicKind == IdKind.Client

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ internal abstract class ExtSchedulerBase : IScheduler {
thread
}, { thread ->
val prevThread = activeThread.getAndSet(null)
assert(prevThread == thread) { "prev thread must be $thread, but actual: $prevThread" }
// parent lifetime can be terminated from background thread
assert(prevThread != null) { "prev thread must not be null" }
})
}

Expand Down

0 comments on commit 7be9f1c

Please sign in to comment.