Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 33 additions & 25 deletions src/hayhooks/server/pipelines/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,28 @@ def get_last_user_message(messages: List[Union[Message, Dict]]) -> Union[str, No

return None

def send_message(message: str, streaming_callback: Callable[[StreamingChunk], None]):
if streaming_callback is not None:
streaming_callback(StreamingChunk(content=message))

def find_streaming_component(pipeline: Union[Pipeline, AsyncPipeline]) -> Tuple[Component, str]:
def find_streaming_components(pipeline: Union[Pipeline, AsyncPipeline]) -> List[Tuple[Component, str]]:
"""
Finds the component in the pipeline that supports streaming_callback
Finds all components in the pipeline that support streaming_callback

Returns:
The first component that supports streaming
A list of tuples (component, component_name) for each component that supports streaming
"""
streaming_component = None
streaming_component_name = ""
streaming_components = []

for name, component in pipeline.walk():
if hasattr(component, "streaming_callback"):
log.trace(f"Streaming component found in '{name}' with type {type(component)}")
streaming_component = component
streaming_component_name = name
if not streaming_component:
raise ValueError("No streaming-capable component found in the pipeline")
streaming_components.append((component, name))

return streaming_component, streaming_component_name
if not streaming_components:
raise ValueError("No streaming-capable components found in the pipeline")

return streaming_components


def _setup_streaming_callback_for_pipeline(
Expand All @@ -72,16 +74,17 @@ def _setup_streaming_callback_for_pipeline(
Returns:
Updated pipeline run arguments
"""
_, streaming_component_name = find_streaming_component(pipeline)
streaming_components = find_streaming_components(pipeline)

# Ensure component args exist in pipeline run args
if streaming_component_name not in pipeline_run_args:
pipeline_run_args[streaming_component_name] = {}
for streaming_component, streaming_component_name in streaming_components:
# Ensure component args exist in pipeline run args
if streaming_component_name not in pipeline_run_args:
pipeline_run_args[streaming_component_name] = {}

# Set the streaming callback on the component
streaming_component = pipeline.get_component(streaming_component_name)
assert hasattr(streaming_component, "streaming_callback")
streaming_component.streaming_callback = streaming_callback
# Set the streaming callback on the component
component = pipeline.get_component(streaming_component_name)
assert hasattr(component, "streaming_callback")
component.streaming_callback = streaming_callback

return pipeline_run_args

Expand Down Expand Up @@ -232,15 +235,20 @@ def _validate_async_streaming_support(pipeline: Union[Pipeline, AsyncPipeline])
Raises:
ValueError: If the pipeline doesn't support async streaming
"""
streaming_component, streaming_component_name = find_streaming_component(pipeline)
streaming_components = find_streaming_components(pipeline)
unsupported_components = []

for streaming_component, streaming_component_name in streaming_components:
# Check if the streaming component supports async streaming callbacks
if not hasattr(streaming_component, "run_async"):
component_type = type(streaming_component).__name__
unsupported_components.append(f"Component '{streaming_component_name}' of type {component_type})")

# Check if the streaming component supports async streaming callbacks
# We check for run_async method as an indicator of async support
if not hasattr(streaming_component, "run_async"):
component_type = type(streaming_component).__name__
if unsupported_components:
component_list = ", ".join(unsupported_components)
raise ValueError(
f"Component '{streaming_component_name}' of type '{component_type}' seems to not support async streaming callbacks. "
f"Use the sync 'streaming_generator' function instead, or switch to a component that supports async streaming callbacks "
f"The following components do not seem to support async streaming callbacks: {component_list}. "
f"Use the sync 'streaming_generator' function instead, or switch to components that support async streaming callbacks "
f"(e.g., OpenAIChatGenerator instead of OpenAIGenerator)."
)

Expand Down