diff --git a/docs/routing-rules.md b/docs/routing-rules.md index 8133dbaec..54201e9ed 100644 --- a/docs/routing-rules.md +++ b/docs/routing-rules.md @@ -10,8 +10,19 @@ defaults to the `adhoc` group. ```yaml routing: defaultRoutingGroup: "test-group" + # Optional: cache backend metadata to reduce database look-ups + databaseCacheTTL: "5m" ``` +Set `databaseCacheTTL` to a non-zero [Airlift duration](https://airlift.github.io/airlift/units/) value to enable in-memory caching of backend +metadata retrieved from the gateway database. Trino Gateway caches the list of +backend clusters for the specified time and refreshes it asynchronously. Use +this setting to reduce database load and improve routing performance. + +A value of `0s` (the default) disables the cache and queries the database on +every request. + + The routing rules engine feature enables you to either write custom logic to route requests based on the request info such as any of the [request headers](https://trino.io/docs/current/develop/client-protocol.html#client-request-headers), diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/config/RoutingConfiguration.java b/gateway-ha/src/main/java/io/trino/gateway/ha/config/RoutingConfiguration.java index 035218d3d..5b9fefd8e 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/config/RoutingConfiguration.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/config/RoutingConfiguration.java @@ -15,11 +15,13 @@ import io.airlift.units.Duration; +import static io.airlift.units.Duration.ZERO; import static java.util.concurrent.TimeUnit.MINUTES; public class RoutingConfiguration { private Duration asyncTimeout = new Duration(2, MINUTES); + private Duration databaseCacheTTL = ZERO; private boolean addXForwardedHeaders = true; @@ -54,4 +56,14 @@ public void setDefaultRoutingGroup(String defaultRoutingGroup) { this.defaultRoutingGroup = defaultRoutingGroup; } + + public Duration getDatabaseCacheTTL() + { + return databaseCacheTTL; + } + + public void setDatabaseCacheTTL(Duration databaseCacheTTL) + { + this.databaseCacheTTL = databaseCacheTTL; + } } diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/persistence/dao/GatewayBackendDao.java b/gateway-ha/src/main/java/io/trino/gateway/ha/persistence/dao/GatewayBackendDao.java index 46831ca9c..746ad7559 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/persistence/dao/GatewayBackendDao.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/persistence/dao/GatewayBackendDao.java @@ -23,24 +23,6 @@ public interface GatewayBackendDao @SqlQuery("SELECT * FROM gateway_backend") List findAll(); - @SqlQuery(""" - SELECT * FROM gateway_backend - WHERE active = true - """) - List findActiveBackend(); - - @SqlQuery(""" - SELECT * FROM gateway_backend - WHERE active = true AND routing_group = :routingGroup - """) - List findActiveBackendByRoutingGroup(String routingGroup); - - @SqlQuery(""" - SELECT * FROM gateway_backend - WHERE name = :name - """) - List findByName(String name); - @SqlQuery(""" SELECT * FROM gateway_backend WHERE name = :name diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/router/HaGatewayManager.java b/gateway-ha/src/main/java/io/trino/gateway/ha/router/HaGatewayManager.java index db5ad7d0f..4035a9d1e 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/router/HaGatewayManager.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/router/HaGatewayManager.java @@ -13,8 +13,13 @@ */ package io.trino.gateway.ha.router; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.MoreExecutors; import io.airlift.log.Logger; +import io.airlift.stats.CounterStat; import io.trino.gateway.ha.config.ProxyBackendConfiguration; import io.trino.gateway.ha.config.RoutingConfiguration; import io.trino.gateway.ha.persistence.dao.GatewayBackend; @@ -24,35 +29,92 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.concurrent.Executors; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.ImmutableList.toImmutableList; import static java.util.Objects.requireNonNull; public class HaGatewayManager implements GatewayBackendManager { private static final Logger log = Logger.get(HaGatewayManager.class); + private static final Object ALL_BACKEND_CACHE_KEY = new Object(); private final GatewayBackendDao dao; private final String defaultRoutingGroup; + private final boolean cacheEnabled; + private final LoadingCache> backendCache; + + private final CounterStat backendLookupSuccesses = new CounterStat(); + private final CounterStat backendLookupFailures = new CounterStat(); public HaGatewayManager(Jdbi jdbi, RoutingConfiguration routingConfiguration) { dao = requireNonNull(jdbi, "jdbi is null").onDemand(GatewayBackendDao.class); this.defaultRoutingGroup = routingConfiguration.getDefaultRoutingGroup(); + if (!routingConfiguration.getDatabaseCacheTTL().isZero()) { + cacheEnabled = true; + backendCache = CacheBuilder + .newBuilder() + .initialCapacity(1) + .refreshAfterWrite(routingConfiguration.getDatabaseCacheTTL().toJavaTime()) + .build(CacheLoader.asyncReloading( + CacheLoader.from(this::fetchAllBackends), + MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()))); + // Load the data once during initialization. This ensures a fail-fast behavior in case of database misconfiguration. + backendCache.getUnchecked(ALL_BACKEND_CACHE_KEY); + } + else { + cacheEnabled = false; + backendCache = null; + } + } + + private List fetchAllBackends() + { + try { + List backends = dao.findAll(); + backendLookupSuccesses.update(1); + return backends; + } + catch (Exception e) { + backendLookupFailures.update(1); + log.warn(e, "Failed to fetch backends"); + throw e; + } + } + + private void invalidateBackendCache() + { + if (cacheEnabled) { + backendCache.invalidateAll(); + } + } + + private List getOrFetchAllBackends() + { + if (cacheEnabled) { + return backendCache.getUnchecked(ALL_BACKEND_CACHE_KEY); + } + else { + return fetchAllBackends(); + } } @Override public List getAllBackends() { - List proxyBackendList = dao.findAll(); + List proxyBackendList = getOrFetchAllBackends(); return upcast(proxyBackendList); } @Override public List getAllActiveBackends() { - List proxyBackendList = dao.findActiveBackend(); + List proxyBackendList = getOrFetchAllBackends().stream() + .filter(GatewayBackend::active) + .collect(toImmutableList()); return upcast(proxyBackendList); } @@ -71,14 +133,19 @@ public List getActiveDefaultBackends() @Override public List getActiveBackends(String routingGroup) { - List proxyBackendList = dao.findActiveBackendByRoutingGroup(routingGroup); + List proxyBackendList = getOrFetchAllBackends().stream() + .filter(GatewayBackend::active) + .filter(backend -> backend.routingGroup().equals(routingGroup)) + .collect(toImmutableList()); return upcast(proxyBackendList); } @Override public Optional getBackendByName(String name) { - List proxyBackendList = dao.findByName(name); + List proxyBackendList = getOrFetchAllBackends().stream() + .filter(backend -> backend.name().equals(name)) + .collect(toImmutableList()); return upcast(proxyBackendList).stream().findAny(); } @@ -102,6 +169,7 @@ private void updateClusterActivationStatus(String clusterName, boolean newStatus boolean previousStatus = model.active(); changeActiveStatus.run(); logActivationStatusChange(clusterName, newStatus, previousStatus); + invalidateBackendCache(); } private static void logActivationStatusChange(String clusterName, boolean newStatus, boolean previousStatus) @@ -117,6 +185,7 @@ public ProxyBackendConfiguration addBackend(ProxyBackendConfiguration backend) String backendProxyTo = removeTrailingSlash(backend.getProxyTo()); String backendExternalUrl = removeTrailingSlash(backend.getExternalUrl()); dao.create(backend.getName(), backend.getRoutingGroup(), backendProxyTo, backendExternalUrl, backend.isActive()); + invalidateBackendCache(); return backend; } @@ -133,12 +202,14 @@ public ProxyBackendConfiguration updateBackend(ProxyBackendConfiguration backend dao.update(backend.getName(), backend.getRoutingGroup(), backendProxyTo, backendExternalUrl, backend.isActive()); logActivationStatusChange(backend.getName(), backend.isActive(), model.active()); } + invalidateBackendCache(); return backend; } public void deleteBackend(String name) { dao.deleteByName(name); + invalidateBackendCache(); } private static List upcast(List gatewayBackendList) diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestHaGatewayManager.java b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestHaGatewayManager.java index f3a69b571..cb6038e98 100644 --- a/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestHaGatewayManager.java +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestHaGatewayManager.java @@ -13,32 +13,41 @@ */ package io.trino.gateway.ha.router; +import io.airlift.units.Duration; import io.trino.gateway.ha.config.ProxyBackendConfiguration; import io.trino.gateway.ha.config.RoutingConfiguration; import io.trino.gateway.ha.persistence.JdbcConnectionManager; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.TestInstance.Lifecycle; +import java.util.concurrent.TimeUnit; + import static io.trino.gateway.ha.TestingJdbcConnectionManager.createTestingJdbcConnectionManager; import static org.assertj.core.api.Assertions.assertThat; @TestInstance(Lifecycle.PER_CLASS) final class TestHaGatewayManager { - private HaGatewayManager haGatewayManager; - - @BeforeAll - void setUp() + @Test + void testGatewayManagerWithCache() { JdbcConnectionManager connectionManager = createTestingJdbcConnectionManager(); RoutingConfiguration routingConfiguration = new RoutingConfiguration(); - haGatewayManager = new HaGatewayManager(connectionManager.getJdbi(), routingConfiguration); + routingConfiguration.setDatabaseCacheTTL(new Duration(5, TimeUnit.SECONDS)); + testGatewayManager(new HaGatewayManager(connectionManager.getJdbi(), routingConfiguration)); } @Test - void testGatewayManager() + void testGatewayManagerWithoutCache() + { + JdbcConnectionManager connectionManager = createTestingJdbcConnectionManager(); + RoutingConfiguration routingConfiguration = new RoutingConfiguration(); + routingConfiguration.setDatabaseCacheTTL(new Duration(0, TimeUnit.SECONDS)); + testGatewayManager(new HaGatewayManager(connectionManager.getJdbi(), routingConfiguration)); + } + + void testGatewayManager(HaGatewayManager haGatewayManager) { ProxyBackendConfiguration backend = new ProxyBackendConfiguration(); backend.setActive(true); @@ -101,6 +110,10 @@ void testGatewayManager() @Test void testRemoveTrailingSlashInUrl() { + JdbcConnectionManager connectionManager = createTestingJdbcConnectionManager(); + RoutingConfiguration routingConfiguration = new RoutingConfiguration(); + HaGatewayManager haGatewayManager = new HaGatewayManager(connectionManager.getJdbi(), routingConfiguration); + ProxyBackendConfiguration etl = new ProxyBackendConfiguration(); etl.setActive(false); etl.setRoutingGroup("etl");