Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
ab4b130
feat: Add support for replicateSubscriptionState in Pulsar pubsub com…
pnagaraj80 Jun 20, 2025
2d75927
go fmt
pnagaraj80 Jun 20, 2025
4d367ca
Merge branch 'main' into feat/pulsar-replicate-subscription-state
pnagaraj80 Jun 21, 2025
d6fc421
Merge branch 'main' into feat/pulsar-replicate-subscription-state
cicoyle Jun 25, 2025
56442cb
Merge branch 'main' into feat/pulsar-replicate-subscription-state
yaron2 Jun 26, 2025
2bf2df6
Merge branch 'main' into feat/pulsar-replicate-subscription-state
dapr-bot Jun 26, 2025
0f38f90
Merge branch 'main' into feat/pulsar-replicate-subscription-state
pnagaraj80 Jun 26, 2025
3a4bccf
Merge branch 'main' into feat/pulsar-replicate-subscription-state
dapr-bot Jun 26, 2025
79fffe9
Merge branch 'main' into feat/pulsar-replicate-subscription-state
yaron2 Jul 15, 2025
ea172ba
Merge branch 'main' into feat/pulsar-replicate-subscription-state
dapr-bot Jul 15, 2025
91626d0
Merge branch 'main' into feat/pulsar-replicate-subscription-state
dapr-bot Jul 15, 2025
81c2a09
Merge branch 'main' into feat/pulsar-replicate-subscription-state
dapr-bot Jul 22, 2025
601e656
Merge branch 'main' into feat/pulsar-replicate-subscription-state
dapr-bot Jul 22, 2025
a53e54f
Merge branch 'main' into feat/pulsar-replicate-subscription-state
dapr-bot Jul 23, 2025
dd4e9fd
Merge branch 'main' into feat/pulsar-replicate-subscription-state
dapr-bot Jul 23, 2025
c907f13
Merge branch 'main' into feat/pulsar-replicate-subscription-state
dapr-bot Jul 24, 2025
b440147
Merge branch 'main' into feat/pulsar-replicate-subscription-state
dapr-bot Jul 25, 2025
8c357c2
Merge branch 'main' into feat/pulsar-replicate-subscription-state
dapr-bot Jul 25, 2025
b6156c1
Merge branch 'main' into feat/pulsar-replicate-subscription-state
dapr-bot Jul 30, 2025
9dcc5ac
Merge branch 'main' into feat/pulsar-replicate-subscription-state
dapr-bot Jul 31, 2025
17990ae
Merge branch 'main' into feat/pulsar-replicate-subscription-state
dapr-bot Jul 31, 2025
d57eaea
Merge branch 'main' into feat/pulsar-replicate-subscription-state
dapr-bot Jul 31, 2025
c802b8e
Merge branch 'main' into feat/pulsar-replicate-subscription-state
yaron2 Aug 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pubsub/pulsar/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type pulsarMetadata struct {
ReceiverQueueSize int `mapstructure:"receiverQueueSize"`
SubscriptionType string `mapstructure:"subscribeType"`
SubscriptionInitialPosition string `mapstructure:"subscribeInitialPosition"`
ReplicateSubscriptionState bool `mapstructure:"replicateSubscriptionState"`
Token string `mapstructure:"token"`
oauth2.ClientCredentialsMetadata `mapstructure:",squash"`
}
Expand Down
13 changes: 12 additions & 1 deletion pubsub/pulsar/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -201,4 +201,15 @@ metadata:
example: '"earliest"'
url:
title: "Pulsar SubscriptionInitialPosition"
url: "https://pkg.go.dev/github.com/apache/pulsar-client-go/pulsar#SubscriptionInitialPosition"
url: "https://pkg.go.dev/github.com/apache/pulsar-client-go/pulsar#SubscriptionInitialPosition"
- name: replicateSubscriptionState
type: bool
description: |
Enable replication of subscription state across geo-replicated Pulsar clusters.
When enabled, subscription state (such as cursor positions and acknowledgments) will be replicated to other clusters in a geo-replicated setup.
This is useful for maintaining subscription consistency during cluster failovers.
default: 'false'
example: '"true", "false"'
url:
title: "Pulsar Geo-Replication"
url: "https://pulsar.apache.org/docs/administration-geo/"
1 change: 1 addition & 0 deletions pubsub/pulsar/pulsar.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ func (p *Pulsar) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, han
MessageChannel: channel,
NackRedeliveryDelay: p.metadata.RedeliveryDelay,
ReceiverQueueSize: p.metadata.ReceiverQueueSize,
ReplicateSubscriptionState: p.metadata.ReplicateSubscriptionState,
}

// Handle KeySharedPolicy for key_shared subscription type
Expand Down
42 changes: 42 additions & 0 deletions pubsub/pulsar/pulsar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,48 @@ func TestEncryptionKeys(t *testing.T) {
})
}

func TestParsePulsarMetadataReplicateSubscriptionState(t *testing.T) {
tt := []struct {
name string
replicateSubscriptionState string
expected bool
}{
{
name: "test replicateSubscriptionState true",
replicateSubscriptionState: "true",
expected: true,
},
{
name: "test replicateSubscriptionState false",
replicateSubscriptionState: "false",
expected: false,
},
{
name: "test replicateSubscriptionState empty (defaults to false)",
replicateSubscriptionState: "",
expected: false,
},
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{
"host": "a",
}

if tc.replicateSubscriptionState != "" {
m.Properties["replicateSubscriptionState"] = tc.replicateSubscriptionState
}

meta, err := parsePulsarMetadata(m)

require.NoError(t, err)
assert.Equal(t, tc.expected, meta.ReplicateSubscriptionState)
})
}
}

func TestSanitiseURL(t *testing.T) {
tests := []struct {
name string
Expand Down