diff --git a/.changesets/fix_caroline_propagate_mget_errors.md b/.changesets/fix_caroline_propagate_mget_errors.md
new file mode 100644
index 0000000000..b55b328690
--- /dev/null
+++ b/.changesets/fix_caroline_propagate_mget_errors.md
@@ -0,0 +1,6 @@
+### Fix response cache fetch error metric ([PR #8644](https://github.com/apollographql/router/pull/8644))
+
+The `apollo.router.operations.response_cache.fetch.error` metric was not being incremented as expected when fetching
+multiple items from Redis. This fix changes its behavior to align with the `apollo.router.cache.redis.errors` metric.
+
+By [@carodewig](https://github.com/carodewig) in https://github.com/apollographql/router/pull/8644
diff --git a/apollo-router/src/cache/redis.rs b/apollo-router/src/cache/redis.rs
index 5f7e6fd288..1c9d798c48 100644
--- a/apollo-router/src/cache/redis.rs
+++ b/apollo-router/src/cache/redis.rs
@@ -648,7 +648,7 @@ impl RedisCacheStorage {
     pub(crate) async fn get_multiple<K: KeyType, V: ValueType>(
         &self,
         keys: Vec<RedisKey<K>>,
-    ) -> Vec<Option<RedisValue<V>>> {
+    ) -> (Vec<Option<RedisValue<V>>>, Vec<RedisError>) {
         self.get_multiple_with_options(keys, Options::default())
             .await
     }
@@ -657,28 +657,21 @@
     pub(crate) async fn get_multiple_with_options<K: KeyType, V: ValueType>(
         &self,
         mut keys: Vec<RedisKey<K>>,
         options: Options,
-    ) -> Vec<Option<RedisValue<V>>> {
-        // NB: MGET is different from GET in that it returns `Option`s rather than `Result`s.
-        // > For every key that does not hold a string value or does not exist, the special value
-        // nil is returned. Because of this, the operation never fails.
-        // - https://redis.io/docs/latest/commands/mget/
-
+    ) -> (Vec<Option<RedisValue<V>>>, Vec<RedisError>) {
         tracing::trace!("getting multiple values from redis: {:?}", keys);
         let client = self.replica_client().with_options(&options);
 
-        if keys.len() == 1 {
-            let key = self.make_key(keys.swap_remove(0));
-            let res = client
-                .get(key)
-                .await
-                .inspect_err(|e| self.record_error(e))
-                .ok();
-            vec![res]
+        let (values, errors) = if keys.len() == 1 {
+            let key = self.make_key(keys.remove(0));
+            match client.get(key).await {
+                Ok(value) => (vec![value], vec![]),
+                Err(err) => (vec![None], vec![err]),
+            }
         } else if self.is_cluster {
+            let len = keys.len();
             // when using a cluster of redis nodes, the keys are hashed, and the hash number indicates which
             // node will store it. So first we have to group the keys by hash, because we cannot do a MGET
             // across multiple nodes (error: "ERR CROSSSLOT Keys in request don't hash to the same slot")
-            let len = keys.len();
             let mut h: HashMap<u16, (Vec<String>, Vec<usize>)> = HashMap::new();
             for (index, key) in keys.into_iter().enumerate() {
                 let key = self.make_key(key);
@@ -694,39 +687,47 @@ impl RedisCacheStorage {
                 let client = client.clone();
                 tasks.push(async move {
                     let result: Result<Vec<Option<RedisValue<V>>>, _> = client.mget(keys).await;
-                    (indexes, result)
+                    // turn into Result<Vec<(usize, Option<RedisValue<V>>)>>
+                    result.map(|values| {
+                        indexes
+                            .into_iter()
+                            .zip(values.into_iter())
+                            .collect::<Vec<(usize, Option<RedisValue<V>>)>>()
+                    })
                 });
             }
 
             // then we have to assemble the results, by making sure that the values are in the same order as
             // the keys argument's order
-            let mut result = vec![None; len];
-            for (indexes, result_value) in join_all(tasks).await.into_iter() {
-                match result_value {
-                    Ok(values) => {
-                        for (index, value) in indexes.into_iter().zip(values.into_iter()) {
-                            result[index] = value;
+            let mut values = vec![None; len];
+            let mut errors = Vec::with_capacity(tasks.len());
+            for result in join_all(tasks).await.into_iter() {
+                match result {
+                    Ok(result) => {
+                        for (index, value) in result.into_iter() {
+                            values[index] = value;
                         }
                     }
-                    Err(e) => {
-                        self.record_error(&e);
-                    }
+                    Err(err) => errors.push(err),
                 }
             }
-            result
+            (values, errors)
         } else {
             let len = keys.len();
             let keys = keys
                 .into_iter()
                 .map(|k| self.make_key(k))
                 .collect::<Vec<_>>();
-            client
-                .mget(keys)
-                .await
-                .inspect_err(|e| self.record_error(e))
-                .unwrap_or_else(|_| vec![None; len])
-        }
+            let client = self.inner.next().with_options(&options);
+            match client.mget::<Vec<Option<RedisValue<V>>>, _>(keys).await {
+                Ok(values) => (values, vec![]),
+                Err(err) => (vec![None; len], vec![err]),
+            }
+        };
+
+        errors.iter().for_each(|err| self.record_error(err));
+        (values, errors)
     }
 
     pub(crate) async fn insert(
@@ -1106,9 +1107,10 @@ mod test {
         }
 
         // test the `mget` functionality
-        let values = storage.get_multiple(keys).await;
+        let (values, errors) = storage.get_multiple(keys).await;
+        assert!(errors.is_empty());
         for value in values {
-            let value: RedisValue<String> = value.ok_or("missing value")?;
+            let value: RedisValue<String> = value.expect("missing value");
             assert_eq!(value.0, expected_value);
         }
 
@@ -1151,7 +1153,8 @@
             .map(|value| value.map(ToString::to_string))
             .collect();
 
-        let values = storage.get_multiple(keys).await;
+        let (values, errors) = storage.get_multiple(keys).await;
+        assert!(errors.is_empty());
         let parsed_values: Vec<Option<String>> = values.into_iter().map(|v| v.map(|v| v.0)).collect();
         assert_eq!(parsed_values, expected_values);
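The heart of the cluster branch above is the scatter/gather reassembly: keys are grouped by hash slot, one MGET runs per slot, and the results are stitched back into the caller's key order, with a failed slot now surfacing its error instead of silently becoming `None`s. A minimal, self-contained sketch of that reassembly step, using plain `String` errors as a stand-in for the Redis error type:

```rust
// Each per-slot MGET task yields either the (original_index, value) pairs for
// its keys, or a single error covering all of them.
fn reassemble<T>(
    len: usize,
    groups: Vec<Result<Vec<(usize, Option<T>)>, String>>,
) -> (Vec<Option<T>>, Vec<String>) {
    // Pre-fill with None so keys whose MGET failed surface as cache misses.
    let mut values: Vec<Option<T>> = (0..len).map(|_| None).collect();
    let mut errors = Vec::new();
    for group in groups {
        match group {
            // Scatter each value back to the position its key occupied in the
            // caller's original `keys` vector.
            Ok(pairs) => {
                for (index, value) in pairs {
                    values[index] = value;
                }
            }
            // A failed MGET leaves its keys as None and records the error, so
            // partial success is still usable by the caller.
            Err(err) => errors.push(err),
        }
    }
    (values, errors)
}

fn main() {
    // Keys 0 and 2 hashed to one slot (success), key 1 to another (failure).
    let groups = vec![
        Ok(vec![(0, Some("a")), (2, Some("c"))]),
        Err("connection reset".to_string()),
    ];
    let (values, errors) = reassemble(3, groups);
    assert_eq!(values, vec![Some("a"), None, Some("c")]);
    assert_eq!(errors.len(), 1);
}
```

This is why `get_multiple` can now return both values and errors at once: one slot's failure only blanks that slot's positions, and the caller decides what the accompanying errors mean.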
diff --git a/apollo-router/src/plugins/cache/entity.rs b/apollo-router/src/plugins/cache/entity.rs
index 0a2465eee5..ba56277055 100644
--- a/apollo-router/src/plugins/cache/entity.rs
+++ b/apollo-router/src/plugins/cache/entity.rs
@@ -1012,9 +1012,10 @@ async fn cache_lookup_entities(
         private_id,
     )?;
 
-    let cache_result: Vec<Option<CacheEntry>> = cache
+    let (values, _errors): (Vec<Option<RedisValue<CacheEntry>>>, _) = cache
        .get_multiple(keys.iter().map(|k| RedisKey(k.clone())).collect::<Vec<_>>())
-        .await
+        .await;
+    let cache_result = values
         .into_iter()
         .map(|r| r.map(|v: RedisValue<CacheEntry>| v.0))
         .map(|v| match v {
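The caller-side contract this sets up for the entity cache is worth noting: fetch errors are deliberately discarded (`_errors`), so a Redis failure degrades each affected key to a cache miss rather than failing the whole lookup. A small sketch of that degradation, assuming simplified stand-in types for the real cache entries:

```rust
// Hypothetical miss-on-error handling, mirroring `cache_lookup_entities` above.
#[derive(Debug, PartialEq)]
enum Lookup {
    Hit(String),
    Miss,
}

fn to_lookups(values: Vec<Option<String>>) -> Vec<Lookup> {
    values
        .into_iter()
        .map(|v| v.map_or(Lookup::Miss, Lookup::Hit))
        .collect()
}

fn main() {
    // The second slot models a key lost to a failed MGET: it arrives as None
    // (alongside an entry in the ignored error vector) and is treated as an
    // ordinary miss, to be fetched from the subgraph like any other.
    let (values, _errors) = (vec![Some("cached".to_string()), None], vec!["timeout"]);
    assert_eq!(
        to_lookups(values),
        vec![Lookup::Hit("cached".to_string()), Lookup::Miss]
    );
}
```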
diff --git a/apollo-router/src/plugins/response_cache/storage/mod.rs b/apollo-router/src/plugins/response_cache/storage/mod.rs
index 41d4d444ee..03c49955ab 100644
--- a/apollo-router/src/plugins/response_cache/storage/mod.rs
+++ b/apollo-router/src/plugins/response_cache/storage/mod.rs
@@ -119,7 +119,7 @@ pub(super) trait CacheStorage {
     async fn internal_fetch_multiple(
         &self,
         cache_keys: &[&str],
-    ) -> StorageResult<Vec<Option<CacheEntry>>>;
+    ) -> (Vec<Option<CacheEntry>>, Vec<Error>);
 
     /// Fetch the values belonging to `cache_keys`. Command will be timed out after `self.fetch_timeout()`.
     async fn fetch_multiple(
@@ -130,14 +130,22 @@
         let batch_size = cache_keys.len();
         let now = Instant::now();
 
-        let result = flatten_storage_error(
-            self.internal_fetch_multiple(cache_keys)
-                .timeout(self.fetch_timeout())
-                .await,
-        );
+        let result = self
+            .internal_fetch_multiple(cache_keys)
+            .timeout(self.fetch_timeout())
+            .await;
         record_fetch_duration(now.elapsed(), subgraph_name, batch_size);
-        result.inspect_err(|err| record_fetch_error(err, subgraph_name))
+
+        let (values, errors) = result
+            .map_err(Into::into)
+            .inspect_err(|err| record_fetch_error(err, subgraph_name))?;
+
+        // individually inspect each error in the Vec, in case we had partial success
+        errors
+            .iter()
+            .for_each(|err| record_fetch_error(err, subgraph_name));
+        Ok(values)
     }
 
     #[doc(hidden)]
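The control flow in `fetch_multiple` now distinguishes two failure levels: a batch-level failure (such as the timeout) is recorded once and aborts via `?`, while per-key errors are each recorded without discarding the values that did arrive. A self-contained sketch of that accounting, assuming a plain counter in place of the router's fetch-error metric:

```rust
// Hypothetical stand-in for the router's fetch-error metric.
fn record_fetch_error(err: &str, subgraph_name: &str, counter: &mut u32) {
    *counter += 1;
    eprintln!("response cache fetch error for {subgraph_name}: {err}");
}

fn handle_batch(
    batch: Result<(Vec<Option<String>>, Vec<String>), String>,
    subgraph_name: &str,
    counter: &mut u32,
) -> Result<Vec<Option<String>>, String> {
    // A batch-level failure (e.g. the whole fetch timing out) is recorded
    // once and aborts the fetch entirely.
    let (values, errors) = match batch {
        Ok(ok) => ok,
        Err(err) => {
            record_fetch_error(&err, subgraph_name, counter);
            return Err(err);
        }
    };
    // Per-key errors are each recorded individually, but the values that did
    // arrive still flow on: partial success is not thrown away.
    for err in &errors {
        record_fetch_error(err, subgraph_name, counter);
    }
    Ok(values)
}

fn main() {
    let mut counter = 0;
    let batch = Ok((
        vec![Some("hit".to_string()), None],
        vec!["slot error".to_string()],
    ));
    let values = handle_batch(batch, "S1", &mut counter).unwrap();
    assert_eq!(values.len(), 2);
    assert_eq!(counter, 1); // one per-key error, no batch-level error
}
```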
diff --git a/apollo-router/src/plugins/response_cache/storage/redis.rs b/apollo-router/src/plugins/response_cache/storage/redis.rs
index 697c674b94..dfe2aae2e0 100644
--- a/apollo-router/src/plugins/response_cache/storage/redis.rs
+++ b/apollo-router/src/plugins/response_cache/storage/redis.rs
@@ -390,7 +390,7 @@ impl CacheStorage for Storage {
     async fn internal_fetch_multiple(
         &self,
         cache_keys: &[&str],
-    ) -> StorageResult<Vec<Option<CacheEntry>>> {
+    ) -> (Vec<Option<CacheEntry>>, Vec<Error>) {
         let keys: Vec<RedisKey<String>> = cache_keys
             .iter()
             .map(|key| RedisKey(key.to_string()))
             .collect();
         let options = Options {
             timeout: Some(self.fetch_timeout()),
             ..Options::default()
         };
-        let values: Vec<Option<RedisValue<Document>>> =
+        let (values, errors): (Vec<Option<RedisValue<Document>>>, _) =
             self.storage.get_multiple_with_options(keys, options).await;
-        let entries = values
+        let values = values
             .into_iter()
             .zip(cache_keys)
             .map(|(opt_value, cache_key)| {
                 opt_value.map(|value| CacheEntry::from((*cache_key, value.0)))
             })
             .collect();
-
-        Ok(entries)
+        let errors = errors.into_iter().map(Into::into).collect();
+        (values, errors)
     }
 
     async fn internal_invalidate_by_subgraph(&self, subgraph_name: &str) -> StorageResult<u64> {
@@ -559,6 +559,7 @@ mod tests {
     use super::Config;
     use super::Storage;
     use super::now;
+    use crate::metrics::FutureMetricsExt;
     use crate::plugins::response_cache::ErrorCode;
     use crate::plugins::response_cache::storage::CacheStorage;
     use crate::plugins::response_cache::storage::Document;
@@ -1369,32 +1370,68 @@
     }
 
     #[tokio::test]
-    async fn timeout_errors_are_captured() -> Result<(), BoxError> {
+    #[rstest::rstest]
+    #[case::fetch_non_cluster(false, false, 1)]
+    #[case::fetch_multiple_1_non_cluster(false, true, 1)]
+    #[case::fetch_multiple_3_non_cluster(false, true, 3)]
+    #[case::fetch_cluster(true, false, 1)]
+    #[case::fetch_multiple_1_cluster(true, true, 1)]
+    #[case::fetch_multiple_3_cluster(true, true, 3)]
+    async fn timeout_errors_are_captured(
+        #[case] clustered: bool,
+        #[case] uses_fetch_multiple: bool,
+        #[case] num_values: usize,
+    ) -> Result<(), BoxError> {
        let config = Config {
            fetch_timeout: Duration::from_nanos(0),
-            ..redis_config(false)
+            ..redis_config(clustered)
        };
-        let (_drop_tx, drop_rx) = broadcast::channel(2);
-        let storage = Storage::new(&config, drop_rx).await?;
-        storage.truncate_namespace().await?;
-        let document = common_document();
+        let keys: Vec<String> = (0..num_values).map(|n| format!("key{n}")).collect();
 
-        // because of how tokio::timeout polls, it's possible for a command to finish before the
-        // timeout is polled (even if the duration is 0). perform the check in a loop to give it
-        // a few changes to trigger.
-        let now = Instant::now();
-        while now.elapsed() < Duration::from_secs(5) {
-            let error = storage.fetch(&document.key, "S1").await.unwrap_err();
-            if error.is_row_not_found() {
-                continue;
+        async move {
+            let (_drop_tx, drop_rx) = broadcast::channel(2);
+            let storage = Storage::new(&config, drop_rx).await?;
+            storage.truncate_namespace().await?;
+
+            // because of how tokio::timeout polls, it's possible for a command to finish before the
+            // timeout is polled (even if the duration is 0). perform the check in a loop to give it
+            // a few chances to trigger.
+            let now = Instant::now();
+            while now.elapsed() < Duration::from_secs(5) {
+                let error = {
+                    if uses_fetch_multiple {
+                        let keys: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
+                        if let Err(err) = storage.fetch_multiple(&keys, "S1").await {
+                            err
+                        } else {
+                            // Might get Ok([None; 3]) - try again
+                            continue;
+                        }
+                    } else {
+                        storage.fetch(&keys[0], "S1").await.unwrap_err()
+                    }
+                };
+
+                if error.is_row_not_found() {
+                    continue;
+                }
+
+                assert!(matches!(error, Error::Timeout(_)), "{:?}", error);
+                assert_eq!(error.code(), "TIMEOUT");
+                assert_counter!(
+                    "apollo.router.operations.response_cache.fetch.error",
+                    1,
+                    "code" = "TIMEOUT",
+                    "subgraph.name" = "S1"
+                );
+
+                return Ok(());
             }
-            assert!(matches!(error, Error::Timeout(_)), "{:?}", error);
-            assert_eq!(error.code(), "TIMEOUT");
-            return Ok(());
+            panic!("Never observed a timeout");
         }
-
-        panic!("Never observed a timeout")
+        .with_metrics()
+        .await
     }
 }
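For readers unfamiliar with `rstest`, each `#[case]` attribute above expands into its own test, so a single failing parameterization is reported separately rather than folding six scenarios into one pass/fail. A hypothetical standalone example of the same pattern (assumes the `rstest` dev-dependency):

```rust
// Each #[case] generates a separate test function named after its label, so
// the failing combination is visible directly in the test report.
#[rstest::rstest]
#[case::empty(&[], 0)]
#[case::one_element(&[7], 7)]
#[case::several(&[1, 2, 3], 6)]
fn sums(#[case] input: &[i32], #[case] expected: i32) {
    assert_eq!(input.iter().sum::<i32>(), expected);
}
```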