Skip to content

Commit 1f5ead3

Browse files
committed
Add cache in HaGatewayManager
1 parent 535802e commit 1f5ead3

File tree

4 files changed

+107
-29
lines changed

4 files changed

+107
-29
lines changed

gateway-ha/src/main/java/io/trino/gateway/ha/config/RoutingConfiguration.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@
1515

1616
import io.airlift.units.Duration;
1717

18+
import static io.airlift.units.Duration.ZERO;
1819
import static java.util.concurrent.TimeUnit.MINUTES;
1920

2021
public class RoutingConfiguration
2122
{
2223
private Duration asyncTimeout = new Duration(2, MINUTES);
24+
private Duration databaseCacheTTL = ZERO;
2325

2426
private boolean addXForwardedHeaders = true;
2527

@@ -54,4 +56,14 @@ public void setDefaultRoutingGroup(String defaultRoutingGroup)
5456
{
5557
this.defaultRoutingGroup = defaultRoutingGroup;
5658
}
59+
60+
public Duration getDatabaseCacheTTL()
61+
{
62+
return databaseCacheTTL;
63+
}
64+
65+
public void setDatabaseCacheTTL(Duration databaseCacheTTL)
66+
{
67+
this.databaseCacheTTL = databaseCacheTTL;
68+
}
5769
}

gateway-ha/src/main/java/io/trino/gateway/ha/persistence/dao/GatewayBackendDao.java

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,24 +23,6 @@ public interface GatewayBackendDao
2323
@SqlQuery("SELECT * FROM gateway_backend")
2424
List<GatewayBackend> findAll();
2525

26-
@SqlQuery("""
27-
SELECT * FROM gateway_backend
28-
WHERE active = true
29-
""")
30-
List<GatewayBackend> findActiveBackend();
31-
32-
@SqlQuery("""
33-
SELECT * FROM gateway_backend
34-
WHERE active = true AND routing_group = :routingGroup
35-
""")
36-
List<GatewayBackend> findActiveBackendByRoutingGroup(String routingGroup);
37-
38-
@SqlQuery("""
39-
SELECT * FROM gateway_backend
40-
WHERE name = :name
41-
""")
42-
List<GatewayBackend> findByName(String name);
43-
4426
@SqlQuery("""
4527
SELECT * FROM gateway_backend
4628
WHERE name = :name

gateway-ha/src/main/java/io/trino/gateway/ha/router/HaGatewayManager.java

Lines changed: 75 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,13 @@
1313
*/
1414
package io.trino.gateway.ha.router;
1515

16+
import com.google.common.cache.CacheBuilder;
17+
import com.google.common.cache.CacheLoader;
18+
import com.google.common.cache.LoadingCache;
1619
import com.google.common.collect.ImmutableList;
20+
import com.google.common.util.concurrent.MoreExecutors;
1721
import io.airlift.log.Logger;
22+
import io.airlift.stats.CounterStat;
1823
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
1924
import io.trino.gateway.ha.config.RoutingConfiguration;
2025
import io.trino.gateway.ha.persistence.dao.GatewayBackend;
@@ -24,35 +29,92 @@
2429
import java.util.ArrayList;
2530
import java.util.List;
2631
import java.util.Optional;
32+
import java.util.concurrent.Executors;
2733

2834
import static com.google.common.base.Preconditions.checkState;
35+
import static com.google.common.collect.ImmutableList.toImmutableList;
2936
import static java.util.Objects.requireNonNull;
3037

3138
public class HaGatewayManager
3239
implements GatewayBackendManager
3340
{
3441
private static final Logger log = Logger.get(HaGatewayManager.class);
42+
private static final Object ALL_BACKEND_CACHE_KEY = new Object();
3543

3644
private final GatewayBackendDao dao;
3745
private final String defaultRoutingGroup;
46+
private final boolean cacheEnabled;
47+
private final LoadingCache<Object, List<GatewayBackend>> backendCache;
48+
49+
private final CounterStat backendLookupSuccesses = new CounterStat();
50+
private final CounterStat backendLookupFailures = new CounterStat();
3851

3952
public HaGatewayManager(Jdbi jdbi, RoutingConfiguration routingConfiguration)
4053
{
4154
dao = requireNonNull(jdbi, "jdbi is null").onDemand(GatewayBackendDao.class);
4255
this.defaultRoutingGroup = routingConfiguration.getDefaultRoutingGroup();
56+
if (!routingConfiguration.getDatabaseCacheTTL().isZero()) {
57+
cacheEnabled = true;
58+
backendCache = CacheBuilder
59+
.newBuilder()
60+
.initialCapacity(1)
61+
.refreshAfterWrite(routingConfiguration.getDatabaseCacheTTL().toJavaTime())
62+
.build(CacheLoader.asyncReloading(
63+
CacheLoader.from(this::fetchAllBackends),
64+
MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor())));
65+
// Load the data once during initialization. This ensures a fail-fast behavior in case of database misconfiguration.
66+
backendCache.getUnchecked(ALL_BACKEND_CACHE_KEY);
67+
}
68+
else {
69+
cacheEnabled = false;
70+
backendCache = null;
71+
}
72+
}
73+
74+
private List<GatewayBackend> fetchAllBackends()
75+
{
76+
try {
77+
List<GatewayBackend> backends = dao.findAll();
78+
backendLookupSuccesses.update(1);
79+
return backends;
80+
}
81+
catch (Exception e) {
82+
backendLookupFailures.update(1);
83+
log.warn(e, "Failed to fetch backends");
84+
throw e;
85+
}
86+
}
87+
88+
private void invalidateBackendCache()
89+
{
90+
if (cacheEnabled) {
91+
backendCache.invalidateAll();
92+
}
93+
}
94+
95+
private List<GatewayBackend> getOrFetchAllBackends()
96+
{
97+
if (cacheEnabled) {
98+
return backendCache.getUnchecked(ALL_BACKEND_CACHE_KEY);
99+
}
100+
else {
101+
return fetchAllBackends();
102+
}
43103
}
44104

45105
@Override
46106
public List<ProxyBackendConfiguration> getAllBackends()
47107
{
48-
List<GatewayBackend> proxyBackendList = dao.findAll();
108+
List<GatewayBackend> proxyBackendList = getOrFetchAllBackends();
49109
return upcast(proxyBackendList);
50110
}
51111

52112
@Override
53113
public List<ProxyBackendConfiguration> getAllActiveBackends()
54114
{
55-
List<GatewayBackend> proxyBackendList = dao.findActiveBackend();
115+
List<GatewayBackend> proxyBackendList = getOrFetchAllBackends().stream()
116+
.filter(GatewayBackend::active)
117+
.collect(toImmutableList());
56118
return upcast(proxyBackendList);
57119
}
58120

@@ -71,14 +133,19 @@ public List<ProxyBackendConfiguration> getActiveDefaultBackends()
71133
@Override
72134
public List<ProxyBackendConfiguration> getActiveBackends(String routingGroup)
73135
{
74-
List<GatewayBackend> proxyBackendList = dao.findActiveBackendByRoutingGroup(routingGroup);
136+
List<GatewayBackend> proxyBackendList = getOrFetchAllBackends().stream()
137+
.filter(GatewayBackend::active)
138+
.filter(backend -> backend.routingGroup().equals(routingGroup))
139+
.collect(toImmutableList());
75140
return upcast(proxyBackendList);
76141
}
77142

78143
@Override
79144
public Optional<ProxyBackendConfiguration> getBackendByName(String name)
80145
{
81-
List<GatewayBackend> proxyBackendList = dao.findByName(name);
146+
List<GatewayBackend> proxyBackendList = getOrFetchAllBackends().stream()
147+
.filter(backend -> backend.name().equals(name))
148+
.collect(toImmutableList());
82149
return upcast(proxyBackendList).stream().findAny();
83150
}
84151

@@ -102,6 +169,7 @@ private void updateClusterActivationStatus(String clusterName, boolean newStatus
102169
boolean previousStatus = model.active();
103170
changeActiveStatus.run();
104171
logActivationStatusChange(clusterName, newStatus, previousStatus);
172+
invalidateBackendCache();
105173
}
106174

107175
private static void logActivationStatusChange(String clusterName, boolean newStatus, boolean previousStatus)
@@ -117,6 +185,7 @@ public ProxyBackendConfiguration addBackend(ProxyBackendConfiguration backend)
117185
String backendProxyTo = removeTrailingSlash(backend.getProxyTo());
118186
String backendExternalUrl = removeTrailingSlash(backend.getExternalUrl());
119187
dao.create(backend.getName(), backend.getRoutingGroup(), backendProxyTo, backendExternalUrl, backend.isActive());
188+
invalidateBackendCache();
120189
return backend;
121190
}
122191

@@ -133,12 +202,14 @@ public ProxyBackendConfiguration updateBackend(ProxyBackendConfiguration backend
133202
dao.update(backend.getName(), backend.getRoutingGroup(), backendProxyTo, backendExternalUrl, backend.isActive());
134203
logActivationStatusChange(backend.getName(), backend.isActive(), model.active());
135204
}
205+
invalidateBackendCache();
136206
return backend;
137207
}
138208

139209
public void deleteBackend(String name)
140210
{
141211
dao.deleteByName(name);
212+
invalidateBackendCache();
142213
}
143214

144215
private static List<ProxyBackendConfiguration> upcast(List<GatewayBackend> gatewayBackendList)

gateway-ha/src/test/java/io/trino/gateway/ha/router/TestHaGatewayManager.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,32 +13,41 @@
1313
*/
1414
package io.trino.gateway.ha.router;
1515

16+
import io.airlift.units.Duration;
1617
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
1718
import io.trino.gateway.ha.config.RoutingConfiguration;
1819
import io.trino.gateway.ha.persistence.JdbcConnectionManager;
19-
import org.junit.jupiter.api.BeforeAll;
2020
import org.junit.jupiter.api.Test;
2121
import org.junit.jupiter.api.TestInstance;
2222
import org.junit.jupiter.api.TestInstance.Lifecycle;
2323

24+
import java.util.concurrent.TimeUnit;
25+
2426
import static io.trino.gateway.ha.TestingJdbcConnectionManager.createTestingJdbcConnectionManager;
2527
import static org.assertj.core.api.Assertions.assertThat;
2628

2729
@TestInstance(Lifecycle.PER_CLASS)
2830
final class TestHaGatewayManager
2931
{
30-
private HaGatewayManager haGatewayManager;
31-
32-
@BeforeAll
33-
void setUp()
32+
@Test
33+
void testGatewayManagerWithCache()
3434
{
3535
JdbcConnectionManager connectionManager = createTestingJdbcConnectionManager();
3636
RoutingConfiguration routingConfiguration = new RoutingConfiguration();
37-
haGatewayManager = new HaGatewayManager(connectionManager.getJdbi(), routingConfiguration);
37+
routingConfiguration.setDatabaseCacheTTL(new Duration(5, TimeUnit.SECONDS));
38+
testGatewayManager(new HaGatewayManager(connectionManager.getJdbi(), routingConfiguration));
3839
}
3940

4041
@Test
41-
void testGatewayManager()
42+
void testGatewayManagerWithoutCache()
43+
{
44+
JdbcConnectionManager connectionManager = createTestingJdbcConnectionManager();
45+
RoutingConfiguration routingConfiguration = new RoutingConfiguration();
46+
routingConfiguration.setDatabaseCacheTTL(new Duration(0, TimeUnit.SECONDS));
47+
testGatewayManager(new HaGatewayManager(connectionManager.getJdbi(), routingConfiguration));
48+
}
49+
50+
void testGatewayManager(HaGatewayManager haGatewayManager)
4251
{
4352
ProxyBackendConfiguration backend = new ProxyBackendConfiguration();
4453
backend.setActive(true);
@@ -101,6 +110,10 @@ void testGatewayManager()
101110
@Test
102111
void testRemoveTrailingSlashInUrl()
103112
{
113+
JdbcConnectionManager connectionManager = createTestingJdbcConnectionManager();
114+
RoutingConfiguration routingConfiguration = new RoutingConfiguration();
115+
HaGatewayManager haGatewayManager = new HaGatewayManager(connectionManager.getJdbi(), routingConfiguration);
116+
104117
ProxyBackendConfiguration etl = new ProxyBackendConfiguration();
105118
etl.setActive(false);
106119
etl.setRoutingGroup("etl");

0 commit comments

Comments
 (0)