From 03dd2d357278121d5e4a6b759952852f55064c24 Mon Sep 17 00:00:00 2001 From: ethqunzhong Date: Mon, 13 Oct 2025 11:50:05 +0800 Subject: [PATCH] use dedicated bookkeeper-ml-offload-scheduler to avoid any potential blocking of core services --- conf/broker.conf | 3 +++ .../mledger/ManagedLedgerFactoryConfig.java | 5 +++++ .../impl/ManagedLedgerFactoryImpl.java | 22 ++++++++++++++++++- .../mledger/impl/ManagedLedgerImpl.java | 19 +++++++++------- .../pulsar/broker/ServiceConfiguration.java | 2 ++ .../broker/ManagedLedgerClientFactory.java | 2 ++ 6 files changed, 44 insertions(+), 9 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 8fd0e18af3696..f7dfd853d3e86 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1249,6 +1249,9 @@ managedLedgerDigestType=CRC32C # Number of threads to be used for managed ledger scheduled tasks managedLedgerNumSchedulerThreads= +# Number of threads to be used for managed ledger scheduled offload operations. +managedLedgerNumOffloadSchedulerThreads= + # Amount of memory to use for caching data payload in managed ledger. This memory # is allocated from JVM direct memory and it's shared across all the topics # running in the same broker. By default, uses 1/5th of available direct memory diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java index 592fd2d385e5c..b96fe99d05ce2 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java @@ -147,6 +147,11 @@ public class ManagedLedgerFactoryConfig { */ private String managedCursorInfoCompressionType = MLDataFormats.CompressionType.NONE.name(); + /** + * Number of threads to use for ML offload operations. 
+ */ + private int numManagedLedgerOffloadSchedulerThreads = Runtime.getRuntime().availableProcessors(); + /** * ManagedCursorInfo compression threshold. If the origin metadata size below configuration. * compression will not apply. diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index a452c6682a53b..8c9512c48fc26 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -122,6 +122,10 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { @Getter private final ScheduledExecutorService cacheEvictionExecutor; + // Dedicated thread pool for offload operations to isolate from core services + @Getter + private final OrderedScheduler offloadScheduler; + @Getter protected final ManagedLedgerFactoryMBeanImpl mbean; @@ -234,6 +238,17 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, .build(); cacheEvictionExecutor = Executors .newSingleThreadScheduledExecutor(new DefaultThreadFactory("bookkeeper-ml-cache-eviction")); + + // Create dedicated scheduler for offload operations to prevent blocking core services + // Thread count comes from numManagedLedgerOffloadSchedulerThreads (default: available processors) + int offloadThreads = config.getNumManagedLedgerOffloadSchedulerThreads(); + offloadScheduler = OrderedScheduler.newSchedulerBuilder() + .numThreads(offloadThreads) + .statsLogger(statsLogger) + .traceTaskExecution(config.isTraceTaskExecution()) + .name("bookkeeper-ml-offload-scheduler") + .build(); + log.info("Created offload scheduler with {} threads for ML operations isolation", offloadThreads); this.metadataServiceAvailable = true; this.bookkeeperFactory = bookKeeperGroupFactory; this.isBookkeeperManaged = 
isBookkeeperManaged; @@ -647,6 +662,11 @@ public CompletableFuture shutdownAsync() throws ManagedLedgerException { flushCursorsTask.cancel(true); cacheEvictionExecutor.shutdownNow(); + // Shutdown offload scheduler + if (offloadScheduler != null) { + offloadScheduler.shutdownNow(); + } + List ledgerNames = new ArrayList<>(this.ledgers.keySet()); List> futures = new ArrayList<>(ledgerNames.size()); int numLedgers = ledgerNames.size(); @@ -1037,7 +1057,7 @@ public void operationFailed(MetaStoreException e) { return OffloadUtils.cleanupOffloaded(li.ledgerId, uuid, mlConfig, OffloadUtils.getOffloadDriverMetadata(ls, mlConfig.getLedgerOffloader().getOffloadDriverMetadata()), - "Deletion", managedLedgerName, scheduledExecutor); + "Deletion", managedLedgerName, offloadScheduler); } return CompletableFuture.completedFuture(null); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 346ddb2f6c904..70a3e5cd78489 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2771,7 +2771,9 @@ public void maybeOffloadInBackground(CompletableFuture promise) { final long offloadThresholdInSeconds = Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L); if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) { - executor.execute(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)); + // Use dedicated offload scheduler to avoid any potential blocking of core services + factory.getOffloadScheduler() + .execute(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)); } } @@ -2791,7 +2793,8 @@ private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInS } if (!offloadMutex.tryLock()) { - 
scheduledExecutor.schedule(() -> maybeOffloadInBackground(finalPromise), + // Use dedicated offload scheduler to avoid blocking core services + factory.getOffloadScheduler().schedule(() -> maybeOffloadInBackground(finalPromise), 100, TimeUnit.MILLISECONDS); return; } @@ -3384,7 +3387,7 @@ private void asyncDeleteLedger(long ledgerId, LedgerInfo info) { UUID uuid = new UUID(info.getOffloadContext().getUidMsb(), info.getOffloadContext().getUidLsb()); OffloadUtils.cleanupOffloaded(ledgerId, uuid, config, OffloadUtils.getOffloadDriverMetadata(info, config.getLedgerOffloader().getOffloadDriverMetadata()), - "Trimming", name, scheduledExecutor); + "Trimming", name, factory.getOffloadScheduler()); } } @@ -3657,7 +3660,7 @@ void offloadLoop(CompletableFuture promise, Queue ledgersT TimeUnit.HOURS.toMillis(1)).limit(10), FAIL_ON_CONFLICT, () -> completeLedgerInfoForOffloaded(ledgerId, uuid), - scheduledExecutor, name) + factory.getOffloadScheduler(), name) .whenComplete((ignore2, exception) -> { if (exception != null) { Throwable e = FutureUtil.unwrapCompletionException(exception); @@ -3678,7 +3681,7 @@ void offloadLoop(CompletableFuture promise, Queue ledgersT OffloadUtils.cleanupOffloaded( ledgerId, uuid, config, driverMetadata, - "Metastore failure", name, scheduledExecutor); + "Metastore failure", name, factory.getOffloadScheduler()); } }); }) @@ -3740,8 +3743,8 @@ private void tryTransformLedgerInfo(long ledgerId, LedgerInfoTransformation tran CompletableFuture finalPromise) { synchronized (this) { if (!metadataMutex.tryLock()) { - // retry in 100 milliseconds - scheduledExecutor.schedule( + // retry in 100 milliseconds using offload scheduler to avoid blocking core services + factory.getOffloadScheduler().schedule( () -> tryTransformLedgerInfo(ledgerId, transformation, finalPromise), 100, TimeUnit.MILLISECONDS); } else { // lock acquired @@ -3805,7 +3808,7 @@ private CompletableFuture prepareLedgerInfoForOffloaded(long ledgerId, UUI 
config.getLedgerOffloader().getOffloadDriverMetadata()), "Previous failed offload", name, - scheduledExecutor); + factory.getOffloadScheduler()); } LedgerInfo.Builder builder = oldInfo.toBuilder(); builder.getOffloadContextBuilder() diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 30fef55ece3bd..295217439e4ec 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2313,6 +2313,8 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece ) private int managedLedgerNumSchedulerThreads = Runtime.getRuntime().availableProcessors(); + private int managedLedgerNumOffloadSchedulerThreads = Runtime.getRuntime().availableProcessors(); + @FieldContext( category = CATEGORY_STORAGE_ML, doc = "Max number of entries to append to a ledger before triggering a rollover.\n\n" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java index b199db883044a..a538e3cc1e2c0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java @@ -113,6 +113,8 @@ public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadata conf.getManagedLedgerInfoCompressionThresholdInBytes()); managedLedgerFactoryConfig.setStatsPeriodSeconds(conf.getManagedLedgerStatsPeriodSeconds()); managedLedgerFactoryConfig.setManagedCursorInfoCompressionType(conf.getManagedCursorInfoCompressionType()); + managedLedgerFactoryConfig.setNumManagedLedgerOffloadSchedulerThreads( + conf.getManagedLedgerNumOffloadSchedulerThreads()); 
managedLedgerFactoryConfig.setManagedCursorInfoCompressionThresholdInBytes( conf.getManagedCursorInfoCompressionThresholdInBytes());