From d81f52cc049256b727f36fbc35e54db406a3741d Mon Sep 17 00:00:00 2001 From: root Date: Mon, 30 Jun 2025 17:58:52 +0200 Subject: [PATCH] all_streaning --- src/hayhooks/__init__.py | 2 ++ src/hayhooks/server/pipelines/utils.py | 36 ++++++++++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/src/hayhooks/__init__.py b/src/hayhooks/__init__.py index 3c0df03..669ec6f 100644 --- a/src/hayhooks/__init__.py +++ b/src/hayhooks/__init__.py @@ -5,6 +5,7 @@ is_user_message, get_last_user_message, streaming_generator, + streaming_generator_all, async_streaming_generator ) @@ -14,6 +15,7 @@ "is_user_message", "get_last_user_message", "streaming_generator", + "streaming_generator_all", "async_streaming_generator", "create_app", ] diff --git a/src/hayhooks/server/pipelines/utils.py b/src/hayhooks/server/pipelines/utils.py index 3bc17ed..542a088 100644 --- a/src/hayhooks/server/pipelines/utils.py +++ b/src/hayhooks/server/pipelines/utils.py @@ -175,3 +175,39 @@ async def streaming_callback(chunk): except Exception as e: log.warning(f"Error during pipeline task cleanup: {e}") raise e + + + +# by HS: register streaming for all components +def streaming_generator_all(pipeline: Pipeline, pipeline_run_args: Dict) -> Generator: + queue: Queue[str] = Queue() + + def streaming_callback(chunk): + queue.put(chunk.content) + + pipeline_run_args = pipeline_run_args.copy() + + # NEU: Alle Komponenten mit streaming_callback erfassen + for name, component in pipeline.walk(): + if hasattr(component, "streaming_callback"): + if name not in pipeline_run_args: + pipeline_run_args[name] = {} + pipeline_run_args[name]["streaming_callback"] = streaming_callback + + def run_pipeline(): + try: + pipeline.run(pipeline_run_args) + finally: + queue.put(None) + + thread = threading.Thread(target=run_pipeline) + thread.start() + + while True: + chunk = queue.get() + if chunk is None: + break + yield chunk + + thread.join() +