Skip to content

Add compute combinators #4190

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: series/3.x
Choose a base branch
from
55 changes: 54 additions & 1 deletion core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,25 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
*/
def map[B](f: A => B): IO[B] = IO.Map(this, f, Tracing.calculateTracingEvent(f))

/**
* Like [[map]], but inserts a [[IO#cede]] before and after `f`. Use this when `f` is a
* long-running, compute-bound operation.
*
* @see
* [[IO#cede]] for more details
*/
def computeMap[B](f: A => B): IO[B] =
this.flatMap(a => IO.compute(f(a)))

/**
* Like [[computeMap]], but allows raising errors in an Either.
*
* @see
* [[computeMap]] for more details
*/
def computeMapAttempt[B](f: A => Either[Throwable, B]): IO[B] =
this.flatMap(a => IO.computeAttempt(f(a)))

/**
* Applies rate limiting to this `IO` based on provided backpressure semantics.
*
Expand Down Expand Up @@ -1286,6 +1305,27 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits with TuplePara
Delay(fn, Tracing.calculateTracingEvent(fn))
}

/**
* Suspends a long-running, compute-bound operation. Like [[delay]], but inserts a [[cede]]
* before and after computing the result.
*
* @see
* [[cede]] for more details
* @see
* [[delay]] for more details
*/
def compute[A](thunk: => A): IO[A] =
IO.cede >> IO.delay(thunk).guarantee(IO.cede)

/**
* Like [[compute]], but allows raising errors in an Either.
*
* @see
* [[compute]] for more details
*/
def computeAttempt[A](thunk: => Either[Throwable, A]): IO[A] =
IO.cede >> IO.delay(thunk).rethrow.guarantee(IO.cede)

/**
* Suspends a synchronous side effect which produces an `IO` in `IO`.
*
Expand Down Expand Up @@ -2101,7 +2141,20 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits with TuplePara
def flatMap[A, B](fa: IO[A])(f: A => IO[B]): IO[B] =
fa.flatMap(f)

override def delay[A](thunk: => A): IO[A] = IO(thunk)
override def computeMap[A, B](fa: IO[A])(f: A => B): IO[B] =
fa.computeMap(f)

override def computeMapAttempt[A, B](fa: IO[A])(f: A => Either[Throwable, B]): IO[B] =
fa.computeMapAttempt(f)

override def delay[A](thunk: => A): IO[A] =
IO.delay(thunk)

override def compute[A](thunk: => A): IO[A] =
IO.compute(thunk)

override def computeAttempt[A](thunk: => Either[Throwable, A]): IO[A] =
IO.computeAttempt(thunk)

/**
* Like [[IO.delay]] but intended for thread blocking operations. `blocking` will shift the
Expand Down
25 changes: 24 additions & 1 deletion kernel/shared/src/main/scala/cats/effect/kernel/Async.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ package cats.effect.kernel
import cats.{~>, Monoid, Semigroup}
import cats.arrow.FunctionK
import cats.data.{EitherT, Ior, IorT, Kleisli, OptionT, WriterT}
import cats.implicits._
import cats.effect.kernel.syntax.monadCancel._
import cats.syntax.all._

import scala.annotation.{nowarn, tailrec}
import scala.concurrent.{ExecutionContext, Future}
Expand Down Expand Up @@ -60,6 +61,7 @@ import java.util.concurrent.atomic.AtomicReference
* running {{{async(k)}}} is canceled.
*/
trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] {
implicit private[this] def F: MonadCancel[F, Throwable] = this

/**
* Suspends an asynchronous side effect with optional immediate result in `F`.
Expand Down Expand Up @@ -157,6 +159,27 @@ trait Async[F[_]] extends AsyncPlatform[F] with Sync[F] with Temporal[F] {
def async_[A](k: (Either[Throwable, A] => Unit) => Unit): F[A] =
async[A](cb => as(delay(k(cb)), None))

/**
* Suspends a long-running, compute-bound operation. Like [[delay]], but inserts a [[cede]]
* before and after computing the result.
*
* @see
* [[cede]] for more details
* @see
* [[delay]] for more details
*/
def compute[A](thunk: => A): F[A] =
cede >> delay(thunk).guarantee(cede)

/**
* Like [[compute]], but allows raising errors in an Either.
*
* @see
* [[compute]] for more details
*/
def computeAttempt[A](thunk: => Either[Throwable, A]): F[A] =
cede >> delay(thunk).rethrow.guarantee(cede)

/**
* An effect that never terminates.
*
Expand Down
19 changes: 19 additions & 0 deletions kernel/shared/src/main/scala/cats/effect/kernel/GenSpawn.scala
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,25 @@ trait GenSpawn[F[_], E] extends MonadCancel[F, E] with Unique[F] {
*/
def cede: F[Unit]

/**
* Like [[map]], but inserts a [[cede]] before and after `f`. Use this when `f` is a
* long-running, compute-bound operation.
*
* @see
* [[cede]] for more details
*/
def computeMap[A, B](fa: F[A])(f: A => B): F[B] =
cede >> fa.map(f).guarantee(cede)

/**
* Like [[computeMap]], but allows raising errors in an Either.
*
* @see
* [[computeMap]] for more details
*/
def computeMapAttempt[A, B](fa: F[A])(f: A => Either[E, B]): F[B] =
cede >> fa.flatMap(a => fromEither(f(a))).guarantee(cede)

/**
* A low-level primitive for racing the evaluation of two fibers that returns the [[Outcome]]
* of the winner and the [[Fiber]] of the loser. The winner of the race is considered to be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,14 @@ trait GenSpawnSyntax {

final class GenSpawnOps_[F[_], A] private[syntax] (private val wrapped: F[A]) extends AnyVal {

def race[B](another: F[B])(implicit F: GenSpawn[F, ?]) =
def race[B](another: F[B])(implicit F: GenSpawn[F, ?]): F[Either[A, B]] =
F.race(wrapped, another)

def both[B](another: F[B])(implicit F: GenSpawn[F, ?]): F[(A, B)] =
F.both(wrapped, another)

def computeMap[B](f: A => B)(implicit F: GenSpawn[F, ?]): F[B] =
F.computeMap(wrapped)(f)
}

final class GenSpawnOps[F[_], A, E] private[syntax] (private val wrapped: F[A]) extends AnyVal {
Expand All @@ -62,4 +65,7 @@ final class GenSpawnOps[F[_], A, E] private[syntax] (private val wrapped: F[A])
implicit F: GenSpawn[F, E]
): F[(Outcome[F, E, A], Outcome[F, E, B])] =
F.bothOutcome(wrapped, another)

def computeMapAttempt[B](f: A => Either[E, B])(implicit F: GenSpawn[F, E]): F[B] =
F.computeMapAttempt(wrapped)(f)
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ class SyntaxSuite {
}
}

def genSpawnSyntax[F[_], A, B, E](target: F[A], another: F[B])(implicit F: GenSpawn[F, E]) = {
def genSpawnSyntax[F[_], A, B, E](target: F[A], another: F[B], f: A => B)(
implicit F: GenSpawn[F, E]) = {
import syntax.spawn._

GenSpawn[F]: F.type
Expand Down Expand Up @@ -145,6 +146,16 @@ class SyntaxSuite {
val result = target.bothOutcome(another)
result: F[(Outcome[F, E, A], Outcome[F, E, B])]
}

{
val result = target.computeMap(f)
result: F[B]
}

{
val result = target.computeMapAttempt(f andThen Right.apply)
result: F[B]
}
}

def spawnForwarder[F[_]: Spawn] =
Expand Down
Loading