Task concurrency limits are not observed #16761

Open
estenwoiensp1 opened this issue Jan 17, 2025 · 6 comments
Labels
bug Something isn't working

Comments

@estenwoiensp1

estenwoiensp1 commented Jan 17, 2025

Bug summary

Hello! We are having some issues with concurrency limits and task run orchestration in 3.x. We set up a task concurrency limit named test with a limit of 2 and created a flow which submits 4 tasks with the tag test. We observe some differences in the orchestration between 2.x and 3.x:

2.20.4: Two tasks are started right away, filling up the concurrency limit. When these complete, the remaining two are started. Expected behaviour!

3.1.12: All four tasks seem to start at once, but only two of them are actually run by the task runner. The remaining two are only worked on after the others have completed. So the concurrency limit is indeed applied: only two of the tasks are worked on at a time. However, the TaskRun reports that all the tasks started at the same time. This is also visible in the GUI, giving very poor observability.

prefect concurrency-limit create test 2

from prefect import tags, flow, task, Task
from prefect.client.schemas.objects import TaskRun, State
from time import sleep

def on_completion(task: Task, task_run: TaskRun, state: State):
    print(f"Task completed with start time {task_run.start_time}")

@task(task_run_name="task-{id}", on_completion=[on_completion])
def my_task(id: int):
    print(f"Task {id} started")
    sleep(2)
    print(f"Task {id} stopped")
    return 1

@flow
def my_flow():
    # Submit 4 tasks in parallel
    with tags("test"):
        futures = [my_task.submit(i+1) for i in range(4)]
    
    # Wait for all tasks to finish
    for future in futures:
        future.result()

if __name__ == "__main__":
    my_flow()

Version info

Version:             3.1.12
API version:         0.8.4
Python version:      3.11.9
Git commit:          e299e5a7
Built:               Thu, Jan 9, 2025 10:09 AM
OS/Arch:             linux/x86_64
Profile:             dope
Server type:         cloud
Pydantic version:    2.9.2
Integrations:
  prefect-shell:     0.3.1
  prefect-dbt:       0.6.4

Additional context

We also observe that the orchestration consumes far more resources in the new version. Flows with 500 tasks which used to take 20 minutes now take more than 90 minutes.

estenwoiensp1 added the bug label on Jan 17, 2025
@zzstoatzz
Collaborator

hi @estenwoiensp1 - I might suggest an easier way to accomplish this: specify max_workers for the task runner (this is also available as a setting if you prefer to use an env var)

from time import sleep
from typing import Any

from prefect import Task, flow, task
from prefect.client.schemas.objects import State, TaskRun
from prefect.task_runners import ThreadPoolTaskRunner

def on_completion(task: Task[..., Any], task_run: TaskRun, state: State[Any]):
    print(f"Task completed with start time {task_run.start_time}")

@task(task_run_name="task-{id}", on_completion=[on_completion])
def my_task(id: int) -> int:
    print(f"Task {id} started")
    sleep(2)
    print(f"Task {id} stopped")
    return 1

@flow(task_runner=ThreadPoolTaskRunner(max_workers=2))
def my_flow():
    return my_task.map(range(4)).result()


if __name__ == "__main__":
    print(my_flow())



although we mostly have tag-based task concurrency around for backwards compatibility, I am reproducing the undesirable behavior you're seeing here, so we should look into that

@burnbay

burnbay commented Jan 19, 2025

Hi @zzstoatzz

I'm using the same logic as @estenwoiensp1. Your suggestion looks promising, but I would really like to be able to set the number of workers at runtime (as a flow parameter). Is that possible? I found this on Slack (https://prefect-community.slack.com/archives/CL09KU1K7/p1732300289134169?thread_ts=1732299888.990389&cid=CL09KU1K7):

In [1]: from prefect import task, flow

In [2]: from prefect.task_runners import ThreadPoolTaskRunner

In [3]: @flow
   ...: def some_flow():
   ...:     task(lambda x: x).map(range(10)).result()
   ...: @flow
   ...: def f(n_workers: int):
   ...:     some_flow.with_options(task_runner=ThreadPoolTaskRunner(max_workers=n_workers))()
   ...:

In [4]: f(24)

but then you need a wrapper flow. It would be nice to just have a single flow with tasks.

@burnbay

burnbay commented Jan 23, 2025

Any progress on this, @zzstoatzz? It's a bit of a blocker for moving over to 3.0 at the moment.

@zzstoatzz
Collaborator

zzstoatzz commented Jan 23, 2025

I would really like to be able to set the number of workers at runtime (as a flow parameter). Is that possible?

you can: use some_flow.with_options(task_runner=...) and then call that updated flow, passing the task runner kwargs into the parent flow as needed
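
for example, something along these lines (a fleshed-out sketch of the Slack snippet above; the flow and task names are just illustrative):

from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def my_task(x: int) -> int:
    return x

@flow
def inner_flow():
    return my_task.map(range(10)).result()

@flow
def parent_flow(n_workers: int = 2):
    # with_options returns a copy of inner_flow that uses the given task runner,
    # so the worker count is decided at runtime from the parent flow's parameter
    return inner_flow.with_options(
        task_runner=ThreadPoolTaskRunner(max_workers=n_workers)
    )()

if __name__ == "__main__":
    parent_flow(4)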

@burnbay

burnbay commented Jan 23, 2025

Yes, I understand that, but I would like to avoid using sub flows. It's not really ideal in the Prefect UI.

The other thing that's really useful about tag concurrency is that we can create a tag that several flow runs share. That way we can split our data load across multiple containers and at the same time limit the number of concurrent connections. I guess this could be achieved with queue concurrency, but tag concurrency is much more flexible. We have a pydantic model as input to our main flow containing the concurrency tag name and the concurrency limit. That is really flexible and powerful when you need to distribute a data load: some data sources can handle multiple loads in parallel while others can only handle one or two. With this flow we don't need to change any settings, re-deploy, etc. to differentiate.
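
Roughly, the pattern looks like this (a simplified sketch; the model and names are just illustrative, and the tag's limit itself is created ahead of time, e.g. with prefect concurrency-limit create as in the example at the top of this issue):

from pydantic import BaseModel

from prefect import flow, task, tags

class LoadConfig(BaseModel):
    concurrency_tag: str    # shared across flow runs, e.g. "source-xyz"
    concurrency_limit: int  # applied server-side, e.g. via
                            # prefect concurrency-limit create source-xyz 2

@task
def load_partition(partition: int) -> None:
    ...

@flow
def load_data(config: LoadConfig, partitions: list[int]):
    # every task submitted under this tag counts against the same
    # server-side limit, even across flow runs on different containers
    with tags(config.concurrency_tag):
        futures = [load_partition.submit(p) for p in partitions]

    # wait for all partitions to finish
    for future in futures:
        future.result()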

Fingers crossed for a quick solution to the tag concurrency bug.

@zzstoatzz
Collaborator

that makes sense @burnbay - thanks for the extra context!
