-
Notifications
You must be signed in to change notification settings - Fork 8
Draft: Add skip decorator; A few clean ups #306
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feature/aip
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
# Skip decorator is a workaround solution to implement conditional branching in metaflow. | ||
# When condition variable is_skipping is evaluated to True, | ||
# it will skip current step and execute the supplied next step. | ||
|
||
from functools import wraps | ||
from metaflow.decorators import StepDecorator | ||
|
||
|
||
class SkipDecorator(StepDecorator): | ||
""" | ||
The @skip decorator is a workaround for conditional branching. The @skip decorator checks an artifact | ||
and if it is false, skips the evaluation of the step function and jumps to the supplied next step. | ||
**The `start` and `end` steps are always expected and should not be skipped.** | ||
Usage: | ||
class SkipFlow(FlowSpec): | ||
condition = Parameter("condition", default=False) | ||
@step | ||
def start(self): | ||
print("Should skip:", self.condition) | ||
self.next(self.middle) | ||
@skip(check='condition', next='end') | ||
@step | ||
def middle(self): | ||
print("Running the middle step - not skipping") | ||
self.next(self.end) | ||
@step | ||
def end(self): | ||
pass | ||
""" | ||
|
||
name = "skip" | ||
|
||
def __init__(self, check="", next=""): | ||
super().__init__() | ||
self.check = check | ||
self.next = next | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The feature ask is to skip a step. There are valid deep reasons to not allow conditional branching; the mainly that upsteam Metaflow OSS has not implemented this feature because conditional branching introduces for complex DAG reasoning and branching that is difficult to test and reason over for the applied scientist. For example, this next conditional could goto any step. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the version that RMX is using right now: https://gitlab.zgtools.net/analytics/artificial-intelligence/rmx/workflows/rmx-delg-eventing/-/merge_requests/43/diffs It's also the same version provided by Ville in https://zillowgroup.slack.com/archives/C02116BBNTU/p1660145168960519?thread_ts=1660134373.601939&cid=C02116BBNTU |
||
|
||
def __call__(self, f): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How and when is |
||
@wraps(f) | ||
def func(step): | ||
if getattr(step, self.check): | ||
step.next(getattr(step, self.next)) | ||
else: | ||
return f(step) | ||
|
||
return func | ||
cloudw marked this conversation as resolved.
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
from metaflow import Parameter, FlowSpec, step, skip | ||
|
||
|
||
class SkipFlow(FlowSpec): | ||
|
||
condition_true = Parameter("condition-true", default=True) | ||
|
||
@step | ||
def start(self): | ||
print("Should skip:", self.condition) | ||
self.desired_step_executed = False | ||
self.condition_false = False | ||
self.next(self.skipped_step) | ||
|
||
@skip(check="condition_true", next="desired_step") | ||
@step | ||
def skipped_step(self): | ||
raise Exception( | ||
"Unexpectedly ran the skipped_step step. This step should have been skipped." | ||
) | ||
self.next(self.unreachable_step) | ||
|
||
@step | ||
def unreachable_step(self): | ||
raise Exception( | ||
"Unexpectedly ran the unreachable step. This step should have been skipped." | ||
) | ||
self.next(self.desired_step) | ||
|
||
@skip(check="condition_false", next="end") | ||
@step | ||
def desired_step(self): | ||
self.desired_step_executed = True | ||
self.next(self.end) | ||
|
||
@step | ||
def end(self): | ||
assert self.desired_step_executed, "Desired step was not executed" | ||
|
||
|
||
if __name__ == "__main__": | ||
SkipFlow() |
Uh oh!
There was an error while loading. Please reload this page.