-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Removed experiments and tidied up example
- Loading branch information
1 parent
c24d8d2
commit b5a12ba
Showing
6 changed files
with
21 additions
and
208 deletions.
There are no files selected for viewing
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,37 +1,49 @@ | ||
import akka.NotUsed | ||
import akka.actor.ActorSystem | ||
import akka.stream.ActorMaterializer | ||
import akka.stream.scaladsl.Tcp.{IncomingConnection, ServerBinding} | ||
import akka.stream.scaladsl._ | ||
import akka.util.ByteString | ||
import com.typesafe.config.ConfigFactory | ||
|
||
import scala.concurrent.Future | ||
import scala.concurrent.{Promise, Future} | ||
|
||
/** | ||
* Basis for a unidirectional TCP protocol | ||
*/ | ||
object Protocol extends App { | ||
|
||
implicit val system = ActorSystem("unidirectional-tcp", ConfigFactory.defaultReference()) | ||
implicit val materializer = ActorMaterializer() | ||
import system.dispatcher | ||
|
||
val binding: Source[IncomingConnection, Future[ServerBinding]] = Tcp().bind("localhost", 8888) | ||
val binding: Source[IncomingConnection, Future[ServerBinding]] = | ||
Tcp().bind("localhost", 8888) | ||
|
||
binding.runForeach { (incomingConnection: IncomingConnection) => | ||
println(s"New connection from: ${incomingConnection.remoteAddress}") | ||
|
||
// Sink can be swapped by f.i. an ActorSubscriber | ||
val targetSink = Sink.foreach[String](e => println("Sink received: " + e)) | ||
|
||
val completionSource: Source[ByteString, Promise[Option[ByteString]]] = | ||
Source.maybe[ByteString].drop(1) // Discard the element immediately, only used for closing connection | ||
|
||
val (closePromise, doneFuture) = | ||
Source.maybe[ByteString] | ||
completionSource | ||
.via(incomingConnection.flow) | ||
.via(incoming) | ||
.toMat(Sink.foreach(println))(Keep.both).run() | ||
.via(protocol) | ||
.toMat(targetSink)(Keep.both) // We keep both, because we use the future to complete the promise | ||
.run() | ||
|
||
// How is this promise completed??? | ||
// Completing the promise closes the connection | ||
import system.dispatcher // Execution context for promise | ||
closePromise.completeWith(doneFuture.map { _ => | ||
Some(ByteString.empty) | ||
Some(ByteString.empty) // Dummy element, will be dropped anyway | ||
}) | ||
} | ||
|
||
// Simple flow mapping ByteStrings to Strings | ||
val incoming: Flow[ByteString, String, _] = | ||
val protocol: Flow[ByteString, String, _] = | ||
Flow[ByteString].map(_.utf8String) | ||
|
||
} |