Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ private NewTopic newRemoteLogMetadataTopic(TopicBasedRemoteLogMetadataManagerCon
topicConfigs.put(TopicConfig.RETENTION_MS_CONFIG, Long.toString(rlmmConfig.metadataTopicRetentionMs()));
topicConfigs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
topicConfigs.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false");
topicConfigs.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, Short.toString(rlmmConfig.metadataTopicMinIsr()));
return new NewTopic(rlmmConfig.remoteLogMetadataTopicName(),
rlmmConfig.metadataTopicPartitionsCount(),
rlmmConfig.metadataTopicReplicationFactor()).configs(topicConfigs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,15 @@ public final class TopicBasedRemoteLogMetadataManagerConfig {
public static final String REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP = "remote.log.metadata.topic.replication.factor";
public static final String REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP = "remote.log.metadata.topic.num.partitions";
public static final String REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP = "remote.log.metadata.topic.retention.ms";
public static final String REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP = "remote.log.metadata.topic.min.isr";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jiafu1115 that is a good idea, but I think this new config still need a KIP. Would you mind opening a KIP for it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chia7712 Sure. thanks for your reminder.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, maybe if there's going to be a KIP in this area, we should take the opportunity to make __remote_log_metadata into an internal topic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AndrewJSchofield ok. I will consider this and try to combine two things into one KIP. thanks.

Copy link
Contributor Author

@jiafu1115 jiafu1115 Nov 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AndrewJSchofield Hi. I checked the code about the "internal" topic and summarised the result as follows:

image It seems that __remote_log_metadata isn’t a purely internal topic, since it can be configured as a normal topic in the remote Kafka cluster — even though its name can’t be changed. And it is using the standard producer to write data instead of local append directly.
org.apache.kafka.server.log.remote.storage.RemoteLogManager#configAndWrapRlmmPlugin
private Plugin<RemoteLogMetadataManager> configAndWrapRlmmPlugin(RemoteLogMetadataManager rlmm) {
    final Map<String, Object> rlmmProps = new HashMap<>();
    endpoint.ifPresent(e -> {
        rlmmProps.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "bootstrap.servers", e.host() + ":" + e.port());
        rlmmProps.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol", e.securityProtocol().name);
    });
    // update the remoteLogMetadataProps here to override endpoint config if any
    rlmmProps.putAll(rlmConfig.remoteLogMetadataManagerProps());
    // ...
}

Besides, the current producer clientId for __remote_log_metadata will conflict with the follow code will cause write's reject:

//KafkaApis.scala
val internalTopicsAllowed = request.header.clientId == "__admin_client"

So maybe we shouldn’t make it an “internal” topic in this KIP when we’re not sure it’s matched with current design, and it will also help to keep this KIP stay simple and clear without any concern. WDTY?

also cc @chia7712 @kamalcph for more thoughts.

Thanks!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jiafu1115 OK, thanks for investigating the internal topic question. I agree that it's best not to make it an internal topic, because of the pluggable nature of this topic's use. Keeping your KIP simple is fine with me.

Copy link
Contributor Author

@jiafu1115 jiafu1115 Nov 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AndrewJSchofield
My pleasure! I also learn a lot from the investigating. I’ve already opened a PR(#20822) for it, and it doesn’t need a KIP. We can continue the discussion there later. After all, the topic really feels like an internal one — making it internal could have some nice benefits.
For this PR, if you think the approach looks good, I can prepare a tiny KIP now for more discussion. Thanks!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think a separate tiny KIP for the internal topic question seems like a good way to get the community to decide whether it's a good thing. Your PR for it looks about right. Of course, we'd need the KIP to pass before taking the PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AndrewJSchofield ok. for this minsync change. it is time for me to create one KIP. right? thanks

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need a KIP for min.insync change. If you want to pursue the internal topic question, you could open a second KIP. There's no real reason to tangle the two questions together. Up to you though.

public static final String REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP = "remote.log.metadata.consume.wait.ms";
public static final String REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_PROP = "remote.log.metadata.initialization.retry.max.timeout.ms";
public static final String REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_PROP = "remote.log.metadata.initialization.retry.interval.ms";

public static final int DEFAULT_REMOTE_LOG_METADATA_TOPIC_PARTITIONS = 50;
public static final long DEFAULT_REMOTE_LOG_METADATA_TOPIC_RETENTION_MS = -1L;
public static final short DEFAULT_REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR = 3;
public static final short DEFAULT_REMOTE_LOG_METADATA_TOPIC_MIN_ISR = 2;
public static final long DEFAULT_REMOTE_LOG_METADATA_CONSUME_WAIT_MS = 2 * 60 * 1000L;
public static final long DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS = 2 * 60 * 1000L;
public static final long DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS = 100L;
Expand All @@ -63,6 +65,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig {
"Default: -1, that means unlimited. Users can configure this value based on their use cases. " +
"To avoid any data loss, this value should be more than the maximum retention period of any topic enabled with " +
"tiered storage in the cluster.";
public static final String REMOTE_LOG_METADATA_TOPIC_MIN_ISR_DOC = "The minimum number of replicas that must acknowledge a write to remote log metadata topic.";
public static final String REMOTE_LOG_METADATA_CONSUME_WAIT_MS_DOC = "The amount of time in milliseconds to wait for the local consumer to " +
"receive the published event.";
public static final String REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_DOC = "The retry interval in milliseconds for " +
Expand Down Expand Up @@ -90,6 +93,8 @@ public final class TopicBasedRemoteLogMetadataManagerConfig {
REMOTE_LOG_METADATA_TOPIC_PARTITIONS_DOC)
.define(REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP, LONG, DEFAULT_REMOTE_LOG_METADATA_TOPIC_RETENTION_MS, LOW,
REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_DOC)
.define(REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP, SHORT, DEFAULT_REMOTE_LOG_METADATA_TOPIC_MIN_ISR, atLeast(1), LOW,
REMOTE_LOG_METADATA_TOPIC_MIN_ISR_DOC)
.define(REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP, LONG, DEFAULT_REMOTE_LOG_METADATA_CONSUME_WAIT_MS, atLeast(0), LOW,
REMOTE_LOG_METADATA_CONSUME_WAIT_MS_DOC)
.define(REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_PROP, LONG,
Expand All @@ -106,6 +111,7 @@ DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS, atLeast(0), LOW,
private final long consumeWaitMs;
private final long metadataTopicRetentionMs;
private final short metadataTopicReplicationFactor;
private final short metadataTopicMinIsr;
private final long initializationRetryMaxTimeoutMs;
private final long initializationRetryIntervalMs;

Expand All @@ -126,6 +132,7 @@ public TopicBasedRemoteLogMetadataManagerConfig(Map<String, ?> props) {
if (metadataTopicRetentionMs != -1 && metadataTopicRetentionMs <= 0) {
throw new IllegalArgumentException("Invalid metadata topic retention in millis: " + metadataTopicRetentionMs);
}
metadataTopicMinIsr = (short) parsedConfigs.get(REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP);
consumeWaitMs = (long) parsedConfigs.get(REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP);
initializationRetryIntervalMs = (long) parsedConfigs.get(REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_PROP);
initializationRetryMaxTimeoutMs = (long) parsedConfigs.get(REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_PROP);
Expand Down Expand Up @@ -184,6 +191,10 @@ public long initializationRetryIntervalMs() {
return initializationRetryIntervalMs;
}

public short metadataTopicMinIsr() {
return metadataTopicMinIsr;
}

public String logDir() {
return logDir;
}
Expand Down