Skip to content
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added approximation support for range queries with now in date field ([#18511](https://github.com/opensearch-project/OpenSearch/pull/18511))
- Upgrade to protobufs 0.6.0 and clean up deprecated TermQueryProtoUtils code ([#18880](https://github.com/opensearch-project/OpenSearch/pull/18880))
- Prevent shard initialization failure due to streaming consumer errors ([#18877](https://github.com/opensearch-project/OpenSearch/pull/18877))
- APIs for stream transport and new stream-based search api action ([#18722](https://github.com/opensearch-project/OpenSearch/pull/18722))

### Changed
- Update Subject interface to use CheckedRunnable ([#18570](https://github.com/opensearch-project/OpenSearch/issues/18570))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.opensearch.Version.V_2_6_0;
import static org.opensearch.Version.V_2_7_0;
import static org.opensearch.Version.V_3_0_0;
import static org.opensearch.Version.V_3_2_0;

/**
* Utility class to register server exceptions
Expand Down Expand Up @@ -1232,5 +1233,13 @@ public static void registerExceptions() {
V_3_0_0
)
);
registerExceptionHandle(
new OpenSearchExceptionHandle(
org.opensearch.transport.stream.StreamException.class,
org.opensearch.transport.stream.StreamException::new,
177,
V_3_2_0
)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,8 @@
import org.opensearch.action.search.PutSearchPipelineTransportAction;
import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchScrollAction;
import org.opensearch.action.search.StreamSearchAction;
import org.opensearch.action.search.StreamTransportSearchAction;
import org.opensearch.action.search.TransportClearScrollAction;
import org.opensearch.action.search.TransportCreatePitAction;
import org.opensearch.action.search.TransportDeletePitAction;
Expand Down Expand Up @@ -734,6 +736,9 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(MultiGetAction.INSTANCE, TransportMultiGetAction.class, TransportShardMultiGetAction.class);
actions.register(BulkAction.INSTANCE, TransportBulkAction.class, TransportShardBulkAction.class);
actions.register(SearchAction.INSTANCE, TransportSearchAction.class);
if (FeatureFlags.isEnabled(FeatureFlags.STREAM_TRANSPORT)) {
actions.register(StreamSearchAction.INSTANCE, StreamTransportSearchAction.class);
}
actions.register(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class);
actions.register(MultiSearchAction.INSTANCE, TransportMultiSearchAction.class);
actions.register(ExplainAction.INSTANCE, TransportExplainAction.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ public SearchRequestBuilder(OpenSearchClient client, SearchAction action) {
super(client, action, new SearchRequest());
}

public SearchRequestBuilder(OpenSearchClient client, StreamSearchAction action) {
super(client, action, new SearchRequest());
}

/**
* Sets the indices the search will be executed on.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public class SearchTransportService {
public static final String UPDATE_READER_CONTEXT_ACTION_NAME = "indices:data/read/search[update_context]";

private final TransportService transportService;
private final BiFunction<Transport.Connection, SearchActionListener, ActionListener> responseWrapper;
protected final BiFunction<Transport.Connection, SearchActionListener, ActionListener> responseWrapper;
private final Map<String, Long> clientConnections = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();

public SearchTransportService(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.ActionType;

/**
* Transport action for executing a search
*
* @opensearch.internal
*/
public class StreamSearchAction extends ActionType<SearchResponse> {

public static final StreamSearchAction INSTANCE = new StreamSearchAction();
public static final String NAME = "indices:data/read/search/stream";

private StreamSearchAction() {
super(NAME, SearchResponse::new);
}

}
Loading
Loading