Skip to content

Commit

Permalink
adopted to scala-native 0.5
Browse files Browse the repository at this point in the history
  • Loading branch information
rssh committed May 6, 2024
1 parent 21eba36 commit 1704469
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 56 deletions.
3 changes: 1 addition & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,8 @@ lazy val cps = crossProject(JSPlatform, JVMPlatform, NativePlatform)
libraryDependencies += ("org.scala-js" %% "scalajs-junit-test-runtime" % "1.8.0" % Test).cross(CrossVersion.for3Use2_13),
mimaFailOnNoPrevious := false
).nativeSettings(
//scalaVersion := "3.1.2",
libraryDependencies += "org.scala-native" %%% "junit-runtime" % nativeVersion % Test,
libraryDependencies += "com.github.lolgab" %%% "native-loop-core" % "0.2.1" % Test,
//libraryDependencies += "com.github.lolgab" %%% "native-loop-core" % "0.2.1" % Test,
addCompilerPlugin("org.scala-native" % "junit-plugin" % nativeVersion cross CrossVersion.full)
)

Expand Down
108 changes: 70 additions & 38 deletions native/src/main/scala/cps/stream/BaseUnfoldCpsAsyncEmitAbsorber.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package cps.stream

import cps.{*,given}
import cps.{*, given}

import scala.concurrent.*
import scala.util.*
import scala.collection.mutable.Queue
import java.util.concurrent.atomic.*
import scala.collection.mutable



Expand All @@ -29,27 +30,82 @@ trait BaseUnfoldCpsAsyncEmitAbsorber[R,F[_],C <: CpsMonadContext[F], T](using ec
type ConsumerCallback = Try[SupplyEventRecord]=>Unit
type OneThreadTaskCallback = Unit => Unit


class State:
val finishRef = new AtomicReference[Try[Unit]|Null]()
val emitStart = new AtomicBoolean()
val supplyEvents = Queue[SupplyEventRecord]()
val consumerEvents = Queue[ConsumerCallback]()
private val supplyEvents = mutable.Queue[SupplyEventRecord]()
private val consumerEvents = mutable.Queue[ConsumerCallback]()

def queueEmit(v:T): F[Unit] =
private def queueEmit(v:T): F[Unit] =
val p = Promise[Unit]()
val emitted = Emitted(v, x => p.tryComplete(x) )
supplyEvents.enqueue(emitted)
asyncMonad.adoptCallbackStyle{ emitCallback =>
asyncMonad.adoptCallbackStyle{ emitCallback =>
p.future.onComplete(emitCallback)
}

def queueConsumer(): F[SupplyEventRecord] =
private def queueConsumer(): F[SupplyEventRecord] =
val p = Promise[SupplyEventRecord]()
consumerEvents.enqueue( x => p.complete(x))
asyncMonad.adoptCallbackStyle[SupplyEventRecord]{ evalCallback =>
p.future.onComplete(evalCallback)
}

private def tryDequeConsumer(): Option[ConsumerCallback] =
if (consumerEvents.isEmpty) None
else Some(consumerEvents.dequeue())

private def tryDequeSupply(): Option[SupplyEventRecord] =
if (supplyEvents.isEmpty) None
else Some(supplyEvents.dequeue())

def emit(v:T): F[Unit] = {
this.synchronized{
tryDequeConsumer() match
case Some(consumer) =>
asyncMonad.adoptCallbackStyle{ emitCallback =>
val emitted = Emitted(v, emitCallback)
consumer(Success(emitted))
}
case None =>
queueEmit(v)
}
}

def consume(): F[SupplyEventRecord] =
this.synchronized{
if (emitStart.compareAndSet(false, true)) then
asyncMonad.pure(SpawnEmitter)
else
tryDequeSupply() match
case Some(r) => asyncMonad.pure(r)
case None =>
queueConsumer()
}

def finish(r:Try[Unit]): Unit = {
this.synchronized{
finishRef.set(r)
while {
tryDequeConsumer() match
case Some(consumer) =>
consumer(Success(Finished(r)))
true
case None =>
false
} do ()
while {
tryDequeSupply() match
case Some(Emitted(_,cb)) =>
cb(Failure(new CancellationException("Stream is closed")))
true
case _ =>
false
} do ()
}
}


val unitSuccess = Success(())

Expand All @@ -58,30 +114,13 @@ trait BaseUnfoldCpsAsyncEmitAbsorber[R,F[_],C <: CpsMonadContext[F], T](using ec
) extends CpsAsyncEmitter[F,T]:


def emitAsync(v:T): F[Unit] =
if (!state.consumerEvents.isEmpty) then
val consumer = state.consumerEvents.dequeue()
asyncMonad.adoptCallbackStyle{ emitCallback =>
val emitted = Emitted(v, emitCallback)
consumer(Success(emitted))
}
else
state.queueEmit(v)
def emitAsync(v:T): F[Unit] =
state.emit(v)


def finish(r: Try[Unit]): Unit =
state.finishRef.set(r)
while(! state.consumerEvents.isEmpty ) {
val consumer = state.consumerEvents.dequeue()
consumer(Success(Finished(r)))
}
while(! state.supplyEvents.isEmpty) {
val ev = state.supplyEvents.dequeue()
ev match
case Emitted(v,cb) =>
cb(Failure(new CancellationException("Stream is closed")))
case _ =>
}

state.finish(r)

end StepsObserver

def evalAsync(f: C => CpsAsyncEmitter[F,T] => F[Unit]):F[R] =
Expand Down Expand Up @@ -115,16 +154,9 @@ trait BaseUnfoldCpsAsyncEmitAbsorber[R,F[_],C <: CpsMonadContext[F], T](using ec
case Success(_) => summon[CpsAsyncMonad[F]].pure(None)
case Failure(e) => summon[CpsAsyncMonad[F]].error(e)

def nextEvent(): F[SupplyEventRecord] =
if (state.emitStart.compareAndSet(false,true)) then
asyncMonad.pure(SpawnEmitter)
else
if !state.supplyEvents.isEmpty then
val e = state.supplyEvents.dequeue()
asyncMonad.pure(e)
else
state.queueConsumer()

def nextEvent(): F[SupplyEventRecord] =
state.consume()

val r = state.finishRef.get()
if r eq null then
asyncMonad.flatMap(nextEvent())(e => handleEvent(e))
Expand Down
4 changes: 1 addition & 3 deletions native/src/test/scala/cps/util/FutureCompleter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@ import scala.concurrent._
import scala.concurrent.duration._
import scala.util._

import scala.scalanative.loop._

// for use futures in jvm and native test.
object FutureCompleter
{

def apply[T](f: Future[T])(using ec:ExecutionContext): Unit =
EventLoop.run()
def apply[T](f: Future[T])(using ec:ExecutionContext): Unit =
Await.result(f, 30.seconds)

}
3 changes: 1 addition & 2 deletions native/src/test/scala/cps/util/FutureSleep.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ import scala.concurrent.*
import scala.concurrent.duration.*
import scala.concurrent.ExecutionContext.Implicits.global

import scala.scalanative.loop.*


object FutureSleep:

def apply(duration: FiniteDuration):Future[Duration] = {
Timer.delay(duration).map(_ => duration)
TestTimer.delay(duration)
}

59 changes: 54 additions & 5 deletions native/src/test/scala/cps/util/TestTimer.scala
Original file line number Diff line number Diff line change
@@ -1,25 +1,74 @@
package cps.util

import scala.collection.mutable
import scala.concurrent.*
import scala.concurrent.duration.*
import scala.concurrent.ExecutionContext.Implicits.global

import scala.scalanative.loop.*


object TestTimer:

type CancelToken = Timer

class Sleeper(val wakeTime: Long, val runnable: Runnable, @volatile var cancelled:Boolean)

type CancelToken = Sleeper

private val queue = mutable.PriorityQueue[Sleeper]()(using Ordering.by[Sleeper,Long](_.wakeTime))
@volatile var shutdownFlag = false


val sleeperThread = new Thread {
override def run(): Unit = {
val executionContext = summon[ExecutionContext]
while(!shutdownFlag) {
val now = System.currentTimeMillis()
val next = TestTimer.synchronized(queue.headOption)
next match
case Some(sleeper) =>
if (sleeper.wakeTime <= now) then
TestTimer.synchronized(queue.dequeue())
if (!sleeper.cancelled) then
executionContext.execute(sleeper.runnable)
else
val sleepTime = sleeper.wakeTime - now
TestTimer.synchronized {
TestTimer.wait(sleepTime)
}
case _ =>
val sleepTime = 1000L
TestTimer.synchronized {
TestTimer.wait(sleepTime)
}
}
}
}
sleeperThread.setDaemon(true)
sleeperThread.start()

def delay(duration: FiniteDuration):Future[Duration] = {
Timer.delay(duration).map(_ => duration)
val p = Promise[Duration]()
schedule(duration)(()=>p.success(duration))
p.future
}

def schedule(duration: FiniteDuration)(f: =>Unit): CancelToken = {
Timer.timeout(duration)(()=>f)
val wakeTime = System.currentTimeMillis() + duration.toMillis
val sleeper = Sleeper(wakeTime, () => f, false)
TestTimer.synchronized {
queue.enqueue(sleeper)
TestTimer.notify()
}
sleeper
}

def cancel(ct: CancelToken): Unit = {
ct.clear()
ct.cancelled = true
}

def shutdown(): Unit = {
shutdownFlag = true
TestTimer.synchronized {
TestTimer.notify()
}
}
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ addSbtPlugin("com.typesafe.sbt" % "sbt-ghpages" % "0.6.3")
addSbtPlugin("org.portable-scala" % "sbt-scala-native-crossproject" % "1.3.1")
addSbtPlugin("org.portable-scala" % "sbt-scalajs-crossproject" % "1.3.1")
addSbtPlugin("org.scala-js" % "sbt-scalajs" % "1.16.0")
addSbtPlugin("org.scala-native" % "sbt-scala-native" % "0.4.17")
addSbtPlugin("org.scala-native" % "sbt-scala-native" % "0.5.1")
addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "1.0.1")
6 changes: 4 additions & 2 deletions shared/src/test/scala/cps/stream/TestAsyncListIterator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ class TestAsyncListIterator {

val stream: AsyncList[Future, Int] = asyncStream[AsyncList[Future, Int]] { out =>
out.emit(0)
for i <- 1 to N do out.emit(i)
for i <- 1 to N do {
out.emit(i)
}
}

val ite = stream.iterator
Expand All @@ -40,7 +42,7 @@ class TestAsyncListIterator {
assert(count == N+1)
res
}

FutureCompleter(compute)
}

Expand Down
11 changes: 8 additions & 3 deletions shared/src/test/scala/cps/stream/TestFtBasicUsage.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package cps
package cps.stream

import org.junit.{Test,Ignore}
import org.junit.Assert._

import scala.concurrent.*
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.*


import cps.*
Expand All @@ -14,10 +15,11 @@ import cps.monads.{*, given}
import cps.testconfig.given
import cps.util.FutureCompleter

class TestFbBasicUsage:
class TestFtBasicUsage:

val N = 10000


@Test def simpleLoopAsyncListFt() =

val stream = asyncStream[AsyncList[Future, Int]]{ out =>
Expand Down Expand Up @@ -52,9 +54,12 @@ class TestFbBasicUsage:
}
}
val listSum = stream.fold(0)(_ + _)
val res = listSum.failed.map(ex => assert(ex.getMessage()=="bye"))
val res = listSum.failed.map(ex =>
assert(ex.getMessage()=="bye")
)
FutureCompleter(res)


@Test def fewSmallLoopsInAsyncList() =

val M = 1000
Expand Down

0 comments on commit 1704469

Please sign in to comment.