Skip to content

Commit 8d40807

Browse files
API changes for stream transport and stream based search transport action (#18722)
Signed-off-by: Rishabh Maurya <[email protected]>
1 parent 173149a commit 8d40807

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+1709
-34
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4545
- Added approximation support for range queries with now in date field ([#18511](https://github.com/opensearch-project/OpenSearch/pull/18511))
4646
- Upgrade to protobufs 0.6.0 and clean up deprecated TermQueryProtoUtils code ([#18880](https://github.com/opensearch-project/OpenSearch/pull/18880))
4747
- Prevent shard initialization failure due to streaming consumer errors ([#18877](https://github.com/opensearch-project/OpenSearch/pull/18877))
48+
- APIs for stream transport and new stream-based search api action ([#18722](https://github.com/opensearch-project/OpenSearch/pull/18722))
4849

4950
### Changed
5051
- Update Subject interface to use CheckedRunnable ([#18570](https://github.com/opensearch-project/OpenSearch/issues/18570))

server/src/main/java/org/opensearch/OpenSearchServerException.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.opensearch.Version.V_2_6_0;
2424
import static org.opensearch.Version.V_2_7_0;
2525
import static org.opensearch.Version.V_3_0_0;
26+
import static org.opensearch.Version.V_3_2_0;
2627

2728
/**
2829
* Utility class to register server exceptions
@@ -1232,5 +1233,13 @@ public static void registerExceptions() {
12321233
V_3_0_0
12331234
)
12341235
);
1236+
registerExceptionHandle(
1237+
new OpenSearchExceptionHandle(
1238+
org.opensearch.transport.stream.StreamException.class,
1239+
org.opensearch.transport.stream.StreamException::new,
1240+
177,
1241+
V_3_2_0
1242+
)
1243+
);
12351244
}
12361245
}

server/src/main/java/org/opensearch/action/ActionModule.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,8 @@
286286
import org.opensearch.action.search.PutSearchPipelineTransportAction;
287287
import org.opensearch.action.search.SearchAction;
288288
import org.opensearch.action.search.SearchScrollAction;
289+
import org.opensearch.action.search.StreamSearchAction;
290+
import org.opensearch.action.search.StreamTransportSearchAction;
289291
import org.opensearch.action.search.TransportClearScrollAction;
290292
import org.opensearch.action.search.TransportCreatePitAction;
291293
import org.opensearch.action.search.TransportDeletePitAction;
@@ -734,6 +736,9 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
734736
actions.register(MultiGetAction.INSTANCE, TransportMultiGetAction.class, TransportShardMultiGetAction.class);
735737
actions.register(BulkAction.INSTANCE, TransportBulkAction.class, TransportShardBulkAction.class);
736738
actions.register(SearchAction.INSTANCE, TransportSearchAction.class);
739+
if (FeatureFlags.isEnabled(FeatureFlags.STREAM_TRANSPORT)) {
740+
actions.register(StreamSearchAction.INSTANCE, StreamTransportSearchAction.class);
741+
}
737742
actions.register(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class);
738743
actions.register(MultiSearchAction.INSTANCE, TransportMultiSearchAction.class);
739744
actions.register(ExplainAction.INSTANCE, TransportExplainAction.class);

server/src/main/java/org/opensearch/action/search/SearchRequestBuilder.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ public SearchRequestBuilder(OpenSearchClient client, SearchAction action) {
6868
super(client, action, new SearchRequest());
6969
}
7070

71+
public SearchRequestBuilder(OpenSearchClient client, StreamSearchAction action) {
72+
super(client, action, new SearchRequest());
73+
}
74+
7175
/**
7276
* Sets the indices the search will be executed on.
7377
*/

server/src/main/java/org/opensearch/action/search/SearchTransportService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public class SearchTransportService {
102102
public static final String UPDATE_READER_CONTEXT_ACTION_NAME = "indices:data/read/search[update_context]";
103103

104104
private final TransportService transportService;
105-
private final BiFunction<Transport.Connection, SearchActionListener, ActionListener> responseWrapper;
105+
protected final BiFunction<Transport.Connection, SearchActionListener, ActionListener> responseWrapper;
106106
private final Map<String, Long> clientConnections = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
107107

108108
public SearchTransportService(
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.search;
10+
11+
import org.opensearch.action.ActionType;
12+
13+
/**
14+
* Transport action for executing a search
15+
*
16+
* @opensearch.internal
17+
*/
18+
public class StreamSearchAction extends ActionType<SearchResponse> {
19+
20+
public static final StreamSearchAction INSTANCE = new StreamSearchAction();
21+
public static final String NAME = "indices:data/read/search/stream";
22+
23+
private StreamSearchAction() {
24+
super(NAME, SearchResponse::new);
25+
}
26+
27+
}

0 commit comments

Comments
 (0)