Skip to content

Commit 6141de1

Browse files
committed
connectors
1 parent 2dd2655 commit 6141de1

File tree

18 files changed

+2055
-1091
lines changed

18 files changed

+2055
-1091
lines changed

core/harness-derive/src/codegen.rs

Lines changed: 147 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818

1919
use crate::attrs::{IggyTestAttrs, TlsMode, Transport};
2020
use crate::params::{
21-
DetectedParam, analyze_signature, matrix_params, needs_client, needs_harness,
22-
needs_harness_mut, needs_mcp_client,
21+
DetectedParam, analyze_signature, fixture_params, matrix_params, needs_client, needs_fixtures,
22+
needs_harness, needs_harness_mut, needs_mcp_client,
2323
};
2424
use proc_macro2::{Span, TokenStream};
2525
use quote::{format_ident, quote};
@@ -174,20 +174,27 @@ fn generate_single_test(
174174
has_mcp_client: bool,
175175
attrs: &IggyTestAttrs,
176176
) -> syn::Result<TokenStream> {
177-
let harness_setup = generate_harness_setup(variant, has_client, has_mcp_client, attrs);
177+
let has_fixtures = needs_fixtures(params);
178+
let fixture_setup = generate_fixture_setup(params);
179+
let fixture_envs = generate_fixture_envs_collection(params);
180+
let harness_setup =
181+
generate_harness_setup(variant, has_client, has_mcp_client, has_fixtures, attrs);
178182
let client_setup = if has_client {
179183
generate_client_setup()
180184
} else {
181185
quote!()
182186
};
183-
let start_and_seed = generate_start_and_seed(attrs);
187+
let fixture_seed = generate_fixture_seed(params);
188+
let start_and_seed = generate_start_and_seed(attrs, fixture_seed);
184189
let mcp_client_setup = generate_mcp_client_setup(has_mcp_client);
185190
let param_bindings = generate_param_bindings(params, has_harness, has_harness_mut);
186191

187192
Ok(quote! {
188193
#[::tokio::test]
189194
#[::serial_test::parallel]
190195
#fn_vis async fn #fn_name() {
196+
#fixture_setup
197+
#fixture_envs
191198
#harness_setup
192199
#start_and_seed
193200
#client_setup
@@ -212,24 +219,32 @@ fn generate_test_module(
212219
has_mcp_client: bool,
213220
attrs: &IggyTestAttrs,
214221
) -> syn::Result<TokenStream> {
222+
let has_fixtures = needs_fixtures(params);
223+
let fixture_setup = generate_fixture_setup(params);
224+
let fixture_envs = generate_fixture_envs_collection(params);
225+
let fixture_seed = generate_fixture_seed(params);
226+
let param_bindings = generate_param_bindings(params, has_harness, has_harness_mut);
227+
215228
let mut test_fns = Vec::new();
216229

217230
for variant in variants {
218231
let test_name = format_ident!("{}", variant.suffix());
219-
let harness_setup = generate_harness_setup(variant, has_client, has_mcp_client, attrs);
232+
let harness_setup =
233+
generate_harness_setup(variant, has_client, has_mcp_client, has_fixtures, attrs);
220234
let client_setup = if has_client {
221235
generate_client_setup()
222236
} else {
223237
quote!()
224238
};
225-
let start_and_seed = generate_start_and_seed(attrs);
239+
let start_and_seed = generate_start_and_seed(attrs, fixture_seed.clone());
226240
let mcp_client_setup = generate_mcp_client_setup(has_mcp_client);
227-
let param_bindings = generate_param_bindings(params, has_harness, has_harness_mut);
228241

229242
test_fns.push(quote! {
230243
#[::tokio::test]
231244
#[::serial_test::parallel]
232245
async fn #test_name() {
246+
#fixture_setup
247+
#fixture_envs
233248
#harness_setup
234249
#start_and_seed
235250
#client_setup
@@ -286,13 +301,18 @@ fn generate_impl_functions_for_test_matrix(
286301
})
287302
.collect();
288303

304+
let has_fixtures = needs_fixtures(params);
305+
let fixture_setup = generate_fixture_setup(params);
306+
let fixture_envs = generate_fixture_envs_collection(params);
307+
let fixture_seed = generate_fixture_seed(params);
289308
let param_bindings = generate_param_bindings(params, has_harness, has_harness_mut);
290-
let start_and_seed = generate_start_and_seed(attrs);
309+
let start_and_seed = generate_start_and_seed(attrs, fixture_seed.clone());
291310
let mcp_client_setup = generate_mcp_client_setup(has_mcp_client);
292311

293312
if variants.len() == 1 {
294313
let variant = &variants[0];
295-
let harness_setup = generate_harness_setup(variant, has_client, has_mcp_client, attrs);
314+
let harness_setup =
315+
generate_harness_setup(variant, has_client, has_mcp_client, has_fixtures, attrs);
296316
let client_setup = if has_client {
297317
generate_client_setup()
298318
} else {
@@ -304,6 +324,8 @@ fn generate_impl_functions_for_test_matrix(
304324
#[::tokio::test]
305325
#[::serial_test::parallel]
306326
#fn_vis async fn #fn_name(#(#param_names: #param_types),*) {
327+
#fixture_setup
328+
#fixture_envs
307329
#harness_setup
308330
#start_and_seed
309331
#client_setup
@@ -320,7 +342,8 @@ fn generate_impl_functions_for_test_matrix(
320342

321343
for variant in variants {
322344
let impl_name = format_ident!("__impl_{}", variant.suffix());
323-
let harness_setup = generate_harness_setup(variant, has_client, has_mcp_client, attrs);
345+
let harness_setup =
346+
generate_harness_setup(variant, has_client, has_mcp_client, has_fixtures, attrs);
324347
let client_setup = if has_client {
325348
generate_client_setup()
326349
} else {
@@ -329,6 +352,8 @@ fn generate_impl_functions_for_test_matrix(
329352

330353
impl_fns.push(quote! {
331354
async fn #impl_name(#(#param_names: #param_types),*) {
355+
#fixture_setup
356+
#fixture_envs
332357
#harness_setup
333358
#start_and_seed
334359
#client_setup
@@ -363,6 +388,7 @@ fn generate_harness_setup(
363388
variant: &TestVariant,
364389
has_client: bool,
365390
has_mcp_client: bool,
391+
has_fixtures: bool,
366392
attrs: &IggyTestAttrs,
367393
) -> TokenStream {
368394
let transport = variant.transport.variant_ident();
@@ -500,10 +526,18 @@ fn generate_harness_setup(
500526
.consumer_name
501527
.as_deref()
502528
.unwrap_or("connectors");
503-
quote!(.connector(::integration::harness::ConnectorConfig::builder()
504-
.config_path(::std::path::PathBuf::from(#config_path))
505-
.consumer_name(#consumer_name)
506-
.build()))
529+
if has_fixtures {
530+
quote!(.connector(::integration::harness::ConnectorConfig::builder()
531+
.config_path(::std::path::PathBuf::from(#config_path))
532+
.consumer_name(#consumer_name)
533+
.extra_envs(__fixture_envs.clone())
534+
.build()))
535+
} else {
536+
quote!(.connector(::integration::harness::ConnectorConfig::builder()
537+
.config_path(::std::path::PathBuf::from(#config_path))
538+
.consumer_name(#consumer_name)
539+
.build()))
540+
}
507541
} else {
508542
quote!()
509543
};
@@ -536,18 +570,35 @@ fn generate_client_setup() -> TokenStream {
536570
///
537571
/// When a seed function is present, uses `start_with_seed` to run seed
538572
/// after server but before MCP and connector (which may depend on seed data).
539-
fn generate_start_and_seed(attrs: &IggyTestAttrs) -> TokenStream {
540-
match &attrs.seed_fn {
541-
Some(seed_fn) => {
542-
// Wrap seed function in closure that takes ownership of client
543-
// to avoid lifetime issues with async functions
573+
/// Fixture seeds are combined with the global seed.
574+
fn generate_start_and_seed(attrs: &IggyTestAttrs, fixture_seed: TokenStream) -> TokenStream {
575+
let has_fixture_seed = !fixture_seed.is_empty();
576+
match (&attrs.seed_fn, has_fixture_seed) {
577+
(Some(seed_fn), true) => {
578+
quote! {
579+
__harness.start_with_seed(|__seed_client| async move {
580+
#seed_fn(&__seed_client).await?;
581+
#fixture_seed
582+
Ok(())
583+
}).await.expect("failed to start test harness");
584+
}
585+
}
586+
(Some(seed_fn), false) => {
544587
quote! {
545588
__harness.start_with_seed(|__seed_client| async move {
546589
#seed_fn(&__seed_client).await
547590
}).await.expect("failed to start test harness");
548591
}
549592
}
550-
None => {
593+
(None, true) => {
594+
quote! {
595+
__harness.start_with_seed(|__seed_client| async move {
596+
#fixture_seed
597+
Ok(())
598+
}).await.expect("failed to start test harness");
599+
}
600+
}
601+
(None, false) => {
551602
quote! {
552603
__harness.start().await.expect("failed to start test harness");
553604
}
@@ -566,6 +617,76 @@ fn generate_mcp_client_setup(has_mcp_client: bool) -> TokenStream {
566617
}
567618
}
568619

620+
/// Generate fixture setup calls (before harness setup).
621+
fn generate_fixture_setup(params: &[DetectedParam]) -> TokenStream {
622+
let fixtures = fixture_params(params);
623+
if fixtures.is_empty() {
624+
return quote!();
625+
}
626+
627+
let setup_calls: Vec<_> = fixtures
628+
.iter()
629+
.filter_map(|p| {
630+
if let DetectedParam::Fixture { name, ty } = p {
631+
let var_name = format_ident!("__fixture_{}", name);
632+
Some(quote! {
633+
let #var_name = <#ty as ::integration::harness::TestFixture>::setup()
634+
.await
635+
.expect("failed to setup fixture");
636+
})
637+
} else {
638+
None
639+
}
640+
})
641+
.collect();
642+
643+
quote!(#(#setup_calls)*)
644+
}
645+
646+
/// Generate fixture envs collection (after fixture setup, before harness).
647+
fn generate_fixture_envs_collection(params: &[DetectedParam]) -> TokenStream {
648+
let fixtures = fixture_params(params);
649+
if fixtures.is_empty() {
650+
return quote!();
651+
}
652+
653+
let env_calls: Vec<_> = fixtures
654+
.iter()
655+
.filter_map(|p| {
656+
if let DetectedParam::Fixture { name, .. } = p {
657+
let var_name = format_ident!("__fixture_{}", name);
658+
Some(quote! {
659+
__fixture_envs.extend(
660+
::integration::harness::TestFixture::connector_envs(&#var_name)
661+
);
662+
})
663+
} else {
664+
None
665+
}
666+
})
667+
.collect();
668+
669+
quote! {
670+
let mut __fixture_envs = ::std::collections::HashMap::<String, String>::new();
671+
#(#env_calls)*
672+
}
673+
}
674+
675+
/// Generate fixture seed calls (inside start_with_seed closure).
676+
///
677+
/// Note: Currently disabled to avoid move semantics issues with async closures.
678+
/// Fixtures that need to seed data should do so in the test body after harness start.
679+
fn generate_fixture_seed(_params: &[DetectedParam]) -> TokenStream {
680+
// Fixture seeding is disabled for now because:
681+
// 1. The async move closure in start_with_seed captures the fixture by value
682+
// 2. This prevents using the fixture in the test body after seeding
683+
// 3. Most fixtures don't need to seed data - they just provide env vars
684+
//
685+
// If a fixture needs to seed data, it can be done manually in the test body:
686+
// fixture.seed(&client).await.unwrap();
687+
quote!()
688+
}
689+
569690
fn generate_param_bindings(
570691
params: &[DetectedParam],
571692
_has_harness: bool,
@@ -595,6 +716,12 @@ fn generate_param_bindings(
595716
let #name = __mcp_client;
596717
});
597718
}
719+
DetectedParam::Fixture { name, .. } => {
720+
let fixture_var = format_ident!("__fixture_{}", name);
721+
bindings.push(quote! {
722+
let #name = #fixture_var;
723+
});
724+
}
598725
DetectedParam::MatrixParam { .. } => {}
599726
}
600727
}

core/harness-derive/src/params.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ pub enum DetectedParam {
3535
HarnessMut { name: Ident },
3636
/// An MCP client parameter: `mcp_client: McpClient`
3737
McpClient { name: Ident },
38+
/// A test fixture parameter: `fixture: PostgresSinkFixture`
39+
Fixture { name: Ident, ty: Box<Type> },
3840
/// A parameter from test_matrix (passed through)
3941
MatrixParam { name: Ident, ty: Box<Type> },
4042
}
@@ -47,6 +49,7 @@ impl DetectedParam {
4749
| DetectedParam::HarnessRef { name }
4850
| DetectedParam::HarnessMut { name }
4951
| DetectedParam::McpClient { name }
52+
| DetectedParam::Fixture { name, .. }
5053
| DetectedParam::MatrixParam { name, .. } => name,
5154
}
5255
}
@@ -98,6 +101,13 @@ fn detect_param_type(name: Ident, ty: &Type) -> syn::Result<DetectedParam> {
98101
return Ok(DetectedParam::HarnessRef { name });
99102
}
100103

104+
if is_fixture_type(&normalized) {
105+
return Ok(DetectedParam::Fixture {
106+
name,
107+
ty: Box::new(ty.clone()),
108+
});
109+
}
110+
101111
Ok(DetectedParam::MatrixParam {
102112
name,
103113
ty: Box::new(ty.clone()),
@@ -120,6 +130,10 @@ fn is_harness_mut_type(normalized: &str) -> bool {
120130
normalized.contains("TestHarness") && normalized.contains("&mut")
121131
}
122132

133+
fn is_fixture_type(normalized: &str) -> bool {
134+
normalized.ends_with("Fixture") || normalized.ends_with("Fixture>")
135+
}
136+
123137
/// Check if any parameter requires a client.
124138
pub fn needs_client(params: &[DetectedParam]) -> bool {
125139
params
@@ -159,6 +173,21 @@ pub fn matrix_params(params: &[DetectedParam]) -> Vec<&DetectedParam> {
159173
.collect()
160174
}
161175

176+
/// Check if any parameter is a fixture.
177+
pub fn needs_fixtures(params: &[DetectedParam]) -> bool {
178+
params
179+
.iter()
180+
.any(|p| matches!(p, DetectedParam::Fixture { .. }))
181+
}
182+
183+
/// Get the fixture parameters.
184+
pub fn fixture_params(params: &[DetectedParam]) -> Vec<&DetectedParam> {
185+
params
186+
.iter()
187+
.filter(|p| matches!(p, DetectedParam::Fixture { .. }))
188+
.collect()
189+
}
190+
162191
#[cfg(test)]
163192
mod tests {
164193
use super::*;
@@ -247,4 +276,41 @@ mod tests {
247276
let params = analyze_signature(&sig).unwrap();
248277
assert!(!super::needs_mcp_client(&params));
249278
}
279+
280+
#[test]
281+
fn detect_fixture_param() {
282+
let sig = parse_sig("async fn test(fixture: PostgresSinkFixture)");
283+
let params = analyze_signature(&sig).unwrap();
284+
assert_eq!(params.len(), 1);
285+
assert!(matches!(&params[0], DetectedParam::Fixture { name, .. } if name == "fixture"));
286+
}
287+
288+
#[test]
289+
fn detect_fixture_with_generic() {
290+
let sig = parse_sig("async fn test(fixture: Box<PostgresFixture>)");
291+
let params = analyze_signature(&sig).unwrap();
292+
assert_eq!(params.len(), 1);
293+
assert!(matches!(&params[0], DetectedParam::Fixture { name, .. } if name == "fixture"));
294+
}
295+
296+
#[test]
297+
fn needs_fixtures_works() {
298+
let sig = parse_sig("async fn test(fixture: RandomSourceFixture)");
299+
let params = analyze_signature(&sig).unwrap();
300+
assert!(super::needs_fixtures(&params));
301+
302+
let sig = parse_sig("async fn test(client: &IggyClient)");
303+
let params = analyze_signature(&sig).unwrap();
304+
assert!(!super::needs_fixtures(&params));
305+
}
306+
307+
#[test]
308+
fn fixture_params_works() {
309+
let sig = parse_sig(
310+
"async fn test(f1: PostgresFixture, client: &IggyClient, f2: RandomSourceFixture)",
311+
);
312+
let params = analyze_signature(&sig).unwrap();
313+
let fixtures = super::fixture_params(&params);
314+
assert_eq!(fixtures.len(), 2);
315+
}
250316
}

0 commit comments

Comments
 (0)