Skip to content

Commit

Permalink
New flowz model that incorporates Behaviors, Steps, and Flowz (#87)
Browse files Browse the repository at this point in the history
* Step changes

* Try and get Steps to respect the passed in state

* Rename Step to Stage to start new effort

* More renaming

* Shuffling things so we can work on new design of Step and Stage

* Towards recursion schemes

* Embracing the use of Behaviors

* Trying to expand on Behavior type

* Behavior related updates

* Continue building out Behavior

* Continued build out of the Behavior type

* Refining behavior and flow usage

* Refining the definition of behavior

* Refining the definition of behavior

* Clear out quite a bit of the prior art

* Add some docs and a comment about the changing API

* working on flow execution:

* Setup the structure for flow execution

* Flow execution and Instrumentation

* More work on instrumentation and properties

* Prepping to change the definition of a Behavior

* expanding behavior

* Add more operators and constructors to Behavior

* Support up to an arity of 8 for mapN and mapParN

* Change Behavior to be Step...

* Change Behavior to be Step...

* Additional renames

* Add the ability to instrument tests

* Move experimental and not fully baked items into the experimental package
  • Loading branch information
DamianReeves authored Feb 9, 2021
1 parent df6fc33 commit bbe052f
Show file tree
Hide file tree
Showing 78 changed files with 2,673 additions and 1,772 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ jobs:
- name: Checks
run: |
git config --global user.name "CI"
# ./mill all __.checkFormat __.docJar __.test
./mill all __.checkFormat __.docJar __.test
# ./mill all __.checkFormat "__.fix --check" __.docJar __.test

- name: Status Check
Expand Down
2 changes: 1 addition & 1 deletion .mill-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.9.3
0.9.5
126 changes: 66 additions & 60 deletions build.sc
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,29 @@ object Deps {

val silencer = "1.7.1"
val scalaCollectionsCompat = "2.3.1"
val zio = "1.0.4"
val zio = "1.0.4-2"
val zioConfig = "1.0.0-RC32"
val zioLogging = "0.5.5"
val zioLogging = "0.5.6"
val zioMagic = "0.1.8"
val zioNio = "1.0.0-RC10"
val zioPrelude = "1.0.0-RC1"
val zioProcess = "0.2.0"
val newtype = "0.4.4"
def decline(scalaVersion:String) = scalaVersion match {
def decline(scalaVersion: String) = scalaVersion match {
case version if version.startsWith("2.11") => "1.2.0"
case _ => "1.3.0"
case _ => "1.3.0"
}
val pprint = "0.5.9"
val scalameta = "4.3.18"
val directories = "11"
val enumeratum = "1.6.1"
val macroParadise = "2.1.1"
val upickle = "1.1.0"
val slf4zio = "1.0.0"
val scalactic = "3.1.2"
val scalaUri = "2.2.2"
val oslib = "0.6.2"
val quill = "3.6.0-RC3"
val pprint = "0.5.9"
val scalameta = "4.3.18"
val directories = "11"
val enumeratum = "1.6.1"
val macroParadise = "2.1.1"
val upickle = "1.1.0"
val slf4zio = "1.0.0"
val scalactic = "3.1.2"
val scalaUri = "2.2.2"
val oslib = "0.6.2"
val quill = "3.6.0-RC3"
}
}

Expand All @@ -66,13 +67,13 @@ trait MorphirScalaModule extends ScalaModule with TpolecatModule { self =>
trait MorphirScalafixModule extends ScalafixModule

trait MorphirPublishModule extends GitVersionedPublishModule {
def packageDescription = T(artifactName())
def pomSettings = PomSettings(
def packageDescription = T(artifactName())
def pomSettings = PomSettings(
description = packageDescription(),
organization = "org.morphir",
url = "https://github.com/MorganStanley/morphir-jvm",
url = "https://github.com/finos/morphir-jvm",
licenses = Seq(License.`Apache-2.0`),
versionControl = VersionControl.github("MorganStanley", "morphir-jvm"),
versionControl = VersionControl.github("finos", "morphir-jvm"),
developers = Seq(
Developer(
"DamianReeves",
Expand Down Expand Up @@ -136,8 +137,8 @@ trait CommonJvmModule extends MorphirCommonModule {
trait CommonJsModule extends MorphirCommonModule with ScalaJSModule {
def platformSegment = "js"
def crossScalaJSVersion: String
def scalaJSVersion = crossScalaJSVersion
def millSourcePath = super.millSourcePath / os.up / os.up
def scalaJSVersion = crossScalaJSVersion
def millSourcePath = super.millSourcePath / os.up / os.up
trait Tests extends super.Tests with MorphirTestModule {
def platformSegment = "js"
def scalaJSVersion = crossScalaJSVersion
Expand All @@ -160,7 +161,7 @@ trait MorphirTestModule extends MorphirScalaModule with TestModule {
Seq("zio.test.sbt.ZTestFramework")

def offset: os.RelPath = os.rel
def sources = T.sources(
def sources = T.sources(
super
.sources()
.++(
Expand Down Expand Up @@ -202,7 +203,7 @@ object morphir extends Module {
/*with MorphirScalafixModule*/ { self =>

def artifactName = "morphir-ir"
def ivyDeps = Agg(
def ivyDeps = Agg(
ivy"dev.zio::zio:${Versions.zio}",
ivy"dev.zio::zio-streams:${Versions.zio}",
ivy"com.lihaoyi::upickle:${Versions.upickle}",
Expand Down Expand Up @@ -276,10 +277,10 @@ object morphir extends Module {
with CommonJvmModule
with MorphirPublishModule { self =>

def artifactName = "morphir-flowz"
def artifactName = "morphir-flowz"
def scalacPluginIvyDeps = Agg(ivy"com.github.ghik:::silencer-plugin:${Versions.silencer}")
def compileIvyDeps = Agg(ivy"com.github.ghik:::silencer-lib:${Versions.silencer}")
def ivyDeps = Agg(
def ivyDeps = Agg(
ivy"org.scala-lang.modules::scala-collection-compat:${Versions.scalaCollectionsCompat}",
ivy"com.github.mlangc:slf4zio_2.11:${Versions.slf4zio}",
ivy"dev.zio::zio:${Versions.zio}",
Expand All @@ -295,43 +296,48 @@ object morphir extends Module {
object test extends Tests {
def platformSegment: String = self.platformSegment
def crossScalaVersion = JvmMorphirFlowz.this.crossScalaVersion
override def ivyDeps = super.ivyDeps() ++ Agg(ivy"com.monovore::decline:${Versions.decline(crossScalaVersion)}")
}
}

object spark extends Module {
object jvm
extends Cross[JvmMorphirFlowzSpark](
Versions.scala212,
Versions.scala211
)
class JvmMorphirFlowzSpark(val crossScalaVersion: String)
extends CrossScalaModule
with CommonJvmModule
with MorphirPublishModule { self =>

def artifactName = "morphir-flowz-spark"
def moduleDeps = Seq(morphir.flowz.jvm(crossScalaVersion))

def scalacPluginIvyDeps = Agg(ivy"com.github.ghik:::silencer-plugin:${Versions.silencer}")
def compileIvyDeps = Agg(ivy"com.github.ghik:::silencer-lib:${Versions.silencer}")
def ivyDeps = Agg(
ivy"org.scala-lang.modules::scala-collection-compat:${Versions.scalaCollectionsCompat}",
ivy"com.github.mlangc:slf4zio_2.11:${Versions.slf4zio}",
ivy"io.getquill::quill-spark:${Versions.quill}",
ivy"dev.zio::zio:${Versions.zio}",
ivy"dev.zio::zio-streams:${Versions.zio}",
ivy"dev.zio::zio-prelude:${Versions.zioPrelude}"
override def ivyDeps = super.ivyDeps() ++ Agg(
ivy"com.monovore::decline:${Versions.decline(crossScalaVersion)}",
ivy"io.github.kitlangton::zio-magic:${Versions.zioMagic}"
)

object test extends Tests {
def platformSegment: String = self.platformSegment
def crossScalaVersion = JvmMorphirFlowzSpark.this.crossScalaVersion
override def ivyDeps = super.ivyDeps() ++
Agg(ivy"dev.zio::zio-logging:${Versions.zioLogging}",
ivy"dev.zio::zio-logging-slf4j:${Versions.zioLogging}")
}
}
}

// object spark extends Module {
// object jvm
// extends Cross[JvmMorphirFlowzSpark](
// Versions.scala212,
// Versions.scala211
// )
// class JvmMorphirFlowzSpark(val crossScalaVersion: String)
// extends CrossScalaModule
// with CommonJvmModule
// with MorphirPublishModule { self =>
//
// def artifactName = "morphir-flowz-spark"
// def moduleDeps = Seq(morphir.flowz.jvm(crossScalaVersion))
//
// def scalacPluginIvyDeps = Agg(ivy"com.github.ghik:::silencer-plugin:${Versions.silencer}")
// def compileIvyDeps = Agg(ivy"com.github.ghik:::silencer-lib:${Versions.silencer}")
// def ivyDeps = Agg(
// ivy"org.scala-lang.modules::scala-collection-compat:${Versions.scalaCollectionsCompat}",
// ivy"com.github.mlangc:slf4zio_2.11:${Versions.slf4zio}",
// ivy"io.getquill::quill-spark:${Versions.quill}",
// ivy"dev.zio::zio:${Versions.zio}",
// ivy"dev.zio::zio-streams:${Versions.zio}",
// ivy"dev.zio::zio-prelude:${Versions.zioPrelude}"
// )
//
// object test extends Tests {
// def platformSegment: String = self.platformSegment
// def crossScalaVersion = JvmMorphirFlowzSpark.this.crossScalaVersion
// override def ivyDeps = super.ivyDeps() ++
// Agg(
// ivy"dev.zio::zio-logging:${Versions.zioLogging}",
// ivy"dev.zio::zio-logging-slf4j:${Versions.zioLogging}"
// )
// }
// }
// }
}
}
2 changes: 2 additions & 0 deletions morphir/flowz/ReadMe.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Flowz

## Under Construction - API is changing

The Flowz module in morphir, provides a library for functionally composing small units of executable content.

At its core flowz is about composing individual `Step`s into a workflow. A `Step` allows you to model a potentially stateful operation in a pure functional way.
Expand Down
34 changes: 19 additions & 15 deletions morphir/flowz/spark/src/morphir/flowz/spark/DatasetModule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,26 @@ trait DatasetModule { self =>

def filterDataset[State, DataRow, Exclude: Encoder: TypeTag, Include: Encoder: TypeTag](
func: SparkSession => (State, DataRow) => (State, FilterResult[Exclude, Include])
): Step[State, State, SparkModule, Dataset[DataRow], Throwable, Dataset[FilterResult[Exclude, Include]]] =
Step[State, State, SparkModule, Dataset[DataRow], Throwable, Dataset[FilterResult[Exclude, Include]]](
ZIO.environment[StepContext[SparkModule, State, Dataset[DataRow]]].mapEffect { ctx =>
val spark = ctx.environment.get.sparkSession
var outputState = ctx.inputs.state
val inputData = ctx.inputs.params

import spark.implicits._
val dataset = inputData.map { row =>
val (nextState, filterRow) = func(spark)(outputState, row)
outputState = nextState
filterRow
): Behavior[State, State, Dataset[DataRow], SparkModule, Throwable, Dataset[FilterResult[Exclude, Include]]] =
new Behavior[State, State, Dataset[DataRow], SparkModule, Throwable, Dataset[FilterResult[Exclude, Include]]] {
protected def behavior(
state: State,
message: Dataset[DataRow]
): ZIO[SparkModule, Throwable, BehaviorResult[State, Dataset[FilterResult[Exclude, Include]]]] =
ZIO.environment[SparkModule].mapEffect { sparkModule =>
val spark = sparkModule.get.sparkSession
var outputState = state
val inputData = message
import spark.implicits._
val dataset = inputData.map { row =>
val (nextState, filterRow) = func(spark)(outputState, row)
outputState = nextState
filterRow
}
BehaviorResult(state = outputState, value = dataset)
}
StepOutputs(state = outputState, value = dataset)
}
)
}

}

trait DatasetExports { exports => }
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ trait SparkModuleExports {
val SparkModule: morphir.flowz.spark.sparkModule.SparkModule.type = morphir.flowz.spark.sparkModule.SparkModule

final type SparkStep[-StateIn, +StateOut, -Env, -Params, +Err, +Value] =
morphir.flowz.spark.SparkStep[StateIn, StateOut, Env, Params, Err, Value]
morphir.flowz.spark.SparkBehavior[StateIn, StateOut, Env, Params, Err, Value]
val SparkStep: morphir.flowz.spark.SparkStep.type = morphir.flowz.spark.SparkStep

def sparkStep[Params, A](func: SparkSession => Params => A): SparkStep[Any, A, Any, Params, Throwable, A] =
Expand Down
Loading

0 comments on commit bbe052f

Please sign in to comment.