Skip to content

Commit 684d627

Browse files
committed
API changes for stream transport
* extensibility for transport classes * StreamTransport and StreamTransportService implementation * streaming based search action Signed-off-by: Rishabh Maurya <[email protected]>
1 parent 57d1d17 commit 684d627

File tree

59 files changed

+1471
-72
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+1471
-72
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2828
- Pass index settings to system ingest processor factories. ([#18708](https://github.com/opensearch-project/OpenSearch/pull/18708))
2929
- Include named queries from rescore contexts in matched_queries array ([#18697](https://github.com/opensearch-project/OpenSearch/pull/18697))
3030
- Add the configurable limit on rule cardinality ([#18663](https://github.com/opensearch-project/OpenSearch/pull/18663))
31+
- APIs for stream transport and new stream-based search api action ([#18722](https://github.com/opensearch-project/OpenSearch/pull/18722))
3132

3233
### Changed
3334
- Update Subject interface to use CheckedRunnable ([#18570](https://github.com/opensearch-project/OpenSearch/issues/18570))

libs/common/src/main/java/org/opensearch/common/recycler/Recycler.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
package org.opensearch.common.recycler;
3434

35+
import org.opensearch.common.annotation.ExperimentalApi;
3536
import org.opensearch.common.lease.Releasable;
3637

3738
/**
@@ -40,6 +41,7 @@
4041
*
4142
* @opensearch.internal
4243
*/
44+
@ExperimentalApi
4345
public interface Recycler<T> {
4446

4547
/**
@@ -73,6 +75,7 @@ interface C<T> {
7375
*
7476
* @opensearch.internal
7577
*/
78+
@ExperimentalApi
7679
interface V<T> extends Releasable {
7780

7881
/** Reference to the value. */

libs/core/src/main/java/org/opensearch/core/common/io/stream/StreamInput.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -563,11 +563,11 @@ public SecureString readSecureString() throws IOException {
563563
}
564564
}
565565

566-
public final float readFloat() throws IOException {
566+
public float readFloat() throws IOException {
567567
return Float.intBitsToFloat(readInt());
568568
}
569569

570-
public final double readDouble() throws IOException {
570+
public double readDouble() throws IOException {
571571
return Double.longBitsToDouble(readLong());
572572
}
573573

@@ -582,7 +582,7 @@ public final Double readOptionalDouble() throws IOException {
582582
/**
583583
* Reads a boolean.
584584
*/
585-
public final boolean readBoolean() throws IOException {
585+
public boolean readBoolean() throws IOException {
586586
return readBoolean(readByte());
587587
}
588588

server/src/main/java/org/opensearch/action/ActionModule.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,8 @@
286286
import org.opensearch.action.search.PutSearchPipelineTransportAction;
287287
import org.opensearch.action.search.SearchAction;
288288
import org.opensearch.action.search.SearchScrollAction;
289+
import org.opensearch.action.search.StreamSearchAction;
290+
import org.opensearch.action.search.StreamTransportSearchAction;
289291
import org.opensearch.action.search.TransportClearScrollAction;
290292
import org.opensearch.action.search.TransportCreatePitAction;
291293
import org.opensearch.action.search.TransportDeletePitAction;
@@ -734,6 +736,9 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
734736
actions.register(MultiGetAction.INSTANCE, TransportMultiGetAction.class, TransportShardMultiGetAction.class);
735737
actions.register(BulkAction.INSTANCE, TransportBulkAction.class, TransportShardBulkAction.class);
736738
actions.register(SearchAction.INSTANCE, TransportSearchAction.class);
739+
if (FeatureFlags.isEnabled(FeatureFlags.STREAM_TRANSPORT)) {
740+
actions.register(StreamSearchAction.INSTANCE, StreamTransportSearchAction.class);
741+
}
737742
actions.register(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class);
738743
actions.register(MultiSearchAction.INSTANCE, TransportMultiSearchAction.class);
739744
actions.register(ExplainAction.INSTANCE, TransportExplainAction.class);

server/src/main/java/org/opensearch/action/search/SearchRequestBuilder.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ public SearchRequestBuilder(OpenSearchClient client, SearchAction action) {
6868
super(client, action, new SearchRequest());
6969
}
7070

71+
public SearchRequestBuilder(OpenSearchClient client, StreamSearchAction action) {
72+
super(client, action, new SearchRequest());
73+
}
74+
7175
/**
7276
* Sets the indices the search will be executed on.
7377
*/

server/src/main/java/org/opensearch/action/search/SearchTransportService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public class SearchTransportService {
102102
public static final String UPDATE_READER_CONTEXT_ACTION_NAME = "indices:data/read/search[update_context]";
103103

104104
private final TransportService transportService;
105-
private final BiFunction<Transport.Connection, SearchActionListener, ActionListener> responseWrapper;
105+
protected final BiFunction<Transport.Connection, SearchActionListener, ActionListener> responseWrapper;
106106
private final Map<String, Long> clientConnections = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
107107

108108
public SearchTransportService(
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.search;
10+
11+
import org.opensearch.action.ActionType;
12+
13+
/**
14+
* Transport action for executing a search
15+
*
16+
* @opensearch.internal
17+
*/
18+
public class StreamSearchAction extends ActionType<SearchResponse> {
19+
20+
public static final StreamSearchAction INSTANCE = new StreamSearchAction();
21+
public static final String NAME = "indices:data/read/search/stream";
22+
23+
private StreamSearchAction() {
24+
super(NAME, SearchResponse::new);
25+
}
26+
27+
}
Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.search;
10+
11+
import org.opensearch.action.support.StreamChannelActionListener;
12+
import org.opensearch.core.action.ActionListener;
13+
import org.opensearch.core.common.io.stream.StreamInput;
14+
import org.opensearch.core.common.io.stream.Writeable;
15+
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
16+
import org.opensearch.search.SearchPhaseResult;
17+
import org.opensearch.search.SearchService;
18+
import org.opensearch.search.fetch.FetchSearchResult;
19+
import org.opensearch.search.fetch.QueryFetchSearchResult;
20+
import org.opensearch.search.fetch.ShardFetchSearchRequest;
21+
import org.opensearch.search.internal.ShardSearchRequest;
22+
import org.opensearch.search.query.QuerySearchResult;
23+
import org.opensearch.threadpool.ThreadPool;
24+
import org.opensearch.transport.StreamTransportResponseHandler;
25+
import org.opensearch.transport.StreamTransportService;
26+
import org.opensearch.transport.Transport;
27+
import org.opensearch.transport.TransportException;
28+
import org.opensearch.transport.TransportRequestOptions;
29+
import org.opensearch.transport.stream.StreamTransportResponse;
30+
31+
import java.io.IOException;
32+
import java.util.function.BiFunction;
33+
34+
/**
35+
* Search transport service for streaming search
36+
*/
37+
public class StreamSearchTransportService extends SearchTransportService {
38+
private final StreamTransportService transportService;
39+
40+
public StreamSearchTransportService(
41+
StreamTransportService transportService,
42+
BiFunction<Transport.Connection, SearchActionListener, ActionListener> responseWrapper
43+
) {
44+
super(transportService, responseWrapper);
45+
this.transportService = transportService;
46+
}
47+
48+
public static void registerStreamRequestHandler(StreamTransportService transportService, SearchService searchService) {
49+
transportService.registerRequestHandler(
50+
QUERY_ACTION_NAME,
51+
ThreadPool.Names.SAME,
52+
false,
53+
true,
54+
AdmissionControlActionType.SEARCH,
55+
ShardSearchRequest::new,
56+
(request, channel, task) -> {
57+
searchService.executeQueryPhase(
58+
request,
59+
false,
60+
(SearchShardTask) task,
61+
new StreamChannelActionListener<>(channel, QUERY_ACTION_NAME, request)
62+
);
63+
}
64+
);
65+
transportService.registerRequestHandler(
66+
FETCH_ID_ACTION_NAME,
67+
ThreadPool.Names.SAME,
68+
true,
69+
true,
70+
AdmissionControlActionType.SEARCH,
71+
ShardFetchSearchRequest::new,
72+
(request, channel, task) -> {
73+
searchService.executeFetchPhase(
74+
request,
75+
(SearchShardTask) task,
76+
new StreamChannelActionListener<>(channel, FETCH_ID_ACTION_NAME, request)
77+
);
78+
}
79+
);
80+
transportService.registerRequestHandler(
81+
QUERY_CAN_MATCH_NAME,
82+
ThreadPool.Names.SAME,
83+
ShardSearchRequest::new,
84+
(request, channel, task) -> {
85+
searchService.canMatch(request, new StreamChannelActionListener<>(channel, QUERY_CAN_MATCH_NAME, request));
86+
}
87+
);
88+
}
89+
90+
@Override
91+
public void sendExecuteQuery(
92+
Transport.Connection connection,
93+
final ShardSearchRequest request,
94+
SearchTask task,
95+
final SearchActionListener<SearchPhaseResult> listener
96+
) {
97+
final boolean fetchDocuments = request.numberOfShards() == 1;
98+
Writeable.Reader<SearchPhaseResult> reader = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new;
99+
100+
StreamTransportResponseHandler<SearchPhaseResult> transportHandler = new StreamTransportResponseHandler<SearchPhaseResult>() {
101+
@Override
102+
public void handleStreamResponse(StreamTransportResponse<SearchPhaseResult> response) {
103+
try {
104+
SearchPhaseResult result = response.nextResponse();
105+
listener.onResponse(result);
106+
} catch (Exception e) {
107+
response.cancel("Client error during search phase", e);
108+
listener.onFailure(e);
109+
}
110+
}
111+
112+
@Override
113+
public void handleException(TransportException e) {
114+
listener.onFailure(e);
115+
}
116+
117+
@Override
118+
public String executor() {
119+
return ThreadPool.Names.STREAM_SEARCH;
120+
}
121+
122+
@Override
123+
public SearchPhaseResult read(StreamInput in) throws IOException {
124+
return reader.read(in);
125+
}
126+
};
127+
128+
transportService.sendChildRequest(
129+
connection,
130+
QUERY_ACTION_NAME,
131+
request,
132+
task,
133+
transportHandler // TODO: wrap with ConnectionCountingHandler
134+
);
135+
}
136+
137+
@Override
138+
public void sendExecuteFetch(
139+
Transport.Connection connection,
140+
final ShardFetchSearchRequest request,
141+
SearchTask task,
142+
final SearchActionListener<FetchSearchResult> listener
143+
) {
144+
StreamTransportResponseHandler<FetchSearchResult> transportHandler = new StreamTransportResponseHandler<FetchSearchResult>() {
145+
@Override
146+
public void handleStreamResponse(StreamTransportResponse<FetchSearchResult> response) {
147+
try {
148+
FetchSearchResult result = response.nextResponse();
149+
listener.onResponse(result);
150+
} catch (Exception e) {
151+
response.cancel("Client error during fetch phase", e);
152+
listener.onFailure(e);
153+
}
154+
}
155+
156+
@Override
157+
public void handleException(TransportException exp) {
158+
listener.onFailure(exp);
159+
}
160+
161+
@Override
162+
public String executor() {
163+
return ThreadPool.Names.STREAM_SEARCH;
164+
}
165+
166+
@Override
167+
public FetchSearchResult read(StreamInput in) throws IOException {
168+
return new FetchSearchResult(in);
169+
}
170+
};
171+
transportService.sendChildRequest(connection, FETCH_ID_ACTION_NAME, request, task, transportHandler);
172+
}
173+
174+
@Override
175+
public void sendCanMatch(
176+
Transport.Connection connection,
177+
final ShardSearchRequest request,
178+
SearchTask task,
179+
final ActionListener<SearchService.CanMatchResponse> listener
180+
) {
181+
StreamTransportResponseHandler<SearchService.CanMatchResponse> transportHandler = new StreamTransportResponseHandler<
182+
SearchService.CanMatchResponse>() {
183+
@Override
184+
public void handleStreamResponse(StreamTransportResponse<SearchService.CanMatchResponse> response) {
185+
try {
186+
SearchService.CanMatchResponse result = response.nextResponse();
187+
if (response.nextResponse() != null) {
188+
throw new IllegalStateException("Only one response expected from SearchService.CanMatchResponse");
189+
}
190+
listener.onResponse(result);
191+
} catch (Exception e) {
192+
response.cancel("Client error during can match", e);
193+
listener.onFailure(e);
194+
}
195+
}
196+
197+
@Override
198+
public void handleException(TransportException exp) {
199+
listener.onFailure(exp);
200+
}
201+
202+
@Override
203+
public String executor() {
204+
return ThreadPool.Names.SAME;
205+
}
206+
207+
@Override
208+
public SearchService.CanMatchResponse read(StreamInput in) throws IOException {
209+
return new SearchService.CanMatchResponse(in);
210+
}
211+
};
212+
213+
transportService.sendChildRequest(
214+
connection,
215+
QUERY_CAN_MATCH_NAME,
216+
request,
217+
task,
218+
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STREAM).build(),
219+
transportHandler
220+
);
221+
}
222+
}

0 commit comments

Comments
 (0)