Skip to content

Conversation

@LifeJiggy
Copy link

Summary

This PR adds streaming support utilities that enable developers to easily process and analyze real-time streaming API responses with custom event handlers and aggregation capabilities.

Problem

Streaming API responses require custom processing logic for different event types, but developers currently have no built-in way to handle streaming events efficiently. This leads to:

  • Manual event processing code scattered throughout applications
  • Difficulty handling different event formats and types
  • No built-in aggregation or statistics collection
  • Complex async streaming handling

Solution

Add streaming utilities with:

  • StreamProcessor class for event-based processing with custom handlers
  • StreamCollector class for collecting and aggregating streaming events
  • Support for different event formats (type/event fields)
  • Event type-based handler registration and processing
  • Aggregation and statistics collection for streaming data
  • Async support for stream processing

Key Features

  • Event Processing: Custom handlers for different event types
  • Event Collection: Automatic collection and aggregation of streaming events
  • Format Agnostic: Handles different event structures (type/event fields)
  • Async Support: Full async/await compatibility for stream processing
  • Statistics: Built-in aggregation and counting of events
  • Standard Library: No external dependencies

Benefits

  • Simplifies streaming event processing workflows
  • Enables real-time data analysis and monitoring
  • Provides flexible event handling and aggregation
  • Improves streaming data processing capabilities
  • Reduces boilerplate code for event handling

Testing

  • Added comprehensive test suite covering:
  • Event handler registration and processing
  • Different event format support
  • Stream processing and collection
  • Event aggregation and statistics
  • Async processing capabilities
  • Edge cases and error conditions

All tests pass with full coverage of streaming functionality.

Usage Examples

from gradient._utils import StreamProcessor, StreamCollector

# Process streaming events with custom handlers
processor = StreamProcessor()

def handle_text_event(event):
    return f"Text: {event.get('content', '')}"

def handle_error_event(event):
    print(f"Error: {event.get('message', '')}")
    return None

processor.add_handler("text", handle_text_event)
processor.add_handler("error", handle_error_event)

# Process stream
results = processor.process_stream(stream)

# Or collect and analyze events
collector = StreamCollector()

for event in stream:
    collector.collect(event)

# Get statistics
text_events = collector.get_events("text")
stats = collector.get_aggregated()
print(f"Processed {stats['text']['count']} text events")

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant