diff --git a/CHANGELOG.md b/CHANGELOG.md index 26ba59a70c565..dd41466fb0a02 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Added approximation support for range queries with now in date field ([#18511](https://github.com/opensearch-project/OpenSearch/pull/18511)) - Upgrade to protobufs 0.6.0 and clean up deprecated TermQueryProtoUtils code ([#18880](https://github.com/opensearch-project/OpenSearch/pull/18880)) - Prevent shard initialization failure due to streaming consumer errors ([#18877](https://github.com/opensearch-project/OpenSearch/pull/18877)) +- APIs for stream transport and new stream-based search api action ([#18722](https://github.com/opensearch-project/OpenSearch/pull/18722)) ### Changed - Update Subject interface to use CheckedRunnable ([#18570](https://github.com/opensearch-project/OpenSearch/issues/18570)) diff --git a/server/src/main/java/org/opensearch/OpenSearchServerException.java b/server/src/main/java/org/opensearch/OpenSearchServerException.java index 247a23dc4bd57..7e299abd8d943 100644 --- a/server/src/main/java/org/opensearch/OpenSearchServerException.java +++ b/server/src/main/java/org/opensearch/OpenSearchServerException.java @@ -23,6 +23,7 @@ import static org.opensearch.Version.V_2_6_0; import static org.opensearch.Version.V_2_7_0; import static org.opensearch.Version.V_3_0_0; +import static org.opensearch.Version.V_3_2_0; /** * Utility class to register server exceptions @@ -1232,5 +1233,13 @@ public static void registerExceptions() { V_3_0_0 ) ); + registerExceptionHandle( + new OpenSearchExceptionHandle( + org.opensearch.transport.stream.StreamException.class, + org.opensearch.transport.stream.StreamException::new, + 177, + V_3_2_0 + ) + ); } } diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 67a86db37e790..4755eb8d21999 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -286,6 +286,8 @@ import org.opensearch.action.search.PutSearchPipelineTransportAction; import org.opensearch.action.search.SearchAction; import org.opensearch.action.search.SearchScrollAction; +import org.opensearch.action.search.StreamSearchAction; +import org.opensearch.action.search.StreamTransportSearchAction; import org.opensearch.action.search.TransportClearScrollAction; import org.opensearch.action.search.TransportCreatePitAction; import org.opensearch.action.search.TransportDeletePitAction; @@ -734,6 +736,9 @@ public void reg actions.register(MultiGetAction.INSTANCE, TransportMultiGetAction.class, TransportShardMultiGetAction.class); actions.register(BulkAction.INSTANCE, TransportBulkAction.class, TransportShardBulkAction.class); actions.register(SearchAction.INSTANCE, TransportSearchAction.class); + if (FeatureFlags.isEnabled(FeatureFlags.STREAM_TRANSPORT)) { + actions.register(StreamSearchAction.INSTANCE, StreamTransportSearchAction.class); + } actions.register(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class); actions.register(MultiSearchAction.INSTANCE, TransportMultiSearchAction.class); actions.register(ExplainAction.INSTANCE, TransportExplainAction.class); diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestBuilder.java b/server/src/main/java/org/opensearch/action/search/SearchRequestBuilder.java index 0245857fa77ec..db9e4eb628232 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestBuilder.java @@ -68,6 +68,10 @@ public SearchRequestBuilder(OpenSearchClient client, SearchAction action) { super(client, action, new SearchRequest()); } + public SearchRequestBuilder(OpenSearchClient client, StreamSearchAction action) { + super(client, action, new SearchRequest()); + } + /** * Sets the indices the search will be executed on. */ diff --git a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java index 64c738f633f2e..fec8c4e790e7a 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java @@ -102,7 +102,7 @@ public class SearchTransportService { public static final String UPDATE_READER_CONTEXT_ACTION_NAME = "indices:data/read/search[update_context]"; private final TransportService transportService; - private final BiFunction responseWrapper; + protected final BiFunction responseWrapper; private final Map clientConnections = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); public SearchTransportService( diff --git a/server/src/main/java/org/opensearch/action/search/StreamSearchAction.java b/server/src/main/java/org/opensearch/action/search/StreamSearchAction.java new file mode 100644 index 0000000000000..20e2797f87318 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/StreamSearchAction.java @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.ActionType; + +/** + * Transport action for executing a search + * + * @opensearch.internal + */ +public class StreamSearchAction extends ActionType { + + public static final StreamSearchAction INSTANCE = new StreamSearchAction(); + public static final String NAME = "indices:data/read/search/stream"; + + private StreamSearchAction() { + super(NAME, SearchResponse::new); + } + +} diff --git a/server/src/main/java/org/opensearch/action/search/StreamSearchTransportService.java b/server/src/main/java/org/opensearch/action/search/StreamSearchTransportService.java new file mode 100644 index 0000000000000..4dff5d91cb59a --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/StreamSearchTransportService.java @@ -0,0 +1,341 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.OriginalIndices; +import org.opensearch.action.support.StreamChannelActionListener; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; +import org.opensearch.search.SearchPhaseResult; +import org.opensearch.search.SearchService; +import org.opensearch.search.dfs.DfsSearchResult; +import org.opensearch.search.fetch.FetchSearchResult; +import org.opensearch.search.fetch.QueryFetchSearchResult; +import org.opensearch.search.fetch.ShardFetchSearchRequest; +import org.opensearch.search.internal.ShardSearchContextId; +import org.opensearch.search.internal.ShardSearchRequest; +import org.opensearch.search.query.QuerySearchResult; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.StreamTransportResponseHandler; +import org.opensearch.transport.StreamTransportService; +import org.opensearch.transport.Transport; +import org.opensearch.transport.TransportException; +import org.opensearch.transport.TransportRequestOptions; +import org.opensearch.transport.stream.StreamTransportResponse; + +import java.io.IOException; +import java.util.function.BiFunction; + +/** + * Search transport service for streaming search + * + * @opensearch.internal + */ +public class StreamSearchTransportService extends SearchTransportService { + private final StreamTransportService transportService; + + public StreamSearchTransportService( + StreamTransportService transportService, + BiFunction responseWrapper + ) { + super(transportService, responseWrapper); + this.transportService = transportService; + } + + public static void registerStreamRequestHandler(StreamTransportService transportService, SearchService searchService) { + transportService.registerRequestHandler( + QUERY_ACTION_NAME, + ThreadPool.Names.SAME, + false, + true, + AdmissionControlActionType.SEARCH, + ShardSearchRequest::new, + (request, channel, task) -> { + searchService.executeQueryPhase( + request, + false, + (SearchShardTask) task, + new StreamChannelActionListener<>(channel, QUERY_ACTION_NAME, request), + ThreadPool.Names.STREAM_SEARCH + ); + } + ); + transportService.registerRequestHandler( + FETCH_ID_ACTION_NAME, + ThreadPool.Names.SAME, + true, + true, + AdmissionControlActionType.SEARCH, + ShardFetchSearchRequest::new, + (request, channel, task) -> { + searchService.executeFetchPhase( + request, + (SearchShardTask) task, + new StreamChannelActionListener<>(channel, FETCH_ID_ACTION_NAME, request), + ThreadPool.Names.STREAM_SEARCH + ); + } + ); + transportService.registerRequestHandler( + QUERY_CAN_MATCH_NAME, + ThreadPool.Names.SAME, + ShardSearchRequest::new, + (request, channel, task) -> { + searchService.canMatch(request, new StreamChannelActionListener<>(channel, QUERY_CAN_MATCH_NAME, request)); + } + ); + transportService.registerRequestHandler( + FREE_CONTEXT_ACTION_NAME, + ThreadPool.Names.SAME, + SearchFreeContextRequest::new, + (request, channel, task) -> { + boolean freed = searchService.freeReaderContext(request.id()); + channel.sendResponseBatch(new SearchFreeContextResponse(freed)); + channel.completeStream(); + } + ); + + transportService.registerRequestHandler( + DFS_ACTION_NAME, + ThreadPool.Names.SAME, + false, + true, + AdmissionControlActionType.SEARCH, + ShardSearchRequest::new, + (request, channel, task) -> searchService.executeDfsPhase( + request, + false, + (SearchShardTask) task, + new StreamChannelActionListener<>(channel, DFS_ACTION_NAME, request), + ThreadPool.Names.STREAM_SEARCH + ) + ); + } + + @Override + public void sendExecuteQuery( + Transport.Connection connection, + final ShardSearchRequest request, + SearchTask task, + final SearchActionListener listener + ) { + final boolean fetchDocuments = request.numberOfShards() == 1; + Writeable.Reader reader = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new; + + StreamTransportResponseHandler transportHandler = new StreamTransportResponseHandler() { + @Override + public void handleStreamResponse(StreamTransportResponse response) { + try { + SearchPhaseResult result = response.nextResponse(); + listener.onResponse(result); + response.close(); + } catch (Exception e) { + response.cancel("Client error during search phase", e); + listener.onFailure(e); + } + } + + @Override + public void handleException(TransportException e) { + listener.onFailure(e); + } + + @Override + public String executor() { + return ThreadPool.Names.STREAM_SEARCH; + } + + @Override + public SearchPhaseResult read(StreamInput in) throws IOException { + return reader.read(in); + } + }; + + transportService.sendChildRequest( + connection, + QUERY_ACTION_NAME, + request, + task, + transportHandler // TODO: wrap with ConnectionCountingHandler + ); + } + + @Override + public void sendExecuteFetch( + Transport.Connection connection, + final ShardFetchSearchRequest request, + SearchTask task, + final SearchActionListener listener + ) { + StreamTransportResponseHandler transportHandler = new StreamTransportResponseHandler() { + @Override + public void handleStreamResponse(StreamTransportResponse response) { + try { + FetchSearchResult result = response.nextResponse(); + listener.onResponse(result); + response.close(); + } catch (Exception e) { + response.cancel("Client error during fetch phase", e); + listener.onFailure(e); + } + } + + @Override + public void handleException(TransportException exp) { + listener.onFailure(exp); + } + + @Override + public String executor() { + return ThreadPool.Names.STREAM_SEARCH; + } + + @Override + public FetchSearchResult read(StreamInput in) throws IOException { + return new FetchSearchResult(in); + } + }; + transportService.sendChildRequest(connection, FETCH_ID_ACTION_NAME, request, task, transportHandler); + } + + @Override + public void sendCanMatch( + Transport.Connection connection, + final ShardSearchRequest request, + SearchTask task, + final ActionListener listener + ) { + StreamTransportResponseHandler transportHandler = new StreamTransportResponseHandler< + SearchService.CanMatchResponse>() { + @Override + public void handleStreamResponse(StreamTransportResponse response) { + try { + SearchService.CanMatchResponse result = response.nextResponse(); + if (response.nextResponse() != null) { + throw new IllegalStateException("Only one response expected from SearchService.CanMatchResponse"); + } + listener.onResponse(result); + response.close(); + } catch (Exception e) { + response.cancel("Client error during can match", e); + listener.onFailure(e); + } + } + + @Override + public void handleException(TransportException exp) { + listener.onFailure(exp); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public SearchService.CanMatchResponse read(StreamInput in) throws IOException { + return new SearchService.CanMatchResponse(in); + } + }; + + transportService.sendChildRequest( + connection, + QUERY_CAN_MATCH_NAME, + request, + task, + TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STREAM).build(), + transportHandler + ); + } + + @Override + public void sendFreeContext(Transport.Connection connection, final ShardSearchContextId contextId, OriginalIndices originalIndices) { + StreamTransportResponseHandler transportHandler = new StreamTransportResponseHandler<>() { + @Override + public void handleStreamResponse(StreamTransportResponse response) { + try { + response.nextResponse(); + response.close(); + } catch (Exception ignore) { + + } + } + + @Override + public void handleException(TransportException exp) { + + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public SearchFreeContextResponse read(StreamInput in) throws IOException { + return new SearchFreeContextResponse(in); + } + }; + transportService.sendRequest( + connection, + FREE_CONTEXT_ACTION_NAME, + new SearchFreeContextRequest(originalIndices, contextId), + TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STREAM).build(), + transportHandler + ); + } + + @Override + public void sendExecuteDfs( + Transport.Connection connection, + final ShardSearchRequest request, + SearchTask task, + final SearchActionListener listener + ) { + StreamTransportResponseHandler transportHandler = new StreamTransportResponseHandler<>() { + @Override + public void handleStreamResponse(StreamTransportResponse response) { + try { + DfsSearchResult result = response.nextResponse(); + listener.onResponse(result); + response.close(); + } catch (Exception e) { + response.cancel("Client error during search phase", e); + listener.onFailure(e); + } + } + + @Override + public void handleException(TransportException e) { + listener.onFailure(e); + } + + @Override + public String executor() { + return ThreadPool.Names.STREAM_SEARCH; + } + + @Override + public DfsSearchResult read(StreamInput in) throws IOException { + return new DfsSearchResult(in); + } + }; + + transportService.sendChildRequest( + connection, + DFS_ACTION_NAME, + request, + task, + TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STREAM).build(), + transportHandler + ); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/StreamTransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/StreamTransportSearchAction.java new file mode 100644 index 0000000000000..ce258ac714536 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/StreamTransportSearchAction.java @@ -0,0 +1,70 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.support.ActionFilters; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Nullable; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.indices.breaker.CircuitBreakerService; +import org.opensearch.search.SearchService; +import org.opensearch.search.pipeline.SearchPipelineService; +import org.opensearch.tasks.TaskResourceTrackingService; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.StreamTransportService; +import org.opensearch.transport.client.node.NodeClient; + +/** + * Transport search action for streaming search + * @opensearch.internal + */ +public class StreamTransportSearchAction extends TransportSearchAction { + @Inject + public StreamTransportSearchAction( + NodeClient client, + ThreadPool threadPool, + CircuitBreakerService circuitBreakerService, + @Nullable StreamTransportService transportService, + SearchService searchService, + @Nullable StreamSearchTransportService searchTransportService, + SearchPhaseController searchPhaseController, + ClusterService clusterService, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + NamedWriteableRegistry namedWriteableRegistry, + SearchPipelineService searchPipelineService, + MetricsRegistry metricsRegistry, + SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory, + Tracer tracer, + TaskResourceTrackingService taskResourceTrackingService + ) { + super( + client, + threadPool, + circuitBreakerService, + transportService, + searchService, + searchTransportService, + searchPhaseController, + clusterService, + actionFilters, + indexNameExpressionResolver, + namedWriteableRegistry, + searchPipelineService, + metricsRegistry, + searchRequestOperationsCompositeListenerFactory, + tracer, + taskResourceTrackingService + ); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 1da080e5bd302..7f40bd4ec1274 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -97,6 +97,7 @@ import org.opensearch.transport.RemoteClusterAware; import org.opensearch.transport.RemoteClusterService; import org.opensearch.transport.RemoteTransportException; +import org.opensearch.transport.StreamTransportService; import org.opensearch.transport.Transport; import org.opensearch.transport.TransportService; import org.opensearch.transport.client.Client; @@ -207,7 +208,11 @@ public TransportSearchAction( this.searchPhaseController = searchPhaseController; this.searchTransportService = searchTransportService; this.remoteClusterService = searchTransportService.getRemoteClusterService(); - SearchTransportService.registerRequestHandler(transportService, searchService); + if (transportService instanceof StreamTransportService) { + StreamSearchTransportService.registerStreamRequestHandler((StreamTransportService) transportService, searchService); + } else { + SearchTransportService.registerRequestHandler(transportService, searchService); + } this.clusterService = clusterService; this.searchService = searchService; this.indexNameExpressionResolver = indexNameExpressionResolver; diff --git a/server/src/main/java/org/opensearch/action/support/StreamChannelActionListener.java b/server/src/main/java/org/opensearch/action/support/StreamChannelActionListener.java new file mode 100644 index 0000000000000..5b337fd2cef4a --- /dev/null +++ b/server/src/main/java/org/opensearch/action/support/StreamChannelActionListener.java @@ -0,0 +1,58 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.support; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.transport.TransportResponse; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportRequest; + +import java.io.IOException; + +/** + * A listener that sends the response back to the channel in streaming fashion + * + */ +@ExperimentalApi +public class StreamChannelActionListener + implements + ActionListener { + + private final TransportChannel channel; + private final Request request; + private final String actionName; + + public StreamChannelActionListener(TransportChannel channel, String actionName, Request request) { + this.channel = channel; + this.request = request; + this.actionName = actionName; + } + + @Override + public void onResponse(Response response) { + try { + // placeholder for batching + channel.sendResponseBatch(response); + } finally { + // this can be removed once batching is supported + channel.completeStream(); + } + } + + @Override + public void onFailure(Exception e) { + try { + channel.sendResponse(e); + } catch (IOException exc) { + channel.completeStream(); + throw new RuntimeException(exc); + } + } +} diff --git a/server/src/main/java/org/opensearch/cluster/StreamNodeConnectionsService.java b/server/src/main/java/org/opensearch/cluster/StreamNodeConnectionsService.java new file mode 100644 index 0000000000000..2cb0df9a07822 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/StreamNodeConnectionsService.java @@ -0,0 +1,26 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.settings.Settings; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.StreamTransportService; + +/** + * NodeConnectionsService for StreamTransportService + */ +@ExperimentalApi +public class StreamNodeConnectionsService extends NodeConnectionsService { + @Inject + public StreamNodeConnectionsService(Settings settings, ThreadPool threadPool, StreamTransportService streamTransportService) { + super(settings, threadPool, streamTransportService); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java index ffcb35799729c..1bb26478af08e 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java @@ -136,6 +136,7 @@ public static boolean isDedicatedWarmNode(Settings settings) { private final String hostName; private final String hostAddress; private final TransportAddress address; + private final TransportAddress streamAddress; private final Map attributes; private final Version version; private final SortedSet roles; @@ -219,6 +220,20 @@ public DiscoveryNode( ); } + public DiscoveryNode( + String nodeName, + String nodeId, + String ephemeralId, + String hostName, + String hostAddress, + TransportAddress address, + Map attributes, + Set roles, + Version version + ) { + this(nodeName, nodeId, ephemeralId, hostName, hostAddress, address, null, attributes, roles, version); + } + /** * Creates a new {@link DiscoveryNode}. *

@@ -244,6 +259,7 @@ public DiscoveryNode( String hostName, String hostAddress, TransportAddress address, + TransportAddress streamAddress, Map attributes, Set roles, Version version @@ -258,6 +274,7 @@ public DiscoveryNode( this.hostName = hostName.intern(); this.hostAddress = hostAddress.intern(); this.address = address; + this.streamAddress = streamAddress; if (version == null) { this.version = Version.CURRENT; } else { @@ -277,6 +294,21 @@ public DiscoveryNode( this.roles = Collections.unmodifiableSortedSet(new TreeSet<>(roles)); } + public DiscoveryNode(DiscoveryNode node, TransportAddress streamAddress) { + this( + node.getName(), + node.getId(), + node.getEphemeralId(), + node.getHostName(), + node.getHostAddress(), + node.getAddress(), + streamAddress, + node.getAttributes(), + node.getRoles(), + node.getVersion() + ); + } + /** Creates a DiscoveryNode representing the local node. */ public static DiscoveryNode createLocal(Settings settings, TransportAddress publishAddress, String nodeId) { Map attributes = Node.NODE_ATTRIBUTES.getAsMap(settings); @@ -320,6 +352,12 @@ public DiscoveryNode(StreamInput in) throws IOException { this.hostName = in.readString().intern(); this.hostAddress = in.readString().intern(); this.address = new TransportAddress(in); + if (in.getVersion().onOrAfter(Version.V_3_2_0)) { + this.streamAddress = in.readOptionalWriteable(TransportAddress::new); + } else { + streamAddress = null; + } + int size = in.readVInt(); this.attributes = new HashMap<>(size); for (int i = 0; i < size; i++) { @@ -397,6 +435,9 @@ private void writeNodeDetails(StreamOutput out) throws IOException { out.writeString(hostName); out.writeString(hostAddress); address.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_3_2_0)) { + out.writeOptionalWriteable(streamAddress); + } } private void writeRolesAndVersion(StreamOutput out) throws IOException { @@ -417,6 +458,10 @@ public TransportAddress getAddress() { return address; } + public TransportAddress getStreamAddress() { + return streamAddress; + } + /** * The unique id of the node. */ @@ -577,6 +622,9 @@ public String toString() { sb.append('{').append(ephemeralId).append('}'); sb.append('{').append(hostName).append('}'); sb.append('{').append(address).append('}'); + if (streamAddress != null) { + sb.append('{').append(streamAddress).append('}'); + } if (roles.isEmpty() == false) { sb.append('{'); roles.stream().map(DiscoveryNodeRole::roleNameAbbreviation).sorted().forEach(sb::append); @@ -603,6 +651,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("name", getName()); builder.field("ephemeral_id", getEphemeralId()); builder.field("transport_address", getAddress().toString()); + if (streamAddress != null) { + builder.field("stream_transport_address", getStreamAddress().toString()); + } builder.startObject("attributes"); for (Map.Entry entry : attributes.entrySet()) { diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java index fc2a121c90e54..e05dff5ea5977 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java @@ -44,6 +44,7 @@ import org.opensearch.cluster.ClusterStateTaskConfig; import org.opensearch.cluster.LocalNodeClusterManagerListener; import org.opensearch.cluster.NodeConnectionsService; +import org.opensearch.cluster.StreamNodeConnectionsService; import org.opensearch.cluster.TimeoutClusterStateListener; import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException; import org.opensearch.cluster.node.DiscoveryNodes; @@ -124,6 +125,8 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements private final String nodeName; private NodeConnectionsService nodeConnectionsService; + private NodeConnectionsService streamNodeConnectionsService; + private final ClusterManagerMetrics clusterManagerMetrics; public ClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { @@ -159,6 +162,11 @@ public synchronized void setNodeConnectionsService(NodeConnectionsService nodeCo this.nodeConnectionsService = nodeConnectionsService; } + public synchronized void setStreamNodeConnectionsService(StreamNodeConnectionsService streamNodeConnectionsService) { + assert this.streamNodeConnectionsService == null : "streamNodeConnectionsService is already set"; + this.streamNodeConnectionsService = streamNodeConnectionsService; + } + @Override public void setInitialState(ClusterState initialState) { if (lifecycle.started()) { @@ -588,6 +596,9 @@ private void applyChanges(UpdateTask task, ClusterState previousClusterState, Cl logger.debug("completed calling appliers of cluster state for version {}", newClusterState.version()); nodeConnectionsService.disconnectFromNodesExcept(newClusterState.nodes()); + if (streamNodeConnectionsService != null) { + streamNodeConnectionsService.disconnectFromNodesExcept(newClusterState.nodes()); + } assert newClusterState.coordinationMetadata() .getLastAcceptedConfiguration() @@ -615,6 +626,16 @@ protected void connectToNodesAndWait(ClusterState newClusterState) { logger.debug("interrupted while connecting to nodes, continuing", e); Thread.currentThread().interrupt(); } + final CountDownLatch streamNodeLatch = new CountDownLatch(1); + if (streamNodeConnectionsService != null) { + streamNodeConnectionsService.connectToNodes(newClusterState.nodes(), streamNodeLatch::countDown); + try { + streamNodeLatch.await(); + } catch (InterruptedException e) { + logger.debug("interrupted while connecting to nodes, continuing", e); + Thread.currentThread().interrupt(); + } + } } private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch) { diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java index 05d478bbb9df1..1173bd1f06af5 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java @@ -42,6 +42,7 @@ import org.opensearch.cluster.ClusterStateTaskListener; import org.opensearch.cluster.LocalNodeClusterManagerListener; import org.opensearch.cluster.NodeConnectionsService; +import org.opensearch.cluster.StreamNodeConnectionsService; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.OperationRouting; import org.opensearch.cluster.routing.RerouteService; @@ -131,6 +132,10 @@ public synchronized void setNodeConnectionsService(NodeConnectionsService nodeCo clusterApplierService.setNodeConnectionsService(nodeConnectionsService); } + public synchronized void setStreamNodeConnectionsService(StreamNodeConnectionsService streamNodeConnectionsService) { + clusterApplierService.setStreamNodeConnectionsService(streamNodeConnectionsService); + } + public void setRerouteService(RerouteService rerouteService) { assert this.rerouteService == null : "RerouteService is already set"; this.rerouteService = rerouteService; diff --git a/server/src/main/java/org/opensearch/common/network/NetworkModule.java b/server/src/main/java/org/opensearch/common/network/NetworkModule.java index 3d6c884069f71..1be92d9a1a751 100644 --- a/server/src/main/java/org/opensearch/common/network/NetworkModule.java +++ b/server/src/main/java/org/opensearch/common/network/NetworkModule.java @@ -93,9 +93,13 @@ public final class NetworkModule { public static final String TRANSPORT_TYPE_KEY = "transport.type"; + public static final String STREAM_TRANSPORT_TYPE_KEY = "transport.stream.type"; + public static final String HTTP_TYPE_KEY = "http.type"; public static final String HTTP_TYPE_DEFAULT_KEY = "http.type.default"; public static final String TRANSPORT_TYPE_DEFAULT_KEY = "transport.type.default"; + public static final String STREAM_TRANSPORT_TYPE_DEFAULT_KEY = "transport.stream.type.default"; + public static final String TRANSPORT_SSL_ENFORCE_HOSTNAME_VERIFICATION_KEY = "transport.ssl.enforce_hostname_verification"; public static final String TRANSPORT_SSL_ENFORCE_HOSTNAME_VERIFICATION_RESOLVE_HOST_NAME_KEY = "transport.ssl.resolve_hostname"; public static final String TRANSPORT_SSL_DUAL_MODE_ENABLED_KEY = "transport.ssl.dual_mode.enabled"; @@ -104,9 +108,17 @@ public final class NetworkModule { TRANSPORT_TYPE_DEFAULT_KEY, Property.NodeScope ); + + public static final Setting STREAM_TRANSPORT_DEFAULT_TYPE_SETTING = Setting.simpleString( + STREAM_TRANSPORT_TYPE_DEFAULT_KEY, + "FLIGHT", + Property.NodeScope + ); + public static final Setting HTTP_DEFAULT_TYPE_SETTING = Setting.simpleString(HTTP_TYPE_DEFAULT_KEY, Property.NodeScope); public static final Setting HTTP_TYPE_SETTING = Setting.simpleString(HTTP_TYPE_KEY, Property.NodeScope); public static final Setting TRANSPORT_TYPE_SETTING = Setting.simpleString(TRANSPORT_TYPE_KEY, Property.NodeScope); + public static final Setting STREAM_TRANSPORT_TYPE_SETTING = Setting.simpleString(STREAM_TRANSPORT_TYPE_KEY, Property.NodeScope); public static final Setting TRANSPORT_SSL_ENFORCE_HOSTNAME_VERIFICATION = Setting.boolSetting( TRANSPORT_SSL_ENFORCE_HOSTNAME_VERIFICATION_KEY, @@ -434,6 +446,16 @@ public Supplier getTransportSupplier() { return factory; } + public Supplier getStreamTransportSupplier() { + String name; + if (STREAM_TRANSPORT_TYPE_SETTING.exists(settings)) { + name = STREAM_TRANSPORT_TYPE_SETTING.get(settings); + } else { + name = STREAM_TRANSPORT_DEFAULT_TYPE_SETTING.get(settings); + } + return transportFactories.get(name); + } + /** * Registers a new {@link TransportInterceptor} */ diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 09c17d4c36029..792353908d2f2 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -176,6 +176,7 @@ import org.opensearch.transport.RemoteClusterService; import org.opensearch.transport.RemoteConnectionStrategy; import org.opensearch.transport.SniffConnectionStrategy; +import org.opensearch.transport.StreamTransportService; import org.opensearch.transport.TransportSettings; import org.opensearch.transport.client.Client; import org.opensearch.watcher.ResourceWatcherService; @@ -360,6 +361,7 @@ public void apply(Settings value, Settings current, Settings previous) { PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD, NetworkModule.HTTP_DEFAULT_TYPE_SETTING, NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING, + NetworkModule.STREAM_TRANSPORT_DEFAULT_TYPE_SETTING, NetworkModule.HTTP_TYPE_SETTING, NetworkModule.TRANSPORT_TYPE_SETTING, NetworkModule.TRANSPORT_SSL_DUAL_MODE_ENABLED, @@ -844,7 +846,8 @@ public void apply(Settings value, Settings current, Settings previous) { ForceMergeManagerSettings.CPU_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE, ForceMergeManagerSettings.DISK_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE, ForceMergeManagerSettings.JVM_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE, - ForceMergeManagerSettings.CONCURRENCY_MULTIPLIER + ForceMergeManagerSettings.CONCURRENCY_MULTIPLIER, + StreamTransportService.STREAM_TRANSPORT_REQ_TIMEOUT_SETTING ) ) ); diff --git a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java index a53debf564ce4..ba6ba1f88b58c 100644 --- a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java @@ -38,6 +38,7 @@ protected FeatureFlagSettings( FeatureFlags.APPLICATION_BASED_CONFIGURATION_TEMPLATES_SETTING, FeatureFlags.TERM_VERSION_PRECOMMIT_ENABLE_SETTING, FeatureFlags.ARROW_STREAMS_SETTING, + FeatureFlags.STREAM_TRANSPORT_SETTING, FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING ); } diff --git a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java index b63361ec78e98..c53922b0e5ceb 100644 --- a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java +++ b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java @@ -114,6 +114,9 @@ public class FeatureFlags { Property.NodeScope ); + public static final String STREAM_TRANSPORT = FEATURE_FLAG_PREFIX + "transport.stream.enabled"; + public static final Setting STREAM_TRANSPORT_SETTING = Setting.boolSetting(STREAM_TRANSPORT, false, Property.NodeScope); + public static final String ARROW_STREAMS = FEATURE_FLAG_PREFIX + "arrow.streams.enabled"; public static final Setting ARROW_STREAMS_SETTING = Setting.boolSetting(ARROW_STREAMS, false, Property.NodeScope); @@ -141,6 +144,7 @@ static class FeatureFlagsImpl { ); put(TERM_VERSION_PRECOMMIT_ENABLE_SETTING, TERM_VERSION_PRECOMMIT_ENABLE_SETTING.getDefault(Settings.EMPTY)); put(ARROW_STREAMS_SETTING, ARROW_STREAMS_SETTING.getDefault(Settings.EMPTY)); + put(STREAM_TRANSPORT_SETTING, STREAM_TRANSPORT_SETTING.getDefault(Settings.EMPTY)); put(MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING, MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING.getDefault(Settings.EMPTY)); } }; diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index b972457ee085a..fb4a5fc1b12d5 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -54,6 +54,7 @@ import org.opensearch.action.search.SearchRequestStats; import org.opensearch.action.search.SearchTaskRequestOperationsListener; import org.opensearch.action.search.SearchTransportService; +import org.opensearch.action.search.StreamSearchTransportService; import org.opensearch.action.support.TransportAction; import org.opensearch.action.update.UpdateHelper; import org.opensearch.arrow.spi.StreamManager; @@ -67,6 +68,7 @@ import org.opensearch.cluster.ClusterStateObserver; import org.opensearch.cluster.InternalClusterInfoService; import org.opensearch.cluster.NodeConnectionsService; +import org.opensearch.cluster.StreamNodeConnectionsService; import org.opensearch.cluster.action.index.MappingUpdatedAction; import org.opensearch.cluster.action.shard.LocalShardStateAction; import org.opensearch.cluster.action.shard.ShardStateAction; @@ -89,6 +91,7 @@ import org.opensearch.cluster.routing.allocation.DiskThresholdMonitor; import org.opensearch.cluster.service.ClusterService; import org.opensearch.cluster.service.LocalClusterService; +import org.opensearch.common.Nullable; import org.opensearch.common.SetOnce; import org.opensearch.common.StopWatch; import org.opensearch.common.cache.module.CacheModule; @@ -97,6 +100,7 @@ import org.opensearch.common.inject.Key; import org.opensearch.common.inject.Module; import org.opensearch.common.inject.ModulesBuilder; +import org.opensearch.common.inject.util.Providers; import org.opensearch.common.lease.Releasables; import org.opensearch.common.lifecycle.Lifecycle; import org.opensearch.common.lifecycle.LifecycleComponent; @@ -279,6 +283,7 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.AuxTransport; import org.opensearch.transport.RemoteClusterService; +import org.opensearch.transport.StreamTransportService; import org.opensearch.transport.Transport; import org.opensearch.transport.TransportInterceptor; import org.opensearch.transport.TransportService; @@ -327,6 +332,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.function.Supplier; import java.util.function.UnaryOperator; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -334,6 +340,7 @@ import static java.util.stream.Collectors.toList; import static org.opensearch.common.util.FeatureFlags.ARROW_STREAMS_SETTING; import static org.opensearch.common.util.FeatureFlags.BACKGROUND_TASK_EXECUTION_EXPERIMENTAL; +import static org.opensearch.common.util.FeatureFlags.STREAM_TRANSPORT; import static org.opensearch.common.util.FeatureFlags.TELEMETRY; import static org.opensearch.index.ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY; import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED; @@ -1238,13 +1245,21 @@ protected Node(final Environment initialEnvironment, Collection clas } new TemplateUpgradeService(client, clusterService, threadPool, indexTemplateMetadataUpgraders); final Transport transport = networkModule.getTransportSupplier().get(); + final Supplier streamTransportSupplier = networkModule.getStreamTransportSupplier(); + if (FeatureFlags.isEnabled(STREAM_TRANSPORT) && streamTransportSupplier == null) { + throw new IllegalStateException(STREAM_TRANSPORT + " is enabled but no stream transport supplier is provided"); + } + final Transport streamTransport = (streamTransportSupplier != null ? streamTransportSupplier.get() : null); + Set taskHeaders = Stream.concat( pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()), Stream.of(Task.X_OPAQUE_ID) ).collect(Collectors.toSet()); + final TransportService transportService = newTransportService( settings, transport, + streamTransport, threadPool, networkModule.getTransportInterceptor(), localNodeFactory, @@ -1252,8 +1267,25 @@ protected Node(final Environment initialEnvironment, Collection clas taskHeaders, tracer ); + final Optional streamTransportService = streamTransport != null + ? Optional.of( + new StreamTransportService( + settings, + streamTransport, + threadPool, + networkModule.getTransportInterceptor(), + new LocalNodeFactory(settings, nodeEnvironment.nodeId(), remoteStoreNodeService), + settingsModule.getClusterSettings(), + transportService.getTaskManager(), + transportService.getRemoteClusterService(), + tracer + ) + ) + : Optional.empty(); + TopNSearchTasksLogger taskConsumer = new TopNSearchTasksLogger(settings, settingsModule.getClusterSettings()); transportService.getTaskManager().registerTaskResourceConsumer(taskConsumer); + streamTransportService.ifPresent(service -> service.getTaskManager().registerTaskResourceConsumer(taskConsumer)); this.extensionsManager.initializeServicesAndRestHandler( actionModule, settingsModule, @@ -1270,6 +1302,9 @@ protected Node(final Environment initialEnvironment, Collection clas transportService, SearchExecutionStatsCollector.makeWrapper(responseCollectorService) ); + final Optional streamSearchTransportService = streamTransportService.map( + stc -> new StreamSearchTransportService(stc, SearchExecutionStatsCollector.makeWrapper(responseCollectorService)) + ); final HttpServerTransport httpServerTransport = newHttpTransport(networkModule); pluginComponents.addAll(newAuxTransports(networkModule)); @@ -1549,10 +1584,20 @@ protected Node(final Environment initialEnvironment, Collection clas b.bind(ViewService.class).toInstance(viewService); b.bind(SearchService.class).toInstance(searchService); b.bind(SearchTransportService.class).toInstance(searchTransportService); + if (streamSearchTransportService.isPresent()) { + b.bind(StreamSearchTransportService.class).toInstance(streamSearchTransportService.get()); + } else { + b.bind(StreamSearchTransportService.class).toProvider((Providers.of(null))); + } b.bind(SearchPhaseController.class) .toInstance(new SearchPhaseController(namedWriteableRegistry, searchService::aggReduceContextBuilder)); b.bind(Transport.class).toInstance(transport); b.bind(TransportService.class).toInstance(transportService); + if (streamTransportService.isPresent()) { + b.bind(StreamTransportService.class).toInstance(streamTransportService.get()); + } else { + b.bind(StreamTransportService.class).toProvider((Providers.of(null))); + } b.bind(NetworkService.class).toInstance(networkService); b.bind(UpdateHelper.class).toInstance(new UpdateHelper(scriptService)); b.bind(MetadataIndexUpgradeService.class).toInstance(metadataIndexUpgradeService); @@ -1667,6 +1712,7 @@ protected Node(final Environment initialEnvironment, Collection clas protected TransportService newTransportService( Settings settings, Transport transport, + @Nullable Transport streamTransport, ThreadPool threadPool, TransportInterceptor interceptor, Function localNodeFactory, @@ -1674,7 +1720,17 @@ protected TransportService newTransportService( Set taskHeaders, Tracer tracer ) { - return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, tracer); + return new TransportService( + settings, + transport, + streamTransport, + threadPool, + interceptor, + localNodeFactory, + clusterSettings, + taskHeaders, + tracer + ); } /** @@ -1736,6 +1792,12 @@ public Node start() throws NodeValidationException { final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class); nodeConnectionsService.start(); clusterService.setNodeConnectionsService(nodeConnectionsService); + StreamTransportService streamTransportService = injector.getInstance(StreamTransportService.class); + if (streamTransportService != null) { + final StreamNodeConnectionsService streamNodeConnectionsService = injector.getInstance(StreamNodeConnectionsService.class); + streamNodeConnectionsService.start(); + clusterService.setStreamNodeConnectionsService(streamNodeConnectionsService); + } injector.getInstance(GatewayService.class).start(); Discovery discovery = injector.getInstance(Discovery.class); @@ -1749,8 +1811,13 @@ public Node start() throws NodeValidationException { TaskResourceTrackingService taskResourceTrackingService = injector.getInstance(TaskResourceTrackingService.class); transportService.getTaskManager().setTaskResourceTrackingService(taskResourceTrackingService); - runnableTaskListener.set(taskResourceTrackingService); + runnableTaskListener.set(taskResourceTrackingService); + // start streamTransportService before transportService so that transport service has access to publish address + // of stream transport for it to use it in localNode creation + if (streamTransportService != null) { + streamTransportService.start(); + } transportService.start(); assert localNodeFactory.getNode() != null; assert transportService.getLocalNode().equals(localNodeFactory.getNode()) @@ -1820,6 +1887,10 @@ public Node start() throws NodeValidationException { assert clusterService.localNode().equals(localNodeFactory.getNode()) : "clusterService has a different local node than the factory provided"; transportService.acceptIncomingRequests(); + if (streamTransportService != null) { + streamTransportService.acceptIncomingRequests(); + } + discovery.startInitialJoin(); final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings()); configureNodeAndClusterIdStateListener(clusterService); @@ -1910,6 +1981,10 @@ private Node stop() { injector.getInstance(GatewayService.class).stop(); injector.getInstance(SearchService.class).stop(); injector.getInstance(TransportService.class).stop(); + StreamTransportService stc = injector.getInstance(StreamTransportService.class); + if (stc != null) { + stc.stop(); + } nodeService.getTaskCancellationMonitoringService().stop(); autoForceMergeManager.stop(); pluginLifecycleComponents.forEach(LifecycleComponent::stop); @@ -1979,6 +2054,10 @@ public synchronized void close() throws IOException { toClose.add(injector.getInstance(SearchService.class)); toClose.add(() -> stopWatch.stop().start("transport")); toClose.add(injector.getInstance(TransportService.class)); + StreamTransportService stc = injector.getInstance(StreamTransportService.class); + if (stc != null) { + toClose.add(stc); + } toClose.add(nodeService.getTaskCancellationMonitoringService()); toClose.add(injector.getInstance(RemoteStorePinnedTimestampService.class)); diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index df334d6f4ec76..2bb865820fcf8 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -625,13 +625,23 @@ public void executeDfsPhase( boolean keepStatesInContext, SearchShardTask task, ActionListener listener + ) { + executeDfsPhase(request, keepStatesInContext, task, listener, null); + } + + public void executeDfsPhase( + ShardSearchRequest request, + boolean keepStatesInContext, + SearchShardTask task, + ActionListener listener, + String executorName ) { final IndexShard shard = getShard(request); rewriteAndFetchShardRequest(shard, request, new ActionListener() { @Override public void onResponse(ShardSearchRequest rewritten) { // fork the execution in the search thread pool - runAsync(getExecutor(shard), () -> executeDfsPhase(request, task, keepStatesInContext), listener); + runAsync(getExecutor(executorName, shard), () -> executeDfsPhase(request, task, keepStatesInContext), listener); } @Override @@ -677,6 +687,16 @@ public void executeQueryPhase( boolean keepStatesInContext, SearchShardTask task, ActionListener listener + ) { + executeQueryPhase(request, keepStatesInContext, task, listener, null); + } + + public void executeQueryPhase( + ShardSearchRequest request, + boolean keepStatesInContext, + SearchShardTask task, + ActionListener listener, + String executorName ) { assert request.canReturnNullResponseIfMatchNoDocs() == false || request.numberOfShards() > 1 : "empty responses require more than one shard"; @@ -701,7 +721,7 @@ public void onResponse(ShardSearchRequest orig) { } } // fork the execution in the search thread pool - runAsync(getExecutor(shard), () -> executeQueryPhase(orig, task, keepStatesInContext), listener); + runAsync(getExecutor(executorName, shard), () -> executeQueryPhase(orig, task, keepStatesInContext), listener); } @Override @@ -794,7 +814,7 @@ public void executeQueryPhase( freeReaderContext(readerContext.id()); throw e; } - runAsync(getExecutor(readerContext.indexShard()), () -> { + runAsync(getExecutor(null, readerContext.indexShard()), () -> { final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null); try ( SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false); @@ -820,7 +840,7 @@ public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, final ReaderContext readerContext = findReaderContext(request.contextId(), request.shardSearchRequest()); final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.shardSearchRequest()); final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest)); - runAsync(getExecutor(readerContext.indexShard()), () -> { + runAsync(getExecutor(null, readerContext.indexShard()), () -> { readerContext.setAggregatedDfs(request.dfs()); try ( SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, true); @@ -850,16 +870,14 @@ public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, }, wrapFailureListener(listener, readerContext, markAsUsed)); } - private Executor getExecutor(IndexShard indexShard) { + private Executor getExecutor(String executor, IndexShard indexShard) { assert indexShard != null; final String executorName; if (indexShard.isSystem()) { executorName = Names.SYSTEM_READ; } else if (indexShard.indexSettings().isSearchThrottled()) { executorName = Names.SEARCH_THROTTLED; - } else { - executorName = Names.SEARCH; - } + } else executorName = Objects.requireNonNullElse(executor, Names.SEARCH); return threadPool.executor(executorName); } @@ -877,7 +895,7 @@ public void executeFetchPhase( freeReaderContext(readerContext.id()); throw e; } - runAsync(getExecutor(readerContext.indexShard()), () -> { + runAsync(getExecutor(null, readerContext.indexShard()), () -> { final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null); try ( SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false); @@ -902,10 +920,19 @@ public void executeFetchPhase( } public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener listener) { + executeFetchPhase(request, task, listener, null); + } + + public void executeFetchPhase( + ShardFetchRequest request, + SearchShardTask task, + ActionListener listener, + String executorName + ) { final ReaderContext readerContext = findReaderContext(request.contextId(), request); final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest()); final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest)); - runAsync(getExecutor(readerContext.indexShard()), () -> { + runAsync(getExecutor(executorName, readerContext.indexShard()), () -> { try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false)) { if (request.lastEmittedDoc() != null) { searchContext.scrollContext().lastEmittedDoc = request.lastEmittedDoc(); diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java index 45268b4807cd9..645204ade03a0 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java @@ -95,6 +95,22 @@ public void sendResponse(TransportResponse response) throws IOException { } } + public void sendResponseBatch(TransportResponse response) { + try (SpanScope scope = tracer.withSpanInScope(span)) { + delegate.sendResponseBatch(response); + } finally { + span.endSpan(); + } + } + + public void completeStream() { + try (SpanScope scope = tracer.withSpanInScope(span)) { + delegate.completeStream(); + } finally { + span.endSpan(); + } + } + @Override public void sendResponse(Exception exception) throws IOException { try (SpanScope scope = tracer.withSpanInScope(span)) { diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/handler/TraceableTransportResponseHandler.java b/server/src/main/java/org/opensearch/telemetry/tracing/handler/TraceableTransportResponseHandler.java index eb9d53d2df51b..5d3bd6c4daf73 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/handler/TraceableTransportResponseHandler.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/handler/TraceableTransportResponseHandler.java @@ -15,6 +15,7 @@ import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.transport.TransportException; import org.opensearch.transport.TransportResponseHandler; +import org.opensearch.transport.stream.StreamTransportResponse; import java.io.IOException; import java.util.Objects; @@ -75,6 +76,15 @@ public void handleResponse(T response) { } } + @Override + public void handleStreamResponse(StreamTransportResponse response) { + try (SpanScope scope = tracer.withSpanInScope(span)) { + delegate.handleStreamResponse(response); + } finally { + span.endSpan(); + } + } + @Override public void handleException(TransportException exp) { try (SpanScope scope = tracer.withSpanInScope(span)) { diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index b67b00bb42054..e4d151750b5bf 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -104,6 +104,7 @@ public static class Names { public static final String ANALYZE = "analyze"; public static final String WRITE = "write"; public static final String SEARCH = "search"; + public static final String STREAM_SEARCH = "stream_search"; public static final String SEARCH_THROTTLED = "search_throttled"; public static final String MANAGEMENT = "management"; public static final String FLUSH = "flush"; @@ -181,6 +182,7 @@ public static ThreadPoolType fromType(String type) { map.put(Names.ANALYZE, ThreadPoolType.FIXED); map.put(Names.WRITE, ThreadPoolType.FIXED); map.put(Names.SEARCH, ThreadPoolType.RESIZABLE); + map.put(Names.STREAM_SEARCH, ThreadPoolType.RESIZABLE); map.put(Names.MANAGEMENT, ThreadPoolType.SCALING); map.put(Names.FLUSH, ThreadPoolType.SCALING); map.put(Names.REFRESH, ThreadPoolType.SCALING); @@ -261,6 +263,17 @@ public ThreadPool( Names.SEARCH, new ResizableExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(allocatedProcessors), 1000, runnableTaskListener) ); + // TODO: configure the appropriate size and explore use of virtual threads + builders.put( + Names.STREAM_SEARCH, + new ResizableExecutorBuilder( + settings, + Names.STREAM_SEARCH, + searchThreadPoolSize(allocatedProcessors), + 1000, + runnableTaskListener + ) + ); builders.put(Names.SEARCH_THROTTLED, new ResizableExecutorBuilder(settings, Names.SEARCH_THROTTLED, 1, 100, runnableTaskListener)); builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5))); // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded diff --git a/server/src/main/java/org/opensearch/transport/ConnectionProfile.java b/server/src/main/java/org/opensearch/transport/ConnectionProfile.java index 931707e4a1cdc..91fc056ff6b94 100644 --- a/server/src/main/java/org/opensearch/transport/ConnectionProfile.java +++ b/server/src/main/java/org/opensearch/transport/ConnectionProfile.java @@ -112,6 +112,8 @@ public static ConnectionProfile buildDefaultConnectionProfile(Settings settings) // if we are not a data-node we don't need any dedicated channels for recovery builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY); builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG); + // we build a single channel profile with only supported type as STREAM for stream transport defined in StreamTransportService + builder.addConnections(0, TransportRequestOptions.Type.STREAM); return builder.build(); } diff --git a/server/src/main/java/org/opensearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/opensearch/transport/RemoteConnectionStrategy.java index f2c159d1380e8..db3a370dba298 100644 --- a/server/src/main/java/org/opensearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/opensearch/transport/RemoteConnectionStrategy.java @@ -188,7 +188,8 @@ static ConnectionProfile buildConnectionProfile(String clusterAlias, Settings se TransportRequestOptions.Type.BULK, TransportRequestOptions.Type.STATE, TransportRequestOptions.Type.RECOVERY, - TransportRequestOptions.Type.PING + TransportRequestOptions.Type.PING, + TransportRequestOptions.Type.STREAM ) .addConnections(mode.numberOfChannels, TransportRequestOptions.Type.REG); return builder.build(); diff --git a/server/src/main/java/org/opensearch/transport/StreamTransportResponseHandler.java b/server/src/main/java/org/opensearch/transport/StreamTransportResponseHandler.java new file mode 100644 index 0000000000000..45a9ddd2d6fba --- /dev/null +++ b/server/src/main/java/org/opensearch/transport/StreamTransportResponseHandler.java @@ -0,0 +1,58 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.core.transport.TransportResponse; +import org.opensearch.transport.stream.StreamTransportResponse; + +/** + * A handler specialized for streaming transport responses. + *

+ * Responsibilities: + *

    + *
  • Process streaming responses via {@link #handleStreamResponse(StreamTransportResponse)}.
  • + *
  • Close the stream with {@link StreamTransportResponse#close()} after processing.
  • + *
  • Call {@link StreamTransportResponse#cancel(String, Throwable)} for errors or early termination.
  • + *
+ *

+ * Non-streaming responses are not supported and will throw an {@link UnsupportedOperationException}. + *

+ * Example: + *

{@code
+ * public void handleStreamResponse(StreamTransportResponse response) {
+ *     try {
+ *         while (true) {
+ *             T result = response.nextResponse();
+ *             if (result == null) break;
+ *             // Process result...
+ *         }
+ *     } catch (Exception e) {
+ *         response.cancel("Processing error", e);
+ *         throw e;
+ *     } finally {
+ *         response.close();
+ *     }
+ * }
+ * }
+ * + * @opensearch.api + */ +@ExperimentalApi +public interface StreamTransportResponseHandler extends TransportResponseHandler { + + /** + * Default implementation throws UnsupportedOperationException since streaming handlers + * should only handle streaming responses + */ + @Override + default void handleResponse(T response) { + throw new UnsupportedOperationException("handleResponse is not supported for streaming handlers"); + } +} diff --git a/server/src/main/java/org/opensearch/transport/StreamTransportService.java b/server/src/main/java/org/opensearch/transport/StreamTransportService.java new file mode 100644 index 0000000000000..6535e9c8fda41 --- /dev/null +++ b/server/src/main/java/org/opensearch/transport/StreamTransportService.java @@ -0,0 +1,137 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.Nullable; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.transport.BoundTransportAddress; +import org.opensearch.core.transport.TransportResponse; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskManager; +import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.threadpool.ThreadPool; + +import java.util.function.Function; + +import static org.opensearch.discovery.HandshakingTransportAddressConnector.PROBE_CONNECT_TIMEOUT_SETTING; +import static org.opensearch.discovery.HandshakingTransportAddressConnector.PROBE_HANDSHAKE_TIMEOUT_SETTING; + +/** + * Transport service for streaming requests, handling StreamTransportResponse. + * @opensearch.internal + */ +public class StreamTransportService extends TransportService { + private static final Logger logger = LogManager.getLogger(StreamTransportService.class); + public static final Setting STREAM_TRANSPORT_REQ_TIMEOUT_SETTING = Setting.timeSetting( + "transport.stream.request_timeout", + TimeValue.timeValueMinutes(5), + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + private volatile TimeValue streamTransportReqTimeout; + + public StreamTransportService( + Settings settings, + Transport streamTransport, + ThreadPool threadPool, + TransportInterceptor transportInterceptor, + Function localNodeFactory, + @Nullable ClusterSettings clusterSettings, + TaskManager taskManager, + RemoteClusterService remoteClusterService, + Tracer tracer + ) { + super( + settings, + streamTransport, + threadPool, + transportInterceptor, + localNodeFactory, + // it's a single channel profile and let underlying client handle parallelism by creating multiple channels as needed + new ClusterConnectionManager( + ConnectionProfile.buildSingleChannelProfile( + TransportRequestOptions.Type.STREAM, + PROBE_CONNECT_TIMEOUT_SETTING.get(settings), + PROBE_HANDSHAKE_TIMEOUT_SETTING.get(settings), + TimeValue.MINUS_ONE, + false + ), + streamTransport + ), + tracer, + taskManager, + remoteClusterService, + true + ); + this.streamTransportReqTimeout = STREAM_TRANSPORT_REQ_TIMEOUT_SETTING.get(settings); + if (clusterSettings != null) { + clusterSettings.addSettingsUpdateConsumer(STREAM_TRANSPORT_REQ_TIMEOUT_SETTING, this::setStreamTransportReqTimeout); + } + } + + @Override + public void sendChildRequest( + final Transport.Connection connection, + final String action, + final TransportRequest request, + final Task parentTask, + final TransportResponseHandler handler + ) { + sendChildRequest( + connection, + action, + request, + parentTask, + TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STREAM).withTimeout(streamTransportReqTimeout).build(), + handler + ); + } + + @Override + public void connectToNode(final DiscoveryNode node, ConnectionProfile connectionProfile, ActionListener listener) { + if (isLocalNode(node)) { + listener.onResponse(null); + return; + } + // TODO: add logic for validation + final ActionListener wrappedListener = ActionListener.wrap(response -> { listener.onResponse(response); }, exception -> { + logger.warn("Failed to connect to streaming node [{}]: {}", node, exception.getMessage()); + listener.onFailure(new ConnectTransportException(node, "Failed to connect for streaming", exception)); + }); + + connectionManager.connectToNode( + node, + connectionProfile, + (connection, profile, listener1) -> listener1.onResponse(null), + wrappedListener + ); + } + + @Override + public Transport.Connection getConnection(DiscoveryNode node) { + try { + return connectionManager.getConnection(node); + } catch (Exception e) { + logger.error("Failed to get streaming connection to node [{}]: {}", node, e.getMessage()); + throw new ConnectTransportException(node, "Failed to get streaming connection", e); + } + } + + private void setStreamTransportReqTimeout(TimeValue streamTransportReqTimeout) { + this.streamTransportReqTimeout = streamTransportReqTimeout; + } +} diff --git a/server/src/main/java/org/opensearch/transport/TaskTransportChannel.java b/server/src/main/java/org/opensearch/transport/TaskTransportChannel.java index 4dab0039ec878..c93c121833dde 100644 --- a/server/src/main/java/org/opensearch/transport/TaskTransportChannel.java +++ b/server/src/main/java/org/opensearch/transport/TaskTransportChannel.java @@ -73,6 +73,20 @@ public void sendResponse(TransportResponse response) throws IOException { } } + @Override + public void sendResponseBatch(TransportResponse response) { + channel.sendResponseBatch(response); + } + + @Override + public void completeStream() { + try { + onTaskFinished.close(); + } finally { + channel.completeStream(); + } + } + @Override public void sendResponse(Exception exception) throws IOException { try { diff --git a/server/src/main/java/org/opensearch/transport/TransportChannel.java b/server/src/main/java/org/opensearch/transport/TransportChannel.java index 7b6715ff2c73d..bca653ba12f3c 100644 --- a/server/src/main/java/org/opensearch/transport/TransportChannel.java +++ b/server/src/main/java/org/opensearch/transport/TransportChannel.java @@ -36,8 +36,11 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.Version; +import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.transport.TransportResponse; +import org.opensearch.transport.stream.StreamErrorCode; +import org.opensearch.transport.stream.StreamException; import java.io.IOException; import java.util.Optional; @@ -56,6 +59,30 @@ public interface TransportChannel { String getChannelType(); + /** + * Sends a batch of responses to the request that this channel is associated with. + * Call {@link #completeStream()} on a successful completion. + * For errors, use {@link #sendResponse(Exception)} and do not call {@link #completeStream()} + * Do not use {@link #sendResponse} in conjunction with this method if you are sending a batch of responses. + * + * @param response the batch of responses to send + * @throws StreamException with {@link StreamErrorCode#CANCELLED} if the stream has been canceled. + * Do not call this method again or completeStream() once canceled. + */ + @ExperimentalApi + default void sendResponseBatch(TransportResponse response) { + throw new UnsupportedOperationException(); + } + + /** + * Call this method on a successful completion the streaming response. + * Note: not calling this method on success will result in a memory leak + */ + @ExperimentalApi + default void completeStream() { + throw new UnsupportedOperationException(); + } + void sendResponse(TransportResponse response) throws IOException; void sendResponse(Exception exception) throws IOException; diff --git a/server/src/main/java/org/opensearch/transport/TransportMessageListener.java b/server/src/main/java/org/opensearch/transport/TransportMessageListener.java index 284c4646655c5..bd0f1c49db4e8 100644 --- a/server/src/main/java/org/opensearch/transport/TransportMessageListener.java +++ b/server/src/main/java/org/opensearch/transport/TransportMessageListener.java @@ -32,8 +32,10 @@ package org.opensearch.transport; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.transport.TransportResponse; +import org.opensearch.transport.stream.StreamTransportResponse; /** * Listens for transport messages @@ -62,6 +64,9 @@ default void onRequestReceived(long requestId, String action) {} */ default void onResponseSent(long requestId, String action, TransportResponse response) {} + @ExperimentalApi + default void onStreamResponseSent(long requestId, String action, StreamTransportResponse response) {} + /*** * Called for every failed action response after the response has been passed to the underlying network implementation. * @param requestId the request ID (unique per client) diff --git a/server/src/main/java/org/opensearch/transport/TransportRequestOptions.java b/server/src/main/java/org/opensearch/transport/TransportRequestOptions.java index 9f44d93f0cd71..375c8f2042170 100644 --- a/server/src/main/java/org/opensearch/transport/TransportRequestOptions.java +++ b/server/src/main/java/org/opensearch/transport/TransportRequestOptions.java @@ -72,7 +72,8 @@ public enum Type { BULK, REG, STATE, - PING + PING, + STREAM } public static Builder builder() { diff --git a/server/src/main/java/org/opensearch/transport/TransportResponseHandler.java b/server/src/main/java/org/opensearch/transport/TransportResponseHandler.java index 748d2a4d867ec..d7c14eaf53303 100644 --- a/server/src/main/java/org/opensearch/transport/TransportResponseHandler.java +++ b/server/src/main/java/org/opensearch/transport/TransportResponseHandler.java @@ -32,10 +32,12 @@ package org.opensearch.transport; +import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.transport.TransportResponse; +import org.opensearch.transport.stream.StreamTransportResponse; import java.io.IOException; import java.util.function.Function; @@ -50,6 +52,45 @@ public interface TransportResponseHandler extends W void handleResponse(T response); + /** + * Processes a streaming transport response containing multiple batches. + *

+ * Responsibilities: + *

    + *
  • Iterate over responses using {@link StreamTransportResponse#nextResponse()}.
  • + *
  • Close the stream with {@link StreamTransportResponse#close()} after processing.
  • + *
  • Call {@link StreamTransportResponse#cancel(String, Throwable)} for errors, timeouts, or early termination.
  • + *
+ *

+ * Exceptions from {@code nextResponse()} are propagated to the caller. Other errors + * (e.g., connection issues or timeouts before streaming starts) trigger + * {@link #handleException(TransportException)}. + *

+ * Example: + *

{@code
+     * public void handleStreamResponse(StreamTransportResponse response) {
+     *     try {
+     *         while (true) {
+     *             T result = response.nextResponse();
+     *             if (result == null) break;
+     *             // Process result...
+     *         }
+     *     } catch (Exception e) {
+     *         response.cancel("Processing error", e);
+     *         throw e;
+     *     } finally {
+     *         response.close();
+     *     }
+     * }
+     * }
+ * + * @param response the streaming response, which must be closed by the handler + */ + @ExperimentalApi + default void handleStreamResponse(StreamTransportResponse response) { + throw new UnsupportedOperationException("Streaming responses not supported by this handler"); + } + void handleException(TransportException exp); String executor(); diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index fe8631aa5ca3d..ed64aa1229517 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -74,6 +74,7 @@ import org.opensearch.telemetry.tracing.handler.TraceableTransportResponseHandler; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.stream.StreamTransportResponse; import java.io.IOException; import java.io.UncheckedIOException; @@ -116,10 +117,11 @@ public class TransportService extends AbstractLifecycleComponent protected final ClusterName clusterName; protected final TaskManager taskManager; private final TransportInterceptor.AsyncSender asyncSender; - private final Function localNodeFactory; + protected final Function localNodeFactory; private final boolean remoteClusterClient; - private final Transport.ResponseHandlers responseHandlers; - private final TransportInterceptor interceptor; + protected final Transport.ResponseHandlers responseHandlers; + protected final TransportInterceptor interceptor; + private final Transport streamTransport; // An LRU (don't really care about concurrency here) that holds the latest timed out requests so if they // do show up, we can print more descriptive information about them @@ -142,7 +144,7 @@ protected boolean removeEldestEntry(Map.Entry eldest) { volatile String[] tracerLogExclude; private final RemoteClusterService remoteClusterService; - private final Tracer tracer; + protected final Tracer tracer; /** if set will call requests sent to this id to shortcut and executed locally */ volatile DiscoveryNode localNode = null; @@ -183,6 +185,29 @@ public void close() {} /** does nothing. easy way to ensure class is loaded so the above static block is called to register the streamables */ public static void ensureClassloaded() {} + public TransportService( + Settings settings, + Transport transport, + ThreadPool threadPool, + TransportInterceptor transportInterceptor, + Function localNodeFactory, + @Nullable ClusterSettings clusterSettings, + Set taskHeaders, + Tracer tracer + ) { + this( + settings, + transport, + threadPool, + transportInterceptor, + localNodeFactory, + clusterSettings, + taskHeaders, + new ClusterConnectionManager(settings, transport), + tracer + ); + } + /** * Build the service. * @@ -192,6 +217,7 @@ public static void ensureClassloaded() {} public TransportService( Settings settings, Transport transport, + @Nullable Transport streamTransport, ThreadPool threadPool, TransportInterceptor transportInterceptor, Function localNodeFactory, @@ -202,6 +228,7 @@ public TransportService( this( settings, transport, + streamTransport, threadPool, transportInterceptor, localNodeFactory, @@ -222,8 +249,67 @@ public TransportService( Set taskHeaders, ConnectionManager connectionManager, Tracer tracer + ) { + this( + settings, + transport, + null, + threadPool, + transportInterceptor, + localNodeFactory, + clusterSettings, + taskHeaders, + connectionManager, + tracer + ); + } + + TransportService( + Settings settings, + Transport streamTransport, + ThreadPool threadPool, + TransportInterceptor transportInterceptor, + Function localNodeFactory, + ConnectionManager connectionManager, + Tracer tracer, + TaskManager taskManager, + RemoteClusterService remoteClusterService, + boolean streamTransportMode + ) { + if (!streamTransportMode) { + throw new IllegalStateException("Constructor only supported to construct StreamTransportService"); + } + this.transport = streamTransport; + this.streamTransport = streamTransport; + streamTransport.setSlowLogThreshold(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING.get(settings)); + this.threadPool = threadPool; + this.localNodeFactory = localNodeFactory; + this.connectionManager = connectionManager; + this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); + tracerLog = Loggers.getLogger(logger, ".tracer"); + this.taskManager = taskManager; + this.interceptor = transportInterceptor; + this.asyncSender = interceptor.interceptSender(this::sendRequestInternal); + this.remoteClusterClient = false; + this.tracer = tracer; + this.remoteClusterService = remoteClusterService; + responseHandlers = streamTransport.getResponseHandlers(); + } + + public TransportService( + Settings settings, + Transport transport, + @Nullable Transport streamTransport, + ThreadPool threadPool, + TransportInterceptor transportInterceptor, + Function localNodeFactory, + @Nullable ClusterSettings clusterSettings, + Set taskHeaders, + ConnectionManager connectionManager, + Tracer tracer ) { this.transport = transport; + this.streamTransport = streamTransport; transport.setSlowLogThreshold(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING.get(settings)); this.threadPool = threadPool; this.localNodeFactory = localNodeFactory; @@ -310,8 +396,14 @@ protected void doStart() { logger.info("profile [{}]: {}", entry.getKey(), entry.getValue()); } } - localNode = localNodeFactory.apply(transport.boundAddress()); - + // TODO: Making localNodeFactory BiConsumer is a bigger change since it should accept both default transport and + // stream publish address + synchronized (this) { + localNode = localNodeFactory.apply(transport.boundAddress()); + if (streamTransport != null) { + localNode = new DiscoveryNode(localNode, streamTransport.boundAddress().publishAddress()); + } + } if (remoteClusterClient) { // here we start to connect to the remote clusters remoteClusterService.initializeRemoteClusters( @@ -1027,7 +1119,7 @@ protected void doRun() throws Exception { } } - private void sendLocalRequest(long requestId, final String action, final TransportRequest request, TransportRequestOptions options) { + protected void sendLocalRequest(long requestId, final String action, final TransportRequest request, TransportRequestOptions options) { final DirectResponseChannel channel = new DirectResponseChannel(localNode, action, requestId, this, threadPool); try { onRequestSent(localNode, requestId, action, request, options); @@ -1123,7 +1215,7 @@ public TransportAddress[] addressesFromString(String address) throws UnknownHost ) ); - private void validateActionName(String actionName) { + protected void validateActionName(String actionName) { // TODO we should makes this a hard validation and throw an exception but we need a good way to add backwards layer // for it. Maybe start with a deprecation layer if (isValidActionName(actionName) == false) { @@ -1496,6 +1588,16 @@ public void handleResponse(T response) { } } + @Override + public void handleStreamResponse(StreamTransportResponse response) { + if (handler != null) { + handler.cancel(); + } + try (ThreadContext.StoredContext ignore = contextSupplier.get()) { + delegate.handleStreamResponse(response); + } + } + @Override public void handleException(TransportException exp) { if (handler != null) { @@ -1643,7 +1745,7 @@ public ThreadPool getThreadPool() { return threadPool; } - private boolean isLocalNode(DiscoveryNode discoveryNode) { + protected boolean isLocalNode(DiscoveryNode discoveryNode) { return Objects.requireNonNull(discoveryNode, "discovery node must not be null").equals(localNode); } @@ -1693,7 +1795,7 @@ public void onResponseReceived(long requestId, Transport.ResponseContext holder) } } - private void sendRequestAsync( + protected void sendRequestAsync( final Transport.Connection connection, final String action, final TransportRequest request, @@ -1713,6 +1815,11 @@ public void handleResponse(T response) { handler.handleResponse(response); } + @Override + public void handleStreamResponse(StreamTransportResponse response) { + handler.handleStreamResponse(response); + } + @Override public void handleException(TransportException exp) { unregisterChildNode.close(); diff --git a/server/src/main/java/org/opensearch/transport/client/Client.java b/server/src/main/java/org/opensearch/transport/client/Client.java index ba71bdb0304aa..2196ef15b92f3 100644 --- a/server/src/main/java/org/opensearch/transport/client/Client.java +++ b/server/src/main/java/org/opensearch/transport/client/Client.java @@ -314,6 +314,11 @@ public interface Client extends OpenSearchClient, Releasable { */ SearchRequestBuilder prepareSearch(String... indices); + /** + * Search across one or more indices with a query. + */ + SearchRequestBuilder prepareStreamSearch(String... indices); + /** * A search scroll request to continue searching a previous scrollable search request. * diff --git a/server/src/main/java/org/opensearch/transport/client/node/NodeClient.java b/server/src/main/java/org/opensearch/transport/client/node/NodeClient.java index bfc0253c4c2d2..161934f417d51 100644 --- a/server/src/main/java/org/opensearch/transport/client/node/NodeClient.java +++ b/server/src/main/java/org/opensearch/transport/client/node/NodeClient.java @@ -35,6 +35,7 @@ import org.opensearch.action.ActionModule.DynamicActionRegistry; import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionType; +import org.opensearch.action.search.SearchRequestBuilder; import org.opensearch.action.support.TransportAction; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.annotation.PublicApi; @@ -157,4 +158,9 @@ public Client getRemoteClusterClient(String clusterAlias) { public NamedWriteableRegistry getNamedWriteableRegistry() { return namedWriteableRegistry; } + + @Override + public SearchRequestBuilder prepareStreamSearch(String... indices) { + throw new UnsupportedOperationException("Stream search is not supported in NodeClient"); + } } diff --git a/server/src/main/java/org/opensearch/transport/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/transport/client/support/AbstractClient.java index bfd64ebb571a3..498d6483c2e3d 100644 --- a/server/src/main/java/org/opensearch/transport/client/support/AbstractClient.java +++ b/server/src/main/java/org/opensearch/transport/client/support/AbstractClient.java @@ -408,6 +408,7 @@ import org.opensearch.action.search.SearchScrollAction; import org.opensearch.action.search.SearchScrollRequest; import org.opensearch.action.search.SearchScrollRequestBuilder; +import org.opensearch.action.search.StreamSearchAction; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.clustermanager.AcknowledgedResponse; import org.opensearch.action.termvectors.MultiTermVectorsAction; @@ -636,6 +637,11 @@ public SearchRequestBuilder prepareSearch(String... indices) { return new SearchRequestBuilder(this, SearchAction.INSTANCE).setIndices(indices); } + @Override + public SearchRequestBuilder prepareStreamSearch(String... indices) { + return new SearchRequestBuilder(this, StreamSearchAction.INSTANCE).setIndices(indices); + } + @Override public ActionFuture searchScroll(final SearchScrollRequest request) { return execute(SearchScrollAction.INSTANCE, request); diff --git a/server/src/main/java/org/opensearch/transport/stream/StreamErrorCode.java b/server/src/main/java/org/opensearch/transport/stream/StreamErrorCode.java new file mode 100644 index 0000000000000..75a2c41018613 --- /dev/null +++ b/server/src/main/java/org/opensearch/transport/stream/StreamErrorCode.java @@ -0,0 +1,121 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport.stream; + +import org.opensearch.common.annotation.ExperimentalApi; + +/** + * Error codes for streaming transport operations, inspired by gRPC and Arrow Flight error codes. + * These codes provide standardized error categories for stream-based transports + * like Arrow Flight RPC. + * + */ +@ExperimentalApi +public enum StreamErrorCode { + /** + * Operation completed successfully. + */ + OK(0), + + /** + * The operation was cancelled, typically by the caller. + */ + CANCELLED(1), + + /** + * Unknown error. An example of where this error may be returned is + * if a Status value received from another address space belongs to + * an error-space that is not known in this address space. + */ + UNKNOWN(2), + + /** + * Client specified an invalid argument. Note that this differs + * from INVALID_ARGUMENT. INVALID_ARGUMENT indicates arguments + * that are problematic regardless of the state of the system. + */ + INVALID_ARGUMENT(3), + + /** + * Deadline expired before operation could complete. + */ + TIMED_OUT(4), + + /** + * Some requested entity (e.g., file or directory) was not found. + */ + NOT_FOUND(5), + + /** + * Some entity that we attempted to create (e.g., file or directory) already exists. + */ + ALREADY_EXISTS(6), + + /** + * The caller does not have permission to execute the specified operation. + * This can be due to lack of authentication. + */ + UNAUTHENTICATED(7), + + /** + * The caller does not have permission to execute the specified operation. + * This is used when the caller is authenticated but lacks permissions. + */ + UNAUTHORIZED(8), + + /** + * Some resource has been exhausted, perhaps a per-user quota, or + * perhaps the entire file system is out of space. + */ + RESOURCE_EXHAUSTED(9), + + /** + * Operation is not implemented or not supported/enabled in this service. + */ + UNIMPLEMENTED(10), + + /** + * Internal errors. Means some invariants expected by underlying + * system has been broken. Or there is some server side bug + */ + INTERNAL(11), + + /** + * The service is currently unavailable. + */ + UNAVAILABLE(12); + + private final int code; + + StreamErrorCode(int code) { + this.code = code; + } + + /** + * Returns the numeric code of this status. + */ + public int code() { + return code; + } + + /** + * Return a StreamErrorCode from a numeric value. + * + * @param code the numeric code + * @return the corresponding StreamErrorCode or UNKNOWN if not recognized + */ + public static StreamErrorCode fromCode(int code) { + for (StreamErrorCode value : values()) { + if (value.code == code) { + return value; + } + } + return UNKNOWN; + } +} diff --git a/server/src/main/java/org/opensearch/transport/stream/StreamException.java b/server/src/main/java/org/opensearch/transport/stream/StreamException.java new file mode 100644 index 0000000000000..ba8c28b198441 --- /dev/null +++ b/server/src/main/java/org/opensearch/transport/stream/StreamException.java @@ -0,0 +1,150 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport.stream; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.transport.TransportException; + +import java.io.IOException; +import java.util.Objects; + +/** + * Exception for streaming transport operations with standardized error codes. + * This provides a consistent error model for stream-based transports like Arrow Flight RPC. + * + */ +@ExperimentalApi +public class StreamException extends TransportException { + + private final StreamErrorCode errorCode; + + public StreamException(StreamInput streamInput) throws IOException { + super(streamInput); + this.errorCode = StreamErrorCode.fromCode(streamInput.read()); + } + + /** + * Creates a new StreamException with the given error code and message. + * + * @param errorCode the error code + * @param message the error message + */ + public StreamException(StreamErrorCode errorCode, String message) { + this(errorCode, message, null); + } + + /** + * Creates a new StreamException with the given error code, message, and cause. + * + * @param errorCode the error code + * @param message the error message + * @param cause the cause of this exception + */ + public StreamException(StreamErrorCode errorCode, String message, Throwable cause) { + super(message, cause); + this.errorCode = Objects.requireNonNull(errorCode); + } + + /** + * Returns the error code for this exception. + * + * @return the error code + */ + public StreamErrorCode getErrorCode() { + return errorCode; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("StreamException[errorCode=").append(errorCode); + if (getMessage() != null) { + sb.append(", message=").append(getMessage()); + } + if (!metadata.isEmpty()) { + sb.append(", metadata=").append(metadata); + } + sb.append("]"); + return sb.toString(); + } + + /** + * Creates a CANCELLED exception. + * This is thrown when attempting to send response batches on a cancelled stream. + * Once a stream is cancelled, no further response batches can be sent. + * + * @param message the error message + * @return a new StreamException with CANCELLED error code + */ + public static StreamException cancelled(String message) { + return new StreamException(StreamErrorCode.CANCELLED, message); + } + + /** + * Creates a CANCELLED exception with a cause. + * This is thrown when attempting to send response batches on a cancelled stream. + * Once a stream is cancelled, no further response batches can be sent. + * + * @param message the error message + * @param cause the cause of this exception + * @return a new StreamException with CANCELLED error code + */ + public static StreamException cancelled(String message, Throwable cause) { + return new StreamException(StreamErrorCode.CANCELLED, message, cause); + } + + /** + * Creates an UNAVAILABLE exception. + * + * @param message the error message + * @return a new StreamException with UNAVAILABLE error code + */ + public static StreamException unavailable(String message) { + return new StreamException(StreamErrorCode.UNAVAILABLE, message); + } + + /** + * Creates an INTERNAL exception. + * + * @param message the error message + * @param cause the cause of this exception + * @return a new StreamException with INTERNAL error code + */ + public static StreamException internal(String message, Throwable cause) { + return new StreamException(StreamErrorCode.INTERNAL, message, cause); + } + + /** + * Creates a RESOURCE_EXHAUSTED exception. + * + * @param message the error message + * @return a new StreamException with RESOURCE_EXHAUSTED error code + */ + public static StreamException resourceExhausted(String message) { + return new StreamException(StreamErrorCode.RESOURCE_EXHAUSTED, message); + } + + /** + * Creates an UNAUTHENTICATED exception. + * + * @param message the error message + * @return a new StreamException with UNAUTHENTICATED error code + */ + public static StreamException unauthenticated(String message) { + return new StreamException(StreamErrorCode.UNAUTHENTICATED, message); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + out.write(errorCode.code()); + } +} diff --git a/server/src/main/java/org/opensearch/transport/stream/StreamTransportResponse.java b/server/src/main/java/org/opensearch/transport/stream/StreamTransportResponse.java new file mode 100644 index 0000000000000..06ad42ddfb711 --- /dev/null +++ b/server/src/main/java/org/opensearch/transport/stream/StreamTransportResponse.java @@ -0,0 +1,54 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport.stream; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.core.transport.TransportResponse; + +import java.io.Closeable; + +/** + * Represents a streaming transport response that yields multiple response batches. + *

+ * Responsibilities: + *

    + *
  • Iterate over responses using {@link #nextResponse()} until {@code null} is returned.
  • + *
  • Close the stream using {@link #close()} after processing to prevent resource leaks.
  • + *
  • Call {@link #cancel(String, Throwable)} for early termination, client-side errors, or timeouts.
  • + *
+ *

+ * The framework may call {@code cancel} for internal errors, propagating exceptions to the caller. + */ +@ExperimentalApi +public interface StreamTransportResponse extends Closeable { + + /** + * Retrieves the next response in the stream. + *

+ * This may block if responses are not buffered on the wire, depending on the server's + * backpressure strategy. Returns {@code null} when the stream is exhausted. + *

+ * Exceptions during fetching are propagated to the caller. The framework may call + * {@link #cancel(String, Throwable)} for internal errors. + * + * @return the next response, or {@code null} if the stream is exhausted + */ + T nextResponse(); + + /** + * Cancels the stream due to client-side errors, timeouts, or early termination. + *

+ * The {@code reason} should describe the cause (e.g., "Client timeout"), and + * {@code cause} may provide additional details (or be {@code null}). + * + * @param reason the reason for cancellation + * @param cause the underlying exception, if any + */ + void cancel(String reason, Throwable cause); +} diff --git a/server/src/main/java/org/opensearch/transport/stream/StreamingTransportChannel.java b/server/src/main/java/org/opensearch/transport/stream/StreamingTransportChannel.java new file mode 100644 index 0000000000000..5656cf48756ca --- /dev/null +++ b/server/src/main/java/org/opensearch/transport/stream/StreamingTransportChannel.java @@ -0,0 +1,54 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport.stream; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.core.transport.TransportResponse; +import org.opensearch.transport.TransportChannel; + +import java.io.IOException; + +/** + * A TransportChannel that supports streaming responses. + *

+ * Streaming channels allow sending multiple response batches for a single request. + * Once a stream is cancelled (either by client or due to error), subsequent calls + * to {@link #sendResponseBatch(TransportResponse)} will throw {@link StreamException} with + * {@link StreamErrorCode#CANCELLED}. + * At this point, no action is needed as the underlying channel is already closed and call to + * completeStream() will fail. + */ +@ExperimentalApi +public interface StreamingTransportChannel extends TransportChannel { + + // TODO: introduce a way to poll for cancellation in addition to current way of detection i.e. depending on channel + // throwing StreamException with CANCELLED error code. + /** + * Sends a batch of responses to the request that this channel is associated with. + * Call {@link #completeStream()} on a successful completion. + * For errors, use {@link #sendResponse(Exception)} and do not call {@link #completeStream()} + * Do not use {@link #sendResponse} in conjunction with this method if you are sending a batch of responses. + * + * @param response the batch of responses to send + * @throws StreamException with {@link StreamErrorCode#CANCELLED} if the stream has been canceled. + * Do not call this method again or completeStream() once canceled. + */ + void sendResponseBatch(TransportResponse response) throws StreamException; + + /** + * Completes the streaming response, indicating no more batches will be sent. + * Note: not calling this method on success will result in a memory leak + */ + void completeStream(); + + @Override + default void sendResponse(TransportResponse response) throws IOException { + throw new UnsupportedOperationException("sendResponse() is not supported for streaming requests in StreamingTransportChannel"); + } +} diff --git a/server/src/main/java/org/opensearch/transport/stream/package-info.java b/server/src/main/java/org/opensearch/transport/stream/package-info.java new file mode 100644 index 0000000000000..5db15437b17b3 --- /dev/null +++ b/server/src/main/java/org/opensearch/transport/stream/package-info.java @@ -0,0 +1,13 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Streaming transport response interfaces and implementations. + * This package provides support for streaming responses in OpenSearch transport layer. + */ +package org.opensearch.transport.stream; diff --git a/server/src/test/java/org/opensearch/ExceptionSerializationTests.java b/server/src/test/java/org/opensearch/ExceptionSerializationTests.java index 59d20655151c1..d011826e81af4 100644 --- a/server/src/test/java/org/opensearch/ExceptionSerializationTests.java +++ b/server/src/test/java/org/opensearch/ExceptionSerializationTests.java @@ -128,6 +128,7 @@ import org.opensearch.transport.TcpTransport; import org.opensearch.transport.client.node.AbstractClientHeadersTestCase; import org.opensearch.transport.client.transport.NoNodeAvailableException; +import org.opensearch.transport.stream.StreamException; import java.io.EOFException; import java.io.FileNotFoundException; @@ -900,6 +901,7 @@ public void testIds() { ids.put(174, InvalidIndexContextException.class); ids.put(175, ResponseLimitBreachedException.class); ids.put(176, IngestionEngineException.class); + ids.put(177, StreamException.class); ids.put(10001, IndexCreateBlockException.class); Map, Integer> reverse = new HashMap<>(); diff --git a/server/src/test/java/org/opensearch/transport/ConnectionProfileTests.java b/server/src/test/java/org/opensearch/transport/ConnectionProfileTests.java index d3a20e9b68e34..640900cfa7e1b 100644 --- a/server/src/test/java/org/opensearch/transport/ConnectionProfileTests.java +++ b/server/src/test/java/org/opensearch/transport/ConnectionProfileTests.java @@ -76,8 +76,12 @@ public void testBuildConnectionProfile() { builder.addConnections(1, TransportRequestOptions.Type.BULK); builder.addConnections(2, TransportRequestOptions.Type.STATE, TransportRequestOptions.Type.RECOVERY); builder.addConnections(3, TransportRequestOptions.Type.PING); + IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, builder::build); - assertEquals("not all types are added for this connection profile - missing types: [REG]", illegalStateException.getMessage()); + assertEquals( + "not all types are added for this connection profile - missing types: [REG, STREAM]", + illegalStateException.getMessage() + ); IllegalArgumentException illegalArgumentException = expectThrows( IllegalArgumentException.class, @@ -85,11 +89,12 @@ public void testBuildConnectionProfile() { ); assertEquals("type [PING] is already registered", illegalArgumentException.getMessage()); builder.addConnections(4, TransportRequestOptions.Type.REG); + builder.addConnections(1, TransportRequestOptions.Type.STREAM); ConnectionProfile build = builder.build(); if (randomBoolean()) { build = new ConnectionProfile.Builder(build).build(); } - assertEquals(10, build.getNumConnections()); + assertEquals(11, build.getNumConnections()); if (setConnectTimeout) { assertEquals(connectTimeout, build.getConnectTimeout()); } else { @@ -114,12 +119,12 @@ public void testBuildConnectionProfile() { assertNull(build.getPingInterval()); } - List list = new ArrayList<>(10); - for (int i = 0; i < 10; i++) { + List list = new ArrayList<>(11); + for (int i = 0; i < 11; i++) { list.add(i); } final int numIters = randomIntBetween(5, 10); - assertEquals(4, build.getHandles().size()); + assertEquals(5, build.getHandles().size()); assertEquals(0, build.getHandles().get(0).offset); assertEquals(1, build.getHandles().get(0).length); assertEquals(EnumSet.of(TransportRequestOptions.Type.BULK), build.getHandles().get(0).getTypes()); @@ -155,11 +160,20 @@ public void testBuildConnectionProfile() { assertThat(channel, Matchers.anyOf(Matchers.is(6), Matchers.is(7), Matchers.is(8), Matchers.is(9))); } + assertEquals(10, build.getHandles().get(4).offset); + assertEquals(1, build.getHandles().get(4).length); + assertEquals(EnumSet.of(TransportRequestOptions.Type.STREAM), build.getHandles().get(4).getTypes()); + channel = build.getHandles().get(4).getChannel(list); + for (int i = 0; i < numIters; i++) { + assertEquals(10, channel.intValue()); + } + assertEquals(3, build.getNumConnectionsPerType(TransportRequestOptions.Type.PING)); assertEquals(4, build.getNumConnectionsPerType(TransportRequestOptions.Type.REG)); assertEquals(2, build.getNumConnectionsPerType(TransportRequestOptions.Type.STATE)); assertEquals(2, build.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); assertEquals(1, build.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); + assertEquals(1, build.getNumConnectionsPerType(TransportRequestOptions.Type.STREAM)); } public void testNoChannels() { @@ -169,7 +183,8 @@ public void testNoChannels() { TransportRequestOptions.Type.BULK, TransportRequestOptions.Type.STATE, TransportRequestOptions.Type.RECOVERY, - TransportRequestOptions.Type.REG + TransportRequestOptions.Type.REG, + TransportRequestOptions.Type.STREAM ); builder.addConnections(0, TransportRequestOptions.Type.PING); ConnectionProfile build = builder.build(); @@ -188,6 +203,7 @@ public void testConnectionProfileResolve() { builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.REG); builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.STATE); builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.PING); + builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.STREAM); final boolean connectionTimeoutSet = randomBoolean(); if (connectionTimeoutSet) { @@ -235,6 +251,7 @@ public void testDefaultConnectionProfile() { assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE)); assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); + assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STREAM)); assertEquals(TransportSettings.CONNECT_TIMEOUT.get(Settings.EMPTY), profile.getConnectTimeout()); assertEquals(TransportSettings.CONNECT_TIMEOUT.get(Settings.EMPTY), profile.getHandshakeTimeout()); assertEquals(TransportSettings.TRANSPORT_COMPRESS.get(Settings.EMPTY), profile.getCompressionEnabled()); @@ -247,6 +264,7 @@ public void testDefaultConnectionProfile() { assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE)); assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); + assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STREAM)); profile = ConnectionProfile.buildDefaultConnectionProfile(nonDataNode()); assertEquals(11, profile.getNumConnections()); @@ -255,6 +273,7 @@ public void testDefaultConnectionProfile() { assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE)); assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); + assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STREAM)); profile = ConnectionProfile.buildDefaultConnectionProfile( removeRoles( @@ -267,5 +286,6 @@ public void testDefaultConnectionProfile() { assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE)); assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); + assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STREAM)); } } diff --git a/test/framework/src/main/java/org/opensearch/node/MockNode.java b/test/framework/src/main/java/org/opensearch/node/MockNode.java index b83e77891b92f..8297e6b066cde 100644 --- a/test/framework/src/main/java/org/opensearch/node/MockNode.java +++ b/test/framework/src/main/java/org/opensearch/node/MockNode.java @@ -37,6 +37,7 @@ import org.opensearch.cluster.MockInternalClusterInfoService; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Nullable; import org.opensearch.common.network.NetworkModule; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -219,6 +220,7 @@ protected ScriptService newScriptService(Settings settings, Map localNodeFactory, @@ -234,6 +236,7 @@ protected TransportService newTransportService( return super.newTransportService( settings, transport, + streamTransport, threadPool, interceptor, localNodeFactory, @@ -245,6 +248,7 @@ protected TransportService newTransportService( return new MockTransportService( settings, transport, + streamTransport, threadPool, interceptor, localNodeFactory, diff --git a/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java index 6bf5381b62cc9..d7668d089690e 100644 --- a/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java @@ -120,6 +120,7 @@ public static MockTransportService createNewService(Settings settings, Version v return createNewService(settings, version, threadPool, null, tracer); } + // TODO: we need to add support for mock version of StreamTransportService public static MockTransportService createNewService( Settings settings, Version version, @@ -237,12 +238,47 @@ public MockTransportService( Set taskHeaders, Tracer tracer ) { - this(settings, new StubbableTransport(transport), threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, tracer); + this( + settings, + new StubbableTransport(transport), + null, + threadPool, + interceptor, + localNodeFactory, + clusterSettings, + taskHeaders, + tracer + ); + } + + public MockTransportService( + Settings settings, + Transport transport, + @Nullable Transport streamTransport, + ThreadPool threadPool, + TransportInterceptor interceptor, + Function localNodeFactory, + @Nullable ClusterSettings clusterSettings, + Set taskHeaders, + Tracer tracer + ) { + this( + settings, + new StubbableTransport(transport), + streamTransport != null ? new StubbableTransport(streamTransport) : null, + threadPool, + interceptor, + localNodeFactory, + clusterSettings, + taskHeaders, + tracer + ); } private MockTransportService( Settings settings, StubbableTransport transport, + @Nullable StubbableTransport streamTransport, ThreadPool threadPool, TransportInterceptor interceptor, Function localNodeFactory, @@ -253,6 +289,7 @@ private MockTransportService( super( settings, transport, + streamTransport, threadPool, interceptor, localNodeFactory, diff --git a/test/framework/src/main/java/org/opensearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/opensearch/transport/AbstractSimpleTransportTestCase.java index f0f2d452faf8d..2da8cdc78ec33 100644 --- a/test/framework/src/main/java/org/opensearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/opensearch/transport/AbstractSimpleTransportTestCase.java @@ -2177,6 +2177,7 @@ public void testTimeoutPerConnection() throws IOException { TransportRequestOptions.Type.REG, TransportRequestOptions.Type.STATE ); + builder.addConnections(0, TransportRequestOptions.Type.STREAM); // connection with one connection and a large timeout -- should consume the one spot in the backlog queue try (TransportService service = buildService("TS_TPC", Version.CURRENT, null, Settings.EMPTY, true, false)) { IOUtils.close(service.openConnection(first, builder.build())); @@ -2213,6 +2214,7 @@ public void testHandshakeWithIncompatVersion() { TransportRequestOptions.Type.REG, TransportRequestOptions.Type.STATE ); + builder.addConnections(0, TransportRequestOptions.Type.STREAM); expectThrows(ConnectTransportException.class, () -> serviceA.openConnection(node, builder.build())); } } @@ -2234,6 +2236,7 @@ public void testHandshakeUpdatesVersion() throws IOException { TransportRequestOptions.Type.REG, TransportRequestOptions.Type.STATE ); + builder.addConnections(0, TransportRequestOptions.Type.STREAM); try (Transport.Connection connection = serviceA.openConnection(node, builder.build())) { assertEquals(version, connection.getVersion()); } @@ -2305,6 +2308,7 @@ public void testTcpHandshakeTimeout() throws IOException { TransportRequestOptions.Type.REG, TransportRequestOptions.Type.STATE ); + builder.addConnections(0, TransportRequestOptions.Type.STREAM); builder.setHandshakeTimeout(TimeValue.timeValueMillis(1)); ConnectTransportException ex = expectThrows( ConnectTransportException.class, @@ -2347,6 +2351,7 @@ public void run() { TransportRequestOptions.Type.REG, TransportRequestOptions.Type.STATE ); + builder.addConnections(0, TransportRequestOptions.Type.STREAM); builder.setHandshakeTimeout(TimeValue.timeValueHours(1)); ConnectTransportException ex = expectThrows( ConnectTransportException.class, @@ -2470,6 +2475,8 @@ public String executor() { TransportRequestOptions.Type.REG, TransportRequestOptions.Type.STATE ); + builder.addConnections(0, TransportRequestOptions.Type.STREAM); + try (Transport.Connection connection = serviceB.openConnection(serviceC.getLocalNode(), builder.build())) { serviceC.close(); serviceB.sendRequest( @@ -2541,6 +2548,7 @@ public String executor() { TransportRequestOptions.Type.REG, TransportRequestOptions.Type.STATE ); + builder.addConnections(0, TransportRequestOptions.Type.STREAM); try (Transport.Connection connection = serviceB.openConnection(serviceC.getLocalNode(), builder.build())) { serviceB.sendRequest( @@ -2621,6 +2629,7 @@ public String executor() { TransportRequestOptions.Type.REG, TransportRequestOptions.Type.STATE ); + builder.addConnections(0, TransportRequestOptions.Type.STREAM); try (Transport.Connection connection = serviceC.openConnection(serviceB.getLocalNode(), builder.build())) { assertBusy(() -> { // netty for instance invokes this concurrently so we better use assert busy here TransportStats transportStats = serviceC.transport.getStats(); // we did a single round-trip to do the initial handshake @@ -2742,6 +2751,7 @@ public String executor() { TransportRequestOptions.Type.REG, TransportRequestOptions.Type.STATE ); + builder.addConnections(0, TransportRequestOptions.Type.STREAM); try (Transport.Connection connection = serviceC.openConnection(serviceB.getLocalNode(), builder.build())) { assertBusy(() -> { // netty for instance invokes this concurrently so we better use assert busy here TransportStats transportStats = serviceC.transport.getStats(); // request has been sent @@ -3027,6 +3037,7 @@ public void onConnectionClosed(Transport.Connection connection) { TransportRequestOptions.Type.REG, TransportRequestOptions.Type.STATE ); + builder.addConnections(0, TransportRequestOptions.Type.STREAM); final ConnectTransportException e = expectThrows( ConnectTransportException.class, () -> service.openConnection(nodeA, builder.build()) diff --git a/test/framework/src/main/java/org/opensearch/transport/TestProfiles.java b/test/framework/src/main/java/org/opensearch/transport/TestProfiles.java index 312f80255e05a..3f31fc1d31352 100644 --- a/test/framework/src/main/java/org/opensearch/transport/TestProfiles.java +++ b/test/framework/src/main/java/org/opensearch/transport/TestProfiles.java @@ -59,6 +59,7 @@ private TestProfiles() {} TransportRequestOptions.Type.REG, TransportRequestOptions.Type.STATE ); + builder.addConnections(0, TransportRequestOptions.Type.STREAM); LIGHT_PROFILE = builder.build(); } }