Skip to content

Commit 0008780

Browse files
authored
[2.x] Support getting producerName and topic name from the received message (#332)
* Fix single topic consumer returning message ID with empty topic name (#330) (cherry picked from commit fc04d7b) * Add producer name attribute to the Message (#331) * Add producer name attribute to the Message * Fix compilation issue (cherry picked from commit 8738e79) * Remove unnesessary changes
1 parent 6e4a543 commit 0008780

File tree

5 files changed

+27
-16
lines changed

5 files changed

+27
-16
lines changed

src/Pulsar.Client/Common/DTO.fs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ type internal Metadata =
213213
EncryptionAlgo: string
214214
OrderingKey: byte[]
215215
ReplicatedFrom: string
216+
ProducerName: string
216217
}
217218

218219
type MessageKey =
@@ -264,7 +265,7 @@ type Message<'T> internal (messageId: MessageId, data: byte[], key: PartitionKey
264265
properties: IReadOnlyDictionary<string, string>, encryptionCtx: EncryptionContext option,
265266
schemaVersion: byte[], sequenceId: SequenceId, orderingKey: byte[], publishTime: TimeStamp,
266267
eventTime: Nullable<TimeStamp>,
267-
redeliveryCount: int32, replicatedFrom: string,
268+
redeliveryCount: int32, replicatedFrom: string, producerName: string,
268269
getValue: unit -> 'T) =
269270
/// Get the unique message ID associated with this message.
270271
member this.MessageId = messageId
@@ -293,26 +294,27 @@ type Message<'T> internal (messageId: MessageId, data: byte[], key: PartitionKey
293294
member this.RedeliveryCount = redeliveryCount
294295
/// Get name of cluster, from which the message is replicated.
295296
member this.ReplicatedFrom = replicatedFrom
296-
297+
/// Get name of producer of the message
298+
member this.ProducerName = producerName
297299
/// Get the de-serialized value of the message, according the configured Schema.
298300
member this.GetValue() =
299301
getValue()
300302

301303
member internal this.WithMessageId messageId =
302304
Message(messageId, data, key, hasBase64EncodedKey, properties, encryptionCtx, schemaVersion, sequenceId,
303-
orderingKey, publishTime, eventTime, redeliveryCount, replicatedFrom, getValue)
305+
orderingKey, publishTime, eventTime, redeliveryCount, replicatedFrom, producerName, getValue)
304306
/// Get a new instance of the message with updated data
305307
member this.WithData data =
306308
Message(messageId, data, key, hasBase64EncodedKey, properties, encryptionCtx, schemaVersion, sequenceId,
307-
orderingKey, publishTime, eventTime, redeliveryCount, replicatedFrom, getValue)
309+
orderingKey, publishTime, eventTime, redeliveryCount, replicatedFrom, producerName, getValue)
308310
/// Get a new instance of the message with updated key
309311
member this.WithKey (key, hasBase64EncodedKey) =
310312
Message(messageId, data, key, hasBase64EncodedKey, properties, encryptionCtx, schemaVersion, sequenceId,
311-
orderingKey, publishTime, eventTime, redeliveryCount, replicatedFrom, getValue)
313+
orderingKey, publishTime, eventTime, redeliveryCount, replicatedFrom, producerName, getValue)
312314
/// Get a new instance of the message with updated properties
313315
member this.WithProperties properties =
314316
Message(messageId, data, key, hasBase64EncodedKey, properties, encryptionCtx, schemaVersion, sequenceId,
315-
orderingKey, publishTime, eventTime, redeliveryCount, replicatedFrom, getValue)
317+
orderingKey, publishTime, eventTime, redeliveryCount, replicatedFrom, producerName, getValue)
316318

317319
type Messages<'T> internal(maxNumberOfMessages: int, maxSizeOfMessages: int64) =
318320

src/Pulsar.Client/Internal/ClientCnx.fs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,7 @@ and internal ClientCnx (config: PulsarClientConfiguration,
526526
EncryptionAlgo = messageMetadata.EncryptionAlgo
527527
OrderingKey = messageMetadata.OrderingKey
528528
ReplicatedFrom = messageMetadata.ReplicatedFrom
529+
ProducerName = messageMetadata.ProducerName
529530
}
530531

531532
{

src/Pulsar.Client/Internal/ConsumerImpl.fs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
452452
ackRequests.Clear()
453453

454454
let getNewIndividualMsgIdWithPartition messageId =
455-
{ messageId with Type = MessageIdType.Single; Partition = partitionIndex; TopicName = %"" }
455+
{ messageId with Type = MessageIdType.Single; Partition = partitionIndex; TopicName = topicName.CompleteTopicName }
456456

457457
let processPossibleToDLQ (messageId : MessageId) =
458458
let acknowledge = trySendAcknowledge Individual EmptyProperties None
@@ -685,6 +685,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
685685
rawMessage.Metadata.EventTime,
686686
rawMessage.RedeliveryCount,
687687
rawMessage.Metadata.ReplicatedFrom,
688+
rawMessage.Metadata.ProducerName,
688689
getValue
689690
)
690691
if (rawMessage.RedeliveryCount >= deadLettersProcessor.MaxRedeliveryCount) then
@@ -1336,13 +1337,12 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
13361337
elif rawMessage.AckSet.Count > 0 && not rawMessage.AckSet[i] then
13371338
skippedMessages <- skippedMessages + 1
13381339
else
1339-
let messageId =
1340-
{
1341-
rawMessage.MessageId with
1342-
Partition = partitionIndex
1343-
Type = Batch(%i, acker)
1344-
TopicName = %""
1345-
}
1340+
let messageId = {
1341+
rawMessage.MessageId with
1342+
Partition = partitionIndex
1343+
Type = Batch(%i, acker)
1344+
TopicName = topicName.CompleteTopicName
1345+
}
13461346
let msgKey = singleMessageMetadata.PartitionKey
13471347
let getValue () =
13481348
keyValueProcessor
@@ -1374,6 +1374,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
13741374
eventTime,
13751375
rawMessage.RedeliveryCount,
13761376
rawMessage.Metadata.ReplicatedFrom,
1377+
rawMessage.Metadata.ProducerName,
13771378
getValue
13781379
)
13791380
if (rawMessage.RedeliveryCount >= deadLettersProcessor.MaxRedeliveryCount) then

tests/IntegrationTests/Basic.fs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,22 @@ let tests =
2222

2323
testList "Basic" [
2424

25-
testTask "Sent messageId should be equal to received messageId" {
25+
testTask "Sent message/messageId should be equal to received message/messageId" {
2626

2727
Log.Debug("Started Sent messageId should be equal to received messageId")
2828
let client = getClient()
29-
let topicName = "public/default/topic-" + Guid.NewGuid().ToString("N")
29+
let topicName = "persistent://public/default/topic-" + Guid.NewGuid().ToString("N")
3030

3131
let! (producer1 : IProducer<byte[]>) =
3232
client.NewProducer()
3333
.Topic(topicName)
34+
.ProducerName("producer1")
3435
.CreateAsync()
3536

3637
let! (producer2 : IProducer<byte[]>) =
3738
client.NewProducer()
3839
.Topic(topicName)
40+
.ProducerName("producer2")
3941
.EnableBatching(false)
4042
.CreateAsync()
4143

@@ -52,9 +54,13 @@ let tests =
5254

5355
Expect.isTrue "" (msg1Id = msg1.MessageId)
5456
Expect.equal "" [| 0uy |] <| msg1.GetValue()
57+
Expect.equal "Message ID topic name should match" topicName (string msg1.MessageId.TopicName)
58+
Expect.equal "Message producer name should match" "producer1" (string msg1.ProducerName)
5559

5660
Expect.isTrue "" (msg2Id = msg2.MessageId)
5761
Expect.equal "" [| 1uy |] <| msg2.GetValue()
62+
Expect.equal "Message ID topic name should match" topicName (string msg2.MessageId.TopicName)
63+
Expect.equal "Message producer name should match" "producer2" (string msg2.ProducerName)
5864

5965
Log.Debug("Finished Sent messageId should be equal to received messageId")
6066
}

tests/UnitTests/Internal/ChunkedMessageTrackerTests.fs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ let tests =
3030
EventTime = Nullable()
3131
OrderingKey = [||]
3232
ReplicatedFrom = ""
33+
ProducerName = ""
3334
}
3435

3536
let testRawMessage =

0 commit comments

Comments
 (0)