6

I want to loop over tasks, again and again, until reaching a certain condition before continuing the rest of the workflow.

What I have so far is this:

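import prefect
from prefect import Task
from prefect.engine.signals import LOOP

# Loop task
class MyLoop(Task):
    def run(self):
        # Value carried over from the previous iteration (1 on the first run)
        loop_res = prefect.context.get("task_loop_result", 1)
        print(loop_res)
        if loop_res >= 10:
            return loop_res
        # Re-run this task with the incremented value
        raise LOOP(result=loop_res + 1)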

But as far as I understand, this only works for a single task. Is there a way to go back further in the flow and loop over several tasks at a time?

2 Answers

5

The solution is simply to create a single task that itself creates a new flow with one or more parameters and calls flow.run(). For example:

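import prefect
from prefect import Flow, Parameter, Task
from prefect.engine.signals import LOOP
from prefect.executors import LocalDaskExecutor

class MultipleTaskLoop(Task):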
    def run(self):
        # Get previous value
        loop_res = prefect.context.get("task_loop_result", 1)
        
        # Create subflow
        with Flow('Subflow', executor=LocalDaskExecutor()) as flow:
            x = Parameter('x', default = 1)
            loop1 = print_loop()
            add = add_value(x)
            loop2 = print_loop()
            loop1.set_downstream(add)
            add.set_downstream(loop2)

        # Run subflow and extract result
        subflow_res = flow.run(parameters={'x': loop_res})
        # Use the public State.result accessor rather than the private _result
        new_res = subflow_res.result[add].result

        # Loop
        if new_res >= 10:
            return new_res
        raise LOOP(result=new_res)

where print_loop simply prints "loop" in the output and add_value adds one to the value it receives.
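
The two helpers are not shown in the answer. Here is a minimal sketch of their logic as plain Python functions, based only on the description above. In Prefect 1.x each would presumably be wrapped with the `task` decorator so it can be called inside the `with Flow(...)` block. The name `one_pass` is hypothetical and only mirrors the order of the subflow.

```python
def print_loop():
    # Prints "loop" so each pass is visible in the output
    print("loop")


def add_value(x):
    # Adds one to the value it receives
    return x + 1


def one_pass(x):
    # Hypothetical stand-in for one run of the subflow:
    # print_loop -> add_value -> print_loop, returning the new value
    print_loop()
    new_value = add_value(x)
    print_loop()
    return new_value
```

Each pass raises the value by one, so starting from the default of 1, the `new_res >= 10` check is satisfied after nine passes.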

  • That's clever. I've never thought of doing that. I'm pretty sure if I tried that someone would reject my MR, haha, but it's a smart way to do what the OP wants. Commented Jul 23, 2021 at 13:45
  • Well, it was recommended to me by one of the Prefect devs on their Slack, so even if it might not be the most elegant way, I guess it is tolerated :) – Gaëtan, Commented Jul 26, 2021 at 16:17
2

Unless I'm missing something, the answer is no.

Prefect flows are DAGs, and what you are describing (looping over multiple tasks in order again and again until some condition is met) would make a cycle, so you can't do it.
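
To see why a loop back to an earlier task breaks the DAG property, here is a small sketch using the standard library's `graphlib` (Python 3.9+). It does not use Prefect's own validation. The task names `a`, `b`, `c` and the helper `has_cycle` are made up for illustration.

```python
from graphlib import CycleError, TopologicalSorter


def has_cycle(graph):
    # graph maps each node to the set of nodes it depends on
    try:
        # static_order() is lazy, so consume it to trigger the cycle check
        tuple(TopologicalSorter(graph).static_order())
    except CycleError:
        return True
    return False


# a -> b -> c: a plain chain, which is a valid DAG
chain = {"b": {"a"}, "c": {"b"}}

# Same chain, plus an edge from c back to a to "loop again"
looped = {"a": {"c"}, "b": {"a"}, "c": {"b"}}
```

Here `has_cycle(chain)` is `False` and `has_cycle(looped)` is `True`: the extra back edge is exactly what a multi-task loop would require.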

This may or may not be helpful, but you could try combining all of the tasks you want to loop over into a single task, and loop within that task until your exit condition has been met.
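
A rough sketch of that idea in plain Python, which could serve as the body of a single task's `run` method. The function name `loop_steps`, its parameters, and the `max_iterations` safety cap are all assumptions for illustration, not part of Prefect.

```python
def loop_steps(value, steps, done, max_iterations=1000):
    # Apply every step in order, again and again, until done(value) is true
    for _ in range(max_iterations):
        if done(value):
            return value
        for step in steps:
            value = step(value)
    # Guard against an exit condition that is never reached
    raise RuntimeError("exit condition not met within max_iterations")


def add_one(v):
    return v + 1


def double(v):
    return v * 2
```

For example, `loop_steps(1, [add_one], lambda v: v >= 10)` returns 10. The trade-off is that the individual steps are no longer separate tasks, so Prefect cannot track or retry them independently.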
