From e15b25f683ad2ffcf18c1e1f621531966c5ee78b Mon Sep 17 00:00:00 2001 From: ning guo Date: Thu, 2 Oct 2025 14:51:26 +0200 Subject: [PATCH 1/2] Implement refreshCatalogsFromStore in CoordinatorDynamicCatalogManager --- .../CoordinatorDynamicCatalogManager.java | 69 +++++++++++++++++++ .../io/trino/metadata/CatalogManager.java | 2 + .../io/trino/spi/catalog/CatalogStore.java | 10 +++ 3 files changed, 81 insertions(+) diff --git a/core/trino-main/src/main/java/io/trino/connector/CoordinatorDynamicCatalogManager.java b/core/trino-main/src/main/java/io/trino/connector/CoordinatorDynamicCatalogManager.java index b1e50c9f0bb5..fdd1f194f588 100644 --- a/core/trino-main/src/main/java/io/trino/connector/CoordinatorDynamicCatalogManager.java +++ b/core/trino-main/src/main/java/io/trino/connector/CoordinatorDynamicCatalogManager.java @@ -28,11 +28,13 @@ import io.trino.spi.catalog.CatalogName; import io.trino.spi.catalog.CatalogProperties; import io.trino.spi.catalog.CatalogStore; +import io.trino.spi.catalog.CatalogStore.StoredCatalog; import io.trino.spi.connector.CatalogVersion; import io.trino.spi.connector.ConnectorName; import jakarta.annotation.PreDestroy; import java.util.ArrayList; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -93,6 +95,7 @@ public CoordinatorDynamicCatalogManager(CatalogStore catalogStore, CatalogFactor this.catalogStore = requireNonNull(catalogStore, "catalogStore is null"); this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null"); this.executor = requireNonNull(executor, "executor is null"); + catalogStore.setCatalogManager(this); } @PreDestroy @@ -329,4 +332,70 @@ public void dropCatalog(CatalogName catalogName, boolean exists) // Do not shut down the catalog, because there may still be running queries using this catalog. // Catalog shutdown logic will be added later. } + + /** + * Refresh catalogs from the configured catalog store. + * This method is called by catalog stores when they detect external changes. + */ + @Override + public void refreshCatalogsFromStore() + { + catalogsUpdateLock.lock(); + try { + if (state != State.INITIALIZED) { + return; + } + + // Get desired state from the catalog store + Collection desiredCatalogs = catalogStore.getCatalogs(); + Set desiredNames = desiredCatalogs.stream() + .map(StoredCatalog::name) + .collect(toImmutableSet()); + + Set currentNames = ImmutableSet.copyOf(activeCatalogs.keySet()); + + // Remove catalogs that should no longer exist + for (CatalogName catalogName : currentNames) { + if (!desiredNames.contains(catalogName)) { + activeCatalogs.remove(catalogName); + // Note: CatalogPruneTask will handle actual connector shutdown + } + } + + // Add or update catalogs that should exist + for (StoredCatalog desiredCatalog : desiredCatalogs) { + CatalogName desiredCatalogName = desiredCatalog.name(); + try { + CatalogProperties catalogProperties = desiredCatalog.loadProperties(); + + // Check if catalog exists and if version has changed + Catalog existingCatalog = activeCatalogs.get(desiredCatalogName); + boolean needsReload = existingCatalog == null || + !existingCatalog.getCatalogHandle().getVersion().equals(catalogProperties.version()); + + if (needsReload) { + log.debug("Loading/reloading catalog during refresh: %s", desiredCatalogName); + + // Remove old version if it exists + if (existingCatalog != null) { + activeCatalogs.remove(desiredCatalogName); + } + + // Create new catalog + CatalogConnector newCatalog = catalogFactory.createCatalog(catalogProperties); + activeCatalogs.put(desiredCatalogName, newCatalog.getCatalog()); + allCatalogs.put(newCatalog.getCatalogHandle(), newCatalog); + } + } + catch (Throwable e) { + log.error(e, "Failed to load catalog %s during refresh", desiredCatalogName); + } + } + + log.debug("Catalog refresh completed. Active catalogs: %s", activeCatalogs.keySet()); + } + finally { + catalogsUpdateLock.unlock(); + } + } } diff --git a/core/trino-main/src/main/java/io/trino/metadata/CatalogManager.java b/core/trino-main/src/main/java/io/trino/metadata/CatalogManager.java index 0df9e9da6f07..3e872ff40606 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/CatalogManager.java +++ b/core/trino-main/src/main/java/io/trino/metadata/CatalogManager.java @@ -75,4 +75,6 @@ public void dropCatalog(CatalogName catalogName, boolean exists) void createCatalog(CatalogName catalogName, ConnectorName connectorName, Map properties, boolean notExists); void dropCatalog(CatalogName catalogName, boolean exists); + + default void refreshCatalogsFromStore(){} } diff --git a/core/trino-spi/src/main/java/io/trino/spi/catalog/CatalogStore.java b/core/trino-spi/src/main/java/io/trino/spi/catalog/CatalogStore.java index 79967e9bacd1..809d80efce80 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/catalog/CatalogStore.java +++ b/core/trino-spi/src/main/java/io/trino/spi/catalog/CatalogStore.java @@ -42,6 +42,16 @@ public interface CatalogStore */ void removeCatalog(CatalogName catalogName); + /** + * Set the catalog manager. This allows catalog stores to trigger + * catalog reloads when they detect external changes. + * Default implementation does nothing for backward compatibility. + */ + default void setCatalogManager(Object catalogManager) + { + // Default no-op implementation + } + interface StoredCatalog { CatalogName name(); From a68e7fc57d878445ed4448f30abec978b276de4d Mon Sep 17 00:00:00 2001 From: ning guo Date: Mon, 6 Oct 2025 18:24:53 +0200 Subject: [PATCH 2/2] Add test for catalogStore that initiates a refresh if the contents of catalog changed --- .../TestFilePollingCatalogStore.java | 380 ++++++++++++++++++ 1 file changed, 380 insertions(+) create mode 100644 core/trino-main/src/test/java/io/trino/connector/TestFilePollingCatalogStore.java diff --git a/core/trino-main/src/test/java/io/trino/connector/TestFilePollingCatalogStore.java b/core/trino-main/src/test/java/io/trino/connector/TestFilePollingCatalogStore.java new file mode 100644 index 000000000000..30d20a495a6b --- /dev/null +++ b/core/trino-main/src/test/java/io/trino/connector/TestFilePollingCatalogStore.java @@ -0,0 +1,380 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.connector; + +import com.google.common.hash.Hashing; +import io.airlift.concurrent.Threads; +import io.airlift.log.Logger; +import io.airlift.units.Duration; +import io.trino.metadata.Catalog; +import io.trino.metadata.CatalogManager; +import io.trino.spi.catalog.CatalogName; +import io.trino.spi.catalog.CatalogProperties; +import io.trino.spi.catalog.CatalogStore; +import io.trino.spi.connector.CatalogVersion; +import io.trino.spi.connector.ConnectorName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.Closeable; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.assertj.core.api.Assertions.assertThat; + +public class TestFilePollingCatalogStore +{ + @Test + void testFileChangeTriggersRefresh(@TempDir Path catalogDirectory) + throws IOException, InterruptedException + { + Path catalogFile = catalogDirectory.resolve("my_catalog.properties"); + Files.writeString(catalogFile, "connector.name=memory"); + + TestCatalogManager manager = new TestCatalogManager(); + try (FilePollingCatalogStore store = new FilePollingCatalogStore(catalogDirectory, "100ms")) { + manager.setCatalogStore(store); + store.setCatalogManager(manager); + manager.loadInitialCatalogs(); + + // Verify initial state - my_catalog should be loaded + assertThat(manager.getCatalogNames()).containsExactly(new CatalogName("my_catalog")); + assertThat(manager.getCatalog(new CatalogName("my_catalog"))).isPresent(); + assertThat(manager.getCatalog(new CatalogName("my_catalog")).get().getConnectorName()) + .isEqualTo(new ConnectorName("memory")); + + // Wait for initial poll - should not trigger a refresh since nothing changed + Thread.sleep(150); + assertThat(manager.getRefreshCount()).isEqualTo(0); + + // Test: Modify a file - external change triggers refresh + Files.writeString(catalogFile, "connector.name=elasticsearch\nnew.prop=true"); + waitForRefreshCount(manager, 1); + assertThat(manager.getRefreshCount()).isEqualTo(1); + // Assert on activeCatalogs - should contain the updated catalog + assertThat(manager.getCatalogNames()).containsExactly(new CatalogName("my_catalog")); + assertThat(manager.getCatalog(new CatalogName("my_catalog"))).isPresent(); + assertThat(manager.getCatalog(new CatalogName("my_catalog")).get().getConnectorName()) + .isEqualTo(new ConnectorName("elasticsearch")); + + // Test: Add a new file - external addition triggers refresh + Path newCatalogFile = catalogDirectory.resolve("new_catalog.properties"); + Files.writeString(newCatalogFile, "connector.name=tpch"); + waitForRefreshCount(manager, 2); + assertThat(manager.getRefreshCount()).isEqualTo(2); + // Assert on activeCatalogs - should contain both catalogs + assertThat(manager.getCatalogNames()).containsExactlyInAnyOrder( + new CatalogName("my_catalog"), + new CatalogName("new_catalog")); + assertThat(manager.getCatalog(new CatalogName("my_catalog")).get().getConnectorName()) + .isEqualTo(new ConnectorName("elasticsearch")); + assertThat(manager.getCatalog(new CatalogName("new_catalog")).get().getConnectorName()) + .isEqualTo(new ConnectorName("tpch")); + + // Test: Delete a file - external deletion triggers refresh + Files.delete(catalogFile); + waitForRefreshCount(manager, 3); + assertThat(manager.getRefreshCount()).isEqualTo(3); + // Assert on activeCatalogs - my_catalog should be removed, only new_catalog remains + assertThat(manager.getCatalogNames()).containsExactly(new CatalogName("new_catalog")); + assertThat(manager.getCatalog(new CatalogName("my_catalog"))).isEmpty(); + assertThat(manager.getCatalog(new CatalogName("new_catalog"))).isPresent(); + assertThat(manager.getCatalog(new CatalogName("new_catalog")).get().getConnectorName()) + .isEqualTo(new ConnectorName("tpch")); + } + } + + private static void waitForRefreshCount(TestCatalogManager manager, int expectedCount) + throws InterruptedException + { + long endTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(5); + while (System.nanoTime() < endTime) { + if (manager.getRefreshCount() >= expectedCount) { + return; + } + Thread.sleep(10); + } + throw new AssertionError("Expected refresh count " + expectedCount + " but got " + manager.getRefreshCount()); + } + + private static class FilePollingCatalogStore + implements CatalogStore, Closeable + { + private static final Logger log = Logger.get(FilePollingCatalogStore.class); + private final Path directory; + private final ScheduledExecutorService executor; + private final Duration pollInterval; + private final AtomicLong lastHash = new AtomicLong(0); + + private volatile CatalogManager catalogManager; + + public FilePollingCatalogStore(Path directory, String pollInterval) + { + this.directory = directory; + this.pollInterval = Duration.valueOf(pollInterval); + this.executor = Executors.newSingleThreadScheduledExecutor(Threads.daemonThreadsNamed("file-poller")); + } + + @Override + public void setCatalogManager(Object catalogManager) + { + if (!(catalogManager instanceof CatalogManager cm)) { + throw new IllegalArgumentException("catalogManager must be an instance of CatalogManager"); + } + this.catalogManager = cm; + startPolling(); + } + + private void startPolling() + { + executor.scheduleWithFixedDelay(() -> { + try { + pollForChanges(); + } + catch (Throwable e) { + log.error(e, "Error during polling"); + } + }, pollInterval.toMillis(), pollInterval.toMillis(), TimeUnit.MILLISECONDS); + } + + private void pollForChanges() + { + long currentHash = computeStateHash(); + if (lastHash.get() != 0 && currentHash != lastHash.get()) { + log.info("Detected change in catalog directory. Triggering refresh."); + catalogManager.refreshCatalogsFromStore(); + } + lastHash.set(currentHash); + } + + private long computeStateHash() + { + try (Stream files = Files.list(directory)) { + var hasher = Hashing.murmur3_128().newHasher(); + files.filter(path -> path.toString().endsWith(".properties")) + .sorted() + .forEach(path -> { + try { + hasher.putString(path.getFileName().toString(), UTF_8); + hasher.putBytes(Files.readAllBytes(path)); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + return hasher.hash().asLong(); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public Collection getCatalogs() + { + try (Stream files = Files.list(directory)) { + return files.filter(path -> path.toString().endsWith(".properties")) + .map(path -> { + String catalogName = path.getFileName().toString().replace(".properties", ""); + return new StoredCatalog() + { + @Override + public CatalogName name() + { + return new CatalogName(catalogName); + } + + @Override + public CatalogProperties loadProperties() + { + try { + Properties props = new Properties(); + props.load(Files.newBufferedReader(path)); + Map properties = new ConcurrentHashMap<>(); + props.forEach((key, value) -> properties.put(key.toString(), value.toString())); + + String connectorName = properties.get("connector.name"); + if (connectorName == null) { + throw new IllegalStateException("connector.name is required"); + } + return createCatalogProperties( + new CatalogName(catalogName), + new ConnectorName(connectorName), + properties); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + }; + }) + .collect(java.util.stream.Collectors.toList()); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public CatalogProperties createCatalogProperties(CatalogName catalogName, ConnectorName connectorName, Map properties) + { + return new CatalogProperties(catalogName, new CatalogVersion("1"), connectorName, properties); + } + + @Override + public void addOrReplaceCatalog(CatalogProperties catalogProperties) + { + throw new UnsupportedOperationException(); + } + + @Override + public void removeCatalog(CatalogName catalogName) + { + throw new UnsupportedOperationException(); + } + + @Override + public void close() + { + executor.shutdownNow(); + } + } + + private static class TestCatalogManager + implements CatalogManager + { + private final AtomicInteger refreshCount = new AtomicInteger(0); + private volatile CatalogStore catalogStore; + + /** + * Track active catalog properties instead of full Catalog objects. + * This is simpler and more focused on what we're actually testing. + */ + private final ConcurrentMap activeCatalogs = new ConcurrentHashMap<>(); + + public void setCatalogStore(CatalogStore catalogStore) + { + this.catalogStore = catalogStore; + } + + @Override + public Set getCatalogNames() + { + return activeCatalogs.keySet(); + } + + @Override + public Optional getCatalog(CatalogName catalogName) + { + // For testing, return a failed catalog if the name is tracked + CatalogProperties props = activeCatalogs.get(catalogName); + if (props == null) { + return Optional.empty(); + } + return Optional.of(Catalog.failedCatalog(props.name(), props.version(), props.connectorName())); + } + + @Override + public Optional getCatalogProperties(CatalogHandle catalogHandle) + { + return Optional.empty(); + } + + @Override + public Set getActiveCatalogs() + { + return activeCatalogs.values().stream() + .map(props -> CatalogHandle.createRootCatalogHandle(props.name(), props.version())) + .collect(java.util.stream.Collectors.toSet()); + } + + @Override + public void createCatalog(CatalogName catalogName, ConnectorName connectorName, Map properties, boolean notExists) + { + } + + @Override + public void dropCatalog(CatalogName catalogName, boolean exists) + { + } + + @Override + public void refreshCatalogsFromStore() + { + if (catalogStore != null) { + // Build snapshot of what should exist + Set desiredNames = new java.util.HashSet<>(); + + for (CatalogStore.StoredCatalog storedCatalog : catalogStore.getCatalogs()) { + CatalogProperties props = storedCatalog.loadProperties(); + CatalogName catalogName = storedCatalog.name(); + desiredNames.add(catalogName); + + // Store the catalog properties - always replace to catch property changes + activeCatalogs.put(catalogName, props); + } + + // Remove catalogs that should no longer exist + Set currentNames = new java.util.HashSet<>(activeCatalogs.keySet()); + for (CatalogName catalogName : currentNames) { + if (!desiredNames.contains(catalogName)) { + activeCatalogs.remove(catalogName); + } + } + } + refreshCount.incrementAndGet(); + } + + public void loadInitialCatalogs() + { + // Ensure the store is set + if (catalogStore == null) { + return; + } + + // Clear any previous state for a clean test + activeCatalogs.clear(); + + // Get all catalogs from the store, just like the real manager does on startup + for (CatalogStore.StoredCatalog storedCatalog : catalogStore.getCatalogs()) { + CatalogProperties props = storedCatalog.loadProperties(); + CatalogName catalogName = storedCatalog.name(); + + // Store the catalog properties + activeCatalogs.put(catalogName, props); + } + } + + public int getRefreshCount() + { + return refreshCount.get(); + } + } +}