Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/Pulsar.Client/Internal/ConsumerImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
let mutable lastMessageIdInBroker = MessageId.Earliest
let mutable lastDequeuedMessageId = MessageId.Earliest
let mutable duringSeek = None
let mutable hasSoughtByTimestamp = false
let initialStartMessageId = startMessageId
let mutable incomingMessagesSize = 0L
let deadLettersProcessor = consumerConfig.DeadLetterProcessor topicName
Expand Down Expand Up @@ -1092,7 +1093,9 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
Log.Logger.LogInformation("{0} Seek subscription to {1}", prefix, seekData)
let payload, seekMessageId =
match seekData with
| SeekType.Timestamp timestamp -> Commands.newSeekByTimestamp consumerId requestId timestamp, MessageId.Earliest
| SeekType.Timestamp timestamp ->
hasSoughtByTimestamp <- true
Commands.newSeekByTimestamp consumerId requestId timestamp, MessageId.Earliest
| SeekType.MessageId messageId ->
match messageId.ChunkMessageIds with
| Some chunkMessageIds when chunkMessageIds.Length >0 ->
Expand Down Expand Up @@ -1132,9 +1135,11 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien

// we haven't read yet. use startMessageId for comparison
if lastDequeuedMessageId = MessageId.Earliest then
// If the last seek is called with timestamp, startMessageId cannot represent the position to start, so we
// have to get the mark-delete position from the GetLastMessageId response to compare as well.
// if we are starting from latest, we should seek to the actual last message first.
// allow the last one to be read when read head inclusively.
if startMessageId = MessageId.Latest then
if startMessageId = MessageId.Latest || hasSoughtByTimestamp then
backgroundTask {
try
let! lastMessageIdResult = getLastMessageIdAsync()
Expand Down Expand Up @@ -1431,6 +1436,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
with get() = Volatile.Read(&lastMessageIdInBroker)
and private set value = Volatile.Write(&lastMessageIdInBroker, value)


override this.Equals consumer =
consumerId = (consumer :?> IConsumer<'T>).ConsumerId

Expand Down
93 changes: 89 additions & 4 deletions tests/IntegrationTests/Reader.fs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,85 @@ let tests =



let checkHasMessageAvailableAfterSeekTimestamp (initializeLastMessageIdInBroker: bool) =
task {
Log.Debug("Started HasMessageAvailableAfterSeekTimestamp initializeLastMessageIdInBroker: {0}", initializeLastMessageIdInBroker)
let client = getClient()
let topicName = "public/default/test-has-message-available-after-seek-timestamp-" + Guid.NewGuid().ToString("N")

let! (producer : IProducer<string>) =
client.NewProducer(Schema.STRING())
.Topic(topicName)
.CreateAsync()

let timestampBeforeSend = %(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds())
let! sentMsgId = producer.SendAsync("msg")
do! producer.DisposeAsync()

let messageIds = [
MessageId.Earliest
sentMsgId
MessageId.Latest
]

// Test 1: Seek to future timestamp
for messageId in messageIds do
let! (reader : IReader<string>) =
client.NewReader(Schema.STRING())
.Topic(topicName)
.ReceiverQueueSize(1)
.StartMessageId(messageId)
.CreateAsync()

if initializeLastMessageIdInBroker then
if messageId = MessageId.Earliest then
let! hasMessage = reader.HasMessageAvailableAsync()
Expect.isTrue "should have message" hasMessage
else
let! hasMessage = reader.HasMessageAvailableAsync()
Expect.isFalse "should not have message" hasMessage

let futureTimestamp = %(DateTimeOffset.UtcNow.AddMinutes(1.0).ToUnixTimeMilliseconds())

// The Seek operation does not implement a backoff mechanism. It will fail if the connection is not
// ready, so wait for a short period until the connection becomes available.
do! Task.Delay(1000)
do! reader.SeekAsync(futureTimestamp)
// HasMessageAvailableAsync does not implement a backoff mechanism. The operation may fail due to the
// broker not finding the consumer during the seek operation. So wait for a short period here.
do! Task.Delay(1000)
let! hasMessage = reader.HasMessageAvailableAsync()
Expect.isFalse "after seek to future should not have message" hasMessage
do! reader.DisposeAsync()

// Test 2: Seek to timestamp before send
for messageId in messageIds do
let! (reader : IReader<string>) =
client.NewReader(Schema.STRING())
.Topic(topicName)
.ReceiverQueueSize(1)
.StartMessageId(messageId)
.CreateAsync()


if initializeLastMessageIdInBroker then
if messageId = MessageId.Earliest then
let! hasMessage = reader.HasMessageAvailableAsync()
Expect.isTrue "should have message" hasMessage
else
let! hasMessage = reader.HasMessageAvailableAsync()
Expect.isFalse "should not have message" hasMessage

do! Task.Delay(1000)
do! reader.SeekAsync(timestampBeforeSend)
do! Task.Delay(1000)
let! hasMessage = reader.HasMessageAvailableAsync()
Expect.isTrue "after seek to before send should have message" hasMessage
do! reader.DisposeAsync()

Log.Debug("Finished HasMessageAvailableAfterSeekTimestamp initializeLastMessageIdInBroker: {0}", initializeLastMessageIdInBroker)
}

testList "Reader" [

testTask "Reader non-batching configuration works fine" {
Expand Down Expand Up @@ -314,13 +393,19 @@ let tests =
do! checkReadingFromRollback true
}

// uncomment when https://github.com/apache/pulsar/issues/10515 is ready
ptestTask "Check StartMessageFromFuturePoint without batching" {
testTask "Check StartMessageFromFuturePoint without batching" {
do! checkReadingFromFuture false
}

// uncomment when https://github.com/apache/pulsar/issues/10515 is ready
ptestTask "Check StartMessageFromFuturePoint with batching" {
testTask "Check StartMessageFromFuturePoint with batching" {
do! checkReadingFromFuture true
}

testTask "HasMessageAvailable after SeekTimestamp without initializeLastMessageIdInBroker" {
do! checkHasMessageAvailableAfterSeekTimestamp false
}

testTask "HasMessageAvailable after SeekTimestamp with initializeLastMessageIdInBroker" {
do! checkHasMessageAvailableAfterSeekTimestamp true
}
]
Loading