Skip to content

Conversation

rishabhmaurya
Copy link
Contributor

@rishabhmaurya rishabhmaurya commented Jul 10, 2025

Description

RFC: #18425
This PR covers -

  • API changes for stream transport
  • StreamTransport and StreamTransportService implementation
  • New streaming based search action

Key points of discussion -

  1. New stream based transport availability in server.
  2. Stream based API changes for request and response handling

Client-Side Streaming API Flow

flowchart TD
    %% Simple Client Flow
    START[Client sends streaming request] --> WAIT[Wait for response asynchronously]
    
    WAIT --> RESPONSE{Response Type?}
    RESPONSE -->|Success| STREAM[handleStreamResponse called]
    RESPONSE -->|Error| ERROR[handleException called]
    RESPONSE -->|Timeout| TIMEOUT[Timeout exception]
    
    %% Stream Processing
    STREAM --> NEXT[Get next response]
    NEXT --> PROCESS[Process response]
    PROCESS --> CONTINUE{Continue?}
    CONTINUE -->|Yes| NEXT
    CONTINUE -->|No - Complete| CLOSE[streamResponse.close]
    CONTINUE -->|No - Cancel| CANCEL[streamResponse.cancel]
    
    %% Error & Completion
    ERROR --> HANDLE_ERROR[Handle error]
    TIMEOUT --> HANDLE_ERROR
    CLOSE --> SUCCESS[Complete]
    CANCEL --> SUCCESS
    HANDLE_ERROR --> SUCCESS
    
    %% Simple styling
    classDef client fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px
    classDef framework fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
    classDef error fill:#ffebee,stroke:#c62828,stroke-width:2px
    
    class START,NEXT,PROCESS,CLOSE,CANCEL client
    class WAIT,STREAM,ERROR,TIMEOUT framework
    class HANDLE_ERROR error
    class RESPONSE,CONTINUE decision
Loading

Simple Client Usage

Implementation:

StreamTransportResponseHandler<MyResponse> handler = new StreamTransportResponseHandler<MyResponse>() {
    private volatile boolean cancelled = false;
    private volatile StreamTransportResponse<MyResponse> currentStream;
    
    @Override
    public void handleStreamResponse(StreamTransportResponse<MyResponse> streamResponse) {
        currentStream = streamResponse;
        
        if (cancelled) {
            handleTermination(streamResponse, "Handler already cancelled", null);
            return;
        }
        
        try {
            MyResponse response;
            while ((response = streamResponse.nextResponse()) != null) {  // BLOCKING CALL
                if (cancelled) {
                    handleTermination(streamResponse, "Processing cancelled", null);
                    return;
                }
                processResponse(response);
            }
            streamResponse.close();
        } catch (Exception e) {
            handleTermination(streamResponse, "Error: " + e.getMessage(), e);
        }
    }
    
    @Override
    public void handleException(TransportException exp) {
        cancelled = true;
        if (currentStream != null) {
            handleTermination(currentStream, "Exception occurred: " + exp.getMessage(), exp);
        }
        handleError(exp);
    }
    
    // Placeholder for custom termination logic
    private void handleTermination(StreamTransportResponse<MyResponse> streamResponse, String reason, Exception cause) {
        // Add custom cleanup/logging logic here
        streamResponse.cancel(reason, cause);
    }
};

transportService.sendRequest(node, "action", request, 
    TransportRequestOptions.builder().withType(STREAM).withTimeout(30s).build(), 
    handler);

Key Points:

  • Blocking: nextResponse() blocks waiting for server data
  • Timeout Handling: handleException can cancel active streams for timeout scenarios
  • Always Close/Cancel: Stream must be closed or cancelled to prevent resource leaks

Server-Side Streaming API Guide

Action Registration

streamTransportService.registerRequestHandler(
    "internal:my-action/stream",
    ThreadPool.Names.SEARCH,
    MyRequest::new,
    this::handleStreamRequest
);

Basic Implementation

private void handleStreamRequest(MyRequest request, TransportChannel channel, Task task) {
    try {
        // Process data incrementally
        DataIterator iterator = createDataIterator(request);
        
        while (iterator.hasNext()) {
            MyData data = iterator.next();
            MyResponse response = processData(data);
            
            // Send batch - may block or throw StreamException with CANCELLED code
            channel.sendResponseBatch(response);
        }
        
        // Signal successful completion
        channel.completeStream();
        
    } catch (StreamException e) {
        if (e.getErrorCode() == StreamErrorCode.CANCELLED) {
            // Client cancelled - exit gracefully
            logger.info("Stream cancelled by client: {}", e.getMessage());
            // Do NOT call completeStream() or sendResponse()
        } else {
            // Other stream error - send to client
            channel.sendResponse(e);
        }
        
    } catch (Exception e) {
        // Send error to client
        channel.sendResponse(e);
    }
}

Processing Flow

flowchart TD
    A[Request Received] --> B[Process Data Loop]
    B --> C[Send Response Batch]
    C --> D{Client Cancelled?}
    D -->|Yes| E[Exit Gracefully]
    D -->|No| F{More Data?}
    F -->|Yes| B
    F -->|No| G[Complete Stream]
    G --> H[Success]
    
    B --> I{Error?}
    I -->|Yes| J[Send Error]
    J --> K[Terminated]
    
    classDef success fill:#e8f5e8
    classDef error fill:#ffebee
    classDef cancel fill:#fce4ec
    
    class G,H success
    class J,K error
    class E cancel
Loading

Key Behaviors

Blocking

  • sendResponseBatch() may block if transport buffers are full
  • Server will pause until client consumes data and frees buffer space

Cancellation

  • sendResponseBatch() throws StreamException with StreamErrorCode.CANCELLED when client cancels
  • Exit handler immediately - framework handles cleanup
  • Do NOT call completeStream() or sendResponse() after cancellation

Completion

  • Always call either completeStream() (success) OR sendResponse(exception) (error)
  • Never call both methods
  • Stream must be explicitly completed or terminated

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Copy link
Contributor

❌ Gradle check result for e4e6a46: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Copy link
Contributor

❌ Gradle check result for 1942490: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@rishabhmaurya rishabhmaurya force-pushed the stream-transport-server branch 3 times, most recently from d45b25d to 684d627 Compare July 10, 2025 19:35
Copy link
Contributor

✅ Gradle check result for 684d627: SUCCESS

Copy link

codecov bot commented Jul 10, 2025

Codecov Report

❌ Patch coverage is 15.59140% with 314 lines in your changes missing coverage. Please review.
✅ Project coverage is 72.84%. Comparing base (a9b6d7a) to head (62c0454).
⚠️ Report is 3 commits behind head on main.

Files with missing lines Patch % Lines
...ch/action/search/StreamSearchTransportService.java 0.00% 99 Missing ⚠️
...g/opensearch/transport/StreamTransportService.java 9.67% 28 Missing ⚠️
...ava/org/opensearch/transport/TransportService.java 22.22% 27 Missing and 1 partial ⚠️
server/src/main/java/org/opensearch/node/Node.java 28.94% 17 Missing and 10 partials ⚠️
...g/opensearch/transport/stream/StreamException.java 0.00% 26 Missing ⚠️
...g/opensearch/transport/stream/StreamErrorCode.java 0.00% 22 Missing ⚠️
...ava/org/opensearch/cluster/node/DiscoveryNode.java 33.33% 14 Missing and 2 partials ⚠️
...ch/action/support/StreamChannelActionListener.java 0.00% 14 Missing ⚠️
...nsearch/cluster/service/ClusterApplierService.java 7.69% 10 Missing and 2 partials ⚠️
...tracing/channels/TraceableTcpTransportChannel.java 0.00% 8 Missing ⚠️
... and 17 more
Additional details and impacted files
@@             Coverage Diff              @@
##               main   #18722      +/-   ##
============================================
- Coverage     72.93%   72.84%   -0.09%     
- Complexity    68855    68895      +40     
============================================
  Files          5590     5600      +10     
  Lines        315887   316247     +360     
  Branches      45839    45866      +27     
============================================
+ Hits         230382   230383       +1     
- Misses        66832    67193     +361     
+ Partials      18673    18671       -2     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@rishabhmaurya rishabhmaurya marked this pull request as ready for review July 10, 2025 20:58
@rishabhmaurya rishabhmaurya requested review from peternied, a team and jed326 as code owners July 10, 2025 20:58
Signed-off-by: Rishabh Maurya <[email protected]>
Signed-off-by: Rishabh Maurya <[email protected]>
Copy link
Contributor

github-actions bot commented Aug 1, 2025

❌ Gradle check result for 5031cb1: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-project-automation github-project-automation bot moved this from In-Review to In Progress in Performance Roadmap Aug 1, 2025
Copy link
Contributor

github-actions bot commented Aug 1, 2025

❌ Gradle check result for 56d6e7e: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Signed-off-by: Rishabh Maurya <[email protected]>
Copy link
Contributor

github-actions bot commented Aug 2, 2025

❌ Gradle check result for cbcf9f5: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Copy link
Contributor

github-actions bot commented Aug 2, 2025

✅ Gradle check result for cbcf9f5: SUCCESS

Copy link
Contributor

github-actions bot commented Aug 4, 2025

❌ Gradle check result for 62c0454: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Copy link
Contributor

github-actions bot commented Aug 4, 2025

✅ Gradle check result for 62c0454: SUCCESS

@andrross andrross merged commit 8d40807 into opensearch-project:main Aug 4, 2025
31 of 34 checks passed
@github-project-automation github-project-automation bot moved this from In Progress to Done in Performance Roadmap Aug 4, 2025
@rishabhmaurya rishabhmaurya deleted the stream-transport-server branch August 4, 2025 19:43
sunqijun1 pushed a commit to sunqijun1/OpenSearch that referenced this pull request Aug 5, 2025
vinaykpud pushed a commit to vinaykpud/OpenSearch that referenced this pull request Sep 26, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
Status: Done
Development

Successfully merging this pull request may close these issues.

3 participants