diff --git a/apollo-router/src/plugins/response_cache/invalidation_endpoint.rs b/apollo-router/src/plugins/response_cache/invalidation_endpoint.rs index c87350d3b5b..2da666be938 100644 --- a/apollo-router/src/plugins/response_cache/invalidation_endpoint.rs +++ b/apollo-router/src/plugins/response_cache/invalidation_endpoint.rs @@ -300,7 +300,7 @@ mod tests { .await .unwrap(); let storage = Arc::new(Storage { - all: Some(pg_cache), + all: Some(Arc::new(pg_cache.into())), subgraphs: HashMap::new(), }); let invalidation = Invalidation::new(storage.clone()).await.unwrap(); @@ -363,7 +363,7 @@ mod tests { .await .unwrap(); let storage = Arc::new(Storage { - all: Some(pg_cache), + all: Some(Arc::new(pg_cache.into())), subgraphs: HashMap::new(), }); let invalidation = Invalidation::new(storage.clone()).await.unwrap(); diff --git a/apollo-router/src/plugins/response_cache/metrics.rs b/apollo-router/src/plugins/response_cache/metrics.rs index ededa3d55b1..9b320888ecc 100644 --- a/apollo-router/src/plugins/response_cache/metrics.rs +++ b/apollo-router/src/plugins/response_cache/metrics.rs @@ -12,6 +12,7 @@ use opentelemetry::KeyValue; use opentelemetry::metrics::MeterProvider; use parking_lot::Mutex; use serde_json_bytes::Value; +use tokio::sync::broadcast; use tokio_stream::StreamExt; use tokio_stream::wrappers::IntervalStream; use tower::BoxError; @@ -306,6 +307,7 @@ impl From for String { /// parameter subgraph_name is optional and is None when the database is the global one, and Some(...) when it's a database configured for a specific subgraph pub(super) async fn expired_data_task( pg_cache: PostgresCacheStorage, + mut abort_signal: broadcast::Receiver<()>, subgraph_name: Option, ) { let mut interval = IntervalStream::new(tokio::time::interval(std::time::Duration::from_secs( @@ -335,14 +337,22 @@ pub(super) async fn expired_data_task( }) .init(); - while (interval.next().await).is_some() { - let exp_data = match pg_cache.expired_data_count().await { - Ok(exp_data) => exp_data, - Err(err) => { - ::tracing::error!(error = ?err, "cannot get expired data count"); - continue; + loop { + tokio::select! { + biased; + _ = abort_signal.recv() => { + break; } - }; - expired_data_count.store(exp_data, Ordering::Relaxed); + _ = interval.next() => { + let exp_data = match pg_cache.expired_data_count().await { + Ok(exp_data) => exp_data, + Err(err) => { + ::tracing::error!(error = ?err, "cannot get expired data count"); + continue; + } + }; + expired_data_count.store(exp_data, Ordering::Relaxed); + } + } } } diff --git a/apollo-router/src/plugins/response_cache/plugin.rs b/apollo-router/src/plugins/response_cache/plugin.rs index 883f151446f..7df5ce97b3a 100644 --- a/apollo-router/src/plugins/response_cache/plugin.rs +++ b/apollo-router/src/plugins/response_cache/plugin.rs @@ -3,6 +3,7 @@ use std::collections::HashSet; use std::fmt::Write; use std::ops::ControlFlow; use std::sync::Arc; +use std::sync::OnceLock; use std::time::Duration; use std::time::Instant; @@ -17,7 +18,6 @@ use http::header; use http::header::CACHE_CONTROL; use itertools::Itertools; use multimap::MultiMap; -use parking_lot::Mutex; use schemars::JsonSchema; use serde::Deserialize; use serde::Serialize; @@ -26,7 +26,11 @@ use serde_json_bytes::Value; use sha2::Digest; use sha2::Sha256; use tokio::sync::RwLock; -use tokio::task::AbortHandle; +use tokio::sync::broadcast; +use tokio::sync::broadcast::Receiver; +use tokio::sync::broadcast::Sender; +use tokio_stream::StreamExt; +use tokio_stream::wrappers::IntervalStream; use tower::BoxError; use tower::ServiceBuilder; use tower::ServiceExt; @@ -88,7 +92,7 @@ register_private_plugin!("apollo", "experimental_response_cache", ResponseCache) #[derive(Clone)] pub(crate) struct ResponseCache { - storage: Arc, + pub(super) storage: Arc, endpoint_config: Option>, subgraphs: Arc>, entity_type: Option, @@ -100,69 +104,79 @@ pub(crate) struct ResponseCache { supergraph_schema: Arc>, /// map containing the enum GRAPH subgraph_enums: Arc>, - _expired_data_count_task_aborts: Arc>>, + /// To close all related tasks + drop_tx: Sender<()>, +} + +impl Drop for ResponseCache { + fn drop(&mut self) { + let _ = self.drop_tx.send(()); + } } +#[derive(Clone)] pub(crate) struct Storage { - pub(crate) all: Option, - pub(crate) subgraphs: HashMap, + pub(crate) all: Option>>, + pub(crate) subgraphs: HashMap>>, } impl Storage { - // FIXME: why could it be None ? pub(crate) fn get(&self, subgraph: &str) -> Option<&PostgresCacheStorage> { - self.subgraphs.get(subgraph).or(self.all.as_ref()) + match self.subgraphs.get(subgraph) { + Some(subgraph) => subgraph.get(), + None => self.all.as_ref().and_then(|s| s.get()), + } } pub(crate) async fn migrate(&self) -> anyhow::Result<()> { - if let Some(all) = &self.all { + if let Some(all) = self.all.as_ref().and_then(|all| all.get()) { all.migrate().await?; } - futures::future::try_join_all(self.subgraphs.values().map(|s| s.migrate())).await?; + futures::future::try_join_all( + self.subgraphs + .values() + .filter_map(|s| Some(s.get()?.migrate())), + ) + .await?; Ok(()) } /// Spawn tokio task to refresh metrics about expired data count - fn expired_data_count_task(&self) -> Vec { - let mut resp = Vec::new(); - if let Some(all) = &self.all { - resp.push(AbortOnDrop( - tokio::task::spawn(metrics::expired_data_task(all.clone(), None)).abort_handle(), + fn expired_data_count_tasks(&self, drop_signal: Receiver<()>) { + if let Some(all) = self.all.as_ref().and_then(|all| all.get()) { + tokio::task::spawn(metrics::expired_data_task( + all.clone(), + drop_signal.resubscribe(), + None, )); } for (subgraph_name, subgraph_cache_storage) in &self.subgraphs { - resp.push(AbortOnDrop( + if let Some(subgraph_cache_storage) = subgraph_cache_storage.get() { tokio::task::spawn(metrics::expired_data_task( subgraph_cache_storage.clone(), + drop_signal.resubscribe(), subgraph_name.clone().into(), - )) - .abort_handle(), - )); + )); + } } - - resp } pub(crate) async fn update_cron(&self) -> anyhow::Result<()> { - if let Some(all) = &self.all { + if let Some(all) = self.all.as_ref().and_then(|all| all.get()) { all.update_cron().await?; } - futures::future::try_join_all(self.subgraphs.values().map(|s| s.update_cron())).await?; + futures::future::try_join_all( + self.subgraphs + .values() + .filter_map(|s| Some(s.get()?.update_cron())), + ) + .await?; Ok(()) } } -/// Call .abort on task when dropped -#[derive(Clone)] -struct AbortOnDrop(AbortHandle); -impl Drop for AbortOnDrop { - fn drop(&mut self) { - self.0.abort(); - } -} - /// Configuration for response caching #[derive(Clone, Debug, JsonSchema, Deserialize)] #[serde(rename_all = "snake_case", deny_unknown_fields)] @@ -269,12 +283,13 @@ impl PluginPrivate for ResponseCache { .map(|q| q.name.to_string()); let mut all = None; - + let (drop_tx, drop_rx) = broadcast::channel(2); + let mut task_aborts = Vec::new(); if let Some(postgres) = &init.config.subgraph.all.postgres { let postgres_config = postgres.clone(); let required_to_start = postgres_config.required_to_start; all = match PostgresCacheStorage::new(&postgres_config).await { - Ok(storage) => Some(storage), + Ok(storage) => Some(Arc::new(OnceLock::from(storage))), Err(e) => { tracing::error!( cache = "response", @@ -283,8 +298,19 @@ impl PluginPrivate for ResponseCache { ); if required_to_start { return Err(e.into()); + } else { + let pg_cache_storage = Arc::new(OnceLock::new()); + task_aborts.push( + tokio::spawn(check_pg_connection( + postgres_config, + pg_cache_storage.clone(), + drop_rx, + None, + )) + .abort_handle(), + ); + Some(pg_cache_storage) } - None } }; } @@ -293,7 +319,7 @@ impl PluginPrivate for ResponseCache { if let Some(postgres) = &config.postgres { let required_to_start = postgres.required_to_start; let storage = match PostgresCacheStorage::new(postgres).await { - Ok(storage) => Some(storage), + Ok(storage) => Arc::new(OnceLock::from(storage)), Err(e) => { tracing::error!( cache = "response", @@ -302,13 +328,22 @@ impl PluginPrivate for ResponseCache { ); if required_to_start { return Err(e.into()); + } else { + let pg_cache_storage = Arc::new(OnceLock::new()); + task_aborts.push( + tokio::spawn(check_pg_connection( + postgres.clone(), + pg_cache_storage.clone(), + drop_tx.subscribe(), + subgraph.clone().into(), + )) + .abort_handle(), + ); + pg_cache_storage } - None } }; - if let Some(storage) = storage { - subgraph_storages.insert(subgraph.clone(), storage); - } + subgraph_storages.insert(subgraph.clone(), storage); } } @@ -362,13 +397,13 @@ impl PluginPrivate for ResponseCache { invalidation, subgraph_enums: Arc::new(get_subgraph_enums(&init.supergraph_schema)), supergraph_schema: init.supergraph_schema, - _expired_data_count_task_aborts: Default::default(), + drop_tx, }) } fn activate(&self) { - let task_aborts = self.storage.expired_data_count_task(); - *(self._expired_data_count_task_aborts.lock()) = task_aborts; + self.storage + .expired_data_count_tasks(self.drop_tx.subscribe()); } fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService { @@ -406,35 +441,6 @@ impl PluginPrivate for ResponseCache { name: &str, mut service: subgraph::BoxService, ) -> subgraph::BoxService { - let storage = match self.storage.get(name) { - Some(storage) => storage.clone(), - None => { - let subgraph_name = name.to_string(); - u64_counter_with_unit!( - "apollo.router.operations.response_cache.fetch.error", - "Errors when fetching data from cache", - "{error}", - 1, - "subgraph.name" = subgraph_name, - "code" = "NO_STORAGE" - ); - - return ServiceBuilder::new() - .map_response(move |response: subgraph::Response| { - update_cache_control( - &response.context, - &CacheControl::new(response.response.headers(), None) - .ok() - .unwrap_or_else(CacheControl::no_store), - ); - - response - }) - .service(service) - .boxed(); - } - }; - let subgraph_ttl = self .subgraph_ttl(name) .unwrap_or_else(|| Duration::from_secs(60 * 60 * 24)); // The unwrap should not happen because it's checked when creating the plugin @@ -472,7 +478,7 @@ impl PluginPrivate for ResponseCache { .boxed_clone(), entity_type: self.entity_type.clone(), name: name.to_string(), - storage, + storage: self.storage.clone(), subgraph_ttl, private_queries, private_id, @@ -567,11 +573,11 @@ impl ResponseCache { } let storage = Arc::new(Storage { - all: Some(storage), + all: Some(Arc::new(storage.into())), subgraphs: HashMap::new(), }); let invalidation = Invalidation::new(storage.clone()).await?; - + let (drop_tx, _drop_rx) = broadcast::channel(2); Ok(Self { storage, entity_type: None, @@ -599,7 +605,7 @@ impl ResponseCache { invalidation, subgraph_enums: Arc::new(get_subgraph_enums(&supergraph_schema)), supergraph_schema, - _expired_data_count_task_aborts: Default::default(), + drop_tx, }) } #[cfg(all( @@ -619,10 +625,11 @@ impl ResponseCache { use std::net::SocketAddr; let storage = Arc::new(Storage { - all: None, + all: Some(Default::default()), subgraphs: HashMap::new(), }); let invalidation = Invalidation::new(storage.clone()).await?; + let (drop_tx, _drop_rx) = broadcast::channel(2); Ok(Self { storage, @@ -651,7 +658,7 @@ impl ResponseCache { invalidation, subgraph_enums: Arc::new(get_subgraph_enums(&supergraph_schema)), supergraph_schema, - _expired_data_count_task_aborts: Default::default(), + drop_tx, }) } @@ -707,7 +714,7 @@ struct CacheService { service: subgraph::BoxCloneService, name: String, entity_type: Option, - storage: PostgresCacheStorage, + storage: Arc, subgraph_ttl: Duration, private_queries: Arc>>, private_id: Option, @@ -741,6 +748,35 @@ impl CacheService { mut self, request: subgraph::Request, ) -> Result { + let storage = match self.storage.get(&self.name) { + Some(storage) => storage.clone(), + None => { + u64_counter_with_unit!( + "apollo.router.operations.response_cache.fetch.error", + "Errors when fetching data from cache", + "{error}", + 1, + "subgraph.name" = self.name.clone(), + "code" = "NO_STORAGE" + ); + + return self + .service + .map_response(move |response: subgraph::Response| { + update_cache_control( + &response.context, + &CacheControl::new(response.response.headers(), None) + .ok() + .unwrap_or_else(CacheControl::no_store), + ); + + response + }) + .call(request) + .await; + } + }; + self.debug = self.debug && (request .supergraph_request @@ -812,7 +848,7 @@ impl CacheService { match cache_lookup_root( self.name.clone(), self.entity_type.as_deref(), - self.storage.clone(), + storage.clone(), is_known_private, private_id.as_deref(), self.debug, @@ -949,7 +985,7 @@ impl CacheService { if cache_control.should_store() { cache_store_root_from_response( - self.storage, + storage, self.subgraph_ttl, &response, cache_control, @@ -973,7 +1009,7 @@ impl CacheService { self.name.clone(), self.supergraph_schema.clone(), &self.subgraph_enums, - self.storage.clone(), + storage.clone(), is_known_private, private_id.as_deref(), request, @@ -1082,7 +1118,7 @@ impl CacheService { } cache_store_entities_from_response( - self.storage, + storage, self.subgraph_ttl, &mut response, cache_control.clone(), @@ -2403,6 +2439,45 @@ fn assemble_response_from_errors( (new_entities, new_errors) } +async fn check_pg_connection( + postgres_config: PostgresCacheConfig, + pg_storage: Arc>, + mut abort_signal: Receiver<()>, + subgraph_name: Option, +) { + let mut interval = + IntervalStream::new(tokio::time::interval(std::time::Duration::from_secs(30))); + let abort_signal_cloned = abort_signal.resubscribe(); + loop { + tokio::select! { + biased; + _ = abort_signal.recv() => { + break; + } + _ = interval.next() => { + u64_counter_with_unit!( + "apollo.router.response_cache.reconnection", + "Response cache counter for invalidated entries", + "{retry}", + 1, + "subgraph.name" = subgraph_name.clone().unwrap_or_default() + ); + if let Ok(storage) = PostgresCacheStorage::new(&postgres_config).await { + if let Err(err) = storage.migrate().await { + tracing::error!(error = %err, "cannot migrate storage"); + } + if let Err(err) = storage.update_cron().await { + tracing::error!(error = %err, "cannot update cron storage"); + } + let _ = pg_storage.set(storage.clone()); + tokio::task::spawn(metrics::expired_data_task(storage, abort_signal_cloned, None)); + break; + } + } + } + } +} + pub(crate) type CacheKeysContext = Vec; #[derive(Clone, Debug, Deserialize, Serialize)] diff --git a/apollo-router/src/plugins/response_cache/snapshots/apollo_router__plugins__response_cache__tests__failure_mode_reconnect-2.snap b/apollo-router/src/plugins/response_cache/snapshots/apollo_router__plugins__response_cache__tests__failure_mode_reconnect-2.snap new file mode 100644 index 00000000000..136226abee7 --- /dev/null +++ b/apollo-router/src/plugins/response_cache/snapshots/apollo_router__plugins__response_cache__tests__failure_mode_reconnect-2.snap @@ -0,0 +1,78 @@ +--- +source: apollo-router/src/plugins/response_cache/tests.rs +description: "Make sure everything is in status 'new' and we have all the entities and root fields" +expression: cache_keys +--- +[ + { + "key": "version:1.0:subgraph:user:type:Query:hash:26ab4118dbabffe5dfbef4462513caa8b6c8941363cf2eafa874bf1d47a3c468:data:d9d84a3c7ffc27b0190a671212f3740e5b8478e84e23825830e97822e25cf05c", + "invalidationKeys": [ + "currentUser", + "version:1.0:subgraph:user:type:Query" + ], + "kind": { + "rootFields": [ + "currentUser" + ] + }, + "subgraphName": "user", + "subgraphRequest": { + "query": "{ currentUser { activeOrganization { __typename id } } }" + }, + "status": "new", + "cacheControl": { + "created": 0, + "maxAge": 86400, + "public": true + }, + "data": { + "data": { + "currentUser": { + "activeOrganization": { + "__typename": "Organization", + "id": "1" + } + } + } + } + }, + { + "key": "version:1.0:subgraph:orga:type:Organization:entity:bcc0a4a9f8c595510c0ff8849bc36b402ac3f52506392d67107c623528ff11f4:representation::hash:a7ccf1576978c6598f6d10d8855fc683f71242533dc6d5a706743bd9bbfa554c:data:d9d84a3c7ffc27b0190a671212f3740e5b8478e84e23825830e97822e25cf05c", + "invalidationKeys": [ + "organization", + "organization-1", + "version:1.0:subgraph:orga:type:Organization", + "version:1.0:subgraph:orga:type:Organization:entity:bcc0a4a9f8c595510c0ff8849bc36b402ac3f52506392d67107c623528ff11f4" + ], + "kind": { + "typename": "Organization", + "entityKey": { + "id": "1" + } + }, + "subgraphName": "orga", + "subgraphRequest": { + "query": "query($representations: [_Any!]!) { _entities(representations: $representations) { ... on Organization { creatorUser { __typename id } } } }", + "variables": { + "representations": [ + { + "id": "1", + "__typename": "Organization" + } + ] + } + }, + "status": "new", + "cacheControl": { + "created": 0, + "maxAge": 86400, + "public": true + }, + "data": { + "creatorUser": { + "__typename": "User", + "id": 2 + } + } + } +] diff --git a/apollo-router/src/plugins/response_cache/snapshots/apollo_router__plugins__response_cache__tests__failure_mode_reconnect-4.snap b/apollo-router/src/plugins/response_cache/snapshots/apollo_router__plugins__response_cache__tests__failure_mode_reconnect-4.snap new file mode 100644 index 00000000000..02788e32bf2 --- /dev/null +++ b/apollo-router/src/plugins/response_cache/snapshots/apollo_router__plugins__response_cache__tests__failure_mode_reconnect-4.snap @@ -0,0 +1,73 @@ +--- +source: apollo-router/src/plugins/response_cache/tests.rs +description: "Make sure everything is in status 'cached' and we have all the entities and root fields" +expression: cache_keys +--- +[ + { + "key": "failure_mode_reconnect-version:1.0:subgraph:user:type:Query:hash:26ab4118dbabffe5dfbef4462513caa8b6c8941363cf2eafa874bf1d47a3c468:data:d9d84a3c7ffc27b0190a671212f3740e5b8478e84e23825830e97822e25cf05c", + "invalidationKeys": [ + "currentUser", + "version:1.0:subgraph:user:type:Query" + ], + "kind": { + "rootFields": [ + "currentUser" + ] + }, + "subgraphName": "user", + "subgraphRequest": { + "query": "{ currentUser { activeOrganization { __typename id } } }" + }, + "status": "cached", + "cacheControl": { + "created": 0, + "maxAge": 86400, + "public": true + }, + "data": { + "data": { + "currentUser": { + "activeOrganization": { + "__typename": "Organization", + "id": "1" + } + } + } + } + }, + { + "key": "version:1.0:subgraph:orga:type:Organization:entity:bcc0a4a9f8c595510c0ff8849bc36b402ac3f52506392d67107c623528ff11f4:representation::hash:a7ccf1576978c6598f6d10d8855fc683f71242533dc6d5a706743bd9bbfa554c:data:d9d84a3c7ffc27b0190a671212f3740e5b8478e84e23825830e97822e25cf05c", + "invalidationKeys": [ + "organization", + "organization-1", + "version:1.0:subgraph:orga:type:Organization", + "version:1.0:subgraph:orga:type:Organization:entity:bcc0a4a9f8c595510c0ff8849bc36b402ac3f52506392d67107c623528ff11f4" + ], + "kind": { + "typename": "Organization", + "entityKey": { + "id": "1" + } + }, + "subgraphName": "orga", + "subgraphRequest": { + "query": "query($representations: [_Any!]!) { _entities(representations: $representations) { ... on Organization { creatorUser { __typename id } } } }", + "variables": { + "representations": [] + } + }, + "status": "cached", + "cacheControl": { + "created": 0, + "maxAge": 86400, + "public": true + }, + "data": { + "creatorUser": { + "__typename": "User", + "id": 2 + } + } + } +] diff --git a/apollo-router/src/plugins/response_cache/tests.rs b/apollo-router/src/plugins/response_cache/tests.rs index 6d277951a6e..b457ad48526 100644 --- a/apollo-router/src/plugins/response_cache/tests.rs +++ b/apollo-router/src/plugins/response_cache/tests.rs @@ -7,6 +7,7 @@ use http::HeaderName; use http::HeaderValue; use http::header::CACHE_CONTROL; use serde_json_bytes::ByteString; +use tokio::sync::broadcast; use tower::Service; use tower::ServiceExt; @@ -2682,8 +2683,10 @@ async fn expired_data_count() { .await .unwrap(); tokio::time::sleep(std::time::Duration::from_millis(10)).await; + let (_drop_rx, drop_tx) = broadcast::channel(2); tokio::spawn( - metrics::expired_data_task(pg_cache.clone(), None).with_current_meter_provider(), + metrics::expired_data_task(pg_cache.clone(), drop_tx, None) + .with_current_meter_provider(), ); tokio::time::sleep(std::time::Duration::from_millis(100)).await; assert_gauge!("apollo.router.response_cache.data.expired", 1); @@ -2691,3 +2694,295 @@ async fn expired_data_count() { .with_metrics() .await; } + +#[tokio::test] +async fn failure_mode_reconnect() { + async { + let valid_schema = Arc::new(Schema::parse_and_validate(SCHEMA, "test.graphql").unwrap()); + let query = + "query { currentUser { activeOrganization { id creatorUser { __typename id } } } }"; + + let subgraphs = serde_json::json!({ + "user": { + "query": { + "currentUser": { + "activeOrganization": { + "__typename": "Organization", + "id": "1", + } + } + }, + "headers": {"cache-control": "public"}, + }, + "orga": { + "entities": [ + { + "__typename": "Organization", + "id": "1", + "creatorUser": { + "__typename": "User", + "id": 2 + } + } + ], + "headers": {"cache-control": "public"}, + }, + }); + + let map = [ + ( + "user".to_string(), + Subgraph { + postgres: None, + private_id: Some("sub".to_string()), + enabled: true.into(), + ttl: None, + ..Default::default() + }, + ), + ( + "orga".to_string(), + Subgraph { + postgres: None, + private_id: Some("sub".to_string()), + enabled: true.into(), + ttl: None, + ..Default::default() + }, + ), + ] + .into_iter() + .collect(); + let response_cache = + ResponseCache::without_storage_for_failure_mode(map, valid_schema.clone()) + .await + .unwrap(); + + let service = TestHarness::builder() + .configuration_json(serde_json::json!({ + "include_subgraph_errors": { "all": true }, + "experimental_mock_subgraphs": subgraphs.clone(), + })) + .unwrap() + .schema(SCHEMA) + .extra_private_plugin(response_cache.clone()) + .build_supergraph() + .await + .unwrap(); + + let request = supergraph::Request::fake_builder() + .query(query) + .context(Context::new()) + .header( + HeaderName::from_static(CACHE_DEBUG_HEADER_NAME), + HeaderValue::from_static("true"), + ) + .build() + .unwrap(); + let mut response = service.oneshot(request).await.unwrap(); + let response = response.next_response().await.unwrap(); + insta::assert_json_snapshot!(response, @r###" + { + "data": { + "currentUser": { + "activeOrganization": { + "id": "1", + "creatorUser": { + "__typename": "User", + "id": 2 + } + } + } + } + } + "###); + + assert_counter!( + "apollo.router.operations.response_cache.fetch.error", + 1, + "subgraph.name" = "orga", + "code" = "NO_STORAGE" + ); + assert_counter!( + "apollo.router.operations.response_cache.fetch.error", + 1, + "subgraph.name" = "user", + "code" = "NO_STORAGE" + ); + + let pg_cache = PostgresCacheStorage::new(&PostgresCacheConfig { + cleanup_interval: std::time::Duration::from_secs(60 * 7), // Every 7 minutes + url: "postgres://127.0.0.1".parse().unwrap(), + username: None, + password: None, + idle_timeout: std::time::Duration::from_secs(5), + acquire_timeout: std::time::Duration::from_millis(50), + required_to_start: true, + pool_size: default_pool_size(), + batch_size: default_batch_size(), + namespace: Some(String::from("failure_mode_reconnect")), + }) + .await + .unwrap(); + let service = TestHarness::builder() + .configuration_json( + serde_json::json!({"include_subgraph_errors": { "all": true }, + "experimental_mock_subgraphs": subgraphs.clone(), + }), + ) + .unwrap() + .schema(SCHEMA) + .extra_private_plugin(response_cache.clone()) + .build_supergraph() + .await + .unwrap(); + + response_cache + .storage + .all + .as_ref() + .expect("the database all should already be Some") + .set(pg_cache) + .map_err(|_| "this should not be already set") + .unwrap(); + + let request = supergraph::Request::fake_builder() + .query(query) + .context(Context::new()) + .header( + HeaderName::from_static(CACHE_DEBUG_HEADER_NAME), + HeaderValue::from_static("true"), + ) + .build() + .unwrap(); + let mut response = service.oneshot(request).await.unwrap(); + let mut cache_keys: CacheKeysContext = response + .context + .get(CONTEXT_DEBUG_CACHE_KEYS) + .unwrap() + .unwrap(); + cache_keys.iter_mut().for_each(|ck| { + ck.invalidation_keys.sort(); + ck.cache_control.created = 0; + }); + cache_keys.sort_by(|a, b| a.invalidation_keys.cmp(&b.invalidation_keys)); + insta::with_settings!({ + description => "Make sure everything is in status 'new' and we have all the entities and root fields" + }, { + insta::assert_json_snapshot!(cache_keys); + }); + + let mut response = response.next_response().await.unwrap(); + assert!( + response + .extensions + .remove(CACHE_DEBUG_EXTENSIONS_KEY) + .is_some() + ); + insta::assert_json_snapshot!(response, @r###" + { + "data": { + "currentUser": { + "activeOrganization": { + "id": "1", + "creatorUser": { + "__typename": "User", + "id": 2 + } + } + } + } + } + "###); + + assert_counter!( + "apollo.router.operations.response_cache.fetch.error", + 1, + "subgraph.name" = "orga", + "code" = "NO_STORAGE" + ); + assert_counter!( + "apollo.router.operations.response_cache.fetch.error", + 1, + "subgraph.name" = "user", + "code" = "NO_STORAGE" + ); + + let service = TestHarness::builder() + .configuration_json( + serde_json::json!({"include_subgraph_errors": { "all": true }, + "experimental_mock_subgraphs": subgraphs.clone(), + }), + ) + .unwrap() + .schema(SCHEMA) + .extra_private_plugin(response_cache.clone()) + .build_supergraph() + .await + .unwrap(); + + let request = supergraph::Request::fake_builder() + .query(query) + .context(Context::new()) + .header( + HeaderName::from_static(CACHE_DEBUG_HEADER_NAME), + HeaderValue::from_static("true"), + ) + .build() + .unwrap(); + let mut response = service.oneshot(request).await.unwrap(); + let mut cache_keys: CacheKeysContext = response + .context + .get(CONTEXT_DEBUG_CACHE_KEYS) + .unwrap() + .unwrap(); + cache_keys.iter_mut().for_each(|ck| { + ck.invalidation_keys.sort(); + ck.cache_control.created = 0; + }); + cache_keys.sort_by(|a, b| a.invalidation_keys.cmp(&b.invalidation_keys)); + insta::with_settings!({ + description => "Make sure everything is in status 'cached' and we have all the entities and root fields" + }, { + insta::assert_json_snapshot!(cache_keys); + }); + + let mut response = response.next_response().await.unwrap(); + assert!( + response + .extensions + .remove(CACHE_DEBUG_EXTENSIONS_KEY) + .is_some() + ); + insta::assert_json_snapshot!(response, @r###" + { + "data": { + "currentUser": { + "activeOrganization": { + "id": "1", + "creatorUser": { + "__typename": "User", + "id": 2 + } + } + } + } + } + "###); + + assert_counter!( + "apollo.router.operations.response_cache.fetch.error", + 1, + "subgraph.name" = "orga", + "code" = "NO_STORAGE" + ); + assert_counter!( + "apollo.router.operations.response_cache.fetch.error", + 1, + "subgraph.name" = "user", + "code" = "NO_STORAGE" + ); + } + .with_metrics() + .await; +}