[Bug] Topic cannot be accessed for "Failed to load topic within timeout" unless unloading the namespace #24497

Open
@BewareMyPower

Description

Search before reporting

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

User environment

StreamNative Pulsar 4.0.5.2, which is based on 6b5fdbf

Issue Description

This issue occurred when we restarted brokers for an upgrade. Two partitions (10 and 23 of a partitioned topic) owned by one broker were stuck initializing their PersistentTopic. Here is the relevant information from the heap dump:

topic:
  - name: "<topic>-partition-23"
    messageDeduplication:
      snapshotCounter: 21
      managedCursor:
        entriesReadCount: 42
        lastMarkDeleteEntry:
          newPosition: "860608:298"
        pendingReadOps: 0
        readPosition: "861217:3"
      managedLedger:
        config:
          readEntryTimeoutSeconds: 0
        executor:
          runner: "BookKeeperClientWorker-OrderedExecutor-0-0"
          tasksFailed: 1
        lastConfirmedEntry: "861217:2"
        ledgers:
          - key: 860608
            value: {"entries": 338}
          - key: 861217
            value: {"entries": 3}
          - key: 861315
            value: {"entries": 0}
  - name: "<topic>-partition-10"
    messageDeduplication:
      snapshotCounter: 0
      managedCursor:
        entriesReadCount: 84
        lastMarkDeleteEntry:
          newPosition: "860650:231"
        pendingReadOps: 1
        readPosition: "861214:0"
      managedLedger:
        currentLedger:
          ledgerId: 861295
        lastConfirmedEntry: "861214:1"
        ledgers:
          - key: 860650
            value: {"entries": 314}
          - key: 861214
            value: {"entries": 2}
          - key: 861295
            value: {"entries": 0}
        state: LedgerOpened

The key is the MessageDeduplication field:

  • status is Initialized
  • managedCursor is not null
  • snapshotCounter is non-zero

From

managedCursor = cursor;
recoverSequenceIdsMap().thenRun(() -> {
    status = Status.Enabled;
    future.complete(null);
    log.info("[{}] Enabled deduplication", topic.getName());
}).exceptionally(ex -> {
    status = Status.Failed;

we can conclude that the future returned by recoverSequenceIdsMap never completed: otherwise status would have changed to either Enabled or Failed.
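The reasoning can be checked with a minimal sketch. DedupStatusSketch is a hypothetical stand-in that only mirrors the shape of the snippet above, not Pulsar's actual class, and the body of the exceptionally branch is an assumption because the quoted snippet is cut off there.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the deduplication state; not Pulsar's actual class.
class DedupStatusSketch {
    enum Status { Initialized, Enabled, Failed }

    volatile Status status = Status.Initialized;

    // Same shape as the quoted snippet: status changes only inside the callbacks.
    CompletableFuture<Void> enable(CompletableFuture<Void> recoverFuture) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        recoverFuture.thenRun(() -> {
            status = Status.Enabled;
            future.complete(null);
        }).exceptionally(ex -> {
            status = Status.Failed;
            future.completeExceptionally(ex); // assumption: the original snippet is truncated here
            return null;
        });
        return future;
    }

    public static void main(String[] args) {
        // Case 1: the recovery future never completes, so neither callback runs.
        DedupStatusSketch stuck = new DedupStatusSketch();
        CompletableFuture<Void> result = stuck.enable(new CompletableFuture<>());
        System.out.println(stuck.status + " done=" + result.isDone()); // Initialized done=false

        // Case 2: the recovery future fails, so the status becomes Failed.
        DedupStatusSketch failed = new DedupStatusSketch();
        CompletableFuture<Void> recover = new CompletableFuture<>();
        failed.enable(recover);
        recover.completeExceptionally(new IllegalStateException("replay failed"));
        System.out.println(failed.status); // Failed
    }
}
```

A status that is still Initialized together with a non-null cursor therefore points at a future that was neither completed nor failed.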

The difference between partition 10 and 23 is:

  • Partition 10's cursor read position had not reached the last confirmed entry, and there was one pending read operation. It seems that a read operation somehow got stuck forever.
  • Partition 23's cursor read position was past the last confirmed entry, so the future should have completed. Another suspicious point is that snapshotCounter is 21 while entriesReadCount is 42.
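The comparison behind these two bullets can be reproduced from the dump values. PositionCheck below is a hypothetical helper written for this issue, not a Pulsar API; it only parses the "ledgerId:entryId" strings shown in the heap dump.

```java
// Hypothetical helper for "ledgerId:entryId" strings from the dump; not a Pulsar API.
class PositionCheck {
    // Orders positions by ledger id first, then by entry id.
    static int compare(String a, String b) {
        String[] pa = a.split(":");
        String[] pb = b.split(":");
        int byLedger = Long.compare(Long.parseLong(pa[0]), Long.parseLong(pb[0]));
        return byLedger != 0
                ? byLedger
                : Long.compare(Long.parseLong(pa[1]), Long.parseLong(pb[1]));
    }

    // True when the read position is at or before the last confirmed entry,
    // i.e. there is still something left to read.
    static boolean hasEntriesLeft(String readPosition, String lastConfirmedEntry) {
        return compare(readPosition, lastConfirmedEntry) <= 0;
    }

    public static void main(String[] args) {
        // Partition 10: readPosition 861214:0, lastConfirmedEntry 861214:1
        System.out.println("partition-10 has entries left: "
                + hasEntriesLeft("861214:0", "861214:1")); // true
        // Partition 23: readPosition 861217:3, lastConfirmedEntry 861217:2
        System.out.println("partition-23 has entries left: "
                + hasEntriesLeft("861217:3", "861217:2")); // false
    }
}
```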

The most likely cause is that an exception was thrown in a callback such as

public void readEntriesComplete(List<Entry> entries, Object ctx) {

Something went wrong with our log collection, so the key logs from SingleThreadExecutor#run were lost. However, the heap dump shows that the executor's tasksFailed field is 1.
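To illustrate this suspected failure mode, here is a minimal sketch under stated assumptions: the counted wrapper imitates an executor that counts and swallows task exceptions, and this readEntriesComplete is a simplified, hypothetical callback rather than the real one. It shows how an exception thrown before complete() leaves the future pending while only a failure counter changes.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LostCallbackSketch {
    static final AtomicInteger tasksFailed = new AtomicInteger();

    // Imitates an executor wrapper that counts a task failure and swallows the exception.
    static Runnable counted(Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (Throwable t) {
                tasksFailed.incrementAndGet(); // a real executor would also log here
            }
        };
    }

    // Hypothetical read callback: throws before it reaches complete().
    static void readEntriesComplete(List<String> entries, CompletableFuture<Void> replayFuture) {
        if (entries.isEmpty()) {
            throw new IllegalStateException("unexpected empty read");
        }
        replayFuture.complete(null);
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> replayFuture = new CompletableFuture<>();
        executor.execute(counted(() ->
                readEntriesComplete(Collections.emptyList(), replayFuture)));
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        // The task failed once, yet nobody completed or failed the future.
        System.out.println("tasksFailed=" + tasksFailed.get()
                + " done=" + replayFuture.isDone()); // tasksFailed=1 done=false
    }
}
```

Wrapping the callback body in a try/catch that calls completeExceptionally on the future would turn this silent hang into a visible failure.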

Error messages


Reproducing the issue

Currently I cannot reproduce it.

Additional information

Pulsar's current topic caching mechanism handles this case poorly. Topic loading has a timeout configured by topicLoadTimeoutSeconds (default: 60), and one minute is long enough in most cases. However, when the timeout fires, the cached topic entry is not removed from BrokerService#topics until the pending PersistentTopic initialization completes. See

persistentTopic
    .initialize()
    .thenCompose(__ -> persistentTopic.preCreateSubscriptionForCompactionIfNeeded())
    .thenCompose(__ -> persistentTopic.checkReplication())
    .thenCompose(v -> {
        // Also check dedup status
        return persistentTopic.checkDeduplicationStatus();

initialize():

  1. Create a compaction service and set topicCompactionService with it.
  2. Create subscriptions for all cursors in the managed ledger.
  3. Get namespace policies for this topic from the metadata store.
  4. Initialize some fields according to the namespace policies.
  5. Register itself to the topic policies service.
  6. Get topic policies and update some fields.
  7. Remove orphan replication cursors.

preCreateSubscriptionForCompactionIfNeeded(): Create the __compaction subscription, which might open the durable cursor.

checkReplication():

  1. Get allowed clusters from the metadata store.
  2. Start replicators if necessary.

checkDeduplicationStatus(): Perform the topic replay.

If a bug causes PersistentTopic to get stuck during initialization, e.g. in checkDeduplicationStatus(), the topic stays unavailable until someone intervenes manually, for example by unloading the namespace.
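The caching problem can be sketched in isolation. Everything below (TopicCacheSketch, load, the evictOnTimeout flag) is hypothetical and much simpler than BrokerService; it only contrasts keeping the timed-out entry with one possible mitigation, evicting it when the deadline fires. It uses CompletableFuture#orTimeout, which requires Java 9 or later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified topic cache; not Pulsar's BrokerService.
class TopicCacheSketch {
    final ConcurrentHashMap<String, CompletableFuture<String>> topics = new ConcurrentHashMap<>();

    CompletableFuture<String> load(String name, CompletableFuture<Void> initialization,
                                   long timeoutMillis, boolean evictOnTimeout) {
        CompletableFuture<String> topicFuture = new CompletableFuture<>();
        topics.put(name, topicFuture);

        // The entry is cleaned up on failure only when initialization itself finishes.
        initialization.whenComplete((v, ex) -> {
            if (ex == null) {
                topicFuture.complete(name);
            } else {
                topics.remove(name, topicFuture);
                topicFuture.completeExceptionally(ex);
            }
        });

        // Deadline for callers: fails topicFuture with a TimeoutException.
        topicFuture.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);

        // Possible mitigation (an assumption, not the current behavior): evict on any failure.
        return topicFuture.whenComplete((v, ex) -> {
            if (ex != null && evictOnTimeout) {
                topics.remove(name, topicFuture);
            }
        });
    }

    public static void main(String[] args) {
        TopicCacheSketch cache = new TopicCacheSketch();
        CompletableFuture<Void> stuck = new CompletableFuture<>(); // initialization never completes

        CompletableFuture<String> kept = cache.load("topic-a", stuck, 100, false);
        CompletableFuture<String> evicted = cache.load("topic-b", stuck, 100, true);
        for (CompletableFuture<String> f : new CompletableFuture[] {kept, evicted}) {
            try {
                f.join();
            } catch (CompletionException expected) {
                // both loads time out
            }
        }
        System.out.println("topic-a still cached: " + cache.topics.containsKey("topic-a")); // true
        System.out.println("topic-b still cached: " + cache.topics.containsKey("topic-b")); // false
    }
}
```

With the entry kept, later lookups keep hitting the failed future until the stuck initialization finishes, which matches the symptom in the title. With eviction, the next lookup can start a fresh load, although the stuck initialization would still need to be cleaned up separately.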

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Labels

type/bug
