Skip to content

Commit ab455e6

Browse files
committed
test: improve testing config
Instead of having X different macros and control flow exists, let's have one mechanism to read env variables and decide if a test shall run or not. This also moves the "which broker implementation supports which features" from a bunch of environment variables into a nice enum. This will greatly help when we expand the set of admit APIs, see #157 for example.
1 parent aa82995 commit ab455e6

File tree

7 files changed

+249
-129
lines changed

7 files changed

+249
-129
lines changed

.circleci/config.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ jobs:
204204
RUST_LOG: "trace"
205205
# Run integration tests
206206
TEST_INTEGRATION: 1
207+
TEST_BROKER_IMPL: redpanda
207208
TEST_JAVA_INTEROPT: 1
208209
# Don't use the first node here since this is likely the controller and we want to ensure that we automatically
209210
# pick the controller for certain actions (e.g. topic creation) and don't just get lucky.
@@ -278,8 +279,7 @@ jobs:
278279
RUST_LOG: "trace"
279280
# Run integration tests
280281
TEST_INTEGRATION: 1
281-
# Kafka support DeleteRecords
282-
TEST_DELETE_RECORDS: 1
282+
TEST_BROKER_IMPL: kafka
283283
TEST_JAVA_INTEROPT: 1
284284
# Don't use the first node here since this is likely the controller and we want to ensure that we automatically
285285
# pick the controller for certain actions (e.g. topic creation) and don't just get lucky.

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ $ docker-compose -f docker-compose-redpanda.yml up
114114
in one session, and then run:
115115

116116
```console
117-
$ TEST_INTEGRATION=1 KAFKA_CONNECT=0.0.0.0:9011 cargo test
117+
$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=redpanda KAFKA_CONNECT=0.0.0.0:9011 cargo test
118118
```
119119

120120
in another session.
@@ -130,7 +130,7 @@ $ docker-compose -f docker-compose-kafka.yml up
130130
in one session, and then run:
131131

132132
```console
133-
$ TEST_INTEGRATION=1 TEST_DELETE_RECORDS=1 KAFKA_CONNECT=localhost:9011 cargo test
133+
$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo test
134134
```
135135

136136
in another session. Note that Apache Kafka supports a different set of features then redpanda, so we pass other
@@ -230,14 +230,14 @@ execution that hooks right into the place where it is about to exit:
230230
Install [cargo-criterion], make sure you have some Kafka cluster running, and then you can run all benchmarks with:
231231

232232
```console
233-
$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9011 cargo criterion --all-features
233+
$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo criterion --all-features
234234
```
235235

236236
If you find a benchmark that is too slow, you can may want to profile it. Get [cargo-with], and [perf], then run (here
237237
for the `parallel/rskafka` benchmark):
238238

239239
```console
240-
$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9011 cargo with 'perf record --call-graph dwarf -- {bin}' -- \
240+
$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo with 'perf record --call-graph dwarf -- {bin}' -- \
241241
bench --all-features --bench write_throughput -- \
242242
--bench --noplot parallel/rskafka
243243
```

tests/client.rs

Lines changed: 56 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,22 @@ use test_helpers::{maybe_start_logging, now, random_topic_name, record};
1616
async fn test_plain() {
1717
maybe_start_logging();
1818

19-
let connection = maybe_skip_kafka_integration!();
20-
ClientBuilder::new(connection).build().await.unwrap();
19+
let test_cfg = maybe_skip_kafka_integration!();
20+
ClientBuilder::new(test_cfg.bootstrap_brokers)
21+
.build()
22+
.await
23+
.unwrap();
2124
}
2225

2326
#[tokio::test]
2427
async fn test_topic_crud() {
2528
maybe_start_logging();
2629

27-
let connection = maybe_skip_kafka_integration!();
28-
let client = ClientBuilder::new(connection).build().await.unwrap();
30+
let test_cfg = maybe_skip_kafka_integration!();
31+
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
32+
.build()
33+
.await
34+
.unwrap();
2935
let controller_client = client.controller_client().unwrap();
3036
let topics = client.list_topics().await.unwrap();
3137

@@ -77,10 +83,13 @@ async fn test_topic_crud() {
7783
async fn test_partition_client() {
7884
maybe_start_logging();
7985

80-
let connection = maybe_skip_kafka_integration!();
86+
let test_cfg = maybe_skip_kafka_integration!();
8187
let topic_name = random_topic_name();
8288

83-
let client = ClientBuilder::new(connection).build().await.unwrap();
89+
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
90+
.build()
91+
.await
92+
.unwrap();
8493

8594
let controller_client = client.controller_client().unwrap();
8695
controller_client
@@ -134,8 +143,8 @@ async fn test_tls() {
134143
.with_single_cert(vec![producer_root], private_key)
135144
.unwrap();
136145

137-
let connection = maybe_skip_kafka_integration!();
138-
ClientBuilder::new(connection)
146+
let test_cfg = maybe_skip_kafka_integration!();
147+
ClientBuilder::new(test_cfg.bootstrap_brokers)
139148
.tls_config(Arc::new(config))
140149
.build()
141150
.await
@@ -147,14 +156,11 @@ async fn test_tls() {
147156
async fn test_socks5() {
148157
maybe_start_logging();
149158

150-
// e.g. "my-connection-kafka-bootstrap:9092"
151-
let connection = maybe_skip_kafka_integration!();
152-
// e.g. "localhost:1080"
153-
let proxy = maybe_skip_SOCKS_PROXY!();
159+
let test_cfg = maybe_skip_kafka_integration!(socks5);
154160
let topic_name = random_topic_name();
155161

156-
let client = ClientBuilder::new(connection)
157-
.socks5_proxy(proxy)
162+
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
163+
.socks5_proxy(test_cfg.socks5_proxy.unwrap())
158164
.build()
159165
.await
160166
.unwrap();
@@ -186,11 +192,14 @@ async fn test_socks5() {
186192
async fn test_produce_empty() {
187193
maybe_start_logging();
188194

189-
let connection = maybe_skip_kafka_integration!();
195+
let test_cfg = maybe_skip_kafka_integration!();
190196
let topic_name = random_topic_name();
191197
let n_partitions = 2;
192198

193-
let client = ClientBuilder::new(connection).build().await.unwrap();
199+
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
200+
.build()
201+
.await
202+
.unwrap();
194203
let controller_client = client.controller_client().unwrap();
195204
controller_client
196205
.create_topic(&topic_name, n_partitions, 1, 5_000)
@@ -208,11 +217,14 @@ async fn test_produce_empty() {
208217
async fn test_consume_empty() {
209218
maybe_start_logging();
210219

211-
let connection = maybe_skip_kafka_integration!();
220+
let test_cfg = maybe_skip_kafka_integration!();
212221
let topic_name = random_topic_name();
213222
let n_partitions = 2;
214223

215-
let client = ClientBuilder::new(connection).build().await.unwrap();
224+
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
225+
.build()
226+
.await
227+
.unwrap();
216228
let controller_client = client.controller_client().unwrap();
217229
controller_client
218230
.create_topic(&topic_name, n_partitions, 1, 5_000)
@@ -232,11 +244,14 @@ async fn test_consume_empty() {
232244
async fn test_consume_offset_out_of_range() {
233245
maybe_start_logging();
234246

235-
let connection = maybe_skip_kafka_integration!();
247+
let test_cfg = maybe_skip_kafka_integration!();
236248
let topic_name = random_topic_name();
237249
let n_partitions = 2;
238250

239-
let client = ClientBuilder::new(connection).build().await.unwrap();
251+
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
252+
.build()
253+
.await
254+
.unwrap();
240255
let controller_client = client.controller_client().unwrap();
241256
controller_client
242257
.create_topic(&topic_name, n_partitions, 1, 5_000)
@@ -268,11 +283,11 @@ async fn test_consume_offset_out_of_range() {
268283
async fn test_get_offset() {
269284
maybe_start_logging();
270285

271-
let connection = maybe_skip_kafka_integration!();
286+
let test_cfg = maybe_skip_kafka_integration!();
272287
let topic_name = random_topic_name();
273288
let n_partitions = 1;
274289

275-
let client = ClientBuilder::new(connection.clone())
290+
let client = ClientBuilder::new(test_cfg.bootstrap_brokers.clone())
276291
.build()
277292
.await
278293
.unwrap();
@@ -336,10 +351,13 @@ async fn test_get_offset() {
336351
async fn test_produce_consume_size_cutoff() {
337352
maybe_start_logging();
338353

339-
let connection = maybe_skip_kafka_integration!();
354+
let test_cfg = maybe_skip_kafka_integration!();
340355
let topic_name = random_topic_name();
341356

342-
let client = ClientBuilder::new(connection).build().await.unwrap();
357+
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
358+
.build()
359+
.await
360+
.unwrap();
343361
let controller_client = client.controller_client().unwrap();
344362
controller_client
345363
.create_topic(&topic_name, 1, 1, 5_000)
@@ -409,10 +427,13 @@ async fn test_produce_consume_size_cutoff() {
409427
async fn test_consume_midbatch() {
410428
maybe_start_logging();
411429

412-
let connection = maybe_skip_kafka_integration!();
430+
let test_cfg = maybe_skip_kafka_integration!();
413431
let topic_name = random_topic_name();
414432

415-
let client = ClientBuilder::new(connection).build().await.unwrap();
433+
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
434+
.build()
435+
.await
436+
.unwrap();
416437
let controller_client = client.controller_client().unwrap();
417438
controller_client
418439
.create_topic(&topic_name, 1, 1, 5_000)
@@ -454,10 +475,13 @@ async fn test_consume_midbatch() {
454475
async fn test_delete_records() {
455476
maybe_start_logging();
456477

457-
let connection = maybe_skip_kafka_integration!();
478+
let test_cfg = maybe_skip_kafka_integration!(delete);
458479
let topic_name = random_topic_name();
459480

460-
let client = ClientBuilder::new(connection).build().await.unwrap();
481+
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
482+
.build()
483+
.await
484+
.unwrap();
461485
let controller_client = client.controller_client().unwrap();
462486
controller_client
463487
.create_topic(&topic_name, 1, 1, 5_000)
@@ -498,7 +522,10 @@ async fn test_delete_records() {
498522
let offset_4 = offsets[0];
499523

500524
// delete from the middle of the 2nd batch
501-
maybe_skip_delete!(partition_client, offset_3);
525+
partition_client
526+
.delete_records(offset_3, 1_000)
527+
.await
528+
.unwrap();
502529

503530
// fetching data before the record fails
504531
let err = partition_client

tests/consumer.rs

Lines changed: 46 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,11 @@ mod test_helpers;
2222
async fn test_stream_consumer_start_at_0() {
2323
maybe_start_logging();
2424

25-
let connection = maybe_skip_kafka_integration!();
26-
let client = ClientBuilder::new(connection).build().await.unwrap();
25+
let test_cfg = maybe_skip_kafka_integration!();
26+
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
27+
.build()
28+
.await
29+
.unwrap();
2730
let controller_client = client.controller_client().unwrap();
2831

2932
let topic = random_topic_name();
@@ -72,8 +75,11 @@ async fn test_stream_consumer_start_at_0() {
7275
async fn test_stream_consumer_start_at_1() {
7376
maybe_start_logging();
7477

75-
let connection = maybe_skip_kafka_integration!();
76-
let client = ClientBuilder::new(connection).build().await.unwrap();
78+
let test_cfg = maybe_skip_kafka_integration!();
79+
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
80+
.build()
81+
.await
82+
.unwrap();
7783
let controller_client = client.controller_client().unwrap();
7884

7985
let topic = random_topic_name();
@@ -111,8 +117,11 @@ async fn test_stream_consumer_start_at_1() {
111117
async fn test_stream_consumer_offset_out_of_range() {
112118
maybe_start_logging();
113119

114-
let connection = maybe_skip_kafka_integration!();
115-
let client = ClientBuilder::new(connection).build().await.unwrap();
120+
let test_cfg = maybe_skip_kafka_integration!();
121+
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
122+
.build()
123+
.await
124+
.unwrap();
116125
let controller_client = client.controller_client().unwrap();
117126

118127
let topic = random_topic_name();
@@ -142,8 +151,11 @@ async fn test_stream_consumer_offset_out_of_range() {
142151
async fn test_stream_consumer_start_at_earliest() {
143152
maybe_start_logging();
144153

145-
let connection = maybe_skip_kafka_integration!();
146-
let client = ClientBuilder::new(connection).build().await.unwrap();
154+
let test_cfg = maybe_skip_kafka_integration!();
155+
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
156+
.build()
157+
.await
158+
.unwrap();
147159
let controller_client = client.controller_client().unwrap();
148160

149161
let topic = random_topic_name();
@@ -192,8 +204,11 @@ async fn test_stream_consumer_start_at_earliest() {
192204
async fn test_stream_consumer_start_at_earliest_empty() {
193205
maybe_start_logging();
194206

195-
let connection = maybe_skip_kafka_integration!();
196-
let client = ClientBuilder::new(connection).build().await.unwrap();
207+
let test_cfg = maybe_skip_kafka_integration!();
208+
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
209+
.build()
210+
.await
211+
.unwrap();
197212
let controller_client = client.controller_client().unwrap();
198213

199214
let topic = random_topic_name();
@@ -232,8 +247,16 @@ async fn test_stream_consumer_start_at_earliest_empty() {
232247
async fn test_stream_consumer_start_at_earliest_after_deletion() {
233248
maybe_start_logging();
234249

235-
let connection = maybe_skip_kafka_integration!();
236-
let client = ClientBuilder::new(connection).build().await.unwrap();
250+
let test_cfg = maybe_skip_kafka_integration!(delete);
251+
if !test_cfg.broker_impl.supports_deletes() {
252+
println!("Skipping due to missing delete support");
253+
return;
254+
}
255+
256+
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
257+
.build()
258+
.await
259+
.unwrap();
237260
let controller_client = client.controller_client().unwrap();
238261

239262
let topic = random_topic_name();
@@ -254,7 +277,7 @@ async fn test_stream_consumer_start_at_earliest_after_deletion() {
254277
.await
255278
.unwrap();
256279

257-
maybe_skip_delete!(partition_client, 1);
280+
partition_client.delete_records(1, 1_000).await.unwrap();
258281

259282
let mut stream =
260283
StreamConsumerBuilder::new(Arc::clone(&partition_client), StartOffset::Earliest)
@@ -274,8 +297,11 @@ async fn test_stream_consumer_start_at_earliest_after_deletion() {
274297
async fn test_stream_consumer_start_at_latest() {
275298
maybe_start_logging();
276299

277-
let connection = maybe_skip_kafka_integration!();
278-
let client = ClientBuilder::new(connection).build().await.unwrap();
300+
let test_cfg = maybe_skip_kafka_integration!();
301+
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
302+
.build()
303+
.await
304+
.unwrap();
279305
let controller_client = client.controller_client().unwrap();
280306

281307
let topic = random_topic_name();
@@ -318,8 +344,11 @@ async fn test_stream_consumer_start_at_latest() {
318344
async fn test_stream_consumer_start_at_latest_empty() {
319345
maybe_start_logging();
320346

321-
let connection = maybe_skip_kafka_integration!();
322-
let client = ClientBuilder::new(connection).build().await.unwrap();
347+
let test_cfg = maybe_skip_kafka_integration!();
348+
let client = ClientBuilder::new(test_cfg.bootstrap_brokers)
349+
.build()
350+
.await
351+
.unwrap();
323352
let controller_client = client.controller_client().unwrap();
324353

325354
let topic = random_topic_name();

0 commit comments

Comments
 (0)