Skip to content
11 changes: 10 additions & 1 deletion core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1471,7 +1471,16 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits with TuplePara
def async_[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A] = {
val body = new Cont[IO, A, A] {
def apply[G[_]](implicit G: MonadCancel[G, Throwable]) = { (resume, get, lift) =>
G.uncancelable(_ => lift(IO.delay(k(resume))).flatMap(_ => get))
G.uncancelable(_ =>
try {
k(resume)
get
} catch {
case t if UnsafeNonFatal(t) =>
lift(IO.raiseError(t))
case t: Throwable =>
throw t
})
}
}

Expand Down
31 changes: 24 additions & 7 deletions ioapp-tests/src/test/scala/IOAppSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -203,20 +203,37 @@ class IOAppSpec extends Specification {
h.stdout() must not(contain("sadness"))
}

"exit on raising a fatal error inside a map" in {
val h = platform("RaiseFatalErrorMap", List.empty)
h.awaitStatus() mustEqual 1
h.stderr() must contain("Boom!")
h.stdout() must not(contain("sadness"))
}

"exit on raising a fatal error inside a flatMap" in {
val h = platform("RaiseFatalErrorFlatMap", List.empty)
h.awaitStatus() mustEqual 1
h.stderr() must contain("Boom!")
h.stdout() must not(contain("sadness"))
}

"exit on fatal error from CompletableFuture" in {
if (platform == JVM) {
val h = platform("FatalErrorFromCompletableFuture", List.empty)
h.awaitStatus() mustEqual 1
h.stderr() must contain("Boom from CompletableFuture!")
h.stdout() must not(contain("sadness"))
} else {
// CompletableFuture is JVM-only
ok
}
}

"exit on fatal error from async_" in {
if (platform == JVM) {
val h = platform("FatalErrorFromAsync", List.empty)
h.awaitStatus() mustEqual 1
h.stderr() must contain("Boom from async!")
h.stdout() must not(contain("sadness"))
} else {
// Fatal error testing is JVM-only
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, are you sure? I am pretty sure we support fatal errors on all platforms.

ok
}
}

"warn on global runtime collision" in {
val h = platform("GlobalRacingInit", List.empty)
h.awaitStatus() mustEqual 0
Expand Down
17 changes: 15 additions & 2 deletions kernel/jvm/src/main/scala/cats/effect/kernel/AsyncPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,18 @@
package cats
package effect.kernel

import scala.util.control.ControlThrowable

import java.util.concurrent.{CompletableFuture, CompletionException, CompletionStage}

private[kernel] trait AsyncPlatform[F[_]] extends Serializable { this: Async[F] =>

private def isNonFatal(t: Throwable): Boolean = t match {
case _: VirtualMachineError | _: ThreadDeath | _: LinkageError | _: ControlThrowable =>
false
case _ => true
}

def fromCompletionStage[A](completionStage: F[CompletionStage[A]]): F[A] =
fromCompletableFuture(flatMap(completionStage) { cs => delay(cs.toCompletableFuture()) })

Expand Down Expand Up @@ -50,10 +58,15 @@ private[kernel] trait AsyncPlatform[F[_]] extends Serializable { this: Async[F]
cf.handle[Unit] {
case (a, null) => resume(Right(a))
case (_, t) =>
resume(Left(t match {
val actualThrowable = t match {
case e: CompletionException if e.getCause ne null => e.getCause
case _ => t
}))
}
if (isNonFatal(actualThrowable)) {
resume(Left(actualThrowable))
} else {
throw actualThrowable
}
}
}

Expand Down
20 changes: 20 additions & 0 deletions tests/jvm/src/main/scala/catseffect/examplesplatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,24 @@ package examples {
val run =
IO.cede.foreverM.start >> IO(Thread.sleep(2.seconds.toMillis))
}

object FatalErrorFromCompletableFuture extends IOApp {
def run(args: List[String]): IO[ExitCode] = {
import java.util.concurrent.CompletableFuture

IO.fromCompletableFuture(IO(CompletableFuture.runAsync(() => {
throw new OutOfMemoryError("Boom from CompletableFuture!")
})))
.flatMap(_ => IO.println("sadness"))
.as(ExitCode.Success)
}
}

object FatalErrorFromAsync extends IOApp {
def run(args: List[String]): IO[ExitCode] = {
IO.async_[Unit] { cb => cb(Left(new OutOfMemoryError("Boom from async!"))) }
.flatMap(_ => IO.println("sadness"))
.as(ExitCode.Success)
}
}
}
Loading