Skip to content

[fix][broker] Fix corrupted topic policies issues with sequential topic policy updates #24427

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -2611,7 +2611,7 @@ private void handleLocalPoliciesUpdates(NamespaceName namespace) {
private void handlePoliciesUpdates(NamespaceName namespace) {
pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace)
.thenAcceptAsync(optPolicies -> {
if (!optPolicies.isPresent()) {
if (!optPolicies.isPresent() || optPolicies.get().deleted) {
return;
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.classification.InterfaceAudience;
Expand Down Expand Up @@ -58,11 +59,21 @@ default CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName,

/**
* Update policies for a topic asynchronously.
* The policyUpdater will be called with a TopicPolicies object (either newly created or cloned from existing)
* which can be safely mutated. The service will handle writing this updated object.
*
* @param topicName topic name
* @param policies policies for the topic name
* @param topicName topic name
* @param isGlobalPolicy true if the global policy is to be updated, false for local
* @param skipUpdateWhenTopicPolicyDoesntExist when true, skips the update if the topic policy does not already
* exist. This is useful for cases when the policyUpdater is removing
* a setting in the policy.
* @param policyUpdater a function that modifies the TopicPolicies
* @return a CompletableFuture that completes when the update has been completed with read-your-writes consistency.
*/
CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, TopicPolicies policies);
CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName,
boolean isGlobalPolicy,
boolean skipUpdateWhenTopicPolicyDoesntExist,
Consumer<TopicPolicies> policyUpdater);

/**
* It controls the behavior of {@link TopicPoliciesService#getTopicPoliciesAsync}.
Expand Down Expand Up @@ -117,7 +128,9 @@ public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName) {
}

@Override
public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, TopicPolicies policies) {
public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, boolean isGlobalPolicy,
boolean skipUpdateWhenTopicPolicyDoesntExist,
Consumer<TopicPolicies> policyUpdater) {
return FutureUtil.failedFuture(new UnsupportedOperationException("Topic policies service is disabled."));
}

Expand All @@ -144,13 +157,6 @@ static String getEventKey(PulsarEvent event, boolean isGlobal) {
event.getTopicPoliciesEvent().getTopic()).toString(), isGlobal);
}

static String getEventKey(TopicName topicName, boolean isGlobal) {
return wrapEventKey(TopicName.get(topicName.getDomain().toString(),
topicName.getTenant(),
topicName.getNamespace(),
TopicName.get(topicName.getPartitionedTopicName()).getLocalName()).toString(), isGlobal);
}

static String wrapEventKey(String originalKey, boolean isGlobalPolicies) {
if (!isGlobalPolicies) {
return originalKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2161,16 +2161,15 @@ public void testDeleteTopicPolicyWhenDeleteSystemTopic() throws Exception {
admin.topicPolicies().setMaxConsumers(systemTopic, 5);
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
final var policies = TopicPolicyTestUtils.getTopicPoliciesBypassCache(pulsar.getTopicPoliciesService(),
TopicName.get(systemTopic));
TopicName.get(systemTopic), false);
Assert.assertTrue(policies.isPresent());
Assert.assertEquals(policies.get().getMaxConsumerPerTopic(), 5);
});

admin.topics().delete(systemTopic, true);
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> assertTrue(
TopicPolicyTestUtils.getTopicPoliciesBypassCache(pulsar.getTopicPoliciesService(),
TopicName.get(systemTopic))
.isEmpty()));
TopicName.get(systemTopic), false).isEmpty()));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3902,12 +3902,12 @@ public void testGetTopicPoliciesWhenDeleteTopicPolicy() throws Exception {
admin.topicPolicies().setMaxConsumers(persistenceTopic, 5);

Integer maxConsumerPerTopic = TopicPolicyTestUtils.getTopicPoliciesBypassCache(pulsar.getTopicPoliciesService(),
TopicName.get(persistenceTopic)).orElseThrow().getMaxConsumerPerTopic();
TopicName.get(persistenceTopic), false).orElseThrow().getMaxConsumerPerTopic();

assertEquals(maxConsumerPerTopic, 5);
admin.topics().delete(persistenceTopic, true);
assertTrue(TopicPolicyTestUtils.getTopicPoliciesBypassCache(pulsar.getTopicPoliciesService(),
TopicName.get(persistenceTopic)).isEmpty());
TopicName.get(persistenceTopic), false).isEmpty());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin;

import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
import static org.testng.Assert.assertEquals;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker-admin")
public class TopicPoliciesUpdateTest extends MockedPulsarServiceBaseTest {
private final boolean partitionedSystemTopic;
private final String testTenant = "my-tenant";
private final String testNamespace = "my-namespace";
private final String myNamespace = testTenant + "/" + testNamespace;

// comment out the @Factory annotation to run this test individually in IDE
@Factory
public static Object[] createTestInstances() {
return new Object[]{
new TopicPoliciesUpdateTest(false),
new TopicPoliciesUpdateTest(true) // test with partitioned system topic
};
}

public TopicPoliciesUpdateTest() {
partitionedSystemTopic = false;
}

private TopicPoliciesUpdateTest(boolean partitionedSystemTopic) {
this.partitionedSystemTopic = partitionedSystemTopic;
}

@Override
protected void doInitConf() throws Exception {
super.doInitConf();
if (partitionedSystemTopic) {
// A partitioned system topic will get created when allowAutoTopicCreationType is set to PARTITIONED
conf.setDefaultNumPartitions(4);
conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
}
this.conf.setDefaultNumberOfNamespaceBundles(1);
}

@BeforeMethod
@Override
protected void setup() throws Exception {
super.internalSetup();
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant(this.testTenant, tenantInfo);
admin.namespaces().createNamespace(testTenant + "/" + testNamespace, Set.of("test"));
}

@AfterMethod(alwaysRun = true)
@Override
public void cleanup() throws Exception {
super.internalCleanup();
}

@DataProvider
public Object[][] topicTypes() {
return new Object[][]{
{TopicType.PARTITIONED},
{TopicType.NON_PARTITIONED}
};
}

@Test(dataProvider = "topicTypes")
public void testMultipleUpdates(TopicType topicType) throws Exception {
List<String> topics = new ArrayList<>();
for (int i = 0; i < 10; i++) {
String topic = newUniqueName("persistent://" + myNamespace + "/testtopic" + i);
if (TopicType.PARTITIONED.equals(topicType)) {
admin.topics().createNonPartitionedTopic(topic);
} else {
admin.topics().createPartitionedTopic(topic, 2);
}
topics.add(topic);
}

// test data
InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies();
inactiveTopicPolicies.setDeleteWhileInactive(true);
inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
inactiveTopicPolicies.setMaxInactiveDurationSeconds(3600);
DispatchRate dispatchRate = DispatchRate
.builder()
.dispatchThrottlingRateInMsg(1000)
.dispatchThrottlingRateInByte(1000000)
.build();
String clusterId = "test";

// test multiple updates
for (String topic : topics) {
for (int i = 0; i < 10; i++) {
List<CompletableFuture<Void>> futures = new ArrayList<>();
futures.add(admin.topicPolicies().setInactiveTopicPoliciesAsync(topic, inactiveTopicPolicies));
futures.add(admin.topicPolicies().setReplicatorDispatchRateAsync(topic, dispatchRate));
futures.add(admin.topics().setReplicationClustersAsync(topic, List.of(clusterId)));

// wait for all futures to complete
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

assertEquals(admin.topicPolicies().getInactiveTopicPolicies(topic), inactiveTopicPolicies);
assertEquals(admin.topicPolicies().getReplicatorDispatchRate(topic), dispatchRate);
assertEquals(admin.topics().getReplicationClusters(topic, true), Set.of(clusterId));
}
}

// verify that there aren't any pending updates in the sequencer
SystemTopicBasedTopicPoliciesService policyService =
(SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
assertEquals(policyService.getTopicPolicyUpdateSequencerSize(), 0,
"There should be no pending updates after completing all updates");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,22 @@
*/
package org.apache.pulsar.broker.service;

import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicPolicies;

public class InmemoryTopicPoliciesService implements TopicPoliciesService {

private final ExecutorService executor =
Executors.newSingleThreadExecutor(new DefaultThreadFactory("InmemoryTopicPoliciesService"));
private final Map<TopicName, TopicPolicies> cache = new HashMap<>();
private final Map<TopicName, List<TopicPolicyListener>> listeners = new HashMap<>();

Expand All @@ -39,23 +44,34 @@ public synchronized CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName t
}

@Override
public synchronized CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, TopicPolicies policies) {
final var existingPolicies = cache.get(topicName);
if (existingPolicies != policies) {
cache.put(topicName, policies);
CompletableFuture.runAsync(() -> {
final TopicPolicies latestPolicies;
final List<TopicPolicyListener> listeners;
synchronized (InmemoryTopicPoliciesService.this) {
latestPolicies = cache.get(topicName);
listeners = this.listeners.getOrDefault(topicName, List.of());
}
for (var listener : listeners) {
listener.onUpdate(latestPolicies);
}
});
}
return CompletableFuture.completedFuture(null);
public synchronized CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName,
boolean isGlobalPolicy,
boolean skipUpdateWhenTopicPolicyDoesntExist,
Consumer<TopicPolicies> policyUpdater) {
return CompletableFuture.runAsync(() -> {
final var existingPolicies = cache.get(topicName);
if (existingPolicies == null && skipUpdateWhenTopicPolicyDoesntExist) {
return; // No existing policies and skip update
}
final TopicPolicies newPolicies = existingPolicies == null
? createTopicPolicy(isGlobalPolicy)
: existingPolicies.clone();
policyUpdater.accept(newPolicies);
cache.put(topicName, newPolicies);
List<TopicPolicyListener> listeners;
synchronized (this) {
listeners = this.listeners.getOrDefault(topicName, List.of());
}
for (var listener : listeners) {
listener.onUpdate(newPolicies);
}
}, executor);
}

private static TopicPolicies createTopicPolicy(boolean isGlobalPolicy) {
TopicPolicies topicPolicies = new TopicPolicies();
topicPolicies.setIsGlobal(isGlobalPolicy);
return topicPolicies;
}

@Override
Expand All @@ -78,4 +94,9 @@ public synchronized void unregisterListener(TopicName topicName, TopicPolicyList
synchronized boolean containsKey(TopicName topicName) {
return cache.containsKey(topicName);
}

@Override
public void close() throws Exception {
executor.shutdownNow();
}
}
Loading
Loading