Skip to content

streaming with all components #133

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

hoschmieder
Copy link

This allows streaming over the whole pipeline with all includes components.
Here an pipeline example:

import logging
import time
from typing import Type, Dict, Any, Optional, List,Union, Generator, Callable
from haystack import Pipeline, component
from hayhooks import BasePipelineWrapper, get_last_user_message
from hayhooks import streaming_generator_all as streaming_generator #Modifikation von HS
from haystack.core.serialization import DeserializationCallbacks
from haystack.dataclasses.document import Document
from haystack.dataclasses.chat_message import ChatMessage
from haystack.dataclasses import StreamingChunk



@component
class EchoA:    
    def __init__(self):
        self.streaming_callback: Optional[Callable[[StreamingChunk], None]] = None

    @component.output_types(output=str)
    def run(self, text:str, streaming_callback: Optional[Callable[[StreamingChunk], None]] = None):
        for i in range(3):
            time.sleep(0.5)
            print(f"++++++ Streamin-Callback von EchoA",streaming_callback)
            if streaming_callback:
                print(f"************* Schritt (1) {i+1}: {text.lower()}\n")
                streaming_callback(StreamingChunk(content=f"🧠 Schritt (1) {i+1}: {text.lower()}\n"))

        final_answer = f"✅ Antwort von Echo1 auf '{text}': {text.upper()}"
        if streaming_callback:            
            streaming_callback(StreamingChunk(content=final_answer + "\n"))
        meta = [{
            "index": 0,
            "model": "test_callback",
            "finish_reason": "stop"
#            "finish_reason": "content_filter"
        }]

        output= {
                    "query": text,
                    "replies": [final_answer],
                    "meta": meta,                    
                }
        print("echo",output)
        return {"output": text}
        

@component
class EchoB:    
    def __init__(self):
        self.streaming_callback: Optional[Callable[[StreamingChunk], None]] = None

    @component.output_types(output=dict)
    def run(self, text:str, streaming_callback: Optional[Callable[[StreamingChunk], None]] = None):
        print("IN echo2",text)
        for i in range(3):
            time.sleep(0.5)
            print(f"++++++ Streamin-Callback von EchoB",streaming_callback)
            if streaming_callback:
                print(f"************* Schritt (2) {i+1}: {text.lower()}")
                streaming_callback(StreamingChunk(content=f"🧠 Schritt (2) {i+1}: {text.lower()}\n"))

        final_answer = f"✅ Antwort ECHO2 auf  '{text}': {text.lower()}"
        if streaming_callback:
            streaming_callback(StreamingChunk(content=final_answer + "\n"))
        meta = [{
            "index": 0,
            "model": "test_callback",
            "finish_reason": "stop"
        }]
        output= {
                    "query": text,
                    "replies": [final_answer],
                    "meta": meta,                    
                }
        print("echo2",output)
        return output  



# 🧩 PipelineWrapper für Hayhooks/OpenWeb-UI
class PipelineWrapper(BasePipelineWrapper):
    def setup(self) -> None:
        pipeline_yaml = (Path(__file__).parent / "pipeline.yml").read_text()
        callbacks = DeserializationCallbacks(component_pre_init=PipelineWrapper.component_pre_init_callback)
        self.pipeline = Pipeline.loads(pipeline_yaml, callbacks=callbacks)

    @staticmethod
    def component_pre_init_callback(component_name: str, component_cls: Type, init_params: Dict[str, Any]):
        custom_components = {
            "echo1": EchoA,
            "echo2": EchoB,
        }
        print ("SSSSSSSSSSSSSSSSSSSSSSSSSSSS component_pre_init_callback",EchoA)
        if component_cls.__name__ in custom_components:
            return custom_components[component_cls.__name__](**init_params)
        return component_cls(**init_params)

    def run_api(self, query: str) -> dict:
        result = self.pipeline.run({"text": query})
        return result["output"]

    def run_chat_completion(self, model: str, messages: List[dict], body: dict) -> Union[str, Generator]:
        question = get_last_user_message(messages)
        stream = body.get("stream", True)

        return streaming_generator(
            pipeline=self.pipeline,
            pipeline_run_args={
                "echo1": {"text": question} ,
#                "echo2": {"text": question}                
            }
        )

#yml-file
components:
  echo1:
    type: pipeline_wrapper.EchoA

  echo2:
    type: pipeline_wrapper.EchoB

connections:
  - sender: echo1.output
    receiver: echo2

outputs:
  - receiver: echo1.output
  - receiver: echo2.output

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant