From 63645edc00203018d3687979fefb6c901e439063 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 29 Oct 2025 19:30:42 +0800 Subject: [PATCH 1/2] Fix wrong results of hasMessageAvailable and readNext after seeking by timestamp --- src/Pulsar.Client/Internal/ConsumerImpl.fs | 13 ++- tests/IntegrationTests/Reader.fs | 93 +++++++++++++++++++++- 2 files changed, 100 insertions(+), 6 deletions(-) diff --git a/src/Pulsar.Client/Internal/ConsumerImpl.fs b/src/Pulsar.Client/Internal/ConsumerImpl.fs index e5e4f142..82d76622 100644 --- a/src/Pulsar.Client/Internal/ConsumerImpl.fs +++ b/src/Pulsar.Client/Internal/ConsumerImpl.fs @@ -97,6 +97,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien let mutable lastMessageIdInBroker = MessageId.Earliest let mutable lastDequeuedMessageId = MessageId.Earliest let mutable duringSeek = None + let mutable hasSoughtByTimestamp = false let initialStartMessageId = startMessageId let mutable incomingMessagesSize = 0L let deadLettersProcessor = consumerConfig.DeadLetterProcessor topicName @@ -1092,7 +1093,9 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien Log.Logger.LogInformation("{0} Seek subscription to {1}", prefix, seekData) let payload, seekMessageId = match seekData with - | SeekType.Timestamp timestamp -> Commands.newSeekByTimestamp consumerId requestId timestamp, MessageId.Earliest + | SeekType.Timestamp timestamp -> + this.HasSoughtByTimestamp <- true + Commands.newSeekByTimestamp consumerId requestId timestamp, MessageId.Earliest | SeekType.MessageId messageId -> match messageId.ChunkMessageIds with | Some chunkMessageIds when chunkMessageIds.Length >0 -> @@ -1132,9 +1135,11 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien // we haven't read yet. use startMessageId for comparison if lastDequeuedMessageId = MessageId.Earliest then + // If the last seek is called with timestamp, startMessageId cannot represent the position to start, so we + // have to get the mark-delete position from the GetLastMessageId response to compare as well. // if we are starting from latest, we should seek to the actual last message first. // allow the last one to be read when read head inclusively. - if startMessageId = MessageId.Latest then + if startMessageId = MessageId.Latest || hasSoughtByTimestamp then backgroundTask { try let! lastMessageIdResult = getLastMessageIdAsync() @@ -1431,6 +1436,10 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien with get() = Volatile.Read(&lastMessageIdInBroker) and private set value = Volatile.Write(&lastMessageIdInBroker, value) + member this.HasSoughtByTimestamp + with get() = Volatile.Read(&hasSoughtByTimestamp) + and private set value = Volatile.Write(&hasSoughtByTimestamp, value) + override this.Equals consumer = consumerId = (consumer :?> IConsumer<'T>).ConsumerId diff --git a/tests/IntegrationTests/Reader.fs b/tests/IntegrationTests/Reader.fs index 7f653336..b9d225d4 100644 --- a/tests/IntegrationTests/Reader.fs +++ b/tests/IntegrationTests/Reader.fs @@ -272,6 +272,85 @@ let tests = + let checkHasMessageAvailableAfterSeekTimestamp (initializeLastMessageIdInBroker: bool) = + task { + Log.Debug("Started HasMessageAvailableAfterSeekTimestamp initializeLastMessageIdInBroker: {0}", initializeLastMessageIdInBroker) + let client = getClient() + let topicName = "public/default/test-has-message-available-after-seek-timestamp-" + Guid.NewGuid().ToString("N") + + let! (producer : IProducer) = + client.NewProducer(Schema.STRING()) + .Topic(topicName) + .CreateAsync() + + let timestampBeforeSend = %(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()) + let! sentMsgId = producer.SendAsync("msg") + do! producer.DisposeAsync() + + let messageIds = [ + MessageId.Earliest + sentMsgId + MessageId.Latest + ] + + // Test 1: Seek to future timestamp + for messageId in messageIds do + let! (reader : IReader) = + client.NewReader(Schema.STRING()) + .Topic(topicName) + .ReceiverQueueSize(1) + .StartMessageId(messageId) + .CreateAsync() + + if initializeLastMessageIdInBroker then + if messageId = MessageId.Earliest then + let! hasMessage = reader.HasMessageAvailableAsync() + Expect.isTrue "should have message" hasMessage + else + let! hasMessage = reader.HasMessageAvailableAsync() + Expect.isFalse "should not have message" hasMessage + + let futureTimestamp = %(DateTimeOffset.UtcNow.AddMinutes(1.0).ToUnixTimeMilliseconds()) + + // The Seek operation does not implement a backoff mechanism. It will fail if the connection is not + // ready, so wait for a short period until the connection becomes available. + do! Async.Sleep(1000) + do! reader.SeekAsync(futureTimestamp) + // HasMessageAvailableAsync does not implement a backoff mechanism. The operation may fail dut to the + // broker cannot find the consumer during the seek operation. So wait for a short period here. + do! Async.Sleep(1000) + let! hasMessage = reader.HasMessageAvailableAsync() + Expect.isFalse "after seek to future should not have message" hasMessage + do! reader.DisposeAsync() + + // Test 2: Seek to timestamp before send + for messageId in messageIds do + let! (reader : IReader) = + client.NewReader(Schema.STRING()) + .Topic(topicName) + .ReceiverQueueSize(1) + .StartMessageId(messageId) + .CreateAsync() + + + if initializeLastMessageIdInBroker then + if messageId = MessageId.Earliest then + let! hasMessage = reader.HasMessageAvailableAsync() + Expect.isTrue "should have message" hasMessage + else + let! hasMessage = reader.HasMessageAvailableAsync() + Expect.isFalse "should not have message" hasMessage + + do! Async.Sleep(1000) + do! reader.SeekAsync(timestampBeforeSend) + do! Async.Sleep(1000) + let! hasMessage = reader.HasMessageAvailableAsync() + Expect.isTrue "after seek to before send should have message" hasMessage + do! reader.DisposeAsync() + + Log.Debug("Finished HasMessageAvailableAfterSeekTimestamp initializeLastMessageIdInBroker: {0}", initializeLastMessageIdInBroker) + } + testList "Reader" [ testTask "Reader non-batching configuration works fine" { @@ -314,13 +393,19 @@ let tests = do! checkReadingFromRollback true } - // uncomment when https://github.com/apache/pulsar/issues/10515 is ready - ptestTask "Check StartMessageFromFuturePoint without batching" { + testTask "Check StartMessageFromFuturePoint without batching" { do! checkReadingFromFuture false } - // uncomment when https://github.com/apache/pulsar/issues/10515 is ready - ptestTask "Check StartMessageFromFuturePoint with batching" { + testTask "Check StartMessageFromFuturePoint with batching" { do! checkReadingFromFuture true } + + testTask "HasMessageAvailable after SeekTimestamp without initializeLastMessageIdInBroker" { + do! checkHasMessageAvailableAfterSeekTimestamp false + } + + testTask "HasMessageAvailable after SeekTimestamp with initializeLastMessageIdInBroker" { + do! checkHasMessageAvailableAfterSeekTimestamp true + } ] \ No newline at end of file From d236e4dd40eb2769b9f3b55ed389654d1a9e132e Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Thu, 30 Oct 2025 22:46:04 +0800 Subject: [PATCH 2/2] Remove HasSoughtByTimestamp and improve test --- src/Pulsar.Client/Internal/ConsumerImpl.fs | 5 +---- tests/IntegrationTests/Reader.fs | 12 ++++++------ 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/src/Pulsar.Client/Internal/ConsumerImpl.fs b/src/Pulsar.Client/Internal/ConsumerImpl.fs index 82d76622..b633a054 100644 --- a/src/Pulsar.Client/Internal/ConsumerImpl.fs +++ b/src/Pulsar.Client/Internal/ConsumerImpl.fs @@ -1094,7 +1094,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien let payload, seekMessageId = match seekData with | SeekType.Timestamp timestamp -> - this.HasSoughtByTimestamp <- true + hasSoughtByTimestamp <- true Commands.newSeekByTimestamp consumerId requestId timestamp, MessageId.Earliest | SeekType.MessageId messageId -> match messageId.ChunkMessageIds with @@ -1436,9 +1436,6 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien with get() = Volatile.Read(&lastMessageIdInBroker) and private set value = Volatile.Write(&lastMessageIdInBroker, value) - member this.HasSoughtByTimestamp - with get() = Volatile.Read(&hasSoughtByTimestamp) - and private set value = Volatile.Write(&hasSoughtByTimestamp, value) override this.Equals consumer = consumerId = (consumer :?> IConsumer<'T>).ConsumerId diff --git a/tests/IntegrationTests/Reader.fs b/tests/IntegrationTests/Reader.fs index b9d225d4..2133f69f 100644 --- a/tests/IntegrationTests/Reader.fs +++ b/tests/IntegrationTests/Reader.fs @@ -314,11 +314,11 @@ let tests = // The Seek operation does not implement a backoff mechanism. It will fail if the connection is not // ready, so wait for a short period until the connection becomes available. - do! Async.Sleep(1000) + do! Task.Delay(1000) do! reader.SeekAsync(futureTimestamp) - // HasMessageAvailableAsync does not implement a backoff mechanism. The operation may fail dut to the - // broker cannot find the consumer during the seek operation. So wait for a short period here. - do! Async.Sleep(1000) + // HasMessageAvailableAsync does not implement a backoff mechanism. The operation may fail due to the + // broker not finding the consumer during the seek operation. So wait for a short period here. + do! Task.Delay(1000) let! hasMessage = reader.HasMessageAvailableAsync() Expect.isFalse "after seek to future should not have message" hasMessage do! reader.DisposeAsync() @@ -341,9 +341,9 @@ let tests = let! hasMessage = reader.HasMessageAvailableAsync() Expect.isFalse "should not have message" hasMessage - do! Async.Sleep(1000) + do! Task.Delay(1000) do! reader.SeekAsync(timestampBeforeSend) - do! Async.Sleep(1000) + do! Task.Delay(1000) let! hasMessage = reader.HasMessageAvailableAsync() Expect.isTrue "after seek to before send should have message" hasMessage do! reader.DisposeAsync()