Skip to content

Commit d195e4c

Browse files
authored
SOLR-15045: DistributedZkUpdateProcessor should issue commits to local shards and remote shards in parallel (#545)
1 parent 957c57d commit d195e4c

File tree

4 files changed

+212
-6
lines changed

4 files changed

+212
-6
lines changed

solr/CHANGES.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ Optimizations
2222

2323
Bug Fixes
2424
---------------------
25-
(No changes)
25+
* SOLR-15045: `DistributedZkUpdateProcessor` now issues commits to local shards and remote shards in parallel,
26+
halving the latency of synchronous commits (Michael Gibney)
2627

2728
Other Changes
2829
---------------------

solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,8 @@ public void processCommit(CommitUpdateCommand cmd) throws IOException {
195195
// zk
196196
ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
197197

198+
// TODO: revisit the need for tracking `issuedDistribCommit` -- see below, and SOLR-15045
199+
boolean issuedDistribCommit = false;
198200
List<SolrCmdDistributor.Node> useNodes = null;
199201
if (req.getParams().get(COMMIT_END_POINT) == null) {
200202
useNodes = nodes;
@@ -204,11 +206,16 @@ public void processCommit(CommitUpdateCommand cmd) throws IOException {
204206
params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
205207
zkController.getBaseUrl(), req.getCore().getName()));
206208
cmdDistrib.distribCommit(cmd, useNodes, params);
207-
cmdDistrib.blockAndDoRetries();
209+
issuedDistribCommit = true;
208210
}
209211
}
210212

211213
if (isLeader) {
214+
if (issuedDistribCommit) {
215+
// defensive copy of params, which was passed into distribCommit(...) above; will unconditionally replace
216+
// DISTRIB_UPDATE_PARAM, COMMIT_END_POINT, and DISTRIB_FROM if the new `params` val will actually be used
217+
params = new ModifiableSolrParams(params);
218+
}
212219
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
213220

214221
params.set(COMMIT_END_POINT, "replicas");
@@ -219,14 +226,21 @@ public void processCommit(CommitUpdateCommand cmd) throws IOException {
219226
params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
220227
zkController.getBaseUrl(), req.getCore().getName()));
221228

229+
// NOTE: distribCommit(...) internally calls `blockAndDoRetries()`, flushing any TOLEADER distrib commits
222230
cmdDistrib.distribCommit(cmd, useNodes, params);
231+
issuedDistribCommit = true;
223232
}
224233

225234
doLocalCommit(cmd);
226-
227-
if (useNodes != null) {
228-
cmdDistrib.blockAndDoRetries();
229-
}
235+
}
236+
if (issuedDistribCommit) {
237+
// TODO: according to discussion on SOLR-15045, this call (and all tracking of `issuedDistribCommit`) may
238+
// well be superfluous, and can probably simply be removed. It is left in place for now, intentionally
239+
// punting on the question of whether this internal `blockAndDoRetries()` is necessary. At worst, its
240+
// presence is misleading; but it should be harmless, and allows the change fixing SOLR-15045 to be as
241+
// tightly scoped as possible, leaving the behavior of the code otherwise functionally equivalent (for
242+
// better or worse!)
243+
cmdDistrib.blockAndDoRetries();
230244
}
231245
}
232246
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
<?xml version="1.0" ?>
2+
3+
<!--
4+
Licensed to the Apache Software Foundation (ASF) under one or more
5+
contributor license agreements. See the NOTICE file distributed with
6+
this work for additional information regarding copyright ownership.
7+
The ASF licenses this file to You under the Apache License, Version 2.0
8+
(the "License"); you may not use this file except in compliance with
9+
the License. You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing, software
14+
distributed under the License is distributed on an "AS IS" BASIS,
15+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
See the License for the specific language governing permissions and
17+
limitations under the License.
18+
-->
19+
20+
<!--
21+
Test Config for a simple Classification Update Request Processor Chain
22+
-->
23+
<config>
24+
<luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
25+
<xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="solrconfig.snippet.randomindexconfig.xml"/>
26+
<requestHandler name="/select" class="solr.SearchHandler"></requestHandler>
27+
<directoryFactory name="DirectoryFactory" class="${solr.directoryFactory:solr.RAMDirectoryFactory}"/>
28+
<schemaFactory class="ClassicIndexSchemaFactory"/>
29+
30+
<updateHandler class="solr.DirectUpdateHandler2">
31+
<updateLog enable="${enable.update.log:true}">
32+
<str name="dir">${solr.ulog.dir:}</str>
33+
</updateLog>
34+
35+
<commitWithin>
36+
<softCommit>${solr.commitwithin.softcommit:true}</softCommit>
37+
</commitWithin>
38+
39+
</updateHandler>
40+
41+
<requestHandler name="/update" class="solr.UpdateRequestHandler">
42+
<lst name="invariants">
43+
<str name="update.chain">ensure-parallel-commit</str>
44+
</lst>
45+
</requestHandler>
46+
47+
<updateProcessor class="org.apache.solr.cloud.ParallelCommitExecutionTest$CheckFactory" name="check"/>
48+
49+
<updateRequestProcessorChain name="ensure-parallel-commit" post-processor="check">
50+
<processor class="solr.RunUpdateProcessorFactory"/>
51+
</updateRequestProcessorChain>
52+
</config>
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.solr.cloud;
18+
19+
import org.apache.solr.request.SolrQueryRequest;
20+
import org.apache.solr.response.SolrQueryResponse;
21+
import org.apache.solr.update.CommitUpdateCommand;
22+
import org.apache.solr.update.processor.UpdateRequestProcessor;
23+
import org.apache.solr.update.processor.UpdateRequestProcessorFactory;
24+
25+
import java.io.IOException;
26+
import java.lang.invoke.MethodHandles;
27+
import java.nio.file.Path;
28+
import java.nio.file.Paths;
29+
import java.util.LinkedHashMap;
30+
import java.util.Map;
31+
import java.util.concurrent.CountDownLatch;
32+
import java.util.concurrent.TimeUnit;
33+
import java.util.concurrent.atomic.AtomicInteger;
34+
35+
import org.apache.lucene.util.TestUtil;
36+
import org.apache.solr.client.solrj.impl.CloudSolrClient;
37+
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
38+
import org.junit.AfterClass;
39+
import org.junit.BeforeClass;
40+
import org.junit.Test;
41+
42+
public class ParallelCommitExecutionTest extends SolrCloudTestCase {
43+
44+
private static final String DEBUG_LABEL = MethodHandles.lookup().lookupClass().getName();
45+
private static final String COLLECTION_NAME = DEBUG_LABEL + "_collection";
46+
47+
/** A basic client for operations at the cloud level, default collection will be set */
48+
private static CloudSolrClient CLOUD_CLIENT;
49+
private static int expectCount;
50+
51+
private static volatile CountDownLatch countdown;
52+
private static final AtomicInteger countup = new AtomicInteger();
53+
54+
@BeforeClass
55+
public static void beforeClass() throws Exception {
56+
// multi replicas matters; for the initial parallel commit execution tests, only consider repFactor=1
57+
final int repFactor = 1;//random().nextBoolean() ? 1 : 2;
58+
final int numShards = TestUtil.nextInt(random(), 1, 4);
59+
final int numNodes = (numShards * repFactor);
60+
expectCount = numNodes;
61+
62+
final String configName = DEBUG_LABEL + "_config-set";
63+
final Path configDir = Paths.get(TEST_HOME(), "collection1", "conf");
64+
65+
configureCluster(numNodes).addConfig(configName, configDir).configure();
66+
67+
Map<String, String> collectionProperties = new LinkedHashMap<>();
68+
collectionProperties.put("config", "solrconfig-parallel-commit.xml");
69+
collectionProperties.put("schema", "schema_latest.xml");
70+
CollectionAdminRequest.createCollection(COLLECTION_NAME, configName, numShards, repFactor)
71+
.setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
72+
.setProperties(collectionProperties)
73+
.process(cluster.getSolrClient());
74+
75+
CLOUD_CLIENT = cluster.getSolrClient();
76+
CLOUD_CLIENT.setDefaultCollection(COLLECTION_NAME);
77+
waitForRecoveriesToFinish(CLOUD_CLIENT);
78+
}
79+
80+
@AfterClass
81+
private static void afterClass() throws Exception {
82+
if (null != CLOUD_CLIENT) {
83+
CLOUD_CLIENT.close();
84+
CLOUD_CLIENT = null;
85+
}
86+
}
87+
88+
private static void initSyncVars() {
89+
final int ct;
90+
ct = expectCount;
91+
countdown = new CountDownLatch(ct);
92+
countup.set(0);
93+
}
94+
95+
@Test
96+
public void testParallelOk() throws Exception {
97+
initSyncVars();
98+
CLOUD_CLIENT.commit(true, true);
99+
assertEquals(0, countdown.getCount());
100+
assertEquals(expectCount, countup.get());
101+
}
102+
103+
public static void waitForRecoveriesToFinish(CloudSolrClient client) throws Exception {
104+
assert null != client.getDefaultCollection();
105+
AbstractDistribZkTestBase.waitForRecoveriesToFinish(client.getDefaultCollection(),
106+
client.getZkStateReader(),
107+
true, true, 330);
108+
}
109+
110+
public static class CheckFactory extends UpdateRequestProcessorFactory {
111+
@Override
112+
public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
113+
return new Check(next);
114+
}
115+
}
116+
117+
public static class Check extends UpdateRequestProcessor {
118+
119+
public Check(UpdateRequestProcessor next) {
120+
super(next);
121+
}
122+
123+
@Override
124+
public void processCommit(CommitUpdateCommand cmd) throws IOException {
125+
super.processCommit(cmd);
126+
countdown.countDown();
127+
try {
128+
// NOTE: this ensures that all commits are executed in parallel; no commit can complete successfully
129+
// until all commits have entered the `processCommit(...)` method.
130+
if (!countdown.await(5, TimeUnit.SECONDS)) {
131+
throw new RuntimeException("done waiting");
132+
}
133+
countup.incrementAndGet();
134+
} catch (InterruptedException ex) {
135+
throw new RuntimeException(ex);
136+
}
137+
}
138+
}
139+
}

0 commit comments

Comments
 (0)