Skip to content

Commit c33a1ac

Browse files
committed
Add Lift{Kind,Value} instances for Stream
1 parent 3c06230 commit c33a1ac

File tree

3 files changed

+95
-8
lines changed

3 files changed

+95
-8
lines changed

build.sbt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,9 +300,11 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform)
300300
"org.scodec" %%% "scodec-bits" % "1.2.4",
301301
"org.typelevel" %%% "cats-core" % "2.13.0",
302302
"org.typelevel" %%% "cats-effect" % "3.7.0-RC1",
303+
"org.typelevel" %%% "cats-mtl" % "1.6.0",
303304
"org.typelevel" %%% "cats-effect-laws" % "3.7.0-RC1" % Test,
304305
"org.typelevel" %%% "cats-effect-testkit" % "3.7.0-RC1" % Test,
305306
"org.typelevel" %%% "cats-laws" % "2.13.0" % Test,
307+
"org.typelevel" %%% "cats-mtl-laws" % "1.6.0" % Test,
306308
"org.typelevel" %%% "discipline-munit" % "2.0.0" % Test,
307309
"org.typelevel" %%% "munit-cats-effect" % "2.2.0-RC1" % Test,
308310
"org.typelevel" %%% "scalacheck-effect-munit" % "2.1.0-RC1" % Test

core/shared/src/main/scala/fs2/Stream.scala

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ import fs2.concurrent._
3939
import fs2.internal._
4040
import org.typelevel.scalaccompat.annotation._
4141
import Pull.StreamPullOps
42+
import cats.mtl.{LiftKind, LiftValue}
4243

43-
import java.util.concurrent.Flow.{Publisher, Processor, Subscriber}
44+
import java.util.concurrent.Flow.{Processor, Publisher, Subscriber}
4445

4546
/** A stream producing output of type `O` and which may evaluate `F` effects.
4647
*
@@ -5810,9 +5811,46 @@ object Stream extends StreamLowPriority {
58105811
new Defer[Stream[F, *]] {
58115812
override def defer[A](fa: => Stream[F, A]): Stream[F, A] = Stream.empty ++ fa
58125813
}
5814+
5815+
implicit def liftKindInstance[F[_]](implicit F: Applicative[F]): LiftKind[F, Stream[F, *]] =
5816+
liftKindImpl(F)
5817+
5818+
implicit def liftValueFromResourceInstance[F[_]](implicit
5819+
F: MonadCancel[F, ?]
5820+
): LiftValue[Resource[F, *], Stream[F, *]] =
5821+
liftValueFromResourceImpl(implicitly)
58135822
}
58145823

58155824
private[fs2] trait StreamLowPriority {
58165825
implicit def monadInstance[F[_]]: Monad[Stream[F, *]] =
58175826
new Stream.StreamMonad[F]
5827+
5828+
protected[this] def liftKindImpl[F[_]](F: Applicative[F]): LiftKind[F, Stream[F, *]] =
5829+
new LiftKind[F, Stream[F, *]] {
5830+
val applicativeF: Applicative[F] = F
5831+
val applicativeG: Applicative[Stream[F, *]] = monadInstance
5832+
def apply[A](fa: F[A]): Stream[F, A] = Stream.eval(fa)
5833+
def limitedMapK[A](ga: Stream[F, A])(scope: F ~> F): Stream[F, A] =
5834+
ga.translate(scope)
5835+
}
5836+
5837+
implicit def liftKindComposedInstance[F[_], G[_]](implicit
5838+
inner: LiftKind[F, G]
5839+
): LiftKind[F, Stream[G, *]] =
5840+
inner.andThen(liftKindImpl(inner.applicativeG))
5841+
5842+
protected[this] def liftValueFromResourceImpl[F[_]](
5843+
applicativeResource: Applicative[Resource[F, *]]
5844+
)(implicit F: MonadCancel[F, ?]): LiftValue[Resource[F, *], Stream[F, *]] =
5845+
new LiftValue[Resource[F, *], Stream[F, *]] {
5846+
val applicativeF: Applicative[Resource[F, *]] = applicativeResource
5847+
val applicativeG: Applicative[Stream[F, *]] = monadInstance
5848+
def apply[A](fa: Resource[F, A]): Stream[F, A] = Stream.resource(fa)
5849+
}
5850+
5851+
implicit def liftValueFromResourceComposedInstance[F[_], G[_]](implicit
5852+
inner: LiftValue[F, Resource[G, *]],
5853+
G: MonadCancel[G, ?]
5854+
): LiftValue[F, Stream[G, *]] =
5855+
inner.andThen(liftValueFromResourceImpl(inner.applicativeG))
58185856
}

core/shared/src/test/scala/fs2/StreamLawsSuite.scala

Lines changed: 54 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,39 @@
2121

2222
package fs2
2323

24-
import cats.Eq
25-
import cats.effect.IO
24+
import cats.{Applicative, Eq, ~>}
25+
import cats.data.{IdT, OptionT}
26+
import cats.effect.{Concurrent, IO, Ref, Resource}
2627
import cats.effect.testkit.TestInstances
2728
import cats.laws.discipline._
2829
import cats.laws.discipline.arbitrary._
30+
import cats.mtl.LiftValue
31+
import cats.mtl.laws.discipline.{LiftKindTests, LiftValueTests}
32+
import org.scalacheck.{Arbitrary, Gen}
2933

3034
class StreamLawsSuite extends Fs2Suite with TestInstances {
3135
implicit val ticker: Ticker = Ticker()
3236

33-
implicit def eqStream[O: Eq]: Eq[Stream[IO, O]] =
34-
Eq.instance((x, y) =>
35-
Eq[IO[Vector[Either[Throwable, O]]]]
36-
.eqv(x.attempt.compile.toVector, y.attempt.compile.toVector)
37-
)
37+
implicit def eqStream[F[_], O](implicit
38+
F: Concurrent[F],
39+
eqFVecEitherThrowO: Eq[F[Vector[Either[Throwable, O]]]]
40+
): Eq[Stream[F, O]] =
41+
Eq.by((_: Stream[F, O]).attempt.compile.toVector)
42+
43+
private[this] val counter: IO[Ref[IO, Int]] = IO.ref(0)
44+
45+
implicit val arbitraryScope: Arbitrary[IO ~> IO] =
46+
Arbitrary {
47+
Gen.const {
48+
new (IO ~> IO) {
49+
def apply[A](fa: IO[A]): IO[A] =
50+
for {
51+
ref <- counter
52+
res <- ref.update(_ + 1) >> fa
53+
} yield res
54+
}
55+
}
56+
}
3857

3958
checkAll(
4059
"MonadError[Stream[F, *], Throwable]",
@@ -50,4 +69,32 @@ class StreamLawsSuite extends Fs2Suite with TestInstances {
5069
"Align[Stream[F, *]]",
5170
AlignTests[Stream[IO, *]].align[Int, Int, Int, Int]
5271
)
72+
checkAll(
73+
"LiftKind[IO, Stream[IO, *]",
74+
LiftKindTests[IO, Stream[IO, *]].liftKind[Int, Int]
75+
)
76+
checkAll(
77+
"LiftKind[IO, Stream[OptionT[IO, *], *]",
78+
LiftKindTests[IO, Stream[OptionT[IO, *], *]].liftKind[Int, Int]
79+
)
80+
checkAll(
81+
"LiftValue[Resource[IO, *], Stream[IO, *]",
82+
LiftValueTests[Resource[IO, *], Stream[IO, *]].liftValue[Int, Int]
83+
)
84+
locally {
85+
// this is a somewhat silly instance, but we need a
86+
// `LiftValue[X, Resource[IO, *]]` instance where `X` is not `IO` because
87+
// that already has a higher priority implicit instance
88+
implicit val liftIdTResource: LiftValue[IdT[IO, *], Resource[IO, *]] =
89+
new LiftValue[IdT[IO, *], Resource[IO, *]] {
90+
val applicativeF: Applicative[IdT[IO, *]] = implicitly
91+
val applicativeG: Applicative[Resource[IO, *]] = implicitly
92+
def apply[A](fa: IdT[IO, A]): Resource[IO, A] =
93+
Resource.eval(fa.value)
94+
}
95+
checkAll(
96+
"LiftValue[IdT[IO, *], Stream[IO, *]] via Resource[IO, *]",
97+
LiftValueTests[IdT[IO, *], Stream[IO, *]].liftValue[Int, Int]
98+
)
99+
}
53100
}

0 commit comments

Comments
 (0)