Skip to content

Commit 4c927b5

Browse files
authored
Add FetchFs2Backend (#2726)
Supports websocket and sse streaming. Heavily based of the FetchZioBackend. Before submitting pull request: - [X] Check if the project compiles by running `sbt compile` - [X] Verify docs compilation by running `sbt compileDocs` - [X] Check if tests pass by running `sbt test` - [X] Format code by running `sbt scalafmt`
1 parent 558f655 commit 4c927b5

File tree

7 files changed

+223
-2
lines changed

7 files changed

+223
-2
lines changed

build.sbt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,10 @@ lazy val fs2 = (projectMatrix in file("effects/fs2"))
459459
)
460460
)
461461
)
462-
.jsPlatform(scalaVersions = scala2And3, settings = commonJsSettings)
462+
.jsPlatform(
463+
scalaVersions = scala2And3,
464+
settings = commonJsSettings ++ commonJsBackendSettings ++ browserChromeTestSettings ++ testServerSettings
465+
)
463466

464467
lazy val monix = (projectMatrix in file("effects/monix"))
465468
.settings(

docs/backends/fs2.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,11 @@ This backend is built on top of [Armeria](https://armeria.dev/docs/client-http).
126126
Armeria's [ClientFactory](https://armeria.dev/docs/client-factory) manages connections and protocol-specific properties.
127127
Please visit [the official documentation](https://armeria.dev/docs/client-factory) to learn how to configure it.
128128

129+
## Using JavaScript
130+
131+
The Fs2 backend is also available for the JS platform, see [the `FetchBackend` documentation](javascript/fetch.md).
132+
The `FetchFs2Backend` companion object contains methods to create the backend directly.
133+
129134
## Streaming
130135

131136
The fs2 backends support streaming for any instance of the `cats.effect.Effect` typeclass, such as `cats.effect.IO`. If `IO` is used then the type of supported streams is `fs2.Stream[IO, Byte]`. The streams capability is represented as `sttp.client4.fs2.Fs2Streams`.

docs/backends/javascript/fetch.md

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,21 @@ And create the backend instance:
7070
val backend = FetchCatsBackend[IO]()
7171
```
7272

73+
## fs2-based
74+
75+
Any effect implementing the cats-effect `Async` typeclass can be used. To use, add the following dependency to
76+
your project:
77+
78+
```
79+
"com.softwaremill.sttp.client4" %%% "fs2" % "@VERSION@"
80+
```
81+
82+
And create the backend instance:
83+
84+
```scala
85+
val backend = FetchFs2Backend[IO]()
86+
```
87+
7388
## Node.js
7489

7590
### CommonJS module
@@ -161,4 +176,4 @@ The backend supports both regular and streaming [websockets](../../other/websock
161176

162177
## Server-sent events
163178

164-
Received data streams can be parsed to a stream of server-sent events (SSE), when using the [Monix](../monix.md) and [ZIO](../zio.md) variants - the respective documentation pages contain appropriate SSE examples.
179+
Received data streams can be parsed to a stream of server-sent events (SSE), when using the [Monix](../monix.md), [ZIO](../zio.md), or [Fs2](../fs2.md) variants - the respective documentation pages contain appropriate SSE examples.
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package sttp.client4.impl.fs2
2+
3+
import scala.scalajs.js.{Function1, Thenable, |}
4+
import sttp.client4.testing.WebSocketStreamBackendStub
5+
import fs2.Stream
6+
import cats.syntax.all._
7+
import cats.effect.syntax.all._
8+
import sttp.client4.fetch.FetchOptions
9+
import sttp.client4.fetch.AbstractFetchBackend
10+
import sttp.capabilities.fs2.Fs2Streams
11+
import cats.effect.kernel.Async
12+
import sttp.client4.impl.cats.CatsMonadAsyncError
13+
import sttp.ws.WebSocketFrame.Data
14+
import sttp.ws.WebSocket
15+
import sttp.ws.WebSocketFrame
16+
import scala.scalajs.js
17+
import sttp.client4.internal.ConvertFromFuture
18+
import scala.concurrent.Future
19+
import sttp.client4.WebSocketStreamBackend
20+
import scala.scalajs.js.typedarray._
21+
import fs2.Chunk
22+
import org.scalajs.dom.Request
23+
import org.scalajs.dom.Response
24+
import org.scalajs.dom.BodyInit
25+
26+
class FetchFs2Backend[F[_]: Async] private (fetchOptions: FetchOptions, customizeRequest: Request => Request)
27+
extends AbstractFetchBackend[F, Fs2Streams[F]](fetchOptions, customizeRequest, new CatsMonadAsyncError)
28+
with WebSocketStreamBackend[F, Fs2Streams[F]] {
29+
30+
override val streams: Fs2Streams[F] = Fs2Streams[F]
31+
32+
override protected def addCancelTimeoutHook[T](result: F[T], cancel: () => Unit, cleanup: () => Unit): F[T] = {
33+
result
34+
.onCancel(Async[F].delay(cancel()).voidError)
35+
.guarantee(
36+
Async[F].delay(cleanup()).voidError
37+
)
38+
}
39+
40+
override protected def compileWebSocketPipe(
41+
ws: WebSocket[F],
42+
pipe: streams.Pipe[Data[_], WebSocketFrame]
43+
): F[Unit] = {
44+
Fs2WebSockets.handleThroughPipe[F](ws)(pipe)
45+
}
46+
47+
override protected def handleResponseAsStream(response: Response): F[(streams.BinaryStream, () => F[Unit])] = {
48+
Async[F].delay {
49+
lazy val reader = response.body.getReader()
50+
def read() = Async[F].fromFuture(Async[F].delay(reader.read().toFuture))
51+
52+
val cancel = Async[F].async[Unit] { callback =>
53+
Async[F]
54+
.delay {
55+
// copied from https://github.com/zio/zio/blob/fca6870f42b1d2b06ebf0e6c13b975bccea72f13/core/js/src/main/scala/zio/ZIOPlatformSpecific.scala#L58
56+
val onFulfilled: Function1[Unit, Unit | Thenable[Unit]] = new scala.Function1[Unit, Unit | Thenable[Unit]] {
57+
def apply(a: Unit): Unit | Thenable[Unit] = callback(Right(a))
58+
}
59+
val onRejected: Function1[Any, Unit | Thenable[Unit]] = new scala.Function1[Any, Unit | Thenable[Unit]] {
60+
def apply(e: Any): Unit | Thenable[Unit] =
61+
callback(Left(e match {
62+
case t: Throwable => t
63+
case _ => js.JavaScriptException(e)
64+
}))
65+
}
66+
reader.cancel("Response body reader cancelled").`then`[Unit](onFulfilled, js.defined(onRejected))
67+
}
68+
.as(None)
69+
}
70+
71+
val stream = Stream
72+
.unfoldChunkEval(()) { case () =>
73+
read()
74+
.map { chunk =>
75+
if (chunk.done) {
76+
Option.empty
77+
} else {
78+
val bytes = new Int8Array(chunk.value.buffer).toArray
79+
Option((Chunk.array(bytes), ()))
80+
}
81+
}
82+
}
83+
(stream.onComplete(Stream.exec(cancel.voidError)), () => cancel.voidError)
84+
85+
}
86+
}
87+
88+
override protected def handleStreamBody(s: streams.BinaryStream): F[js.UndefOr[BodyInit]] = {
89+
s.chunks
90+
.fold(Chunk.empty[Byte])(_ ++ _)
91+
.compile
92+
.last
93+
.map {
94+
case None => js.undefined
95+
case Some(res) => res.toArray.toTypedArray
96+
}
97+
}
98+
99+
override def convertFromFuture: ConvertFromFuture[F] = {
100+
new ConvertFromFuture[F] {
101+
override def apply[T](f: Future[T]): F[T] = Async[F].fromFuture(monad.unit(f))
102+
}
103+
}
104+
}
105+
106+
object FetchFs2Backend {
107+
108+
def apply[F[_]: Async](
109+
fetchOptions: FetchOptions = FetchOptions.Default,
110+
customizeRequest: Request => Request = identity
111+
): WebSocketStreamBackend[F, Fs2Streams[F]] = {
112+
new FetchFs2Backend[F](fetchOptions, customizeRequest)
113+
}
114+
115+
/** Create a stub backend for testing, which uses the given [[F]] response wrapper
116+
*
117+
* See [[WebSocketStreamBackendStub]] for details on how to configure stub responses.
118+
*/
119+
def stub[F[_]: Async]: WebSocketStreamBackendStub[F, Fs2Streams[F]] = WebSocketStreamBackendStub(
120+
new CatsMonadAsyncError[F]
121+
)
122+
123+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package sttp.client4.impl.fs2
2+
3+
import cats.effect.IO
4+
import sttp.capabilities.fs2.Fs2Streams
5+
import sttp.client4.StreamBackend
6+
import sttp.client4.testing.{AbstractFetchHttpTest, ConvertToFuture}
7+
import cats.effect.unsafe.implicits.global
8+
9+
class FetchFs2HttpTest extends AbstractFetchHttpTest[IO, Fs2Streams[IO]] {
10+
override implicit val convertToFuture: ConvertToFuture[IO] = sttp.client4.impl.cats.convertCatsIOToFuture()
11+
12+
override val backend: StreamBackend[IO, Fs2Streams[IO]] = FetchFs2Backend()
13+
14+
override protected def supportsCustomMultipartContentType = false
15+
16+
override protected def supportsCustomMultipartEncoding = false
17+
18+
override def timeoutToNone[T](t: IO[T], timeoutMillis: Int): IO[Option[T]] = super.timeoutToNone(t, timeoutMillis)
19+
20+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package sttp.client4.impl.fs2
2+
3+
import cats.effect.IO
4+
import sttp.capabilities.fs2.Fs2Streams
5+
import sttp.client4.StreamBackend
6+
import sttp.client4.testing.streaming.StreamingTest
7+
8+
class FetchFs2StreamingTest extends StreamingTest[IO, Fs2Streams[IO]] with Fs2StreamingTest {
9+
override val streams: Fs2Streams[IO] = Fs2Streams[IO]
10+
11+
override val backend: StreamBackend[IO, Fs2Streams[IO]] = FetchFs2Backend()
12+
13+
override protected def supportsStreamingMultipartParts: Boolean = false
14+
15+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package sttp.client4.impl.fs2
2+
3+
import sttp.client4.impl.cats.CatsTestBase
4+
import cats.effect.IO
5+
import fs2.Stream
6+
import sttp.capabilities.fs2.Fs2Streams
7+
import sttp.client4.WebSocketStreamBackend
8+
import sttp.client4.testing.websocket.{WebSocketStreamingTest, WebSocketTest}
9+
import sttp.ws.WebSocketFrame
10+
11+
import scala.concurrent.ExecutionContext
12+
import scala.scalajs.concurrent.JSExecutionContext.Implicits.queue
13+
14+
class FetchFs2WebSocketTest
15+
extends WebSocketTest[IO]
16+
with WebSocketStreamingTest[IO, Fs2Streams[IO]]
17+
with CatsTestBase {
18+
implicit override def executionContext: ExecutionContext = queue
19+
override def throwsWhenNotAWebSocket: Boolean = true
20+
override def supportsReadingWebSocketResponseHeaders: Boolean = false
21+
override def supportsReadingSubprotocolWebSocketResponseHeader: Boolean = false
22+
23+
override val backend: WebSocketStreamBackend[IO, Fs2Streams[IO]] = FetchFs2Backend()
24+
25+
override val streams: Fs2Streams[IO] = Fs2Streams[IO]
26+
27+
override def prepend(item: WebSocketFrame.Text)(
28+
to: streams.Pipe[WebSocketFrame.Data[_], WebSocketFrame]
29+
): streams.Pipe[WebSocketFrame.Data[_], WebSocketFrame] =
30+
to.andThen(rest => Stream(item) ++ rest)
31+
32+
override def fromTextPipe(function: String => WebSocketFrame): streams.Pipe[WebSocketFrame.Data[_], WebSocketFrame] =
33+
Fs2WebSockets.fromTextPipe(function)
34+
35+
override def functionToPipe(
36+
f: WebSocketFrame.Data[_] => Option[WebSocketFrame]
37+
): streams.Pipe[WebSocketFrame.Data[_], WebSocketFrame] = _.map(f).collect { case Some(v) =>
38+
v
39+
}
40+
}

0 commit comments

Comments
 (0)