diff --git a/akka-http-backend/src/test/scala/sttp/client4/akkahttp/AkkaHttpClientHttpTest.scala b/akka-http-backend/src/test/scala/sttp/client4/akkahttp/AkkaHttpClientHttpTest.scala index 3aa84b20f5..0c45beb6a2 100644 --- a/akka-http-backend/src/test/scala/sttp/client4/akkahttp/AkkaHttpClientHttpTest.scala +++ b/akka-http-backend/src/test/scala/sttp/client4/akkahttp/AkkaHttpClientHttpTest.scala @@ -10,5 +10,7 @@ class AkkaHttpClientHttpTest extends HttpTest[Future] { override implicit val convertToFuture: ConvertToFuture[Future] = ConvertToFuture.future override def supportsCancellation: Boolean = false + override def supportsResponseAsInputStream = false + override def timeoutToNone[T](t: Future[T], timeoutMillis: Int): Future[Option[T]] = t.map(Some(_)) } diff --git a/armeria-backend/cats-ce2/src/test/scala/sttp/client4/armeria/cats/ArmeriaCatsHttpTest.scala b/armeria-backend/cats-ce2/src/test/scala/sttp/client4/armeria/cats/ArmeriaCatsHttpTest.scala index 069fc87878..28ec4d7630 100644 --- a/armeria-backend/cats-ce2/src/test/scala/sttp/client4/armeria/cats/ArmeriaCatsHttpTest.scala +++ b/armeria-backend/cats-ce2/src/test/scala/sttp/client4/armeria/cats/ArmeriaCatsHttpTest.scala @@ -21,4 +21,5 @@ class ArmeriaCatsHttpTest extends HttpTest[IO] with CatsTestBase { override def supportsAutoDecompressionDisabling = false override def supportsDeflateWrapperChecking = false // armeria hangs override def supportsEmptyContentEncoding = false + override def supportsResponseAsInputStream = false } diff --git a/armeria-backend/cats/src/test/scala/sttp/client4/armeria/cats/ArmeriaCatsHttpTest.scala b/armeria-backend/cats/src/test/scala/sttp/client4/armeria/cats/ArmeriaCatsHttpTest.scala index 9bbe25a3e9..59a484496d 100644 --- a/armeria-backend/cats/src/test/scala/sttp/client4/armeria/cats/ArmeriaCatsHttpTest.scala +++ b/armeria-backend/cats/src/test/scala/sttp/client4/armeria/cats/ArmeriaCatsHttpTest.scala @@ -21,4 +21,5 @@ class ArmeriaCatsHttpTest extends HttpTest[IO] with CatsRetryTest with CatsTestB override def supportsAutoDecompressionDisabling = false override def supportsDeflateWrapperChecking = false // armeria hangs override def supportsEmptyContentEncoding = false + override def supportsResponseAsInputStream = false } diff --git a/armeria-backend/fs2-ce2/src/test/scala/sttp/client4/armeria/fs2/ArmeriaFs2HttpTest.scala b/armeria-backend/fs2-ce2/src/test/scala/sttp/client4/armeria/fs2/ArmeriaFs2HttpTest.scala index 10b195ffca..287d050c42 100644 --- a/armeria-backend/fs2-ce2/src/test/scala/sttp/client4/armeria/fs2/ArmeriaFs2HttpTest.scala +++ b/armeria-backend/fs2-ce2/src/test/scala/sttp/client4/armeria/fs2/ArmeriaFs2HttpTest.scala @@ -12,6 +12,6 @@ class ArmeriaFs2HttpTest extends HttpTest[IO] with CatsTestBase { override def supportsCancellation = false override def supportsAutoDecompressionDisabling = false override def supportsDeflateWrapperChecking = false // armeria hangs - override def supportsEmptyContentEncoding = false + override def supportsResponseAsInputStream = false } diff --git a/armeria-backend/fs2/src/test/scala/sttp/client4/armeria/fs2/ArmeriaFs2HttpTest.scala b/armeria-backend/fs2/src/test/scala/sttp/client4/armeria/fs2/ArmeriaFs2HttpTest.scala index 6b2456410b..6c191e9b0a 100644 --- a/armeria-backend/fs2/src/test/scala/sttp/client4/armeria/fs2/ArmeriaFs2HttpTest.scala +++ b/armeria-backend/fs2/src/test/scala/sttp/client4/armeria/fs2/ArmeriaFs2HttpTest.scala @@ -13,4 +13,5 @@ class ArmeriaFs2HttpTest extends HttpTest[IO] with CatsTestBase with TestIODispa override def supportsAutoDecompressionDisabling = false override def supportsDeflateWrapperChecking = false // armeria hangs override def supportsEmptyContentEncoding = false + override def supportsResponseAsInputStream = false } diff --git a/armeria-backend/monix/src/test/scala/sttp/client4/armeria/monix/ArmeriaMonixHttpTest.scala b/armeria-backend/monix/src/test/scala/sttp/client4/armeria/monix/ArmeriaMonixHttpTest.scala index 8f2e0862cb..5a813eec44 100644 --- a/armeria-backend/monix/src/test/scala/sttp/client4/armeria/monix/ArmeriaMonixHttpTest.scala +++ b/armeria-backend/monix/src/test/scala/sttp/client4/armeria/monix/ArmeriaMonixHttpTest.scala @@ -22,7 +22,6 @@ class ArmeriaMonixHttpTest extends HttpTest[Task] { override def supportsHostHeaderOverride = false override def supportsAutoDecompressionDisabling = false override def supportsDeflateWrapperChecking = false // armeria hangs - override def supportsEmptyContentEncoding = false - + override def supportsResponseAsInputStream = false } diff --git a/armeria-backend/scalaz/src/test/scala/sttp/client4/armeria/scalaz/ArmeriaScalazHttpTest.scala b/armeria-backend/scalaz/src/test/scala/sttp/client4/armeria/scalaz/ArmeriaScalazHttpTest.scala index ed1ef7ae82..47e99871c1 100644 --- a/armeria-backend/scalaz/src/test/scala/sttp/client4/armeria/scalaz/ArmeriaScalazHttpTest.scala +++ b/armeria-backend/scalaz/src/test/scala/sttp/client4/armeria/scalaz/ArmeriaScalazHttpTest.scala @@ -15,6 +15,7 @@ class ArmeriaScalazHttpTest extends HttpTest[Task] { override def supportsAutoDecompressionDisabling = false override def supportsDeflateWrapperChecking = false // armeria hangs override def supportsEmptyContentEncoding = false + override def supportsResponseAsInputStream = false override def timeoutToNone[T](t: Task[T], timeoutMillis: Int): Task[Option[T]] = t.map(Some(_)) } diff --git a/armeria-backend/src/main/scala/sttp/client4/armeria/AbstractArmeriaBackend.scala b/armeria-backend/src/main/scala/sttp/client4/armeria/AbstractArmeriaBackend.scala index bd44a2b350..5e500fdf17 100644 --- a/armeria-backend/src/main/scala/sttp/client4/armeria/AbstractArmeriaBackend.scala +++ b/armeria-backend/src/main/scala/sttp/client4/armeria/AbstractArmeriaBackend.scala @@ -38,7 +38,7 @@ import sttp.client4.armeria.AbstractArmeriaBackend.{noopCanceler, RightUnit} import sttp.client4.internal.toByteArray import sttp.model._ import sttp.monad.syntax._ -import sttp.monad.{Canceler, MonadAsyncError, MonadError} +import sttp.monad.{Canceler, MonadAsyncError} abstract class AbstractArmeriaBackend[F[_], S <: Streams[S]]( client: WebClient = WebClient.of(), diff --git a/armeria-backend/src/main/scala/sttp/client4/armeria/BodyFromStreamMessage.scala b/armeria-backend/src/main/scala/sttp/client4/armeria/BodyFromStreamMessage.scala index 03209d012b..52f5a3e782 100644 --- a/armeria-backend/src/main/scala/sttp/client4/armeria/BodyFromStreamMessage.scala +++ b/armeria-backend/src/main/scala/sttp/client4/armeria/BodyFromStreamMessage.scala @@ -4,7 +4,7 @@ import com.linecorp.armeria.common.{CommonPools, HttpData} import com.linecorp.armeria.common.stream.{StreamMessage, StreamMessages} import io.netty.util.concurrent.EventExecutor -import java.io.File +import java.io.{File, InputStream} import java.nio.file.Path import java.util.concurrent.atomic.AtomicReference import sttp.capabilities.Streams diff --git a/armeria-backend/src/test/scala/sttp/client4/armeria/future/ArmeriaFutureHttpTest.scala b/armeria-backend/src/test/scala/sttp/client4/armeria/future/ArmeriaFutureHttpTest.scala index 0b7771e9f2..0e3750faee 100644 --- a/armeria-backend/src/test/scala/sttp/client4/armeria/future/ArmeriaFutureHttpTest.scala +++ b/armeria-backend/src/test/scala/sttp/client4/armeria/future/ArmeriaFutureHttpTest.scala @@ -15,6 +15,7 @@ class ArmeriaFutureHttpTest extends HttpTest[Future] { override def supportsAutoDecompressionDisabling = false override def supportsDeflateWrapperChecking = false // armeria hangs override def supportsEmptyContentEncoding = false + override def supportsResponseAsInputStream = false override def timeoutToNone[T](t: Future[T], timeoutMillis: Int): Future[Option[T]] = t.map(Some(_)) } diff --git a/armeria-backend/zio/src/test/scala/sttp/client4/armeria/zio/ArmeriaZioHttpTest.scala b/armeria-backend/zio/src/test/scala/sttp/client4/armeria/zio/ArmeriaZioHttpTest.scala index 01eb45bee6..1b9c351dc0 100644 --- a/armeria-backend/zio/src/test/scala/sttp/client4/armeria/zio/ArmeriaZioHttpTest.scala +++ b/armeria-backend/zio/src/test/scala/sttp/client4/armeria/zio/ArmeriaZioHttpTest.scala @@ -15,6 +15,7 @@ class ArmeriaZioHttpTest extends HttpTest[Task] with ZioTestBase { override def supportsAutoDecompressionDisabling = false override def supportsDeflateWrapperChecking = false // armeria hangs override def supportsEmptyContentEncoding = false + override def supportsResponseAsInputStream = false "throw an exception instead of ZIO defect if the header value is invalid" in { diff --git a/armeria-backend/zio1/src/test/scala/sttp/client4/armeria/zio/ArmeriaZioHttpTest.scala b/armeria-backend/zio1/src/test/scala/sttp/client4/armeria/zio/ArmeriaZioHttpTest.scala index 66a0f9fef8..9287740fe3 100644 --- a/armeria-backend/zio1/src/test/scala/sttp/client4/armeria/zio/ArmeriaZioHttpTest.scala +++ b/armeria-backend/zio1/src/test/scala/sttp/client4/armeria/zio/ArmeriaZioHttpTest.scala @@ -15,6 +15,7 @@ class ArmeriaZioHttpTest extends HttpTest[Task] with ZioTestBase { override def supportsAutoDecompressionDisabling = false override def supportsDeflateWrapperChecking = false // armeria hangs override def supportsEmptyContentEncoding = false + override def supportsResponseAsInputStream = false "throw an exception instead of ZIO defect if the header value is invalid" in { diff --git a/async-http-client-backend/cats-ce2/src/test/scala/sttp/client4/asynchttpclient/cats/AsyncHttpClientCatsHttpTest.scala b/async-http-client-backend/cats-ce2/src/test/scala/sttp/client4/asynchttpclient/cats/AsyncHttpClientCatsHttpTest.scala index d27206c4e9..58f853b446 100644 --- a/async-http-client-backend/cats-ce2/src/test/scala/sttp/client4/asynchttpclient/cats/AsyncHttpClientCatsHttpTest.scala +++ b/async-http-client-backend/cats-ce2/src/test/scala/sttp/client4/asynchttpclient/cats/AsyncHttpClientCatsHttpTest.scala @@ -18,4 +18,5 @@ class AsyncHttpClientCatsHttpTest extends HttpTest[IO] with CatsTestBase { override def throwsExceptionOnUnsupportedEncoding = false override def supportsAutoDecompressionDisabling = false + override def supportsResponseAsInputStream = false } diff --git a/async-http-client-backend/cats/src/test/scala/sttp/client4/asynchttpclient/cats/AsyncHttpClientCatsHttpTest.scala b/async-http-client-backend/cats/src/test/scala/sttp/client4/asynchttpclient/cats/AsyncHttpClientCatsHttpTest.scala index 7a684486db..9059006dd0 100644 --- a/async-http-client-backend/cats/src/test/scala/sttp/client4/asynchttpclient/cats/AsyncHttpClientCatsHttpTest.scala +++ b/async-http-client-backend/cats/src/test/scala/sttp/client4/asynchttpclient/cats/AsyncHttpClientCatsHttpTest.scala @@ -18,4 +18,5 @@ class AsyncHttpClientCatsHttpTest extends HttpTest[IO] with CatsRetryTest with C override def throwsExceptionOnUnsupportedEncoding = false override def supportsAutoDecompressionDisabling = false + override def supportsResponseAsInputStream = false } diff --git a/async-http-client-backend/fs2-ce2/src/test/scala/sttp/client4/asynchttpclient/fs2/AsyncHttpClientFs2HttpTest.scala b/async-http-client-backend/fs2-ce2/src/test/scala/sttp/client4/asynchttpclient/fs2/AsyncHttpClientFs2HttpTest.scala index e3a468db8d..6b33766c72 100644 --- a/async-http-client-backend/fs2-ce2/src/test/scala/sttp/client4/asynchttpclient/fs2/AsyncHttpClientFs2HttpTest.scala +++ b/async-http-client-backend/fs2-ce2/src/test/scala/sttp/client4/asynchttpclient/fs2/AsyncHttpClientFs2HttpTest.scala @@ -13,4 +13,5 @@ class AsyncHttpClientFs2HttpTest extends HttpTest[IO] with CatsTestBase { override def throwsExceptionOnUnsupportedEncoding = false override def supportsAutoDecompressionDisabling = false + override def supportsResponseAsInputStream = false } diff --git a/async-http-client-backend/fs2/src/test/scala/sttp/client4/asynchttpclient/fs2/AsyncHttpClientFs2HttpTest.scala b/async-http-client-backend/fs2/src/test/scala/sttp/client4/asynchttpclient/fs2/AsyncHttpClientFs2HttpTest.scala index 457d0664aa..b340714d17 100644 --- a/async-http-client-backend/fs2/src/test/scala/sttp/client4/asynchttpclient/fs2/AsyncHttpClientFs2HttpTest.scala +++ b/async-http-client-backend/fs2/src/test/scala/sttp/client4/asynchttpclient/fs2/AsyncHttpClientFs2HttpTest.scala @@ -13,4 +13,5 @@ class AsyncHttpClientFs2HttpTest extends HttpTest[IO] with TestIODispatcher with // for some unknown reason this single test fails using the fs2 implementation override def supportsConnectionRefusedTest = false override def supportsAutoDecompressionDisabling = false + override def supportsResponseAsInputStream = false } diff --git a/async-http-client-backend/future/src/test/scala/sttp/client4/asynchttpclient/future/AsyncHttpClientFutureHttpTest.scala b/async-http-client-backend/future/src/test/scala/sttp/client4/asynchttpclient/future/AsyncHttpClientFutureHttpTest.scala index 0d73f6cb38..66a233dc89 100644 --- a/async-http-client-backend/future/src/test/scala/sttp/client4/asynchttpclient/future/AsyncHttpClientFutureHttpTest.scala +++ b/async-http-client-backend/future/src/test/scala/sttp/client4/asynchttpclient/future/AsyncHttpClientFutureHttpTest.scala @@ -15,4 +15,5 @@ class AsyncHttpClientFutureHttpTest extends HttpTest[Future] { override def supportsCancellation: Boolean = false override def timeoutToNone[T](t: Future[T], timeoutMillis: Int): Future[Option[T]] = t.map(Some(_)) override def supportsAutoDecompressionDisabling = false + override def supportsResponseAsInputStream = false } diff --git a/async-http-client-backend/monix/src/test/scala/sttp/client4/asynchttpclient/monix/AsyncHttpClientMonixHttpTest.scala b/async-http-client-backend/monix/src/test/scala/sttp/client4/asynchttpclient/monix/AsyncHttpClientMonixHttpTest.scala index 5fe77cc5e5..3b5bc826db 100644 --- a/async-http-client-backend/monix/src/test/scala/sttp/client4/asynchttpclient/monix/AsyncHttpClientMonixHttpTest.scala +++ b/async-http-client-backend/monix/src/test/scala/sttp/client4/asynchttpclient/monix/AsyncHttpClientMonixHttpTest.scala @@ -23,4 +23,5 @@ class AsyncHttpClientMonixHttpTest extends HttpTest[Task] { override def throwsExceptionOnUnsupportedEncoding = false override def supportsAutoDecompressionDisabling = false + override def supportsResponseAsInputStream = false } diff --git a/async-http-client-backend/scalaz/src/test/scala/sttp/client4/asynchttpclient/scalaz/AsyncHttpClientScalazHttpTest.scala b/async-http-client-backend/scalaz/src/test/scala/sttp/client4/asynchttpclient/scalaz/AsyncHttpClientScalazHttpTest.scala index 593ed9dd7f..b85a22c713 100644 --- a/async-http-client-backend/scalaz/src/test/scala/sttp/client4/asynchttpclient/scalaz/AsyncHttpClientScalazHttpTest.scala +++ b/async-http-client-backend/scalaz/src/test/scala/sttp/client4/asynchttpclient/scalaz/AsyncHttpClientScalazHttpTest.scala @@ -15,4 +15,5 @@ class AsyncHttpClientScalazHttpTest extends HttpTest[Task] { override def supportsCancellation: Boolean = false override def timeoutToNone[T](t: Task[T], timeoutMillis: Int): Task[Option[T]] = t.map(Some(_)) override def supportsAutoDecompressionDisabling = false + override def supportsResponseAsInputStream = false } diff --git a/async-http-client-backend/zio/src/test/scala/sttp/client4/asynchttpclient/zio/AsyncHttpClientZioHttpTest.scala b/async-http-client-backend/zio/src/test/scala/sttp/client4/asynchttpclient/zio/AsyncHttpClientZioHttpTest.scala index bb6abb9ec6..f91aefdbe1 100644 --- a/async-http-client-backend/zio/src/test/scala/sttp/client4/asynchttpclient/zio/AsyncHttpClientZioHttpTest.scala +++ b/async-http-client-backend/zio/src/test/scala/sttp/client4/asynchttpclient/zio/AsyncHttpClientZioHttpTest.scala @@ -13,6 +13,7 @@ class AsyncHttpClientZioHttpTest extends HttpTest[Task] with ZioTestBase { override def throwsExceptionOnUnsupportedEncoding = false override def supportsAutoDecompressionDisabling = false + override def supportsResponseAsInputStream = false "throw an exception instead of ZIO defect if the header value is invalid" in { diff --git a/async-http-client-backend/zio1/src/test/scala/sttp/client4/asynchttpclient/zio/AsyncHttpClientZioHttpTest.scala b/async-http-client-backend/zio1/src/test/scala/sttp/client4/asynchttpclient/zio/AsyncHttpClientZioHttpTest.scala index 9cc03dc193..afc863f4fa 100644 --- a/async-http-client-backend/zio1/src/test/scala/sttp/client4/asynchttpclient/zio/AsyncHttpClientZioHttpTest.scala +++ b/async-http-client-backend/zio1/src/test/scala/sttp/client4/asynchttpclient/zio/AsyncHttpClientZioHttpTest.scala @@ -13,6 +13,7 @@ class AsyncHttpClientZioHttpTest extends HttpTest[Task] with ZioTestBase { override def throwsExceptionOnUnsupportedEncoding = false override def supportsAutoDecompressionDisabling = false + override def supportsResponseAsInputStream = false "throw an exception instead of ZIO defect if the header value is invalid" in { diff --git a/core/src/main/scala/sttp/client4/ResponseAs.scala b/core/src/main/scala/sttp/client4/ResponseAs.scala index 484a25855b..fa3a6bbfbc 100644 --- a/core/src/main/scala/sttp/client4/ResponseAs.scala +++ b/core/src/main/scala/sttp/client4/ResponseAs.scala @@ -6,6 +6,7 @@ import sttp.model.ResponseMetadata import sttp.model.internal.Rfc3986 import sttp.ws.{WebSocket, WebSocketFrame} +import java.io.InputStream import scala.collection.immutable.Seq import scala.util.{Failure, Success, Try} @@ -299,6 +300,13 @@ object ResponseAsStreamUnsafe { def apply[S](s: Streams[S]): GenericResponseAs[s.BinaryStream, S] = new ResponseAsStreamUnsafe(s) } +case class ResponseAsInputStream[T](f: InputStream => T) extends GenericResponseAs[T, Any] { + override def show: String = s"as input stream" +} +case object ResponseAsInputStreamUnsafe extends GenericResponseAs[InputStream, Any] { + override def show: String = s"as input stream unsafe" +} + case class ResponseAsFile(output: SttpFile) extends GenericResponseAs[SttpFile, Any] { override def show: String = s"as file: ${output.name}" } diff --git a/core/src/main/scala/sttp/client4/internal/BodyFromResponseAs.scala b/core/src/main/scala/sttp/client4/internal/BodyFromResponseAs.scala index 17f3af1d06..8b570a8cc6 100644 --- a/core/src/main/scala/sttp/client4/internal/BodyFromResponseAs.scala +++ b/core/src/main/scala/sttp/client4/internal/BodyFromResponseAs.scala @@ -6,6 +6,8 @@ import sttp.model.ResponseMetadata import sttp.monad.MonadError import sttp.monad.syntax._ +import java.io.InputStream + abstract class BodyFromResponseAs[F[_], RegularResponse, WSResponse, Stream](implicit m: MonadError[F]) { def apply[T]( responseAs: ResponseAsDelegate[T, _], @@ -58,6 +60,13 @@ abstract class BodyFromResponseAs[F[_], RegularResponse, WSResponse, Stream](imp (stream.asInstanceOf[T], nonReplayableBody) } + case (ResponseAsInputStream(f), Left(regular)) => + regularAsInputStream(regular) + .flatMap(w => m.eval(f(w)).ensure(m.eval(w.close()))) + .map(t => (t, nonReplayableBody)) + case (ResponseAsInputStreamUnsafe, Left(regular)) => + regularAsInputStream(regular).map(w => (w, nonReplayableBody)) + case (ResponseAsFile(file), Left(regular)) => regularAsFile(regular, file).map(f => (f, replayableBody(f))) @@ -82,6 +91,8 @@ abstract class BodyFromResponseAs[F[_], RegularResponse, WSResponse, Stream](imp protected def regularAsByteArray(response: RegularResponse): F[Array[Byte]] protected def regularAsFile(response: RegularResponse, file: SttpFile): F[SttpFile] protected def regularAsStream(response: RegularResponse): F[(Stream, () => F[Unit])] + protected def regularAsInputStream(response: RegularResponse): F[InputStream] = + throw new UnsupportedOperationException("Responses as a java.io.InputStream are not supported") protected def handleWS[T](responseAs: GenericWebSocketResponseAs[T, _], meta: ResponseMetadata, ws: WSResponse): F[T] protected def cleanupWhenNotAWebSocket(response: RegularResponse, e: NotAWebSocketException): F[Unit] protected def cleanupWhenGotWebSocket(response: WSResponse, e: GotAWebSocketException): F[Unit] diff --git a/core/src/main/scala/sttp/client4/monad/MapEffect.scala b/core/src/main/scala/sttp/client4/monad/MapEffect.scala index 579fd0bd36..d2d73179da 100644 --- a/core/src/main/scala/sttp/client4/monad/MapEffect.scala +++ b/core/src/main/scala/sttp/client4/monad/MapEffect.scala @@ -59,6 +59,8 @@ object MapEffect { ResponseAsStream(s)((s, m) => fk(f.asInstanceOf[(Any, ResponseMetadata) => F[Any]](s, m))) case rasu: ResponseAsStreamUnsafe[_, _] => rasu case ResponseAsFile(output) => ResponseAsFile(output) + case ResponseAsInputStream(f) => ResponseAsInputStream(f) + case ResponseAsInputStreamUnsafe => ResponseAsInputStreamUnsafe case ResponseAsWebSocket(f) => ResponseAsWebSocket((wg: WebSocket[G], m: ResponseMetadata) => fk(f.asInstanceOf[(WebSocket[F], ResponseMetadata) => F[Any]](apply[G, F](wg, gk, fm), m)) diff --git a/core/src/main/scala/sttp/client4/testing/AbstractBackendStub.scala b/core/src/main/scala/sttp/client4/testing/AbstractBackendStub.scala index 2ddbedcd17..60402ebc6e 100644 --- a/core/src/main/scala/sttp/client4/testing/AbstractBackendStub.scala +++ b/core/src/main/scala/sttp/client4/testing/AbstractBackendStub.scala @@ -1,6 +1,6 @@ package sttp.client4.testing -import java.io.InputStream +import java.io.{ByteArrayInputStream, InputStream} import sttp.capabilities.Effect import sttp.client4.internal._ import sttp.client4.testing.AbstractBackendStub._ @@ -121,7 +121,15 @@ object AbstractBackendStub { ra: GenericResponseAs[T, _], b: U, meta: ResponseMetadata - )(implicit monad: MonadError[F]): Option[F[T]] = + )(implicit monad: MonadError[F]): Option[F[T]] = { + def bAsInputStream = b match { + case s: String => Some(new ByteArrayInputStream(s.getBytes(Utf8))) + case a: Array[Byte] => Some(new ByteArrayInputStream(a)) + case is: InputStream => Some(is) + case () => Some(new ByteArrayInputStream(new Array[Byte](0))) + case _ => None + } + ra match { case IgnoreResponse => Some(().unit.asInstanceOf[F[T]]) case ResponseAsByteArray => @@ -142,6 +150,8 @@ object AbstractBackendStub { case RawStream(s) => Some(s.unit.asInstanceOf[F[T]]) case _ => None } + case ResponseAsInputStream(f) => bAsInputStream.map(f).map(_.unit.asInstanceOf[F[T]]) + case ResponseAsInputStreamUnsafe => bAsInputStream.map(_.unit.asInstanceOf[F[T]]) case ResponseAsFile(_) => b match { case f: SttpFile => Some(f.unit.asInstanceOf[F[T]]) @@ -172,4 +182,5 @@ object AbstractBackendStub { } } } + } } diff --git a/core/src/main/scalajvm/sttp/client4/SttpExtensions.scala b/core/src/main/scalajvm/sttp/client4/SttpExtensions.scala index 09062e4bb9..cc54286080 100644 --- a/core/src/main/scalajvm/sttp/client4/SttpExtensions.scala +++ b/core/src/main/scalajvm/sttp/client4/SttpExtensions.scala @@ -1,12 +1,45 @@ package sttp.client4 -import java.io.File +import java.io.{File, InputStream} import java.nio.file.Path - import sttp.client4.internal.SttpFile import sttp.model.{Part, StatusCode} trait SttpExtensions { + + /** Specify that the body should be passed as an input stream to the given function `f`. After the function completes, + * the input stream is always closed. + * + * If the response code is not successful, the body is returned as a `String`. + * + * '''Warning:''' this type of responses is supported only by some backends on the JVM. + */ + def asInputStream[T](f: InputStream => T): ResponseAs[Either[String, T]] = + asEither(asStringAlways, asInputStreamAlways(f)) + + /** Specify that the body should be passed as an input stream to the given function `f`. After the function completes, + * the input stream is always closed. + * + * '''Warning:''' this type of responses is supported only by some backends on the JVM. + */ + def asInputStreamAlways[T](f: InputStream => T): ResponseAs[T] = new ResponseAs(ResponseAsInputStream(f)) + + /** Specify that the body should be returned as an input stream. It is the responsibility of the user to properly + * close the stream. + * + * If the response code is not successful, the body is returned as a `String`. + * + * '''Warning:''' this type of responses is supported only by some backends on the JVM. + */ + def asInputStreamUnsafe: ResponseAs[Either[String, InputStream]] = asEither(asStringAlways, asInputStreamAlwaysUnsafe) + + /** Specify that the body should be returned as an input stream. It is the responsibility of the user to properly + * close the stream. + * + * '''Warning:''' this type of responses is supported only by some backends on the JVM. + */ + def asInputStreamAlwaysUnsafe: ResponseAs[InputStream] = new ResponseAs(ResponseAsInputStreamUnsafe) + def asFile(file: File): ResponseAs[Either[String, File]] = asEither(asStringAlways, asFileAlways(file)) def asFileAlways(file: File): ResponseAs[File] = diff --git a/core/src/main/scalajvm/sttp/client4/httpurlconnection/HttpURLConnectionBackend.scala b/core/src/main/scalajvm/sttp/client4/httpurlconnection/HttpURLConnectionBackend.scala index a634f983bb..2b19179eb5 100644 --- a/core/src/main/scalajvm/sttp/client4/httpurlconnection/HttpURLConnectionBackend.scala +++ b/core/src/main/scalajvm/sttp/client4/httpurlconnection/HttpURLConnectionBackend.scala @@ -276,6 +276,7 @@ class HttpURLConnectionBackend private ( } override protected def regularAsStream(response: InputStream): (Nothing, () => Identity[Unit]) = throw new IllegalStateException() + override protected def regularAsInputStream(response: InputStream): Identity[InputStream] = response override protected def handleWS[T]( responseAs: GenericWebSocketResponseAs[T, _], meta: ResponseMetadata, diff --git a/core/src/main/scalajvm/sttp/client4/internal/httpclient/InputStreamBodyFromHttpClient.scala b/core/src/main/scalajvm/sttp/client4/internal/httpclient/InputStreamBodyFromHttpClient.scala index 711ec3f52b..2fa5e9634b 100644 --- a/core/src/main/scalajvm/sttp/client4/internal/httpclient/InputStreamBodyFromHttpClient.scala +++ b/core/src/main/scalajvm/sttp/client4/internal/httpclient/InputStreamBodyFromHttpClient.scala @@ -41,6 +41,8 @@ private[client4] trait InputStreamBodyFromHttpClient[F[_], S] extends BodyFromHt override protected def regularAsStream(response: InputStream): F[(streams.BinaryStream, () => F[Unit])] = inputStreamToStream(response) + override protected def regularAsInputStream(response: InputStream): F[InputStream] = monad.unit(response) + override protected def handleWS[T]( responseAs: GenericWebSocketResponseAs[T, _], meta: ResponseMetadata, diff --git a/core/src/test/scalajvm/sttp/client4/testing/HttpTestExtensions.scala b/core/src/test/scalajvm/sttp/client4/testing/HttpTestExtensions.scala index efee989884..09a0f223a9 100644 --- a/core/src/test/scalajvm/sttp/client4/testing/HttpTestExtensions.scala +++ b/core/src/test/scalajvm/sttp/client4/testing/HttpTestExtensions.scala @@ -14,6 +14,33 @@ import sttp.client4.wrappers.{DigestAuthenticationBackend, FollowRedirectsBacken import sttp.model.headers.CookieWithMeta trait HttpTestExtensions[F[_]] extends AsyncFreeSpecLike { self: HttpTest[F] => + protected def supportsResponseAsInputStream = true + + "parse response" - { + if (supportsResponseAsInputStream) { + "as input stream" in { + postEcho.body(testBody).response(asInputStreamAlways(_.readAllBytes())).send(backend).toFuture().map { + response => + val allBytes = response.body + val fc = new String(allBytes, "UTF-8") + fc should be(expectedPostEchoResponse) + } + } + + "as input stream unsafe" in { + postEcho.body(testBody).response(asInputStreamAlwaysUnsafe).send(backend).toFuture().map { response => + try { + val allBytes = response.body.readAllBytes() + val fc = new String(allBytes, "UTF-8") + fc should be(expectedPostEchoResponse) + } finally { + response.body.close() + } + } + } + } + } + "cookies" - { "read response cookies" in { basicRequest diff --git a/docs/backends/summary.md b/docs/backends/summary.md index 66faa657d6..19f6a90c6f 100644 --- a/docs/backends/summary.md +++ b/docs/backends/summary.md @@ -29,15 +29,15 @@ Below is a summary of all the JVM backends; see the sections on individual backe ==================================== ================================ ================================================= ========================== =================== Class Effect type Supported stream type Supports websockets Fully non-blocking ==================================== ================================ ================================================= ========================== =================== -``DefaultSyncBackend`` None (``Identity``) n/a yes (regular) no -``HttpClientSyncBackend`` None (``Identity``) n/a yes (regular) no -``DefaultFutureBackend`` ``scala.concurrent.Future`` n/a yes (regular) no -``HttpClientFutureBackend`` ``scala.concurrent.Future`` n/a yes (regular) no +``DefaultSyncBackend`` None (``Identity``) ``java.io.InputStream`` (blocking) yes (regular) no +``HttpClientSyncBackend`` None (``Identity``) ``java.io.InputStream`` (blocking) yes (regular) no +``DefaultFutureBackend`` ``scala.concurrent.Future`` ``java.io.InputStream`` (blocking) yes (regular) no +``HttpClientFutureBackend`` ``scala.concurrent.Future`` ``java.io.InputStream`` (blocking) yes (regular) no ``HttpClientMonixBackend`` ``monix.eval.Task`` ``monix.reactive.Observable[ByteBuffer]`` yes (regular & streaming) yes ``HttpClientFs2Backend`` ``F[_]: cats.effect.Concurrent`` ``fs2.Stream[F, Byte]`` yes (regular & streaming) yes ``HttpClientZioBackend`` ``zio.Task`` ``zio.stream.Stream[Throwable, Byte]`` yes (regular & streaming) yes -``HttpURLConnectionBackend`` None (``Identity``) n/a no no -``TryHttpURLConnectionBackend`` ``scala.util.Try`` n/a no no +``HttpURLConnectionBackend`` None (``Identity``) ``java.io.InputStream`` (blocking) no no +``TryHttpURLConnectionBackend`` ``scala.util.Try`` ``java.io.InputStream`` (blocking) no no ``AkkaHttpBackend`` ``scala.concurrent.Future`` ``akka.stream.scaladsl.Source[ByteString, Any]`` yes (regular & streaming) yes ``ArmeriaFutureBackend`` ``scala.concurrent.Future`` n/a no yes ``ArmeriaScalazBackend`` ``scalaz.concurrent.Task`` n/a no yes @@ -45,8 +45,8 @@ Class Effect type Supported ``ArmeriaMonixBackend`` ``monix.eval.Task`` ``monix.reactive.Observable[HttpData]`` no yes ``ArmeriaCatsBackend`` ``F[_]: cats.effect.Concurrent`` n/a no yes ``ArmeriaFs2Backend`` ``F[_]: cats.effect.Concurrent`` ``fs2.Stream[F, Byte]`` no yes -``OkHttpSyncBackend`` None (``Identity``) n/a yes (regular) no -``OkHttpFutureBackend`` ``scala.concurrent.Future`` n/a yes (regular) no +``OkHttpSyncBackend`` None (``Identity``) ``java.io.InputStream`` (blocking) yes (regular) no +``OkHttpFutureBackend`` ``scala.concurrent.Future`` ``java.io.InputStream`` (blocking) yes (regular) no ``OkHttpMonixBackend`` ``monix.eval.Task`` ``monix.reactive.Observable[ByteBuffer]`` yes (regular & streaming) no ``Http4sBackend`` ``F[_]: cats.effect.Effect`` ``fs2.Stream[F, Byte]`` no no ``FinagleBackend`` ``com.twitter.util.Future`` n/a no no @@ -121,4 +121,4 @@ Depending on the capabilities that a backend supports, the exact backend type di Each backend type extends `GenericBackend` has two type parameters: * `F[_]`, the type constructor used to represent side effects. That is, when you invoke `send(backend)` on a request description, do you get a `Response[_]` directly, or is it wrapped in a `Future` or a `Task`? -* `P`, the capabilities supported by the backend, in addition to `Effect[F]`. If `Any`, no additional capabilities are provided. Might include `Streams` (the ability to send and receive streaming bodies) and `WebSockets` (the ability to handle websocket requests). \ No newline at end of file +* `P`, the capabilities supported by the backend, in addition to `Effect[F]`. If `Any`, no additional capabilities are provided. Might include `Streams` (the ability to send and receive streaming bodies) and `WebSockets` (the ability to handle websocket requests). diff --git a/docs/responses/body.md b/docs/responses/body.md index 704e89d105..9a9c35192a 100644 --- a/docs/responses/body.md +++ b/docs/responses/body.md @@ -182,7 +182,15 @@ basicRequest ## Streaming -If the backend used supports streaming (see [backends summary](../backends/summary.md)), it's possible to receive responses as a stream. This can be described using the following methods: +### Blocking streaming (InputStream) + +Some backends on the JVM support receiving the response body as a `java.io.InputStream`. This is possible either using the safe `asInputStream(f)` specification, where the entire stream has to be consumed by the provided `f` function, and is then closed by sttp client. Alternatively, there's `asInputStreamUnsafe`, which returns the stream directly to the user, who is then responsible for closing it. + +`InputStream`s have two major limitations. First, they operate on the relatively low `byte`-level. The consumer is responsible for any decoding, chunking etc. Moreover, all `InputStream` operations are blocking, hence using them in a non-virtual-threads environment may severely limit performance. If you're using a functional effect system, see below on how to use non-blocking streams instead. + +### Non-blocking streaming + +If the backend used supports non-blocking streaming (see "Supported stream type" in the [backends summary](../backends/summary.md)), it's possible to receive responses as a stream. This can be described using the following methods: ```scala mdoc:compile-only import sttp.capabilities.{Effect, Streams} diff --git a/effects/fs2-ce2/src/test/scalajvm/sttp/client4/httpclient/fs2/HttpClientFs2HttpTest.scala b/effects/fs2-ce2/src/test/scalajvm/sttp/client4/httpclient/fs2/HttpClientFs2HttpTest.scala index 7db8b5b82f..a438f1611d 100644 --- a/effects/fs2-ce2/src/test/scalajvm/sttp/client4/httpclient/fs2/HttpClientFs2HttpTest.scala +++ b/effects/fs2-ce2/src/test/scalajvm/sttp/client4/httpclient/fs2/HttpClientFs2HttpTest.scala @@ -5,4 +5,5 @@ import sttp.client4.testing.HttpTest class HttpClientFs2HttpTest extends HttpTest[IO] with HttpClientFs2TestBase { override def supportsHostHeaderOverride = false + override def supportsResponseAsInputStream = false } diff --git a/effects/fs2/src/test/scalajvm/sttp/client4/httpclient/fs2/HttpClientFs2HttpTest.scala b/effects/fs2/src/test/scalajvm/sttp/client4/httpclient/fs2/HttpClientFs2HttpTest.scala index 7db8b5b82f..a438f1611d 100644 --- a/effects/fs2/src/test/scalajvm/sttp/client4/httpclient/fs2/HttpClientFs2HttpTest.scala +++ b/effects/fs2/src/test/scalajvm/sttp/client4/httpclient/fs2/HttpClientFs2HttpTest.scala @@ -5,4 +5,5 @@ import sttp.client4.testing.HttpTest class HttpClientFs2HttpTest extends HttpTest[IO] with HttpClientFs2TestBase { override def supportsHostHeaderOverride = false + override def supportsResponseAsInputStream = false } diff --git a/effects/monix/src/test/scalajvm/sttp/client4/impl/monix/HttpClientMonixHttpTest.scala b/effects/monix/src/test/scalajvm/sttp/client4/impl/monix/HttpClientMonixHttpTest.scala index f75b7da76d..6ede9df44d 100644 --- a/effects/monix/src/test/scalajvm/sttp/client4/impl/monix/HttpClientMonixHttpTest.scala +++ b/effects/monix/src/test/scalajvm/sttp/client4/impl/monix/HttpClientMonixHttpTest.scala @@ -15,6 +15,7 @@ class HttpClientMonixHttpTest extends HttpTest[Task] { override def supportsHostHeaderOverride = false override def supportsDeflateWrapperChecking = false + override def supportsResponseAsInputStream = false override def timeoutToNone[T](t: Task[T], timeoutMillis: Int): Task[Option[T]] = t.map(Some(_)) diff --git a/effects/zio/src/test/scalajvm/sttp/client4/httpclient/zio/HttpClientZioHttpTest.scala b/effects/zio/src/test/scalajvm/sttp/client4/httpclient/zio/HttpClientZioHttpTest.scala index 589bc74e1e..386cf92b1b 100644 --- a/effects/zio/src/test/scalajvm/sttp/client4/httpclient/zio/HttpClientZioHttpTest.scala +++ b/effects/zio/src/test/scalajvm/sttp/client4/httpclient/zio/HttpClientZioHttpTest.scala @@ -11,6 +11,7 @@ class HttpClientZioHttpTest extends HttpTest[Task] with ZioTestBase { override implicit val convertToFuture: ConvertToFuture[Task] = convertZioTaskToFuture override def supportsHostHeaderOverride = false + override def supportsResponseAsInputStream = false "throw an exception instead of ZIO defect if the header value is invalid" in { diff --git a/effects/zio1/src/test/scalajvm/sttp/client4/httpclient/zio/HttpClientZioHttpTest.scala b/effects/zio1/src/test/scalajvm/sttp/client4/httpclient/zio/HttpClientZioHttpTest.scala index 977f6b87da..4b93afccc7 100644 --- a/effects/zio1/src/test/scalajvm/sttp/client4/httpclient/zio/HttpClientZioHttpTest.scala +++ b/effects/zio1/src/test/scalajvm/sttp/client4/httpclient/zio/HttpClientZioHttpTest.scala @@ -11,6 +11,7 @@ class HttpClientZioHttpTest extends HttpTest[Task] with ZioTestBase { override implicit val convertToFuture: ConvertToFuture[Task] = convertZioTaskToFuture override def supportsHostHeaderOverride = false + override def supportsResponseAsInputStream = false "compile" - { "SttpClient usage" in { diff --git a/finagle-backend/src/test/scala/sttp/client4/finagle/FinagleBackendTest.scala b/finagle-backend/src/test/scala/sttp/client4/finagle/FinagleBackendTest.scala index dde7f9895b..bf8b530b63 100644 --- a/finagle-backend/src/test/scala/sttp/client4/finagle/FinagleBackendTest.scala +++ b/finagle-backend/src/test/scala/sttp/client4/finagle/FinagleBackendTest.scala @@ -22,6 +22,7 @@ class FinagleBackendTest extends HttpTest[TFuture] { override def throwsExceptionOnUnsupportedEncoding = false override def supportsCustomMultipartContentType = false override def supportsAutoDecompressionDisabling = false + override def supportsResponseAsInputStream = false override def supportsCancellation: Boolean = false override def timeoutToNone[T](t: TFuture[T], timeoutMillis: Int): TFuture[Option[T]] = t.map(Some(_)) diff --git a/http4s-backend/src/test/scala/sttp/client4/http4s/Http4sHttpTest.scala b/http4s-backend/src/test/scala/sttp/client4/http4s/Http4sHttpTest.scala index 4bd63a4d93..f055af3c12 100644 --- a/http4s-backend/src/test/scala/sttp/client4/http4s/Http4sHttpTest.scala +++ b/http4s-backend/src/test/scala/sttp/client4/http4s/Http4sHttpTest.scala @@ -16,4 +16,5 @@ class Http4sHttpTest extends HttpTest[IO] with CatsRetryTest with CatsTestBase { override protected def supportsRequestTimeout = false override protected def supportsCustomMultipartContentType = false + override protected def supportsResponseAsInputStream = false } diff --git a/http4s-ce2-backend/src/test/scala/sttp/client4/http4s/Http4sHttpTest.scala b/http4s-ce2-backend/src/test/scala/sttp/client4/http4s/Http4sHttpTest.scala index 0c4acd037f..92c06ffffc 100644 --- a/http4s-ce2-backend/src/test/scala/sttp/client4/http4s/Http4sHttpTest.scala +++ b/http4s-ce2-backend/src/test/scala/sttp/client4/http4s/Http4sHttpTest.scala @@ -16,4 +16,5 @@ class Http4sHttpTest extends HttpTest[IO] with CatsTestBase { override protected def supportsRequestTimeout = false override protected def supportsCustomMultipartContentType = false + override protected def supportsResponseAsInputStream = false } diff --git a/okhttp-backend/src/main/scala/sttp/client4/okhttp/BodyFromOkHttp.scala b/okhttp-backend/src/main/scala/sttp/client4/okhttp/BodyFromOkHttp.scala index 6ce96ca08b..7003add819 100644 --- a/okhttp-backend/src/main/scala/sttp/client4/okhttp/BodyFromOkHttp.scala +++ b/okhttp-backend/src/main/scala/sttp/client4/okhttp/BodyFromOkHttp.scala @@ -67,6 +67,8 @@ private[okhttp] trait BodyFromOkHttp[F[_], S] { override protected def regularAsStream(response: InputStream): F[(streams.BinaryStream, () => F[Unit])] = monad.eval((responseBodyToStream(response), () => monad.eval(response.close()))) + override protected def regularAsInputStream(response: InputStream): F[InputStream] = monad.unit(response) + override protected def handleWS[T]( responseAs: GenericWebSocketResponseAs[T, _], meta: ResponseMetadata, diff --git a/pekko-http-backend/src/test/scala/sttp/client4/pekkohttp/PekkoHttpClientHttpTest.scala b/pekko-http-backend/src/test/scala/sttp/client4/pekkohttp/PekkoHttpClientHttpTest.scala index f27d62c2d7..54ec9a75e0 100644 --- a/pekko-http-backend/src/test/scala/sttp/client4/pekkohttp/PekkoHttpClientHttpTest.scala +++ b/pekko-http-backend/src/test/scala/sttp/client4/pekkohttp/PekkoHttpClientHttpTest.scala @@ -10,5 +10,6 @@ class PekkoHttpClientHttpTest extends HttpTest[Future] { override implicit val convertToFuture: ConvertToFuture[Future] = ConvertToFuture.future override def supportsCancellation: Boolean = false + override def supportsResponseAsInputStream: Boolean = false override def timeoutToNone[T](t: Future[T], timeoutMillis: Int): Future[Option[T]] = t.map(Some(_)) }