Skip to content

Commit 7f05438

Browse files
bnjjjmorriswchris
authored andcommitted
response_cache: automatically connect the cache storage when available (#7911)
Signed-off-by: Benjamin <[email protected]>
1 parent 0aa9ed9 commit 7f05438

File tree

6 files changed

+625
-94
lines changed

6 files changed

+625
-94
lines changed

apollo-router/src/plugins/response_cache/invalidation_endpoint.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ mod tests {
300300
.await
301301
.unwrap();
302302
let storage = Arc::new(Storage {
303-
all: Some(pg_cache),
303+
all: Some(Arc::new(pg_cache.into())),
304304
subgraphs: HashMap::new(),
305305
});
306306
let invalidation = Invalidation::new(storage.clone()).await.unwrap();
@@ -363,7 +363,7 @@ mod tests {
363363
.await
364364
.unwrap();
365365
let storage = Arc::new(Storage {
366-
all: Some(pg_cache),
366+
all: Some(Arc::new(pg_cache.into())),
367367
subgraphs: HashMap::new(),
368368
});
369369
let invalidation = Invalidation::new(storage.clone()).await.unwrap();

apollo-router/src/plugins/response_cache/metrics.rs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use opentelemetry::KeyValue;
1212
use opentelemetry::metrics::MeterProvider;
1313
use parking_lot::Mutex;
1414
use serde_json_bytes::Value;
15+
use tokio::sync::broadcast;
1516
use tokio_stream::StreamExt;
1617
use tokio_stream::wrappers::IntervalStream;
1718
use tower::BoxError;
@@ -306,6 +307,7 @@ impl From<CacheMetricContextKey> for String {
306307
/// parameter subgraph_name is optional and is None when the database is the global one, and Some(...) when it's a database configured for a specific subgraph
307308
pub(super) async fn expired_data_task(
308309
pg_cache: PostgresCacheStorage,
310+
mut abort_signal: broadcast::Receiver<()>,
309311
subgraph_name: Option<String>,
310312
) {
311313
let mut interval = IntervalStream::new(tokio::time::interval(std::time::Duration::from_secs(
@@ -335,14 +337,22 @@ pub(super) async fn expired_data_task(
335337
})
336338
.init();
337339

338-
while (interval.next().await).is_some() {
339-
let exp_data = match pg_cache.expired_data_count().await {
340-
Ok(exp_data) => exp_data,
341-
Err(err) => {
342-
::tracing::error!(error = ?err, "cannot get expired data count");
343-
continue;
340+
loop {
341+
tokio::select! {
342+
biased;
343+
_ = abort_signal.recv() => {
344+
break;
344345
}
345-
};
346-
expired_data_count.store(exp_data, Ordering::Relaxed);
346+
_ = interval.next() => {
347+
let exp_data = match pg_cache.expired_data_count().await {
348+
Ok(exp_data) => exp_data,
349+
Err(err) => {
350+
::tracing::error!(error = ?err, "cannot get expired data count");
351+
continue;
352+
}
353+
};
354+
expired_data_count.store(exp_data, Ordering::Relaxed);
355+
}
356+
}
347357
}
348358
}

0 commit comments

Comments
 (0)