This is a follow-up task to #122.
Currently errors from consumer related methods are thrown in a poll thread,
and there's no way to provide recovering code inside [[RebalanceCallback]].
For example:
def onPartitionsAssigned(partitions: Nes[TopicPartition]) = {
for {
state <- lift(restoreStateFor(partitions))
_ <- state.offsets.traverse_(o => seek(o.partition, o.offset))
} yield ()
}
if seek operation fails, exception is thrown in a poll thread, and user cannot provide recovering code while building a RebalanceCallback.
To fix it we need to implement MonadThrowable instance for RebalanceCallback
This is a follow-up task to #122.
Currently errors from consumer related methods are thrown in a
pollthread,and there's no way to provide recovering code inside [[RebalanceCallback]].
For example:
if
seekoperation fails, exception is thrown in apollthread, and user cannot provide recovering code while building aRebalanceCallback.To fix it we need to implement MonadThrowable instance for
RebalanceCallback