KafkaConsumerActor Configuration
To create a KafkaConsumerActor, supply the arguments required by KafkaConsumerActor.props(). One way is to build the KafkaConsumer.Conf and KafkaConsumerActor.Conf configuration case classes, like this:
import scala.concurrent.duration._
import org.apache.kafka.common.serialization.StringDeserializer
import cakesolutions.kafka.KafkaConsumer
import cakesolutions.kafka.akka.KafkaConsumerActor
// Configuration for the KafkaConsumer
val consumerConf = KafkaConsumer.Conf(
keyDeserializer = new StringDeserializer,
valueDeserializer = new StringDeserializer,
bootstrapServers = "localhost:9092",
groupId = "group",
enableAutoCommit = false
)
// Configuration specific to the Async Consumer Actor
val actorConf = KafkaConsumerActor.Conf(
scheduleInterval = 1.seconds, // scheduling interval for Kafka polling when consumer is inactive
unconfirmedTimeout = 3.seconds // duration for how long to wait for a confirmation before redelivery
)
// Create the Actor
val consumer = system.actorOf(
KafkaConsumerActor.props(consumerConf, actorConf, receiverActor)
)
The schedule interval is the delay before Akka dispatches a new poll() request when the consumer is not busy. A non-zero schedule interval significantly reduces demand on the thread dispatcher, at the cost of slightly higher latency.
schedule.interval = 1 second
The consumer actor dispatches messages asynchronously and expects a confirmation. If the confirmation is not received within the unconfirmed timeout, it assumes the message was lost and redelivers it. This is an at-least-once acknowledgement pattern. If processing a batch can take longer than this timeout, increase the timeout.
unconfirmed.timeout = 3 seconds
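The confirm-or-redeliver rule described above can be illustrated with a small, self-contained model. This is only a sketch of the timing rule, not the library's implementation: the names Unconfirmed, RedeliveryModel and shouldRedeliver are hypothetical and do not exist in scala-kafka-client.

```scala
import scala.concurrent.duration._

// Hypothetical model of a batch that was delivered but not yet confirmed.
// `deliveredAt` is a point on an arbitrary monotonic clock.
final case class Unconfirmed(batchId: Long, deliveredAt: FiniteDuration)

object RedeliveryModel {
  // True when the batch has waited for a Confirm longer than the timeout,
  // which is the point at which a redelivery would be triggered.
  def shouldRedeliver(
      batch: Unconfirmed,
      now: FiniteDuration,
      unconfirmedTimeout: FiniteDuration
  ): Boolean =
    (now - batch.deliveredAt) > unconfirmedTimeout

  def main(args: Array[String]): Unit = {
    val timeout = 3.seconds
    val batch = Unconfirmed(batchId = 1L, deliveredAt = 10.seconds)

    // 2 seconds after delivery: still inside the timeout, keep waiting.
    println(shouldRedeliver(batch, 12.seconds, timeout)) // false
    // 4 seconds after delivery: timeout exceeded, the batch would be sent again.
    println(shouldRedeliver(batch, 14.seconds, timeout)) // true
  }
}
```

Under this model, a receiver that needs 4 seconds to process a batch would see the same batch again with a 3 second timeout, which is why the timeout should comfortably exceed the worst-case processing time.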
An alternative approach is to provide a Key and Value deserializer with all of the other consumer properties supplied in a Typesafe configuration:
//application.conf
{
// Standard KafkaConsumer properties:
bootstrap.servers = "localhost:9092",
group.id = "group"
enable.auto.commit = false
auto.offset.reset = "earliest"
// KafkaConsumerActor config
schedule.interval = 1 second
unconfirmed.timeout = 3 seconds
}
import org.apache.kafka.common.serialization.StringDeserializer
import cakesolutions.kafka.akka.KafkaConsumerActor
import com.typesafe.config.ConfigFactory
val conf = ConfigFactory.load()
val consumer = system.actorOf(
//Construct the KafkaConsumerActor with Typesafe config
KafkaConsumerActor.props(conf, new StringDeserializer(), new StringDeserializer(), receiverActor)
)
Each of the configuration examples above assumes that the consumer actor is created from the context of a parent actor, which passes a receiverActor ActorRef to the consumer. Consumed messages are delivered to this ActorRef. The target actor should expect to receive cakesolutions.kafka.akka.ConsumerRecords[K, V], which contains a batch of the Java client's ConsumerRecords consumed from Kafka.
import akka.actor.Actor
import cakesolutions.kafka.akka.KafkaConsumerActor.Confirm
import cakesolutions.kafka.akka.ConsumerRecords
class ReceiverActor extends Actor {
// Extractor for ensuring type safe cast of records
val recordsExt = ConsumerRecords.extractor[Int, String]
// Akka will dispatch messages here sequentially for processing. The next batch is prepared in parallel and can be dispatched immediately
// after the Confirm. Performance is only limited by processing time and network bandwidth.
override def receive: Receive = {
// Type safe cast of records to correct serialisation type
case recordsExt(records) =>
// Provide the records for processing as a sequence of tuples
processRecords(records.pairs)
// Or provide them using the raw Java type of ConsumerRecords[Key,Value]
processRecords(records.records)
// Confirm and commit back to Kafka
sender() ! Confirm(records.offsets)
}
// Process the whole batch of received records.
// The first value in the tuple is the optional key of a record.
// The second value in the tuple is the actual value from a record.
def processRecords(records: Seq[(Option[Int], String)]) = { ... }
// Or process the batch of records via the raw kafka client records model
def processRecords(records: org.apache.kafka.clients.consumer.ConsumerRecords[Int, String]) = { ... }
}
Produced by Cake Solutions