Skip to content

Commit

Permalink
Merge pull request #1640 from mdipirro/scala-672-stream-errors
Browse files Browse the repository at this point in the history
SCALA-672 Add code for stream error handling
  • Loading branch information
lor6 authored Nov 26, 2024
2 parents a4b06f1 + 9faa82d commit 2c1b54e
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 0 deletions.
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,8 @@ lazy val scala_akka_3 = (project in file("scala-akka-3"))
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-stream" % AkkaVersion,
"com.typesafe.akka" %% "akka-discovery" % AkkaVersion,
"org.slf4j" % "slf4j-api" % "2.0.16",
"org.slf4j" % "slf4j-simple" % "2.0.16",
"com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion % Test,
akkaActorTyped,
akkaStreamDep,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.baeldung.scala.akka.stream.errors

import akka.actor.ActorSystem
import akka.stream.RestartSettings
import akka.stream.scaladsl.RestartSource

import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
import scala.util.{Failure, Success}

object Main extends App {
implicit val system: ActorSystem = ActorSystem("baeldung")

private val backoffSettings = RestartSettings(
minBackoff = 3 seconds,
maxBackoff = 30 seconds,
randomFactor = 0.2
).withMaxRestarts(3, 5.minutes)

RestartSource
.withBackoff(backoffSettings) { () => source }
.via(parse)
.via(compare)
.runWith(sink)
.andThen {
case Failure(exception) => println(exception)
case Success((correct, total)) =>
println(s"$correct/$total correct answers")
}(system.dispatcher)
.onComplete(_ => system.terminate())(system.dispatcher)

/*source
.via(parseWithLogging)
.via(compare)
.runWith(sink)
.andThen {
case Failure(exception) => println(exception)
case Success((correct, total)) =>
println(s"$correct/$total correct answers")
}(system.dispatcher)
.onComplete(_ => system.terminate())(system.dispatcher)*/
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.baeldung.scala.akka.stream

import akka.NotUsed
import akka.stream.Attributes
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Future

package object errors {
val source: Source[String, NotUsed] = Source(
Seq("5,10", "15,15", "78,79", "12,12", "0,0", "456,456")
)

val parse: Flow[String, (Int, Int), NotUsed] =
Flow[String]
.map { pair =>
val parts = pair.split(",")
(parts(0).toInt, parts(1).toInt)
}

val parseWithRecover: Flow[String, Either[String, (Int, Int)], NotUsed] =
Flow[String]
.map { pair =>
val parts = pair.split(",")
Right((parts(0).toInt, parts(1).toInt))
}
.recover({ case e: ArrayIndexOutOfBoundsException =>
Left(e.getMessage)
})

val parseWithLogging: Flow[String, (Int, Int), NotUsed] =
Flow[String]
.map { pair =>
val parts = pair.split(",")
(parts(0).toInt, parts(1).toInt)
}
.log(name = "Baeldung stream")
.addAttributes(
Attributes.logLevels(
onElement = Attributes.LogLevels.Info
)
)

val compare: Flow[(Int, Int), Boolean, NotUsed] =
Flow[(Int, Int)]
.map { case (userAnswer, correctAnswer) => userAnswer == correctAnswer }

val sink: Sink[Boolean, Future[(Int, Int)]] = Sink.fold((0, 0)) {
case ((correctCount, total), wasCorrect) =>
if (wasCorrect) (correctCount + 1, total + 1)
else (correctCount, total + 1)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.baeldung.scala.akka.stream.errors

import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.{TestSink, TestSource}
import org.scalatest.concurrent.ScalaFutures.convertScalaFuture
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class ErrorRecoveryUnitTest extends AnyFlatSpec with Matchers {
implicit val system: ActorSystem = ActorSystem("baeldung")

"The \"parseWithRecover\" flow" should "parse recover from a parsing error" in {
val (pub, sub) = TestSource[String]()
.via(parseWithRecover)
.toMat(TestSink[Either[String, (Int, Int)]]())(Keep.both)
.run()

pub.sendNext("1,1")
pub.sendNext("145146")
pub.sendComplete()

sub.requestNext(Right(1, 1))
sub.requestNext(Left("Index 1 out of bounds for length 1"))
sub.expectComplete()
}

"The \"Resume\" supervision strategy" should "ignore parsing errors" in {
val runnableGraph = TestSource[String]()
.via(parseWithRecover)
.toMat(TestSink[Either[String, (Int, Int)]]())(Keep.both)

val decider: Supervision.Decider = {
case _: ArrayIndexOutOfBoundsException => Supervision.Resume
case _ => Supervision.Stop
}

val graphWithResumeSupervision =
runnableGraph.withAttributes(ActorAttributes.supervisionStrategy(decider))

val (pub, sub) = graphWithResumeSupervision.run()

pub.sendNext("1,1")
pub.sendNext("145146")
pub.sendNext("1,2")
pub.sendComplete()

sub.requestNext(Right(1, 1))
sub.requestNext(Right(1, 2))
sub.expectComplete()
}
}

0 comments on commit 2c1b54e

Please sign in to comment.