Skip to content

Commit 8738e79

Browse files
authored
Add producer name attribute to the Message (#331)
* Add producer name attribute to the Message * Fix compilation issue
1 parent 79efa0f commit 8738e79

File tree

5 files changed

+17
-6
lines changed

5 files changed

+17
-6
lines changed

src/Pulsar.Client/Common/DTO.fs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ type internal Metadata =
213213
EncryptionAlgo: string
214214
OrderingKey: byte[]
215215
ReplicatedFrom: string
216+
ProducerName: string
216217
NullValue: bool
217218
}
218219

@@ -265,7 +266,7 @@ type Message<'T> internal (messageId: MessageId, data: byte[], key: PartitionKey
265266
properties: IReadOnlyDictionary<string, string>, encryptionCtx: EncryptionContext option,
266267
schemaVersion: byte[], sequenceId: SequenceId, orderingKey: byte[], publishTime: TimeStamp,
267268
eventTime: Nullable<TimeStamp>,
268-
redeliveryCount: int32, replicatedFrom: string,
269+
redeliveryCount: int32, replicatedFrom: string, producerName: string,
269270
getValue: unit -> 'T) =
270271
/// Get the unique message ID associated with this message.
271272
member this.MessageId = messageId
@@ -294,26 +295,28 @@ type Message<'T> internal (messageId: MessageId, data: byte[], key: PartitionKey
294295
member this.RedeliveryCount = redeliveryCount
295296
/// Get name of cluster, from which the message is replicated.
296297
member this.ReplicatedFrom = replicatedFrom
298+
/// Get name of producer of the message
299+
member this.ProducerName = producerName
297300

298301
/// Get the de-serialized value of the message, according the configured Schema.
299302
member this.GetValue() =
300303
getValue()
301304

302305
member internal this.WithMessageId messageId =
303306
Message(messageId, data, key, hasBase64EncodedKey, properties, encryptionCtx, schemaVersion, sequenceId,
304-
orderingKey, publishTime, eventTime, redeliveryCount, replicatedFrom, getValue)
307+
orderingKey, publishTime, eventTime, redeliveryCount, replicatedFrom, producerName, getValue)
305308
/// Get a new instance of the message with updated data
306309
member this.WithData data =
307310
Message(messageId, data, key, hasBase64EncodedKey, properties, encryptionCtx, schemaVersion, sequenceId,
308-
orderingKey, publishTime, eventTime, redeliveryCount, replicatedFrom, getValue)
311+
orderingKey, publishTime, eventTime, redeliveryCount, replicatedFrom, producerName, getValue)
309312
/// Get a new instance of the message with updated key
310313
member this.WithKey (key, hasBase64EncodedKey) =
311314
Message(messageId, data, key, hasBase64EncodedKey, properties, encryptionCtx, schemaVersion, sequenceId,
312-
orderingKey, publishTime, eventTime, redeliveryCount, replicatedFrom, getValue)
315+
orderingKey, publishTime, eventTime, redeliveryCount, replicatedFrom, producerName, getValue)
313316
/// Get a new instance of the message with updated properties
314317
member this.WithProperties properties =
315318
Message(messageId, data, key, hasBase64EncodedKey, properties, encryptionCtx, schemaVersion, sequenceId,
316-
orderingKey, publishTime, eventTime, redeliveryCount, replicatedFrom, getValue)
319+
orderingKey, publishTime, eventTime, redeliveryCount, replicatedFrom, producerName, getValue)
317320

318321
type Messages<'T> internal(maxNumberOfMessages: int, maxSizeOfMessages: int64) =
319322

src/Pulsar.Client/Internal/ClientCnx.fs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,7 @@ and internal ClientCnx (config: PulsarClientConfiguration,
530530
EncryptionAlgo = messageMetadata.EncryptionAlgo
531531
OrderingKey = messageMetadata.OrderingKey
532532
ReplicatedFrom = messageMetadata.ReplicatedFrom
533+
ProducerName = messageMetadata.ProducerName
533534
NullValue = messageMetadata.NullValue
534535
}
535536

src/Pulsar.Client/Internal/ConsumerImpl.fs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -691,6 +691,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
691691
rawMessage.Metadata.EventTime,
692692
rawMessage.RedeliveryCount,
693693
rawMessage.Metadata.ReplicatedFrom,
694+
rawMessage.Metadata.ProducerName,
694695
getValue
695696
)
696697
if (rawMessage.RedeliveryCount >= deadLettersProcessor.MaxRedeliveryCount) then
@@ -1387,6 +1388,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
13871388
eventTime,
13881389
rawMessage.RedeliveryCount,
13891390
rawMessage.Metadata.ReplicatedFrom,
1391+
rawMessage.Metadata.ProducerName,
13901392
getValue
13911393
)
13921394
if (rawMessage.RedeliveryCount >= deadLettersProcessor.MaxRedeliveryCount) then

tests/IntegrationTests/Basic.fs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ let tests =
2424

2525
testList "Basic" [
2626

27-
testTask "Sent messageId should be equal to received messageId" {
27+
testTask "Sent message/messageId should be equal to received message/messageId" {
2828

2929
Log.Debug("Started Sent messageId should be equal to received messageId")
3030
let client = getClient()
@@ -33,11 +33,13 @@ let tests =
3333
let! (producer1 : IProducer<byte[]>) =
3434
client.NewProducer()
3535
.Topic(topicName)
36+
.ProducerName("producer1")
3637
.CreateAsync()
3738

3839
let! (producer2 : IProducer<byte[]>) =
3940
client.NewProducer()
4041
.Topic(topicName)
42+
.ProducerName("producer2")
4143
.EnableBatching(false)
4244
.CreateAsync()
4345

@@ -55,10 +57,12 @@ let tests =
5557
Expect.isTrue "" (msg1Id = msg1.MessageId)
5658
Expect.equal "" [| 0uy |] <| msg1.GetValue()
5759
Expect.equal "Message ID topic name should match" topicName (string msg1.MessageId.TopicName)
60+
Expect.equal "Message producer name should match" "producer1" (string msg1.ProducerName)
5861

5962
Expect.isTrue "" (msg2Id = msg2.MessageId)
6063
Expect.equal "" [| 1uy |] <| msg2.GetValue()
6164
Expect.equal "Message ID topic name should match" topicName (string msg2.MessageId.TopicName)
65+
Expect.equal "Message producer name should match" "producer2" (string msg2.ProducerName)
6266

6367
Log.Debug("Finished Sent messageId should be equal to received messageId")
6468
}

tests/UnitTests/Internal/ChunkedMessageTrackerTests.fs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ let tests =
3131
EventTime = Nullable()
3232
OrderingKey = [||]
3333
ReplicatedFrom = ""
34+
ProducerName = ""
3435
NullValue = false
3536
}
3637

0 commit comments

Comments
 (0)