Skip to content

Commit affd6e5

Browse files
authored
Merge pull request #3594 from NthPortal/Stream-lifts/PR
Add `Lift{Kind,Value}` instances for `Stream`
2 parents 179dc20 + be33de8 commit affd6e5

File tree

3 files changed

+95
-8
lines changed

3 files changed

+95
-8
lines changed

build.sbt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,9 +389,11 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform)
389389
"org.scodec" %%% "scodec-bits" % "1.2.4",
390390
"org.typelevel" %%% "cats-core" % "2.13.0",
391391
"org.typelevel" %%% "cats-effect" % "3.7.0-RC1",
392+
"org.typelevel" %%% "cats-mtl" % "1.6.0",
392393
"org.typelevel" %%% "cats-effect-laws" % "3.7.0-RC1" % Test,
393394
"org.typelevel" %%% "cats-effect-testkit" % "3.7.0-RC1" % Test,
394395
"org.typelevel" %%% "cats-laws" % "2.13.0" % Test,
396+
"org.typelevel" %%% "cats-mtl-laws" % "1.6.0" % Test,
395397
"org.typelevel" %%% "discipline-munit" % "2.0.0" % Test,
396398
"org.typelevel" %%% "munit-cats-effect" % "2.2.0-RC1" % Test,
397399
"org.typelevel" %%% "scalacheck-effect-munit" % "2.1.0-RC1" % Test

core/shared/src/main/scala/fs2/Stream.scala

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ import fs2.concurrent._
3939
import fs2.internal._
4040
import org.typelevel.scalaccompat.annotation._
4141
import Pull.StreamPullOps
42+
import cats.mtl.{LiftKind, LiftValue}
4243

43-
import java.util.concurrent.Flow.{Publisher, Processor, Subscriber}
44+
import java.util.concurrent.Flow.{Processor, Publisher, Subscriber}
4445

4546
/** A stream producing output of type `O` and which may evaluate `F` effects.
4647
*
@@ -5844,9 +5845,46 @@ object Stream extends StreamLowPriority {
58445845
new Defer[Stream[F, *]] {
58455846
override def defer[A](fa: => Stream[F, A]): Stream[F, A] = Stream.empty ++ fa
58465847
}
5848+
5849+
implicit def liftKindInstance[F[_]](implicit F: Applicative[F]): LiftKind[F, Stream[F, *]] =
5850+
liftKindImpl(F)
5851+
5852+
implicit def liftValueFromResourceInstance[F[_]](implicit
5853+
F: MonadCancel[F, ?]
5854+
): LiftValue[Resource[F, *], Stream[F, *]] =
5855+
liftValueFromResourceImpl(implicitly)
58475856
}
58485857

58495858
private[fs2] trait StreamLowPriority {
58505859
implicit def monadInstance[F[_]]: Monad[Stream[F, *]] =
58515860
new Stream.StreamMonad[F]
5861+
5862+
protected[this] def liftKindImpl[F[_]](F: Applicative[F]): LiftKind[F, Stream[F, *]] =
5863+
new LiftKind[F, Stream[F, *]] {
5864+
val applicativeF: Applicative[F] = F
5865+
val applicativeG: Applicative[Stream[F, *]] = monadInstance
5866+
def apply[A](fa: F[A]): Stream[F, A] = Stream.eval(fa)
5867+
def limitedMapK[A](ga: Stream[F, A])(scope: F ~> F): Stream[F, A] =
5868+
ga.translate(scope)
5869+
}
5870+
5871+
implicit def liftKindComposedInstance[F[_], G[_]](implicit
5872+
inner: LiftKind[F, G]
5873+
): LiftKind[F, Stream[G, *]] =
5874+
inner.andThen(liftKindImpl(inner.applicativeG))
5875+
5876+
protected[this] def liftValueFromResourceImpl[F[_]](
5877+
applicativeResource: Applicative[Resource[F, *]]
5878+
)(implicit F: MonadCancel[F, ?]): LiftValue[Resource[F, *], Stream[F, *]] =
5879+
new LiftValue[Resource[F, *], Stream[F, *]] {
5880+
val applicativeF: Applicative[Resource[F, *]] = applicativeResource
5881+
val applicativeG: Applicative[Stream[F, *]] = monadInstance
5882+
def apply[A](fa: Resource[F, A]): Stream[F, A] = Stream.resource(fa)
5883+
}
5884+
5885+
implicit def liftValueFromResourceComposedInstance[F[_], G[_]](implicit
5886+
inner: LiftValue[F, Resource[G, *]],
5887+
G: MonadCancel[G, ?]
5888+
): LiftValue[F, Stream[G, *]] =
5889+
inner.andThen(liftValueFromResourceImpl(inner.applicativeG))
58525890
}

core/shared/src/test/scala/fs2/StreamLawsSuite.scala

Lines changed: 54 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,39 @@
2121

2222
package fs2
2323

24-
import cats.Eq
25-
import cats.effect.IO
24+
import cats.{Applicative, Eq, ~>}
25+
import cats.data.{IdT, OptionT}
26+
import cats.effect.{Concurrent, IO, Ref, Resource}
2627
import cats.effect.testkit.TestInstances
2728
import cats.laws.discipline._
2829
import cats.laws.discipline.arbitrary._
30+
import cats.mtl.LiftValue
31+
import cats.mtl.laws.discipline.{LiftKindTests, LiftValueTests}
32+
import org.scalacheck.{Arbitrary, Gen}
2933

3034
class StreamLawsSuite extends Fs2Suite with TestInstances {
3135
implicit val ticker: Ticker = Ticker()
3236

33-
implicit def eqStream[O: Eq]: Eq[Stream[IO, O]] =
34-
Eq.instance((x, y) =>
35-
Eq[IO[Vector[Either[Throwable, O]]]]
36-
.eqv(x.attempt.compile.toVector, y.attempt.compile.toVector)
37-
)
37+
implicit def eqStream[F[_], O](implicit
38+
F: Concurrent[F],
39+
eqFVecEitherThrowO: Eq[F[Vector[Either[Throwable, O]]]]
40+
): Eq[Stream[F, O]] =
41+
Eq.by((_: Stream[F, O]).attempt.compile.toVector)
42+
43+
private[this] val counter: IO[Ref[IO, Int]] = IO.ref(0)
44+
45+
implicit val arbitraryScope: Arbitrary[IO ~> IO] =
46+
Arbitrary {
47+
Gen.const {
48+
new (IO ~> IO) {
49+
def apply[A](fa: IO[A]): IO[A] =
50+
for {
51+
ref <- counter
52+
res <- ref.update(_ + 1) >> fa
53+
} yield res
54+
}
55+
}
56+
}
3857

3958
checkAll(
4059
"MonadError[Stream[F, *], Throwable]",
@@ -50,4 +69,32 @@ class StreamLawsSuite extends Fs2Suite with TestInstances {
5069
"Align[Stream[F, *]]",
5170
AlignTests[Stream[IO, *]].align[Int, Int, Int, Int]
5271
)
72+
checkAll(
73+
"LiftKind[IO, Stream[IO, *]",
74+
LiftKindTests[IO, Stream[IO, *]].liftKind[Int, Int]
75+
)
76+
checkAll(
77+
"LiftKind[IO, Stream[OptionT[IO, *], *]",
78+
LiftKindTests[IO, Stream[OptionT[IO, *], *]].liftKind[Int, Int]
79+
)
80+
checkAll(
81+
"LiftValue[Resource[IO, *], Stream[IO, *]",
82+
LiftValueTests[Resource[IO, *], Stream[IO, *]].liftValue[Int, Int]
83+
)
84+
locally {
85+
// this is a somewhat silly instance, but we need a
86+
// `LiftValue[X, Resource[IO, *]]` instance where `X` is not `IO` because
87+
// that already has a higher priority implicit instance
88+
implicit val liftIdTResource: LiftValue[IdT[IO, *], Resource[IO, *]] =
89+
new LiftValue[IdT[IO, *], Resource[IO, *]] {
90+
val applicativeF: Applicative[IdT[IO, *]] = implicitly
91+
val applicativeG: Applicative[Resource[IO, *]] = implicitly
92+
def apply[A](fa: IdT[IO, A]): Resource[IO, A] =
93+
Resource.eval(fa.value)
94+
}
95+
checkAll(
96+
"LiftValue[IdT[IO, *], Stream[IO, *]] via Resource[IO, *]",
97+
LiftValueTests[IdT[IO, *], Stream[IO, *]].liftValue[Int, Int]
98+
)
99+
}
53100
}

0 commit comments

Comments
 (0)