Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion solr/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ Optimizations

Bug Fixes
---------------------
(No changes)
* SOLR-15045: `DistributedZkUpdateProcessor` now issues commits to local shards and remote shards in parallel,
halving the latency of synchronous commits (Michael Gibney)

Other Changes
---------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ public void processCommit(CommitUpdateCommand cmd) throws IOException {
// zk
ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));

// TODO: revisit the need for tracking `issuedDistribCommit` -- see below, and SOLR-15045
boolean issuedDistribCommit = false;
List<SolrCmdDistributor.Node> useNodes = null;
if (req.getParams().get(COMMIT_END_POINT) == null) {
useNodes = nodes;
Expand All @@ -204,11 +206,16 @@ public void processCommit(CommitUpdateCommand cmd) throws IOException {
params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
zkController.getBaseUrl(), req.getCore().getName()));
cmdDistrib.distribCommit(cmd, useNodes, params);
cmdDistrib.blockAndDoRetries();
issuedDistribCommit = true;
}
}

if (isLeader) {
if (issuedDistribCommit) {
// defensive copy of params, which was passed into distribCommit(...) above; will unconditionally replace
// DISTRIB_UPDATE_PARAM, COMMIT_END_POINT, and DISTRIB_FROM if the new `params` val will actually be used
params = new ModifiableSolrParams(params);
}
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());

params.set(COMMIT_END_POINT, "replicas");
Expand All @@ -219,14 +226,21 @@ public void processCommit(CommitUpdateCommand cmd) throws IOException {
params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
zkController.getBaseUrl(), req.getCore().getName()));

// NOTE: distribCommit(...) internally calls `blockAndDoRetries()`, flushing any TOLEADER distrib commits
cmdDistrib.distribCommit(cmd, useNodes, params);
issuedDistribCommit = true;
}

doLocalCommit(cmd);

if (useNodes != null) {
cmdDistrib.blockAndDoRetries();
}
}
if (issuedDistribCommit) {
// TODO: according to discussion on SOLR-15045, this call (and all tracking of `issuedDistribCommit`) may
// well be superfluous, and can probably simply be removed. It is left in place for now, intentionally
// punting on the question of whether this internal `blockAndDoRetries()` is necessary. At worst, its
// presence is misleading; but it should be harmless, and allows the change fixing SOLR-15045 to be as
// tightly scoped as possible, leaving the behavior of the code otherwise functionally equivalent (for
// better or worse!)
cmdDistrib.blockAndDoRetries();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
<?xml version="1.0" ?>

<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<!--
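Test config for ParallelCommitExecutionTest: every /update request is routed through the
"ensure-parallel-commit" chain, whose "check" post-processor is supplied by that test class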
-->
<config>
<luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
<xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="solrconfig.snippet.randomindexconfig.xml"/>
<requestHandler name="/select" class="solr.SearchHandler"></requestHandler>
<directoryFactory name="DirectoryFactory" class="${solr.directoryFactory:solr.RAMDirectoryFactory}"/>
<schemaFactory class="ClassicIndexSchemaFactory"/>

<updateHandler class="solr.DirectUpdateHandler2">
<!-- Update log is on unless the enable.update.log property says otherwise -->
<updateLog enable="${enable.update.log:true}">
<str name="dir">${solr.ulog.dir:}</str>
</updateLog>

<commitWithin>
<softCommit>${solr.commitwithin.softcommit:true}</softCommit>
</commitWithin>

</updateHandler>

<!-- "update.chain" is declared as an invariant, pinning /update requests to the chain defined below -->
<requestHandler name="/update" class="solr.UpdateRequestHandler">
<lst name="invariants">
<str name="update.chain">ensure-parallel-commit</str>
</lst>
</requestHandler>

<!-- Named processor backed by the CheckFactory class nested inside ParallelCommitExecutionTest -->
<updateProcessor class="org.apache.solr.cloud.ParallelCommitExecutionTest$CheckFactory" name="check"/>

<!-- Chain referenced by /update above; the "check" processor is attached via the post-processor attribute -->
<updateRequestProcessorChain name="ensure-parallel-commit" post-processor="check">
<processor class="solr.RunUpdateProcessorFactory"/>
</updateRequestProcessorChain>
</config>
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;

import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessorFactory;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.lucene.util.TestUtil;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Checks that a commit sent through the cloud client is processed by every expected core
 * concurrently rather than one after another.
 *
 * <p>The collection is created with {@code solrconfig-parallel-commit.xml}, which installs
 * {@link Check} (via {@link CheckFactory}) in the update chain. Each {@link Check} instance counts
 * down a shared latch when it sees a commit and then waits for the latch to reach zero, so no
 * commit can finish until all expected commits have started.
 */
public class ParallelCommitExecutionTest extends SolrCloudTestCase {

  private static final String DEBUG_LABEL = MethodHandles.lookup().lookupClass().getName();
  private static final String COLLECTION_NAME = DEBUG_LABEL + "_collection";

  /** A basic client for operations at the cloud level, default collection will be set */
  private static CloudSolrClient CLOUD_CLIENT;

  /** Number of commits expected per client commit; set to the node count in {@link #beforeClass()}. */
  private static int expectCount;

  /** Latch shared with {@link Check}; replaced by {@link #initSyncVars()} before each scenario. */
  private static volatile CountDownLatch countdown;

  /** Number of {@link Check#processCommit} calls that got past the latch. */
  private static final AtomicInteger countup = new AtomicInteger();

  /**
   * Starts a cluster with one node per shard, uploads the config set, creates the collection and
   * waits for recoveries to finish.
   */
  @BeforeClass
  public static void beforeClass() throws Exception {
    // The replication factor would affect the expected commit count; for the initial parallel
    // commit execution tests only repFactor=1 is considered.
    final int repFactor = 1;
    final int numShards = TestUtil.nextInt(random(), 1, 4);
    final int numNodes = (numShards * repFactor);
    expectCount = numNodes;

    final String configName = DEBUG_LABEL + "_config-set";
    final Path configDir = Paths.get(TEST_HOME(), "collection1", "conf");

    configureCluster(numNodes).addConfig(configName, configDir).configure();

    Map<String, String> collectionProperties = new LinkedHashMap<>();
    collectionProperties.put("config", "solrconfig-parallel-commit.xml");
    collectionProperties.put("schema", "schema_latest.xml");
    CollectionAdminRequest.createCollection(COLLECTION_NAME, configName, numShards, repFactor)
        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
        .setProperties(collectionProperties)
        .process(cluster.getSolrClient());

    CLOUD_CLIENT = cluster.getSolrClient();
    CLOUD_CLIENT.setDefaultCollection(COLLECTION_NAME);
    waitForRecoveriesToFinish(CLOUD_CLIENT);
  }

  /** Closes the cloud client, if one was obtained, and clears the static reference. */
  @AfterClass
  private static void afterClass() throws Exception {
    if (null != CLOUD_CLIENT) {
      CLOUD_CLIENT.close();
      CLOUD_CLIENT = null;
    }
  }

  /** Installs a fresh latch sized to {@link #expectCount} and resets the completion counter. */
  private static void initSyncVars() {
    countdown = new CountDownLatch(expectCount);
    countup.set(0);
  }

  /**
   * Issues one commit and asserts that the latch was fully counted down and that exactly
   * {@link #expectCount} commits got past it.
   */
  @Test
  public void testParallelOk() throws Exception {
    initSyncVars();
    CLOUD_CLIENT.commit(true, true);
    assertEquals(0, countdown.getCount());
    assertEquals(expectCount, countup.get());
  }

  /**
   * Waits for recoveries to finish on the client's default collection.
   *
   * @param client cloud client whose default collection must already be set
   */
  public static void waitForRecoveriesToFinish(CloudSolrClient client) throws Exception {
    assert null != client.getDefaultCollection();
    AbstractDistribZkTestBase.waitForRecoveriesToFinish(client.getDefaultCollection(),
        client.getZkStateReader(),
        true, true, 330);
  }

  /** Factory referenced from {@code solrconfig-parallel-commit.xml}; produces {@link Check} processors. */
  public static class CheckFactory extends UpdateRequestProcessorFactory {
    @Override
    public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
      return new Check(next);
    }
  }

  /** Update processor that makes every commit wait until all expected commits have arrived. */
  public static class Check extends UpdateRequestProcessor {

    public Check(UpdateRequestProcessor next) {
      super(next);
    }

    /**
     * Delegates the commit, then counts down the shared latch and waits up to five seconds for it
     * to reach zero before recording the commit as complete.
     *
     * @throws RuntimeException if the wait times out or the waiting thread is interrupted
     */
    @Override
    public void processCommit(CommitUpdateCommand cmd) throws IOException {
      super.processCommit(cmd);
      // Read the volatile field once so countDown() and await() always act on the same latch,
      // even if the static field is swapped for a new latch in the meantime.
      final CountDownLatch latch = countdown;
      latch.countDown();
      try {
        // NOTE: this ensures that all commits are executed in parallel; no commit can complete successfully
        // until all commits have entered the `processCommit(...)` method.
        if (!latch.await(5, TimeUnit.SECONDS)) {
          throw new RuntimeException("done waiting");
        }
        countup.incrementAndGet();
      } catch (InterruptedException ex) {
        // Restore the interrupt flag so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw new RuntimeException(ex);
      }
    }
  }
}