Skip to content

Commit 415c6fa

Browse files
[feat][fn] Fallback to using STATE_STORAGE_SERVICE_URL in PulsarMetadataStateStoreProviderImpl.init (#24721)
1 parent 7547fab commit 415c6fa

File tree

5 files changed

+225
-89
lines changed

5 files changed

+225
-89
lines changed

pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreProviderImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.util.Map;
2222
import lombok.SneakyThrows;
23+
import org.apache.commons.lang3.StringUtils;
2324
import org.apache.pulsar.metadata.api.MetadataStore;
2425
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
2526
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
@@ -46,6 +47,9 @@ public void init(Map<String, Object> config) throws Exception {
4647
shouldCloseStore = false;
4748
} else {
4849
String metadataUrl = (String) config.get(METADATA_URL);
50+
if (StringUtils.isEmpty(metadataUrl)) {
51+
metadataUrl = (String) config.get(StateStoreProvider.STATE_STORAGE_SERVICE_URL);
52+
}
4953
store = MetadataStoreFactory.create(metadataUrl, MetadataStoreConfig.builder()
5054
.metadataStoreName(MetadataStoreConfig.STATE_METADATA_STORE).build());
5155
shouldCloseStore = true;
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.tests.integration.functions;
20+
21+
import org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl;
22+
23+
public class PulsarBKStateStoreTest extends PulsarStateTest {
24+
protected PulsarBKStateStoreTest() {
25+
super(BKStateStoreProviderImpl.class.getName());
26+
}
27+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.tests.integration.functions;
20+
21+
import static org.testng.Assert.assertEquals;
22+
import lombok.extern.slf4j.Slf4j;
23+
import org.apache.pulsar.functions.instance.state.PulsarMetadataStateStoreProviderImpl;
24+
import org.apache.pulsar.tests.integration.containers.PulsarContainer;
25+
import org.apache.pulsar.tests.integration.containers.StandaloneContainer;
26+
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
27+
import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
28+
import org.testcontainers.containers.Network;
29+
30+
@Slf4j
31+
public class PulsarMetadataStateStoreTest extends PulsarStateTest {
32+
protected PulsarMetadataStateStoreTest() {
33+
super(PulsarMetadataStateStoreProviderImpl.class.getName());
34+
}
35+
36+
public void setUpCluster() throws Exception {
37+
incrementSetupNumber();
38+
network = Network.newNetwork();
39+
String clusterName = PulsarClusterTestBase.randomName(8);
40+
container = new StandaloneContainer(clusterName, PulsarContainer.DEFAULT_IMAGE_NAME)
41+
.withNetwork(network)
42+
.withNetworkAliases(StandaloneContainer.NAME + "-" + clusterName)
43+
.withEnv("PULSAR_STANDALONE_USE_ZOOKEEPER", "true")
44+
.withEnv("PF_stateStorageProviderImplementation", PulsarMetadataStateStoreProviderImpl.class.getName())
45+
.withEnv("PF_stateStorageServiceUrl", "zk:localhost:2181");
46+
container.start();
47+
log.info("Pulsar cluster {} is up running:", clusterName);
48+
log.info("\tBinary Service Url : {}", container.getPlainTextServiceUrl());
49+
log.info("\tHttp Service Url : {}", container.getHttpServiceUrl());
50+
51+
// add cluster to public tenant
52+
ContainerExecResult result = container.execCmd(
53+
"/pulsar/bin/pulsar-admin", "namespaces", "policies", "public/default");
54+
assertEquals(0, result.getExitCode());
55+
log.info("public/default namespace policies are {}", result.getStdout());
56+
}
57+
58+
59+
}
60+

0 commit comments

Comments
 (0)