From 1704469b715a985dbacbd17f8947bf5720ccc460 Mon Sep 17 00:00:00 2001 From: Ruslan Shevchenko Date: Mon, 6 May 2024 16:06:26 +0300 Subject: [PATCH] adopted to scala-native 0.5 --- build.sbt | 3 +- .../BaseUnfoldCpsAsyncEmitAbsorber.scala | 108 ++++++++++++------ .../test/scala/cps/util/FutureCompleter.scala | 4 +- .../src/test/scala/cps/util/FutureSleep.scala | 3 +- .../src/test/scala/cps/util/TestTimer.scala | 59 +++++++++- project/plugins.sbt | 2 +- .../cps/stream/TestAsyncListIterator.scala | 6 +- .../scala/cps/stream/TestFtBasicUsage.scala | 11 +- 8 files changed, 140 insertions(+), 56 deletions(-) diff --git a/build.sbt b/build.sbt index 2bfd5f625..416c87d01 100644 --- a/build.sbt +++ b/build.sbt @@ -61,9 +61,8 @@ lazy val cps = crossProject(JSPlatform, JVMPlatform, NativePlatform) libraryDependencies += ("org.scala-js" %% "scalajs-junit-test-runtime" % "1.8.0" % Test).cross(CrossVersion.for3Use2_13), mimaFailOnNoPrevious := false ).nativeSettings( - //scalaVersion := "3.1.2", libraryDependencies += "org.scala-native" %%% "junit-runtime" % nativeVersion % Test, - libraryDependencies += "com.github.lolgab" %%% "native-loop-core" % "0.2.1" % Test, + //libraryDependencies += "com.github.lolgab" %%% "native-loop-core" % "0.2.1" % Test, addCompilerPlugin("org.scala-native" % "junit-plugin" % nativeVersion cross CrossVersion.full) ) diff --git a/native/src/main/scala/cps/stream/BaseUnfoldCpsAsyncEmitAbsorber.scala b/native/src/main/scala/cps/stream/BaseUnfoldCpsAsyncEmitAbsorber.scala index b8125e36e..6115c58b1 100644 --- a/native/src/main/scala/cps/stream/BaseUnfoldCpsAsyncEmitAbsorber.scala +++ b/native/src/main/scala/cps/stream/BaseUnfoldCpsAsyncEmitAbsorber.scala @@ -1,11 +1,12 @@ package cps.stream -import cps.{*,given} +import cps.{*, given} import scala.concurrent.* import scala.util.* import scala.collection.mutable.Queue import java.util.concurrent.atomic.* +import scala.collection.mutable @@ -29,27 +30,82 @@ trait BaseUnfoldCpsAsyncEmitAbsorber[R,F[_],C <: CpsMonadContext[F], T](using ec type ConsumerCallback = Try[SupplyEventRecord]=>Unit type OneThreadTaskCallback = Unit => Unit + class State: val finishRef = new AtomicReference[Try[Unit]|Null]() val emitStart = new AtomicBoolean() - val supplyEvents = Queue[SupplyEventRecord]() - val consumerEvents = Queue[ConsumerCallback]() + private val supplyEvents = mutable.Queue[SupplyEventRecord]() + private val consumerEvents = mutable.Queue[ConsumerCallback]() - def queueEmit(v:T): F[Unit] = + private def queueEmit(v:T): F[Unit] = val p = Promise[Unit]() val emitted = Emitted(v, x => p.tryComplete(x) ) supplyEvents.enqueue(emitted) - asyncMonad.adoptCallbackStyle{ emitCallback => + asyncMonad.adoptCallbackStyle{ emitCallback => p.future.onComplete(emitCallback) } - def queueConsumer(): F[SupplyEventRecord] = + private def queueConsumer(): F[SupplyEventRecord] = val p = Promise[SupplyEventRecord]() consumerEvents.enqueue( x => p.complete(x)) asyncMonad.adoptCallbackStyle[SupplyEventRecord]{ evalCallback => p.future.onComplete(evalCallback) } + private def tryDequeConsumer(): Option[ConsumerCallback] = + if (consumerEvents.isEmpty) None + else Some(consumerEvents.dequeue()) + + private def tryDequeSupply(): Option[SupplyEventRecord] = + if (supplyEvents.isEmpty) None + else Some(supplyEvents.dequeue()) + + def emit(v:T): F[Unit] = { + this.synchronized{ + tryDequeConsumer() match + case Some(consumer) => + asyncMonad.adoptCallbackStyle{ emitCallback => + val emitted = Emitted(v, emitCallback) + consumer(Success(emitted)) + } + case None => + queueEmit(v) + } + } + + def consume(): F[SupplyEventRecord] = + this.synchronized{ + if (emitStart.compareAndSet(false, true)) then + asyncMonad.pure(SpawnEmitter) + else + tryDequeSupply() match + case Some(r) => asyncMonad.pure(r) + case None => + queueConsumer() + } + + def finish(r:Try[Unit]): Unit = { + this.synchronized{ + finishRef.set(r) + while { + tryDequeConsumer() match + case Some(consumer) => + consumer(Success(Finished(r))) + true + case None => + false + } do () + while { + tryDequeSupply() match + case Some(Emitted(_,cb)) => + cb(Failure(new CancellationException("Stream is closed"))) + true + case _ => + false + } do () + } + } + val unitSuccess = Success(()) @@ -58,30 +114,13 @@ trait BaseUnfoldCpsAsyncEmitAbsorber[R,F[_],C <: CpsMonadContext[F], T](using ec ) extends CpsAsyncEmitter[F,T]: - def emitAsync(v:T): F[Unit] = - if (!state.consumerEvents.isEmpty) then - val consumer = state.consumerEvents.dequeue() - asyncMonad.adoptCallbackStyle{ emitCallback => - val emitted = Emitted(v, emitCallback) - consumer(Success(emitted)) - } - else - state.queueEmit(v) + def emitAsync(v:T): F[Unit] = + state.emit(v) + def finish(r: Try[Unit]): Unit = - state.finishRef.set(r) - while(! state.consumerEvents.isEmpty ) { - val consumer = state.consumerEvents.dequeue() - consumer(Success(Finished(r))) - } - while(! state.supplyEvents.isEmpty) { - val ev = state.supplyEvents.dequeue() - ev match - case Emitted(v,cb) => - cb(Failure(new CancellationException("Stream is closed"))) - case _ => - } - + state.finish(r) + end StepsObserver def evalAsync(f: C => CpsAsyncEmitter[F,T] => F[Unit]):F[R] = @@ -115,16 +154,9 @@ trait BaseUnfoldCpsAsyncEmitAbsorber[R,F[_],C <: CpsMonadContext[F], T](using ec case Success(_) => summon[CpsAsyncMonad[F]].pure(None) case Failure(e) => summon[CpsAsyncMonad[F]].error(e) - def nextEvent(): F[SupplyEventRecord] = - if (state.emitStart.compareAndSet(false,true)) then - asyncMonad.pure(SpawnEmitter) - else - if !state.supplyEvents.isEmpty then - val e = state.supplyEvents.dequeue() - asyncMonad.pure(e) - else - state.queueConsumer() - + def nextEvent(): F[SupplyEventRecord] = + state.consume() + val r = state.finishRef.get() if r eq null then asyncMonad.flatMap(nextEvent())(e => handleEvent(e)) diff --git a/native/src/test/scala/cps/util/FutureCompleter.scala b/native/src/test/scala/cps/util/FutureCompleter.scala index 72ffe7591..5336826cb 100644 --- a/native/src/test/scala/cps/util/FutureCompleter.scala +++ b/native/src/test/scala/cps/util/FutureCompleter.scala @@ -4,14 +4,12 @@ import scala.concurrent._ import scala.concurrent.duration._ import scala.util._ -import scala.scalanative.loop._ // for use futures in jvm and native test. object FutureCompleter { - def apply[T](f: Future[T])(using ec:ExecutionContext): Unit = - EventLoop.run() + def apply[T](f: Future[T])(using ec:ExecutionContext): Unit = Await.result(f, 30.seconds) } diff --git a/native/src/test/scala/cps/util/FutureSleep.scala b/native/src/test/scala/cps/util/FutureSleep.scala index 953244c7a..da7f0bab2 100644 --- a/native/src/test/scala/cps/util/FutureSleep.scala +++ b/native/src/test/scala/cps/util/FutureSleep.scala @@ -4,12 +4,11 @@ import scala.concurrent.* import scala.concurrent.duration.* import scala.concurrent.ExecutionContext.Implicits.global -import scala.scalanative.loop.* object FutureSleep: def apply(duration: FiniteDuration):Future[Duration] = { - Timer.delay(duration).map(_ => duration) + TestTimer.delay(duration) } diff --git a/native/src/test/scala/cps/util/TestTimer.scala b/native/src/test/scala/cps/util/TestTimer.scala index f1c3b73a2..5fa24d22d 100644 --- a/native/src/test/scala/cps/util/TestTimer.scala +++ b/native/src/test/scala/cps/util/TestTimer.scala @@ -1,25 +1,74 @@ package cps.util +import scala.collection.mutable import scala.concurrent.* import scala.concurrent.duration.* import scala.concurrent.ExecutionContext.Implicits.global -import scala.scalanative.loop.* object TestTimer: - type CancelToken = Timer + + class Sleeper(val wakeTime: Long, val runnable: Runnable, @volatile var cancelled:Boolean) + + type CancelToken = Sleeper + + private val queue = mutable.PriorityQueue[Sleeper]()(using Ordering.by[Sleeper,Long](_.wakeTime)) + @volatile var shutdownFlag = false + + + val sleeperThread = new Thread { + override def run(): Unit = { + val executionContext = summon[ExecutionContext] + while(!shutdownFlag) { + val now = System.currentTimeMillis() + val next = TestTimer.synchronized(queue.headOption) + next match + case Some(sleeper) => + if (sleeper.wakeTime <= now) then + TestTimer.synchronized(queue.dequeue()) + if (!sleeper.cancelled) then + executionContext.execute(sleeper.runnable) + else + val sleepTime = sleeper.wakeTime - now + TestTimer.synchronized { + TestTimer.wait(sleepTime) + } + case _ => + val sleepTime = 1000L + TestTimer.synchronized { + TestTimer.wait(sleepTime) + } + } + } + } + sleeperThread.setDaemon(true) + sleeperThread.start() def delay(duration: FiniteDuration):Future[Duration] = { - Timer.delay(duration).map(_ => duration) + val p = Promise[Duration]() + schedule(duration)(()=>p.success(duration)) + p.future } def schedule(duration: FiniteDuration)(f: =>Unit): CancelToken = { - Timer.timeout(duration)(()=>f) + val wakeTime = System.currentTimeMillis() + duration.toMillis + val sleeper = Sleeper(wakeTime, () => f, false) + TestTimer.synchronized { + queue.enqueue(sleeper) + TestTimer.notify() + } + sleeper } def cancel(ct: CancelToken): Unit = { - ct.clear() + ct.cancelled = true } + def shutdown(): Unit = { + shutdownFlag = true + TestTimer.synchronized { + TestTimer.notify() + } + } diff --git a/project/plugins.sbt b/project/plugins.sbt index e4f3b171a..bf3c37089 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -4,5 +4,5 @@ addSbtPlugin("com.typesafe.sbt" % "sbt-ghpages" % "0.6.3") addSbtPlugin("org.portable-scala" % "sbt-scala-native-crossproject" % "1.3.1") addSbtPlugin("org.portable-scala" % "sbt-scalajs-crossproject" % "1.3.1") addSbtPlugin("org.scala-js" % "sbt-scalajs" % "1.16.0") -addSbtPlugin("org.scala-native" % "sbt-scala-native" % "0.4.17") +addSbtPlugin("org.scala-native" % "sbt-scala-native" % "0.5.1") addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "1.0.1") diff --git a/shared/src/test/scala/cps/stream/TestAsyncListIterator.scala b/shared/src/test/scala/cps/stream/TestAsyncListIterator.scala index e94d75b12..ea6b1b505 100644 --- a/shared/src/test/scala/cps/stream/TestAsyncListIterator.scala +++ b/shared/src/test/scala/cps/stream/TestAsyncListIterator.scala @@ -23,7 +23,9 @@ class TestAsyncListIterator { val stream: AsyncList[Future, Int] = asyncStream[AsyncList[Future, Int]] { out => out.emit(0) - for i <- 1 to N do out.emit(i) + for i <- 1 to N do { + out.emit(i) + } } val ite = stream.iterator @@ -40,7 +42,7 @@ class TestAsyncListIterator { assert(count == N+1) res } - + FutureCompleter(compute) } diff --git a/shared/src/test/scala/cps/stream/TestFtBasicUsage.scala b/shared/src/test/scala/cps/stream/TestFtBasicUsage.scala index 51c9e78ea..62c3876cf 100644 --- a/shared/src/test/scala/cps/stream/TestFtBasicUsage.scala +++ b/shared/src/test/scala/cps/stream/TestFtBasicUsage.scala @@ -1,10 +1,11 @@ -package cps +package cps.stream import org.junit.{Test,Ignore} import org.junit.Assert._ import scala.concurrent.* import scala.concurrent.ExecutionContext.Implicits.global +import scala.util.* import cps.* @@ -14,10 +15,11 @@ import cps.monads.{*, given} import cps.testconfig.given import cps.util.FutureCompleter -class TestFbBasicUsage: +class TestFtBasicUsage: val N = 10000 + @Test def simpleLoopAsyncListFt() = val stream = asyncStream[AsyncList[Future, Int]]{ out => @@ -52,9 +54,12 @@ class TestFbBasicUsage: } } val listSum = stream.fold(0)(_ + _) - val res = listSum.failed.map(ex => assert(ex.getMessage()=="bye")) + val res = listSum.failed.map(ex => + assert(ex.getMessage()=="bye") + ) FutureCompleter(res) + @Test def fewSmallLoopsInAsyncList() = val M = 1000