Unexpected behavior of AsyncPipeline #144

Closed
alex-stoica opened this issue Dec 3, 2024 · 1 comment
alex-stoica commented Dec 3, 2024

Hello, I wanted to check the async functionality from #142.
I am using AsyncPipeline from the haystack_experimental.core package and observed unexpected behavior during pipeline execution. The pipeline handles two concurrent run calls as expected, but components A and B, which do not depend on each other, do not execute concurrently within a single run. Instead, B starts only after A finishes, even though there is no explicit dependency between them.

For reference, my code is:

import asyncio
from haystack_experimental.core import AsyncPipeline
from haystack import component
from datetime import datetime

def print_time(message: str):
    print(f"{message} at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

async def async_sleep_task(duration: int):
    await asyncio.sleep(duration)

@component
class ComponentA:
    @component.output_types(A_output=str)
    def run(self, dummy: str) -> dict:
        print_time("ComponentA run started")
        result = {"A_output": f"Processed by A: {dummy}"}
        print_time("ComponentA run ended")
        return result

    @component.output_types(A_output=str)
    async def run_async(self, dummy: str) -> dict:
        print_time("ComponentA run_async started")
        await async_sleep_task(3)
        result = {"A_output": f"Processed by A: {dummy}"}
        print_time("ComponentA run_async ended")
        return result

@component
class ComponentB:
    @component.output_types(B_output=str)
    def run(self, dummy: str) -> dict:
        print_time("ComponentB run started")
        result = {"B_output": f"Processed by B: {dummy}"}
        print_time("ComponentB run ended")
        return result

    @component.output_types(B_output=str)
    async def run_async(self, dummy: str) -> dict:
        print_time("ComponentB run_async started")
        await async_sleep_task(2)
        result = {"B_output": f"Processed by B: {dummy}"}
        print_time("ComponentB run_async ended")
        return result

@component
class ComponentC:
    @component.output_types(C_output=str)
    def run(self, A_output: str, B_output: str) -> dict:
        print_time("ComponentC run started")
        result = {"C_output": f"C combined outputs: {A_output}, {B_output}"}
        print_time("ComponentC run ended")
        return result

    @component.output_types(C_output=str)
    async def run_async(self, A_output: str, B_output: str) -> dict:
        print_time("ComponentC run_async started")
        await async_sleep_task(1)
        result = {"C_output": f"C combined outputs: {A_output}, {B_output}"}
        print_time("ComponentC run_async ended")
        return result

def create_pipeline():
    async_pipeline = AsyncPipeline()
    async_pipeline.add_component("A", ComponentA())
    async_pipeline.add_component("B", ComponentB())
    async_pipeline.add_component("C", ComponentC())
    async_pipeline.connect("A.A_output", "C.A_output")
    async_pipeline.connect("B.B_output", "C.B_output")
    return async_pipeline

if __name__ == "__main__":
    async def run_pipeline(pipeline, input_data):
        async for output in pipeline.run(input_data):
            print(f"Pipeline output: {output}")

    async def main():
        input_data1 = {"dummy": "Test data 1"}
        input_data2 = {"dummy": "Test data 2"}
        pipeline = create_pipeline()
        task1 = asyncio.create_task(run_pipeline(pipeline, input_data1))
        task2 = asyncio.create_task(run_pipeline(pipeline, input_data2))
        await asyncio.gather(task1, task2)

    asyncio.run(main())

and my output is:

ComponentA run_async started at 2024-12-03 17:51:08
ComponentA run_async started at 2024-12-03 17:51:08
ComponentA run_async ended at 2024-12-03 17:51:11
Pipeline output: {'A': {'A_output': 'Processed by A: Test data 1'}}
ComponentB run_async started at 2024-12-03 17:51:11
ComponentA run_async ended at 2024-12-03 17:51:11
Pipeline output: {'A': {'A_output': 'Processed by A: Test data 2'}}
ComponentB run_async started at 2024-12-03 17:51:11
ComponentB run_async ended at 2024-12-03 17:51:13
Pipeline output: {'B': {'B_output': 'Processed by B: Test data 1'}}
ComponentC run_async started at 2024-12-03 17:51:13
ComponentB run_async ended at 2024-12-03 17:51:13
Pipeline output: {'B': {'B_output': 'Processed by B: Test data 2'}}
ComponentC run_async started at 2024-12-03 17:51:13
ComponentC run_async ended at 2024-12-03 17:51:14
Pipeline output: {'C': {'C_output': 'C combined outputs: Processed by A: Test data 1, Processed by B: Test data 1'}}
Pipeline output: {'C': {'C_output': 'C combined outputs: Processed by A: Test data 1, Processed by B: Test data 1'}}
ComponentC run_async ended at 2024-12-03 17:51:14
Pipeline output: {'C': {'C_output': 'C combined outputs: Processed by A: Test data 2, Processed by B: Test data 2'}}
Pipeline output: {'C': {'C_output': 'C combined outputs: Processed by A: Test data 2, Processed by B: Test data 2'}}

Could you confirm if this is the expected behavior of AsyncPipeline? If not, what adjustments should be made to achieve true component-wise concurrency?

Environment:

@julian-risch
Member

Hello @alex-stoica, this is expected behavior. In the current implementation, components in the same pipeline are not expected to run concurrently.
Thanks for trying out haystack-experimental and providing feedback!
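
For readers who need the independent steps to overlap, here is a minimal sketch of one possible workaround. It does not use AsyncPipeline at all: it assumes the work done by A and B can be awaited directly and combines them with asyncio.gather from the standard library. The names step_a, step_b, step_c, and run_concurrently are hypothetical stand-ins for the components in the report, and the sleep durations are shortened for illustration.

```python
import asyncio

# Hypothetical stand-ins for ComponentA/B/C.run_async from the report above;
# these are plain coroutines, not Haystack components.
async def step_a(dummy: str) -> dict:
    await asyncio.sleep(0.3)  # simulated I/O-bound work
    return {"A_output": f"Processed by A: {dummy}"}

async def step_b(dummy: str) -> dict:
    await asyncio.sleep(0.2)  # simulated I/O-bound work
    return {"B_output": f"Processed by B: {dummy}"}

async def step_c(A_output: str, B_output: str) -> dict:
    await asyncio.sleep(0.1)
    return {"C_output": f"C combined outputs: {A_output}, {B_output}"}

async def run_concurrently(dummy: str) -> dict:
    # gather schedules both coroutines on the event loop at once, so their
    # sleeps overlap; results come back in argument order.
    a, b = await asyncio.gather(step_a(dummy), step_b(dummy))
    # step_c depends on both outputs, so it runs only after both finish.
    return await step_c(a["A_output"], b["B_output"])

if __name__ == "__main__":
    print(asyncio.run(run_concurrently("Test data 1")))
```

With this arrangement the waits in step_a and step_b overlap instead of adding up, at the cost of wiring the dependency on step_c by hand rather than through pipeline connections.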

@julian-risch closed this as not planned on Dec 5, 2024