Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
yanns committed Mar 9, 2023
1 parent 427f640 commit df78602
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 77 deletions.
9 changes: 8 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,14 @@ lazy val core = project
ProblemFilters.exclude[IncompatibleMethTypeProblem](
"sangria.execution.Resolver.resolveSimpleListValue"),
ProblemFilters.exclude[DirectMissingMethodProblem]("sangria.schema.Field.subs"),
ProblemFilters.exclude[DirectMissingMethodProblem]("sangria.schema.Field.apply")
ProblemFilters.exclude[DirectMissingMethodProblem]("sangria.schema.Field.apply"),
ProblemFilters.exclude[ReversedMissingMethodProblem](
"sangria.execution.ExecutionScheme.effectScheme"),
ProblemFilters.exclude[DirectMissingMethodProblem]("sangria.execution.Resolver.handleScheme"),
ProblemFilters.exclude[DirectMissingMethodProblem]("sangria.execution.Resolver.this"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"sangria.execution.Resolver.processFinalResolve"),
ProblemFilters.exclude[DirectMissingMethodProblem]("sangria.execution.Resolver.resolveSeq")
),
Test / testOptions += Tests.Argument(TestFrameworks.ScalaTest, "-oF"),
libraryDependencies ++= Seq(
Expand Down
22 changes: 22 additions & 0 deletions modules/core/src/main/scala/sangria/execution/EffectScheme.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package sangria.execution

import scala.concurrent.{ExecutionContext, Future}

trait EffectScheme {
type F[_]
def map[A, B](eff: F[A])(f: A => B): F[B]
def flatMap[A, B](eff: F[A])(f: A => F[B]): F[B]
def success[A](a: A): F[A]
def fromFuture[A](f: Future[A]): F[A]
def toFuture[A](f: F[A]): Future[A]
}

class FutureEffectScheme(implicit ec: ExecutionContext) extends EffectScheme {
override type F[A] = Future[A]

override def map[A, B](eff: Future[A])(f: A => B): Future[B] = eff.map(f)
override def flatMap[A, B](eff: Future[A])(f: A => Future[B]): Future[B] = eff.flatMap(f)
override def success[A](a: A): Future[A] = Future.successful(a)
override def fromFuture[A](f: Future[A]): Future[A] = f
override def toFuture[A](f: Future[A]): Future[A] = f
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@ sealed trait ExecutionScheme {
def flatMapFuture[Ctx, Res, T](future: Future[T])(resultFn: T => Result[Ctx, Res])(implicit
ec: ExecutionContext): Result[Ctx, Res]
def extended: Boolean
def effectScheme(ec: ExecutionContext): EffectScheme
}

object ExecutionScheme extends AlternativeExecutionScheme {
implicit object Default extends ExecutionScheme {
type Result[Ctx, Res] = Future[Res]

override def effectScheme(ec: ExecutionContext): EffectScheme = new FutureEffectScheme()(ec)

def failed[Ctx, Res](error: Throwable): Result[Ctx, Res] =
Future.failed(error)

Expand All @@ -46,19 +49,18 @@ trait EffectOps[F[_]] {

@ApiMayChange
class EffectBasedExecutionScheme[F[_]](
ops: EffectOps[F]
ops: EffectOps[F],
effect: EffectScheme
) extends ExecutionScheme {
override type Result[Ctx, Res] = F[Res]
override def effectScheme(ec: ExecutionContext): EffectScheme = effect
override def failed[Ctx, Res](error: Throwable): Result[Ctx, Res] =
ops.failed(error)
override def onComplete[Ctx, Res](result: Result[Ctx, Res])(op: => Unit)(implicit
ec: ExecutionContext): Result[Ctx, Res] = ???
override def flatMapFuture[Ctx, Res, T](future: Future[T])(resultFn: T => Result[Ctx, Res])(
implicit ec: ExecutionContext): Result[Ctx, Res] =
ops.flatMapFuture(future)(resultFn)
def mapEffect[Ctx, Res, T](future: Future[(Ctx, T)])(f: (Ctx, T) => Res)(implicit
ec: ExecutionContext): F[Res] =
ops.map(future) { case (ctx, in) => f(ctx, in) }

override def extended: Boolean = false
}
Expand All @@ -70,6 +72,7 @@ trait AlternativeExecutionScheme {

implicit object Extended extends ExecutionScheme {
type Result[Ctx, T] = Future[ExecutionResult[Ctx, T]]
override def effectScheme(ec: ExecutionContext): EffectScheme = new FutureEffectScheme()(ec)

def failed[Ctx, Res](error: Throwable): Result[Ctx, Res] =
Future.failed(error)
Expand All @@ -93,6 +96,7 @@ trait AlternativeExecutionScheme {
} =
new ExecutionScheme with StreamBasedExecutionScheme[S] {
type Result[Ctx, T] = S[T]
override def effectScheme(ec: ExecutionContext): EffectScheme = new FutureEffectScheme()(ec)

def subscriptionStream = stream
def extended = false
Expand All @@ -115,6 +119,7 @@ trait AlternativeExecutionScheme {
} =
new ExecutionScheme with StreamBasedExecutionScheme[S] {
type Result[Ctx, T] = S[ExecutionResult[Ctx, T]]
override def effectScheme(ec: ExecutionContext): EffectScheme = new FutureEffectScheme()(ec)

def subscriptionStream = stream
def extended = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ case class Executor[Ctx, Root](
validationTiming,
queryReducerTiming,
queryAst
)
)(executionContext, scheme.effectScheme(executionContext))

val result =
operation.operationType match {
Expand Down
151 changes: 82 additions & 69 deletions modules/core/src/main/scala/sangria/execution/Resolver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class Resolver[Ctx](
validationTiming: TimeMeasurement,
queryReducerTiming: TimeMeasurement,
queryAst: ast.Document
)(implicit executionContext: ExecutionContext) {
)(implicit executionContext: ExecutionContext, effectScheme: EffectScheme) {
val resultResolver = new ResultResolver(marshaller, exceptionHandler, preserveOriginalErrors)
val toScalarMiddleware = Middleware.composeToScalarMiddleware(middleware.map(_._2), userContext)

Expand All @@ -45,17 +45,22 @@ class Resolver[Ctx](
collectActionsPar(ExecutionPath.empty, tpe, value, fields, ErrorRegistry.empty, userContext)

handleScheme(
processFinalResolve(
resolveActionsPar(ExecutionPath.empty, tpe, actions, userContext, fields.namesOrdered))
.map(_ -> userContext),
scheme)
effectScheme.map(
processFinalResolve(
resolveActionsPar(ExecutionPath.empty, tpe, actions, userContext, fields.namesOrdered)))(
_ -> userContext),
scheme
)
}

def resolveFieldsSeq(tpe: ObjectType[Ctx, _], value: Any, fields: CollectedFields)(
scheme: ExecutionScheme): scheme.Result[Ctx, marshaller.Node] = {
val result = resolveSeq(ExecutionPath.empty, tpe, value, fields, ErrorRegistry.empty)
val result = resolveSeq(ExecutionPath.empty, tpe, value, fields)

handleScheme(result.flatMap(res => processFinalResolve(res._1).map(_ -> res._2)), scheme)
handleScheme(
effectScheme.flatMap(result)(res =>
effectScheme.map(processFinalResolve(res._1))(_ -> res._2)),
scheme)
}

def resolveFieldsSubs(tpe: ObjectType[Ctx, _], value: Any, fields: CollectedFields)(
Expand Down Expand Up @@ -120,62 +125,68 @@ class Resolver[Ctx](
throw new IllegalStateException(s"Unsupported execution scheme: $s")
}

def handleScheme(
result: Future[((Vector[RegisteredError], marshaller.Node), Ctx)],
private def handleScheme(
result: effectScheme.F[((Vector[RegisteredError], marshaller.Node), Ctx)],
scheme: ExecutionScheme): scheme.Result[Ctx, marshaller.Node] = scheme match {
case ExecutionScheme.Default =>
result.map { case ((_, res), _) => res }.asInstanceOf[scheme.Result[Ctx, marshaller.Node]]
effectScheme
.map(result) { case ((_, res), _) => res }
.asInstanceOf[scheme.Result[Ctx, marshaller.Node]]

case ExecutionScheme.Extended =>
result
.map { case ((errors, res), uc) =>
effectScheme
.map(result) { case ((errors, res), uc) =>
ExecutionResult(uc, res, errors, middleware, validationTiming, queryReducerTiming)
}
.asInstanceOf[scheme.Result[Ctx, marshaller.Node]]

case s: ExecutionScheme.StreamBasedExecutionScheme[_] =>
s.subscriptionStream
.singleFuture(result.map {
.singleFuture(effectScheme.toFuture(effectScheme.map(result) {
case ((errors, res), uc) if s.extended =>
ExecutionResult(uc, res, errors, middleware, validationTiming, queryReducerTiming)
case ((_, res), _) => res
})
}))
.asInstanceOf[scheme.Result[Ctx, marshaller.Node]]

case s: EffectBasedExecutionScheme[_] =>
s.mapEffect(result.map(_.swap)) { case (_, in) => in._2 }
effectScheme
.map(result) { case ((_, res), _) => res }
.asInstanceOf[scheme.Result[Ctx, marshaller.Node]]

case s =>
throw new IllegalStateException(s"Unsupported execution scheme: $s")
}

def processFinalResolve(resolve: Resolve) = resolve match {
case Result(errors, data, _) =>
Future.successful(
errors.originalErrors ->
marshalResult(
data.asInstanceOf[Option[resultResolver.marshaller.Node]],
marshalErrors(errors),
marshallExtensions.asInstanceOf[Option[resultResolver.marshaller.Node]],
beforeExecution = false
).asInstanceOf[marshaller.Node])

case dr: DeferredResult =>
immediatelyResolveDeferred(
userContext,
dr,
_.map { case (Result(errors, data, _)) =>
private def processFinalResolve(
resolve: Resolve): effectScheme.F[(Vector[RegisteredError], marshaller.Node)] =
resolve match {
case Result(errors, data, _) =>
effectScheme.success(
errors.originalErrors ->
marshalResult(
data.asInstanceOf[Option[resultResolver.marshaller.Node]],
marshalErrors(errors),
marshallExtensions.asInstanceOf[Option[resultResolver.marshaller.Node]],
beforeExecution = false
).asInstanceOf[marshaller.Node]
}
)
}
).asInstanceOf[marshaller.Node])

case dr: DeferredResult =>
effectScheme.fromFuture(
immediatelyResolveDeferred(
userContext,
dr,
_.map { case (Result(errors, data, _)) =>
errors.originalErrors ->
marshalResult(
data.asInstanceOf[Option[resultResolver.marshaller.Node]],
marshalErrors(errors),
marshallExtensions.asInstanceOf[Option[resultResolver.marshaller.Node]],
beforeExecution = false
).asInstanceOf[marshaller.Node]
}
))
}

private def marshallExtensions: Option[marshaller.Node] = {
val extensions =
Expand Down Expand Up @@ -311,61 +322,63 @@ class Resolver[Ctx](
}

stream -> stream.mapFuture(stream.merge(fieldStreams.asInstanceOf[Vector[S[Result]]]))(r =>
processFinalResolve(r.buildValue))
effectScheme.toFuture(processFinalResolve(r.buildValue)))
}

def resolveSeq(
private def resolveSeq(
path: ExecutionPath,
tpe: ObjectType[Ctx, _],
value: Any,
fields: CollectedFields,
errorReg: ErrorRegistry): Future[(Result, Ctx)] =
fields.fields
fields: CollectedFields): effectScheme.F[(Result, Ctx)] = {
val result = fields.fields
.foldLeft(
Future.successful(
effectScheme.success(
(
Result(ErrorRegistry.empty, Some(marshaller.emptyMapNode(fields.namesOrdered))),
userContext))) { case (future, elem) =>
future.flatMap { resAndCtx =>
effectScheme.flatMap(future) { resAndCtx =>
(resAndCtx, elem) match {
case (acc @ (Result(_, None, _), _), _) => Future.successful(acc)
case (acc @ (Result(_, None, _), _), _) => effectScheme.success(acc)
case (acc, CollectedField(name, origField, _))
if tpe.getField(schema, origField.name).isEmpty =>
Future.successful(acc)
effectScheme.success(acc)
case (
(Result(errors, s @ Some(acc), _), uc),
CollectedField(name, origField, Failure(error))) =>
Future.successful(Result(
errors.add(path.add(origField, tpe), error),
if (isOptional(tpe, origField.name))
Some(
marshaller.addMapNodeElem(
acc.asInstanceOf[marshaller.MapBuilder],
origField.outputName,
marshaller.nullNode,
optional = true))
else None
) -> uc)
effectScheme.success(
Result(
errors.add(path.add(origField, tpe), error),
if (isOptional(tpe, origField.name))
Some(
marshaller.addMapNodeElem(
acc.asInstanceOf[marshaller.MapBuilder],
origField.outputName,
marshaller.nullNode,
optional = true))
else None
) -> uc)
case (
(accRes @ Result(errors, s @ Some(acc), _), uc),
CollectedField(name, origField, Success(fields))) =>
resolveSingleFieldSeq(
path,
uc,
tpe,
value,
errors,
name,
origField,
fields,
accRes,
acc)
effectScheme.fromFuture(
resolveSingleFieldSeq(
path,
uc,
tpe,
value,
errors,
name,
origField,
fields,
accRes,
acc))
}
}
}
.map { case (res, ctx) =>
res.buildValue -> ctx
}
effectScheme.map(result) { case (res, ctx) =>
res.buildValue -> ctx
}
}

private def resolveSingleFieldSeq(
path: ExecutionPath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,23 @@ import scala.concurrent.{ExecutionContext, Future}
/** The integration with [[cats.effect.IO]] is far from being complete for now.
*/
class IOExecutionScheme extends AnyWordSpec with Matchers {
private implicit val ec: ExecutionContext = global.compute
private implicit val ec: ExecutionContext = null
private val ioEffectOps = new EffectOps[IO] {
override def failed[Ctx, Res](error: Throwable): IO[Res] = IO.raiseError(error)
override def flatMapFuture[Res, T](future: Future[T])(resultFn: T => IO[Res]): IO[Res] =
IO.fromFuture(IO(future)).flatMap(resultFn)
override def map[T, Out](in: Future[T])(f: T => Out): IO[Out] = IO.fromFuture(IO(in)).map(f)
}
private val effectSchema = new EffectScheme {
override type F[A] = IO[A]
override def map[A, B](eff: IO[A])(f: A => B): IO[B] = eff.map(f)
override def flatMap[A, B](eff: IO[A])(f: A => IO[B]): IO[B] = eff.flatMap(f)
override def success[A](a: A): IO[A] = IO.pure(a)
override def fromFuture[A](f: Future[A]): IO[A] = IO.fromFuture(IO(f))
override def toFuture[A](f: IO[A]): Future[A] = f.unsafeToFuture()(global)
}
private implicit val ioExecutionScheme: EffectBasedExecutionScheme[IO] =
new EffectBasedExecutionScheme[IO](ioEffectOps)
new EffectBasedExecutionScheme[IO](ioEffectOps, effectSchema)

import IOExecutionScheme._
"IOExecutionScheme" must {
Expand Down

0 comments on commit df78602

Please sign in to comment.