Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion waku/v2/protocol/relay/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package relay
import (
"context"

"github.com/waku-org/go-waku/waku/v2/protocol"
"golang.org/x/exp/slices"

"github.com/waku-org/go-waku/waku/v2/protocol"
)

// Subscription handles the details of a particular Topic subscription. There may be many subscriptions for a given topic.
Expand Down Expand Up @@ -55,3 +56,7 @@ func NewSubscription(contentFilter protocol.ContentFilter) *Subscription {
subType: subType,
}
}

func (s *Subscription) ContentFilter() protocol.ContentFilter {
return s.contentFilter
}
14 changes: 6 additions & 8 deletions waku/v2/protocol/relay/waku_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import (
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
proto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/proto"

pubsub "github.com/libp2p/go-libp2p-pubsub"

"github.com/waku-org/go-waku/logging"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
Expand Down Expand Up @@ -388,7 +389,7 @@ func (w *WakuRelay) EnoughPeersToPublishToTopic(topic string) bool {
}

// subscribe returns list of Subscription to receive messages based on content filter
func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.ContentFilter, opts ...RelaySubscribeOption) ([]*Subscription, error) {
func (w *WakuRelay) subscribe(contentFilter waku_proto.ContentFilter, opts ...RelaySubscribeOption) ([]*Subscription, error) {

var subscriptions []*Subscription
pubSubTopicMap, err := waku_proto.ContentFilterToPubSubTopicMap(contentFilter)
Expand Down Expand Up @@ -438,23 +439,20 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont
w.topicsMutex.Unlock()

subscriptions = append(subscriptions, subscription)
go func() {
defer utils.LogOnPanic()
<-ctx.Done()
subscription.Unsubscribe()
}()
}

return subscriptions, nil
}

// Subscribe returns a Subscription to receive messages as per contentFilter
// contentFilter can contain pubSubTopic and contentTopics or only contentTopics(in case of autosharding)
// ctx argument is ignored and left for compatibility.
func (w *WakuRelay) Subscribe(ctx context.Context, contentFilter waku_proto.ContentFilter, opts ...RelaySubscribeOption) ([]*Subscription, error) {
return w.subscribe(ctx, contentFilter, opts...)
return w.subscribe(contentFilter, opts...)
}

// Unsubscribe closes a subscription to a pubsub topic
// ctx argument is ignored and left for compatibility.
func (w *WakuRelay) Unsubscribe(ctx context.Context, contentFilter waku_proto.ContentFilter) error {

pubSubTopicMap, err := waku_proto.ContentFilterToPubSubTopicMap(contentFilter)
Expand Down
15 changes: 8 additions & 7 deletions waku/v2/protocol/relay/waku_relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"

"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol"
Expand Down Expand Up @@ -41,7 +42,7 @@ func TestWakuRelay(t *testing.T) {
require.NoError(t, err)
defer relay.Stop()

subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter(testTopic))
subs, err := relay.subscribe(protocol.NewContentFilter(testTopic))

require.NoError(t, err)

Expand Down Expand Up @@ -92,7 +93,7 @@ func TestWakuRelayUnsubscribedTopic(t *testing.T) {
require.NoError(t, err)
defer relay.Stop()

subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter(testTopic))
subs, err := relay.subscribe(protocol.NewContentFilter(testTopic))

require.NoError(t, err)

Expand Down Expand Up @@ -278,7 +279,7 @@ func TestWakuRelayAutoShard(t *testing.T) {
defer bcaster.Stop()

//Create a contentTopic level subscription
subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter("", testcTopic))
subs, err := relay.subscribe(protocol.NewContentFilter("", testcTopic))
require.NoError(t, err)
require.Equal(t, relay.IsSubscribed(subs[0].contentFilter.PubsubTopic), true)

Expand All @@ -299,7 +300,7 @@ func TestWakuRelayAutoShard(t *testing.T) {
defer cancel()

//Create a pubSub level subscription
subs1, err := relay.subscribe(context.Background(), protocol.NewContentFilter(subs[0].contentFilter.PubsubTopic))
subs1, err := relay.subscribe(protocol.NewContentFilter(subs[0].contentFilter.PubsubTopic))
require.NoError(t, err)

msg := &pb.WakuMessage{
Expand Down Expand Up @@ -382,7 +383,7 @@ func TestInvalidMessagePublish(t *testing.T) {

ctx, ctxCancel := context.WithTimeout(context.Background(), 10*time.Second)

subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter(testTopic))
subs, err := relay.subscribe(protocol.NewContentFilter(testTopic))
require.NoError(t, err)

// Test empty contentTopic
Expand Down Expand Up @@ -459,10 +460,10 @@ func TestWakuRelayStaticSharding(t *testing.T) {
time.Sleep(2 * time.Second)

// Subscribe to valid static shard topic on both hosts
subs1, err := relay2.subscribe(context.Background(), protocol.NewContentFilter(testTopic, testContentTopic))
subs1, err := relay2.subscribe(protocol.NewContentFilter(testTopic, testContentTopic))
require.NoError(t, err)

subs2, err := relay2.subscribe(context.Background(), protocol.NewContentFilter(testTopic, testContentTopic))
subs2, err := relay2.subscribe(protocol.NewContentFilter(testTopic, testContentTopic))
require.NoError(t, err)
require.True(t, relay2.IsSubscribed(testTopic))
require.Equal(t, testContentTopic, subs2[0].contentFilter.ContentTopics.ToList()[0])
Expand Down
Loading