Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,8 @@ private void addReplChangeManagerConfigs() throws Exception {
private void addCompactorConfigs() {
MetastoreConf.setBoolVar(conf, ConfVars.COMPACTOR_INITIATOR_ON, true);
MetastoreConf.setBoolVar(conf, ConfVars.COMPACTOR_CLEANER_ON, true);
MetastoreConf.setVar(conf, ConfVars.HIVE_METASTORE_RUNWORKER_IN, "metastore");
MetastoreConf.setLongVar(conf, ConfVars.COMPACTOR_WORKER_THREADS, 1);
threadClasses.put(Initiator.class, false);
threadClasses.put(Worker.class, false);
threadClasses.put(Cleaner.class, false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public void run() {
try (ExecutorService compactionExecutor = CompactorUtil.createExecutorWithThreadFactory(
conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_REQUEST_QUEUE),
COMPACTOR_INTIATOR_THREAD_NAME_FORMAT)) {
recoverFailedCompactions(false);
TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new NoMutex();

// Make sure we run through the loop once before checking to stop as this makes testing
Expand All @@ -81,6 +80,9 @@ public void run() {
startedAt = System.currentTimeMillis();
prevStart = handle.getLastUpdateTime();

// Check for timed out workers.
recoverFailedCompactions();
Copy link
Member

@deniskuzZ deniskuzZ Oct 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you please add a new line after it and keep the comment: // Check for timed out workers.

Copy link
Contributor Author

@tanishq-chugh tanishq-chugh Oct 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, added in commit 8e7d43d


if (metricsEnabled) {
perfLogger.perfLogBegin(CLASS_NAME, MetricsConstants.COMPACTION_INITIATOR_CYCLE);
stopCycleUpdater();
Expand Down Expand Up @@ -159,8 +161,6 @@ public void run() {
//Use get instead of join, so we can receive InterruptedException and shutdown gracefully
CompletableFuture.allOf(compactionList.toArray(new CompletableFuture[0])).get();

// Check for timed out remote workers.
recoverFailedCompactions(true);
handle.releaseLocks(startedAt);
} catch (InterruptedException e) {
// do not ignore interruption requests
Expand Down Expand Up @@ -235,8 +235,7 @@ private TableOptimizer instantiateTableOptimizer(String className) {
}
}

private void recoverFailedCompactions(boolean remoteOnly) throws MetaException {
if (!remoteOnly) txnHandler.revokeFromLocalWorkers(ServerUtils.hostname());
private void recoverFailedCompactions() throws MetaException {
txnHandler.revokeTimedoutWorkers(HiveConf.getTimeVar(conf,
HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, TimeUnit.MILLISECONDS));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -686,38 +686,6 @@ private void countCompactionsInHistory(String dbName, String tableName, String p
assertEquals(expextedRefused, filteredToPartition.stream().filter(e -> e.getState().equals(TxnStore.REFUSED_RESPONSE)).count());
}

@Test
public void testRevokeFromLocalWorkers() throws Exception {
CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
txnHandler.compact(rqst);
rqst = new CompactionRequest("foo", "baz", CompactionType.MINOR);
txnHandler.compact(rqst);
rqst = new CompactionRequest("foo", "bazzoo", CompactionType.MINOR);
txnHandler.compact(rqst);
assertNotNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred-193892", WORKER_VERSION)));
assertNotNull(txnHandler.findNextToCompact(aFindNextCompactRequest("bob-193892", WORKER_VERSION)));
assertNotNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred-193893", WORKER_VERSION)));
txnHandler.revokeFromLocalWorkers("fred");

ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
assertEquals(3, compacts.size());
boolean sawWorkingBob = false;
int initiatedCount = 0;
for (ShowCompactResponseElement c : compacts) {
if (c.getState().equals("working")) {
assertEquals("bob-193892", c.getWorkerid());
sawWorkingBob = true;
} else if (c.getState().equals("initiated")) {
initiatedCount++;
} else {
fail("Unexpected state");
}
}
assertTrue(sawWorkingBob);
assertEquals(2, initiatedCount);
}

@Test
public void testRevokeTimedOutWorkers() throws Exception {
CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void nothing() throws Exception {
}

@Test
public void recoverFailedLocalWorkers() throws Exception {
public void recoverFailedWorkers() throws Exception {
Table t = newTable("default", "rflw1", false);
CompactionRequest rqst = new CompactionRequest("default", "rflw1", CompactionType.MINOR);
txnHandler.compact(rqst);
Expand All @@ -85,40 +85,16 @@ public void recoverFailedLocalWorkers() throws Exception {
txnHandler.findNextToCompact(aFindNextCompactRequest(ServerUtils.hostname() + "-193892", WORKER_VERSION));
txnHandler.findNextToCompact(aFindNextCompactRequest("nosuchhost-193892", WORKER_VERSION));

startInitiator();

ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
Assert.assertEquals(2, compacts.size());
boolean sawInitiated = false;
for (ShowCompactResponseElement c : compacts) {
if (c.getState().equals("working")) {
Assert.assertEquals("nosuchhost-193892", c.getWorkerid());
} else if (c.getState().equals("initiated")) {
sawInitiated = true;
} else {
Assert.fail("Unexpected state");
}
}
Assert.assertTrue(sawInitiated);
}

@Test
public void recoverFailedRemoteWorkers() throws Exception {
Table t = newTable("default", "rfrw1", false);
CompactionRequest rqst = new CompactionRequest("default", "rfrw1", CompactionType.MINOR);
txnHandler.compact(rqst);

txnHandler.findNextToCompact(aFindNextCompactRequest("nosuchhost-193892", WORKER_VERSION));
rsp.getCompacts().forEach(ce ->
Assert.assertEquals("working", ce.getState()));

conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, 1L, TimeUnit.MILLISECONDS);

startInitiator();

ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
Assert.assertEquals(1, compacts.size());
Assert.assertEquals("initiated", compacts.get(0).getState());
rsp = txnHandler.showCompact(new ShowCompactRequest());
rsp.getCompacts().forEach(ce ->
Assert.assertEquals("initiated", ce.getState()));
}

@Test
Expand Down
92 changes: 44 additions & 48 deletions service/src/java/org/apache/hive/service/server/HiveServer2.java
Original file line number Diff line number Diff line change
Expand Up @@ -558,11 +558,9 @@ private void initHAHealthChecker(HttpServer webServer, HiveConf hiveConf) throws

private void logCompactionParameters(HiveConf hiveConf) {
LOG.info("Compaction HS2 parameters:");
String runWorkerIn = MetastoreConf.getVar(hiveConf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN);
LOG.info("hive.metastore.runworker.in = {}", runWorkerIn);
int numWorkers = MetastoreConf.getIntVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS);
LOG.info("metastore.compactor.worker.threads = {}", numWorkers);
if ("hs2".equals(runWorkerIn) && numWorkers < 1) {
if (numWorkers < 1) {
LOG.warn("Invalid number of Compactor Worker threads({}) on HS2", numWorkers);
}
}
Expand Down Expand Up @@ -1262,59 +1260,57 @@ private static void startHiveServer2() throws Throwable {

public Map<String, Integer> maybeStartCompactorThreads(HiveConf hiveConf) {
Map<String, Integer> startedWorkers = new HashMap<>();
if (MetastoreConf.getVar(hiveConf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN).equals("hs2")) {
Ref<Integer> numWorkers = new Ref<>(MetastoreConf.getIntVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS));
Map<String, Integer> customPools = CompactorUtil.getPoolConf(hiveConf);

StringBuilder sb = new StringBuilder(2048);
sb.append("This HS2 instance will act as Compactor with the following worker pool configuration:\n");
sb.append("Global pool size: ").append(numWorkers.value).append("\n");

LOG.info("Initializing the compaction pools with using the global worker limit: {} ", numWorkers.value);
customPools.forEach((poolName, poolWorkers) -> {
if (poolWorkers == 0) {
LOG.warn("Pool not initialized, configured with zero workers: {}", poolName);
}
else if (numWorkers.value == 0) {
LOG.warn("Pool not initialized, no available workers remained: {}", poolName);
}
else {
if (poolWorkers > numWorkers.value) {
LOG.warn("Global worker pool exhausted, compaction pool ({}) will be configured with less workers than the " +
"required number. ({} -> {})", poolName, poolWorkers, numWorkers.value);
poolWorkers = numWorkers.value;
}
Ref<Integer> numWorkers = new Ref<>(MetastoreConf.getIntVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS));
Map<String, Integer> customPools = CompactorUtil.getPoolConf(hiveConf);

LOG.info("Initializing compaction pool ({}) with {} workers.", poolName, poolWorkers);
IntStream.range(0, poolWorkers).forEach(i -> {
Worker w = new Worker();
w.setPoolName(poolName);
CompactorThread.initializeAndStartThread(w, hiveConf);
startedWorkers.compute(poolName, (k, v) -> (v == null) ? 1 : v + 1);
sb.append(
String.format("Worker - Name: %s, Pool: %s, Priority: %d", w.getName(), poolName, w.getPriority())
);
});
numWorkers.value -= poolWorkers;
}
});
StringBuilder sb = new StringBuilder(2048);
sb.append("This HS2 instance will act as Compactor with the following worker pool configuration:\n");
sb.append("Global pool size: ").append(numWorkers.value).append("\n");

if (numWorkers.value == 0) {
LOG.warn("No default compaction pool configured, all non-labeled compaction requests will remain unprocessed!");
if (customPools.size() > 0) {
sb.append("Pool not initialized, no remaining free workers: default\n");
LOG.info("Initializing the compaction pools with using the global worker limit: {} ", numWorkers.value);
customPools.forEach((poolName, poolWorkers) -> {
if (poolWorkers == 0) {
LOG.warn("Pool not initialized, configured with zero workers: {}", poolName);
}
else if (numWorkers.value == 0) {
LOG.warn("Pool not initialized, no available workers remained: {}", poolName);
}
else {
if (poolWorkers > numWorkers.value) {
LOG.warn("Global worker pool exhausted, compaction pool ({}) will be configured with less workers than the " +
"required number. ({} -> {})", poolName, poolWorkers, numWorkers.value);
poolWorkers = numWorkers.value;
}
} else {
LOG.info("Initializing default compaction pool with {} workers.", numWorkers.value);
IntStream.range(0, numWorkers.value).forEach(i -> {

LOG.info("Initializing compaction pool ({}) with {} workers.", poolName, poolWorkers);
IntStream.range(0, poolWorkers).forEach(i -> {
Worker w = new Worker();
w.setPoolName(poolName);
CompactorThread.initializeAndStartThread(w, hiveConf);
startedWorkers.compute(Constants.COMPACTION_DEFAULT_POOL, (k, v) -> (v == null) ? 1 : v + 1);
sb.append("Worker - Name: ").append(w.getName()).append(", Pool: default, Priority: ").append(w.getPriority()).append("\n");
startedWorkers.compute(poolName, (k, v) -> (v == null) ? 1 : v + 1);
sb.append(
String.format("Worker - Name: %s, Pool: %s, Priority: %d", w.getName(), poolName, w.getPriority())
);
});
numWorkers.value -= poolWorkers;
}
LOG.info(sb.toString());
});

if (numWorkers.value == 0) {
LOG.warn("No default compaction pool configured, all non-labeled compaction requests will remain unprocessed!");
if (customPools.size() > 0) {
sb.append("Pool not initialized, no remaining free workers: default\n");
}
} else {
LOG.info("Initializing default compaction pool with {} workers.", numWorkers.value);
IntStream.range(0, numWorkers.value).forEach(i -> {
Worker w = new Worker();
CompactorThread.initializeAndStartThread(w, hiveConf);
startedWorkers.compute(Constants.COMPACTION_DEFAULT_POOL, (k, v) -> (v == null) ? 1 : v + 1);
sb.append("Worker - Name: ").append(w.getName()).append(", Pool: default, Priority: ").append(w.getPriority()).append("\n");
});
}
LOG.info(sb.toString());
return startedWorkers;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public void testMaybeStartCompactorThreadsOneCustomPool() {
HiveServer2 hs2 = new HiveServer2();

HiveConf conf = new HiveConf();
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN, "hs2");
MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS, 1);
conf.setInt("hive.compactor.worker.pool1.threads", 1);

Expand All @@ -46,7 +45,6 @@ public void testMaybeStartCompactorThreadsZeroTotalWorkers() {
HiveServer2 hs2 = new HiveServer2();

HiveConf conf = new HiveConf();
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN, "hs2");
MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS, 0);
conf.setInt("hive.compactor.worker.pool1.threads", 5);

Expand All @@ -59,7 +57,6 @@ public void testMaybeStartCompactorThreadsZeroCustomWorkers() {
HiveServer2 hs2 = new HiveServer2();

HiveConf conf = new HiveConf();
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN, "hs2");
MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS, 5);

Map<String, Integer> startedWorkers = hs2.maybeStartCompactorThreads(conf);
Expand All @@ -72,7 +69,6 @@ public void testMaybeStartCompactorThreadsMultipleCustomPools() {
HiveServer2 hs2 = new HiveServer2();

HiveConf conf = new HiveConf();
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN, "hs2");
MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS, 12);
conf.setInt("hive.compactor.worker.pool1.threads", 3);
conf.setInt("hive.compactor.worker.pool2.threads", 4);
Expand All @@ -90,7 +86,6 @@ public void testMaybeStartCompactorThreadsMultipleCustomPoolsAndDefaultPool() {
HiveServer2 hs2 = new HiveServer2();

HiveConf conf = new HiveConf();
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN, "hs2");
MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS, 15);
conf.setInt("hive.compactor.worker.pool1.threads", 3);
conf.setInt("hive.compactor.worker.pool2.threads", 4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1774,16 +1774,6 @@ public enum ConfVars {
"Batch size for partition and other object retrieval from the underlying DB in JDO.\n" +
"The JDO implementation such as DataNucleus may run into issues when the generated queries are\n" +
"too large. Use this parameter to break the query into multiple batches. -1 means no batching."),
/**
* @deprecated Deprecated due to HIVE-26443
*/
@Deprecated
HIVE_METASTORE_RUNWORKER_IN("hive.metastore.runworker.in",
"hive.metastore.runworker.in", "hs2", new StringSetValidator("metastore", "hs2"),
"Deprecated. HMS side compaction workers doesn't support pooling. With the concept of compaction " +
"pools (HIVE-26443), running workers on HMS side is still supported but not suggested anymore. " +
"This config value will be removed in the future.\n" +
"Chooses where the compactor worker threads should run, Only possible values are \"metastore\" and \"hs2\""),
// Hive values we have copied and use as is
// These two are used to indicate that we are running tests
HIVE_IN_TEST("hive.in.test", "hive.in.test", false, "internal usage only, true in test mode"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -837,12 +837,9 @@ public void run() {
.setTType(LeaderElectionContext.TTYPE.HOUSEKEEPING) // housekeeping tasks
.addListener(new CMClearer(conf))
.addListener(new StatsUpdaterTask(conf))
.addListener(new CompactorTasks(conf, false))
.addListener(new CompactorTasks(conf))
.addListener(new CompactorPMF())
.addListener(new HouseKeepingTasks(conf, true))
.setTType(LeaderElectionContext.TTYPE.WORKER) // compactor worker
.addListener(new CompactorTasks(conf, true),
MetastoreConf.getVar(conf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN).equals("metastore"))
.build();
if (shutdownHookMgr != null) {
shutdownHookMgr.addShutdownHook(() -> context.close(), 0);
Expand Down
Loading