@@ -62,6 +62,11 @@ func TestReader(t *testing.T) {
6262 scenario : "reading from an out-of-range offset waits until the context is cancelled" ,
6363 function : testReaderOutOfRangeGetsCanceled ,
6464 },
65+
66+ {
67+ scenario : "topic being recreated will return an error" ,
68+ function : testReaderTopicRecreated ,
69+ },
6570 }
6671
6772 for _ , test := range tests {
@@ -78,6 +83,7 @@ func TestReader(t *testing.T) {
7883 MinBytes : 1 ,
7984 MaxBytes : 10e6 ,
8085 MaxWait : 100 * time .Millisecond ,
86+ Logger : newTestKafkaLogger (t , "" ),
8187 })
8288 defer r .Close ()
8389 testFunc (t , ctx , r )
@@ -1950,3 +1956,29 @@ func createTopicWithCompaction(t *testing.T, topic string, partitions int) {
19501956 defer cancel ()
19511957 waitForTopic (ctx , t , topic )
19521958}
1959+
1960+ // The current behavior of the Reader is to retry OffsetOutOfRange errors
1961+ // indefinitely, which results in programs hanging in the event of a topic being
1962+ // re-created while a consumer is running. To retain backwards-compatibility,
1963+ // ReaderConfig.OffsetOutOfRangeError is being used to instruct the Reader to
1964+ // return an error in this case instead, allowing callers to react.
1965+ func testReaderTopicRecreated (t * testing.T , ctx context.Context , r * Reader ) {
1966+ r .config .OffsetOutOfRangeError = true
1967+
1968+ topic := r .config .Topic
1969+
1970+ // add 1 message to the topic
1971+ prepareReader (t , ctx , r , makeTestSequence (1 )... )
1972+
1973+ // consume the message (moving the offset from 0 -> 1)
1974+ _ , err := r .ReadMessage (ctx )
1975+ require .NoError (t , err )
1976+
1977+ // destroy the topic, then recreate it so the offset now becomes 0
1978+ deleteTopic (t , topic )
1979+ createTopic (t , topic , 1 )
1980+
1981+ // expect an error, since the offset should now be out of range
1982+ _ , err = r .ReadMessage (ctx )
1983+ require .ErrorIs (t , err , OffsetOutOfRange )
1984+ }
0 commit comments