diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 3b74e19d2884e..c2fde04b24052 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -128,7 +128,6 @@ import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; -import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; @@ -552,17 +551,14 @@ protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) { protected CompletableFuture internalSetDelayedDeliveryPolicies(DelayedDeliveryPolicies deliveryPolicies, boolean isGlobal) { - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setIsGlobal(isGlobal); - topicPolicies.setDelayedDeliveryEnabled(deliveryPolicies == null ? null : deliveryPolicies.isActive()); - topicPolicies.setDelayedDeliveryTickTimeMillis( - deliveryPolicies == null ? null : deliveryPolicies.getTickTime()); - topicPolicies.setDelayedDeliveryMaxDelayInMillis( - deliveryPolicies == null ? null : deliveryPolicies.getMaxDeliveryDelayInMillis()); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); - }); + return pulsar().getTopicPoliciesService() + .updateTopicPoliciesAsync(topicName, isGlobal, deliveryPolicies == null, policies -> { + policies.setDelayedDeliveryEnabled(deliveryPolicies == null ? null : deliveryPolicies.isActive()); + policies.setDelayedDeliveryTickTimeMillis( + deliveryPolicies == null ? null : deliveryPolicies.getTickTime()); + policies.setDelayedDeliveryMaxDelayInMillis( + deliveryPolicies == null ? null : deliveryPolicies.getMaxDeliveryDelayInMillis()); + }); } protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean authoritative, @@ -1014,15 +1010,12 @@ protected CompletableFuture internalGetOffloadPolicies(bool }); } - protected CompletableFuture internalSetOffloadPolicies(OffloadPoliciesImpl offloadPolicies, + protected CompletableFuture internalSetOffloadPolicies(OffloadPoliciesImpl offloadPoliciesToSet, boolean isGlobal) { - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setOffloadPolicies(offloadPolicies); - topicPolicies.setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); - }); + return pulsar().getTopicPoliciesService() + .updateTopicPoliciesAsync(topicName, isGlobal, offloadPoliciesToSet == null, policies -> { + policies.setOffloadPolicies(offloadPoliciesToSet); + }); } protected CompletableFuture internalGetInactiveTopicPolicies @@ -1042,14 +1035,11 @@ protected CompletableFuture internalSetOffloadPolicies(OffloadPoliciesImpl } protected CompletableFuture internalSetInactiveTopicPolicies - (InactiveTopicPolicies inactiveTopicPolicies, boolean isGlobal) { - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setIsGlobal(isGlobal); - topicPolicies.setInactiveTopicPolicies(inactiveTopicPolicies); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); - }); + (InactiveTopicPolicies inactiveTopicPoliciesToSet, boolean isGlobal) { + return pulsar().getTopicPoliciesService() + .updateTopicPoliciesAsync(topicName, isGlobal, inactiveTopicPoliciesToSet == null, policies -> { + policies.setInactiveTopicPolicies(inactiveTopicPoliciesToSet); + }); } protected CompletableFuture internalGetMaxUnackedMessagesOnSubscription(boolean applied, @@ -1066,20 +1056,17 @@ protected CompletableFuture internalGetMaxUnackedMessagesOnSubscription })); } - protected CompletableFuture internalSetMaxUnackedMessagesOnSubscription(Integer maxUnackedNum, + protected CompletableFuture internalSetMaxUnackedMessagesOnSubscription(Integer maxUnackedNumToSet, boolean isGlobal) { - if (maxUnackedNum != null && maxUnackedNum < 0) { + if (maxUnackedNumToSet != null && maxUnackedNumToSet < 0) { throw new RestException(Status.PRECONDITION_FAILED, "maxUnackedNum must be 0 or more"); } - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setMaxUnackedMessagesOnSubscription(maxUnackedNum); - topicPolicies.setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); - }); + return pulsar().getTopicPoliciesService() + .updateTopicPoliciesAsync(topicName, isGlobal, maxUnackedNumToSet == null, policies -> { + policies.setMaxUnackedMessagesOnSubscription(maxUnackedNumToSet); + }); } protected CompletableFuture internalGetMaxUnackedMessagesOnConsumer(boolean applied, boolean isGlobal) { @@ -1094,33 +1081,28 @@ protected CompletableFuture internalGetMaxUnackedMessagesOnConsumer(boo })); } - protected CompletableFuture internalSetMaxUnackedMessagesOnConsumer(Integer maxUnackedNum, + protected CompletableFuture internalSetMaxUnackedMessagesOnConsumer(Integer maxUnackedNumToSet, boolean isGlobal) { - if (maxUnackedNum != null && maxUnackedNum < 0) { + if (maxUnackedNumToSet != null && maxUnackedNumToSet < 0) { throw new RestException(Status.PRECONDITION_FAILED, "maxUnackedNum must be 0 or more"); } - return getTopicPoliciesAsyncWithRetry(topicName) - .thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setMaxUnackedMessagesOnConsumer(maxUnackedNum); - topicPolicies.setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); - }); + return pulsar().getTopicPoliciesService() + .updateTopicPoliciesAsync(topicName, isGlobal, maxUnackedNumToSet == null, policies -> { + policies.setMaxUnackedMessagesOnConsumer(maxUnackedNumToSet); + }); } - protected CompletableFuture internalSetDeduplicationSnapshotInterval(Integer interval, boolean isGlobal) { - if (interval != null && interval < 0) { + protected CompletableFuture internalSetDeduplicationSnapshotInterval(Integer intervalToSet, + boolean isGlobal) { + if (intervalToSet != null && intervalToSet < 0) { throw new RestException(Status.PRECONDITION_FAILED, "interval must be 0 or more"); } - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - TopicPolicies policies = op.orElseGet(TopicPolicies::new); - policies.setDeduplicationSnapshotIntervalSeconds(interval); - policies.setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, policies); - }); + return pulsar().getTopicPoliciesService() + .updateTopicPoliciesAsync(topicName, isGlobal, intervalToSet == null, policies -> { + policies.setDeduplicationSnapshotIntervalSeconds(intervalToSet); + }); } private void internalUnloadNonPartitionedTopicAsync(AsyncResponse asyncResponse, boolean authoritative) { @@ -3320,16 +3302,17 @@ protected CompletableFuture internalSetBacklogQuota(BacklogQuota.BacklogQu "Backlog Quota exceeds configured retention quota for topic. " + "Please increase retention quota and retry")); } - if (backlogQuota != null) { - topicPolicies.getBackLogQuotaMap().put(finalBacklogQuotaType.name(), backlogQuota); - } else { - topicPolicies.getBackLogQuotaMap().remove(finalBacklogQuotaType.name()); - } - Map backLogQuotaMap = topicPolicies.getBackLogQuotaMap(); - topicPolicies.setIsGlobal(isGlobal); return pulsar().getTopicPoliciesService() - .updateTopicPoliciesAsync(topicName, topicPolicies) - .thenRun(() -> { + .updateTopicPoliciesAsync(topicName, isGlobal, backlogQuota == null, + policies -> { + if (backlogQuota != null) { + policies.getBackLogQuotaMap() + .put(finalBacklogQuotaType.name(), backlogQuota); + } else { + policies.getBackLogQuotaMap().remove(finalBacklogQuotaType.name()); + } + Map backLogQuotaMap = + policies.getBackLogQuotaMap(); try { log.info( "[{}] Successfully updated backlog quota map: namespace={}, " @@ -3350,10 +3333,10 @@ protected CompletableFuture internalSetReplicationClusters(List cl return CompletableFuture.failedFuture(new RestException(Status.PRECONDITION_FAILED, "ClusterIds should not be null or empty")); } - Set replicationClusters = Sets.newHashSet(clusterIds); + Set replicationClustersSet = Sets.newHashSet(clusterIds); return validatePoliciesReadOnlyAccessAsync() .thenAccept(__ -> { - if (replicationClusters.contains("global")) { + if (replicationClustersSet.contains("global")) { throw new RestException(Status.PRECONDITION_FAILED, "Cannot specify global in the list of replication clusters"); } @@ -3401,19 +3384,19 @@ protected CompletableFuture internalSetReplicationClusters(List cl }) .thenCompose(__ -> clustersAsync()) .thenCompose(clusters -> { - List> futures = new ArrayList<>(replicationClusters.size()); - for (String clusterId : replicationClusters) { + List> futures = new ArrayList<>(replicationClustersSet.size()); + for (String clusterId : replicationClustersSet) { if (!clusters.contains(clusterId)) { throw new RestException(Status.FORBIDDEN, "Invalid cluster id: " + clusterId); } - futures.add(validatePeerClusterConflictAsync(clusterId, replicationClusters)); + futures.add(validatePeerClusterConflictAsync(clusterId, replicationClustersSet)); futures.add(validateClusterForTenantAsync(namespaceName.getTenant(), clusterId)); } return FutureUtil.waitForAll(futures); }).thenCompose(__ -> { if (!pulsar().getConfig().isCreateTopicToRemoteClusterForReplication()) { log.info("[{}] Skip creating partitioned for topic {} for the remote clusters {}", - clientAppId(), topicName, replicationClusters.stream().filter(v -> + clientAppId(), topicName, replicationClustersSet.stream().filter(v -> !pulsar().getConfig().getClusterName().equals(v)).collect(Collectors.toList())); return CompletableFuture.completedFuture(null); } @@ -3426,48 +3409,34 @@ protected CompletableFuture internalSetReplicationClusters(List cl if (topicMetaOp.isEmpty()) { return CompletableFuture.completedFuture(null); } - return FutureUtil.waitForAll( - internalCreatePartitionedTopicToReplicatedClustersInBackground(replicationClusters, - topicMetaOp.get().partitions).values()); + return FutureUtil.waitForAll( + internalCreatePartitionedTopicToReplicatedClustersInBackground( + replicationClustersSet, topicMetaOp.get().partitions).values()); }); - }).thenCompose(__ -> - getTopicPoliciesAsyncWithRetry(topicName, isGlobal).thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setReplicationClusters(clusterIds); - topicPolicies.setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies) - .thenRun(() -> { - log.info("[{}] Successfully set replication clusters for namespace={}, " - + "topic={}, clusters={}", - clientAppId(), - namespaceName, - topicName.getLocalName(), - topicPolicies.getReplicationClusters()); - }); - } - )); + }) + .thenCompose(__ -> pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, + false, policies -> { + policies.setReplicationClusters(clusterIds); + log.info("[{}] Successfully set replication clusters for namespace={}, " + + "topic={}, clusters={}", + clientAppId(), + namespaceName, + topicName.getLocalName(), + policies.getReplicationClusters()); + })); // Replication clusters are typically local policy } protected CompletableFuture internalRemoveReplicationClusters(boolean isGlobal) { return validatePoliciesReadOnlyAccessAsync() - .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal)) - .thenCompose(op -> { - if (op.isEmpty()) { - return CompletableFuture.completedFuture(null); - } - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setReplicationClusters(null); - topicPolicies.setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies) - .thenRun(() -> { - log.info("[{}] Successfully set replication clusters for namespace={}, " - + "topic={}, clusters={}", - clientAppId(), - namespaceName, - topicName.getLocalName(), - topicPolicies.getReplicationClusters()); - }); - }); + .thenCompose(__ -> pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, + true, policies -> { + policies.setReplicationClusters(null); + log.info("[{}] Successfully removed replication clusters for namespace={}, " + + "topic={}, clusters set to null", + clientAppId(), + namespaceName, + topicName.getLocalName()); + })); } protected CompletableFuture internalGetDeduplication(boolean applied, boolean isGlobal) { @@ -3482,33 +3451,26 @@ protected CompletableFuture internalGetDeduplication(boolean applied, b })); } - protected CompletableFuture internalSetDeduplication(Boolean enabled, boolean isGlobal) { - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setDeduplicationEnabled(enabled); - topicPolicies.setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); - }); + protected CompletableFuture internalSetDeduplication(Boolean enabledToSet, boolean isGlobal) { + return pulsar().getTopicPoliciesService() + .updateTopicPoliciesAsync(topicName, isGlobal, enabledToSet == null, policies -> { + policies.setDeduplicationEnabled(enabledToSet); + }); } - protected CompletableFuture internalSetMessageTTL(Integer ttlInSecond, boolean isGlobal) { + protected CompletableFuture internalSetMessageTTL(Integer ttlInSecondToSet, boolean isGlobal) { //Validate message ttl value. - if (ttlInSecond != null && ttlInSecond < 0) { + if (ttlInSecondToSet != null && ttlInSecondToSet < 0) { return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Invalid value for message TTL")); } - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setMessageTTLInSeconds(ttlInSecond); - topicPolicies.setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies) - .thenRun(() -> - log.info("[{}] Successfully set topic message ttl: namespace={}, topic={}, ttl={}", - clientAppId(), namespaceName, topicName.getLocalName(), ttlInSecond)); - }); + return pulsar().getTopicPoliciesService() + .updateTopicPoliciesAsync(topicName, isGlobal, ttlInSecondToSet == null, policies -> { + policies.setMessageTTLInSeconds(ttlInSecondToSet); + log.info("[{}] Successfully set topic message ttl: namespace={}, topic={}, ttl={}", + clientAppId(), namespaceName, topicName.getLocalName(), ttlInSecondToSet); + }); } private CompletableFuture getRetentionPoliciesAsync(TopicName topicName, @@ -3534,66 +3496,53 @@ protected CompletableFuture internalGetRetention(boolean appl })); } - protected CompletableFuture internalSetRetention(RetentionPolicies retention, boolean isGlobal) { - validateRetentionPolicies(retention); - if (retention == null) { + protected CompletableFuture internalSetRetention(RetentionPolicies retentionToSet, boolean isGlobal) { + validateRetentionPolicies(retentionToSet); + if (retentionToSet == null) { return CompletableFuture.completedFuture(null); } - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - for (BacklogQuota.BacklogQuotaType backlogQuotaType : BacklogQuota.BacklogQuotaType.values()) { - BacklogQuota backlogQuota = topicPolicies.getBackLogQuotaMap().get(backlogQuotaType.name()); - if (backlogQuota == null) { - Policies policies = getNamespacePolicies(topicName.getNamespaceObject()); - backlogQuota = policies.backlog_quota_map.get(backlogQuotaType); - } - if (!checkBacklogQuota(backlogQuota, retention)) { - log.warn( - "[{}] Failed to update retention quota configuration for topic {}: " - + "conflicts with retention quota", - clientAppId(), topicName); - return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, - "Retention Quota must exceed configured backlog quota for topic. " - + "Please increase retention quota and retry")); - } - } - topicPolicies.setRetentionPolicies(retention); - topicPolicies.setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); - }); + return getNamespacePoliciesAsync(topicName.getNamespaceObject()) + .thenCompose(currentNamespacePolicies -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal) + .thenCompose(op -> { + TopicPolicies currentTopicPolicies = op.orElseGet(TopicPolicies::new); + for (BacklogQuota.BacklogQuotaType backlogQuotaType : + BacklogQuota.BacklogQuotaType.values()) { + BacklogQuota backlogQuota = + currentTopicPolicies.getBackLogQuotaMap().get(backlogQuotaType.name()); + if (backlogQuota == null) { + backlogQuota = currentNamespacePolicies.backlog_quota_map.get(backlogQuotaType); + } + if (!checkBacklogQuota(backlogQuota, retentionToSet)) { + log.warn("[{}] Failed to update retention quota configuration for topic {}: " + + "conflicts with retention quota", clientAppId(), topicName); + return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, + "Retention Quota must exceed configured backlog quota for topic. " + + "Please increase retention quota and retry")); + } + } + return pulsar().getTopicPoliciesService() + .updateTopicPoliciesAsync(topicName, isGlobal, retentionToSet == null, policies -> { + policies.setRetentionPolicies(retentionToSet); + }); + })); } protected CompletableFuture internalRemoveRetention(boolean isGlobal) { - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - if (!op.isPresent()) { - return CompletableFuture.completedFuture(null); - } - op.get().setRetentionPolicies(null); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get()); - }); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, true, policies -> { + policies.setRetentionPolicies(null); + }); } protected CompletableFuture internalSetDispatcherPauseOnAckStatePersistent(boolean isGlobal) { - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setDispatcherPauseOnAckStatePersistentEnabled(true); - topicPolicies.setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); - }); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, false, policies -> { + policies.setDispatcherPauseOnAckStatePersistentEnabled(true); + }); } protected CompletableFuture internalRemoveDispatcherPauseOnAckStatePersistent(boolean isGlobal) { - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - if (!op.isPresent()) { - return CompletableFuture.completedFuture(null); - } - op.get().setDispatcherPauseOnAckStatePersistentEnabled(false); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get()); - }); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, true, policies -> { + policies.setDispatcherPauseOnAckStatePersistentEnabled(false); + }); } protected CompletableFuture internalGetDispatcherPauseOnAckStatePersistent(boolean applied, @@ -3622,44 +3571,33 @@ protected CompletableFuture internalGetPersistence(boolean })); } - protected CompletableFuture internalSetPersistence(PersistencePolicies persistencePolicies, + protected CompletableFuture internalSetPersistence(PersistencePolicies persistencePoliciesToSet, boolean isGlobal) { - validatePersistencePolicies(persistencePolicies); - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setPersistence(persistencePolicies); - topicPolicies.setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); - }); + validatePersistencePolicies(persistencePoliciesToSet); + return pulsar().getTopicPoliciesService() + .updateTopicPoliciesAsync(topicName, isGlobal, persistencePoliciesToSet == null, policies -> { + policies.setPersistence(persistencePoliciesToSet); + }); } protected CompletableFuture internalRemovePersistence(boolean isGlobal) { - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - if (!op.isPresent()) { - return CompletableFuture.completedFuture(null); - } - op.get().setPersistence(null); - op.get().setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get()); - }); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, true, policies -> { + policies.setPersistence(null); + }); } - protected CompletableFuture internalSetMaxMessageSize(Integer maxMessageSize, boolean isGlobal) { - if (maxMessageSize != null && (maxMessageSize < 0 || maxMessageSize > config().getMaxMessageSize())) { + protected CompletableFuture internalSetMaxMessageSize(Integer maxMessageSizeToSet, boolean isGlobal) { + if (maxMessageSizeToSet != null && (maxMessageSizeToSet < 0 + || maxMessageSizeToSet > config().getMaxMessageSize())) { throw new RestException(Status.PRECONDITION_FAILED , "topic-level maxMessageSize must be greater than or equal to 0 " + "and must be smaller than that in the broker-level"); } - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setMaxMessageSize(maxMessageSize); - topicPolicies.setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); - }); + return pulsar().getTopicPoliciesService() + .updateTopicPoliciesAsync(topicName, isGlobal, maxMessageSizeToSet == null, policies -> { + policies.setMaxMessageSize(maxMessageSizeToSet); + }); } protected CompletableFuture> internalGetMaxMessageSize(boolean isGlobal) { @@ -3679,19 +3617,15 @@ protected CompletableFuture internalGetMaxProducers(boolean applied, bo })); } - protected CompletableFuture internalSetMaxProducers(Integer maxProducers, boolean isGlobal) { - if (maxProducers != null && maxProducers < 0) { + protected CompletableFuture internalSetMaxProducers(Integer maxProducersToSet, boolean isGlobal) { + if (maxProducersToSet != null && maxProducersToSet < 0) { throw new RestException(Status.PRECONDITION_FAILED, "maxProducers must be 0 or more"); } - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setMaxProducerPerTopic(maxProducers); - topicPolicies.setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); - }); - + return pulsar().getTopicPoliciesService() + .updateTopicPoliciesAsync(topicName, isGlobal, maxProducersToSet == null, policies -> { + policies.setMaxProducerPerTopic(maxProducersToSet); + }); } protected CompletableFuture> internalGetMaxSubscriptionsPerTopic(boolean isGlobal) { @@ -3699,20 +3633,17 @@ protected CompletableFuture> internalGetMaxSubscriptionsPerTop .thenApply(op -> op.map(TopicPolicies::getMaxSubscriptionsPerTopic)); } - protected CompletableFuture internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic, + protected CompletableFuture internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsToSet, boolean isGlobal) { - if (maxSubscriptionsPerTopic != null && maxSubscriptionsPerTopic < 0) { + if (maxSubscriptionsToSet != null && maxSubscriptionsToSet < 0) { throw new RestException(Status.PRECONDITION_FAILED, "maxSubscriptionsPerTopic must be 0 or more"); } - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic); - topicPolicies.setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); - }); + return pulsar().getTopicPoliciesService() + .updateTopicPoliciesAsync(topicName, isGlobal, maxSubscriptionsToSet == null, policies -> { + policies.setMaxSubscriptionsPerTopic(maxSubscriptionsToSet); + }); } protected CompletableFuture internalGetReplicatorDispatchRate(boolean applied, boolean isGlobal) { @@ -3728,15 +3659,12 @@ protected CompletableFuture internalGetReplicatorDispatchRate( })); } - protected CompletableFuture internalSetReplicatorDispatchRate(DispatchRateImpl dispatchRate, + protected CompletableFuture internalSetReplicatorDispatchRate(DispatchRateImpl dispatchRateToSet, boolean isGlobal) { - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setReplicatorDispatchRate(dispatchRate); - topicPolicies.setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); - }); + return pulsar().getTopicPoliciesService() + .updateTopicPoliciesAsync(topicName, isGlobal, dispatchRateToSet == null, policies -> { + policies.setReplicatorDispatchRate(dispatchRateToSet); + }); } protected CompletableFuture preValidation(boolean authoritative) { @@ -3775,15 +3703,9 @@ protected CompletableFuture preValidation(boolean authoritative) { } protected CompletableFuture internalRemoveMaxProducers(boolean isGlobal) { - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - if (!op.isPresent()) { - return CompletableFuture.completedFuture(null); - } - op.get().setMaxProducerPerTopic(null); - op.get().setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get()); - }); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, true, policies -> { + policies.setMaxProducerPerTopic(null); + }); } protected CompletableFuture internalGetMaxConsumers(boolean applied, boolean isGlobal) { @@ -3798,31 +3720,21 @@ protected CompletableFuture internalGetMaxConsumers(boolean applied, bo })); } - protected CompletableFuture internalSetMaxConsumers(Integer maxConsumers, boolean isGlobal) { - if (maxConsumers != null && maxConsumers < 0) { + protected CompletableFuture internalSetMaxConsumers(Integer maxConsumersToSet, boolean isGlobal) { + if (maxConsumersToSet != null && maxConsumersToSet < 0) { throw new RestException(Status.PRECONDITION_FAILED, "maxConsumers must be 0 or more"); } - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setMaxConsumerPerTopic(maxConsumers); - topicPolicies.setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); - }); + return pulsar().getTopicPoliciesService() + .updateTopicPoliciesAsync(topicName, isGlobal, maxConsumersToSet == null, policies -> { + policies.setMaxConsumerPerTopic(maxConsumersToSet); + }); } protected CompletableFuture internalRemoveMaxConsumers(boolean isGlobal) { - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - if (!op.isPresent()) { - return CompletableFuture.completedFuture(null); - } - op.get().setMaxConsumerPerTopic(null); - op.get().setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get()); - }); - + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, true, policies -> { + policies.setMaxConsumerPerTopic(null); + }); } protected CompletableFuture internalTerminateAsync(boolean authoritative) { @@ -4731,30 +4643,19 @@ protected CompletableFuture internalGetDispatchRate(boolean ap })); } - protected CompletableFuture internalSetDispatchRate(DispatchRateImpl dispatchRate, boolean isGlobal) { - if (dispatchRate == null) { + protected CompletableFuture internalSetDispatchRate(DispatchRateImpl dispatchRateToSet, boolean isGlobal) { + if (dispatchRateToSet == null) { return CompletableFuture.completedFuture(null); } - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setDispatchRate(dispatchRate); - topicPolicies.setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); - }); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, false, policies -> { + policies.setDispatchRate(dispatchRateToSet); + }); } protected CompletableFuture internalRemoveDispatchRate(boolean isGlobal) { - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - if (!op.isPresent()) { - return CompletableFuture.completedFuture(null); - } - TopicPolicies topicPolicies = op.get(); - topicPolicies.setDispatchRate(null); - topicPolicies.setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get()); - }); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, true, policies -> { + policies.setDispatchRate(null); + }); } protected CompletableFuture internalGetSubscriptionDispatchRate(boolean applied, boolean isGlobal) { @@ -4771,30 +4672,19 @@ protected CompletableFuture internalGetSubscriptionDispatchRate(bo } protected CompletableFuture internalSetSubscriptionDispatchRate - (DispatchRateImpl dispatchRate, boolean isGlobal) { - if (dispatchRate == null) { + (DispatchRateImpl dispatchRateToSet, boolean isGlobal) { + if (dispatchRateToSet == null) { return CompletableFuture.completedFuture(null); } - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setSubscriptionDispatchRate(dispatchRate); - topicPolicies.setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); - }); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, false, policies -> { + policies.setSubscriptionDispatchRate(dispatchRateToSet); + }); } protected CompletableFuture internalRemoveSubscriptionDispatchRate(boolean isGlobal) { - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - if (!op.isPresent()) { - return CompletableFuture.completedFuture(null); - } - TopicPolicies topicPolicies = op.get(); - topicPolicies.setSubscriptionDispatchRate(null); - topicPolicies.setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get()); - }); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, true, policies -> { + policies.setSubscriptionDispatchRate(null); + }); } protected CompletableFuture internalGetSubscriptionLevelDispatchRate(String subName, boolean applied, @@ -4815,40 +4705,28 @@ protected CompletableFuture internalGetSubscriptionLevelDispatchRa protected CompletableFuture internalSetSubscriptionLevelDispatchRate(String subName, DispatchRateImpl dispatchRate, boolean isGlobal) { - final DispatchRateImpl newDispatchRate = DispatchRateImpl.normalize(dispatchRate); - if (newDispatchRate == null) { + final DispatchRateImpl newDispatchRateToSet = DispatchRateImpl.normalize(dispatchRate); + if (newDispatchRateToSet == null) { return CompletableFuture.completedFuture(null); } - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setIsGlobal(isGlobal); - topicPolicies.getSubscriptionPolicies() - .computeIfAbsent(subName, k -> new SubscriptionPolicies()) - .setDispatchRate(newDispatchRate); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); - }); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, false, policies -> { + policies.getSubscriptionPolicies() + .computeIfAbsent(subName, k -> new SubscriptionPolicies()) + .setDispatchRate(newDispatchRateToSet); + }); } protected CompletableFuture internalRemoveSubscriptionLevelDispatchRate(String subName, boolean isGlobal) { - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - if (!op.isPresent()) { - return CompletableFuture.completedFuture(null); - } - TopicPolicies topicPolicies = op.get(); - SubscriptionPolicies sp = topicPolicies.getSubscriptionPolicies().get(subName); - if (sp == null) { - return CompletableFuture.completedFuture(null); - } + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, true, policies -> { + SubscriptionPolicies sp = policies.getSubscriptionPolicies().get(subName); + if (sp != null) { sp.setDispatchRate(null); if (sp.checkEmpty()) { // cleanup empty SubscriptionPolicies - topicPolicies.getSubscriptionPolicies().remove(subName, sp); + policies.getSubscriptionPolicies().remove(subName, sp); } - topicPolicies.setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get()); - }); + } + }); } @@ -4858,29 +4736,19 @@ protected CompletableFuture> internalGetMaxConsumersPerSubscri } protected CompletableFuture internalSetMaxConsumersPerSubscription( - Integer maxConsumersPerSubscription, boolean isGlobal) { - if (maxConsumersPerSubscription != null && maxConsumersPerSubscription < 0) { + Integer maxConsumersPerSubscriptionToSet, boolean isGlobal) { + if (maxConsumersPerSubscriptionToSet != null && maxConsumersPerSubscriptionToSet < 0) { throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for maxConsumersPerSubscription"); } - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setMaxConsumersPerSubscription(maxConsumersPerSubscription); - topicPolicies.setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); - }); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, false, policies -> { + policies.setMaxConsumersPerSubscription(maxConsumersPerSubscriptionToSet); + }); } protected CompletableFuture internalRemoveMaxConsumersPerSubscription(boolean isGlobal) { - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - if (!op.isPresent()) { - return CompletableFuture.completedFuture(null); - } - op.get().setMaxConsumersPerSubscription(null); - op.get().setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get()); - }); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, true, policies -> { + policies.setMaxConsumersPerSubscription(null); + }); } protected CompletableFuture internalGetCompactionThreshold(boolean applied, boolean isGlobal) { @@ -4897,32 +4765,20 @@ protected CompletableFuture internalGetCompactionThreshold(boolean applied })); } - protected CompletableFuture internalSetCompactionThreshold(Long compactionThreshold, boolean isGlobal) { - if (compactionThreshold != null && compactionThreshold < 0) { + protected CompletableFuture internalSetCompactionThreshold(Long compactionThresholdToSet, boolean isGlobal) { + if (compactionThresholdToSet != null && compactionThresholdToSet < 0) { throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for compactionThreshold"); } - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setCompactionThreshold(compactionThreshold); - topicPolicies.setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); - }); - + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, false, policies -> { + policies.setCompactionThreshold(compactionThresholdToSet); + }); } protected CompletableFuture internalRemoveCompactionThreshold(boolean isGlobal) { - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - if (!op.isPresent()) { - return CompletableFuture.completedFuture(null); - } - TopicPolicies topicPolicies = op.get(); - topicPolicies.setCompactionThreshold(null); - topicPolicies.setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get()); - }); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, true, policies -> { + policies.setCompactionThreshold(null); + }); } protected CompletableFuture> internalGetPublishRate(boolean isGlobal) { @@ -4930,17 +4786,13 @@ protected CompletableFuture> internalGetPublishRate(boolea .thenApply(op -> op.map(TopicPolicies::getPublishRate)); } - protected CompletableFuture internalSetPublishRate(PublishRate publishRate, boolean isGlobal) { - if (publishRate == null) { + protected CompletableFuture internalSetPublishRate(PublishRate publishRateToSet, boolean isGlobal) { + if (publishRateToSet == null) { return CompletableFuture.completedFuture(null); } - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setPublishRate(publishRate); - topicPolicies.setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); - }); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, false, policies -> { + policies.setPublishRate(publishRateToSet); + }); } protected CompletableFuture>> internalGetSubscriptionTypesEnabled(boolean isGlobal) { @@ -4949,40 +4801,25 @@ protected CompletableFuture>> internalGetSubscriptionType } protected CompletableFuture internalSetSubscriptionTypesEnabled( - Set subscriptionTypesEnabled, boolean isGlobal) { - List subTypes = new ArrayList<>(); - subscriptionTypesEnabled.forEach(subscriptionType -> subTypes.add(SubType.valueOf(subscriptionType.name()))); - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setSubscriptionTypesEnabled(subTypes); - topicPolicies.setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); - }); + Set subscriptionTypesEnabledSet, boolean isGlobal) { + List subTypesToSet = new ArrayList<>(); + subscriptionTypesEnabledSet.forEach( + subscriptionType -> subTypesToSet.add(SubType.valueOf(subscriptionType.name()))); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, false, policies -> { + policies.setSubscriptionTypesEnabled(subTypesToSet); + }); } protected CompletableFuture internalRemoveSubscriptionTypesEnabled(boolean isGlobal) { - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - if (!op.isPresent()) { - return CompletableFuture.completedFuture(null); - } - op.get().setSubscriptionTypesEnabled(new ArrayList<>()); - op.get().setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get()); - }); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, false, policies -> { + policies.setSubscriptionTypesEnabled(new ArrayList<>()); + }); } protected CompletableFuture internalRemovePublishRate(boolean isGlobal) { - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - if (!op.isPresent()) { - return CompletableFuture.completedFuture(null); - } - op.get().setPublishRate(null); - op.get().setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get()); - }); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, true, policies -> { + policies.setPublishRate(null); + }); } protected CompletableFuture internalGetSubscribeRate(boolean applied, boolean isGlobal) { @@ -4998,29 +4835,19 @@ protected CompletableFuture internalGetSubscribeRate(boolean appl })); } - protected CompletableFuture internalSetSubscribeRate(SubscribeRate subscribeRate, boolean isGlobal) { - if (subscribeRate == null) { + protected CompletableFuture internalSetSubscribeRate(SubscribeRate subscribeRateToSet, boolean isGlobal) { + if (subscribeRateToSet == null) { return CompletableFuture.completedFuture(null); } - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setSubscribeRate(subscribeRate); - topicPolicies.setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); - }); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, false, policies -> { + policies.setSubscribeRate(subscribeRateToSet); + }); } protected CompletableFuture internalRemoveSubscribeRate(boolean isGlobal) { - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - if (!op.isPresent()) { - return CompletableFuture.completedFuture(null); - } - op.get().setSubscribeRate(null); - op.get().setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get()); - }); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, true, policies -> { + policies.setSubscribeRate(null); + }); } protected void handleTopicPolicyException(String methodName, Throwable thr, AsyncResponse asyncResponse) { @@ -5349,53 +5176,51 @@ private void internalGetReplicatedSubscriptionStatusForNonPartitionedTopic( protected CompletableFuture internalGetSchemaCompatibilityStrategy(boolean applied) { if (applied) { return getSchemaCompatibilityStrategyAsync(); - } + } + // For non-applied, get specific (global or local depends on default which is local) return getTopicPoliciesAsyncWithRetry(topicName).thenApply(op -> { - if (!op.isPresent()) { - return null; - } - SchemaCompatibilityStrategy strategy = op.get().getSchemaCompatibilityStrategy(); - return SchemaCompatibilityStrategy.isUndefined(strategy) ? null : strategy; - }); + if (!op.isPresent()) { + return null; + } + SchemaCompatibilityStrategy strategy = op.get().getSchemaCompatibilityStrategy(); + return SchemaCompatibilityStrategy.isUndefined(strategy) ? null : strategy; + }); } - protected CompletableFuture internalSetSchemaCompatibilityStrategy(SchemaCompatibilityStrategy strategy) { - return getTopicPoliciesAsyncWithRetry(topicName) - .thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setSchemaCompatibilityStrategy( - strategy == SchemaCompatibilityStrategy.UNDEFINED ? null : strategy); - return pulsar().getTopicPoliciesService() - .updateTopicPoliciesAsync(topicName, topicPolicies); + protected CompletableFuture internalSetSchemaCompatibilityStrategy( + SchemaCompatibilityStrategy strategyToSet) { + return pulsar().getTopicPoliciesService() + .updateTopicPoliciesAsync(topicName, false, strategyToSet == SchemaCompatibilityStrategy.UNDEFINED, + policies -> { + policies.setSchemaCompatibilityStrategy( + strategyToSet == SchemaCompatibilityStrategy.UNDEFINED ? null : strategyToSet); }); } protected CompletableFuture internalGetSchemaValidationEnforced(boolean applied) { + // Schema validation enforced is typically a local policy return getTopicPoliciesAsyncWithRetry(topicName) .thenApply(op -> op.map(TopicPolicies::getSchemaValidationEnforced).orElseGet(() -> { if (applied) { boolean namespacePolicy = getNamespacePolicies(namespaceName).schema_validation_enforced; return namespacePolicy || pulsar().getConfiguration().isSchemaValidationEnforced(); } - return false; + return false; // Default if not set and not applied })); } - protected CompletableFuture internalSetSchemaValidationEnforced(boolean schemaValidationEnforced) { - return getTopicPoliciesAsyncWithRetry(topicName) - .thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setSchemaValidationEnforced(schemaValidationEnforced); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); - }); + protected CompletableFuture internalSetSchemaValidationEnforced(boolean schemaValidationEnforcedToSet) { + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, false, false, policies -> { + policies.setSchemaValidationEnforced(schemaValidationEnforcedToSet); + }); // Schema validation is typically a local policy } protected CompletableFuture internalGetEntryFilters(boolean applied, boolean isGlobal) { if (!applied) { - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenApply(op -> op.map(TopicPolicies::getEntryFilters).orElse(null)); - } - if (!pulsar().getConfiguration().isAllowOverrideEntryFilters()) { + return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) + .thenApply(op -> op.map(TopicPolicies::getEntryFilters).orElse(null)); + } + if (!pulsar().getConfiguration().isAllowOverrideEntryFilters()) { return CompletableFuture.completedFuture(new EntryFilters(String.join(",", pulsar().getConfiguration().getEntryFilterNames()))); } @@ -5417,29 +5242,18 @@ protected CompletableFuture internalGetEntryFilters(boolean applie }); } - protected CompletableFuture internalSetEntryFilters(EntryFilters entryFilters, + protected CompletableFuture internalSetEntryFilters(EntryFilters entryFiltersToSet, boolean isGlobal) { - validateEntryFilters(entryFilters); - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setEntryFilters(entryFilters); - topicPolicies.setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService() - .updateTopicPoliciesAsync(topicName, topicPolicies); - }); + validateEntryFilters(entryFiltersToSet); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, false, policies -> { + policies.setEntryFilters(entryFiltersToSet); + }); } protected CompletableFuture internalRemoveEntryFilters(boolean isGlobal) { - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - if (!op.isPresent()) { - return CompletableFuture.completedFuture(null); - } - op.get().setEntryFilters(null); - op.get().setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get()); - }); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, false, policies -> { + policies.setEntryFilters(null); + }); } protected CompletableFuture validateShadowTopics(List shadowTopics) { @@ -5479,39 +5293,27 @@ protected CompletableFuture internalSetShadowTopic(List shadowTopi } return validatePoliciesReadOnlyAccessAsync() .thenCompose(__ -> validateShadowTopics(shadowTopics)) - .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName)) - .thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setShadowTopics(shadowTopics); - return pulsar().getTopicPoliciesService(). - updateTopicPoliciesAsync(topicName, topicPolicies); - }); + .thenCompose(__ -> pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, false, + false, policies -> { + policies.setShadowTopics(shadowTopics); + })); // Shadow topics are local policy } protected CompletableFuture internalDeleteShadowTopics() { return validatePoliciesReadOnlyAccessAsync() - .thenCompose(shadowTopicName -> getTopicPoliciesAsyncWithRetry(topicName)) - .thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - List shadowTopics = topicPolicies.getShadowTopics(); - if (CollectionUtils.isEmpty(shadowTopics)) { - return CompletableFuture.completedFuture(null); - } - topicPolicies.setShadowTopics(null); - return pulsar().getTopicPoliciesService(). - updateTopicPoliciesAsync(topicName, topicPolicies); - }); + .thenCompose(__ -> pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, false, + true, policies -> { + policies.setShadowTopics(null); + })); // Shadow topics are local policy } protected CompletableFuture internalSetAutoSubscriptionCreation( - AutoSubscriptionCreationOverrideImpl autoSubscriptionCreationOverride, boolean isGlobal) { - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setAutoSubscriptionCreationOverride(autoSubscriptionCreationOverride); - topicPolicies.setIsGlobal(isGlobal); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); - }); + AutoSubscriptionCreationOverrideImpl autoSubscriptionCreationOverrideToSet, boolean isGlobal) { + return pulsar().getTopicPoliciesService() + .updateTopicPoliciesAsync(topicName, isGlobal, autoSubscriptionCreationOverrideToSet == null, + policies -> { + policies.setAutoSubscriptionCreationOverride(autoSubscriptionCreationOverrideToSet); + }); } protected CompletableFuture internalGetAutoSubscriptionCreation(boolean applied, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index f67a16ed8da7e..a149abea50f21 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2611,7 +2611,7 @@ private void handleLocalPoliciesUpdates(NamespaceName namespace) { private void handlePoliciesUpdates(NamespaceName namespace) { pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace) .thenAcceptAsync(optPolicies -> { - if (!optPolicies.isPresent()) { + if (!optPolicies.isPresent() || optPolicies.get().deleted) { return; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 4e858fc91a100..ac6c11451b20c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -24,17 +24,20 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.PriorityQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import org.apache.commons.lang3.concurrent.ConcurrentInitializer; import org.apache.commons.lang3.concurrent.LazyInitializer; import org.apache.commons.lang3.mutable.Mutable; @@ -48,6 +51,7 @@ import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.TopicMessageImpl; @@ -75,7 +79,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesService { private final PulsarService pulsarService; - private final HashSet localCluster; + private final HashSet localCluster; private final String clusterName; private final AtomicBoolean closed = new AtomicBoolean(false); @@ -107,8 +111,16 @@ protected NamespaceEventsSystemTopicFactory initialize() { @VisibleForTesting final Map> listeners = new ConcurrentHashMap<>(); + private final Map topicPolicyMessageHandlerTrackers = + new ConcurrentHashMap<>(); + private final AsyncLoadingCache> writerCaches; + // Sequencer for policy updates per topic and per policy type (global/local) + // Key: Pair, Value: CompletableFuture representing the last update in sequence + private final ConcurrentHashMap, CompletableFuture> topicPolicyUpdateSequencer = + new ConcurrentHashMap<>(); + public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) { this.pulsarService = pulsarService; this.clusterName = pulsarService.getConfiguration().getClusterName(); @@ -178,92 +190,248 @@ public CompletableFuture deleteTopicPoliciesAsync(TopicName topicName, log.info("Skip delete topic-level policies because {} has been removed before", changeEvents); return CompletableFuture.completedFuture(null); } - return sendTopicPolicyEvent(topicName, ActionType.DELETE, null, - keepGlobalPolicies); + // delete local policy + return updateTopicPoliciesAsync(topicName, null, false, ActionType.DELETE, true) + .thenCompose(__ -> + // delete global policy + updateTopicPoliciesAsync(topicName, null, true, ActionType.DELETE, true)); }); } @Override - public CompletableFuture updateTopicPoliciesAsync(TopicName topicName, TopicPolicies policies) { + public CompletableFuture updateTopicPoliciesAsync(TopicName topicName, + boolean isGlobalPolicy, + boolean skipUpdateWhenTopicPolicyDoesntExist, + Consumer policyUpdater) { if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) { return CompletableFuture.failedFuture(new BrokerServiceException.NotAllowedException( "Not allowed to update topic policy for the heartbeat topic")); } - return sendTopicPolicyEvent(topicName, ActionType.UPDATE, policies, false); + return updateTopicPoliciesAsync(topicName, policyUpdater, isGlobalPolicy, ActionType.UPDATE, + skipUpdateWhenTopicPolicyDoesntExist); } - private CompletableFuture sendTopicPolicyEvent(TopicName topicName, ActionType actionType, - @Nullable TopicPolicies policies, boolean keepGlobalPoliciesAfterDeleting) { - return pulsarService.getPulsarResources().getNamespaceResources() - .getPoliciesAsync(topicName.getNamespaceObject()) - .thenCompose(namespacePolicies -> { - if (namespacePolicies.isPresent() && namespacePolicies.get().deleted) { - log.debug("[{}] skip sending topic policy event since the namespace is deleted", topicName); - return CompletableFuture.completedFuture(null); - } - - try { - createSystemTopicFactoryIfNeeded(); - } catch (PulsarServerException e) { - return CompletableFuture.failedFuture(e); - } - CompletableFuture result = new CompletableFuture<>(); - writerCaches.get(topicName.getNamespaceObject()) - .whenComplete((writer, cause) -> { - if (cause != null) { - writerCaches.synchronous().invalidate(topicName.getNamespaceObject()); - result.completeExceptionally(cause); - } else { - CompletableFuture writeFuture = - sendTopicPolicyEventInternal(topicName, actionType, writer, policies, - keepGlobalPoliciesAfterDeleting); - writeFuture.whenComplete((messageId, e) -> { - if (e != null) { - result.completeExceptionally(e); - } else { - if (messageId != null) { - result.complete(null); + private CompletableFuture updateTopicPoliciesAsync(TopicName topicName, + Consumer policyUpdater, + boolean isGlobalPolicy, + ActionType actionType, + boolean skipUpdateWhenTopicPolicyDoesntExist) { + if (closed.get()) { + return CompletableFuture.failedFuture(new BrokerServiceException(getClass().getName() + " is closed.")); + } + TopicName partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName()); + Pair sequencerKey = Pair.of(partitionedTopicName, isGlobalPolicy); + + CompletableFuture operationFuture = new CompletableFuture<>(); + + // Chain the operation on the sequencer for the specific topic and policy type + topicPolicyUpdateSequencer.compute(sequencerKey, (key, existingFuture) -> { + CompletableFuture chain = (existingFuture == null || existingFuture.isDone()) + ? CompletableFuture.completedFuture(null) + : existingFuture; + + return chain.thenCompose(v -> + pulsarService.getPulsarResources().getNamespaceResources() + .getPoliciesAsync(topicName.getNamespaceObject()) + .thenCompose(namespacePolicies -> { + if (namespacePolicies.isPresent() && namespacePolicies.get().deleted) { + log.debug("[{}] skip sending topic policy event since the namespace is deleted", + topicName); + return CompletableFuture.completedFuture(null); + } + return getTopicPoliciesAsync(partitionedTopicName, + isGlobalPolicy ? GetType.GLOBAL_ONLY : GetType.LOCAL_ONLY) + .thenCompose(currentPolicies -> { + if (currentPolicies.isEmpty() && skipUpdateWhenTopicPolicyDoesntExist) { + log.debug("[{}] No existing policies, skipping sending event as " + + "requested", topicName); + return CompletableFuture.completedFuture(null); + } + TopicPolicies policiesToUpdate; + if (actionType == ActionType.DELETE) { + policiesToUpdate = null; // For delete, policies object is null } else { - result.completeExceptionally( - new RuntimeException("Got message id is null.")); + policiesToUpdate = currentPolicies.isEmpty() + ? createTopicPolicies(isGlobalPolicy) + : currentPolicies.get().clone(); + policyUpdater.accept(policiesToUpdate); } - } - }); - } - }); - return result; - }); + return sendTopicPolicyEventInternal(topicName, actionType, policiesToUpdate, + isGlobalPolicy); + }) + .thenCompose(messageId -> { + if (messageId == null) { + return CompletableFuture.completedFuture(null); + } else { + // asynchronously wait until the message ID is read by the reader + return untilMessageIdHasBeenRead(topicName.getNamespaceObject(), + messageId); + } + }); + })); + }).whenComplete((res, ex) -> { + // remove the current future from the sequencer map, if it is done + // this would remove the future from the sequencer map when the last operation completes in the chained + // future + topicPolicyUpdateSequencer.compute(sequencerKey, (key, existingFuture) -> { + if (existingFuture != null && existingFuture.isDone()) { + // Remove the completed future from the sequencer map + return null; + } + return existingFuture; + }); + if (ex != null) { + writerCaches.synchronous().invalidate(topicName.getNamespaceObject()); + operationFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex)); + } else { + operationFuture.complete(res); + } + }); + return operationFuture; + } + + /** + * Asynchronously waits until the message ID has been read by the reader. + * This ensures that the write operation has been fully processed and the changes are effective. + * @param namespaceObject the namespace object for which the message ID is being tracked + * @param messageId the message ID to wait for being handled + * @return a CompletableFuture that completes when the message ID has been read by the reader + */ + private CompletableFuture untilMessageIdHasBeenRead(NamespaceName namespaceObject, MessageId messageId) { + CompletableFuture future = new CompletableFuture<>(); + getMessageHandlerTracker(namespaceObject).addPendingFuture((MessageIdAdv) messageId, future); + return future; + } + + private TopicPolicyMessageHandlerTracker getMessageHandlerTracker(NamespaceName namespaceObject) { + return topicPolicyMessageHandlerTrackers.computeIfAbsent(namespaceObject, + ns -> new TopicPolicyMessageHandlerTracker()); + } + + private record PendingMessageFuture(MessageId messageId, CompletableFuture future) + implements Comparable { + @Override + public int compareTo(PendingMessageFuture o) { + return messageId.compareTo(o.messageId); + } + } + + /** + * This tracks the last handled message IDs for each partition of the topic policies topic and + * pending futures for topic policy messages. Each namespace has its own tracker instance since + * this is tracking the per-namespace __change_events topic. + * The purpose for this tracker is to ensure that write operations on topic policies don't complete before the topic + * policies message has been read by the reader and effective. + */ + private static class TopicPolicyMessageHandlerTracker implements AutoCloseable { + private List lastHandledMessageIds = new ArrayList<>(); + private List> pendingFutures = new ArrayList<>(); + private boolean closed = false; + + /** + * Called after a message ID has been handled by the reader. + * This will update the last handled message ID for the partition and complete any pending futures that are + * registered to the handled message ID or before it. + * @param messageId the message ID that has been handled + */ + public synchronized void handleMessageId(MessageIdAdv messageId) { + if (closed) { + return; + } + int partitionIndex = messageId.getPartitionIndex(); + if (partitionIndex < 0) { + partitionIndex = 0; + } + while (lastHandledMessageIds.size() <= partitionIndex) { + lastHandledMessageIds.add(null); + } + lastHandledMessageIds.set(partitionIndex, messageId); + if (pendingFutures.size() > partitionIndex) { + PriorityQueue pq = pendingFutures.get(partitionIndex); + while (!pq.isEmpty() && pq.peek().messageId.compareTo(messageId) <= 0) { + PendingMessageFuture pendingFuture = pq.poll(); + pendingFuture.future.complete(null); + } + } + } + + /** + * Adds a pending future for a message ID. If the message ID is already handled, the future will be completed + * immediately. + * @param messageId the message ID to add the future for + * @param future the future to complete when the message ID is handled + */ + public synchronized void addPendingFuture(MessageIdAdv messageId, CompletableFuture future) { + if (closed) { + future.complete(null); + return; + } + int partitionIndex = messageId.getPartitionIndex(); + if (partitionIndex < 0) { + partitionIndex = 0; + } + while (pendingFutures.size() <= partitionIndex) { + pendingFutures.add(new PriorityQueue<>()); + } + MessageIdAdv lastHandledMessageId = + lastHandledMessageIds.size() > partitionIndex ? lastHandledMessageIds.get(partitionIndex) : null; + if (lastHandledMessageId != null && lastHandledMessageId.compareTo(messageId) >= 0) { + // If the messageId is already handled, complete the future immediately + future.complete(null); + return; + } + pendingFutures.get(partitionIndex).add(new PendingMessageFuture(messageId, future)); + } + + @Override + public synchronized void close() { + if (!closed) { + closed = true; + for (PriorityQueue pq : pendingFutures) { + while (!pq.isEmpty()) { + PendingMessageFuture pendingFuture = pq.poll(); + pendingFuture.future.complete(null); + } + } + pendingFutures.clear(); + lastHandledMessageIds.clear(); + } + } + } + + + private static TopicPolicies createTopicPolicies(boolean isGlobalPolicy) { + TopicPolicies topicPolicies = new TopicPolicies(); + topicPolicies.setIsGlobal(isGlobalPolicy); + return topicPolicies; } private CompletableFuture sendTopicPolicyEventInternal(TopicName topicName, ActionType actionType, - SystemTopicClient.Writer writer, @Nullable TopicPolicies policies, - boolean keepGlobalPoliciesAfterDeleting) { - PulsarEvent event = getPulsarEvent(topicName, actionType, policies); - if (!ActionType.DELETE.equals(actionType)) { - return writer.writeAsync(getEventKey(event, policies != null && policies.isGlobalPolicies()), event); - } - // When a topic is deleting, delete both non-global and global topic-level policies. - CompletableFuture dealWithGlobalPolicy; - if (keepGlobalPoliciesAfterDeleting) { - dealWithGlobalPolicy = CompletableFuture.completedFuture(null); - } else { - dealWithGlobalPolicy = writer.deleteAsync(getEventKey(event, true), event); - } - CompletableFuture deletePolicies = dealWithGlobalPolicy - .thenCompose(__ -> { - return writer.deleteAsync(getEventKey(event, false), event); - }); - deletePolicies.exceptionally(ex -> { - log.error("Failed to delete topic policy [{}] error.", topicName, ex); - return null; - }); - return deletePolicies; + @Nullable TopicPolicies policies, + boolean isGlobalPolicy) { + return writerCaches.get(topicName.getNamespaceObject()) + .thenCompose(writer -> { + PulsarEvent event = getPulsarEvent(topicName, actionType, policies, isGlobalPolicy); + String eventKey = getEventKey(event, isGlobalPolicy); + + if (actionType == ActionType.DELETE) { + return writer.deleteAsync(eventKey, event) + .exceptionally(ex -> { + log.error("Failed to delete {} topic policy [{}] error.", + isGlobalPolicy ? "global" : "local", topicName, ex); + return null; + }); + } else { + return writer.writeAsync(eventKey, event); + } + }); } - private PulsarEvent getPulsarEvent(TopicName topicName, ActionType actionType, TopicPolicies policies) { + private PulsarEvent getPulsarEvent(TopicName topicName, ActionType actionType, @Nullable TopicPolicies policies, + boolean isGlobalPolicy) { PulsarEvent.PulsarEventBuilder builder = PulsarEvent.builder(); - if (policies == null || !policies.isGlobalPolicies()) { - // we don't need to replicate local policies to remote cluster, so set `replicateTo` to empty. + if (!isGlobalPolicy) { + // we don't need to replicate local policies to remote cluster, so set `replicateTo` to localCluster. builder.replicateTo(localCluster); } return builder @@ -362,12 +530,16 @@ public void addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) { if (NamespaceService.isHeartbeatNamespace(namespace)) { return; } - synchronized (this) { - if (readerCaches.get(namespace) != null) { - ownedBundlesCountPerNamespace.get(namespace).incrementAndGet(); - } else { - prepareInitPoliciesCacheAsync(namespace); - } + AtomicInteger bundlesCount = + ownedBundlesCountPerNamespace.computeIfAbsent(namespace, k -> new AtomicInteger(0)); + int previousCount = bundlesCount.getAndIncrement(); + if (previousCount == 0) { + // initialize policies cache asynchronously on the first bundle load + prepareInitPoliciesCacheAsync(namespace).exceptionally(t -> { + log.warn("Failed to prepare policies cache for namespace {} due to previously logged error ({}).", + namespace, t.getMessage()); + return null; + }); } } @@ -389,7 +561,6 @@ public void addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) { final CompletableFuture> readerCompletableFuture = createSystemTopicClient(namespace); readerCaches.put(namespace, readerCompletableFuture); - ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1)); final CompletableFuture initFuture = readerCompletableFuture .thenCompose(reader -> { final CompletableFuture stageFuture = new CompletableFuture<>(); @@ -437,8 +608,7 @@ protected CompletableFuture> createSystemT private void removeOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) { NamespaceName namespace = namespaceBundle.getNamespaceObject(); - if (NamespaceService.checkHeartbeatNamespace(namespace) != null - || NamespaceService.checkHeartbeatNamespaceV2(namespace) != null) { + if (NamespaceService.isHeartbeatNamespace(namespace)) { return; } AtomicInteger bundlesCount = ownedBundlesCountPerNamespace.get(namespace); @@ -455,12 +625,14 @@ public void start(PulsarService pulsarService) { @Override public void onLoad(NamespaceBundle bundle) { - addOwnedNamespaceBundleAsync(bundle); + pulsarService.getOrderedExecutor().executeOrdered(bundle.getNamespaceObject(), + () -> addOwnedNamespaceBundleAsync(bundle)); } @Override public void unLoad(NamespaceBundle bundle) { - removeOwnedNamespaceBundleAsync(bundle); + pulsarService.getOrderedExecutor().executeOrdered(bundle.getNamespaceObject(), + () -> removeOwnedNamespaceBundleAsync(bundle)); } @Override @@ -537,6 +709,12 @@ private void cleanCacheAndCloseReader(@NonNull NamespaceName namespace, boolean } CompletableFuture> readerFuture = readerCaches.remove(namespace); + TopicPolicyMessageHandlerTracker topicPolicyMessageHandlerTracker = + topicPolicyMessageHandlerTrackers.remove(namespace); + if (topicPolicyMessageHandlerTracker != null) { + topicPolicyMessageHandlerTracker.close(); + } + if (cleanOwnedBundlesCount) { ownedBundlesCountPerNamespace.remove(namespace); } @@ -550,6 +728,8 @@ private void cleanCacheAndCloseReader(@NonNull NamespaceName namespace, boolean policyCacheInitMap.compute(namespace, (k, v) -> { policiesCache.entrySet().removeIf(entry -> Objects.equals(entry.getKey().getNamespaceObject(), namespace)); + globalPoliciesCache.entrySet() + .removeIf(entry -> Objects.equals(entry.getKey().getNamespaceObject(), namespace)); return null; }); } @@ -561,14 +741,16 @@ private void cleanCacheAndCloseReader(@NonNull NamespaceName namespace, boolean * #{@link SystemTopicBasedTopicPoliciesService#getTopicPoliciesAsync} method to block loading topic. */ private void readMorePoliciesAsync(SystemTopicClient.Reader reader) { + NamespaceName namespaceObject = reader.getSystemTopic().getTopicName().getNamespaceObject(); if (closed.get()) { - cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false); + cleanCacheAndCloseReader(namespaceObject, false); return; } reader.readNextAsync() .thenAccept(msg -> { try { refreshTopicPoliciesCache(msg); + getMessageHandlerTracker(namespaceObject).handleMessageId((MessageIdAdv) msg.getMessageId()); notifyListener(msg); } finally { msg.release(); @@ -582,8 +764,7 @@ private void readMorePoliciesAsync(SystemTopicClient.Reader reader) if (cause instanceof PulsarClientException.AlreadyClosedException) { log.info("Closing the topic policies reader for {}", reader.getSystemTopic().getTopicName()); - cleanCacheAndCloseReader( - reader.getSystemTopic().getTopicName().getNamespaceObject(), false); + cleanCacheAndCloseReader(namespaceObject, false); } else { log.warn("Read more topic polices exception, read again.", ex); readMorePoliciesAsync(reader); @@ -630,23 +811,13 @@ private void refreshTopicPoliciesCache(Message msg) { // However, due to compatibility, it is temporarily retained here // and can be deleted in the future. policiesCache.remove(topicName); - try { - createSystemTopicFactoryIfNeeded(); - } catch (PulsarServerException e) { - log.error("Failed to create system topic factory"); - break; - } - SystemTopicClient systemTopicClient = getNamespaceEventsSystemTopicFactory() - .createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject()); - systemTopicClient.newWriterAsync().thenAccept(writer -> { - sendTopicPolicyEventInternal(topicName, ActionType.DELETE, writer, event.getPolicies(), false) - .whenComplete((result, e) -> writer.closeAsync() - .whenComplete((res, ex) -> { + sendTopicPolicyEventInternal(topicName, ActionType.DELETE, null, + event.getPolicies().isGlobalPolicies()) + .whenComplete((__, ex) -> { if (ex != null) { - log.error("close writer failed ", ex); + log.error("Failed to send delete topic policy event for {}", topicName, ex); } - })); - }); + }); break; case NONE: break; @@ -689,7 +860,6 @@ NamespaceEventsSystemTopicFactory getNamespaceEventsSystemTopicFactory() { } - @VisibleForTesting long getPoliciesCacheSize() { return policiesCache.size(); @@ -782,4 +952,9 @@ private static boolean isSelf(TopicName topicName) { final var index = localName.lastIndexOf(TopicName.PARTITIONED_TOPIC_SUFFIX); return localName.substring(0, index).equals(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME); } + + @VisibleForTesting + public int getTopicPolicyUpdateSequencerSize() { + return topicPolicyUpdateSequencer.size(); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java index 02303fd9c4af7..239c1d3d9bad4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java @@ -20,6 +20,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.common.classification.InterfaceAudience; @@ -58,11 +59,21 @@ default CompletableFuture deleteTopicPoliciesAsync(TopicName topicName, /** * Update policies for a topic asynchronously. + * The policyUpdater will be called with a TopicPolicies object (either newly created or cloned from existing) + * which can be safely mutated. The service will handle writing this updated object. * - * @param topicName topic name - * @param policies policies for the topic name + * @param topicName topic name + * @param isGlobalPolicy true if the global policy is to be updated, false for local + * @param skipUpdateWhenTopicPolicyDoesntExist when true, skips the update if the topic policy does not already + * exist. This is useful for cases when the policyUpdater is removing + * a setting in the policy. + * @param policyUpdater a function that modifies the TopicPolicies + * @return a CompletableFuture that completes when the update has been completed with read-your-writes consistency. */ - CompletableFuture updateTopicPoliciesAsync(TopicName topicName, TopicPolicies policies); + CompletableFuture updateTopicPoliciesAsync(TopicName topicName, + boolean isGlobalPolicy, + boolean skipUpdateWhenTopicPolicyDoesntExist, + Consumer policyUpdater); /** * It controls the behavior of {@link TopicPoliciesService#getTopicPoliciesAsync}. @@ -117,7 +128,9 @@ public CompletableFuture deleteTopicPoliciesAsync(TopicName topicName) { } @Override - public CompletableFuture updateTopicPoliciesAsync(TopicName topicName, TopicPolicies policies) { + public CompletableFuture updateTopicPoliciesAsync(TopicName topicName, boolean isGlobalPolicy, + boolean skipUpdateWhenTopicPolicyDoesntExist, + Consumer policyUpdater) { return FutureUtil.failedFuture(new UnsupportedOperationException("Topic policies service is disabled.")); } @@ -144,13 +157,6 @@ static String getEventKey(PulsarEvent event, boolean isGlobal) { event.getTopicPoliciesEvent().getTopic()).toString(), isGlobal); } - static String getEventKey(TopicName topicName, boolean isGlobal) { - return wrapEventKey(TopicName.get(topicName.getDomain().toString(), - topicName.getTenant(), - topicName.getNamespace(), - TopicName.get(topicName.getPartitionedTopicName()).getLocalName()).toString(), isGlobal); - } - static String wrapEventKey(String originalKey, boolean isGlobalPolicies) { if (!isGlobalPolicies) { return originalKey; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index b46c1f92faa07..75f0854970e6c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -2161,7 +2161,7 @@ public void testDeleteTopicPolicyWhenDeleteSystemTopic() throws Exception { admin.topicPolicies().setMaxConsumers(systemTopic, 5); Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> { final var policies = TopicPolicyTestUtils.getTopicPoliciesBypassCache(pulsar.getTopicPoliciesService(), - TopicName.get(systemTopic)); + TopicName.get(systemTopic), false); Assert.assertTrue(policies.isPresent()); Assert.assertEquals(policies.get().getMaxConsumerPerTopic(), 5); }); @@ -2169,8 +2169,7 @@ public void testDeleteTopicPolicyWhenDeleteSystemTopic() throws Exception { admin.topics().delete(systemTopic, true); Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> assertTrue( TopicPolicyTestUtils.getTopicPoliciesBypassCache(pulsar.getTopicPoliciesService(), - TopicName.get(systemTopic)) - .isEmpty())); + TopicName.get(systemTopic), false).isEmpty())); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 1d911c52c838c..a41ee20137b25 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -3902,12 +3902,12 @@ public void testGetTopicPoliciesWhenDeleteTopicPolicy() throws Exception { admin.topicPolicies().setMaxConsumers(persistenceTopic, 5); Integer maxConsumerPerTopic = TopicPolicyTestUtils.getTopicPoliciesBypassCache(pulsar.getTopicPoliciesService(), - TopicName.get(persistenceTopic)).orElseThrow().getMaxConsumerPerTopic(); + TopicName.get(persistenceTopic), false).orElseThrow().getMaxConsumerPerTopic(); assertEquals(maxConsumerPerTopic, 5); admin.topics().delete(persistenceTopic, true); assertTrue(TopicPolicyTestUtils.getTopicPoliciesBypassCache(pulsar.getTopicPoliciesService(), - TopicName.get(persistenceTopic)).isEmpty()); + TopicName.get(persistenceTopic), false).isEmpty()); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesUpdateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesUpdateTest.java new file mode 100644 index 0000000000000..71934bda9f55a --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesUpdateTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName; +import static org.testng.Assert.assertEquals; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; +import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.policies.data.TopicType; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-admin") +public class TopicPoliciesUpdateTest extends MockedPulsarServiceBaseTest { + private final boolean partitionedSystemTopic; + private final String testTenant = "my-tenant"; + private final String testNamespace = "my-namespace"; + private final String myNamespace = testTenant + "/" + testNamespace; + + // comment out the @Factory annotation to run this test individually in IDE + @Factory + public static Object[] createTestInstances() { + return new Object[]{ + new TopicPoliciesUpdateTest(false), + new TopicPoliciesUpdateTest(true) // test with partitioned system topic + }; + } + + public TopicPoliciesUpdateTest() { + partitionedSystemTopic = false; + } + + private TopicPoliciesUpdateTest(boolean partitionedSystemTopic) { + this.partitionedSystemTopic = partitionedSystemTopic; + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + if (partitionedSystemTopic) { + // A partitioned system topic will get created when allowAutoTopicCreationType is set to PARTITIONED + conf.setDefaultNumPartitions(4); + conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED); + } + this.conf.setDefaultNumberOfNamespaceBundles(1); + } + + @BeforeMethod + @Override + protected void setup() throws Exception { + super.internalSetup(); + admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); + TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test")); + admin.tenants().createTenant(this.testTenant, tenantInfo); + admin.namespaces().createNamespace(testTenant + "/" + testNamespace, Set.of("test")); + } + + @AfterMethod(alwaysRun = true) + @Override + public void cleanup() throws Exception { + super.internalCleanup(); + } + + @DataProvider + public Object[][] topicTypes() { + return new Object[][]{ + {TopicType.PARTITIONED}, + {TopicType.NON_PARTITIONED} + }; + } + + @Test(dataProvider = "topicTypes") + public void testMultipleUpdates(TopicType topicType) throws Exception { + List topics = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + String topic = newUniqueName("persistent://" + myNamespace + "/testtopic" + i); + if (TopicType.PARTITIONED.equals(topicType)) { + admin.topics().createNonPartitionedTopic(topic); + } else { + admin.topics().createPartitionedTopic(topic, 2); + } + topics.add(topic); + } + + // test data + InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies(); + inactiveTopicPolicies.setDeleteWhileInactive(true); + inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up); + inactiveTopicPolicies.setMaxInactiveDurationSeconds(3600); + DispatchRate dispatchRate = DispatchRate + .builder() + .dispatchThrottlingRateInMsg(1000) + .dispatchThrottlingRateInByte(1000000) + .build(); + String clusterId = "test"; + + // test multiple updates + for (String topic : topics) { + for (int i = 0; i < 10; i++) { + List> futures = new ArrayList<>(); + futures.add(admin.topicPolicies().setInactiveTopicPoliciesAsync(topic, inactiveTopicPolicies)); + futures.add(admin.topicPolicies().setReplicatorDispatchRateAsync(topic, dispatchRate)); + futures.add(admin.topics().setReplicationClustersAsync(topic, List.of(clusterId))); + + // wait for all futures to complete + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + + assertEquals(admin.topicPolicies().getInactiveTopicPolicies(topic), inactiveTopicPolicies); + assertEquals(admin.topicPolicies().getReplicatorDispatchRate(topic), dispatchRate); + assertEquals(admin.topics().getReplicationClusters(topic, true), Set.of(clusterId)); + } + } + + // verify that there aren't any pending updates in the sequencer + SystemTopicBasedTopicPoliciesService policyService = + (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService(); + assertEquals(policyService.getTopicPolicyUpdateSequencerSize(), 0, + "There should be no pending updates after completing all updates"); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InmemoryTopicPoliciesService.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InmemoryTopicPoliciesService.java index 88a75fe8f0387..5d9ce48dcfdc4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InmemoryTopicPoliciesService.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InmemoryTopicPoliciesService.java @@ -18,17 +18,22 @@ */ package org.apache.pulsar.broker.service; +import io.netty.util.concurrent.DefaultThreadFactory; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Consumer; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicPolicies; public class InmemoryTopicPoliciesService implements TopicPoliciesService { - + private final ExecutorService executor = + Executors.newSingleThreadExecutor(new DefaultThreadFactory("InmemoryTopicPoliciesService")); private final Map cache = new HashMap<>(); private final Map> listeners = new HashMap<>(); @@ -39,23 +44,34 @@ public synchronized CompletableFuture deleteTopicPoliciesAsync(TopicName t } @Override - public synchronized CompletableFuture updateTopicPoliciesAsync(TopicName topicName, TopicPolicies policies) { - final var existingPolicies = cache.get(topicName); - if (existingPolicies != policies) { - cache.put(topicName, policies); - CompletableFuture.runAsync(() -> { - final TopicPolicies latestPolicies; - final List listeners; - synchronized (InmemoryTopicPoliciesService.this) { - latestPolicies = cache.get(topicName); - listeners = this.listeners.getOrDefault(topicName, List.of()); - } - for (var listener : listeners) { - listener.onUpdate(latestPolicies); - } - }); - } - return CompletableFuture.completedFuture(null); + public synchronized CompletableFuture updateTopicPoliciesAsync(TopicName topicName, + boolean isGlobalPolicy, + boolean skipUpdateWhenTopicPolicyDoesntExist, + Consumer policyUpdater) { + return CompletableFuture.runAsync(() -> { + final var existingPolicies = cache.get(topicName); + if (existingPolicies == null && skipUpdateWhenTopicPolicyDoesntExist) { + return; // No existing policies and skip update + } + final TopicPolicies newPolicies = existingPolicies == null + ? createTopicPolicy(isGlobalPolicy) + : existingPolicies.clone(); + policyUpdater.accept(newPolicies); + cache.put(topicName, newPolicies); + List listeners; + synchronized (this) { + listeners = this.listeners.getOrDefault(topicName, List.of()); + } + for (var listener : listeners) { + listener.onUpdate(newPolicies); + } + }, executor); + } + + private static TopicPolicies createTopicPolicy(boolean isGlobalPolicy) { + TopicPolicies topicPolicies = new TopicPolicies(); + topicPolicies.setIsGlobal(isGlobalPolicy); + return topicPolicies; } @Override @@ -78,4 +94,9 @@ public synchronized void unregisterListener(TopicName topicName, TopicPolicyList synchronized boolean containsKey(TopicName topicName) { return cache.containsKey(topicName); } + + @Override + public void close() throws Exception { + executor.shutdownNow(); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java index 554e7895975d6..8149b7a943535 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java @@ -124,11 +124,8 @@ public void onUpdate(TopicPolicies data) { @Test public void testGetPolicy() throws Exception { - // Init topic policies - TopicPolicies initPolicy = TopicPolicies.builder() - .maxConsumerPerTopic(10) - .build(); - systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, initPolicy).get(); + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, false, false, topicPolicies -> + topicPolicies.setMaxConsumerPerTopic(10)).get(); // Wait for all topic policies updated. Awaitility.await().untilAsserted(() -> @@ -141,40 +138,28 @@ public void testGetPolicy() throws Exception { .getMaxConsumerPerTopic().intValue(), 10)); // Update policy for TOPIC1 - TopicPolicies policies1 = TopicPolicies.builder() - .maxProducerPerTopic(1) - .build(); - systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, policies1).get(); + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, false, false, topicPolicies -> + topicPolicies.setMaxConsumerPerTopic(1)).get(); // Update policy for TOPIC2 - TopicPolicies policies2 = TopicPolicies.builder() - .maxProducerPerTopic(2) - .build(); - systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC2, policies2).get(); + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC2, false, false, topicPolicies -> + topicPolicies.setMaxConsumerPerTopic(2)).get(); // Update policy for TOPIC3 - TopicPolicies policies3 = TopicPolicies.builder() - .maxProducerPerTopic(3) - .build(); - systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC3, policies3).get(); + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC3, false, false, topicPolicies -> + topicPolicies.setMaxConsumerPerTopic(3)).get(); // Update policy for TOPIC4 - TopicPolicies policies4 = TopicPolicies.builder() - .maxProducerPerTopic(4) - .build(); - systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC4, policies4).get(); + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC4, false, false, topicPolicies -> + topicPolicies.setMaxConsumerPerTopic(4)).get(); // Update policy for TOPIC5 - TopicPolicies policies5 = TopicPolicies.builder() - .maxProducerPerTopic(5) - .build(); - systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC5, policies5).get(); + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC5, false, false, topicPolicies -> + topicPolicies.setMaxConsumerPerTopic(5)).get(); // Update policy for TOPIC6 - TopicPolicies policies6 = TopicPolicies.builder() - .maxProducerPerTopic(6) - .build(); - systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC6, policies6).get(); + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC6, false, false, topicPolicies -> + topicPolicies.setMaxConsumerPerTopic(6)).get(); Awaitility.await().untilAsserted(() -> { TopicPolicies policiesGet1 = TopicPolicyTestUtils.getTopicPolicies(systemTopicBasedTopicPoliciesService, @@ -190,12 +175,12 @@ public void testGetPolicy() throws Exception { TopicPolicies policiesGet6 = TopicPolicyTestUtils.getTopicPolicies(systemTopicBasedTopicPoliciesService, TOPIC6); - Assert.assertEquals(policiesGet1, policies1); - Assert.assertEquals(policiesGet2, policies2); - Assert.assertEquals(policiesGet3, policies3); - Assert.assertEquals(policiesGet4, policies4); - Assert.assertEquals(policiesGet5, policies5); - Assert.assertEquals(policiesGet6, policies6); + Assert.assertEquals(policiesGet1.getMaxConsumerPerTopic(), 1); + Assert.assertEquals(policiesGet2.getMaxConsumerPerTopic(), 2); + Assert.assertEquals(policiesGet3.getMaxConsumerPerTopic(), 3); + Assert.assertEquals(policiesGet4.getMaxConsumerPerTopic(), 4); + Assert.assertEquals(policiesGet5.getMaxConsumerPerTopic(), 5); + Assert.assertEquals(policiesGet6.getMaxConsumerPerTopic(), 6); }); // Remove reader cache will remove policies cache @@ -209,27 +194,27 @@ public void testGetPolicy() throws Exception { Assert.assertTrue(systemTopicBasedTopicPoliciesService.checkReaderIsCached( NamespaceName.get(NAMESPACE3))); - policies1.setMaxProducerPerTopic(101); - systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, policies1); - policies2.setMaxProducerPerTopic(102); - systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC2, policies2); - policies2.setMaxProducerPerTopic(103); - systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC2, policies2); - policies1.setMaxProducerPerTopic(104); - systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, policies1); - policies2.setMaxProducerPerTopic(105); - systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC2, policies2); - policies1.setMaxProducerPerTopic(106); - systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, policies1); + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, false, false, topicPolicies -> + topicPolicies.setMaxConsumerPerTopic(101)); + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC2, false, false, topicPolicies -> + topicPolicies.setMaxConsumerPerTopic(102)); + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC2, false, false, topicPolicies -> + topicPolicies.setMaxConsumerPerTopic(103)); + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, false, false, topicPolicies -> + topicPolicies.setMaxConsumerPerTopic(104)); + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC2, false, false, topicPolicies -> + topicPolicies.setMaxConsumerPerTopic(105)); + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, false, false, topicPolicies -> + topicPolicies.setMaxConsumerPerTopic(106)); // reader for NAMESPACE1 will back fill the reader cache Awaitility.await().untilAsserted(() -> { - TopicPolicies policiesGet1 = TopicPolicyTestUtils.getTopicPolicies(systemTopicBasedTopicPoliciesService, - TOPIC1); - TopicPolicies policiesGet2 = TopicPolicyTestUtils.getTopicPolicies(systemTopicBasedTopicPoliciesService, - TOPIC2); - Assert.assertEquals(policies1, policiesGet1); - Assert.assertEquals(policies2, policiesGet2); + TopicPolicies policiesGet1 = + TopicPolicyTestUtils.getTopicPolicies(systemTopicBasedTopicPoliciesService, TOPIC1); + TopicPolicies policiesGet2 = + TopicPolicyTestUtils.getTopicPolicies(systemTopicBasedTopicPoliciesService, TOPIC2); + Assert.assertEquals(policiesGet1.getMaxConsumerPerTopic(), 106); + Assert.assertEquals(policiesGet2.getMaxConsumerPerTopic(), 105); }); // Check reader cache is correct. @@ -240,9 +225,10 @@ public void testGetPolicy() throws Exception { Assert.assertTrue(systemTopicBasedTopicPoliciesService.checkReaderIsCached( NamespaceName.get(NAMESPACE3))); + TopicPolicies policies1 = TopicPolicyTestUtils.getTopicPolicies(systemTopicBasedTopicPoliciesService, TOPIC1); // Check get without cache TopicPolicies policiesGet1 = TopicPolicyTestUtils.getTopicPoliciesBypassCache( - systemTopicBasedTopicPoliciesService, TOPIC1).orElseThrow(); + systemTopicBasedTopicPoliciesService, TOPIC1, false).orElseThrow(); Assert.assertEquals(policies1, policiesGet1); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPoliciesServiceDisableTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPoliciesServiceDisableTest.java index e00883f7e3e82..b41b9986039b7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPoliciesServiceDisableTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPoliciesServiceDisableTest.java @@ -20,7 +20,6 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.TopicPolicies; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -48,8 +47,9 @@ protected void cleanup() throws Exception { @Test public void testTopicLevelPoliciesDisabled() { try { - systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TopicName.get("test"), - new TopicPolicies()).get(); + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TopicName.get("test"), false, false, + topicPolicies -> { + }).get(); } catch (Exception e) { Assert.assertTrue(e.getCause() instanceof UnsupportedOperationException); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java index b5adaeccad76f..6b9735d59b21a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java @@ -23,6 +23,7 @@ import lombok.Cleanup; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.common.events.PulsarEvent; +import org.apache.pulsar.common.events.TopicPoliciesEvent; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicPolicies; @@ -69,17 +70,36 @@ public static TopicPolicies getGlobalTopicPolicies(TopicPoliciesService topicPol } public static Optional getTopicPoliciesBypassCache(TopicPoliciesService topicPoliciesService, - TopicName topicName) throws Exception { + TopicName topicName, boolean isGlobal) + throws Exception { @Cleanup final var reader = ((SystemTopicBasedTopicPoliciesService) topicPoliciesService) .getNamespaceEventsSystemTopicFactory() .createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject()) .newReader(); - PulsarEvent event = null; + TopicPoliciesEvent lastTopicPoliciesEvent = null; while (reader.hasMoreEvents()) { @Cleanup("release") Message message = reader.readNext(); - event = message.getValue(); + if (message.getValue() == null) { + boolean isGlobalPolicy = TopicPoliciesService.isGlobalPolicy(message); + TopicName eventTopicName = TopicName.get(TopicPoliciesService.unwrapEventKey(message.getKey()) + .getPartitionedTopicName()); + if (eventTopicName.equals(topicName) && isGlobalPolicy == isGlobal) { + lastTopicPoliciesEvent = null; + } + } else { + TopicPoliciesEvent topicPoliciesEvent = message.getValue().getTopicPoliciesEvent(); + if (topicPoliciesEvent != null) { + TopicName eventTopicName = + TopicName.get(topicPoliciesEvent.getDomain(), topicPoliciesEvent.getTenant(), + topicPoliciesEvent.getNamespace(), topicPoliciesEvent.getTopic()); + if (eventTopicName.equals(topicName) + && topicPoliciesEvent.getPolicies().isGlobalPolicies() == isGlobal) { + lastTopicPoliciesEvent = topicPoliciesEvent; + } + } + } } - return Optional.ofNullable(event).map(e -> e.getTopicPoliciesEvent().getPolicies()); + return Optional.ofNullable(lastTopicPoliciesEvent).map(TopicPoliciesEvent::getPolicies); } } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/InactiveTopicPolicies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/InactiveTopicPolicies.java index 80e91fdf320e0..419634c0b1507 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/InactiveTopicPolicies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/InactiveTopicPolicies.java @@ -21,6 +21,7 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; +import lombok.SneakyThrows; /** * Definition of the inactive topic policy. @@ -28,8 +29,14 @@ @Data @AllArgsConstructor @NoArgsConstructor -public class InactiveTopicPolicies { +public class InactiveTopicPolicies implements Cloneable { private InactiveTopicDeleteMode inactiveTopicDeleteMode; private int maxInactiveDurationSeconds; private boolean deleteWhileInactive; + + @SneakyThrows + @Override + protected InactiveTopicPolicies clone() { + return InactiveTopicPolicies.class.cast(super.clone()); + } } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PersistencePolicies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PersistencePolicies.java index 70ff2ab8a4e9a..684c202f15852 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PersistencePolicies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PersistencePolicies.java @@ -19,13 +19,14 @@ package org.apache.pulsar.common.policies.data; import java.util.Objects; +import lombok.SneakyThrows; import lombok.ToString; /** * Configuration of bookkeeper persistence policies. */ @ToString -public class PersistencePolicies { +public class PersistencePolicies implements Cloneable { private int bookkeeperEnsemble; private int bookkeeperWriteQuorum; private int bookkeeperAckQuorum; @@ -70,6 +71,12 @@ public String getManagedLedgerStorageClassName() { return managedLedgerStorageClassName; } + @SneakyThrows + @Override + protected PersistencePolicies clone() { + return PersistencePolicies.class.cast(super.clone()); + } + @Override public int hashCode() { return Objects.hash(bookkeeperEnsemble, bookkeeperWriteQuorum, diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PublishRate.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PublishRate.java index e9c19e4903e8e..3fbd3533c44a8 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PublishRate.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PublishRate.java @@ -19,13 +19,14 @@ package org.apache.pulsar.common.policies.data; import java.util.Objects; +import lombok.SneakyThrows; import lombok.ToString; /** * Publish-rate to manage publish throttling. */ @ToString -public class PublishRate { +public class PublishRate implements Cloneable{ public int publishThrottlingRateInMsg = -1; public long publishThrottlingRateInByte = -1; @@ -52,6 +53,12 @@ public static PublishRate normalize(PublishRate publishRate) { } } + @SneakyThrows + @Override + protected PublishRate clone() { + return PublishRate.class.cast(super.clone()); + } + @Override public int hashCode() { return Objects.hash(publishThrottlingRateInMsg, publishThrottlingRateInByte); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/RetentionPolicies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/RetentionPolicies.java index 8d5b25da43153..8d69b0931173d 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/RetentionPolicies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/RetentionPolicies.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.common.policies.data; +import lombok.SneakyThrows; + /** * Definition of the retention policy. * @@ -27,7 +29,7 @@ * messages when either size or time limit is set to `0`. * Infinite retention can be achieved by setting both time and size limits to `-1`. */ -public class RetentionPolicies { +public class RetentionPolicies implements Cloneable { private int retentionTimeInMinutes; private long retentionSizeInMB; @@ -48,6 +50,12 @@ public long getRetentionSizeInMB() { return retentionSizeInMB; } + @SneakyThrows + @Override + protected RetentionPolicies clone() { + return RetentionPolicies.class.cast(super.clone()); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscribeRate.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscribeRate.java index 7f196beb6d904..6a32e8c92a62e 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscribeRate.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscribeRate.java @@ -19,13 +19,14 @@ package org.apache.pulsar.common.policies.data; import java.util.Objects; +import lombok.SneakyThrows; import lombok.ToString; /** * Information about subscription rate. */ @ToString -public class SubscribeRate { +public class SubscribeRate implements Cloneable { public int subscribeThrottlingRatePerConsumer = -1; public int ratePeriodInSecond = 30; @@ -48,6 +49,12 @@ public static SubscribeRate normalize(SubscribeRate subscribeRate) { } } + @SneakyThrows + @Override + protected SubscribeRate clone() { + return SubscribeRate.class.cast(super.clone()); + } + @Override public int hashCode() { return Objects.hash(subscribeThrottlingRatePerConsumer, ratePeriodInSecond); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java index 05da03f1090b6..e7a5ea502abfe 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java @@ -20,13 +20,14 @@ import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import lombok.SneakyThrows; import lombok.ToString; import org.apache.pulsar.common.policies.data.BacklogQuota; @ToString @EqualsAndHashCode @NoArgsConstructor -public class BacklogQuotaImpl implements BacklogQuota { +public class BacklogQuotaImpl implements BacklogQuota, Cloneable { public static final long BYTES_IN_GIGABYTE = 1024 * 1024 * 1024; /** @@ -54,6 +55,12 @@ public BacklogQuotaImpl(long limitSize, int limitTime, RetentionPolicy policy) { this.policy = policy; } + @SneakyThrows + @Override + public BacklogQuotaImpl clone() { + return BacklogQuotaImpl.class.cast(super.clone()); + } + @Deprecated public long getLimit() { if (limitSize == null) { diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/DispatchRateImpl.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/DispatchRateImpl.java index b10f2eefc8bf3..916cd9d8f4842 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/DispatchRateImpl.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/DispatchRateImpl.java @@ -21,6 +21,7 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; +import lombok.SneakyThrows; import org.apache.pulsar.common.policies.data.DispatchRate; /** @@ -29,13 +30,19 @@ @Data @AllArgsConstructor @NoArgsConstructor -public final class DispatchRateImpl implements DispatchRate { +public final class DispatchRateImpl implements DispatchRate, Cloneable { private int dispatchThrottlingRateInMsg; private long dispatchThrottlingRateInByte; private boolean relativeToPublishRate; private int ratePeriodInSecond; + @SneakyThrows + @Override + public DispatchRateImpl clone() { + return DispatchRateImpl.class.cast(super.clone()); + } + public static DispatchRateImplBuilder builder() { return new DispatchRateImplBuilder(); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionPolicies.java index 8149eb2485fad..b41f3ace9cc8e 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionPolicies.java @@ -22,6 +22,7 @@ import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; +import lombok.SneakyThrows; import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl; /** @@ -31,7 +32,7 @@ @Builder @NoArgsConstructor @AllArgsConstructor -public class SubscriptionPolicies { +public class SubscriptionPolicies implements Cloneable { private DispatchRateImpl dispatchRate; /** @@ -41,4 +42,12 @@ public class SubscriptionPolicies { public boolean checkEmpty() { return dispatchRate == null; } + + @SneakyThrows + @Override + protected SubscriptionPolicies clone() { + SubscriptionPolicies cloned = SubscriptionPolicies.class.cast(super.clone()); + cloned.dispatchRate = dispatchRate != null ? dispatchRate.clone() : null; + return cloned; + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java index 6305ee7d9bee4..842c67714d5c7 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java @@ -29,6 +29,7 @@ import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; +import lombok.SneakyThrows; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.policies.data.impl.AutoSubscriptionCreationOverrideImpl; import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl; @@ -42,7 +43,7 @@ @Builder @NoArgsConstructor @AllArgsConstructor -public class TopicPolicies { +public class TopicPolicies implements Cloneable { @Builder.Default private Map backLogQuotaMap = new HashMap<>(); @@ -88,6 +89,71 @@ public class TopicPolicies { private Boolean schemaValidationEnforced; + @SneakyThrows + @Override + public TopicPolicies clone() { + TopicPolicies cloned = TopicPolicies.class.cast(super.clone()); + + if (this.backLogQuotaMap != null) { + cloned.backLogQuotaMap = new HashMap<>(); + for (Map.Entry entry : this.backLogQuotaMap.entrySet()) { + cloned.backLogQuotaMap.put(entry.getKey(), + entry.getValue() != null ? entry.getValue().clone() : null); + } + } else { + cloned.backLogQuotaMap = new HashMap<>(); + } + + cloned.subscriptionTypesEnabled = this.subscriptionTypesEnabled != null + ? new ArrayList<>(this.subscriptionTypesEnabled) : new ArrayList<>(); + cloned.replicationClusters = this.replicationClusters != null + ? new ArrayList<>(this.replicationClusters) : null; + cloned.shadowTopics = this.shadowTopics != null ? new ArrayList<>(this.shadowTopics) : null; + + cloned.persistence = this.persistence != null ? this.persistence.clone() : null; + cloned.retentionPolicies = this.retentionPolicies != null ? this.retentionPolicies.clone() : null; + + cloned.offloadPolicies = + this.offloadPolicies != null ? OffloadPoliciesImpl.create(this.offloadPolicies.toProperties()) : null; + + cloned.inactiveTopicPolicies = + this.inactiveTopicPolicies != null ? this.inactiveTopicPolicies.clone() : null; + + cloned.dispatchRate = this.dispatchRate != null ? this.dispatchRate.clone() : null; + cloned.subscriptionDispatchRate = + this.subscriptionDispatchRate != null ? this.subscriptionDispatchRate.clone() : null; + + cloned.publishRate = this.publishRate != null ? this.publishRate.clone() : null; + cloned.subscribeRate = this.subscribeRate != null ? this.subscribeRate.clone() : null; + + cloned.entryFilters = + this.entryFilters != null ? new EntryFilters(this.entryFilters.getEntryFilterNames()) : null; + cloned.autoSubscriptionCreationOverride = this.autoSubscriptionCreationOverride != null + ? AutoSubscriptionCreationOverrideImpl.builder() + .allowAutoSubscriptionCreation( + this.autoSubscriptionCreationOverride.isAllowAutoSubscriptionCreation()) + .build() : null; + cloned.replicatorDispatchRate = + this.replicatorDispatchRate != null ? this.replicatorDispatchRate.clone() : null; + + + if (this.subscriptionPolicies != null) { + cloned.subscriptionPolicies = new HashMap<>(); + for (Map.Entry entry : this.subscriptionPolicies.entrySet()) { + cloned.subscriptionPolicies.put(entry.getKey(), + entry.getValue() != null ? entry.getValue().clone() : null); + } + } else { + cloned.subscriptionPolicies = new HashMap<>(); + } + + // Primitive types (Boolean, Integer, Long, String) and enums (SchemaCompatibilityStrategy) + // are fine with the shallow copy from super.clone(). + // isGlobal, deduplicationEnabled, messageTTLInSeconds, maxProducerPerTopic, etc. + + return cloned; + } + public boolean isGlobalPolicies() { return isGlobal != null && isGlobal; }