Skip to content

Commit

Permalink
🐛Computational backend: if a pipeline raises, it should not prevent h…
Browse files Browse the repository at this point in the history
…andling of other pipelines (#6295)
  • Loading branch information
sanderegg committed Sep 3, 2024
1 parent f004184 commit f6d362b
Showing 1 changed file with 5 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from pydantic import PositiveInt
from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
from servicelib.rabbitmq import RabbitMQClient, RabbitMQRPCClient
from servicelib.utils import logged_gather
from servicelib.utils import limited_gather

from ...constants import UNDEFINED_STR_METADATA
from ...core.errors import (
Expand Down Expand Up @@ -220,7 +220,7 @@ async def stop_pipeline(
async def schedule_all_pipelines(self) -> None:
self.wake_up_event.clear()
# if one of the task throws, the other are NOT cancelled which is what we want
await logged_gather(
await limited_gather(
*(
self._schedule_pipeline(
user_id=user_id,
Expand All @@ -234,8 +234,10 @@ async def schedule_all_pipelines(self) -> None:
iteration,
), pipeline_params in self.scheduled_pipelines.items()
),
reraise=False,
log=_logger,
max_concurrency=40,
limit=40,
tasks_group_prefix="computational-scheduled-pipeline",
)

async def _get_pipeline_dag(self, project_id: ProjectID) -> nx.DiGraph:
Expand Down

0 comments on commit f6d362b

Please sign in to comment.