diff --git a/core/src/main/scala/kafka/log/stream/s3/ControllerKVClient.java b/core/src/main/scala/kafka/log/stream/s3/ControllerKVClient.java index 0bf3416765..931567b750 100644 --- a/core/src/main/scala/kafka/log/stream/s3/ControllerKVClient.java +++ b/core/src/main/scala/kafka/log/stream/s3/ControllerKVClient.java @@ -55,7 +55,7 @@ public class ControllerKVClient implements KVClient { private static final Logger LOGGER = LoggerFactory.getLogger(ControllerKVClient.class); - private final ControllerRequestSender requestSender; + protected final ControllerRequestSender requestSender; public ControllerKVClient(ControllerRequestSender requestSender) { this.requestSender = requestSender; diff --git a/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java b/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java index d5a02a7005..36eb2d29e3 100644 --- a/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java +++ b/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java @@ -170,7 +170,9 @@ public void start() { storageFailureHandlerChain.addHandler(new ForceCloseStorageFailureHandler(streamClient)); storageFailureHandlerChain.addHandler(new HaltStorageFailureHandler()); this.streamClient.registerStreamLifeCycleListener(localIndexCache); - this.kvClient = new ControllerKVClient(this.requestSender); + this.kvClient = config.isNamespacedKVEnabled() ? + new NamespacedControllerKVClient(this.requestSender) : + new ControllerKVClient(this.requestSender); this.failover = failover(); S3StreamThreadPoolMonitor.config(new LogContext("ThreadPoolMonitor").logger("s3.threads.logger"), TimeUnit.SECONDS.toMillis(5)); diff --git a/core/src/main/scala/kafka/log/stream/s3/NamespacedControllerKVClient.java b/core/src/main/scala/kafka/log/stream/s3/NamespacedControllerKVClient.java new file mode 100644 index 0000000000..4d35b41db0 --- /dev/null +++ b/core/src/main/scala/kafka/log/stream/s3/NamespacedControllerKVClient.java @@ -0,0 +1,322 @@ +/* + * Copyright 2025, AutoMQ HK Limited. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.stream.s3; + +import com.automq.stream.api.NamespacedKVClient; +import com.automq.stream.api.KeyValue; +import com.automq.stream.api.KeyValue.Key; +import com.automq.stream.api.KeyValue.Value; +import com.automq.stream.api.NamespacedKeyValue; +import kafka.log.stream.s3.network.ControllerRequestSender; +import kafka.log.stream.s3.network.ControllerRequestSender.RequestTask; +import kafka.log.stream.s3.network.ControllerRequestSender.ResponseHandleResult; +import kafka.log.stream.s3.network.request.BatchRequest; +import kafka.log.stream.s3.network.request.WrapRequest; +import org.apache.kafka.common.message.PutKVsRequestData.PutKVRequest; +import org.apache.kafka.common.message.PutKVsResponseData.PutKVResponse; +import org.apache.kafka.common.message.GetKVsRequestData.GetKVRequest; +import org.apache.kafka.common.message.GetKVsResponseData.GetKVResponse; +import org.apache.kafka.common.message.DeleteKVsRequestData.DeleteKVRequest; +import org.apache.kafka.common.message.DeleteKVsResponseData.DeleteKVResponse; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractRequest.Builder; +import org.apache.kafka.common.requests.s3.DeleteKVsRequest; +import org.apache.kafka.common.requests.s3.GetKVsRequest; +import org.apache.kafka.common.requests.s3.PutKVsRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; + +/** + * A NamespacedControllerKVClient that extends ControllerKVClient to add namespace and epoch support. + * This implementation maintains namespace isolation by prefixing keys with the namespace + * and stores epoch information in special keys within each namespace. + */ +public class NamespacedControllerKVClient extends ControllerKVClient implements NamespacedKVClient { + private static final Logger LOGGER = LoggerFactory.getLogger(NamespacedControllerKVClient.class); + private static final String EPOCH_KEY_PREFIX = "__epoch__:"; + + public NamespacedControllerKVClient(ControllerRequestSender requestSender) { + super(requestSender); + } + + @Override + public CompletableFuture putNamespacedKVIfAbsent(NamespacedKeyValue namespacedKeyValue) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("[NamespacedControllerKVClient]: Put namespaced KV if absent: {}", namespacedKeyValue); + } + + // First check if we need to initialize or update the epoch + return getNamespaceEpoch(namespacedKeyValue.getNamespace()) + .thenCompose(currentEpoch -> { + if (currentEpoch != namespacedKeyValue.getEpoch()) { + LOGGER.warn("[NamespacedControllerKVClient]: Epoch mismatch for namespace {}: expected {}, got {}", + namespacedKeyValue.getNamespace(), namespacedKeyValue.getEpoch(), currentEpoch); + return CompletableFuture.completedFuture(Value.of((ByteBuffer) null)); + } + + String namespacedKey = namespacedKeyValue.getNamespacedKey(); + PutKVRequest request = new PutKVRequest() + .setKey(namespacedKey) + .setValue(namespacedKeyValue.getKeyValue().value().get().array()); + + WrapRequest req = new BatchRequest() { + @Override + public Builder addSubRequest(Builder builder) { + PutKVsRequest.Builder realBuilder = (PutKVsRequest.Builder) builder; + return realBuilder.addSubRequest(request); + } + + @Override + public ApiKeys apiKey() { + return ApiKeys.PUT_KVS; + } + + @Override + public Builder toRequestBuilder() { + return new PutKVsRequest.Builder( + new PutKVsRequestData() + ).addSubRequest(request); + } + }; + + CompletableFuture future = new CompletableFuture<>(); + RequestTask task = new RequestTask<>(req, future, response -> { + Errors code = Errors.forCode(response.errorCode()); + switch (code) { + case NONE: + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("[NamespacedControllerKVClient]: Put namespaced KV if absent success: {}", namespacedKeyValue); + } + return ResponseHandleResult.withSuccess(Value.of(response.value())); + case KEY_EXIST: + LOGGER.warn("[NamespacedControllerKVClient]: Failed to put namespaced KV if absent: {}, key already exists", namespacedKeyValue); + return ResponseHandleResult.withSuccess(Value.of(response.value())); + default: + LOGGER.error("[NamespacedControllerKVClient]: Failed to put namespaced KV if absent: {}, code: {}, retry later", namespacedKeyValue, code); + return ResponseHandleResult.withRetry(); + } + }); + getRequestSender().send(task); + return future; + }); + } + + @Override + public CompletableFuture putNamespacedKV(NamespacedKeyValue namespacedKeyValue) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("[NamespacedControllerKVClient]: Put namespaced KV: {}", namespacedKeyValue); + } + + return getNamespaceEpoch(namespacedKeyValue.getNamespace()) + .thenCompose(currentEpoch -> { + if (currentEpoch != namespacedKeyValue.getEpoch()) { + LOGGER.warn("[NamespacedControllerKVClient]: Epoch mismatch for namespace {}: expected {}, got {}", + namespacedKeyValue.getNamespace(), namespacedKeyValue.getEpoch(), currentEpoch); + return CompletableFuture.completedFuture(Value.of((ByteBuffer) null)); + } + + String namespacedKey = namespacedKeyValue.getNamespacedKey(); + PutKVRequest request = new PutKVRequest() + .setKey(namespacedKey) + .setValue(namespacedKeyValue.getKeyValue().value().get().array()) + .setOverwrite(true); + + WrapRequest req = new BatchRequest() { + @Override + public Builder addSubRequest(Builder builder) { + PutKVsRequest.Builder realBuilder = (PutKVsRequest.Builder) builder; + return realBuilder.addSubRequest(request); + } + + @Override + public ApiKeys apiKey() { + return ApiKeys.PUT_KVS; + } + + @Override + public Builder toRequestBuilder() { + return new PutKVsRequest.Builder( + new PutKVsRequestData() + ).addSubRequest(request); + } + }; + + CompletableFuture future = new CompletableFuture<>(); + RequestTask task = new RequestTask<>(req, future, response -> { + Errors code = Errors.forCode(response.errorCode()); + switch (code) { + case NONE: + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("[NamespacedControllerKVClient]: Put namespaced KV success: {}", namespacedKeyValue); + } + return ResponseHandleResult.withSuccess(Value.of(response.value())); + default: + LOGGER.error("[NamespacedControllerKVClient]: Failed to put namespaced KV: {}, code: {}, retry later", namespacedKeyValue, code); + return ResponseHandleResult.withRetry(); + } + }); + getRequestSender().send(task); + return future; + }); + } + + @Override + public CompletableFuture getNamespacedKV(String namespace, Key key) { + String namespacedKey = namespace + ":" + key.get(); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("[NamespacedControllerKVClient]: Get namespaced KV: {}", namespacedKey); + } + + GetKVRequest request = new GetKVRequest() + .setKey(namespacedKey); + + WrapRequest req = new BatchRequest() { + @Override + public Builder addSubRequest(Builder builder) { + GetKVsRequest.Builder realBuilder = (GetKVsRequest.Builder) builder; + return realBuilder.addSubRequest(request); + } + + @Override + public ApiKeys apiKey() { + return ApiKeys.GET_KVS; + } + + @Override + public Builder toRequestBuilder() { + return new GetKVsRequest.Builder( + new GetKVsRequestData() + ).addSubRequest(request); + } + }; + + CompletableFuture future = new CompletableFuture<>(); + RequestTask task = new RequestTask<>(req, future, response -> { + Errors code = Errors.forCode(response.errorCode()); + switch (code) { + case NONE: + Value val = Value.of(response.value()); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("[NamespacedControllerKVClient]: Get namespaced KV success: {} = {}", namespacedKey, val); + } + return ResponseHandleResult.withSuccess(val); + default: + LOGGER.error("[NamespacedControllerKVClient]: Failed to get namespaced KV: {}, code: {}, retry later", namespacedKey, code); + return ResponseHandleResult.withRetry(); + } + }); + getRequestSender().send(task); + return future; + } + + @Override + public CompletableFuture delNamespacedKV(String namespace, Key key) { + String namespacedKey = namespace + ":" + key.get(); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("[NamespacedControllerKVClient]: Delete namespaced KV: {}", namespacedKey); + } + + DeleteKVRequest request = new DeleteKVRequest() + .setKey(namespacedKey); + + WrapRequest req = new BatchRequest() { + @Override + public Builder addSubRequest(Builder builder) { + DeleteKVsRequest.Builder realBuilder = (DeleteKVsRequest.Builder) builder; + return realBuilder.addSubRequest(request); + } + + @Override + public ApiKeys apiKey() { + return ApiKeys.DELETE_KVS; + } + + @Override + public Builder toRequestBuilder() { + return new DeleteKVsRequest.Builder( + new DeleteKVsRequestData() + ).addSubRequest(request); + } + }; + + CompletableFuture future = new CompletableFuture<>(); + RequestTask task = new RequestTask<>(req, future, response -> { + Errors code = Errors.forCode(response.errorCode()); + switch (code) { + case NONE: + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("[NamespacedControllerKVClient]: Delete namespaced KV success: {}", namespacedKey); + } + return ResponseHandleResult.withSuccess(Value.of(response.value())); + case KEY_NOT_EXIST: + LOGGER.info("[NamespacedControllerKVClient]: Delete namespaced KV: {}, result: KEY_NOT_EXIST", namespacedKey); + return ResponseHandleResult.withSuccess(Value.of((ByteBuffer) null)); + default: + LOGGER.error("[NamespacedControllerKVClient]: Failed to delete namespaced KV: {}, code: {}, retry later", namespacedKey, code); + return ResponseHandleResult.withRetry(); + } + }); + getRequestSender().send(task); + return future; + } + + @Override + public CompletableFuture getNamespaceEpoch(String namespace) { + String epochKey = EPOCH_KEY_PREFIX + namespace; + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("[NamespacedControllerKVClient]: Get namespace epoch: {}", namespace); + } + + return getKV(Key.of(epochKey)) + .thenApply(value -> { + if (value == null || value.isNull()) { + return 0L; + } + return value.get().getLong(); + }); + } + + @Override + public CompletableFuture incrementNamespaceEpoch(String namespace) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("[NamespacedControllerKVClient]: Increment namespace epoch: {}", namespace); + } + + return getNamespaceEpoch(namespace) + .thenCompose(currentEpoch -> { + long newEpoch = currentEpoch + 1; + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + buffer.putLong(newEpoch); + buffer.flip(); + + String epochKey = EPOCH_KEY_PREFIX + namespace; + return putKV(KeyValue.of(epochKey, buffer)) + .thenApply(value -> newEpoch); + }); + } + + protected ControllerRequestSender getRequestSender() { + return super.requestSender; + } +} \ No newline at end of file diff --git a/s3stream/src/main/java/com/automq/stream/api/DefaultNamespacedKVClient.java b/s3stream/src/main/java/com/automq/stream/api/DefaultNamespacedKVClient.java new file mode 100644 index 0000000000..7fefa400a8 --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/api/DefaultNamespacedKVClient.java @@ -0,0 +1,117 @@ +/* + * Copyright 2025, AutoMQ HK Limited. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.stream.api; + +import com.automq.stream.api.KeyValue.Key; +import com.automq.stream.api.KeyValue.Value; + +import java.nio.ByteBuffer; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; + +/** + * Default implementation of NamespacedKVClient that wraps an existing KVClient. + * This implementation maintains namespace isolation by prefixing keys with the namespace + * and stores epoch information in a special key within each namespace. + */ +public class DefaultNamespacedKVClient implements NamespacedKVClient { + private static final String EPOCH_KEY_PREFIX = "__epoch__:"; + private final KVClient delegate; + + public DefaultNamespacedKVClient(KVClient delegate) { + this.delegate = Objects.requireNonNull(delegate, "delegate KVClient cannot be null"); + } + + @Override + public CompletableFuture putKVIfAbsent(KeyValue keyValue) { + return delegate.putKVIfAbsent(keyValue); + } + + @Override + public CompletableFuture putKV(KeyValue keyValue) { + return delegate.putKV(keyValue); + } + + @Override + public CompletableFuture getKV(Key key) { + return delegate.getKV(key); + } + + @Override + public CompletableFuture delKV(Key key) { + return delegate.delKV(key); + } + + @Override + public CompletableFuture putNamespacedKVIfAbsent(NamespacedKeyValue namespacedKeyValue) { + String namespacedKey = namespacedKeyValue.getNamespacedKey(); + KeyValue kv = KeyValue.of(namespacedKey, namespacedKeyValue.getKeyValue().value().get()); + return delegate.putKVIfAbsent(kv); + } + + @Override + public CompletableFuture putNamespacedKV(NamespacedKeyValue namespacedKeyValue) { + String namespacedKey = namespacedKeyValue.getNamespacedKey(); + KeyValue kv = KeyValue.of(namespacedKey, namespacedKeyValue.getKeyValue().value().get()); + return delegate.putKV(kv); + } + + @Override + public CompletableFuture getNamespacedKV(String namespace, Key key) { + String namespacedKey = namespace + ":" + key.get(); + return delegate.getKV(Key.of(namespacedKey)); + } + + @Override + public CompletableFuture delNamespacedKV(String namespace, Key key) { + String namespacedKey = namespace + ":" + key.get(); + return delegate.delKV(Key.of(namespacedKey)); + } + + @Override + public CompletableFuture getNamespaceEpoch(String namespace) { + String epochKey = EPOCH_KEY_PREFIX + namespace; + return delegate.getKV(Key.of(epochKey)) + .thenApply(value -> { + if (value == null || value.isNull()) { + return 0L; + } + ByteBuffer buffer = value.get(); + return buffer.getLong(); + }); + } + + @Override + public CompletableFuture incrementNamespaceEpoch(String namespace) { + return getNamespaceEpoch(namespace) + .thenCompose(currentEpoch -> { + long newEpoch = currentEpoch + 1; + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + buffer.putLong(newEpoch); + buffer.flip(); + + String epochKey = EPOCH_KEY_PREFIX + namespace; + KeyValue epochKV = KeyValue.of(epochKey, buffer); + + return delegate.putKV(epochKV) + .thenApply(value -> newEpoch); + }); + } +} \ No newline at end of file diff --git a/s3stream/src/main/java/com/automq/stream/api/NamespacedKVClient.java b/s3stream/src/main/java/com/automq/stream/api/NamespacedKVClient.java new file mode 100644 index 0000000000..d102fc1b3f --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/api/NamespacedKVClient.java @@ -0,0 +1,81 @@ +/* + * Copyright 2025, AutoMQ HK Limited. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.stream.api; + +import com.automq.stream.api.KeyValue.Value; + +import java.util.concurrent.CompletableFuture; + +/** + * Enhanced KV client that supports namespace isolation and epoch-based versioning. + * This is an optional interface that applications can use when they need namespace + * and versioning support. + */ +public interface NamespacedKVClient extends KVClient { + /** + * Put namespaced key value if key not exist in the namespace. + * + * @param namespacedKeyValue The namespaced key-value pair + * @return async put result with the current value after putting + */ + CompletableFuture putNamespacedKVIfAbsent(NamespacedKeyValue namespacedKeyValue); + + /** + * Put namespaced key value, overwrite if key exists in the namespace. + * + * @param namespacedKeyValue The namespaced key-value pair + * @return async put result with the current value after putting + */ + CompletableFuture putNamespacedKV(NamespacedKeyValue namespacedKeyValue); + + /** + * Get value by namespaced key. + * + * @param namespace The namespace to look in + * @param key The key to look up + * @return async get result with the value, null if key not exist + */ + CompletableFuture getNamespacedKV(String namespace, KeyValue.Key key); + + /** + * Delete key value by namespaced key. + * + * @param namespace The namespace to delete from + * @param key The key to delete + * @return async delete result with the deleted value, null if key not exist + */ + CompletableFuture delNamespacedKV(String namespace, KeyValue.Key key); + + /** + * Get the current epoch for a namespace. + * + * @param namespace The namespace to get the epoch for + * @return async result with the current epoch + */ + CompletableFuture getNamespaceEpoch(String namespace); + + /** + * Increment the epoch for a namespace. + * + * @param namespace The namespace to increment the epoch for + * @return async result with the new epoch value + */ + CompletableFuture incrementNamespaceEpoch(String namespace); +} \ No newline at end of file diff --git a/s3stream/src/main/java/com/automq/stream/api/NamespacedKeyValue.java b/s3stream/src/main/java/com/automq/stream/api/NamespacedKeyValue.java new file mode 100644 index 0000000000..db4f55f817 --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/api/NamespacedKeyValue.java @@ -0,0 +1,92 @@ +/* + * Copyright 2025, AutoMQ HK Limited. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.stream.api; + +import java.nio.ByteBuffer; +import java.util.Objects; + +/** + * A wrapper around KeyValue that adds namespace and epoch support. + * This is an optional enhancement that can be used when namespace isolation + * and versioning is needed. + */ +public class NamespacedKeyValue { + private final KeyValue keyValue; + private final String namespace; + private final long epoch; + + private NamespacedKeyValue(KeyValue keyValue, String namespace, long epoch) { + this.keyValue = Objects.requireNonNull(keyValue, "keyValue cannot be null"); + this.namespace = Objects.requireNonNull(namespace, "namespace cannot be null"); + this.epoch = epoch; + } + + public static NamespacedKeyValue of(String key, ByteBuffer value, String namespace, long epoch) { + return new NamespacedKeyValue(KeyValue.of(key, value), namespace, epoch); + } + + public static NamespacedKeyValue of(KeyValue keyValue, String namespace, long epoch) { + return new NamespacedKeyValue(keyValue, namespace, epoch); + } + + public KeyValue getKeyValue() { + return keyValue; + } + + public String getNamespace() { + return namespace; + } + + public long getEpoch() { + return epoch; + } + + /** + * Creates a composite key that includes the namespace. + * This allows the underlying KV store to maintain namespace isolation. + */ + public String getNamespacedKey() { + return namespace + ":" + keyValue.key().get(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof NamespacedKeyValue)) return false; + NamespacedKeyValue that = (NamespacedKeyValue) o; + return epoch == that.epoch && + Objects.equals(keyValue, that.keyValue) && + Objects.equals(namespace, that.namespace); + } + + @Override + public int hashCode() { + return Objects.hash(keyValue, namespace, epoch); + } + + @Override + public String toString() { + return "NamespacedKeyValue{" + + "keyValue=" + keyValue + + ", namespace='" + namespace + '\'' + + ", epoch=" + epoch + + '}'; + } +} \ No newline at end of file diff --git a/s3stream/src/main/java/com/automq/stream/s3/Config.java b/s3stream/src/main/java/com/automq/stream/s3/Config.java index dc43b6fdc5..5e811d18f5 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/Config.java +++ b/s3stream/src/main/java/com/automq/stream/s3/Config.java @@ -64,6 +64,11 @@ public class Config { throw new UnsupportedOperationException(); }; + /** + * Whether to enable namespaced KV support + */ + private boolean namespacedKVEnabled = false; + public int nodeId() { return nodeId; } @@ -342,4 +347,13 @@ public Config version(Supplier version) { public Version version() { return version.get(); } + + public boolean isNamespacedKVEnabled() { + return namespacedKVEnabled; + } + + public Config namespacedKVEnabled(boolean enabled) { + this.namespacedKVEnabled = enabled; + return this; + } }