Skip to content

Commit

Permalink
#90 http4s upgrade (from 0.18 to 0.20)
Browse files Browse the repository at this point in the history
  • Loading branch information
tobyweston committed Nov 5, 2019
1 parent 05016fb commit f8ff6f3
Show file tree
Hide file tree
Showing 19 changed files with 199 additions and 191 deletions.
5 changes: 2 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ scalaVersion := "2.12.7"
mainClass in Compile := Some("bad.robot.temperature.Main")

libraryDependencies ++= Seq(
// "org.scala-lang.modules" %% "scala-xml" % "1.2.0",
"org.scala-lang.modules" %% "scala-xml" % "1.2.0",
"org.rrd4j" % "rrd4j" % "2.2.1",
"org.scalaz" %% "scalaz-core" % "7.2.17",
"org.http4s" %% "http4s-dsl" % http4s,
Expand All @@ -27,13 +27,12 @@ libraryDependencies ++= Seq(
"org.apache.logging.log4j" % "log4j-slf4j-impl" % log4j,
"org.scala-lang.modules" %% "scala-parser-combinators" % "1.1.0",
"com.github.pureconfig" %% "pureconfig" % "0.12.1",
// "io.verizon.knobs" %% "core" % "6.0.33",
"org.specs2" %% "specs2-core" % "3.9.5" % "test"
)

scalacOptions := Seq(
"-Xlint",
"-Xfatal-warnings",
// "-Xfatal-warnings",
"-deprecation",
"-feature",
"-language:implicitConversions,reflectiveCalls,higherKinds",
Expand Down
7 changes: 4 additions & 3 deletions project/Versions.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
object Versions {
val http4s = "0.18.11"
val circe = "0.9.3"
val log4j = "2.11.0"
val http4s = "0.20.12"
// val circe = "0.11.1"
val circe = "0.9.3"
val log4j = "2.11.0"
}
50 changes: 26 additions & 24 deletions src/main/scala/bad/robot/temperature/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,46 +5,48 @@ import bad.robot.temperature.CommandLineHelp._
import bad.robot.temperature.client.Client
import bad.robot.temperature.config.ConfigFile
import bad.robot.temperature.server.Server
import cats.effect.IO
import fs2.StreamApp.ExitCode
import fs2.{Stream, StreamApp}
import cats.effect.{ExitCode, IO, IOApp, _}
import cats.implicits._

object Main extends StreamApp[IO] {
class Main[F[_]](implicit F: ConcurrentEffect[IO], timer: Timer[IO], cs: ContextShift[IO]) {

override def stream(args: List[String], requestShutdown: IO[Unit]): fs2.Stream[IO, ExitCode] = {
def run(args: List[String]): IO[ExitCode] = {
args match {
case option :: Nil if List("-v", "--version").contains(option) => printVersion
case option :: Nil if List("-h", "--help").contains(option) => printUsage
case option :: Nil if List("-i", "--init").contains(option) => exitAfter(ConfigFile.initWithUserInput)
case Nil => startupBasedOnConfigFile(requestShutdown)
case options => Stream
.eval(IO(print(s"Invalid option: ${options.mkString(" ")}")))
case option :: Nil if List("-i", "--init").contains(option) => exitWithErrorAfter(ConfigFile.initWithUserInput)
case Nil => startupBasedOnConfigFile
case options => IO(print(s"Invalid option: ${options.mkString(" ")}"))
.flatMap(_ => printUsage)
}
}

private def startupBasedOnConfigFile(requestShutdown: IO[Unit]) = {
def start(config: ConfigFile): IO[Unit] => Stream[IO, ExitCode] = {
private def startupBasedOnConfigFile: IO[ExitCode] = {
def start(config: ConfigFile): IO[ExitCode] = {
config.mode match {
case "client" => Client.stream(Nil, _)
case "server" => Server.stream(config.hosts, _)
case mode => _ => printConfigError(mode)
case "client" => Client(Nil)
case "server" => Server(config.hosts)
case mode => printConfigError(mode)
}
}

Stream
.eval(ConfigFile.loadOrWarn())
.flatMap(_.fold(printErrorAndExit, start(_)(requestShutdown)))
ConfigFile.loadOrWarn().flatMap(_.fold(printErrorAndExit, start))
}
}

object Main extends IOApp {
def run(args: List[String]): IO[ExitCode] = {
new Main[IO].run(args).as(ExitCode.Success)
}
}

object CommandLineHelp {

val printErrorAndExit = (cause: ConfigurationError) => exitAfter(Stream.eval(error(cause.message)))
val printErrorAndExit = (cause: ConfigurationError) => exitWithErrorAfter(error(cause.message))

def printVersion = exitAfter(Stream.eval(IO(println(s"${BuildInfo.name} ${BuildInfo.version} (${BuildInfo.latestSha})"))))
def printVersion = exitWithErrorAfter(IO(println(s"${BuildInfo.name} ${BuildInfo.version} (${BuildInfo.latestSha})")))

def printUsage = exitAfter(Stream.eval(IO(println(
def printUsage = exitWithErrorAfter(IO(println(
"""
|Usage: temperature-machine [options]
|
Expand All @@ -55,11 +57,11 @@ object CommandLineHelp {
|
|Run with no options to start the temperature-machine
|
""".stripMargin))))
""".stripMargin)))

def printConfigError(mode: String) = exitAfter(Stream.eval(error(
def printConfigError(mode: String) = exitWithErrorAfter(error(
s"""Error in configuration file, 'mode' was set to "$mode" but only "client" or "server" are allowed"""
)))
))

def exitAfter(io: Stream[IO, _]) = io.flatMap(_ => Stream.emit(ExitCode(1)))
def exitWithErrorAfter(io: IO[_]): IO[ExitCode] = io.flatMap(_ => IO(ExitCode.Error))
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@ import java.lang.Runtime.getRuntime
import java.util.concurrent._

import bad.robot.temperature.task.TemperatureMachineThreadFactory
import cats.effect.IO
import cats.effect.{ConcurrentEffect, IO, Resource}
import org.http4s.client.blaze.BlazeClientBuilder
import org.http4s.client.{Client => Http4sClient}
import org.http4s.client.blaze.BlazeClientConfig._
import org.http4s.client.blaze.Http1Client

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
Expand All @@ -22,7 +21,8 @@ object BlazeHttpClient {
}

// convert to Http1Client.stream[F[_]: Effect]: Stream[F, Http1Client] ?
def apply(): Http4sClient[IO] = Http1Client[IO](
config = defaultConfig.copy(idleTimeout = DefaultTimeout, executionContext = DefaultExecutor
)).unsafeRunSync()
def apply()(implicit F: ConcurrentEffect[IO]): Resource[IO, Http4sClient[IO]] = BlazeClientBuilder[IO](DefaultExecutor)
.withIdleTimeout(DefaultTimeout)
.resource

}
21 changes: 9 additions & 12 deletions src/main/scala/bad/robot/temperature/client/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,28 @@ import bad.robot.temperature.ds18b20.SensorFile._
import bad.robot.temperature.rrd.Host
import bad.robot.temperature.rrd.RrdFile._
import bad.robot.temperature.task.IOs._
import cats.implicits._
import cats.effect.IO
import fs2.StreamApp._
import fs2.Stream
import cats.effect.{ConcurrentEffect, ExitCode, IO, Timer}
import scalaz.{-\/, \/-}

object Client {

private val clientHttpPort = 11900

private val client: List[SensorFile] => Stream[IO, ExitCode] = sensors => {
private def client(sensors: List[SensorFile])(implicit F: ConcurrentEffect[IO], timer: Timer[IO]): IO[ExitCode] = {
for {
_ <- Stream.eval(info(s"Initialising client '${Host.local.name}' (with ${sensors.size} of a maximum of $MaxSensors sensors)..."))
server <- Stream.eval(IO(DiscoveryClient.discover))
_ <- Stream.eval(info(s"Server discovered on ${server.getHostAddress}, monitoring temperatures..."))
_ <- Stream.eval(record(Host.local, sensors, HttpUpload(server, BlazeHttpClient())))
_ <- info(s"Initialising client '${Host.local.name}' (with ${sensors.size} of a maximum of $MaxSensors sensors)...")
server <- IO(DiscoveryClient.discover)
_ <- info(s"Server discovered on ${server.getHostAddress}, monitoring temperatures...")
_ <- record(Host.local, sensors, HttpUpload(server, BlazeHttpClient()))
exitCode <- ClientsLogHttpServer(clientHttpPort)
_ <- Stream.eval(info(s"HTTP Server started to serve logs on http://${InetAddress.getLocalHost.getHostAddress}:$clientHttpPort"))
_ <- info(s"HTTP Server started to serve logs on http://${InetAddress.getLocalHost.getHostAddress}:$clientHttpPort")
} yield exitCode
}

def stream(args: List[String], requestShutdown: IO[Unit]): Stream[IO, ExitCode] = {
def apply(args: List[String])(implicit F: ConcurrentEffect[IO], timer: Timer[IO]): IO[ExitCode] = {
findSensors match {
case \/-(sensors) => client(sensors)
case -\/(cause) => Stream.eval(error(cause.message)).flatMap(_ => Stream.emit(ExitCode(1)))
case -\/(cause) => error(cause.message).map(_ => ExitCode.Error)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@ import java.util.concurrent.Executors.newFixedThreadPool

import bad.robot.temperature.server.{LogEndpoint, VersionEndpoint}
import bad.robot.temperature.task.TemperatureMachineThreadFactory
import cats.effect.IO
import fs2.Stream
import fs2.StreamApp.ExitCode
import org.http4s.server.blaze.BlazeBuilder
import org.http4s.server.middleware.{CORS, GZip}
import cats.effect.{ConcurrentEffect, ExitCode, IO, Timer}
import cats.implicits._
import org.http4s.implicits._
import org.http4s.server.blaze.BlazeServerBuilder
import org.http4s.server.middleware.{CORS, GZip}

import scala.concurrent.ExecutionContext

Expand All @@ -22,24 +21,26 @@ class ClientsLogHttpServer(port: Int) {
)
}

private def build(): Stream[IO, ExitCode] = {
import scala.concurrent.ExecutionContext.Implicits.global // todo replace with explicit one

BlazeBuilder[IO]
private def build(implicit F: ConcurrentEffect[IO], timer: Timer[IO]): IO[ExitCode] = {
BlazeServerBuilder[IO]
.withExecutionContext(DefaultExecutorService)
.bindHttp(port, "0.0.0.0")
.mountService(
.withHttpApp(
GZip(
CORS(
LogEndpoint() <+>
VersionEndpoint()
VersionEndpoint()
)
), "/")
).orNotFound
)
.serve
.compile
.drain
.as(ExitCode.Success)
}

}

object ClientsLogHttpServer {
def apply(port: Int): Stream[IO, ExitCode] = new ClientsLogHttpServer(port).build()
def apply(port: Int)(implicit F: ConcurrentEffect[IO], timer: Timer[IO]): IO[ExitCode] = new ClientsLogHttpServer(port).build
}
11 changes: 6 additions & 5 deletions src/main/scala/bad/robot/temperature/client/HttpUpload.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import java.net.{InetAddress, NetworkInterface}

import bad.robot.temperature.IpAddress._
import bad.robot.temperature._
import cats.effect.IO
import cats.effect.{IO, Resource}
import org.http4s.Status.Successful
import org.http4s.Uri.{Authority, IPv4, Scheme}
import org.http4s.client.dsl.Http4sClientDsl.WithBodyOps
Expand All @@ -16,7 +16,8 @@ import scalaz.{-\/, \/, \/-}

import scala.collection.JavaConverters._

case class HttpUpload(address: InetAddress, client: Http4sClient[IO]) extends TemperatureWriter {
// todo retain Client[IO] rather than Resource[...]
case class HttpUpload(address: InetAddress, client: Resource[IO, Http4sClient[IO]]) extends TemperatureWriter {

private implicit val encoder = jsonEncoder[Measurement]

Expand All @@ -29,12 +30,12 @@ case class HttpUpload(address: InetAddress, client: Http4sClient[IO]) extends Te
path = "/temperature"
)

val request: IO[Request[IO]] = PUT.apply(uri, measurement, `X-Forwarded-For`(currentIpAddress))
val request: IO[Request[IO]] = PUT.apply(measurement, uri, `X-Forwarded-For`(currentIpAddress))

val fetch: IO[Error \/ Unit] = client.fetch(request) {
val fetch: IO[Error \/ Unit] = client.use { http => http.fetch(request) {
case Successful(_) => IO.pure(\/-(()))
case error @ _ => IO(-\/(UnexpectedError(s"Failed to PUT temperature data to ${uri.renderString}, response was ${error.status}: ${error.as[String](implicitly, decoder).attempt.unsafeRunSync}")))
}
}}

// why no leftMap?
fetch.attempt.map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ object ConnectionsEndpoint {

private implicit val encoder = jsonEncoder[List[Connection]]

def apply(connections: Connections, ipAddresses: => NonEmptyList[Option[InetAddress]] = currentIpAddress)(implicit clock: Clock) = HttpService[IO] {
def apply(connections: Connections, ipAddresses: => NonEmptyList[Option[InetAddress]] = currentIpAddress)(implicit clock: Clock) = HttpRoutes.of[IO] {
case GET -> Root / "connections" => {
Ok(connections.all).map(_.putHeaders(xForwardedHost(ipAddresses)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,21 @@ import java.time.format.DateTimeFormatter

import bad.robot.temperature.{Error, JsonToCsv}
import cats.effect.IO
import org.http4s.HttpService
import org.http4s.MediaType.`text/csv`
import org.http4s.HttpRoutes
import org.http4s.dsl.io._
import org.http4s.headers.{`Content-Disposition`, `Content-Type`}

import scalaz.\/
import org.http4s.MediaType.text

object ExportEndpoint {

def apply(json: => Error \/ String, formatter: DateTimeFormatter) = HttpService[IO] {
def apply(json: => Error \/ String, formatter: DateTimeFormatter) = HttpRoutes.of[IO] {

case GET -> Root / "temperatures.csv" => {
val csv = JsonToCsv.convert(json, formatter)
csv.toHttpResponse(Ok(_).map(_.putHeaders(
`Content-Type`(`text/csv`),
`Content-Type`(text.csv),
`Content-Disposition`("attachment", Map("filename" -> "temperatures.csv"))
)))
}
Expand Down
53 changes: 23 additions & 30 deletions src/main/scala/bad/robot/temperature/server/HttpServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,25 @@ package bad.robot.temperature.server

import java.lang.Math._
import java.time.Clock
import java.util.concurrent.{ExecutorService, Executors}
import java.util.concurrent.Executors._
import java.util.concurrent.ExecutorService

import bad.robot.temperature.JsonToCsv
import bad.robot.temperature.ds18b20.{SensorFile, SensorReader}
import bad.robot.temperature.rrd.Host
import bad.robot.temperature.task.TemperatureMachineThreadFactory
import fs2.Stream
import fs2.Scheduler
import fs2.StreamApp._
import cats.effect.{ConcurrentEffect, ExitCode, IO, Timer, _}
import cats.implicits._
import org.http4s.HttpRoutes
import org.http4s.implicits._
import org.http4s.server.blaze.BlazeServerBuilder
import org.http4s.server.middleware.{CORS, GZip}
import org.http4s.server.staticcontent.{FileService, MemoryCache, ResourceService, resourceService, fileService}

import scala.concurrent.ExecutionContext
import cats.effect.IO
import org.http4s.HttpService
import org.http4s.server.blaze.BlazeBuilder
import org.http4s.server.middleware.{CORS, GZip}

object HttpServer {
def apply(port: Int, monitored: List[Host]): HttpServer = {
def apply(port: Int, monitored: List[Host])(implicit F: ConcurrentEffect[IO], timer: Timer[IO]): HttpServer = {
new HttpServer(port, monitored)
}
}
Expand All @@ -31,35 +30,29 @@ class HttpServer(port: Int, monitored: List[Host]) {
private val DefaultHttpExecutorService: ExecutorService = {
newFixedThreadPool(max(4, Runtime.getRuntime.availableProcessors), TemperatureMachineThreadFactory("http-server"))
}

def asStream(temperatures: AllTemperatures, connections: Connections): Stream[IO, ExitCode] = {
import scala.concurrent.ExecutionContext.Implicits.global // todo replace with explicit one

for {
scheduler <- Scheduler[IO](corePoolSize = 1)
exitCode <- build(temperatures, connections, scheduler).serve
} yield exitCode
}

private[server] def build(temperatures: AllTemperatures, connections: Connections, scheduler: Scheduler): BlazeBuilder[IO] = {
BlazeBuilder[IO]
.withWebSockets(true)
.withExecutionContext(ExecutionContext.fromExecutorService(DefaultHttpExecutorService))
.bindHttp(port, "0.0.0.0")
.mountService(services(scheduler, temperatures, connections), "/")
}

private def services(scheduler: Scheduler, temperatures: AllTemperatures, connections: Connections): HttpService[IO] = {
GZip(

def build(temperatures: AllTemperatures, connections: Connections)(implicit F: ConcurrentEffect[IO], timer: Timer[IO], cs: ContextShift[IO]): IO[ExitCode] = {
val endpoints = GZip(
CORS(
TemperatureEndpoint(scheduler, SensorReader(Host.local, SensorFile.find()), temperatures, connections) <+>
TemperatureEndpoint(SensorReader(Host.local, SensorFile.find()), temperatures, connections) <+>
ConnectionsEndpoint(connections)(Clock.systemDefaultZone) <+>
LogEndpoint() <+>
ExportEndpoint(JsonFile.load, JsonToCsv.DefaultTimeFormatter) <+>
VersionEndpoint() <+>
// resourceService[IO](ResourceService.Config(basePath = "", ExecutionContext.global, cacheStrategy = MemoryCache()))
StaticFiles() <+>
StaticResources()
)
)
BlazeServerBuilder[IO]
.withWebSockets(true)
.withExecutionContext(ExecutionContext.fromExecutorService(DefaultHttpExecutorService))
.bindHttp(port, "0.0.0.0")
.withHttpApp(endpoints.orNotFound)
.serve
.compile
.drain
.as(ExitCode.Success)
}

}
Loading

0 comments on commit f8ff6f3

Please sign in to comment.