From b11ca90f8a51e8734cfeafdc43472db7eba5beab Mon Sep 17 00:00:00 2001 From: carodewig <16093297+carodewig@users.noreply.github.com> Date: Thu, 20 Nov 2025 13:03:23 -0500 Subject: [PATCH 1/7] fix: return Results from MGETs to be able to report errors in callers --- apollo-router/src/cache/redis.rs | 84 ++++++++++--------- apollo-router/src/plugins/cache/entity.rs | 2 +- .../src/plugins/response_cache/storage/mod.rs | 30 ++++--- .../plugins/response_cache/storage/redis.rs | 17 ++-- 4 files changed, 74 insertions(+), 59 deletions(-) diff --git a/apollo-router/src/cache/redis.rs b/apollo-router/src/cache/redis.rs index 6985151507..b0a329179c 100644 --- a/apollo-router/src/cache/redis.rs +++ b/apollo-router/src/cache/redis.rs @@ -9,6 +9,7 @@ use std::time::Duration; use fred::clients::Client; use fred::clients::Pipeline; +use fred::error::ErrorKind; use fred::interfaces::EventInterface; #[cfg(test)] use fred::mocks::Mocks; @@ -32,6 +33,7 @@ use fred::types::config::UnresponsiveConfig; use fred::types::scan::ScanResult; use futures::Stream; use futures::future::join_all; +use itertools::Itertools; use tokio::sync::broadcast::error::RecvError; use tokio::task::AbortHandle; use tower::BoxError; @@ -628,7 +630,7 @@ impl RedisCacheStorage { pub(crate) async fn get_multiple( &self, keys: Vec>, - ) -> Vec>> { + ) -> Vec, RedisError>> { self.get_multiple_with_options(keys, Options::default()) .await } @@ -637,28 +639,18 @@ impl RedisCacheStorage { &self, mut keys: Vec>, options: Options, - ) -> Vec>> { - // NB: MGET is different from GET in that it returns `Option`s rather than `Result`s. - // > For every key that does not hold a string value or does not exist, the special value - // nil is returned. Because of this, the operation never fails. 
- // - https://redis.io/docs/latest/commands/mget/ - + ) -> Vec, RedisError>> { tracing::trace!("getting multiple values from redis: {:?}", keys); - if keys.len() == 1 { + let values = if keys.len() == 1 { let key = self.make_key(keys.remove(0)); let client = self.inner.next().with_options(&options); - let res = client - .get(key) - .await - .inspect_err(|e| self.record_error(e)) - .ok(); + let res = client.get(key).await; vec![res] } else if self.is_cluster { // when using a cluster of redis nodes, the keys are hashed, and the hash number indicates which // node will store it. So first we have to group the keys by hash, because we cannot do a MGET // across multiple nodes (error: "ERR CROSSSLOT Keys in request don't hash to the same slot") - let len = keys.len(); let mut h: HashMap, Vec)> = HashMap::new(); for (index, key) in keys.into_iter().enumerate() { let key = self.make_key(key); @@ -674,27 +666,33 @@ impl RedisCacheStorage { let client = self.inner.next().with_options(&options); tasks.push(async move { let result: Result>>, _> = client.mget(keys).await; - (indexes, result) + + // turn (indexes, result) into a Vec<(Index, Result)> + match result { + Ok(values) => indexes + .into_iter() + .zip(values.into_iter()) + .map(|(index, value)| { + (index, value.ok_or(RedisError::new(ErrorKind::NotFound, ""))) + }) + .collect::, _>)>>(), + Err(err) => indexes + .into_iter() + .map(|index| (index, Err(err.clone()))) + .collect(), + } }); } // then we have to assemble the results, by making sure that the values are in the same order as // the keys argument's order - let mut result = vec![None; len]; - for (indexes, result_value) in join_all(tasks).await.into_iter() { - match result_value { - Ok(values) => { - for (index, value) in indexes.into_iter().zip(values.into_iter()) { - result[index] = value; - } - } - Err(e) => { - self.record_error(&e); - } - } - } - - result + join_all(tasks) + .await + .into_iter() + .flatten() + .sorted_by_key(|(index, _)| *index) + 
.map(|(_, value)| value) + .collect() } else { let len = keys.len(); let keys = keys @@ -702,12 +700,22 @@ impl RedisCacheStorage { .map(|k| self.make_key(k)) .collect::>(); let client = self.inner.next().with_options(&options); - client - .mget(keys) - .await - .inspect_err(|e| self.record_error(e)) - .unwrap_or_else(|_| vec![None; len]) - } + match client.mget::>>, _>(keys).await { + Ok(values) => values + .into_iter() + .map(|value| value.ok_or(RedisError::new(ErrorKind::NotFound, ""))) + .collect(), + Err(err) => vec![Err(err.clone()); len], + } + }; + + values.iter().for_each(|value| { + if let Err(err) = value { + self.record_error(err) + } + }); + + values } pub(crate) async fn insert( @@ -1039,7 +1047,7 @@ mod test { // test the `mget` functionality let values = storage.get_multiple(keys).await; for value in values { - let value: RedisValue = value.ok_or("missing value")?; + let value: RedisValue = value?; assert_eq!(value.0, expected_value); } @@ -1090,7 +1098,7 @@ mod test { let values = storage.get_multiple(keys).await; let parsed_values: Vec> = - values.into_iter().map(|v| v.map(|v| v.0)).collect(); + values.into_iter().map(|v| v.map(|v| v.0).ok()).collect(); assert_eq!(parsed_values, expected_values); } diff --git a/apollo-router/src/plugins/cache/entity.rs b/apollo-router/src/plugins/cache/entity.rs index 0a2465eee5..4a427b5d6f 100644 --- a/apollo-router/src/plugins/cache/entity.rs +++ b/apollo-router/src/plugins/cache/entity.rs @@ -1016,7 +1016,7 @@ async fn cache_lookup_entities( .get_multiple(keys.iter().map(|k| RedisKey(k.clone())).collect::>()) .await .into_iter() - .map(|r| r.map(|v: RedisValue| v.0)) + .map(|r| r.map(|v: RedisValue| v.0).ok()) .map(|v| match v { None => None, Some(v) => { diff --git a/apollo-router/src/plugins/response_cache/storage/mod.rs b/apollo-router/src/plugins/response_cache/storage/mod.rs index 41d4d444ee..03fba7aaf1 100644 --- a/apollo-router/src/plugins/response_cache/storage/mod.rs +++ 
b/apollo-router/src/plugins/response_cache/storage/mod.rs @@ -116,10 +116,7 @@ pub(super) trait CacheStorage { } #[doc(hidden)] - async fn internal_fetch_multiple( - &self, - cache_keys: &[&str], - ) -> StorageResult>>; + async fn internal_fetch_multiple(&self, cache_keys: &[&str]) -> Vec>; /// Fetch the values belonging to `cache_keys`. Command will be timed out after `self.fetch_timeout()`. async fn fetch_multiple( @@ -130,14 +127,27 @@ pub(super) trait CacheStorage { let batch_size = cache_keys.len(); let now = Instant::now(); - let result = flatten_storage_error( - self.internal_fetch_multiple(cache_keys) - .timeout(self.fetch_timeout()) - .await, - ); + let result = self + .internal_fetch_multiple(cache_keys) + .timeout(self.fetch_timeout()) + .await; record_fetch_duration(now.elapsed(), subgraph_name, batch_size); - result.inspect_err(|err| record_fetch_error(err, subgraph_name)) + + let values = result + .map_err(Into::into) + .inspect_err(|err| record_fetch_error(err, subgraph_name))?; + + // individually inspect each error in the Vec, in case we had partial success + let values = values + .into_iter() + .map(|value| { + value + .inspect_err(|err| record_fetch_error(err, subgraph_name)) + .ok() + }) + .collect(); + Ok(values) } #[doc(hidden)] diff --git a/apollo-router/src/plugins/response_cache/storage/redis.rs b/apollo-router/src/plugins/response_cache/storage/redis.rs index 7ca91467eb..b44fa579d2 100644 --- a/apollo-router/src/plugins/response_cache/storage/redis.rs +++ b/apollo-router/src/plugins/response_cache/storage/redis.rs @@ -387,10 +387,7 @@ impl CacheStorage for Storage { Ok(CacheEntry::from((cache_key, value.0))) } - async fn internal_fetch_multiple( - &self, - cache_keys: &[&str], - ) -> StorageResult>> { + async fn internal_fetch_multiple(&self, cache_keys: &[&str]) -> Vec> { let keys: Vec> = cache_keys .iter() .map(|key| RedisKey(key.to_string())) @@ -399,18 +396,18 @@ impl CacheStorage for Storage { timeout: Some(self.fetch_timeout()), 
..Options::default() }; - let values: Vec>> = + let values: Vec, _>> = self.storage.get_multiple_with_options(keys, options).await; - let entries = values + values .into_iter() .zip(cache_keys) .map(|(opt_value, cache_key)| { - opt_value.map(|value| CacheEntry::from((*cache_key, value.0))) + opt_value + .map(|value| CacheEntry::from((*cache_key, value.0))) + .map_err(Into::into) }) - .collect(); - - Ok(entries) + .collect() } async fn internal_invalidate_by_subgraph(&self, subgraph_name: &str) -> StorageResult { From f1f4440678edb7af816f599f548a5c4ca10a1f18 Mon Sep 17 00:00:00 2001 From: carodewig <16093297+carodewig@users.noreply.github.com> Date: Thu, 20 Nov 2025 16:59:56 -0500 Subject: [PATCH 2/7] test: check that apollo.router.operations.response_cache.fetch.error observes timeouts --- .../src/plugins/response_cache/metrics.rs | 6 ++- .../src/plugins/response_cache/storage/mod.rs | 3 +- .../plugins/response_cache/storage/redis.rs | 53 +++++++++++-------- 3 files changed, 39 insertions(+), 23 deletions(-) diff --git a/apollo-router/src/plugins/response_cache/metrics.rs b/apollo-router/src/plugins/response_cache/metrics.rs index 694a2b147f..665d55afd4 100644 --- a/apollo-router/src/plugins/response_cache/metrics.rs +++ b/apollo-router/src/plugins/response_cache/metrics.rs @@ -24,6 +24,10 @@ impl From for String { } pub(super) fn record_fetch_error(error: &storage::Error, subgraph_name: &str) { + record_fetch_errors(error, subgraph_name, 1) +} + +pub(super) fn record_fetch_errors(error: &storage::Error, subgraph_name: &str, count: u64) { if error.is_row_not_found() { return; } @@ -32,7 +36,7 @@ pub(super) fn record_fetch_error(error: &storage::Error, subgraph_name: &str) { "apollo.router.operations.response_cache.fetch.error", "Errors when fetching data from cache", "{error}", - 1, + count, "subgraph.name" = subgraph_name.to_string(), "code" = error.code() ); diff --git a/apollo-router/src/plugins/response_cache/storage/mod.rs 
b/apollo-router/src/plugins/response_cache/storage/mod.rs index 03fba7aaf1..9a08193938 100644 --- a/apollo-router/src/plugins/response_cache/storage/mod.rs +++ b/apollo-router/src/plugins/response_cache/storage/mod.rs @@ -14,6 +14,7 @@ use super::cache_control::CacheControl; use crate::plugins::response_cache::invalidation::InvalidationKind; use crate::plugins::response_cache::metrics::record_fetch_duration; use crate::plugins::response_cache::metrics::record_fetch_error; +use crate::plugins::response_cache::metrics::record_fetch_errors; use crate::plugins::response_cache::metrics::record_insert_duration; use crate::plugins::response_cache::metrics::record_insert_error; use crate::plugins::response_cache::metrics::record_invalidation_duration; @@ -136,7 +137,7 @@ pub(super) trait CacheStorage { let values = result .map_err(Into::into) - .inspect_err(|err| record_fetch_error(err, subgraph_name))?; + .inspect_err(|err| record_fetch_errors(err, subgraph_name, batch_size as u64))?; // individually inspect each error in the Vec, in case we had partial success let values = values diff --git a/apollo-router/src/plugins/response_cache/storage/redis.rs b/apollo-router/src/plugins/response_cache/storage/redis.rs index b44fa579d2..afe5762c9a 100644 --- a/apollo-router/src/plugins/response_cache/storage/redis.rs +++ b/apollo-router/src/plugins/response_cache/storage/redis.rs @@ -556,6 +556,7 @@ mod tests { use super::Config; use super::Storage; use super::now; + use crate::metrics::FutureMetricsExt; use crate::plugins::response_cache::ErrorCode; use crate::plugins::response_cache::storage::CacheStorage; use crate::plugins::response_cache::storage::Document; @@ -1367,31 +1368,41 @@ mod tests { #[tokio::test] async fn timeout_errors_are_captured() -> Result<(), BoxError> { - let config = Config { - fetch_timeout: Duration::from_nanos(0), - ..redis_config(false) - }; - let (_drop_tx, drop_rx) = broadcast::channel(2); - let storage = Storage::new(&config, drop_rx).await?; - 
storage.truncate_namespace().await?; + async { + let config = Config { + fetch_timeout: Duration::from_nanos(0), + ..redis_config(false) + }; + let (_drop_tx, drop_rx) = broadcast::channel(2); + let storage = Storage::new(&config, drop_rx).await?; + storage.truncate_namespace().await?; - let document = common_document(); + let keys = ["key1", "key2", "key3"]; - // because of how tokio::timeout polls, it's possible for a command to finish before the - // timeout is polled (even if the duration is 0). perform the check in a loop to give it - // a few changes to trigger. - let now = Instant::now(); - while now.elapsed() < Duration::from_secs(5) { - let error = storage.fetch(&document.key, "S1").await.unwrap_err(); - if error.is_row_not_found() { - continue; + // because of how tokio::timeout polls, it's possible for a command to finish before the + // timeout is polled (even if the duration is 0). perform the check in a loop to give it + // a few changes to trigger. + let now = Instant::now(); + while now.elapsed() < Duration::from_secs(5) { + let error = storage.fetch_multiple(&keys, "S1").await.unwrap_err(); + if error.is_row_not_found() { + continue; + } + + assert!(matches!(error, Error::Timeout(_)), "{:?}", error); + assert_eq!(error.code(), "TIMEOUT"); + assert_counter!( + "apollo.router.operations.response_cache.fetch.error", + keys.len(), + "code" = "TIMEOUT", + "subgraph.name" = "S1" + ); + return Ok(()); } - assert!(matches!(error, Error::Timeout(_)), "{:?}", error); - assert_eq!(error.code(), "TIMEOUT"); - return Ok(()); + panic!("Never observed a timeout"); } - - panic!("Never observed a timeout"); + .with_metrics() + .await } } From ee3ead3531f8d0e7d483ea6b375a25a821ccbc7f Mon Sep 17 00:00:00 2001 From: carodewig <16093297+carodewig@users.noreply.github.com> Date: Thu, 20 Nov 2025 17:43:35 -0500 Subject: [PATCH 3/7] doc: changeset --- .changesets/fix_caroline_propagate_mget_errors.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 
.changesets/fix_caroline_propagate_mget_errors.md diff --git a/.changesets/fix_caroline_propagate_mget_errors.md b/.changesets/fix_caroline_propagate_mget_errors.md new file mode 100644 index 0000000000..b55b328690 --- /dev/null +++ b/.changesets/fix_caroline_propagate_mget_errors.md @@ -0,0 +1,6 @@ +### Fix response cache fetch error metric ([PR #8644](https://github.com/apollographql/router/pull/8644)) + +The `apollo.router.operations.response_cache.fetch.error` metric was not being incremented as expected when fetching multiple +items from Redis. This fix changes its behavior to align with the `apollo.router.cache.redis.errors` metric. + +By [@carodewig](https://github.com/carodewig) in https://github.com/apollographql/router/pull/8644 From 114f98cc1521babf20c600158530487933943d84 Mon Sep 17 00:00:00 2001 From: carodewig <16093297+carodewig@users.noreply.github.com> Date: Tue, 25 Nov 2025 14:07:33 -0500 Subject: [PATCH 4/7] fmt: reformat after merge --- apollo-router/src/cache/redis.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/apollo-router/src/cache/redis.rs b/apollo-router/src/cache/redis.rs index bbe1acf208..f7f22c71b3 100644 --- a/apollo-router/src/cache/redis.rs +++ b/apollo-router/src/cache/redis.rs @@ -665,9 +665,7 @@ impl RedisCacheStorage { let values = if keys.len() == 1 { let key = self.make_key(keys.remove(0)); - let res = client - .get(key) - .await; + let res = client.get(key).await; vec![res] } else if self.is_cluster { // when using a cluster of redis nodes, the keys are hashed, and the hash number indicates which From 83b43cf0623cc051dcdcfce19244c1a8845d3e33 Mon Sep 17 00:00:00 2001 From: carodewig <16093297+carodewig@users.noreply.github.com> Date: Tue, 25 Nov 2025 14:13:11 -0500 Subject: [PATCH 5/7] test: fix results after merge --- apollo-router/src/cache/redis.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apollo-router/src/cache/redis.rs b/apollo-router/src/cache/redis.rs index
f7f22c71b3..397db9432e 100644 --- a/apollo-router/src/cache/redis.rs +++ b/apollo-router/src/cache/redis.rs @@ -1117,7 +1117,7 @@ mod test { // test the `mget` functionality let values = storage.get_multiple(keys).await; for value in values { - let value: RedisValue = value.ok_or("missing value")?; + let value: RedisValue = value?; assert_eq!(value.0, expected_value); } @@ -1162,7 +1162,7 @@ mod test { let values = storage.get_multiple(keys).await; let parsed_values: Vec> = - values.into_iter().map(|v| v.map(|v| v.0)).collect(); + values.into_iter().map(|v| v.ok().map(|v| v.0)).collect(); assert_eq!(parsed_values, expected_values); } From 76c12f8f22b50a0acbe7d068914b0e83725c612f Mon Sep 17 00:00:00 2001 From: carodewig <16093297+carodewig@users.noreply.github.com> Date: Tue, 25 Nov 2025 14:36:12 -0500 Subject: [PATCH 6/7] test: improve test reliability --- .../src/plugins/response_cache/storage/redis.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/apollo-router/src/plugins/response_cache/storage/redis.rs b/apollo-router/src/plugins/response_cache/storage/redis.rs index 6e5475f1ec..b7ad82f43e 100644 --- a/apollo-router/src/plugins/response_cache/storage/redis.rs +++ b/apollo-router/src/plugins/response_cache/storage/redis.rs @@ -1381,13 +1381,14 @@ mod tests { // because of how tokio::timeout polls, it's possible for a command to finish before the // timeout is polled (even if the duration is 0). perform the check in a loop to give it - // a few changes to trigger. + // a few chances to trigger. 
let now = Instant::now(); while now.elapsed() < Duration::from_secs(5) { - let error = storage.fetch_multiple(&keys, "S1").await.unwrap_err(); - if error.is_row_not_found() { - continue; - } + let error = match storage.fetch_multiple(&keys, "S1").await { + Ok(_) => continue, + Err(err) if err.is_row_not_found() => continue, + Err(err) => err, + }; assert!(matches!(error, Error::Timeout(_)), "{:?}", error); assert_eq!(error.code(), "TIMEOUT"); From b0be219b5b31ee0f1d95a25fac41e719c0cd15fc Mon Sep 17 00:00:00 2001 From: carodewig <16093297+carodewig@users.noreply.github.com> Date: Mon, 1 Dec 2025 15:32:20 -0500 Subject: [PATCH 7/7] try returning a tuple instead --- apollo-router/src/cache/redis.rs | 80 +++++++++---------- apollo-router/src/plugins/cache/entity.rs | 7 +- .../src/plugins/response_cache/metrics.rs | 6 +- .../src/plugins/response_cache/storage/mod.rs | 21 +++-- .../plugins/response_cache/storage/redis.rs | 68 +++++++++++----- 5 files changed, 99 insertions(+), 83 deletions(-) diff --git a/apollo-router/src/cache/redis.rs b/apollo-router/src/cache/redis.rs index 397db9432e..1c9d798c48 100644 --- a/apollo-router/src/cache/redis.rs +++ b/apollo-router/src/cache/redis.rs @@ -10,7 +10,6 @@ use std::time::Duration; use fred::clients::Client; use fred::clients::Pipeline; use fred::clients::Replicas; -use fred::error::ErrorKind; use fred::interfaces::EventInterface; #[cfg(test)] use fred::mocks::Mocks; @@ -35,7 +34,6 @@ use fred::types::config::UnresponsiveConfig; use fred::types::scan::ScanResult; use futures::Stream; use futures::future::join_all; -use itertools::Itertools; use tokio::sync::broadcast::error::RecvError; use tokio::task::AbortHandle; use tower::BoxError; @@ -650,7 +648,7 @@ impl RedisCacheStorage { pub(crate) async fn get_multiple( &self, keys: Vec>, - ) -> Vec, RedisError>> { + ) -> (Vec>>, Vec) { self.get_multiple_with_options(keys, Options::default()) .await } @@ -659,15 +657,18 @@ impl RedisCacheStorage { &self, mut keys: Vec>, options: 
Options, - ) -> Vec, RedisError>> { + ) -> (Vec>>, Vec) { tracing::trace!("getting multiple values from redis: {:?}", keys); let client = self.replica_client().with_options(&options); - let values = if keys.len() == 1 { + let (values, errors) = if keys.len() == 1 { let key = self.make_key(keys.remove(0)); - let res = client.get(key).await; - vec![res] + match client.get(key).await { + Ok(value) => (vec![value], vec![]), + Err(err) => (vec![None], vec![err]), + } } else if self.is_cluster { + let len = keys.len(); // when using a cluster of redis nodes, the keys are hashed, and the hash number indicates which // node will store it. So first we have to group the keys by hash, because we cannot do a MGET // across multiple nodes (error: "ERR CROSSSLOT Keys in request don't hash to the same slot") @@ -686,33 +687,32 @@ impl RedisCacheStorage { let client = client.clone(); tasks.push(async move { let result: Result>>, _> = client.mget(keys).await; - - // turn (indexes, result) into a Vec<(Index, Result)> - match result { - Ok(values) => indexes + // turn into Result)>> + result.map(|values| { + indexes .into_iter() .zip(values.into_iter()) - .map(|(index, value)| { - (index, value.ok_or(RedisError::new(ErrorKind::NotFound, ""))) - }) - .collect::, _>)>>(), - Err(err) => indexes - .into_iter() - .map(|index| (index, Err(err.clone()))) - .collect(), - } + .collect::>)>>() + }) }); } // then we have to assemble the results, by making sure that the values are in the same order as // the keys argument's order - join_all(tasks) - .await - .into_iter() - .flatten() - .sorted_by_key(|(index, _)| *index) - .map(|(_, value)| value) - .collect() + let mut values = vec![None; len]; + let mut errors = Vec::with_capacity(tasks.len()); + for result in join_all(tasks).await.into_iter() { + match result { + Ok(result) => { + for (index, value) in result.into_iter() { + values[index] = value; + } + } + Err(err) => errors.push(err), + } + } + + (values, errors) } else { let len = 
keys.len(); let keys = keys @@ -721,21 +721,13 @@ impl RedisCacheStorage { .collect::>(); let client = self.inner.next().with_options(&options); match client.mget::>>, _>(keys).await { - Ok(values) => values - .into_iter() - .map(|value| value.ok_or(RedisError::new(ErrorKind::NotFound, ""))) - .collect(), - Err(err) => vec![Err(err.clone()); len], + Ok(values) => (values, vec![]), + Err(err) => (vec![None; len], vec![err]), } }; - values.iter().for_each(|value| { - if let Err(err) = value { - self.record_error(err) - } - }); - - values + errors.iter().for_each(|err| self.record_error(err)); + (values, errors) } pub(crate) async fn insert( @@ -1115,9 +1107,10 @@ mod test { } // test the `mget` functionality - let values = storage.get_multiple(keys).await; + let (values, errors) = storage.get_multiple(keys).await; + assert!(errors.is_empty()); for value in values { - let value: RedisValue = value?; + let value: RedisValue = value.expect("missing value"); assert_eq!(value.0, expected_value); } @@ -1160,9 +1153,10 @@ mod test { .map(|value| value.map(ToString::to_string)) .collect(); - let values = storage.get_multiple(keys).await; + let (values, errors) = storage.get_multiple(keys).await; + assert!(errors.is_empty()); let parsed_values: Vec> = - values.into_iter().map(|v| v.ok().map(|v| v.0)).collect(); + values.into_iter().map(|v| v.map(|v| v.0)).collect(); assert_eq!(parsed_values, expected_values); } diff --git a/apollo-router/src/plugins/cache/entity.rs b/apollo-router/src/plugins/cache/entity.rs index 4a427b5d6f..ba56277055 100644 --- a/apollo-router/src/plugins/cache/entity.rs +++ b/apollo-router/src/plugins/cache/entity.rs @@ -1012,11 +1012,12 @@ async fn cache_lookup_entities( private_id, )?; - let cache_result: Vec> = cache + let (values, _errors): (Vec>>, _) = cache .get_multiple(keys.iter().map(|k| RedisKey(k.clone())).collect::>()) - .await + .await; + let cache_result = values .into_iter() - .map(|r| r.map(|v: RedisValue| v.0).ok()) + .map(|r| r.map(|v: 
RedisValue| v.0)) .map(|v| match v { None => None, Some(v) => { diff --git a/apollo-router/src/plugins/response_cache/metrics.rs b/apollo-router/src/plugins/response_cache/metrics.rs index 665d55afd4..694a2b147f 100644 --- a/apollo-router/src/plugins/response_cache/metrics.rs +++ b/apollo-router/src/plugins/response_cache/metrics.rs @@ -24,10 +24,6 @@ impl From for String { } pub(super) fn record_fetch_error(error: &storage::Error, subgraph_name: &str) { - record_fetch_errors(error, subgraph_name, 1) -} - -pub(super) fn record_fetch_errors(error: &storage::Error, subgraph_name: &str, count: u64) { if error.is_row_not_found() { return; } @@ -36,7 +32,7 @@ pub(super) fn record_fetch_errors(error: &storage::Error, subgraph_name: &str, c "apollo.router.operations.response_cache.fetch.error", "Errors when fetching data from cache", "{error}", - count, + 1, "subgraph.name" = subgraph_name.to_string(), "code" = error.code() ); diff --git a/apollo-router/src/plugins/response_cache/storage/mod.rs b/apollo-router/src/plugins/response_cache/storage/mod.rs index 9a08193938..03c49955ab 100644 --- a/apollo-router/src/plugins/response_cache/storage/mod.rs +++ b/apollo-router/src/plugins/response_cache/storage/mod.rs @@ -14,7 +14,6 @@ use super::cache_control::CacheControl; use crate::plugins::response_cache::invalidation::InvalidationKind; use crate::plugins::response_cache::metrics::record_fetch_duration; use crate::plugins::response_cache::metrics::record_fetch_error; -use crate::plugins::response_cache::metrics::record_fetch_errors; use crate::plugins::response_cache::metrics::record_insert_duration; use crate::plugins::response_cache::metrics::record_insert_error; use crate::plugins::response_cache::metrics::record_invalidation_duration; @@ -117,7 +116,10 @@ pub(super) trait CacheStorage { } #[doc(hidden)] - async fn internal_fetch_multiple(&self, cache_keys: &[&str]) -> Vec>; + async fn internal_fetch_multiple( + &self, + cache_keys: &[&str], + ) -> (Vec>, Vec); /// Fetch 
the values belonging to `cache_keys`. Command will be timed out after `self.fetch_timeout()`. async fn fetch_multiple( @@ -135,19 +137,14 @@ pub(super) trait CacheStorage { record_fetch_duration(now.elapsed(), subgraph_name, batch_size); - let values = result + let (values, errors) = result .map_err(Into::into) - .inspect_err(|err| record_fetch_errors(err, subgraph_name, batch_size as u64))?; + .inspect_err(|err| record_fetch_error(err, subgraph_name))?; // individually inspect each error in the Vec, in case we had partial success - let values = values - .into_iter() - .map(|value| { - value - .inspect_err(|err| record_fetch_error(err, subgraph_name)) - .ok() - }) - .collect(); + errors + .iter() + .for_each(|err| record_fetch_error(err, subgraph_name)); Ok(values) } diff --git a/apollo-router/src/plugins/response_cache/storage/redis.rs b/apollo-router/src/plugins/response_cache/storage/redis.rs index b7ad82f43e..dfe2aae2e0 100644 --- a/apollo-router/src/plugins/response_cache/storage/redis.rs +++ b/apollo-router/src/plugins/response_cache/storage/redis.rs @@ -387,7 +387,10 @@ impl CacheStorage for Storage { Ok(CacheEntry::from((cache_key, value.0))) } - async fn internal_fetch_multiple(&self, cache_keys: &[&str]) -> Vec> { + async fn internal_fetch_multiple( + &self, + cache_keys: &[&str], + ) -> (Vec>, Vec) { let keys: Vec> = cache_keys .iter() .map(|key| RedisKey(key.to_string())) @@ -396,18 +399,18 @@ impl CacheStorage for Storage { timeout: Some(self.fetch_timeout()), ..Options::default() }; - let values: Vec, _>> = + let (values, errors): (Vec>>, _) = self.storage.get_multiple_with_options(keys, options).await; - values + let values = values .into_iter() .zip(cache_keys) .map(|(opt_value, cache_key)| { - opt_value - .map(|value| CacheEntry::from((*cache_key, value.0))) - .map_err(Into::into) + opt_value.map(|value| CacheEntry::from((*cache_key, value.0))) }) - .collect() + .collect(); + let errors = errors.into_iter().map(Into::into).collect(); + (values, 
errors) } async fn internal_invalidate_by_subgraph(&self, subgraph_name: &str) -> StorageResult { @@ -1367,37 +1370,62 @@ mod tests { } #[tokio::test] - async fn timeout_errors_are_captured() -> Result<(), BoxError> { - async { - let config = Config { - fetch_timeout: Duration::from_nanos(0), - ..redis_config(false) - }; + #[rstest::rstest] + #[case::fetch_non_cluster(false, false, 1)] + #[case::fetch_multiple_1_non_cluster(false, true, 1)] + #[case::fetch_multiple_3_non_cluster(false, true, 3)] + #[case::fetch_cluster(true, false, 1)] + #[case::fetch_multiple_1_cluster(true, true, 1)] + #[case::fetch_multiple_3_cluster(true, true, 3)] + async fn timeout_errors_are_captured( + #[case] clustered: bool, + #[case] uses_fetch_multiple: bool, + #[case] num_values: usize, + ) -> Result<(), BoxError> { + let config = Config { + fetch_timeout: Duration::from_nanos(0), + ..redis_config(clustered) + }; + + let keys: Vec = (0..num_values).map(|n| format!("key{n}")).collect(); + + async move { let (_drop_tx, drop_rx) = broadcast::channel(2); let storage = Storage::new(&config, drop_rx).await?; storage.truncate_namespace().await?; - let keys = ["key1", "key2", "key3"]; - // because of how tokio::timeout polls, it's possible for a command to finish before the // timeout is polled (even if the duration is 0). perform the check in a loop to give it // a few chances to trigger. 
let now = Instant::now(); while now.elapsed() < Duration::from_secs(5) { - let error = match storage.fetch_multiple(&keys, "S1").await { - Ok(_) => continue, - Err(err) if err.is_row_not_found() => continue, - Err(err) => err, + let error = { + if uses_fetch_multiple { + let keys: Vec<&str> = keys.iter().map(|s| s.as_str()).collect(); + if let Err(err) = storage.fetch_multiple(&keys, "S1").await { + err + } else { + // Might get Ok([None; 3]) - try again + continue; + } + } else { + storage.fetch(&keys[0], "S1").await.unwrap_err() + } }; + if error.is_row_not_found() { + continue; + } + assert!(matches!(error, Error::Timeout(_)), "{:?}", error); assert_eq!(error.code(), "TIMEOUT"); assert_counter!( "apollo.router.operations.response_cache.fetch.error", - keys.len(), + 1, "code" = "TIMEOUT", "subgraph.name" = "S1" ); + return Ok(()); }