diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java index 588fae409dc9..e8ec181a684d 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java @@ -117,10 +117,8 @@ private void addReplChangeManagerConfigs() throws Exception { private void addCompactorConfigs() { MetastoreConf.setBoolVar(conf, ConfVars.COMPACTOR_INITIATOR_ON, true); MetastoreConf.setBoolVar(conf, ConfVars.COMPACTOR_CLEANER_ON, true); - MetastoreConf.setVar(conf, ConfVars.HIVE_METASTORE_RUNWORKER_IN, "metastore"); MetastoreConf.setLongVar(conf, ConfVars.COMPACTOR_WORKER_THREADS, 1); threadClasses.put(Initiator.class, false); - threadClasses.put(Worker.class, false); threadClasses.put(Cleaner.class, false); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 252defbf7448..5dc40b2ba9fc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -64,7 +64,6 @@ public void run() { try (ExecutorService compactionExecutor = CompactorUtil.createExecutorWithThreadFactory( conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_REQUEST_QUEUE), COMPACTOR_INTIATOR_THREAD_NAME_FORMAT)) { - recoverFailedCompactions(false); TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new NoMutex(); // Make sure we run through the loop once before checking to stop as this makes testing @@ -81,6 +80,9 @@ public void run() { startedAt = System.currentTimeMillis(); prevStart = handle.getLastUpdateTime(); + // Check for timed out workers. + recoverFailedCompactions(); + if (metricsEnabled) { perfLogger.perfLogBegin(CLASS_NAME, MetricsConstants.COMPACTION_INITIATOR_CYCLE); stopCycleUpdater(); @@ -159,8 +161,6 @@ public void run() { //Use get instead of join, so we can receive InterruptedException and shutdown gracefully CompletableFuture.allOf(compactionList.toArray(new CompletableFuture[0])).get(); - // Check for timed out remote workers. - recoverFailedCompactions(true); handle.releaseLocks(startedAt); } catch (InterruptedException e) { // do not ignore interruption requests @@ -235,8 +235,7 @@ private TableOptimizer instantiateTableOptimizer(String className) { } } - private void recoverFailedCompactions(boolean remoteOnly) throws MetaException { - if (!remoteOnly) txnHandler.revokeFromLocalWorkers(ServerUtils.hostname()); + private void recoverFailedCompactions() throws MetaException { txnHandler.revokeTimedoutWorkers(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, TimeUnit.MILLISECONDS)); } diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java index 11040823495c..ab39b274a4bf 100644 --- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java +++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java @@ -686,38 +686,6 @@ private void countCompactionsInHistory(String dbName, String tableName, String p assertEquals(expextedRefused, filteredToPartition.stream().filter(e -> e.getState().equals(TxnStore.REFUSED_RESPONSE)).count()); } - @Test - public void testRevokeFromLocalWorkers() throws Exception { - CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); - txnHandler.compact(rqst); - rqst = new CompactionRequest("foo", "baz", CompactionType.MINOR); - txnHandler.compact(rqst); - rqst = new CompactionRequest("foo", "bazzoo", CompactionType.MINOR); - txnHandler.compact(rqst); - assertNotNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred-193892", WORKER_VERSION))); - assertNotNull(txnHandler.findNextToCompact(aFindNextCompactRequest("bob-193892", WORKER_VERSION))); - assertNotNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred-193893", WORKER_VERSION))); - txnHandler.revokeFromLocalWorkers("fred"); - - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List compacts = rsp.getCompacts(); - assertEquals(3, compacts.size()); - boolean sawWorkingBob = false; - int initiatedCount = 0; - for (ShowCompactResponseElement c : compacts) { - if (c.getState().equals("working")) { - assertEquals("bob-193892", c.getWorkerid()); - sawWorkingBob = true; - } else if (c.getState().equals("initiated")) { - initiatedCount++; - } else { - fail("Unexpected state"); - } - } - assertTrue(sawWorkingBob); - assertEquals(2, initiatedCount); - } - @Test public void testRevokeTimedOutWorkers() throws Exception { CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java index 636550aeaca8..1b2d6e1701a3 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java @@ -73,7 +73,7 @@ public void nothing() throws Exception { } @Test - public void recoverFailedLocalWorkers() throws Exception { + public void recoverFailedWorkers() throws Exception { Table t = newTable("default", "rflw1", false); CompactionRequest rqst = new CompactionRequest("default", "rflw1", CompactionType.MINOR); txnHandler.compact(rqst); @@ -85,40 +85,16 @@ public void recoverFailedLocalWorkers() throws Exception { txnHandler.findNextToCompact(aFindNextCompactRequest(ServerUtils.hostname() + "-193892", WORKER_VERSION)); txnHandler.findNextToCompact(aFindNextCompactRequest("nosuchhost-193892", WORKER_VERSION)); - startInitiator(); - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List compacts = rsp.getCompacts(); - Assert.assertEquals(2, compacts.size()); - boolean sawInitiated = false; - for (ShowCompactResponseElement c : compacts) { - if (c.getState().equals("working")) { - Assert.assertEquals("nosuchhost-193892", c.getWorkerid()); - } else if (c.getState().equals("initiated")) { - sawInitiated = true; - } else { - Assert.fail("Unexpected state"); - } - } - Assert.assertTrue(sawInitiated); - } - - @Test - public void recoverFailedRemoteWorkers() throws Exception { - Table t = newTable("default", "rfrw1", false); - CompactionRequest rqst = new CompactionRequest("default", "rfrw1", CompactionType.MINOR); - txnHandler.compact(rqst); - - txnHandler.findNextToCompact(aFindNextCompactRequest("nosuchhost-193892", WORKER_VERSION)); + rsp.getCompacts().forEach(ce -> + Assert.assertEquals("working", ce.getState())); conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, 1L, TimeUnit.MILLISECONDS); - startInitiator(); - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); - Assert.assertEquals("initiated", compacts.get(0).getState()); + rsp = txnHandler.showCompact(new ShowCompactRequest()); + rsp.getCompacts().forEach(ce -> + Assert.assertEquals("initiated", ce.getState())); } @Test diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index 048033cbcf3c..c145283c1206 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -558,11 +558,9 @@ private void initHAHealthChecker(HttpServer webServer, HiveConf hiveConf) throws private void logCompactionParameters(HiveConf hiveConf) { LOG.info("Compaction HS2 parameters:"); - String runWorkerIn = MetastoreConf.getVar(hiveConf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN); - LOG.info("hive.metastore.runworker.in = {}", runWorkerIn); int numWorkers = MetastoreConf.getIntVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS); LOG.info("metastore.compactor.worker.threads = {}", numWorkers); - if ("hs2".equals(runWorkerIn) && numWorkers < 1) { + if (numWorkers < 1) { LOG.warn("Invalid number of Compactor Worker threads({}) on HS2", numWorkers); } } @@ -1262,59 +1260,57 @@ private static void startHiveServer2() throws Throwable { public Map maybeStartCompactorThreads(HiveConf hiveConf) { Map startedWorkers = new HashMap<>(); - if (MetastoreConf.getVar(hiveConf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN).equals("hs2")) { - Ref numWorkers = new Ref<>(MetastoreConf.getIntVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS)); - Map customPools = CompactorUtil.getPoolConf(hiveConf); - - StringBuilder sb = new StringBuilder(2048); - sb.append("This HS2 instance will act as Compactor with the following worker pool configuration:\n"); - sb.append("Global pool size: ").append(numWorkers.value).append("\n"); - - LOG.info("Initializing the compaction pools with using the global worker limit: {} ", numWorkers.value); - customPools.forEach((poolName, poolWorkers) -> { - if (poolWorkers == 0) { - LOG.warn("Pool not initialized, configured with zero workers: {}", poolName); - } - else if (numWorkers.value == 0) { - LOG.warn("Pool not initialized, no available workers remained: {}", poolName); - } - else { - if (poolWorkers > numWorkers.value) { - LOG.warn("Global worker pool exhausted, compaction pool ({}) will be configured with less workers than the " + - "required number. ({} -> {})", poolName, poolWorkers, numWorkers.value); - poolWorkers = numWorkers.value; - } + Ref numWorkers = new Ref<>(MetastoreConf.getIntVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS)); + Map customPools = CompactorUtil.getPoolConf(hiveConf); - LOG.info("Initializing compaction pool ({}) with {} workers.", poolName, poolWorkers); - IntStream.range(0, poolWorkers).forEach(i -> { - Worker w = new Worker(); - w.setPoolName(poolName); - CompactorThread.initializeAndStartThread(w, hiveConf); - startedWorkers.compute(poolName, (k, v) -> (v == null) ? 1 : v + 1); - sb.append( - String.format("Worker - Name: %s, Pool: %s, Priority: %d", w.getName(), poolName, w.getPriority()) - ); - }); - numWorkers.value -= poolWorkers; - } - }); + StringBuilder sb = new StringBuilder(2048); + sb.append("This HS2 instance will act as Compactor with the following worker pool configuration:\n"); + sb.append("Global pool size: ").append(numWorkers.value).append("\n"); - if (numWorkers.value == 0) { - LOG.warn("No default compaction pool configured, all non-labeled compaction requests will remain unprocessed!"); - if (customPools.size() > 0) { - sb.append("Pool not initialized, no remaining free workers: default\n"); + LOG.info("Initializing the compaction pools with using the global worker limit: {} ", numWorkers.value); + customPools.forEach((poolName, poolWorkers) -> { + if (poolWorkers == 0) { + LOG.warn("Pool not initialized, configured with zero workers: {}", poolName); + } + else if (numWorkers.value == 0) { + LOG.warn("Pool not initialized, no available workers remained: {}", poolName); + } + else { + if (poolWorkers > numWorkers.value) { + LOG.warn("Global worker pool exhausted, compaction pool ({}) will be configured with less workers than the " + + "required number. ({} -> {})", poolName, poolWorkers, numWorkers.value); + poolWorkers = numWorkers.value; } - } else { - LOG.info("Initializing default compaction pool with {} workers.", numWorkers.value); - IntStream.range(0, numWorkers.value).forEach(i -> { + + LOG.info("Initializing compaction pool ({}) with {} workers.", poolName, poolWorkers); + IntStream.range(0, poolWorkers).forEach(i -> { Worker w = new Worker(); + w.setPoolName(poolName); CompactorThread.initializeAndStartThread(w, hiveConf); - startedWorkers.compute(Constants.COMPACTION_DEFAULT_POOL, (k, v) -> (v == null) ? 1 : v + 1); - sb.append("Worker - Name: ").append(w.getName()).append(", Pool: default, Priority: ").append(w.getPriority()).append("\n"); + startedWorkers.compute(poolName, (k, v) -> (v == null) ? 1 : v + 1); + sb.append( + String.format("Worker - Name: %s, Pool: %s, Priority: %d", w.getName(), poolName, w.getPriority()) + ); }); + numWorkers.value -= poolWorkers; } - LOG.info(sb.toString()); + }); + + if (numWorkers.value == 0) { + LOG.warn("No default compaction pool configured, all non-labeled compaction requests will remain unprocessed!"); + if (customPools.size() > 0) { + sb.append("Pool not initialized, no remaining free workers: default\n"); + } + } else { + LOG.info("Initializing default compaction pool with {} workers.", numWorkers.value); + IntStream.range(0, numWorkers.value).forEach(i -> { + Worker w = new Worker(); + CompactorThread.initializeAndStartThread(w, hiveConf); + startedWorkers.compute(Constants.COMPACTION_DEFAULT_POOL, (k, v) -> (v == null) ? 1 : v + 1); + sb.append("Worker - Name: ").append(w.getName()).append(", Pool: default, Priority: ").append(w.getPriority()).append("\n"); + }); } + LOG.info(sb.toString()); return startedWorkers; } diff --git a/service/src/test/org/apache/hive/service/server/TestHiveServer2.java b/service/src/test/org/apache/hive/service/server/TestHiveServer2.java index 7393dd6a77f0..42dbdb887957 100644 --- a/service/src/test/org/apache/hive/service/server/TestHiveServer2.java +++ b/service/src/test/org/apache/hive/service/server/TestHiveServer2.java @@ -32,7 +32,6 @@ public void testMaybeStartCompactorThreadsOneCustomPool() { HiveServer2 hs2 = new HiveServer2(); HiveConf conf = new HiveConf(); - MetastoreConf.setVar(conf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN, "hs2"); MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS, 1); conf.setInt("hive.compactor.worker.pool1.threads", 1); @@ -46,7 +45,6 @@ public void testMaybeStartCompactorThreadsZeroTotalWorkers() { HiveServer2 hs2 = new HiveServer2(); HiveConf conf = new HiveConf(); - MetastoreConf.setVar(conf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN, "hs2"); MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS, 0); conf.setInt("hive.compactor.worker.pool1.threads", 5); @@ -59,7 +57,6 @@ public void testMaybeStartCompactorThreadsZeroCustomWorkers() { HiveServer2 hs2 = new HiveServer2(); HiveConf conf = new HiveConf(); - MetastoreConf.setVar(conf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN, "hs2"); MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS, 5); Map startedWorkers = hs2.maybeStartCompactorThreads(conf); @@ -72,7 +69,6 @@ public void testMaybeStartCompactorThreadsMultipleCustomPools() { HiveServer2 hs2 = new HiveServer2(); HiveConf conf = new HiveConf(); - MetastoreConf.setVar(conf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN, "hs2"); MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS, 12); conf.setInt("hive.compactor.worker.pool1.threads", 3); conf.setInt("hive.compactor.worker.pool2.threads", 4); @@ -90,7 +86,6 @@ public void testMaybeStartCompactorThreadsMultipleCustomPoolsAndDefaultPool() { HiveServer2 hs2 = new HiveServer2(); HiveConf conf = new HiveConf(); - MetastoreConf.setVar(conf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN, "hs2"); MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS, 15); conf.setInt("hive.compactor.worker.pool1.threads", 3); conf.setInt("hive.compactor.worker.pool2.threads", 4); diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index cff6209a6fc0..63a0d106d27c 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -1774,16 +1774,6 @@ public enum ConfVars { "Batch size for partition and other object retrieval from the underlying DB in JDO.\n" + "The JDO implementation such as DataNucleus may run into issues when the generated queries are\n" + "too large. Use this parameter to break the query into multiple batches. -1 means no batching."), - /** - * @deprecated Deprecated due to HIVE-26443 - */ - @Deprecated - HIVE_METASTORE_RUNWORKER_IN("hive.metastore.runworker.in", - "hive.metastore.runworker.in", "hs2", new StringSetValidator("metastore", "hs2"), - "Deprecated. HMS side compaction workers doesn't support pooling. With the concept of compaction " + - "pools (HIVE-26443), running workers on HMS side is still supported but not suggested anymore. " + - "This config value will be removed in the future.\n" + - "Chooses where the compactor worker threads should run, Only possible values are \"metastore\" and \"hs2\""), // Hive values we have copied and use as is // These two are used to indicate that we are running tests HIVE_IN_TEST("hive.in.test", "hive.in.test", false, "internal usage only, true in test mode"), diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index e0368373a60c..1d9fbf3e3ae1 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -837,12 +837,9 @@ public void run() { .setTType(LeaderElectionContext.TTYPE.HOUSEKEEPING) // housekeeping tasks .addListener(new CMClearer(conf)) .addListener(new StatsUpdaterTask(conf)) - .addListener(new CompactorTasks(conf, false)) + .addListener(new CompactorTasks(conf)) .addListener(new CompactorPMF()) .addListener(new HouseKeepingTasks(conf, true)) - .setTType(LeaderElectionContext.TTYPE.WORKER) // compactor worker - .addListener(new CompactorTasks(conf, true), - MetastoreConf.getVar(conf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN).equals("metastore")) .build(); if (shutdownHookMgr != null) { shutdownHookMgr.addShutdownHook(() -> context.close(), 0); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/CompactorTasks.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/CompactorTasks.java index fcc51402bb59..956a5e5371f9 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/CompactorTasks.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/CompactorTasks.java @@ -36,16 +36,14 @@ public class CompactorTasks implements LeaderElection.LeadershipStateListener { private final Configuration configuration; - private final boolean runOnlyWorker; // each MetaStoreThread runs as a thread private Map metastoreThreadsMap; - public CompactorTasks(Configuration configuration, boolean runOnlyWorker) { + public CompactorTasks(Configuration configuration) { // recreate a new configuration this.configuration = new Configuration(requireNonNull(configuration, "configuration is null")); - this.runOnlyWorker = runOnlyWorker; } // Copied from HiveMetaStore @@ -62,66 +60,41 @@ private MetaStoreThread instantiateThread(String classname) throws Exception { public List getCompactorThreads() throws Exception { List compactors = new ArrayList<>(); - if (!runOnlyWorker) { - if (MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON)) { - MetaStoreThread initiator = instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Initiator"); - compactors.add(initiator); - } - if (MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON)) { - MetaStoreThread cleaner = instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Cleaner"); - compactors.add(cleaner); - } - } else { - boolean runInMetastore = MetastoreConf.getVar(configuration, - MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN).equals("metastore"); - if (runInMetastore) { - HiveMetaStore.LOG.warn("Running compaction workers on HMS side is not suggested because compaction pools are not supported in HMS " + - "(HIVE-26443). Consider removing the hive.metastore.runworker.in configuration setting, as it will be " + - "comletely removed in future releases."); - int numWorkers = MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS); - for (int i = 0; i < numWorkers; i++) { - MetaStoreThread worker = instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Worker"); - compactors.add(worker); - } - } + if (MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON)) { + MetaStoreThread initiator = instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Initiator"); + compactors.add(initiator); + } + if (MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON)) { + MetaStoreThread cleaner = instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Cleaner"); + compactors.add(cleaner); } return compactors; } private void logCompactionParameters() { - if (!runOnlyWorker) { - HiveMetaStore.LOG.info("Compaction HMS parameters:"); - HiveMetaStore.LOG.info("metastore.compactor.initiator.on = {}", - MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON)); - HiveMetaStore.LOG.info("metastore.compactor.cleaner.on = {}", - MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON)); - HiveMetaStore.LOG.info("metastore.compactor.worker.threads = {}", - MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS)); - HiveMetaStore.LOG.info("hive.metastore.runworker.in = {}", - MetastoreConf.getVar(configuration, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN)); - HiveMetaStore.LOG.info("metastore.compactor.history.retention.did.not.initiate = {}", - MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_HISTORY_RETENTION_DID_NOT_INITIATE)); - HiveMetaStore.LOG.info("metastore.compactor.history.retention.failed = {}", - MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED)); - HiveMetaStore.LOG.info("metastore.compactor.history.retention.succeeded = {}", - MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_HISTORY_RETENTION_SUCCEEDED)); - HiveMetaStore.LOG.info("metastore.compactor.initiator.failed.compacts.threshold = {}", - MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD)); - HiveMetaStore.LOG.info("metastore.compactor.enable.stats.compression", - MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_MINOR_STATS_COMPRESSION)); - - if (!MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON)) { - HiveMetaStore.LOG.warn("Compactor Initiator is turned Off. Automatic compaction will not be triggered."); - } - if (!MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON)) { - HiveMetaStore.LOG.warn("Compactor Cleaner is turned Off. Automatic compaction cleaner will not be triggered."); - } - - } else { - int numThreads = MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS); - if (numThreads < 1) { - HiveMetaStore.LOG.warn("Invalid number of Compactor Worker threads({}) on HMS", numThreads); - } + HiveMetaStore.LOG.info("Compaction HMS parameters:"); + HiveMetaStore.LOG.info("metastore.compactor.initiator.on = {}", + MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON)); + HiveMetaStore.LOG.info("metastore.compactor.cleaner.on = {}", + MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON)); + HiveMetaStore.LOG.info("metastore.compactor.worker.threads = {}", + MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS)); + HiveMetaStore.LOG.info("metastore.compactor.history.retention.did.not.initiate = {}", + MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_HISTORY_RETENTION_DID_NOT_INITIATE)); + HiveMetaStore.LOG.info("metastore.compactor.history.retention.failed = {}", + MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED)); + HiveMetaStore.LOG.info("metastore.compactor.history.retention.succeeded = {}", + MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_HISTORY_RETENTION_SUCCEEDED)); + HiveMetaStore.LOG.info("metastore.compactor.initiator.failed.compacts.threshold = {}", + MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD)); + HiveMetaStore.LOG.info("metastore.compactor.enable.stats.compression", + MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_MINOR_STATS_COMPRESSION)); + + if (!MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON)) { + HiveMetaStore.LOG.warn("Compactor Initiator is turned Off. Automatic compaction will not be triggered."); + } + if (!MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON)) { + HiveMetaStore.LOG.warn("Compactor Cleaner is turned Off. Automatic compaction cleaner will not be triggered."); } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 91cfe3130331..393cb9443e5e 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -258,34 +258,11 @@ public void cleanEmptyAbortedAndCommittedTxns() throws MetaException { } } - /** - * This will take all entries assigned to workers - * on a host return them to INITIATED state. The initiator should use this at start up to - * clean entries from any workers that were in the middle of compacting when the metastore - * shutdown. It does not reset entries from worker threads on other hosts as those may still - * be working. - * @param hostname Name of this host. It is assumed this prefixes the thread's worker id, - * so that like hostname% will match the worker id. - */ - @Override - @RetrySemantics.Idempotent - public void revokeFromLocalWorkers(String hostname) throws MetaException { - jdbcResource.execute( - "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_WORKER_ID\" = NULL, \"CQ_START\" = NULL," + - " \"CQ_STATE\" = :initiatedState WHERE \"CQ_STATE\" = :workingState AND \"CQ_WORKER_ID\" LIKE :hostname", - new MapSqlParameterSource() - .addValue("initiatedState", Character.toString(INITIATED_STATE), Types.CHAR) - .addValue("workingState", Character.toString(WORKING_STATE), Types.CHAR) - .addValue("hostname", hostname + "%"), - null); - } - /** * This call will return all compaction queue * entries assigned to a worker but over the timeout back to the initiated state. * This should be called by the initiator on start up and occasionally when running to clean up - * after dead threads. At start up {@link #revokeFromLocalWorkers(String)} should be called - * first. + * after dead threads. * @param timeout number of milliseconds since start time that should elapse before a worker is * declared dead. */ diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 42a80b189622..13f32f646302 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -741,26 +741,11 @@ Set findPotentialCompactions(int abortedThreshold, long abortedT @RetrySemantics.SafeToRetry void cleanEmptyAbortedAndCommittedTxns() throws MetaException; - /** - * This will take all entries assigned to workers - * on a host return them to INITIATED state. The initiator should use this at start up to - * clean entries from any workers that were in the middle of compacting when the metastore - * shutdown. It does not reset entries from worker threads on other hosts as those may still - * be working. - * @param hostname Name of this host. It is assumed this prefixes the thread's worker id, - * so that like hostname% will match the worker id. - */ - @SqlRetry - @Transactional(POOL_COMPACTOR) - @RetrySemantics.Idempotent - void revokeFromLocalWorkers(String hostname) throws MetaException; - /** * This call will return all compaction queue * entries assigned to a worker but over the timeout back to the initiated state. * This should be called by the initiator on start up and occasionally when running to clean up - * after dead threads. At start up {@link #revokeFromLocalWorkers(String)} should be called - * first. + * after dead threads. * @param timeout number of milliseconds since start time that should elapse before a worker is * declared dead. */