Skip to content

Commit 751f5b6

Browse files
poorbarcodeKannarFr
authored andcommitted
[fix][broker]Data lost due to conflict loaded up a topic for two brokers, when enabled ServiceUnitStateMetadataStoreTableViewImpl (apache#24478)
1 parent 5fe1c31 commit 751f5b6

File tree

11 files changed

+419
-87
lines changed

11 files changed

+419
-87
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ public synchronized void start() throws PulsarServerException {
315315
pulsar.getConfiguration().getDefaultNumberOfNamespaceBundles());
316316

317317
tableview = createServiceUnitStateTableView();
318-
tableview.start(pulsar, this::handleEvent, this::handleExisting);
318+
tableview.start(pulsar, this::handleEvent, this::handleExisting, this::handleInvalidate);
319319

320320
if (debug) {
321321
log.info("Successfully started the channel tableview.");
@@ -465,7 +465,12 @@ private CompletableFuture<Optional<String>> getActiveOwnerAsync(
465465
// we return the owner without its activeness check.
466466
// This broker tries to serve lookups on a best efforts basis when metadata store connection is unstable.
467467
if (!brokerRegistry.isRegistered()) {
468-
return CompletableFuture.completedFuture(owner);
468+
if (tableview.isMetadataStoreBased()) {
469+
return FutureUtil.failedFuture(new MetadataStoreException("broker is unavailable so far because it is"
470+
+ " in the state that tries to reconnect to the metadata store."));
471+
} else {
472+
return CompletableFuture.completedFuture(owner);
473+
}
469474
}
470475

471476
return dedupeGetOwnerRequest(serviceUnit)
@@ -736,6 +741,20 @@ private void handleExisting(String serviceUnit, ServiceUnitStateData data) {
736741
}
737742
}
738743

744+
/***
745+
* When the {@link #tableview} can not determine the ownership of the service-unit, this method will be called.
746+
* Often happens when the current broker can not connect to others.
747+
*/
748+
private void handleInvalidate(String serviceUnit, ServiceUnitStateData data) {
749+
closeServiceUnit(serviceUnit, true).whenComplete((__, ex) -> {
750+
if (ex == null) {
751+
log.info("Unloaded serviceUnit:{} because the ownership is invalidate", serviceUnit);
752+
return;
753+
}
754+
log.error("Failed to unload serviceUnit:{} after the ownership is invalidate", serviceUnit, ex);
755+
});
756+
}
757+
739758
private static boolean isTransferCommand(ServiceUnitStateData data) {
740759
if (data == null) {
741760
return false;

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateMetadataStoreTableViewImpl.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,12 @@
3131
import java.util.regex.PatternSyntaxException;
3232
import lombok.NonNull;
3333
import lombok.extern.slf4j.Slf4j;
34+
import org.apache.pulsar.broker.MetadataSessionExpiredPolicy;
3435
import org.apache.pulsar.broker.PulsarService;
3536
import org.apache.pulsar.common.util.FutureUtil;
3637
import org.apache.pulsar.metadata.api.MetadataStoreException;
3738
import org.apache.pulsar.metadata.api.MetadataStoreTableView;
39+
import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
3840
import org.apache.pulsar.metadata.tableview.impl.MetadataStoreTableViewImpl;
3941

4042
@Slf4j
@@ -54,13 +56,23 @@ public class ServiceUnitStateMetadataStoreTableViewImpl extends ServiceUnitState
5456
private ServiceUnitStateDataConflictResolver conflictResolver;
5557
private volatile MetadataStoreTableView<ServiceUnitStateData> tableview;
5658

59+
@Override
5760
public void start(PulsarService pulsar,
5861
BiConsumer<String, ServiceUnitStateData> tailItemListener,
59-
BiConsumer<String, ServiceUnitStateData> existingItemListener)
62+
BiConsumer<String, ServiceUnitStateData> existingItemListener,
63+
BiConsumer<String, ServiceUnitStateData> outdatedItemListeners)
6064
throws MetadataStoreException {
6165
init(pulsar);
6266
conflictResolver = new ServiceUnitStateDataConflictResolver();
6367
conflictResolver.setStorageType(MetadataStore);
68+
if (!(pulsar.getLocalMetadataStore() instanceof AbstractMetadataStore)
69+
&& !MetadataSessionExpiredPolicy.shutdown.equals(pulsar.getConfig().getZookeeperSessionExpiredPolicy())) {
70+
String errorMsg = String.format("Your current metadata store [%s] does not support the registration of "
71+
+ "session event listeners. Please set \"zookeeperSessionExpiredPolicy\" to \"shutdown\";"
72+
+ " otherwise, you will encounter the issue that messages lost because of conflicted topic loading",
73+
pulsar.getLocalMetadataStore().getClass().getName());
74+
log.warn(errorMsg);
75+
}
6476
tableview = new MetadataStoreTableViewImpl<>(ServiceUnitStateData.class,
6577
pulsar.getBrokerId(),
6678
pulsar.getLocalMetadataStore(),
@@ -69,12 +81,21 @@ public void start(PulsarService pulsar,
6981
this::validateServiceUnitPath,
7082
List.of(this::updateOwnedServiceUnits, tailItemListener),
7183
List.of(this::updateOwnedServiceUnits, existingItemListener),
72-
TimeUnit.SECONDS.toMillis(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds())
84+
List.of(this::invalidateOwnedServiceUnits, outdatedItemListeners),
85+
true,
86+
TimeUnit.SECONDS.toMillis(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds()),
87+
t -> handleTableViewShutDownEvent(t)
7388
);
7489
tableview.start();
7590

7691
}
7792

93+
protected void handleTableViewShutDownEvent(Throwable throwable) {
94+
log.error("The component of load-balance, which named metadata store table view has shutdown. This Broker can"
95+
+ " not work anymore, start tp shutdow,");
96+
pulsar.shutdownNow();
97+
}
98+
7899
protected boolean resolveConflict(ServiceUnitStateData prev, ServiceUnitStateData cur) {
79100
return !conflictResolver.shouldKeepLeft(prev, cur);
80101
}
@@ -131,6 +152,11 @@ public void flush(long waitDurationInMillis) {
131152
// no-op
132153
}
133154

155+
@Override
156+
public boolean isMetadataStoreBased() {
157+
return true;
158+
}
159+
134160
@Override
135161
public CompletableFuture<Void> delete(String key) {
136162
if (!isValidState()) {

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableView.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,18 @@ public interface ServiceUnitStateTableView extends Closeable {
4747
* @param tailItemListener listener to listen tail(newly updated) items
4848
* @param existingItemListener listener to listen existing items
4949
* @throws IOException if it fails to init the tableview.
50+
* @param itemOutdatedListeners Let's introduce how to ensure the correct element values: 1. The table
51+
* view will try its best to ensure that the data is always the latest. When it cannot be guaranteed, see
52+
* the next Article 2. If you get an old value, you will receive an update event later, ultimately ensuring
53+
* the accuracy of the data. 3. You will receive this notification when the first two cannot be guaranteed
54+
* due to the expiration of the metadata store session of the table view。After that, you also received
55+
* notifications {@param tailItemListeners} and {@param existingItemListeners}, indicating that the table
56+
* view can once again ensure that the first two can work properly.
5057
*/
5158
void start(PulsarService pulsar,
5259
BiConsumer<String, ServiceUnitStateData> tailItemListener,
53-
BiConsumer<String, ServiceUnitStateData> existingItemListener) throws IOException;
60+
BiConsumer<String, ServiceUnitStateData> existingItemListener,
61+
BiConsumer<String, ServiceUnitStateData> itemOutdatedListeners) throws IOException;
5462

5563

5664
/**
@@ -110,4 +118,9 @@ void start(PulsarService pulsar,
110118
* @throws TimeoutException
111119
*/
112120
void flush(long waitDurationInMillis) throws ExecutionException, InterruptedException, TimeoutException;
121+
122+
/**
123+
* Whether it depends on the local metadata store.
124+
*/
125+
boolean isMetadataStoreBased();
113126
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewBase.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ abstract class ServiceUnitStateTableViewBase implements ServiceUnitStateTableVie
4343
private final Map<NamespaceBundle, Boolean> ownedServiceUnitsMap = new ConcurrentHashMap<>();
4444
private final Set<NamespaceBundle> ownedServiceUnits = Collections.unmodifiableSet(ownedServiceUnitsMap.keySet());
4545
private String brokerId;
46-
private PulsarService pulsar;
46+
protected PulsarService pulsar;
4747
protected void init(PulsarService pulsar) throws MetadataStoreException {
4848
this.pulsar = pulsar;
4949
this.brokerId = pulsar.getBrokerId();
@@ -89,4 +89,9 @@ protected void updateOwnedServiceUnits(String key, ServiceUnitStateData val) {
8989
}
9090
});
9191
}
92+
93+
protected void invalidateOwnedServiceUnits(String key, ServiceUnitStateData outdatedVal) {
94+
NamespaceBundle namespaceBundle = LoadManagerShared.getNamespaceBundle(pulsar, key);
95+
ownedServiceUnitsMap.compute(namespaceBundle, (k, v) -> false);
96+
}
9297
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewImpl.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,11 @@ public class ServiceUnitStateTableViewImpl extends ServiceUnitStateTableViewBase
5151
private volatile Producer<ServiceUnitStateData> producer;
5252
private volatile TableView<ServiceUnitStateData> tableview;
5353

54+
@Override
5455
public void start(PulsarService pulsar,
5556
BiConsumer<String, ServiceUnitStateData> tailItemListener,
56-
BiConsumer<String, ServiceUnitStateData> existingItemListener) throws IOException {
57+
BiConsumer<String, ServiceUnitStateData> existingItemListener,
58+
BiConsumer<String, ServiceUnitStateData> itemOutdatedListeners) throws IOException {
5759
boolean debug = ExtensibleLoadManagerImpl.debug(pulsar.getConfiguration(), log);
5860

5961
init(pulsar);
@@ -175,6 +177,11 @@ public void flush(long waitDurationInMillis) throws InterruptedException, Timeou
175177
tableview.refreshAsync().get(waitTimeMs, MILLISECONDS);
176178
}
177179

180+
@Override
181+
public boolean isMetadataStoreBased() {
182+
return false;
183+
}
184+
178185
@Override
179186
public CompletableFuture<Void> delete(String key) {
180187
return put(key, null);

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ private void syncExistingItems()
100100
metadataStoreTableView.start(
101101
pulsar,
102102
this::dummy,
103+
this::dummy,
103104
this::dummy
104105
);
105106

@@ -108,6 +109,7 @@ private void syncExistingItems()
108109
systemTopicTableView.start(
109110
pulsar,
110111
this::dummy,
112+
this::dummy,
111113
this::dummy
112114
);
113115

@@ -152,6 +154,7 @@ private void syncTailItems() throws InterruptedException, IOException, TimeoutEx
152154
this.metadataStoreTableView.start(
153155
pulsar,
154156
this::syncToSystemTopic,
157+
this::dummy,
155158
this::dummy
156159
);
157160
log.info("Started MetadataStoreTableView");
@@ -160,6 +163,7 @@ private void syncTailItems() throws InterruptedException, IOException, TimeoutEx
160163
this.systemTopicTableView.start(
161164
pulsar,
162165
this::syncToMetadataStore,
166+
this::dummy,
163167
this::dummy
164168
);
165169
log.info("Started SystemTopicTableView");

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.concurrent.TimeUnit;
3030
import org.apache.commons.lang3.reflect.FieldUtils;
3131
import org.apache.commons.lang3.tuple.Pair;
32+
import org.apache.pulsar.broker.MetadataSessionExpiredPolicy;
3233
import org.apache.pulsar.broker.PulsarService;
3334
import org.apache.pulsar.broker.ServiceConfiguration;
3435
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -78,6 +79,13 @@ public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServ
7879

7980
protected String serviceUnitStateTableViewClassName;
8081

82+
@Override
83+
protected ServiceConfiguration getDefaultConf() {
84+
ServiceConfiguration conf = super.getDefaultConf();
85+
conf.setZookeeperSessionExpiredPolicy(MetadataSessionExpiredPolicy.shutdown);
86+
return conf;
87+
}
88+
8189
protected ArrayList<PulsarClient> clients = new ArrayList<>();
8290

8391
@DataProvider(name = "serviceUnitStateTableViewClassName")

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@
9696
import org.apache.pulsar.common.naming.NamespaceBundle;
9797
import org.apache.pulsar.common.naming.TopicName;
9898
import org.apache.pulsar.common.policies.data.TopicType;
99+
import org.apache.pulsar.common.util.FutureUtil;
99100
import org.apache.pulsar.metadata.api.MetadataStoreException;
100101
import org.apache.pulsar.metadata.api.MetadataStoreTableView;
101102
import org.apache.pulsar.metadata.api.NotificationType;
@@ -1791,15 +1792,34 @@ public void testActiveGetOwner() throws Exception {
17911792
assertTrue(ex.getCause() instanceof IllegalStateException);
17921793
assertTrue(System.currentTimeMillis() - start >= 1000);
17931794

1794-
try {
1795-
// verify getOwnerAsync returns immediately when not registered
1796-
registry.unregister();
1797-
start = System.currentTimeMillis();
1798-
assertEquals(broker, channel1.getOwnerAsync(bundle).get().get());
1799-
elapsed = System.currentTimeMillis() - start;
1800-
assertTrue(elapsed < 1000);
1801-
} finally {
1802-
registry.registerAsync().join();
1795+
if (pulsar1.getConfig().getLoadManagerServiceUnitStateTableViewClassName()
1796+
.equals(ServiceUnitStateTableViewImpl.class.getName())) {
1797+
try {
1798+
// verify getOwnerAsync returns immediately when not registered
1799+
registry.unregister();
1800+
start = System.currentTimeMillis();
1801+
assertEquals(broker, channel1.getOwnerAsync(bundle).get().get());
1802+
elapsed = System.currentTimeMillis() - start;
1803+
assertTrue(elapsed < 1000);
1804+
} finally {
1805+
registry.registerAsync().join();
1806+
}
1807+
}
1808+
1809+
if (pulsar1.getConfig().getLoadManagerServiceUnitStateTableViewClassName()
1810+
.equals(ServiceUnitStateMetadataStoreTableViewImpl.class.getName())) {
1811+
try {
1812+
// verify getOwnerAsync returns immediately when not registered
1813+
registry.unregister();
1814+
channel1.getOwnerAsync(bundle).get().get();
1815+
fail("Request should fail because it is in the state that tries to reconnect to the metadata store");
1816+
} catch (Exception e) {
1817+
Throwable actEx = FutureUtil.unwrapCompletionException(e);
1818+
assertTrue(actEx instanceof MetadataStoreException);
1819+
assertTrue(actEx.getMessage().contains("reconnect to the metadata store."));
1820+
} finally {
1821+
registry.registerAsync().join();
1822+
}
18031823
}
18041824

18051825

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,20 @@
2323
import java.net.ServerSocket;
2424
import java.net.URL;
2525
import java.util.Arrays;
26+
import java.util.Collection;
2627
import java.util.Collections;
2728
import java.util.HashSet;
2829
import java.util.Optional;
2930
import java.util.Set;
3031
import java.util.concurrent.CompletableFuture;
3132
import java.util.concurrent.atomic.AtomicReference;
33+
import java.util.stream.Collectors;
3234
import lombok.extern.slf4j.Slf4j;
3335
import org.apache.commons.lang3.StringUtils;
3436
import org.apache.pulsar.broker.PulsarService;
3537
import org.apache.pulsar.broker.ServiceConfiguration;
3638
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
39+
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
3740
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
3841
import org.apache.pulsar.broker.namespace.LookupOptions;
3942
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -301,6 +304,23 @@ public static Set<String> getAvailableBrokers(PulsarService pulsar) {
301304
}
302305
}
303306

307+
public static Collection<String> getOwnedBundles(PulsarService pulsar) {
308+
Object loadManagerWrapper = pulsar.getLoadManager().get();
309+
Object loadManager = WhiteboxImpl.getInternalState(loadManagerWrapper, "loadManager");
310+
if (loadManager instanceof ModularLoadManagerImpl) {
311+
return pulsar.getNamespaceService().getOwnershipCache().getOwnedBundles()
312+
.keySet().stream().map(k -> k.getNamespaceObject().toString() + "/" + k.getBundleRange())
313+
.collect(Collectors.toList());
314+
} else if (loadManager instanceof ExtensibleLoadManagerImpl extensibleLoadManager) {
315+
ServiceUnitStateChannel serviceUnitStateChannel = extensibleLoadManager.getServiceUnitStateChannel();
316+
return serviceUnitStateChannel.getOwnedServiceUnits().stream()
317+
.map(k -> k.getNamespaceObject().toString() + "/" + k.getBundleRange())
318+
.collect(Collectors.toList());
319+
} else {
320+
throw new RuntimeException("Not support for the load manager: " + loadManager.getClass().getName());
321+
}
322+
}
323+
304324
public void clearPreferBroker() {
305325
preferBroker.set(null);
306326
}

0 commit comments

Comments
 (0)