diff --git a/metaflow/cli.py b/metaflow/cli.py
index a4a1558304d..ecd28646632 100644
--- a/metaflow/cli.py
+++ b/metaflow/cli.py
@@ -471,8 +471,16 @@ def start(
         raise ctx.obj.delayed_config_exception
 
     # Init all values in the flow mutators and then process them
-    for decorator in ctx.obj.flow._flow_state.get(_FlowState.FLOW_MUTATORS, []):
-        decorator.external_init()
+    for mutator in ctx.obj.flow._flow_state.get(_FlowState.FLOW_MUTATORS, []):
+        mutator.external_init()
+
+    # Hand each mutator its top-level option values; no "default" means None
+    for mutator in ctx.obj.flow._flow_state.get(_FlowState.FLOW_MUTATORS, []):
+        mutator_options = {
+            option: deco_options.get(option.replace("-", "_"), info.get("default"))
+            for option, info in mutator.options.items()
+        }
+        mutator.flow_init_options(mutator_options)
 
     new_cls = ctx.obj.flow._process_config_decorators(config_options)
     if new_cls:
diff --git a/metaflow/decorators.py b/metaflow/decorators.py
index 760508497f0..dfbe39dc10e 100644
--- a/metaflow/decorators.py
+++ b/metaflow/decorators.py
@@ -270,6 +270,7 @@ def add_decorator_options(cmd):
 
     seen = {}
     existing_params = set(p.name.lower() for p in cmd.params)
+
     # Add decorator options
     for deco in flow_decorators(flow_cls):
         for option, kwargs in deco.options.items():
@@ -290,6 +291,32 @@ def add_decorator_options(cmd):
                 kwargs["envvar"] = "METAFLOW_FLOW_%s" % option.upper()
                 seen[option] = deco.name
                 cmd.params.insert(0, click.Option(("--" + option,), **kwargs))
+
+    # Add flow mutator options
+    for mutator in flow_mutators(flow_cls):
+        for option, kwargs in mutator.options.items():
+            mutator_name = mutator.__class__.__name__
+            if option in seen:
+                msg = (
+                    "Flow mutator '%s' uses an option '%s' which is also "
+                    "used by '%s'. This is a bug in Metaflow. "
+                    "Please file a ticket on GitHub."
+                    % (mutator_name, option, seen[option])
+                )
+                raise MetaflowInternalError(msg)
+            elif option.replace("-", "_").lower() in existing_params:
+                # Compare the option (as Click names it), not the mutator's name
+                raise MetaflowInternalError(
+                    "Flow mutator '%s' uses an option '%s' which is a reserved "
+                    "keyword. Please use a different option name."
+                    % (mutator_name, option)
+                )
+            else:
+                # Copy so the mutator's class-level `options` dict is not mutated
+                kwargs = dict(kwargs, envvar="METAFLOW_FLOW_%s" % option.upper())
+                seen[option] = mutator_name
+                cmd.params.insert(0, click.Option(("--" + option,), **kwargs))
+
     return cmd
 
 
@@ -297,6 +324,13 @@ def flow_decorators(flow_cls):
     return [d for deco_list in flow_cls._flow_decorators.values() for d in deco_list]
 
 
+def flow_mutators(flow_cls):
+    """Return the flow mutators registered on `flow_cls` ([] if there are none)."""
+    from metaflow.flowspec import _FlowState
+
+    return flow_cls._flow_state.get(_FlowState.FLOW_MUTATORS, [])
+
+
 class StepDecorator(Decorator):
     """
     Base class for all step decorators.
@@ -797,6 +831,7 @@ def _init_step_decorators(
                 pre_mutate=False,
                 statically_defined=deco.statically_defined,
                 inserted_by=inserted_by_value,
+                mutator=deco,
             )
             # Sanity check to make sure we are applying the decorator to the right
             # class
diff --git a/metaflow/flowspec.py b/metaflow/flowspec.py
index a8df867e644..b5cfa880b39 100644
--- a/metaflow/flowspec.py
+++ b/metaflow/flowspec.py
@@ -297,6 +297,7 @@ def _process_config_decorators(cls, config_options, process_configs=True):
                     pre_mutate=True,
                     statically_defined=deco.statically_defined,
                     inserted_by=inserted_by_value,
+                    mutator=deco,
                 )
                 # Sanity check to make sure we are applying the decorator to the right
                 # class
diff --git a/metaflow/runtime.py b/metaflow/runtime.py
index 3dfb01f529d..e6dd14e0271 100644
--- a/metaflow/runtime.py
+++ b/metaflow/runtime.py
@@ -2074,6 +2074,12 @@ def __init__(
         for deco in flow_decorators(self.task.flow):
             self.top_level_options.update(deco.get_top_level_options())
 
+        # FlowMutators can also define their own top-level options similar to decorators
+        from metaflow.flowspec import _FlowState
+
+        for mutator in self.task.flow._flow_state.get(_FlowState.FLOW_MUTATORS, []):
+            self.top_level_options.update(mutator.get_top_level_options())
+
         # We also pass configuration options using the kv. syntax which will cause
         # the configuration options to be loaded from the CONFIG file (or local-config-file
         # in the case of the local runtime)
diff --git a/metaflow/user_decorators/mutable_flow.py b/metaflow/user_decorators/mutable_flow.py
index bb8139d3883..0407f47fdbf 100644
--- a/metaflow/user_decorators/mutable_flow.py
+++ b/metaflow/user_decorators/mutable_flow.py
@@ -22,11 +22,15 @@ def __init__(
         pre_mutate: bool = False,
         statically_defined: bool = False,
         inserted_by: Optional[str] = None,
+        mutator: Optional[
+            "metaflow.user_decorators.user_flow_decorator.FlowMutator"
+        ] = None,
     ):
         self._flow_cls = flow_spec
         self._pre_mutate = pre_mutate
         self._statically_defined = statically_defined
         self._inserted_by = inserted_by
+        self._mutator = mutator
         if self._inserted_by is None:
             # This is an error because MutableSteps should only be created by
             # StepMutators or FlowMutators. We need to catch it now because otherwise
@@ -138,6 +142,35 @@ def parameters(
                 )
             yield var, param
 
+    @property
+    def tl_options(self) -> Dict[str, Any]:
+        """
+        Get the top-level CLI options for this mutator.
+
+        Returns a dictionary of option names to values that were passed via the CLI.
+        This allows mutators to access their own top-level options similar to how
+        they can access configs and parameters.
+
+        Example:
+        ```
+        class MyMutator(FlowMutator):
+            options = {
+                'my-option': {'default': 'value', 'help': 'My option'}
+            }
+
+            def pre_mutate(self, mutable_flow):
+                # Access the option value
+                val = mutable_flow.tl_options.get('my-option')
+                print(f'Option value: {val}')
+        ```
+
+        Returns
+        -------
+        Dict[str, Any]
+            Option names mapped to values; empty when no options were initialized
+        """
+        return getattr(self._mutator, "_option_values", {})
+
     @property
     def steps(
         self,
diff --git a/metaflow/user_decorators/mutable_step.py b/metaflow/user_decorators/mutable_step.py
index 7841ea92270..e9443f6262a 100644
--- a/metaflow/user_decorators/mutable_step.py
+++ b/metaflow/user_decorators/mutable_step.py
@@ -44,6 +44,7 @@ def __init__(
             pre_mutate=pre_mutate,
             statically_defined=statically_defined,
             inserted_by=inserted_by,
+            mutator=None,  # Step mutators don't have top-level options yet
         )
         self._flow_cls = flow_spec.__class__
         self._my_step = step
diff --git a/metaflow/user_decorators/user_flow_decorator.py b/metaflow/user_decorators/user_flow_decorator.py
index 36f3ed53c6b..4369544ac9a 100644
--- a/metaflow/user_decorators/user_flow_decorator.py
+++ b/metaflow/user_decorators/user_flow_decorator.py
@@ -1,4 +1,4 @@
-from typing import Dict, Optional, Union, TYPE_CHECKING
+from typing import Dict, Optional, Union, TYPE_CHECKING, Any
 
 from metaflow.exception import MetaflowException
 from metaflow.user_configs.config_parameters import (
@@ -124,6 +124,11 @@ class FlowMutator(metaclass=FlowMutatorMeta):
     modify the steps.
     """
 
+    # Top-level options that can be specified on the command line
+    # Format: {'option-name': {'default': value, 'help': 'help text', ...}}
+    # These options will be registered as CLI arguments and passed to the mutator
+    options = {}
+
     def __init__(self, *args, **kwargs):
         from ..flowspec import FlowSpecMeta
 
@@ -228,6 +233,24 @@ def external_init(self):
         if "init" in self.__class__.__dict__:
             self.init(*self._args, **self._kwargs)
 
+    def flow_init_options(self, options: Dict[str, Any]):
+        """
+        Called to initialize the mutator with top-level CLI options.
+
+        Parameters
+        ----------
+        options : Dict[str, Any]
+            Dictionary of option names to values from the CLI
+        """
+        self._option_values = options
+
+    def get_top_level_options(self):
+        """
+        Return a list of option-value pairs that correspond to top-level
+        options that should be passed to subprocesses (tasks).
+        """
+        return list(getattr(self, "_option_values", {}).items())
+
     def pre_mutate(
         self, mutable_flow: "metaflow.user_decorators.mutable_flow.MutableFlow"
     ) -> None: