-
Notifications
You must be signed in to change notification settings - Fork 121
Add cache in HaGatewayManager #783
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Reviewer's GuideThis pull request adds optional in-memory caching of gateway backend metadata in HaGatewayManager, driven by a new Entity relationship diagram for gateway_backend table accesserDiagram
GATEWAY_BACKEND {
string name
string routing_group
string proxy_to
string external_url
boolean active
}
HaGatewayManager ||--o{ GATEWAY_BACKEND : fetches
GatewayBackendDao ||--o{ GATEWAY_BACKEND : queries
Class diagram for updated HaGatewayManager caching logicclassDiagram
class HaGatewayManager {
-GatewayBackendDao dao
-String defaultRoutingGroup
-boolean cacheEnabled
-LoadingCache<Object, List<GatewayBackend>> backendCache
-CounterStat backendLookupSuccesses
-CounterStat backendLookupFailures
+HaGatewayManager(Jdbi, RoutingConfiguration)
+List<ProxyBackendConfiguration> getAllBackends()
+List<ProxyBackendConfiguration> getAllActiveBackends()
+List<ProxyBackendConfiguration> getActiveBackends(String)
+Optional<ProxyBackendConfiguration> getBackendByName(String)
+ProxyBackendConfiguration addBackend(ProxyBackendConfiguration)
+ProxyBackendConfiguration updateBackend(ProxyBackendConfiguration)
+void deleteBackend(String)
-List<GatewayBackend> fetchAllBackends()
-void invalidateBackendCache()
-List<GatewayBackend> getOrFetchAllBackends()
}
HaGatewayManager --> GatewayBackendDao
HaGatewayManager --> LoadingCache
HaGatewayManager --> CounterStat
class LoadingCache {
+getUnchecked(Object)
+invalidateAll()
}
class CounterStat {
+update(int)
}
Class diagram for updated RoutingConfiguration with databaseCacheTTLclassDiagram
class RoutingConfiguration {
-Duration asyncTimeout
-Duration databaseCacheTTL
-boolean addXForwardedHeaders
-String defaultRoutingGroup
+Duration getDatabaseCacheTTL()
+void setDatabaseCacheTTL(Duration)
}
RoutingConfiguration --> Duration
Class diagram for updated GatewayBackendDao interfaceclassDiagram
class GatewayBackendDao {
+List<GatewayBackend> findAll()
+GatewayBackend findByName(String name)
+void create(...)
+void update(...)
+void deleteByName(String name)
}
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
1f5ead3 to
16824dc
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey there - I've reviewed your changes - here's some feedback:
- HaGatewayManager creates a dedicated executor for cache reloads but never shuts it down—consider implementing AutoCloseable or a shutdown method to avoid thread leaks.
- Using refreshAfterWrite alone can serve stale data if an async reload fails; consider adding expireAfterWrite or an eviction policy to prevent stale entries from persisting indefinitely.
- Loading all backends and filtering in memory for every lookup may not scale with large backend lists—evaluate using selective DAO queries or secondary caches/indexes for group- and name-based lookups.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- HaGatewayManager creates a dedicated executor for cache reloads but never shuts it down—consider implementing AutoCloseable or a shutdown method to avoid thread leaks.
- Using refreshAfterWrite alone can serve stale data if an async reload fails; consider adding expireAfterWrite or an eviction policy to prevent stale entries from persisting indefinitely.
- Loading all backends and filtering in memory for every lookup may not scale with large backend lists—evaluate using selective DAO queries or secondary caches/indexes for group- and name-based lookups.
## Individual Comments
### Comment 1
<location> `gateway-ha/src/main/java/io/trino/gateway/ha/router/HaGatewayManager.java:65-66` </location>
<code_context>
{
dao = requireNonNull(jdbi, "jdbi is null").onDemand(GatewayBackendDao.class);
this.defaultRoutingGroup = routingConfiguration.getDefaultRoutingGroup();
+ if (!routingConfiguration.getDatabaseCacheTTL().isZero()) {
+ cacheEnabled = true;
+ backendCache = CacheBuilder
+ .newBuilder()
+ .initialCapacity(1)
+ .refreshAfterWrite(routingConfiguration.getDatabaseCacheTTL().toJavaTime())
+ .build(CacheLoader.asyncReloading(
+ CacheLoader.from(this::fetchAllBackends),
+ MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor())));
+ // Load the data once during initialization. This ensures a fail-fast behavior in case of database misconfiguration.
+ backendCache.getUnchecked(ALL_BACKEND_CACHE_KEY);
+ }
+ else {
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Consider handling cache warm-up failures more robustly.
If cache warm-up fails due to a database issue, an unchecked exception may prevent service startup or obscure error details. Consider catching and logging these failures, or failing fast with a clear error message.
```suggestion
// Load the data once during initialization. This ensures a fail-fast behavior in case of database misconfiguration.
try {
backendCache.getUnchecked(ALL_BACKEND_CACHE_KEY);
}
catch (Exception e) {
// Log the error and fail fast with a clear message
// Replace with your logger if not using SLF4J
org.slf4j.LoggerFactory.getLogger(HaGatewayManager.class)
.error("Failed to warm up backend cache during initialization. Check database configuration.", e);
throw new IllegalStateException("Failed to warm up backend cache during initialization", e);
}
```
</issue_to_address>
### Comment 2
<location> `gateway-ha/src/test/java/io/trino/gateway/ha/router/TestHaGatewayManager.java:32-39` </location>
<code_context>
-
- @BeforeAll
- void setUp()
+ @Test
+ void testGatewayManagerWithCache()
{
JdbcConnectionManager connectionManager = createTestingJdbcConnectionManager();
</code_context>
<issue_to_address>
**suggestion (testing):** Missing test for cache invalidation after backend changes.
Add tests to confirm cache invalidation and refresh when backends are modified, ensuring stale data is not served.
```suggestion
@Test
void testGatewayManagerWithCache()
{
JdbcConnectionManager connectionManager = createTestingJdbcConnectionManager();
RoutingConfiguration routingConfiguration = new RoutingConfiguration();
routingConfiguration.setDatabaseCacheTTL(new Duration(5, TimeUnit.SECONDS));
testGatewayManager(new HaGatewayManager(connectionManager.getJdbi(), routingConfiguration));
}
@Test
void testCacheInvalidationAfterBackendChange()
{
JdbcConnectionManager connectionManager = createTestingJdbcConnectionManager();
RoutingConfiguration routingConfiguration = new RoutingConfiguration();
routingConfiguration.setDatabaseCacheTTL(new Duration(5, TimeUnit.SECONDS));
HaGatewayManager gatewayManager = new HaGatewayManager(connectionManager.getJdbi(), routingConfiguration);
// Populate cache by fetching backends
var initialBackends = gatewayManager.getActiveBackends();
assertThat(initialBackends).isNotEmpty();
// Simulate backend change: add a new backend
String newBackendName = "new-backend";
connectionManager.addBackend(newBackendName, "jdbc:trino://new-backend:8080");
// Wait for cache TTL to expire
try {
Thread.sleep(routingConfiguration.getDatabaseCacheTTL().toMillis() + 1000);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// Fetch backends again, should reflect the new backend
var refreshedBackends = gatewayManager.getActiveBackends();
assertThat(refreshedBackends.stream().anyMatch(b -> b.getName().equals(newBackendName))).isTrue();
}
```
</issue_to_address>
### Comment 3
<location> `gateway-ha/src/test/java/io/trino/gateway/ha/router/TestHaGatewayManager.java:33-38` </location>
<code_context>
- @BeforeAll
- void setUp()
+ @Test
+ void testGatewayManagerWithCache()
{
JdbcConnectionManager connectionManager = createTestingJdbcConnectionManager();
RoutingConfiguration routingConfiguration = new RoutingConfiguration();
- haGatewayManager = new HaGatewayManager(connectionManager.getJdbi(), routingConfiguration);
+ routingConfiguration.setDatabaseCacheTTL(new Duration(5, TimeUnit.SECONDS));
+ testGatewayManager(new HaGatewayManager(connectionManager.getJdbi(), routingConfiguration));
}
</code_context>
<issue_to_address>
**suggestion (testing):** No test for cache refresh failures or database errors.
Add a test that triggers a database failure during cache refresh to verify proper exception handling and metric updates.
Suggested implementation:
```java
import java.util.concurrent.TimeUnit;
import static org.mockito.Mockito.*;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.AfterEach;
```
```java
@Test
void testGatewayManagerWithCache()
{
JdbcConnectionManager connectionManager = createTestingJdbcConnectionManager();
RoutingConfiguration routingConfiguration = new RoutingConfiguration();
routingConfiguration.setDatabaseCacheTTL(new Duration(5, TimeUnit.SECONDS));
testGatewayManager(new HaGatewayManager(connectionManager.getJdbi(), routingConfiguration));
}
@Test
void testCacheRefreshDatabaseFailure()
{
// Arrange: mock JdbcConnectionManager to throw exception on cache refresh
JdbcConnectionManager mockConnectionManager = mock(JdbcConnectionManager.class);
RoutingConfiguration routingConfiguration = new RoutingConfiguration();
routingConfiguration.setDatabaseCacheTTL(new Duration(5, TimeUnit.SECONDS));
HaGatewayManager gatewayManager = new HaGatewayManager(mockConnectionManager.getJdbi(), routingConfiguration);
// Simulate database failure during cache refresh
doThrow(new RuntimeException("Database error")).when(mockConnectionManager).refreshCache();
// Act & Assert: verify exception is handled and metrics are updated
try {
gatewayManager.refreshCache();
} catch (Exception e) {
assertThat(e).isInstanceOf(RuntimeException.class)
.hasMessageContaining("Database error");
}
// If you have a metric for cache refresh failures, assert it was incremented
// Example:
// assertThat(gatewayManager.getCacheRefreshFailureCount()).isGreaterThan(0);
}
```
- You may need to implement or expose a `refreshCache()` method in `HaGatewayManager` if it is not public.
- If you track cache refresh failures via a metric, ensure you have a getter like `getCacheRefreshFailureCount()` or similar.
- Adjust the mocking/stubbing to match your actual cache refresh logic and error handling.
</issue_to_address>
### Comment 4
<location> `gateway-ha/src/test/java/io/trino/gateway/ha/router/TestHaGatewayManager.java:50` </location>
<code_context>
+ testGatewayManager(new HaGatewayManager(connectionManager.getJdbi(), routingConfiguration));
+ }
+
+ void testGatewayManager(HaGatewayManager haGatewayManager)
{
ProxyBackendConfiguration backend = new ProxyBackendConfiguration();
</code_context>
<issue_to_address>
**suggestion (testing):** Tests do not verify cache TTL expiration and automatic refresh.
Add a test to confirm cache expiration after TTL and automatic refresh to validate correct time-based behavior.
Suggested implementation:
```java
void testGatewayManager(HaGatewayManager haGatewayManager)
{
ProxyBackendConfiguration backend = new ProxyBackendConfiguration();
backend.setActive(true);
}
@Test
void testCacheExpirationAndAutomaticRefresh() throws InterruptedException
{
JdbcConnectionManager connectionManager = createTestingJdbcConnectionManager();
// Set cache TTL to 1 second for testing
RoutingConfiguration routingConfiguration = new RoutingConfiguration();
routingConfiguration.setDatabaseCacheTTL(new Duration(1, TimeUnit.SECONDS));
HaGatewayManager haGatewayManager = new HaGatewayManager(connectionManager.getJdbi(), routingConfiguration);
// Initial fetch to populate cache
List<ProxyBackendConfiguration> initialBackends = haGatewayManager.getActiveBackends();
assertNotNull(initialBackends, "Initial backend list should not be null");
// Wait for TTL to expire
Thread.sleep(1500);
// Simulate a change in backend configuration
// (This step may need to be adapted to your test setup. For example, update the DB or mock the backend list.)
// For demonstration, we assume a method exists to update the backend list:
// connectionManager.updateBackendList(newBackendList);
// Fetch again after TTL expiration
List<ProxyBackendConfiguration> refreshedBackends = haGatewayManager.getActiveBackends();
assertNotNull(refreshedBackends, "Refreshed backend list should not be null");
// The refreshed list should reflect changes after TTL expiration
// (You may need to adapt this assertion based on how you simulate backend changes)
// assertNotEquals(initialBackends, refreshedBackends, "Backend list should be refreshed after cache expiration");
}
```
You may need to:
1. Implement or mock a way to change the backend list in your test setup so that the refreshed cache returns different data.
2. Adjust the assertion to compare the initial and refreshed backend lists based on your actual backend update logic.
3. Ensure that `getActiveBackends()` triggers a cache refresh after TTL expiration.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| .refreshAfterWrite(routingConfiguration.getDatabaseCacheTTL().toJavaTime()) | ||
| .build(CacheLoader.asyncReloading( | ||
| CacheLoader.from(this::fetchAllBackends), | ||
| MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about using a ThreadFactory to prevent thread leaks?
By default, newSingleThreadExecutor seems to create non-daemon threads.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ThreadFactory daemonThreadFactory = runnable -> {
Thread thread = new Thread(runnable, "backend-cache-refresh");
thread.setDaemon(true);
return thread;
};
...
MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(daemonThreadFactory))));
| log.warn(e, "Failed to fetch backends"); | ||
| throw e; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we throw exception? or just return empty list to maintain service availability
I mean the cache will retry on the next refresh cycle anyway
Peiyingy
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for putting up this PR!
At LinkedIn, we’ve already implemented caching in GatewayBackendDao and have it running in production. Since we run multiple Gateway instances, we found that a cache-aside approach can lead to noticeable inconsistencies across instances. To address this, we switched to a DB-first model, where the cache only serves as a fallback during DB outages.
We’d recommend making the cache behavior configurable so that users can choose between cache-aside and DB-first depending on their deployment setup.
Separately, we also added a write buffer mechanism in QueryHistoryManager. With the DB-first cache and write buffer in place, the Gateway can continue processing user queries even during DB outages, eliminating this single point of failure. We plan to open up that write buffer PR to OSS soon as well.
| routing: | ||
| defaultRoutingGroup: "test-group" | ||
| # Optional: cache backend metadata to reduce database look-ups | ||
| databaseCacheTTL: "5m" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's create a CacheConfiguration instead of putting all the configs under routing configs. We can define cacheEnabled, expireAfterWrite, and maximumSize explicitly instead of using if (!routingConfiguration.getDatabaseCacheTTL().isZero()) to decide if cache is enabled, and set default values for them. Also, we can make cache behavior configurable there.
| */ | ||
| package io.trino.gateway.ha.router; | ||
|
|
||
| import com.google.common.cache.CacheBuilder; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's also use this opportunity to upgrade the caching dependency from guava to caffeine.
Peiyingy
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's also add cache for findFirstByName to guard resiliency during DB outage
Description
Add cache in HaGatewayManager.
Continue the work in #501
Additional context and related issues
Set
databaseCacheTTLto a non-zero Airlift duration value to enable in-memory caching of backend metadata retrieved from the gateway database. Trino Gateway caches the list of backend clusters for the specified time and refreshes it asynchronously. Use this setting to reduce database load and improve routing performance.A value of
0s(the default) disables the cache and queries the database onevery request.
Release notes
(x) Release notes are required, with the following suggested text:
Summary by Sourcery
Introduce an optional in-memory cache for gateway backends in HaGatewayManager with configurable TTL and automatic invalidation on data changes.
New Features:
Enhancements:
Tests:
Summary by Sourcery
Introduce configurable in-memory caching in HaGatewayManager to reduce database load and improve routing performance by caching backend metadata with TTL, automatic invalidation, and streamlined retrieval, while updating tests and documentation.
New Features:
Enhancements:
Documentation:
Tests: