From 733b665e95dc24f8e75958d2fb63c592f069662c Mon Sep 17 00:00:00 2001 From: liferoad Date: Sat, 14 Mar 2026 18:48:53 -0400 Subject: [PATCH 01/11] Fix #37736: Allow composite transforms to use implicit input chaining When a composite transform has no explicit inputs/outputs on its sub-transforms, automatically chain them similar to how 'chain' type transforms work. Added test_composite_implicit_input_chaining to verify the fix. --- .../python/apache_beam/yaml/yaml_transform.py | 23 +++++++++++++++++++ .../apache_beam/yaml/yaml_transform_test.py | 20 ++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py index ef065d8a3c42..e6b9e94522af 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform.py +++ b/sdks/python/apache_beam/yaml/yaml_transform.py @@ -796,6 +796,29 @@ def to_row(element): def expand_composite_transform(spec, scope): spec = normalize_inputs_outputs(normalize_source_sink(spec)) + original_transforms = spec['transforms'] + has_explicit_io = any( + io in t for t in original_transforms for io in ('input', 'output')) + + if not has_explicit_io: + new_transforms = [] + for ix, transform in enumerate(original_transforms): + transform = dict(transform) + if ix == 0: + composite_input = spec.get('input', {}) + if is_explicitly_empty(composite_input): + transform['input'] = composite_input + elif not is_empty(composite_input): + transform['input'] = {key: key for key in composite_input.keys()} + else: + transform['input'] = new_transforms[-1]['__uuid__'] + new_transforms.append(transform) + + if new_transforms: + spec = dict(spec, transforms=new_transforms) + if 'output' not in spec: + spec['output'] = {'__implicit_outputs__': new_transforms[-1]['__uuid__']} + inner_scope = Scope( scope.root, { diff --git a/sdks/python/apache_beam/yaml/yaml_transform_test.py b/sdks/python/apache_beam/yaml/yaml_transform_test.py index 2afb5e7d8e33..a4da97f7f50e 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform_test.py +++ b/sdks/python/apache_beam/yaml/yaml_transform_test.py @@ -122,6 +122,26 @@ def test_composite(self): providers=TEST_PROVIDERS) assert_that(result, equal_to([1, 4, 9, 1, 8, 27])) + def test_composite_implicit_input_chaining(self): + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + elements = p | beam.Create([1, 2, 3]) + result = elements | YamlTransform( + ''' + type: composite + transforms: + - type: PyMap + name: Square + config: + fn: "lambda x: x * x" + - type: PyMap + name: Increment + config: + fn: "lambda x: x + 1" + ''', + providers=TEST_PROVIDERS) + assert_that(result, equal_to([2, 5, 10])) + def test_chain_with_input(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle')) as p: From ce2103f7724a459c167d0dbcfe830fd9804b47c9 Mon Sep 17 00:00:00 2001 From: liferoad Date: Sun, 15 Mar 2026 14:16:44 -0400 Subject: [PATCH 02/11] Fix line-too-long lint error in yaml_transform.py --- sdks/python/apache_beam/yaml/yaml_transform.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py index e6b9e94522af..558df499b2bb 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform.py +++ b/sdks/python/apache_beam/yaml/yaml_transform.py @@ -817,7 +817,9 @@ def expand_composite_transform(spec, scope): if new_transforms: spec = dict(spec, transforms=new_transforms) if 'output' not in spec: - spec['output'] = {'__implicit_outputs__': new_transforms[-1]['__uuid__']} + spec['output'] = { + '__implicit_outputs__': new_transforms[-1]['__uuid__'] + } inner_scope = Scope( scope.root, From 69b0d560be29102c599920a0a654367d36c2520c Mon Sep 17 00:00:00 2001 From: liferoad Date: Sun, 15 Mar 2026 17:22:13 -0400 Subject: [PATCH 03/11] Fix yapf formatting in yaml_transform.py --- sdks/python/apache_beam/yaml/yaml_transform.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py index 558df499b2bb..c0c1b71f8dbf 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform.py +++ b/sdks/python/apache_beam/yaml/yaml_transform.py @@ -818,8 +818,8 @@ def expand_composite_transform(spec, scope): spec = dict(spec, transforms=new_transforms) if 'output' not in spec: spec['output'] = { - '__implicit_outputs__': new_transforms[-1]['__uuid__'] - } + '__implicit_outputs__': new_transforms[-1]['__uuid__'] + } inner_scope = Scope( scope.root, From 1d74b2a09b306ad1f9e278ca4a5bc26e19fc54cb Mon Sep 17 00:00:00 2001 From: liferoad Date: Sun, 15 Mar 2026 17:28:13 -0400 Subject: [PATCH 04/11] Fix composite implicit input chaining logic --- sdks/python/apache_beam/yaml/yaml_transform.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py index c0c1b71f8dbf..f7ad121a45a6 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform.py +++ b/sdks/python/apache_beam/yaml/yaml_transform.py @@ -808,7 +808,10 @@ def expand_composite_transform(spec, scope): composite_input = spec.get('input', {}) if is_explicitly_empty(composite_input): transform['input'] = composite_input - elif not is_empty(composite_input): + elif is_empty(composite_input): + # No explicit input - will use the pipeline input + pass + else: transform['input'] = {key: key for key in composite_input.keys()} else: transform['input'] = new_transforms[-1]['__uuid__'] From 7b6585cfd1566d0b77131d1a7566217b956620c1 Mon Sep 17 00:00:00 2001 From: liferoad Date: Sun, 15 Mar 2026 20:01:20 -0400 Subject: [PATCH 05/11] Fix composite implicit input chaining - delete empty input from spec --- sdks/python/apache_beam/yaml/yaml_transform.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py index f7ad121a45a6..40b4dcbedd91 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform.py +++ b/sdks/python/apache_beam/yaml/yaml_transform.py @@ -809,8 +809,10 @@ def expand_composite_transform(spec, scope): if is_explicitly_empty(composite_input): transform['input'] = composite_input elif is_empty(composite_input): - # No explicit input - will use the pipeline input - pass + # No explicit input - will use the pipeline input. + # Remove from spec so Scope doesn't try to use it. + if 'input' in spec: + del spec['input'] else: transform['input'] = {key: key for key in composite_input.keys()} else: From 3abf34b0fec59d25eef7a8ab3d4dfd63695e34e9 Mon Sep 17 00:00:00 2001 From: liferoad Date: Mon, 16 Mar 2026 19:05:29 -0400 Subject: [PATCH 06/11] Fix composite implicit input - reference pipeline input directly --- sdks/python/apache_beam/yaml/yaml_transform.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py index 40b4dcbedd91..7b29a1a58441 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform.py +++ b/sdks/python/apache_beam/yaml/yaml_transform.py @@ -809,10 +809,9 @@ def expand_composite_transform(spec, scope): if is_explicitly_empty(composite_input): transform['input'] = composite_input elif is_empty(composite_input): - # No explicit input - will use the pipeline input. - # Remove from spec so Scope doesn't try to use it. - if 'input' in spec: - del spec['input'] + # No explicit input - the composite input IS the pipeline input. + # Reference the 'input' key from the Scope's inputs. + transform['input'] = 'input' else: transform['input'] = {key: key for key in composite_input.keys()} else: From ceb0ac179750948bf96038e5661cb78d1f28989d Mon Sep 17 00:00:00 2001 From: liferoad Date: Mon, 16 Mar 2026 19:50:52 -0400 Subject: [PATCH 07/11] Fix composite implicit input - reference pipeline input --- sdks/python/apache_beam/yaml/yaml_transform.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py index 7b29a1a58441..f06134a443fd 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform.py +++ b/sdks/python/apache_beam/yaml/yaml_transform.py @@ -798,7 +798,7 @@ def expand_composite_transform(spec, scope): original_transforms = spec['transforms'] has_explicit_io = any( - io in t for t in original_transforms for io in ('input', 'output')) + not is_empty(t.get(io, {})) for t in original_transforms for io in ('input', 'output')) if not has_explicit_io: new_transforms = [] @@ -811,7 +811,7 @@ def expand_composite_transform(spec, scope): elif is_empty(composite_input): # No explicit input - the composite input IS the pipeline input. # Reference the 'input' key from the Scope's inputs. - transform['input'] = 'input' + transform['input'] = {'input': 'input'} else: transform['input'] = {key: key for key in composite_input.keys()} else: @@ -820,10 +820,13 @@ def expand_composite_transform(spec, scope): if new_transforms: spec = dict(spec, transforms=new_transforms) - if 'output' not in spec: + if is_empty(spec.get('output', {})): spec['output'] = { '__implicit_outputs__': new_transforms[-1]['__uuid__'] } + # Set a name so the expand path returns a PCollection, not a dict + if 'name' not in spec: + spec['name'] = 'Composite' inner_scope = Scope( scope.root, From c416e7cb12fb28aecfb97a4caa9937109f427d8c Mon Sep 17 00:00:00 2001 From: liferoad Date: Mon, 16 Mar 2026 22:30:50 -0400 Subject: [PATCH 08/11] Revert "Fix composite implicit input - reference pipeline input" This reverts commit ceb0ac179750948bf96038e5661cb78d1f28989d. --- sdks/python/apache_beam/yaml/yaml_transform.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py index f06134a443fd..7b29a1a58441 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform.py +++ b/sdks/python/apache_beam/yaml/yaml_transform.py @@ -798,7 +798,7 @@ def expand_composite_transform(spec, scope): original_transforms = spec['transforms'] has_explicit_io = any( - not is_empty(t.get(io, {})) for t in original_transforms for io in ('input', 'output')) + io in t for t in original_transforms for io in ('input', 'output')) if not has_explicit_io: new_transforms = [] @@ -811,7 +811,7 @@ def expand_composite_transform(spec, scope): elif is_empty(composite_input): # No explicit input - the composite input IS the pipeline input. # Reference the 'input' key from the Scope's inputs. - transform['input'] = {'input': 'input'} + transform['input'] = 'input' else: transform['input'] = {key: key for key in composite_input.keys()} else: @@ -820,13 +820,10 @@ def expand_composite_transform(spec, scope): if new_transforms: spec = dict(spec, transforms=new_transforms) - if is_empty(spec.get('output', {})): + if 'output' not in spec: spec['output'] = { '__implicit_outputs__': new_transforms[-1]['__uuid__'] } - # Set a name so the expand path returns a PCollection, not a dict - if 'name' not in spec: - spec['name'] = 'Composite' inner_scope = Scope( scope.root, From 1f058248852fe9104ffa391f307af3890d57fd36 Mon Sep 17 00:00:00 2001 From: liferoad Date: Mon, 16 Mar 2026 22:57:29 -0400 Subject: [PATCH 09/11] Fix composite implicit input chaining from pipeline input This fix addresses the issue where composite transforms with no explicit input specification were failing to receive inputs from the pipeline. Key changes: 1. Fixed has_explicit_io check to use is_empty() instead of just checking key presence - this properly treats {} as 'no explicit input' 2. Added composite_has_input check to only do implicit chaining when the composite has an input to chain from 3. Fixed inner_scope_inputs computation to use parent scope's inputs when the composite has no explicit input 4. Fixed output handling to use is_empty() check (normalization sets {}) 5. Fixed final return to correctly resolve scope inputs vs transform outputs --- .../python/apache_beam/yaml/yaml_transform.py | 70 +++++++++++++++---- 1 file changed, 56 insertions(+), 14 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py index 7b29a1a58441..968d70751fc6 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform.py +++ b/sdks/python/apache_beam/yaml/yaml_transform.py @@ -797,10 +797,21 @@ def expand_composite_transform(spec, scope): spec = normalize_inputs_outputs(normalize_source_sink(spec)) original_transforms = spec['transforms'] + # Check if any transform has a NON-EMPTY explicit input or output. + # Note: {} (empty dict) means "no explicit input specified" and should + # NOT count as having explicit io. + # However, if the COMPOSITE itself has no input, we can't do implicit chaining. has_explicit_io = any( - io in t for t in original_transforms for io in ('input', 'output')) - - if not has_explicit_io: + io is not None and not is_empty(t.get(io, {})) for + t in original_transforms for io in ('input', 'output')) + + # If the composite has no input, we can't do implicit chaining + composite_has_input = not is_empty(spec.get('input', {})) + + # Only do implicit chaining if: + # 1. No transform has explicit io, AND + # 2. The composite has an input to chain from + if not has_explicit_io and composite_has_input: new_transforms = [] for ix, transform in enumerate(original_transforms): transform = dict(transform) @@ -820,17 +831,38 @@ def expand_composite_transform(spec, scope): if new_transforms: spec = dict(spec, transforms=new_transforms) - if 'output' not in spec: + # Check if output is empty, not just present (normalization sets it to {}) + if is_empty(spec.get('output', {})): spec['output'] = { '__implicit_outputs__': new_transforms[-1]['__uuid__'] } + # Compute the inputs for the inner scope. + # If the composite has an empty input dict ({}), it means the composite + # should use the parent scope's inputs directly. + composite_input = spec.get('input', {}) + + if is_empty(composite_input): + # No explicit input - use the parent scope's inputs directly + inner_scope_inputs = dict(scope._inputs) + else: + # The composite has explicit input references + # They can reference either: + # 1. A parent scope input (e.g., 'input' -> the 'input' key in scope._inputs) + # 2. A transform output (e.g., 'uuid' -> the output of a transform) + inner_scope_inputs = {} + for key, value in composite_input.items(): + if isinstance(value, str) and value in scope._inputs: + # Reference to a parent scope input + inner_scope_inputs[key] = scope._inputs[value] + else: + # Reference to a transform output + inner_scope_inputs[key] = scope.get_pcollection(value) + + inner_scope = Scope( scope.root, - { - key: scope.get_pcollection(value) - for (key, value) in empty_if_explicitly_empty(spec['input']).items() - }, + inner_scope_inputs, spec['transforms'], # TODO(robertwb): Are scoped providers ever used? Worth supporting? yaml_provider.merge_providers( @@ -843,7 +875,8 @@ class CompositePTransform(beam.PTransform): def expand(inputs): inner_scope.compute_all() if '__implicit_outputs__' in spec['output']: - return inner_scope.get_outputs(spec['output']['__implicit_outputs__']) + result = inner_scope.get_outputs(spec['output']['__implicit_outputs__']) + return result else: return { key: inner_scope.get_pcollection(value) @@ -855,16 +888,25 @@ def expand(inputs): transform = transform.with_resource_hints( **SafeLineLoader.strip_metadata(spec['resource_hints'])) + # Always set a name for the composite to ensure proper return value if 'name' not in spec: spec['name'] = 'Composite' if spec['name'] is None: # top-level pipeline, don't nest - return transform.expand(None) + result = transform.expand(None) + return result else: _LOGGER.info("Expanding %s ", identify_object(spec)) - return ({ - key: scope.get_pcollection(value) - for (key, value) in empty_if_explicitly_empty(spec['input']).items() - } or scope.root) | scope.unique_name(spec, None) >> transform + # When the input references a scope input (not a transform output), + # we need to use the scope's inputs directly + input_dict = {} + for key, value in empty_if_explicitly_empty(spec['input']).items(): + if isinstance(value, str) and value in scope._inputs: + # Reference to a scope input + input_dict[key] = scope._inputs[value] + else: + # Reference to a transform output + input_dict[key] = scope.get_pcollection(value) + return (input_dict or scope.root) | scope.unique_name(spec, None) >> transform def expand_chain_transform(spec, scope): From 1ce56b9a7c7a28df6fea899629c51a8f0f22e644 Mon Sep 17 00:00:00 2001 From: liferoad Date: Tue, 17 Mar 2026 18:57:42 -0400 Subject: [PATCH 10/11] Apply yapf formatting and fix pylint line length issues --- sdks/python/apache_beam/yaml/yaml_transform.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py index 968d70751fc6..4776124ad358 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform.py +++ b/sdks/python/apache_beam/yaml/yaml_transform.py @@ -800,11 +800,11 @@ def expand_composite_transform(spec, scope): # Check if any transform has a NON-EMPTY explicit input or output. # Note: {} (empty dict) means "no explicit input specified" and should # NOT count as having explicit io. - # However, if the COMPOSITE itself has no input, we can't do implicit chaining. + # However, if the composite has no input, we can't do implicit chaining. has_explicit_io = any( - io is not None and not is_empty(t.get(io, {})) for - t in original_transforms for io in ('input', 'output')) - + io is not None and not is_empty(t.get(io, {})) + for t in original_transforms for io in ('input', 'output')) + # If the composite has no input, we can't do implicit chaining composite_has_input = not is_empty(spec.get('input', {})) @@ -841,14 +841,14 @@ def expand_composite_transform(spec, scope): # If the composite has an empty input dict ({}), it means the composite # should use the parent scope's inputs directly. composite_input = spec.get('input', {}) - + if is_empty(composite_input): # No explicit input - use the parent scope's inputs directly inner_scope_inputs = dict(scope._inputs) else: # The composite has explicit input references # They can reference either: - # 1. A parent scope input (e.g., 'input' -> the 'input' key in scope._inputs) + # 1. A parent scope input (e.g., 'input' key in scope._inputs) # 2. A transform output (e.g., 'uuid' -> the output of a transform) inner_scope_inputs = {} for key, value in composite_input.items(): @@ -858,7 +858,6 @@ def expand_composite_transform(spec, scope): else: # Reference to a transform output inner_scope_inputs[key] = scope.get_pcollection(value) - inner_scope = Scope( scope.root, @@ -906,7 +905,8 @@ def expand(inputs): else: # Reference to a transform output input_dict[key] = scope.get_pcollection(value) - return (input_dict or scope.root) | scope.unique_name(spec, None) >> transform + return (input_dict or + scope.root) | scope.unique_name(spec, None) >> transform def expand_chain_transform(spec, scope): From 5a3350ca194927cb0daa4151ef529a7e58bd5c92 Mon Sep 17 00:00:00 2001 From: liferoad Date: Thu, 19 Mar 2026 19:14:18 -0400 Subject: [PATCH 11/11] Address review: revert unnecessary variable assignment --- sdks/python/apache_beam/yaml/yaml_transform.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py index 4776124ad358..2b745babad02 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform.py +++ b/sdks/python/apache_beam/yaml/yaml_transform.py @@ -891,8 +891,7 @@ def expand(inputs): if 'name' not in spec: spec['name'] = 'Composite' if spec['name'] is None: # top-level pipeline, don't nest - result = transform.expand(None) - return result + return transform.expand(None) else: _LOGGER.info("Expanding %s ", identify_object(spec)) # When the input references a scope input (not a transform output),