Skip to content

Commit 99f6aac

Browse files
authored
[kafka] Copy cluster option to subscription (#3919)
Summary: Make sure to always copy all kafka cluster additional options to the subscription. Mainly the group.id.
1 parent c8607bc commit 99f6aac

File tree

1 file changed

+18
-29
lines changed
  • crates/types/src/schema/metadata/updater

1 file changed

+18
-29
lines changed

crates/types/src/schema/metadata/updater/mod.rs

Lines changed: 18 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -934,12 +934,7 @@ impl SchemaUpdater {
934934

935935
let subscription = Configuration::pinned()
936936
.ingress
937-
.validate_subscription(Subscription::new(
938-
id,
939-
source,
940-
sink,
941-
metadata.unwrap_or_default(),
942-
))
937+
.create_kafka_subscription(id, source, sink, metadata.unwrap_or_default())
943938
.map_err(|e| SchemaError::Subscription(SubscriptionError::Validation(e.into())))?;
944939

945940
self.schema.subscriptions.insert(id, subscription);
@@ -1243,55 +1238,49 @@ pub struct ValidationError {
12431238
}
12441239

12451240
impl IngressOptions {
1246-
fn validate_subscription(
1241+
fn create_kafka_subscription(
12471242
&self,
1248-
mut subscription: Subscription,
1243+
id: SubscriptionId,
1244+
source: Source,
1245+
sink: Sink,
1246+
mut metadata: HashMap<String, String>,
12491247
) -> Result<Subscription, ValidationError> {
12501248
// Retrieve the cluster option and merge them with subscription metadata
1251-
let Source::Kafka { cluster, .. } = subscription.source();
1249+
let Source::Kafka { cluster, .. } = &source;
12521250
let cluster_options = &self.get_kafka_cluster(cluster).ok_or(ValidationError {
12531251
name: "source",
12541252
reason: "specified cluster in the source URI does not exist. Make sure it is defined in the KafkaOptions",
12551253
})?.additional_options;
12561254

12571255
if cluster_options.contains_key("enable.auto.commit")
1258-
|| subscription.metadata().contains_key("enable.auto.commit")
1256+
|| metadata.contains_key("enable.auto.commit")
12591257
{
12601258
warn!(
12611259
"The configuration option enable.auto.commit should not be set and it will be ignored."
12621260
);
12631261
}
12641262
if cluster_options.contains_key("enable.auto.offset.store")
1265-
|| subscription
1266-
.metadata()
1267-
.contains_key("enable.auto.offset.store")
1263+
|| metadata.contains_key("enable.auto.offset.store")
12681264
{
12691265
warn!(
12701266
"The configuration option enable.auto.offset.store should not be set and it will be ignored."
12711267
);
12721268
}
12731269

1274-
// Set the group.id if unset
1275-
if !(cluster_options.contains_key("group.id")
1276-
|| subscription.metadata().contains_key("group.id"))
1277-
{
1278-
let group_id = subscription.id().to_string();
1270+
let group_id = metadata
1271+
.get("group.id")
1272+
.or_else(|| cluster_options.get("group.id"))
1273+
.cloned()
1274+
.unwrap_or_else(|| id.to_string());
12791275

1280-
subscription
1281-
.metadata_mut()
1282-
.insert("group.id".to_string(), group_id);
1283-
}
1276+
metadata.insert("group.id".into(), group_id);
12841277

12851278
// Set client.id if unset
1286-
if !(cluster_options.contains_key("client.id")
1287-
|| subscription.metadata().contains_key("client.id"))
1288-
{
1289-
subscription
1290-
.metadata_mut()
1291-
.insert("client.id".to_string(), "restate".to_string());
1279+
if !(cluster_options.contains_key("client.id") || metadata.contains_key("client.id")) {
1280+
metadata.insert("client.id".to_string(), "restate".to_string());
12921281
}
12931282

1294-
Ok(subscription)
1283+
Ok(Subscription::new(id, source, sink, metadata))
12951284
}
12961285
}
12971286

0 commit comments

Comments
 (0)