[FEATURE] fork/join pattern #17

@accupham

Description

Was reading the docs and playing around with the library. Is there not an elegant abstraction for feeding multiple inputs (which can be executed independently and thus in parallel) into a function as part of the pipeline? I'm basically using asyncio.gather() for this here, but that isn't really an improvement over regular Python async.

flowchart TB
    Start((Start)) --> Range["range(10)"]
    
    subgraph ForkJoin["Fork-Join Stage 10 workers"]
        direction TB
        Range --> |fork|P1[generate_name]
        Range --> |fork|P2[generate_age]
        Range --> |fork|P3[generate_writing_prompt]
        
        P1 --> Join((Join))
        P2 --> Join
        P3 --> Join
    end
    
    Join --> Story[generate_tiny_story<br/>10 workers]
    Story --> Print[Print Story]
import asyncio
from openai import AsyncOpenAI
from pyper import task

class LLMActivities:
    def __init__(self, model, base_url, api_key):
        self.model = model
        self.llm = AsyncOpenAI(
            base_url=base_url,
            api_key=api_key,
        )

    async def prompt(self, prompt, temp=0.0, seed=0):
        r = await self.llm.chat.completions.create(
            temperature=temp,
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            seed=seed,
        )
        return r.choices[0].message.content


    async def generate_name(self, seed):
        return await self.prompt(
            "Generate a random name. Only output the name.",
            temp=1.0,
            seed=seed,
        )
    async def generate_age(self, seed):
        return await self.prompt(
            "Generate a random age. Only output the age.",
            temp=1.0,
            seed=seed,
        )

    async def generate_writing_prompt(self, seed):
        return await self.prompt(
            "Generate a simple writing prompt for a super short story.",
            temp=1.0,
            seed=seed,
        )

    async def generate_writing_inputs(self, seed):
        # asyncio.gather schedules coroutines itself; no create_task needed
        return await asyncio.gather(
            self.generate_name(seed),
            self.generate_age(seed),
            self.generate_writing_prompt(seed),
        )

    async def generate_tiny_story(self, inputs):
        name, age, writing_prompt = inputs
        return await self.prompt(
            prompt=(
                f"Given the following writing prompt, generate a super short story "
                f"with {name}, age {age} as the main character.\n\n"
                f"{writing_prompt}"
            ),
            temp=1.0,
        )


async def main():
    base_url = "http://10.0.3.4:4000"
    llm = LLMActivities("gpt-4o", base_url, "x")

    pipeline = (
        task(lambda: range(10), branch=True)
        | task(llm.generate_writing_inputs, workers=10)
        | task(llm.generate_tiny_story, workers=10)
    )

    async for result in pipeline():
        print("------- BEGIN STORY -------")
        print(result)
        print("------- END STORY -------")


if __name__ == "__main__":
    asyncio.run(main())

Perhaps another abstraction, `taskgroup`, would be elegant for this:

    from pyper import task, taskgroup

    pipeline = (
        task(lambda: range(10), branch=True)
        | taskgroup(
            task(llm.generate_name),
            task(llm.generate_age),
            task(llm.generate_writing_prompt)
        )
        | task(llm.generate_tiny_story, workers=10)
    )
    
    # the taskgroup would return a tuple of the forked tasks' results, which is "*"-unpacked into the parameters of the next function, `llm.generate_tiny_story`.
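`taskgroup` isn't part of pyper today, but its semantics can be sketched with plain asyncio: run each forked function on the same input concurrently, then join the results into a tuple. The `generate_name`/`generate_age` stubs below are stand-ins for the LLM calls above, just to make the sketch self-contained.

```python
import asyncio

def taskgroup(*funcs):
    """Hypothetical fork/join combinator: calls every async function with
    the same input concurrently and joins the results into a tuple."""
    async def joined(value):
        return tuple(await asyncio.gather(*(f(value) for f in funcs)))
    return joined

# Stand-ins for the LLM activities in the example above.
async def generate_name(seed):
    return f"name-{seed}"

async def generate_age(seed):
    return 20 + seed

fork = taskgroup(generate_name, generate_age)
print(asyncio.run(fork(3)))  # ('name-3', 23)
```

A real pyper integration would need the library to treat the joined tuple as a single item flowing to the next stage, but the fork/join core is just `asyncio.gather`.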

The key advantage of this library, IMO, is that you can see the entire flow of the pipeline at a glance without having to jump all over the IDE.


Syntax sugar like this should be possible too, because Python's operator precedence for `&` is higher than for `|`. This syntax is also inspired by Unix's fork() abstraction.

    pipeline = (
        task(lambda: range(10), branch=True)
        & task(llm.generate_name)
        & task(llm.generate_age)
        & task(llm.generate_writing_prompt)
        | task(llm.generate_tiny_story, workers=10)
    )
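The precedence claim checks out: in Python, `&` binds tighter than `|`, so `a & b & c | d` parses as `((a & b) & c) | d` with no parentheses needed around the fork group. A toy class (not pyper's actual API) showing how the overloads could compose:

```python
import asyncio

class T:
    """Toy pipeline node illustrating `&` as fork and `|` as pipe/join."""
    def __init__(self, fn, forks=None):
        self.fn = fn
        self.forks = forks if forks is not None else [fn]

    def __and__(self, other):
        # fork: accumulate functions to run concurrently on the same input
        return T(None, self.forks + other.forks)

    def __or__(self, other):
        # pipe: join the forked results, then unpack them into the next stage
        async def run(value):
            joined = await asyncio.gather(*(f(value) for f in self.forks))
            return await other.fn(*joined)
        return T(run)

# Stand-ins for the LLM activities.
async def name(s): return f"n{s}"
async def age(s): return s + 18
async def story(n, a): return f"{n} is {a}"

# `&` binds tighter than `|`, so the fork group needs no parentheses.
pipe = T(name) & T(age) | T(story)
print(asyncio.run(pipe.fn(2)))  # n2 is 20
```

The fork node carries a list of functions rather than executing anything, so all the concurrency lives in the single `gather` at the join point.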

Kind of like this:

for i in {1..10}; do
    (generate_name & generate_age & generate_writing_prompt) | generate_story
done

Anyway, it's a cool library; thanks for making it!

Labels: enhancement (New feature or request)