diff --git a/librdkafka b/librdkafka index 826f585..b4c6085 160000 --- a/librdkafka +++ b/librdkafka @@ -1 +1 @@ -Subproject commit 826f585564ec33750e9211c7eeb801d3f85a91a0 +Subproject commit b4c608570f796c18ff2211a7af876046d264d392 diff --git a/tests/consumer.lua b/tests/consumer.lua index e47f720..d1baa93 100644 --- a/tests/consumer.lua +++ b/tests/consumer.lua @@ -203,7 +203,7 @@ local function test_seek_partitions() local out = consumer:output() for _ = 1, 5 do - local msg = out:get(3) + local msg = out:get(10) if msg == nil then error('Message is not delivered') end diff --git a/tests/test_consumer.py b/tests/test_consumer.py index f9358fd..f116e1f 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -211,12 +211,15 @@ def test_consumer_should_completely_unsubscribe_from_topics(): "value": "test34" } - write_into_kafka("test_unsubscribe", (message1, message2)) + t = f"test_unsubscribe_{randomword(4)}" + write_into_kafka(t, (message1, message2)) server = get_server() - with create_consumer(server, KAFKA_HOST, {"group.id": "should_completely_unsubscribe_from_topics"}): - server.call("consumer.subscribe", [["test_unsubscribe"]]) + with create_consumer(server, KAFKA_HOST, { + "group.id": "should_completely_unsubscribe_from_topics", + }): + server.call("consumer.subscribe", [[t]]) response = server.call("consumer.consume", [10])[0] @@ -225,9 +228,9 @@ def test_consumer_should_completely_unsubscribe_from_topics(): "test2", } - server.call("consumer.unsubscribe", [["test_unsubscribe"]]) + server.call("consumer.unsubscribe", [[t]]) - write_into_kafka("test_unsubscribe", (message3, )) + write_into_kafka(t, (message3, )) response = server.call("consumer.consume", [10]) @@ -257,11 +260,16 @@ def test_consumer_should_partially_unsubscribe_from_topics(): server = get_server() - with create_consumer(server, KAFKA_HOST, {"group.id": "should_partially_unsubscribe_from_topics"}): - server.call("consumer.subscribe", [["test_unsub_partially_1", "test_unsub_partially_2"]]) + salt = randomword(4) + with create_consumer(server, KAFKA_HOST, { + "group.id": f"should_partially_unsubscribe_from_topics_{salt}", + }): + t1 = f"test_unsub_partially_{salt}_1" + t2 = f"test_unsub_partially_{salt}_2" + server.call("consumer.subscribe", [[t1, t2]]) - write_into_kafka("test_unsub_partially_1", (message1, )) - write_into_kafka("test_unsub_partially_2", (message2, )) + write_into_kafka(t1, (message1, )) + write_into_kafka(t2, (message2, )) time.sleep(5) # waiting up to 30 seconds @@ -272,10 +280,10 @@ def test_consumer_should_partially_unsubscribe_from_topics(): "test2", } - server.call("consumer.unsubscribe", [["test_unsub_partially_1"]]) + server.call("consumer.unsubscribe", [[t1]]) - write_into_kafka("test_unsub_partially_1", (message3, )) - write_into_kafka("test_unsub_partially_2", (message4, )) + write_into_kafka(t1, (message3, )) + write_into_kafka(t2, (message4, )) time.sleep(5) response = server.call("consumer.consume", [30])[0]