diff --git a/src/main/scala/Demo.scala b/src/main/scala/Demo.scala deleted file mode 100644 index fe77db8..0000000 --- a/src/main/scala/Demo.scala +++ /dev/null @@ -1,81 +0,0 @@ -import java.io.File - -import akka.{Done, NotUsed} -import akka.actor.ActorSystem -import akka.stream.{ClosedShape, ThrottleMode, IOResult, ActorMaterializer} -import akka.stream.scaladsl._ -import akka.util.ByteString - -import scala.concurrent.Future -import scala.concurrent.duration._ - -object Demo extends App { - - implicit val system = ActorSystem("Demo") - implicit val materializer = ActorMaterializer() - - val source: Source[Int, NotUsed] = Source(1 to 100) - source.runForeach(i => println(i)) - - val factorials = source.scan(BigInt(1))((acc, next) => acc * next) - - val result: Future[IOResult] = factorials - .map(num => ByteString(s"$num\n")) - .runWith(FileIO.toFile(new File("factorials.txt"))) - - def lineSink(fileName: String): Sink[String, Future[IOResult]] = { - Flow[String] - .map(s => ByteString(s + "\n")) - .toMat(FileIO.toFile(new File(fileName)))(Keep.right) - } - - factorials.map(_.toString).runWith(lineSink("factorial2.txt")) - -// val done = -// factorials -// .zipWith(Source(0 to 100))((num, idx) => s"$idx! = $num") -// .throttle(1, 1.second, 1, ThrottleMode.shaping) -// .runForeach(println) - - val source2 = Source(1 to 10) - val sink = Sink.fold[Int, Int](0)(_ + _) - - sink.runWith(source2) - - import system.dispatcher - source2.runWith(Sink.head).onComplete(println(_)) - - import akka.stream.Fusing - - val flow = Flow[Int].map(_ * 2).filter(_ > 500) - val fused = Fusing.aggressive(flow) - - val fusedSource = - Source.fromIterator { () => Iterator from 0 } - .via(fused) - .take(1000) - -// fusedSource.runForeach(i => println(i)) - - - val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder => - import GraphDSL.Implicits._ - - val in = Source(1 to 10) - val out: Sink[Any, Future[Done]] = Sink.foreach(println(_)) - - val bcast = builder.add(Broadcast[Int](2)) - val merge = builder.add(Merge[Int](2)) - - val f1 = Flow[Int].map(_ * 2) - val f2 = Flow[Int].map(_ + 1) - val f4 = Flow[Int].map(_ * 10) - val f3 = Flow[Int].map(i => i) - - in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out - bcast ~> f4 ~> merge - ClosedShape - }) - - g.run() -} diff --git a/src/main/scala/EchoServer.scala b/src/main/scala/EchoServer.scala deleted file mode 100644 index c8cae6a..0000000 --- a/src/main/scala/EchoServer.scala +++ /dev/null @@ -1,45 +0,0 @@ -import akka.actor.ActorSystem -import akka.stream.ActorMaterializer -import akka.stream.scaladsl.Tcp.{IncomingConnection, ServerBinding} -import akka.stream.scaladsl._ -import akka.util.ByteString - -import scala.concurrent.Future - -object EchoServer extends App { - -// val binding: Future[ServerBinding] = -// Tcp().bind("127.0.0.1", 8888).to(Sink.ignore).run() -// -// binding.map { b => -// b.unbind() onComplete { -// case _ => // ... -// } -// } - - implicit val system = ActorSystem("echo-server") - implicit val materializer = ActorMaterializer() - - val connections: Source[IncomingConnection, Future[ServerBinding]] = - Tcp().bind("127.0.0.1", 8888) - - connections runForeach { incomingConnection => - println(s"New connection from: ${incomingConnection.remoteAddress}") - - val echo = Flow[ByteString] - .via(Framing.delimiter( - ByteString("\n"), - maximumFrameLength = 256, - allowTruncation = true)) - .map(_.utf8String) - .map(_ + "!!!\n") - .map(ByteString(_)) - - val echo2 = Flow[ByteString] - .map(_.utf8String) - .map("Echo " + _) - .map(ByteString(_)) - - incomingConnection.handleWith(echo2) - } -} diff --git a/src/main/scala/Join.scala b/src/main/scala/Join.scala deleted file mode 100644 index 113b91a..0000000 --- a/src/main/scala/Join.scala +++ /dev/null @@ -1,27 +0,0 @@ -import java.io.File - -import akka.actor.ActorSystem -import akka.stream.scaladsl._ -import akka.stream.{ActorMaterializer, ClosedShape, IOResult} -import akka.util.ByteString -import akka.{Done, NotUsed} - -import scala.concurrent.Future - -object Join extends App { - - implicit val system = ActorSystem("Demo") - implicit val materializer = ActorMaterializer() - - val source: Source[Int, NotUsed] = Source(1 to 100) - - val f1: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2) - val f2: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 5) - - val joined: RunnableGraph[NotUsed] = f1.join(f2) - -// joined. - //.runForeach(i => println(i)) - - -} diff --git a/src/main/scala/Map.scala b/src/main/scala/Map.scala deleted file mode 100644 index 4d3fc97..0000000 --- a/src/main/scala/Map.scala +++ /dev/null @@ -1,15 +0,0 @@ -import akka.actor.ActorSystem -import akka.stream.ActorMaterializer -import akka.stream.scaladsl.{Source, Flow} -import akka.util.ByteString - -object Map extends App { - - implicit val system = ActorSystem("Demo") - implicit val materializer = ActorMaterializer() - - val f1: Flow[Int, String, _] = Flow[Int].map(_.toString) -// val f2: Flow[Int, Int, _] = f1.map(_.toInt) - - Source(1 to 100).via(f1).runForeach(x => println(x)) -} diff --git a/src/main/scala/Maybe.scala b/src/main/scala/Maybe.scala deleted file mode 100644 index 3384ca2..0000000 --- a/src/main/scala/Maybe.scala +++ /dev/null @@ -1,31 +0,0 @@ -import akka.NotUsed -import akka.actor.ActorSystem -import akka.stream.ActorMaterializer -import akka.stream.scaladsl.{Keep, Sink, Flow, Source} - -import scala.concurrent.{Future, Promise} - -object Maybe extends App { - - implicit val system = ActorSystem("Demo") - implicit val materializer = ActorMaterializer() - import system.dispatcher - - val f1: Flow[Int, String, NotUsed] = - Flow[Int] -// .map { elem => println(elem); elem } - .map(_.toString) - - val source: Source[Int, Promise[Option[Int]]] = Source.maybe[Int] - val sink: Sink[String, Future[String]] = Sink.head[String] - -// val (promise, fut) = source.via(f1).toMat(sink)(Keep.both).run() - - val (promise, fut) = Source.maybe[Int].toMat(Sink.foreach(println))(Keep.both).run() - - promise.completeWith(fut.map { x => println("completed biatch"); Some(1) }) -// fut.map { x => println("Fut: " + x); x} - -// promise.success(Some(5)) - -} diff --git a/src/main/scala/Protocol.scala b/src/main/scala/Protocol.scala index 48dd768..e124d65 100644 --- a/src/main/scala/Protocol.scala +++ b/src/main/scala/Protocol.scala @@ -1,3 +1,4 @@ +import akka.NotUsed import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.scaladsl.Tcp.{IncomingConnection, ServerBinding} @@ -5,33 +6,44 @@ import akka.stream.scaladsl._ import akka.util.ByteString import com.typesafe.config.ConfigFactory -import scala.concurrent.Future +import scala.concurrent.{Promise, Future} +/** + * Basis for a unidirectional TCP protocol + */ object Protocol extends App { implicit val system = ActorSystem("unidirectional-tcp", ConfigFactory.defaultReference()) implicit val materializer = ActorMaterializer() - import system.dispatcher - val binding: Source[IncomingConnection, Future[ServerBinding]] = Tcp().bind("localhost", 8888) + val binding: Source[IncomingConnection, Future[ServerBinding]] = + Tcp().bind("localhost", 8888) binding.runForeach { (incomingConnection: IncomingConnection) => println(s"New connection from: ${incomingConnection.remoteAddress}") + // Sink can be swapped by f.i. an ActorSubscriber + val targetSink = Sink.foreach[String](e => println("Sink received: " + e)) + + val completionSource: Source[ByteString, Promise[Option[ByteString]]] = + Source.maybe[ByteString].drop(1) // Discard the element immediately, only used for closing connection + val (closePromise, doneFuture) = - Source.maybe[ByteString] + completionSource .via(incomingConnection.flow) - .via(incoming) - .toMat(Sink.foreach(println))(Keep.both).run() + .via(protocol) + .toMat(targetSink)(Keep.both) // We keep both, because we use the future to complete the promise + .run() - // How is this promise completed??? + // Completing the promise closes the connection + import system.dispatcher // Execution context for promise closePromise.completeWith(doneFuture.map { _ => - Some(ByteString.empty) + Some(ByteString.empty) // Dummy element, will be dropped anyway }) } // Simple flow mapping ByteStrings to Strings - val incoming: Flow[ByteString, String, _] = + val protocol: Flow[ByteString, String, _] = Flow[ByteString].map(_.utf8String) }