Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish with selector #189

Draft
wants to merge 14 commits into
base: master
Choose a base branch
from
27 changes: 27 additions & 0 deletions api/FlowExt.api
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,18 @@ public final class com/hoc081098/flowext/PairwiseKt {
public static final fun zipWithNext (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
}

public abstract interface annotation class com/hoc081098/flowext/PublishSelectorDsl : java/lang/annotation/Annotation {
}

public abstract interface annotation class com/hoc081098/flowext/PublishSelectorSharedFlowDsl : java/lang/annotation/Annotation {
}

public final class com/hoc081098/flowext/PublishWithSelectorKt {
public static final fun main (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun main ([Ljava/lang/String;)V
public static final fun publish (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
}

public final class com/hoc081098/flowext/RaceKt {
public static final fun amb (Ljava/lang/Iterable;)Lkotlinx/coroutines/flow/Flow;
public static final fun amb (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;[Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
Expand Down Expand Up @@ -243,6 +255,21 @@ public final class com/hoc081098/flowext/ScanWithKt {
public static final fun scanWith (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
}

public abstract interface class com/hoc081098/flowext/SelectorScope {
public abstract fun select (Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
}

public abstract interface class com/hoc081098/flowext/SelectorSharedFlowScope {
public abstract fun shareIn (Lkotlinx/coroutines/flow/Flow;ILkotlinx/coroutines/flow/SharingStarted;)Lkotlinx/coroutines/flow/SharedFlow;
public abstract fun shareIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;I)Lkotlinx/coroutines/flow/SharedFlow;
}

public final class com/hoc081098/flowext/SelectorSharedFlowScope$DefaultImpls {
public static fun shareIn (Lcom/hoc081098/flowext/SelectorSharedFlowScope;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;I)Lkotlinx/coroutines/flow/SharedFlow;
public static synthetic fun shareIn$default (Lcom/hoc081098/flowext/SelectorSharedFlowScope;Lkotlinx/coroutines/flow/Flow;ILkotlinx/coroutines/flow/SharingStarted;ILjava/lang/Object;)Lkotlinx/coroutines/flow/SharedFlow;
public static synthetic fun shareIn$default (Lcom/hoc081098/flowext/SelectorSharedFlowScope;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;IILjava/lang/Object;)Lkotlinx/coroutines/flow/SharedFlow;
}

public final class com/hoc081098/flowext/SelectorsKt {
public static final fun select (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun select (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function6;)Lkotlinx/coroutines/flow/Flow;
Expand Down
47 changes: 47 additions & 0 deletions src/commonMain/kotlin/com/hoc081098/flowext/internal/AtomicRef.kt
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,50 @@ internal expect class AtomicRef<T>(initialValue: T) {

fun compareAndSet(expect: T, update: T): Boolean
}

// Copy from: https://github.com/arrow-kt/arrow/blob/6fb6a75b131f5bbb272611bf277e263ff791cb67/arrow-libs/core/arrow-atomic/src/commonMain/kotlin/arrow/atomic/Atomic.kt#L44

/**
* Infinite loop that reads this atomic variable and performs the specified [action] on its value.
*/
internal inline fun <T> AtomicRef<T>.loop(action: (T) -> Unit): Nothing {
while (true) {
action(value)
}
}

internal fun <T> AtomicRef<T>.tryUpdate(function: (T) -> T): Boolean {
val cur = value
val upd = function(cur)
return compareAndSet(cur, upd)
}

internal inline fun <T> AtomicRef<T>.update(function: (T) -> T) {
while (true) {
val cur = value
val upd = function(cur)
if (compareAndSet(cur, upd)) return
}
}

/**
* Updates variable atomically using the specified [function] of its value and returns its old value.
*/
internal inline fun <T> AtomicRef<T>.getAndUpdate(function: (T) -> T): T {
while (true) {
val cur = value
val upd = function(cur)
if (compareAndSet(cur, upd)) return cur
}
}

/**
* Updates variable atomically using the specified [function] of its value and returns its new value.
*/
internal inline fun <T> AtomicRef<T>.updateAndGet(function: (T) -> T): T {
while (true) {
val cur = value
val upd = function(cur)
if (compareAndSet(cur, upd)) return upd
}
}
61 changes: 61 additions & 0 deletions src/commonMain/kotlin/com/hoc081098/flowext/internal/SimpleLazy.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* MIT License
*
* Copyright (c) 2021-2023 Petrus Nguyễn Thái Học
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

@file:Suppress("ktlint:standard:property-naming")

package com.hoc081098.flowext.internal

import kotlin.concurrent.Volatile
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.internal.SynchronizedObject
import kotlinx.coroutines.internal.synchronized

// TODO: Remove SynchronizedObject
@OptIn(InternalCoroutinesApi::class)
internal class SimpleLazy<T : Any>(
initializer: () -> T,
) : SynchronizedObject() {
private var _initializer: (() -> T)? = initializer

@Volatile
private var value: T? = null

fun getValue(): T =
value ?: synchronized(this) {
value ?: _initializer!!().also {
_initializer = null
value = it
}
}

fun getOrNull(): T? = value

fun clear() {
_initializer = null
value = null
}
}

internal fun <T : Any> simpleLazyOf(initializer: () -> T): SimpleLazy<T> =
SimpleLazy(initializer)
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* MIT License
*
* Copyright (c) 2021-2023 Petrus Nguyễn Thái Học
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

@file:Suppress("ktlint:standard:property-naming")

package com.hoc081098.flowext.internal

import kotlin.concurrent.Volatile
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

internal class SimpleSuspendLazy<T : Any>(
initializer: suspend () -> T,
) {
private val mutex = Mutex()

@Volatile
private var _initializer: (suspend () -> T)? = initializer

@Volatile
private var value: T? = null

suspend fun getValue(): T =
value ?: mutex.withLock {
value ?: _initializer!!().also {
_initializer = null
value = it
}
}

fun clear() {
_initializer = null
value = null
}
}

internal fun <T : Any> simpleSuspendLazy(initializer: suspend () -> T): SimpleSuspendLazy<T> =
SimpleSuspendLazy(initializer)
Loading
Loading