Skip to content

Commit

Permalink
Further work on using RunnableStep (#90)
Browse files Browse the repository at this point in the history
* Incorporate instrumentation in the RunnableStep

* Working on the execution model

* Fix build issue

* Working on step tracing
  • Loading branch information
DamianReeves authored Feb 10, 2021
1 parent bbe052f commit d89088a
Show file tree
Hide file tree
Showing 22 changed files with 361 additions and 157 deletions.
3 changes: 3 additions & 0 deletions morphir/flowz/src/morphir/flowz/Flow.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package morphir.flowz

sealed abstract class Flow[-SIn, +SOut, -P, -R, +E, +A] {}
3 changes: 3 additions & 0 deletions morphir/flowz/src/morphir/flowz/FlowArgs.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package morphir.flowz

object FlowArgs {}
19 changes: 19 additions & 0 deletions morphir/flowz/src/morphir/flowz/FlowExecutor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package morphir.flowz

import zio._

abstract class FlowExecutor[+InitialState, +Params, +R <: Has[_]] {

/**
* The environment used for executing a flow.
*/
def environment: Layer[Nothing, R]

/**
* Initialize the execution of a flow given the initial environment.
* Part of initializing a flow involves constructing the initial state and parameters for that flow.
*/
def initialize: RIO[FlowInitEnv, (InitialState, Params)]
}

object FlowExecutor {}
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package morphir.flowz.experimental
package morphir.flowz

import morphir.flowz.StepSuccess

final case class FlowSuccess[+S, +A](state: S, result: A)
final case class FlowSuccess[+S, +A](state: S, result: A) {}
object FlowSuccess {
def fromResult[S, A](result: StepSuccess[S, A]): FlowSuccess[S, A] =
FlowSuccess(state = result.state, result = result.result)
Expand Down
58 changes: 58 additions & 0 deletions morphir/flowz/src/morphir/flowz/RunnableStep.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package morphir.flowz
import morphir.flowz.instrumentation.InstrumentationEvent
import zio.ZIO

final case class RunnableStep[-SIn, +SOut, -Msg, -R, +E, +A](
override val label: Option[String],
underlyingStep: Step[SIn, SOut, Msg, R, E, A]
) extends Step[SIn, SOut, Msg, R with StepRuntimeEnv, E, A] {

/**
* Defines the underlying behavior of this `Step`.
*/
protected[flowz] def behavior(state: SIn, message: Msg): ZIO[R with StepRuntimeEnv, E, StepSuccess[SOut, A]] =
for {
uid <- StepExecutionId.nextExecutionId
labelResolved <- ZIO.succeed(label getOrElse "N/A")
_ <- iLog.trace(InstrumentationEvent.stepExecutionStarted(uid, labelResolved))
result <-
underlyingStep
.behavior(state, message)
.tapCause(cause => iLog.error(InstrumentationEvent.stepExecutionFailed(uid, labelResolved, cause), cause))
.tap(_ => iLog.trace(InstrumentationEvent.stepExecutionSucceeded(uid, labelResolved)))
} yield result

/**
* Runs the step.
*/
final def run(implicit ev1: Any <:< SIn, ev2: Any <:< Msg): ZIO[R with StepRuntimeEnv, E, StepSuccess[SOut, A]] =
run((), ())

/**
* Runs the step.
*/
final def run(state: SIn, message: Msg): ZIO[R with StepRuntimeEnv, E, StepSuccess[SOut, A]] =
behavior(state, message)

/**
* Runs the step.
*/
final def run(message: Msg)(implicit ev: Any <:< SIn): ZIO[R with StepRuntimeEnv, E, StepSuccess[SOut, A]] =
run((), message)

/**
* Runs the step.
*/
final def runResult(implicit ev1: Any <:< SIn, ev2: Any <:< Msg): ZIO[R with StepRuntimeEnv, E, A] =
run((), ()).map(_.result)
}

object RunnableStep {
def step[SIn, SOut, Msg, R, Err, A](
label: String
)(theStep: Step[SIn, SOut, Msg, R, Err, A]): RunnableStep[SIn, SOut, Msg, R, Err, A] =
theStep match {
case runnable @ RunnableStep(_, _) => runnable.asInstanceOf[RunnableStep[SIn, SOut, Msg, R, Err, A]]
case _ => RunnableStep(Option(label), theStep)
}
}
84 changes: 19 additions & 65 deletions morphir/flowz/src/morphir/flowz/Step.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package morphir.flowz

import morphir.flowz.instrumentation.InstrumentationEvent
import zio._

/**
Expand Down Expand Up @@ -95,12 +94,12 @@ abstract class Step[-SIn, +SOut, -Msg, -R, +E, +A] { self =>
/**
* Get this Step as an effect.
*/
protected final lazy val asEffect: ZBehavior[SIn, SOut, Msg, R, E, A] = toEffect
protected[flowz] final lazy val asEffect: ZBehavior[SIn, SOut, Msg, R, E, A] = toEffect

/**
* Defines the underlying behavior of this `Step`.
*/
protected def behavior(state: SIn, message: Msg): ZIO[R, E, StepSuccess[SOut, A]]
protected[flowz] def behavior(state: SIn, message: Msg): ZIO[R, E, StepSuccess[SOut, A]]

/**
* Returns a step whose failure and success channels have been mapped by the specified pair of
Expand Down Expand Up @@ -271,37 +270,6 @@ abstract class Step[-SIn, +SOut, -Msg, -R, +E, +A] { self =>
final def provideState(initialState: SIn): Step[Any, SOut, Msg, R, E, A] =
Step(asEffect.provideState(initialState))

/**
* Runs the step.
*/
final def run(implicit ev1: Any <:< SIn, ev2: Any <:< Msg): ZIO[R with StepRuntimeEnv, E, StepSuccess[SOut, A]] =
run((), ())

/**
* Runs the step.
*/
final def run(state: SIn, message: Msg): ZIO[R with StepRuntimeEnv, E, StepSuccess[SOut, A]] =
for {
uid <- StepUid.nextUid
labelResolved <- ZIO.succeed(label getOrElse "N/A")
_ <- iLog.trace(
InstrumentationEvent.runningStep(s"Running Step[Label=$labelResolved; Uid=$uid;]", uid, labelResolved)
)
result <- behavior(state, message)
} yield result

/**
* Runs the step.
*/
final def run(message: Msg)(implicit ev: Any <:< SIn): ZIO[R with StepRuntimeEnv, E, StepSuccess[SOut, A]] =
run((), message)

/**
* Runs the step.
*/
final def runResult(implicit ev1: Any <:< SIn, ev2: Any <:< Msg): ZIO[R with StepRuntimeEnv, E, A] =
run((), ()).map(_.result)

/**
* Returns a step that effectfully "peeks" at the success of this behavior.
*
Expand All @@ -318,7 +286,7 @@ abstract class Step[-SIn, +SOut, -Msg, -R, +E, +A] { self =>
}
)

def toEffect: ZIO[(SIn, Msg, R), E, StepSuccess[SOut, A]] = ZIO.accessM[(SIn, Msg, R)] { case (stateIn, msg, r) =>
def toEffect: ZBehavior[SIn, SOut, Msg, R, E, A] = ZIO.accessM[(SIn, Msg, R)] { case (stateIn, msg, r) =>
behavior(stateIn, msg).provide(r)
}

Expand Down Expand Up @@ -385,18 +353,8 @@ object Step extends StepArities with ZBehaviorSyntax {
*/
def fail[E](error: E): IndieStep[Nothing, E, Nothing] = Fail(error)

// def fromEffect[SIn, SOut, In, R, E, A](
// effect: ZIO[(SIn, In, R), E, StepSuccess[SOut, A]]
// )(evState: NeedsInputState[SIn], evMsg: NeedsMsg[In]): Step[SIn, SOut, In, R, E, A] = {
// val _ = (evState, evMsg) //NOTE: Suppresses the warning about these not being used
// new Step[SIn, SOut, In, R, E, A] {
// protected def behavior(state: SIn, message: In): ZIO[R, E, StepSuccess[SOut, A]] =
// effect.provideSome[R](r => (state, message, r))
// }
// }

def fromEffect[SIn, SOut, Msg, R, E, A](
effect: ZIO[(SIn, Msg, R), E, StepSuccess[SOut, A]]
effect: ZBehavior[SIn, SOut, Msg, R, E, A]
): Step[SIn, SOut, Msg, R, E, A] =
FromEffect(effect)

Expand Down Expand Up @@ -464,7 +422,8 @@ object Step extends StepArities with ZBehaviorSyntax {
*/
def stateless[In, R, E, A](f: In => ZIO[R, E, A]): StatelessStep[In, R, E, A] =
new AbstractStatelessStep[In, R, E, A] {
protected def behavior(state: Any, message: In): ZIO[R, E, StepSuccess[Any, A]] = f(message).map(state -> _)
protected[flowz] def behavior(state: Any, message: In): ZIO[R, E, StepSuccess[Any, A]] =
f(message).map(state -> _)
}

/**
Expand All @@ -486,54 +445,55 @@ object Step extends StepArities with ZBehaviorSyntax {
})

final case class Fail[E](error: E) extends IndieStep[Nothing, E, Nothing] {
protected def behavior(state: Any, message: Any): ZIO[Any, E, StepSuccess[Nothing, Nothing]] = ZIO.fail(error)
protected[flowz] def behavior(state: Any, message: Any): ZIO[Any, E, StepSuccess[Nothing, Nothing]] =
ZIO.fail(error)
}

final case class FromEffect[SIn, SOut, In, Env, E, A](
zio: ZIO[(SIn, In, Env), E, StepSuccess[SOut, A]]
) extends Step[SIn, SOut, In, Env, E, A] {
override lazy val toEffect: ZIO[(SIn, In, Env), E, StepSuccess[SOut, A]] = zio
protected def behavior(state: SIn, message: In): ZIO[Env, E, StepSuccess[SOut, A]] =
protected[flowz] def behavior(state: SIn, message: In): ZIO[Env, E, StepSuccess[SOut, A]] =
zio.provideSome[Env](env => (state, message, env))
}

final case class FromZIO[-R, +E, +A](zio: ZIO[R, E, A]) extends Step[Any, Any, Any, R, E, A] {
protected def behavior(state: Any, message: Any): ZIO[R, E, StepSuccess[Any, A]] =
protected[flowz] def behavior(state: Any, message: Any): ZIO[R, E, StepSuccess[Any, A]] =
zio.map(a => StepSuccess(state = state, result = a))
}

final case class MessageHandler[-In, +Err, +A](private val zio: ZIO[In, Err, A])
extends AbstractStatelessStep[In, Any, Err, A] {

protected def behavior(state: Any, message: In): ZIO[Any, Err, StepSuccess[Any, A]] =
protected[flowz] def behavior(state: Any, message: In): ZIO[Any, Err, StepSuccess[Any, A]] =
zio.map(result => StepSuccess(state = state, result = result)).provide(message)

}

final case class Modify[-S1, +S2, +A](func: S1 => (S2, A)) extends Step[S1, S2, Any, Any, Nothing, A] {

protected def behavior(state: S1, message: Any): ZIO[Any, Nothing, StepSuccess[S2, A]] =
protected[flowz] def behavior(state: S1, message: Any): ZIO[Any, Nothing, StepSuccess[S2, A]] =
ZIO.effectTotal(func(state))

}

final case class SetOutputs[S, A](newState: S, value: A) extends IndieStep[S, Nothing, A] {
protected def behavior(state: Any, message: Any): ZIO[Any, Nothing, StepSuccess[S, A]] =
protected[flowz] def behavior(state: Any, message: Any): ZIO[Any, Nothing, StepSuccess[S, A]] =
ZIO.succeed((newState, value))
}

final case class Succeed[+A](value: A) extends Step[Any, Any, Any, Any, Nothing, A] {
protected def behavior(state: Any, message: Any): ZIO[Any, Nothing, StepSuccess[Any, A]] =
protected[flowz] def behavior(state: Any, message: Any): ZIO[Any, Nothing, StepSuccess[Any, A]] =
ZIO.succeed(StepSuccess(state, value))
}

/**
* Represents a stateful behavior that is constructed from an effect.
* Represents a stateful step that is constructed from an effect.
*/
final case class Stateful[S, StateOut, In, R, E, A](
private val effect: ZBehavior[S, StateOut, In, R, E, A]
) extends Step[S, StateOut, In, R, E, A] {
protected def behavior(state: S, message: In): ZIO[R, E, StepSuccess[StateOut, A]] =
protected[flowz] def behavior(state: S, message: In): ZIO[R, E, StepSuccess[StateOut, A]] =
effect.provideSome[R](r => (state, message, r))
}

Expand All @@ -542,7 +502,7 @@ object Step extends StepArities with ZBehaviorSyntax {
*/
final case class Stateless[In, R, E, A](private val effect: ZIO[(In, R), E, A])
extends AbstractStatelessStep[In, R, E, A] {
protected def behavior(state: Any, message: In): ZIO[R, E, StepSuccess[Any, A]] =
protected[flowz] def behavior(state: Any, message: In): ZIO[R, E, StepSuccess[Any, A]] =
effect.map(value => StepSuccess(state, value)).provideSome[R](r => (message, r))
}

Expand Down Expand Up @@ -575,7 +535,7 @@ object Step extends StepArities with ZBehaviorSyntax {
/**
* Defines the underlying behavior of this `Step`.
*/
protected def behavior(state: SIn, message: Msg): ZIO[R, E, StepSuccess[SOut, A]] =
protected[flowz] def behavior(state: SIn, message: Msg): ZIO[R, E, StepSuccess[SOut, A]] =
underlyingStep.behavior(state, message)
}
}
Expand All @@ -585,14 +545,8 @@ object stepExample extends App {
import zio.logging.LogLevel
import morphir.flowz.instrumentation.InstrumentationLogging

def defineStep[SIn, SOut, Msg, R, E, A](label: String)(
theStep: Step[SIn, SOut, Msg, R, E, A]
): Step[SIn, SOut, Msg, R with StepRuntimeEnv, E, A] =
theStep.withLabel(label)
//TODO: This is where you could do something like add an Aspect to the step

override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = {
val step1 = defineStep("Say Hi")(Step.accessServiceM[Console.Service](_.putStrLn("Hi")))
val step1 = step("Say Hi")(Step.accessServiceM[Console.Service](_.putStrLn("Hi")))

step1.run.exitCode.provideCustomLayer(
StepUidGenerator.live ++ InstrumentationLogging.console(logLevel = LogLevel.Trace)
Expand Down
18 changes: 9 additions & 9 deletions morphir/flowz/src/morphir/flowz/StepArities.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import zio.logging.LogLevel
trait StepArities {

final def mapN[SIn, SA, SB, Msg, R, E, A, B, SOut, Result, MA, MB](
behaviorA: Step[SIn, SA, Msg, R, E, A],
behaviorB: Step[SIn, SB, Msg, R, E, B]
stepA: Step[SIn, SA, Msg, R, E, A],
stepB: Step[SIn, SB, Msg, R, E, B]
)(
f: (StepSuccess[SA, A], StepSuccess[SB, B]) => StepSuccess[SOut, Result]
): Step[SIn, SOut, Msg, R, E, Result] =
behaviorA.zipWith(behaviorB)(f)
stepA.zipWith(stepB)(f)

final def mapParN[SIn, SA, SB, Msg, R, E, A, B, SOut, Result](
behaviorA: Step[SIn, SA, Msg, R, E, A],
Expand Down Expand Up @@ -425,15 +425,15 @@ object mapNExamples extends zio.App {
import morphir.flowz.instrumentation.InstrumentationLogging

def run(args: List[String]): URIO[ZEnv, ExitCode] = {
val stepA = Step.set("SA").as('A')
val stepB = Step.set(List("SA")).as("B")
val finalStep = Step.mapN(stepA, stepB) { case (a, b) =>
val stepA = step("step A")(Step.set("SA").as('A'))
val stepB = step("step B")(Step.set(List("SA")).as("B"))
val finalStep = step("final-1")(Step.mapN(stepA, stepB) { case (a, b) =>
StepSuccess(state = (a.state, b.state), result = (a.result, b.result))
}
})

val finalStepAlt = Step.mapN(stepA, stepB) { case (a, b) =>
val finalStepAlt = step("final-2")(Step.mapN(stepA, stepB) { case (a, b) =>
((a.state, b.state), (a.result, b.result))
}
})

(
(finalStep.run.tap(res => console.putStrLn(s"Result Orig: $res")) *>
Expand Down
28 changes: 28 additions & 0 deletions morphir/flowz/src/morphir/flowz/StepAspect.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package morphir.flowz

trait StepAspect[-SIn, +SOut, -Msg, -R, +E, +A] {
def apply[SIn1 <: SIn, SOut1 >: SOut, Msg1 <: Msg, R1 <: R, E1 >: E, A1 >: A](
step: Step[SIn1, SOut1, Msg1, R1, E1, A1]
): Step[SIn1, SOut1, Msg1, R1, E1, A1]
}

object StepAspect {
// val traced: StepAspect[Any, Nothing, Any, InstrumentationLogging with StepUidGenerator, Nothing, Nothing] =
// new StepAspect[Any, Nothing, Any, InstrumentationLogging, Nothing, Nothing] {
// def apply[
// SIn1 <: Any,
// SOut1 >: Nothing,
// Msg1 <: Any,
// R1 <: InstrumentationLogging with StepUidGenerator,
// E1 >: Nothing,
// A1 >: Nothing
// ](
// step: RunnableStep[SIn1, SOut1, Msg1, R1, E1, A1]
// ): RunnableStep[SIn1, SOut1, Msg1, R1, E1, A1] =
// Step(for {
//
// _ <- iLog.trace(InstrumentationEvent.stepExecutionStarted())
// res <- step.asEffect
// } yield res)
// }
}
8 changes: 8 additions & 0 deletions morphir/flowz/src/morphir/flowz/StepExecutionId.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package morphir.flowz

import zio.URIO

object StepExecutionId {
def nextExecutionId: URIO[StepUidGenerator, StepExecutionId] =
uidGenerator.nextUid[Step.type]
}
8 changes: 0 additions & 8 deletions morphir/flowz/src/morphir/flowz/StepUid.scala

This file was deleted.

Loading

0 comments on commit d89088a

Please sign in to comment.