Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ trait KeyContext[F[_]] {
def hold(offset: Offset): F[Unit]
def remove: F[Unit]
def log: Log[F]
def key: String
}
object KeyContext {

Expand All @@ -27,29 +28,33 @@ object KeyContext {
def holding = none[Offset].pure[F]
def hold(offset: Offset) = ().pure[F]
def remove = ().pure[F]
val key = ""
}

def of[F[_]: Ref.Make: Monad: Log](removeFromCache: F[Unit]): F[KeyContext[F]] =
def of[F[_]: Ref.Make: Monad: Log](removeFromCache: F[Unit], key: String): F[KeyContext[F]] =
Ref.of[F, Option[Offset]](None) map { storage =>
KeyContext(storage.stateInstance, removeFromCache)
KeyContext(storage.stateInstance, removeFromCache, key)
}

def apply[F[_]: Monad: Log](
storage: Stateful[F, Option[Offset]],
removeFromCache: F[Unit]
removeFromCache: F[Unit],
_key: String
): KeyContext[F] = new KeyContext[F] {
def holding = storage.get
def hold(offset: Offset) = storage.set(Some(offset))
def remove = storage.set(None) *> removeFromCache
def log = Log[F]
val key = _key
}

def resource[F[_]: Ref.Make: Monad](
removeFromCache: F[Unit],
log: Log[F]
log: Log[F],
key: String
): Resource[F, KeyContext[F]] = {
implicit val _log = log
Resource.eval(of(removeFromCache))
Resource.eval(of(removeFromCache, key))
}

}
21 changes: 15 additions & 6 deletions core/src/main/scala/com/evolutiongaming/kafka/flow/KeyFlowOf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.evolutiongaming.kafka.flow

import cats.Monad
import cats.effect.{Ref, Resource}
import com.evolutiongaming.catshelper.LogOf
import com.evolutiongaming.kafka.flow.persistence.Persistence
import com.evolutiongaming.kafka.flow.registry.EntityRegistry
import com.evolutiongaming.kafka.flow.timer.{TimerContext, TimerFlowOf}
Expand All @@ -15,7 +16,7 @@ trait KeyFlowOf[F[_], S, A] {
timers: TimerContext[F],
additionalPersist: AdditionalStatePersist[F, S, A],
registry: EntityRegistry[F, KafkaKey, S],
): Resource[F, KeyFlow[F, A]]
)(implicit logOf: LogOf[F]): Resource[F, KeyFlow[F, A]]

}
object KeyFlowOf {
Expand Down Expand Up @@ -50,11 +51,19 @@ object KeyFlowOf {
timerFlowOf: TimerFlowOf[F],
fold: EnhancedFold[F, S, A],
tick: TickOption[F, S],
): KeyFlowOf[F, S, A] = { (key, context, persistence, timers, additionalPersist, registry) =>
implicit val _context = context
timerFlowOf(context, persistence, timers) flatMap { timerFlow =>
KeyFlow.of(key, fold, tick, persistence, additionalPersist, timerFlow, registry)
): KeyFlowOf[F, S, A] = new KeyFlowOf[F, S, A] {
override def apply(
key: KafkaKey,
context: KeyContext[F],
persistence: Persistence[F, S, A],
timers: TimerContext[F],
additionalPersist: AdditionalStatePersist[F, S, A],
registry: EntityRegistry[F, KafkaKey, S]
)(implicit logOf: LogOf[F]): Resource[F, KeyFlow[F, A]] = {
implicit val _context = context
timerFlowOf(context, persistence, timers) flatMap { timerFlow =>
KeyFlow.of(key, fold, tick, persistence, additionalPersist, timerFlow, registry)
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.evolutiongaming.kafka.flow
import cats.Applicative
import cats.effect.{Resource, Sync}
import cats.syntax.all.*
import com.evolutiongaming.catshelper.LogOf
import com.evolutiongaming.kafka.flow.key.KeysOf
import com.evolutiongaming.kafka.flow.persistence.{PersistenceOf, SnapshotPersistenceOf}
import com.evolutiongaming.kafka.flow.registry.EntityRegistry
Expand All @@ -20,7 +21,7 @@ trait KeyStateOf[F[_]] { self =>
key: String,
createdAt: Timestamp,
context: KeyContext[F]
): Resource[F, KeyState[F, ConsumerRecord[String, ByteVector]]]
)(implicit logOf: LogOf[F]): Resource[F, KeyState[F, ConsumerRecord[String, ByteVector]]]

/** Restores a state for all keys present in persistence.
*
Expand Down Expand Up @@ -71,7 +72,9 @@ object KeyStateOf {
registry: EntityRegistry[F, KafkaKey, S],
): KeyStateOf[F] = new KeyStateOf[F] {

def apply(topicPartition: TopicPartition, key: String, createdAt: Timestamp, context: KeyContext[F]) = {
def apply(topicPartition: TopicPartition, key: String, createdAt: Timestamp, context: KeyContext[F])(
implicit logOf: LogOf[F]
) = {
implicit val _context = context
val kafkaKey = KafkaKey(
applicationId = applicationId,
Expand Down Expand Up @@ -217,7 +220,9 @@ object KeyStateOf {
registry: EntityRegistry[F, KafkaKey, S],
): KeyStateOf[F] = new KeyStateOf[F] {

def apply(topicPartition: TopicPartition, key: String, createdAt: Timestamp, context: KeyContext[F]) = {
def apply(topicPartition: TopicPartition, key: String, createdAt: Timestamp, context: KeyContext[F])(
implicit logOf: LogOf[F]
) = {
val kafkaKey = KafkaKey(
applicationId = applicationId,
groupId = groupId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ object PartitionFlow {
}
}

def of[F[_]: Async](
def of[F[_]: Async: LogOf](
topicPartition: TopicPartition,
assignedAt: Offset,
keyStateOf: KeyStateOf[F],
Expand Down Expand Up @@ -97,7 +97,7 @@ object PartitionFlow {
} yield flow

// TODO: put most `Ref` variables into one state class?
def of[F[_]: Async](
def of[F[_]: Async: LogOf](
topicPartition: TopicPartition,
keyStateOf: KeyStateOf[F],
committedOffset: Ref[F, Offset],
Expand All @@ -116,7 +116,8 @@ object PartitionFlow {
for {
context <- KeyContext.resource[F](
removeFromCache = cache.remove(key).flatten.void,
log = log.prefixed(key)
log = log.prefixed(key),
key = key
)
keyState <- keyStateOf(topicPartition, key, createdAt, context)
} yield PartitionKey(keyState, context)
Expand Down
Loading
Loading