-
Notifications
You must be signed in to change notification settings - Fork 559
Add AtomicMap
#4424
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: series/3.x
Are you sure you want to change the base?
Add AtomicMap
#4424
Changes from all commits
97df1ef
d4b2727
d075501
d33cc79
b879a72
edeea76
177a69a
bf3a0ef
0b80fca
32f2f01
eb53283
b37a131
bc3e094
2208b23
d81546f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
--- | ||
id: atomic-map | ||
title: Atomic Map | ||
--- | ||
|
||
A total map from `K` to `AtomicCell[F, V]`. | ||
|
||
```scala mdoc:silent | ||
import cats.effect.std.AtomicCell | ||
|
||
trait AtomicMap[F[_], K, V] { | ||
/** | ||
* Access the AtomicCell for the given key. | ||
*/ | ||
def apply(key: K): AtomicCell[F, V] | ||
} | ||
``` | ||
|
||
It is conceptually similar to a `AtomicCell[F, Map[K, V]]`, but with better ergonomics when | ||
working on a per key basis. Note, however, that it does not support atomic updates to | ||
multiple keys. | ||
|
||
Additionally, it also provide less contention: since all operations are performed on | ||
individual key-value pairs, the pairs can be sharded by key. Thus, multiple concurrent | ||
updates may be executed independently to each other, as long as their keys belong to | ||
different shards. | ||
|
||
## Using `AtomicMap` | ||
|
||
You can think of a `AtomicMap` like a `MapRef` that supports effectual updates by locking the underlying `Ref`. | ||
|
||
```scala mdoc:reset:silent | ||
import cats.effect.IO | ||
import cats.effect.std.AtomicMap | ||
|
||
trait State | ||
trait Key | ||
|
||
class Service(am: AtomicMap[IO, Key, State]) { | ||
def modify(key: Key)(f: State => IO[State]): IO[Unit] = | ||
am(key).evalUpdate(f) | ||
} | ||
``` | ||
|
||
### Example | ||
|
||
Imagine a parking tower, | ||
where users have access to specific floors, | ||
and getting a parking space involves an effectual operation _(e.g. a database call)_. | ||
In that case, it may be better to block than repeat the operation, | ||
but without blocking operations on different floors. | ||
|
||
```scala mdoc:reset:silent | ||
import cats.effect.IO | ||
import cats.effect.std.AtomicMap | ||
|
||
trait Car | ||
trait Floor | ||
trait ParkingSpace | ||
|
||
class ParkingTowerService(state: AtomicMap[IO, Floor, List[ParkingSpace]]) { | ||
// Tries to park the given Car in the solicited Floor. | ||
// Returns either the assigned ParkingSpace, or None if this Floor is full. | ||
def parkCarInFloor(floor: Floor, car: Car): IO[Option[ParkingSpace]] = | ||
state(key = floor).evalModify { | ||
case firstFreeParkingSpace :: remainingParkingSpaces => | ||
markParkingSpaceAsUsed(parkingSpace = firstFreeParkingSpace, car).as( | ||
remainingParkingSpaces -> Some(firstFreeParkingSpace) | ||
) | ||
|
||
case Nil => | ||
IO.pure(List.empty -> None) | ||
} | ||
|
||
private def markParkingSpaceAsUsed(parkingSpace: ParkingSpace, car: Car): IO[Unit] = | ||
??? | ||
} | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -153,35 +153,29 @@ object AtomicCell { | |
of(M.empty)(F) | ||
} | ||
|
||
private[effect] def async[F[_], A](init: A)(implicit F: Async[F]): F[AtomicCell[F, A]] = | ||
Mutex.apply[F].map(mutex => new AsyncImpl(init, mutex)) | ||
|
||
private[effect] def concurrent[F[_], A](init: A)( | ||
implicit F: Concurrent[F]): F[AtomicCell[F, A]] = | ||
(Ref.of[F, A](init), Mutex.apply[F]).mapN { (ref, m) => new ConcurrentImpl(ref, m) } | ||
private[effect] def async[F[_], A]( | ||
init: A | ||
)( | ||
implicit F: Async[F] | ||
): F[AtomicCell[F, A]] = | ||
Mutex.apply[F].map(mutex => new AsyncImpl(init, lock = mutex.lock)) | ||
|
||
private final class ConcurrentImpl[F[_], A]( | ||
ref: Ref[F, A], | ||
mutex: Mutex[F] | ||
private[effect] def concurrent[F[_], A]( | ||
init: A | ||
)( | ||
implicit F: Concurrent[F] | ||
) extends AtomicCell[F, A] { | ||
override def get: F[A] = ref.get | ||
|
||
override def set(a: A): F[Unit] = | ||
mutex.lock.surround(ref.set(a)) | ||
): F[AtomicCell[F, A]] = | ||
(Ref.of[F, A](init), Mutex.apply[F]).mapN { (ref, mutex) => | ||
new ConcurrentImpl(ref, lock = mutex.lock) | ||
} | ||
|
||
// Provides common implementations for derived methods that depend on F being an applicative. | ||
private[effect] sealed abstract class CommonImpl[F[_], A]( | ||
implicit F: Applicative[F] | ||
) extends AtomicCell[F, A] { | ||
Comment on lines
+172
to
+175
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ideally, the implementations here would be part of the Name suggestions are welcome :) |
||
override def modify[B](f: A => (A, B)): F[B] = | ||
evalModify(a => F.pure(f(a))) | ||
|
||
override def evalModify[B](f: A => F[(A, B)]): F[B] = | ||
mutex.lock.surround { | ||
ref.get.flatMap(f).flatMap { | ||
case (a, b) => | ||
ref.set(a).as(b) | ||
} | ||
} | ||
|
||
override def evalUpdate(f: A => F[A]): F[Unit] = | ||
evalModify(a => f(a).map(aa => (aa, ()))) | ||
|
||
|
@@ -192,12 +186,33 @@ object AtomicCell { | |
evalModify(a => f(a).map(aa => (aa, aa))) | ||
} | ||
|
||
private final class AsyncImpl[F[_], A]( | ||
private[effect] final class ConcurrentImpl[F[_], A]( | ||
ref: Ref[F, A], | ||
lock: Resource[F, Unit] | ||
)( | ||
implicit F: Concurrent[F] | ||
) extends CommonImpl[F, A] { | ||
override def get: F[A] = | ||
ref.get | ||
|
||
override def set(a: A): F[Unit] = | ||
lock.surround(ref.set(a)) | ||
|
||
override def evalModify[B](f: A => F[(A, B)]): F[B] = | ||
lock.surround { | ||
ref.get.flatMap(f).flatMap { | ||
case (a, b) => | ||
ref.set(a).as(b) | ||
} | ||
} | ||
} | ||
|
||
private[effect] final class AsyncImpl[F[_], A]( | ||
init: A, | ||
mutex: Mutex[F] | ||
lock: Resource[F, Unit] | ||
)( | ||
implicit F: Async[F] | ||
) extends AtomicCell[F, A] { | ||
) extends CommonImpl[F, A] { | ||
@volatile private var cell: A = init | ||
|
||
override def get: F[A] = | ||
|
@@ -206,17 +221,14 @@ object AtomicCell { | |
} | ||
|
||
override def set(a: A): F[Unit] = | ||
mutex.lock.surround { | ||
lock.surround { | ||
F.delay { | ||
cell = a | ||
} | ||
} | ||
|
||
override def modify[B](f: A => (A, B)): F[B] = | ||
evalModify(a => F.pure(f(a))) | ||
|
||
override def evalModify[B](f: A => F[(A, B)]): F[B] = | ||
mutex.lock.surround { | ||
lock.surround { | ||
F.delay(cell).flatMap(f).flatMap { | ||
case (a, b) => | ||
F.delay { | ||
|
@@ -225,14 +237,93 @@ object AtomicCell { | |
} | ||
} | ||
} | ||
} | ||
|
||
override def evalUpdate(f: A => F[A]): F[Unit] = | ||
evalModify(a => f(a).map(aa => (aa, ()))) | ||
/** | ||
* Allows seeing a `AtomicCell[F, Option[A]]` as a `AtomicCell[F, A]`. This is useful not only | ||
* for ergonomic reasons, but because some implementations may save space. | ||
* | ||
* Setting the `default` value is the same as storing a `None` in the underlying `AtomicCell`. | ||
*/ | ||
def defaultedAtomicCell[F[_], A]( | ||
Comment on lines
+242
to
+248
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Inspired by |
||
atomicCell: AtomicCell[F, Option[A]], | ||
default: A | ||
)( | ||
implicit F: Applicative[F] | ||
): AtomicCell[F, A] = | ||
new DefaultedAtomicCell[F, A](atomicCell, default) | ||
|
||
override def evalGetAndUpdate(f: A => F[A]): F[A] = | ||
evalModify(a => f(a).map(aa => (aa, a))) | ||
private[effect] final class DefaultedAtomicCell[F[_], A]( | ||
atomicCell: AtomicCell[F, Option[A]], | ||
default: A | ||
)( | ||
implicit F: Applicative[F] | ||
) extends CommonImpl[F, A] { | ||
override def get: F[A] = | ||
atomicCell.get.map(_.getOrElse(default)) | ||
|
||
override def evalUpdateAndGet(f: A => F[A]): F[A] = | ||
evalModify(a => f(a).map(aa => (aa, aa))) | ||
override def set(a: A): F[Unit] = | ||
if (a == default) atomicCell.set(None) else atomicCell.set(Some(a)) | ||
|
||
override def evalModify[B](f: A => F[(A, B)]): F[B] = | ||
atomicCell.evalModify { opt => | ||
val a = opt.getOrElse(default) | ||
f(a).map { | ||
case (result, b) => | ||
if (result == default) (None, b) else (Some(result), b) | ||
} | ||
} | ||
} | ||
|
||
implicit def atomicCellOptionSyntax[F[_], A]( | ||
atomicCell: AtomicCell[F, Option[A]] | ||
)( | ||
implicit F: Applicative[F] | ||
): AtomicCellOptionOps[F, A] = | ||
new AtomicCellOptionOps(atomicCell) | ||
|
||
final class AtomicCellOptionOps[F[_], A] private[effect] ( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure how useful these extensions are, but it felt good to have them for completeness of the API. |
||
atomicCell: AtomicCell[F, Option[A]] | ||
)( | ||
implicit F: Applicative[F] | ||
) { | ||
def getOrElse(default: A): F[A] = | ||
atomicCell.get.map(_.getOrElse(default)) | ||
|
||
def unset: F[Unit] = | ||
atomicCell.set(None) | ||
|
||
def setValue(a: A): F[Unit] = | ||
atomicCell.set(Some(a)) | ||
|
||
def modifyValueIfSet[B](f: A => (A, B)): F[Option[B]] = | ||
evalModifyValueIfSet(a => F.pure(f(a))) | ||
|
||
def evalModifyValueIfSet[B](f: A => F[(A, B)]): F[Option[B]] = | ||
atomicCell.evalModify { | ||
case None => | ||
F.pure((None, None)) | ||
|
||
case Some(a) => | ||
f(a).map { | ||
case (result, b) => | ||
(Some(result), Some(b)) | ||
} | ||
} | ||
|
||
def updateValueIfSet(f: A => A): F[Unit] = | ||
evalUpdateValueIfSet(a => F.pure(f(a))) | ||
|
||
def evalUpdateValueIfSet(f: A => F[A]): F[Unit] = | ||
atomicCell.evalUpdate { | ||
case None => F.pure(None) | ||
case Some(a) => f(a).map(Some.apply) | ||
} | ||
|
||
def getAndSetValue(a: A): F[Option[A]] = | ||
atomicCell.getAndSet(Some(a)) | ||
|
||
def withDefaultValue(default: A): AtomicCell[F, A] = | ||
defaultedAtomicCell(atomicCell, default) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If lightbend-labs/mima#211 is ever fixed we should revert these.