Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pubsub/pulsar/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type pulsarMetadata struct {
MaxConcurrentHandlers uint `mapstructure:"maxConcurrentHandlers"`
ReceiverQueueSize int `mapstructure:"receiverQueueSize"`
SubscriptionType string `mapstructure:"subscribeType"`
SubscriptionInitialPosition string `mapstructure:"subscribeInitialPosition"`
Token string `mapstructure:"token"`
oauth2.ClientCredentialsMetadata `mapstructure:",squash"`
}
Expand Down
11 changes: 10 additions & 1 deletion pubsub/pulsar/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -192,4 +192,13 @@ metadata:
example: '"exclusive"'
url:
title: "Pulsar Subscription Types"
url: "https://pulsar.apache.org/docs/3.0.x/concepts-messaging/#subscription-types"
url: "https://pulsar.apache.org/docs/3.0.x/concepts-messaging/#subscription-types"
- name: subscribeInitialPosition
type: string
description: |
Subscription position is the initial position which the cursor is set when start consuming: "latest", "earliest".
default: '"latest"'
example: '"earliest"'
url:
title: "Pulsar SubscriptionInitialPosition"
url: "https://pkg.go.dev/github.com/apache/pulsar-client-go/pulsar#SubscriptionInitialPosition"
47 changes: 41 additions & 6 deletions pubsub/pulsar/pulsar.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ const (

processModeAsync = "async"
processModeSync = "sync"

subscribeInitialPosition = "subscribeInitialPosition"

subscribePositionEarliest = "earliest"
subscribePositionLatest = "latest"
)

type ProcessMode string
Expand Down Expand Up @@ -144,6 +149,11 @@ func parsePulsarMetadata(meta pubsub.Metadata) (*pulsarMetadata, error) {
return nil, errors.New("invalid subscription type. Accepted values are `exclusive`, `shared`, `failover` and `key_shared`")
}

m.SubscriptionInitialPosition, err = parseSubscriptionPosition(meta.Properties[subscribeInitialPosition])
if err != nil {
return nil, errors.New("invalid subscription initial position. Accepted values are `latest` and `earliest`")
}

for k, v := range meta.Properties {
switch {
case strings.HasSuffix(k, topicJSONSchemaIdentifier):
Expand Down Expand Up @@ -421,6 +431,30 @@ func getSubscribeType(subsTypeStr string) pulsar.SubscriptionType {
return subsType
}

func parseSubscriptionPosition(in string) (string, error) {
subsPosition := strings.ToLower(in)
switch subsPosition {
case subscribePositionEarliest, subscribePositionLatest:
return subsPosition, nil
case "":
return subscribePositionLatest, nil
default:
return "", fmt.Errorf("invalid subscription initial position: %s", subsPosition)
}
}

func getSubscribePosition(subsPositionStr string) pulsar.SubscriptionInitialPosition {
var subsPosition pulsar.SubscriptionInitialPosition

switch subsPositionStr {
case subscribePositionEarliest:
subsPosition = pulsar.SubscriptionPositionEarliest
case subscribePositionLatest:
subsPosition = pulsar.SubscriptionPositionLatest
}
return subsPosition
}

func (p *Pulsar) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
if p.closed.Load() {
return errors.New("component is closed")
Expand All @@ -436,12 +470,13 @@ func (p *Pulsar) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, han
}

options := pulsar.ConsumerOptions{
Topic: topic,
SubscriptionName: p.metadata.ConsumerID,
Type: getSubscribeType(subscribeType),
MessageChannel: channel,
NackRedeliveryDelay: p.metadata.RedeliveryDelay,
ReceiverQueueSize: p.metadata.ReceiverQueueSize,
Topic: topic,
SubscriptionName: p.metadata.ConsumerID,
Type: getSubscribeType(subscribeType),
SubscriptionInitialPosition: getSubscribePosition(subscribeInitialPosition),
MessageChannel: channel,
NackRedeliveryDelay: p.metadata.RedeliveryDelay,
ReceiverQueueSize: p.metadata.ReceiverQueueSize,
}

// Handle KeySharedPolicy for key_shared subscription type
Expand Down
53 changes: 53 additions & 0 deletions pubsub/pulsar/pulsar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,59 @@ func TestParsePulsarMetadataSubscriptionType(t *testing.T) {
}
}

func TestParsePulsarMetadataSubscriptionInitialPosition(t *testing.T) {
tt := []struct {
name string
subscribeInitialPosition string
expected string
err bool
}{
{
name: "test valid subscribe initial position - earliest",
subscribeInitialPosition: "earliest",
expected: "earliest",
err: false,
},
{
name: "test valid subscribe initial position - latest",
subscribeInitialPosition: "latest",
expected: "latest",
err: false,
},
{
name: "test valid subscribe initial position - empty",
subscribeInitialPosition: "",
expected: "latest",
err: false,
},
{
name: "test invalid subscribe initial position",
subscribeInitialPosition: "invalid",
err: true,
},
}
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
m := pubsub.Metadata{}

m.Properties = map[string]string{
"host": "a",
"subscribeInitialPosition": tc.subscribeInitialPosition,
}
meta, err := parsePulsarMetadata(m)

if tc.err {
require.Error(t, err)
assert.Nil(t, meta)
return
}

require.NoError(t, err)
assert.Equal(t, tc.expected, meta.SubscriptionInitialPosition)
})
}
}

func TestParsePulsarSchemaMetadata(t *testing.T) {
t.Run("test json", func(t *testing.T) {
m := pubsub.Metadata{}
Expand Down