Skip to content
Open
6 changes: 6 additions & 0 deletions .changesets/fix_caroline_propagate_mget_errors.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
### Fix response cache fetch error metric ([PR #8644](https://github.com/apollographql/router/pull/8644))

The `apollo.router.operations.response_cache.fetch.error` metric was not being incremented as expected when fetching multiple
items from Redis. This fix aligns its behavior with that of the `apollo.router.cache.redis.errors` metric.

By [@carodewig](https://github.com/carodewig) in https://github.com/apollographql/router/pull/8644
75 changes: 39 additions & 36 deletions apollo-router/src/cache/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ impl RedisCacheStorage {
pub(crate) async fn get_multiple<K: KeyType, V: ValueType>(
&self,
keys: Vec<RedisKey<K>>,
) -> Vec<Option<RedisValue<V>>> {
) -> (Vec<Option<RedisValue<V>>>, Vec<RedisError>) {
self.get_multiple_with_options(keys, Options::default())
.await
}
Expand All @@ -657,28 +657,21 @@ impl RedisCacheStorage {
&self,
mut keys: Vec<RedisKey<K>>,
options: Options,
) -> Vec<Option<RedisValue<V>>> {
// NB: MGET is different from GET in that it returns `Option`s rather than `Result`s.
// > For every key that does not hold a string value or does not exist, the special value
// nil is returned. Because of this, the operation never fails.
// - https://redis.io/docs/latest/commands/mget/

) -> (Vec<Option<RedisValue<V>>>, Vec<RedisError>) {
tracing::trace!("getting multiple values from redis: {:?}", keys);
let client = self.replica_client().with_options(&options);

if keys.len() == 1 {
let key = self.make_key(keys.swap_remove(0));
let res = client
.get(key)
.await
.inspect_err(|e| self.record_error(e))
.ok();
vec![res]
let (values, errors) = if keys.len() == 1 {
let key = self.make_key(keys.remove(0));
match client.get(key).await {
Ok(value) => (vec![value], vec![]),
Err(err) => (vec![None], vec![err]),
}
} else if self.is_cluster {
let len = keys.len();
// when using a cluster of redis nodes, the keys are hashed, and the hash number indicates which
// node will store it. So first we have to group the keys by hash, because we cannot do a MGET
// across multiple nodes (error: "ERR CROSSSLOT Keys in request don't hash to the same slot")
let len = keys.len();
let mut h: HashMap<u16, (Vec<usize>, Vec<String>)> = HashMap::new();
for (index, key) in keys.into_iter().enumerate() {
let key = self.make_key(key);
Expand All @@ -694,39 +687,47 @@ impl RedisCacheStorage {
let client = client.clone();
tasks.push(async move {
let result: Result<Vec<Option<RedisValue<V>>>, _> = client.mget(keys).await;
(indexes, result)
// turn into Result<Vec<(index, Option<RedisValue>)>>
result.map(|values| {
indexes
.into_iter()
.zip(values.into_iter())
.collect::<Vec<(usize, Option<RedisValue<V>>)>>()
})
});
}

// then we have to assemble the results, by making sure that the values are in the same order as
// the keys argument's order
let mut result = vec![None; len];
for (indexes, result_value) in join_all(tasks).await.into_iter() {
match result_value {
Ok(values) => {
for (index, value) in indexes.into_iter().zip(values.into_iter()) {
result[index] = value;
let mut values = vec![None; len];
let mut errors = Vec::with_capacity(tasks.len());
for result in join_all(tasks).await.into_iter() {
match result {
Ok(result) => {
for (index, value) in result.into_iter() {
values[index] = value;
}
}
Err(e) => {
self.record_error(&e);
}
Err(err) => errors.push(err),
}
}

result
(values, errors)
} else {
let len = keys.len();
let keys = keys
.into_iter()
.map(|k| self.make_key(k))
.collect::<Vec<_>>();
client
.mget(keys)
.await
.inspect_err(|e| self.record_error(e))
.unwrap_or_else(|_| vec![None; len])
}
let client = self.inner.next().with_options(&options);
match client.mget::<Vec<Option<RedisValue<V>>>, _>(keys).await {
Ok(values) => (values, vec![]),
Err(err) => (vec![None; len], vec![err]),
}
};

errors.iter().for_each(|err| self.record_error(err));
(values, errors)
}

pub(crate) async fn insert<K: KeyType, V: ValueType>(
Expand Down Expand Up @@ -1106,9 +1107,10 @@ mod test {
}

// test the `mget` functionality
let values = storage.get_multiple(keys).await;
let (values, errors) = storage.get_multiple(keys).await;
assert!(errors.is_empty());
for value in values {
let value: RedisValue<usize> = value.ok_or("missing value")?;
let value: RedisValue<usize> = value.expect("missing value");
assert_eq!(value.0, expected_value);
}

Expand Down Expand Up @@ -1151,7 +1153,8 @@ mod test {
.map(|value| value.map(ToString::to_string))
.collect();

let values = storage.get_multiple(keys).await;
let (values, errors) = storage.get_multiple(keys).await;
assert!(errors.is_empty());
let parsed_values: Vec<Option<String>> =
values.into_iter().map(|v| v.map(|v| v.0)).collect();
assert_eq!(parsed_values, expected_values);
Expand Down
5 changes: 3 additions & 2 deletions apollo-router/src/plugins/cache/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1012,9 +1012,10 @@ async fn cache_lookup_entities(
private_id,
)?;

let cache_result: Vec<Option<CacheEntry>> = cache
let (values, _errors): (Vec<Option<RedisValue<CacheEntry>>>, _) = cache
.get_multiple(keys.iter().map(|k| RedisKey(k.clone())).collect::<Vec<_>>())
.await
.await;
let cache_result = values
.into_iter()
.map(|r| r.map(|v: RedisValue<CacheEntry>| v.0))
.map(|v| match v {
Expand Down
22 changes: 15 additions & 7 deletions apollo-router/src/plugins/response_cache/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ pub(super) trait CacheStorage {
async fn internal_fetch_multiple(
&self,
cache_keys: &[&str],
) -> StorageResult<Vec<Option<CacheEntry>>>;
) -> (Vec<Option<CacheEntry>>, Vec<Error>);

/// Fetch the values belonging to `cache_keys`. Command will be timed out after `self.fetch_timeout()`.
async fn fetch_multiple(
Expand All @@ -130,14 +130,22 @@ pub(super) trait CacheStorage {
let batch_size = cache_keys.len();

let now = Instant::now();
let result = flatten_storage_error(
self.internal_fetch_multiple(cache_keys)
.timeout(self.fetch_timeout())
.await,
);
let result = self
.internal_fetch_multiple(cache_keys)
.timeout(self.fetch_timeout())
.await;

record_fetch_duration(now.elapsed(), subgraph_name, batch_size);
result.inspect_err(|err| record_fetch_error(err, subgraph_name))

let (values, errors) = result
.map_err(Into::into)
.inspect_err(|err| record_fetch_error(err, subgraph_name))?;

// individually inspect each error in the Vec, in case we had partial success
errors
.iter()
.for_each(|err| record_fetch_error(err, subgraph_name));
Ok(values)
}

#[doc(hidden)]
Expand Down
85 changes: 61 additions & 24 deletions apollo-router/src/plugins/response_cache/storage/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ impl CacheStorage for Storage {
async fn internal_fetch_multiple(
&self,
cache_keys: &[&str],
) -> StorageResult<Vec<Option<CacheEntry>>> {
) -> (Vec<Option<CacheEntry>>, Vec<super::Error>) {
let keys: Vec<RedisKey<String>> = cache_keys
.iter()
.map(|key| RedisKey(key.to_string()))
Expand All @@ -399,18 +399,18 @@ impl CacheStorage for Storage {
timeout: Some(self.fetch_timeout()),
..Options::default()
};
let values: Vec<Option<RedisValue<CacheValue>>> =
let (values, errors): (Vec<Option<RedisValue<CacheValue>>>, _) =
self.storage.get_multiple_with_options(keys, options).await;

let entries = values
let values = values
.into_iter()
.zip(cache_keys)
.map(|(opt_value, cache_key)| {
opt_value.map(|value| CacheEntry::from((*cache_key, value.0)))
})
.collect();

Ok(entries)
let errors = errors.into_iter().map(Into::into).collect();
(values, errors)
}

async fn internal_invalidate_by_subgraph(&self, subgraph_name: &str) -> StorageResult<u64> {
Expand Down Expand Up @@ -559,6 +559,7 @@ mod tests {
use super::Config;
use super::Storage;
use super::now;
use crate::metrics::FutureMetricsExt;
use crate::plugins::response_cache::ErrorCode;
use crate::plugins::response_cache::storage::CacheStorage;
use crate::plugins::response_cache::storage::Document;
Expand Down Expand Up @@ -1369,32 +1370,68 @@ mod tests {
}

#[tokio::test]
async fn timeout_errors_are_captured() -> Result<(), BoxError> {
#[rstest::rstest]
#[case::fetch_non_cluster(false, false, 1)]
#[case::fetch_multiple_1_non_cluster(false, true, 1)]
#[case::fetch_multiple_3_non_cluster(false, true, 3)]
#[case::fetch_cluster(true, false, 1)]
#[case::fetch_multiple_1_cluster(true, true, 1)]
#[case::fetch_multiple_3_cluster(true, true, 3)]
async fn timeout_errors_are_captured(
#[case] clustered: bool,
#[case] uses_fetch_multiple: bool,
#[case] num_values: usize,
) -> Result<(), BoxError> {
let config = Config {
fetch_timeout: Duration::from_nanos(0),
..redis_config(false)
..redis_config(clustered)
};
let (_drop_tx, drop_rx) = broadcast::channel(2);
let storage = Storage::new(&config, drop_rx).await?;
storage.truncate_namespace().await?;

let document = common_document();
let keys: Vec<String> = (0..num_values).map(|n| format!("key{n}")).collect();

// because of how tokio::timeout polls, it's possible for a command to finish before the
// timeout is polled (even if the duration is 0). perform the check in a loop to give it
// a few changes to trigger.
let now = Instant::now();
while now.elapsed() < Duration::from_secs(5) {
let error = storage.fetch(&document.key, "S1").await.unwrap_err();
if error.is_row_not_found() {
continue;
async move {
let (_drop_tx, drop_rx) = broadcast::channel(2);
let storage = Storage::new(&config, drop_rx).await?;
storage.truncate_namespace().await?;

// because of how tokio::timeout polls, it's possible for a command to finish before the
// timeout is polled (even if the duration is 0). perform the check in a loop to give it
// a few chances to trigger.
let now = Instant::now();
while now.elapsed() < Duration::from_secs(5) {
let error = {
if uses_fetch_multiple {
let keys: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
if let Err(err) = storage.fetch_multiple(&keys, "S1").await {
err
} else {
// Might get Ok([None; 3]) - try again
continue;
}
} else {
storage.fetch(&keys[0], "S1").await.unwrap_err()
}
};

if error.is_row_not_found() {
continue;
}

assert!(matches!(error, Error::Timeout(_)), "{:?}", error);
assert_eq!(error.code(), "TIMEOUT");
assert_counter!(
"apollo.router.operations.response_cache.fetch.error",
1,
"code" = "TIMEOUT",
"subgraph.name" = "S1"
);

return Ok(());
}

assert!(matches!(error, Error::Timeout(_)), "{:?}", error);
assert_eq!(error.code(), "TIMEOUT");
return Ok(());
panic!("Never observed a timeout");
}

panic!("Never observed a timeout");
.with_metrics()
.await
}
}