Skip to content

Commit ebcf4fe

Browse files
committed
fix: handle nullable metadata in OffsetFetchResponse
The Kafka protocol defines OffsetFetchResponse.Partitions.Metadata as nullable (versions 0-7), but sarama decoded it with getString() which rejects a -1 length. Use getNullableString()/putNullableString() instead, mapping nil to empty string to preserve backwards compatibility. Fixes #3444 Signed-off-by: Dominic Evans <dominic.evans@uk.ibm.com>
1 parent 9519c80 commit ebcf4fe

File tree

2 files changed

+31
-2
lines changed

2 files changed

+31
-2
lines changed

offset_fetch_response.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,13 @@ func (b *OffsetFetchResponseBlock) decode(pd packetDecoder, version int16) (err
2424
b.LeaderEpoch = -1
2525
}
2626

27-
b.Metadata, err = pd.getString()
27+
metadata, err := pd.getNullableString()
2828
if err != nil {
2929
return err
3030
}
31+
if metadata != nil {
32+
b.Metadata = *metadata
33+
}
3134

3235
b.Err, err = pd.getKError()
3336
if err != nil {
@@ -44,7 +47,7 @@ func (b *OffsetFetchResponseBlock) encode(pe packetEncoder, version int16) (err
4447
if version >= 5 {
4548
pe.putInt32(b.LeaderEpoch)
4649
}
47-
err = pe.putString(b.Metadata)
50+
err = pe.putNullableString(&b.Metadata)
4851
if err != nil {
4952
return err
5053
}

offset_fetch_response_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,24 @@ package sarama
55
import (
66
"fmt"
77
"testing"
8+
9+
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
811
)
912

1013
var (
14+
// version 0 response with one topic "t", one partition 0, offset 0,
15+
// null metadata (length -1), and error code 7 (ErrRequestTimedOut)
16+
offsetFetchResponseV0NullMetadata = []byte{
17+
0x00, 0x00, 0x00, 0x01, // num topics = 1
18+
0x00, 0x01, 't', // topic name = "t"
19+
0x00, 0x00, 0x00, 0x01, // num partitions = 1
20+
0x00, 0x00, 0x00, 0x00, // partition = 0
21+
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // offset = 0
22+
0xFF, 0xFF, // metadata = null (length -1)
23+
0x00, 0x07, // error code = 7
24+
}
25+
1126
emptyOffsetFetchResponse = []byte{
1227
0x00, 0x00, 0x00, 0x00,
1328
}
@@ -39,6 +54,17 @@ func TestEmptyOffsetFetchResponse(t *testing.T) {
3954
}
4055
}
4156

57+
func TestOffsetFetchResponseNullMetadata(t *testing.T) {
58+
response := &OffsetFetchResponse{}
59+
err := versionedDecode(offsetFetchResponseV0NullMetadata, response, 0, nil)
60+
require.NoError(t, err)
61+
62+
block := response.GetBlock("t", 0)
63+
require.NotNil(t, block)
64+
assert.Empty(t, block.Metadata)
65+
assert.Equal(t, ErrRequestTimedOut, block.Err)
66+
}
67+
4268
func TestNormalOffsetFetchResponse(t *testing.T) {
4369
// The response encoded form cannot be checked for it varies due to
4470
// unpredictable map traversal order.

0 commit comments

Comments
 (0)