@@ -5,10 +5,14 @@ import (
55 "bytes"
66 "context"
77 "errors"
8+ "net"
89 "reflect"
910 "testing"
1011 "time"
1112
13+ "github.com/segmentio/kafka-go/protocol"
14+ "github.com/segmentio/kafka-go/protocol/consumer"
15+ "github.com/segmentio/kafka-go/protocol/joingroup"
1216 ktesting "github.com/segmentio/kafka-go/testing"
1317)
1418
@@ -124,6 +128,84 @@ func TestClientJoinGroup(t *testing.T) {
124128 }
125129}
126130
131+ type roundTripFn func (context.Context , net.Addr , Request ) (Response , error )
132+
133+ func (f roundTripFn ) RoundTrip (ctx context.Context , addr net.Addr , req Request ) (Response , error ) {
134+ return f (ctx , addr , req )
135+ }
136+
137+ // https://github.com/Shopify/sarama/blob/610514edec1825240d59b62e4d7f1aba4b1fa000/consumer_group_members.go#L43
138+ func TestClientJoinGroupSaramaCompatibility (t * testing.T ) {
139+ subscription := consumer.Subscription {
140+ Version : 1 ,
141+ Topics : []string {"topic" },
142+ }
143+
144+ // Marhsal as Verzon 0 (Without OwnedPartitions) but
145+ // with Version=1.
146+ metadata , err := protocol .Marshal (0 , subscription )
147+ if err != nil {
148+ t .Fatalf ("failed to marshal subscription %v" , err )
149+ }
150+
151+ client := & Client {
152+ Addr : TCP ("fake:9092" ),
153+ Transport : roundTripFn (func (context.Context , net.Addr , Request ) (Response , error ) {
154+ resp := joingroup.Response {
155+ ProtocolType : "consumer" ,
156+ ProtocolName : RoundRobinGroupBalancer {}.ProtocolName (),
157+ LeaderID : "member" ,
158+ MemberID : "member" ,
159+ Members : []joingroup.ResponseMember {
160+ {
161+ MemberID : "member" ,
162+ Metadata : metadata ,
163+ },
164+ },
165+ }
166+ return & resp , nil
167+ }),
168+ }
169+
170+ expResp := JoinGroupResponse {
171+ ProtocolName : RoundRobinGroupBalancer {}.ProtocolName (),
172+ ProtocolType : "consumer" ,
173+ LeaderID : "member" ,
174+ MemberID : "member" ,
175+ Members : []JoinGroupResponseMember {
176+ {
177+ ID : "member" ,
178+ Metadata : GroupProtocolSubscription {
179+ Topics : []string {"topic" },
180+ OwnedPartitions : map [string ][]int {},
181+ },
182+ },
183+ },
184+ }
185+
186+ gotResp , err := client .JoinGroup (context .Background (), & JoinGroupRequest {
187+ GroupID : "group" ,
188+ MemberID : "member" ,
189+ ProtocolType : "consumer" ,
190+ Protocols : []GroupProtocol {
191+ {
192+ Name : RoundRobinGroupBalancer {}.ProtocolName (),
193+ Metadata : GroupProtocolSubscription {
194+ Topics : []string {"topic" },
195+ UserData : metadata ,
196+ },
197+ },
198+ },
199+ })
200+ if err != nil {
201+ t .Fatalf ("error calling JoinGroup: %v" , err )
202+ }
203+
204+ if ! reflect .DeepEqual (expResp , * gotResp ) {
205+ t .Fatalf ("unexpected JoinGroup resp\n expected: %#v\n got: %#v" , expResp , * gotResp )
206+ }
207+ }
208+
127209func TestSaramaCompatibility (t * testing.T ) {
128210 var (
129211 // sample data from github.com/Shopify/sarama
0 commit comments