Skip to content

Commit 6bd5b67

Browse files
author
LiuHongliang
committed
Add support for multiple topics in Avro format
1 parent 63eb64e commit 6bd5b67

File tree

4 files changed

+29
-9
lines changed

4 files changed

+29
-9
lines changed

docs/kafka.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ These Kafka-specific properties, if used, may be specified either at the global
3333
|`topicPattern`|A regular expression used to match Kafka topics to dataSource configurations. See "Matching Topics to Data Sources" for details.|{match nothing}, must be provided|
3434
|`topicPattern.priority`|If multiple topicPatterns match the same topic name, the highest priority dataSource configuration will be used. A higher number indicates a higher priority. See "Matching Topics to Data Sources" for details.|1|
3535
|`useTopicAsDataSource`|Use the Kafka topic as the dataSource name instead of the one provided in the configuration file. Useful when combined with a topicPattern that matches more than one Kafka topic. See "Matching Topics to Data Sources" for details.|false|
36+
|`useInputTopicAsDecodeTopic`|Use the topic consumed by the Kafka consumer as the Avro stream decoder's topic. Useful when consuming multiple topics with different Avro schemas.|false|
3637
|`reportDropsAsExceptions`|Whether or not dropped messages will cause an exception and terminate the application.|false|
3738

3839
### Running

kafka/src/main/java/com/metamx/tranquility/kafka/model/PropertiesBasedKafkaConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ public PropertiesBasedKafkaConfig()
6060
@Default("false")
6161
public abstract Boolean useTopicAsDataSource();
6262

63+
// When true, the Kafka topic a message was consumed from replaces the topic in the
// avro_stream decoder's subjectAndIdConverter, so multiple topics with different
// Avro schemas can share one dataSource configuration. Disabled by default.
@Config("useInputTopicAsDecodeTopic")
64+
@Default("false")
65+
public abstract Boolean useInputTopicAsDecodeTopic();
66+
6367
@Config("topicPattern.priority")
6468
@Default("1")
6569
public abstract Integer getTopicPatternPriority();

kafka/src/main/java/com/metamx/tranquility/kafka/writer/WriterController.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.metamx.tranquility.config.DataSourceConfig;
2525
import com.metamx.tranquility.finagle.FinagleRegistry;
2626
import com.metamx.tranquility.finagle.FinagleRegistryConfig;
27+
import com.metamx.tranquility.kafka.KafkaBeamUtils;
2728
import com.metamx.tranquility.kafka.model.MessageCounters;
2829
import com.metamx.tranquility.kafka.model.PropertiesBasedKafkaConfig;
2930
import org.apache.curator.RetryPolicy;
@@ -163,7 +164,9 @@ finagleKey, new FinagleRegistry(
163164
)
164165
);
165166
}
166-
167+
if (dataSourceConfig.propertiesBasedConfig().useInputTopicAsDecodeTopic()) {
168+
dataSourceConfig = KafkaBeamUtils.useInputTopicAsDecodeTopic(topic, dataSourceConfig);
169+
}
167170
return new TranquilityEventWriter(
168171
topic,
169172
dataSourceConfig,

kafka/src/main/scala/com/metamx/tranquility/kafka/KafkaBeamUtils.scala

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package com.metamx.tranquility.kafka
2121

22+
import com.metamx.common.scala.untyped.Dict
2223
import com.metamx.tranquility.config.DataSourceConfig
2324
import com.metamx.tranquility.druid.DruidBeams
2425
import com.metamx.tranquility.druid.DruidLocation
@@ -28,15 +29,13 @@ import com.metamx.tranquility.tranquilizer.Tranquilizer
2829
import org.apache.curator.framework.CuratorFramework
2930
import scala.reflect.runtime.universe.typeTag
3031

31-
object KafkaBeamUtils
32-
{
32+
object KafkaBeamUtils {
3333
def createTranquilizer(
34-
topic: String,
35-
config: DataSourceConfig[PropertiesBasedKafkaConfig],
36-
curator: CuratorFramework,
37-
finagleRegistry: FinagleRegistry
38-
): Tranquilizer[Array[Byte]] =
39-
{
34+
topic: String,
35+
config: DataSourceConfig[PropertiesBasedKafkaConfig],
36+
curator: CuratorFramework,
37+
finagleRegistry: FinagleRegistry
38+
): Tranquilizer[Array[Byte]] = {
4039
DruidBeams.fromConfig(config, typeTag[Array[Byte]])
4140
.location(
4241
DruidLocation.create(
@@ -48,4 +47,17 @@ object KafkaBeamUtils
4847
.finagleRegistry(finagleRegistry)
4948
.buildTranquilizer(config.tranquilizerBuilder())
5049
}
50+
51+
/**
  * If the dataSource spec uses the "avro_stream" parser, returns a copy of the config whose
  * `avroBytesDecoder.subjectAndIdConverter.topic` is replaced by the given Kafka topic, so
  * messages consumed from multiple topics are decoded against the matching Avro subject.
  * For any other parser type the config is returned unchanged.
  *
  * @param topic  the Kafka topic the message was consumed from
  * @param config the dataSource configuration to adjust
  * @return a config whose decoder topic matches the input topic, or the original config
  */
def useInputTopicAsDecodeTopic(
  topic: String,
  config: DataSourceConfig[PropertiesBasedKafkaConfig]
): DataSourceConfig[PropertiesBasedKafkaConfig] = {
  val dataSchema = config.specMap("dataSchema").asInstanceOf[Dict]
  val parser = dataSchema("parser").asInstanceOf[Dict]
  // Bug fix: `parser.get("type")` is an Option, so its toString is "Some(avro_stream)" and the
  // old `"avro_stream".equals(parser.get("type").toString)` check could never match. Compare
  // against the contained value instead.
  if (parser.get("type").exists(t => "avro_stream" == t.toString)) {
    val avroBytesDecoder = parser("avroBytesDecoder").asInstanceOf[Dict]
    val subjectAndIdConverter = avroBytesDecoder("subjectAndIdConverter").asInstanceOf[Dict]
    // Rebuild the nested immutable spec map with only the decoder topic replaced.
    val newSpecMap = config.specMap.updated(
      "dataSchema",
      dataSchema.updated(
        "parser",
        parser.updated(
          "avroBytesDecoder",
          avroBytesDecoder.updated(
            "subjectAndIdConverter",
            subjectAndIdConverter.updated("topic", topic)
          )
        )
      )
    )
    config.copy(config.dataSource, config.propertiesBasedConfig, newSpecMap)
  } else {
    config
  }
}
5163
}

0 commit comments

Comments
 (0)