diff --git a/nativelink-scheduler/src/memory_awaited_action_db.rs b/nativelink-scheduler/src/memory_awaited_action_db.rs index 9481a0bd9..ca1280a6a 100644 --- a/nativelink-scheduler/src/memory_awaited_action_db.rs +++ b/nativelink-scheduler/src/memory_awaited_action_db.rs @@ -31,7 +31,7 @@ use nativelink_util::instant_wrapper::InstantWrapper; use nativelink_util::spawn; use nativelink_util::task::JoinHandleDropGuard; use tokio::sync::{Notify, mpsc, watch}; -use tracing::{debug, error}; +use tracing::{debug, error, info, instrument, warn}; use crate::awaited_action_db::{ AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, CLIENT_KEEPALIVE_DURATION, @@ -549,24 +549,29 @@ impl I + Clone + Send + Sync> AwaitedActionDbI ActionUniqueQualifier::Cacheable(action_key) => { let maybe_awaited_action = action_info_hash_key_to_awaited_action.remove(action_key); - match maybe_awaited_action { - Some(removed_operation_id) => { - if &removed_operation_id != new_awaited_action.operation_id() { - error!( - ?removed_operation_id, - ?new_awaited_action, - ?action_key, - "action_info_hash_key_to_awaited_action and operation_id_to_awaited_action are out of sync", - ); - } - } - None => { + if let Some(removed_operation_id) = maybe_awaited_action { + if &removed_operation_id != new_awaited_action.operation_id() { error!( + ?removed_operation_id, ?new_awaited_action, ?action_key, - "action_info_hash_key_to_awaited_action out of sync, it should have had the unique_key", + "action_info_hash_key_to_awaited_action and operation_id_to_awaited_action are out of sync", ); } + } else { + warn!( + ?new_awaited_action, + ?action_key, + "action_info_hash_key_to_awaited_action missing key for finished action - attempting recovery" + ); + // This is a recoverable inconsistency - the action finished but + // its key mapping was already removed or never existed. + // Since the action is finished, we don't need to restore the mapping, + // but we should check for data corruption indicators. + Self::log_consistency_diagnostic( + action_info_hash_key_to_awaited_action, + new_awaited_action, + ); } } ActionUniqueQualifier::Uncacheable(_action_key) => { @@ -577,6 +582,52 @@ impl I + Clone + Send + Sync> AwaitedActionDbI } } + /// Logs diagnostic information when data structure inconsistency is detected. + /// This helps with debugging and provides actionable information for recovery. + #[instrument( + name = "memory_awaited_action_db.log_consistency_diagnostic", + skip_all, + fields( + operation_id = %awaited_action.operation_id(), + action_stage = ?awaited_action.state().stage, + hash_key_count = action_info_hash_key_to_awaited_action.len(), + corruption_detected = tracing::field::Empty, + ) + )] + fn log_consistency_diagnostic( + action_info_hash_key_to_awaited_action: &HashMap, + awaited_action: &AwaitedAction, + ) { + let operation_id = awaited_action.operation_id(); + let stage = &awaited_action.state().stage; + + // Count how many hash key mappings exist for diagnostic purposes + let hash_key_count = action_info_hash_key_to_awaited_action.len(); + + // Check if this operation_id appears elsewhere in the hash map (shouldn't happen) + let operation_id_appears_elsewhere = action_info_hash_key_to_awaited_action + .values() + .any(|id| id == operation_id); + + if operation_id_appears_elsewhere { + error!( + ?operation_id, + ?stage, + hash_key_count, + "CRITICAL: Operation ID found in hash key map with different key - potential data corruption" + ); + tracing::Span::current().record("corruption_detected", true); + } else { + debug!( + ?operation_id, + ?stage, + hash_key_count, + "Hash key mapping missing for finished action - likely harmless race condition" + ); + tracing::Span::current().record("corruption_detected", false); + } + } + fn update_awaited_action( &mut self, mut new_awaited_action: AwaitedAction, @@ -629,8 +680,27 @@ impl I + Clone + Send + Sync> AwaitedActionDbI .is_same_stage(&new_awaited_action.state().stage); if !is_same_stage { - self.sorted_action_info_hash_keys - .process_state_changes(&old_awaited_action, &new_awaited_action)?; + // Try to process state changes and validate consistency on error + if let Err(e) = self + .sorted_action_info_hash_keys + .process_state_changes(&old_awaited_action, &new_awaited_action) + { + warn!( + error = ?e, + ?old_awaited_action, + ?new_awaited_action, + "State change processing failed, validating consistency" + ); + // Don't fail on validation errors during error recovery + if let Err(validation_err) = self.validate_consistency() { + error!( + validation_error = ?validation_err, + "Data structure consistency validation failed after state change error" + ); + } + return Err(e); + } + Self::process_state_changes_for_hash_key_map( &mut self.action_info_hash_key_to_awaited_action, &new_awaited_action, @@ -818,6 +888,140 @@ impl I + Clone + Send + Sync> AwaitedActionDbI self.now_fn.clone(), ))) } + + /// Validates consistency between the three main data structures. + /// This is primarily used for debugging and can be called periodically + /// or after errors to detect data corruption. + #[instrument( + name = "memory_awaited_action_db.validate_consistency", + skip_all, + fields( + hash_key_count = self.action_info_hash_key_to_awaited_action.len(), + operation_count = self.operation_id_to_awaited_action.len(), + connected_clients_count = self.connected_clients_for_operation_id.len(), + ) + )] + fn validate_consistency(&self) -> Result<(), Error> { + // Check that all entries in action_info_hash_key_to_awaited_action + // have corresponding entries in operation_id_to_awaited_action + for (action_key, operation_id) in &self.action_info_hash_key_to_awaited_action { + if !self + .operation_id_to_awaited_action + .contains_key(operation_id) + { + return Err(make_err!( + Code::Internal, + "Hash key map contains operation_id {operation_id} for key {action_key:?} but operation_id_to_awaited_action does not" + )); + } + } + + // Check that all cacheable non-finished actions in operation_id_to_awaited_action + // have corresponding entries in action_info_hash_key_to_awaited_action + for (operation_id, tx) in &self.operation_id_to_awaited_action { + let awaited_action = tx.borrow(); + if let ActionUniqueQualifier::Cacheable(action_key) = + &awaited_action.action_info().unique_qualifier + { + if !awaited_action.state().stage.is_finished() { + match self.action_info_hash_key_to_awaited_action.get(action_key) { + Some(mapped_operation_id) => { + if mapped_operation_id != operation_id { + return Err(make_err!( + Code::Internal, + "Hash key map has incorrect operation_id mapping: key {action_key:?} maps to {mapped_operation_id} but should map to {operation_id}" + )); + } + } + None => { + return Err(make_err!( + Code::Internal, + "Non-finished cacheable action {operation_id} with key {action_key:?} missing from hash key map" + )); + } + } + } + } + } + + // Check that connected_clients_for_operation_id is consistent + for operation_id in self.connected_clients_for_operation_id.keys() { + if !self + .operation_id_to_awaited_action + .contains_key(operation_id) + { + return Err(make_err!( + Code::Internal, + "connected_clients_for_operation_id contains {operation_id} but operation_id_to_awaited_action does not" + )); + } + } + + Ok(()) + } + + /// Attempts to recover from data structure inconsistencies by rebuilding + /// the hash key mapping from the `operation_id_to_awaited_action` map. + /// This is a self-healing mechanism for when the maps get out of sync. + #[instrument( + name = "memory_awaited_action_db.attempt_recovery", + skip_all, + fields( + original_hash_key_count = self.action_info_hash_key_to_awaited_action.len(), + operation_count = self.operation_id_to_awaited_action.len(), + ) + )] + fn attempt_recovery(&mut self) { + let mut recovered_mappings = 0; + + // First, rebuild action_info_hash_key_to_awaited_action from scratch + let mut new_hash_key_map = HashMap::new(); + + for (operation_id, tx) in &self.operation_id_to_awaited_action { + let awaited_action = tx.borrow(); + if let ActionUniqueQualifier::Cacheable(action_key) = + &awaited_action.action_info().unique_qualifier + { + // Only add non-finished actions to the hash key map + if !awaited_action.state().stage.is_finished() { + if let Some(existing_operation_id) = + new_hash_key_map.insert(action_key.clone(), operation_id.clone()) + { + warn!( + ?action_key, + ?operation_id, + ?existing_operation_id, + "Duplicate cacheable action detected during recovery - keeping newer entry" + ); + } else { + recovered_mappings += 1; + } + } + } + } + + // Count how many stale mappings we're removing + let removed_stale_mappings = self + .action_info_hash_key_to_awaited_action + .len() + .saturating_sub(new_hash_key_map.len()); + + // Replace the hash key map with the rebuilt version + self.action_info_hash_key_to_awaited_action = new_hash_key_map; + + warn!( + recovered_mappings, + removed_stale_mappings, "Data structure recovery completed" + ); + + // Record recovery metrics for OpenTelemetry + tracing::Span::current().record("recovered_mappings", recovered_mappings); + tracing::Span::current().record("removed_stale_mappings", removed_stale_mappings); + tracing::Span::current().record( + "final_hash_key_count", + self.action_info_hash_key_to_awaited_action.len(), + ); + } } #[derive(Debug, MetricsComponent)] @@ -868,6 +1072,63 @@ impl I + Clone + Send + Sync + 'static> }), } } + + /// Validates data structure consistency and attempts recovery if needed. + /// This is a public method that can be called by external monitoring + /// or debugging tools to check and repair the database state. + #[instrument( + name = "memory_awaited_action_db.validate_and_recover", + skip_all, + fields( + validation_result = tracing::field::Empty, + recovery_attempted = false, + recovery_result = tracing::field::Empty, + ) + )] + pub async fn validate_and_recover(&self) -> Result<(), Error> { + let mut inner = self.inner.lock().await; + + // First attempt validation + match inner.validate_consistency() { + Ok(()) => { + debug!("Memory awaited action database consistency validation passed"); + tracing::Span::current().record("validation_result", "passed"); + Ok(()) + } + Err(validation_error) => { + warn!( + error = ?validation_error, + "Memory awaited action database consistency validation failed, attempting recovery" + ); + tracing::Span::current().record("validation_result", "failed"); + tracing::Span::current().record("recovery_attempted", true); + + // Attempt recovery + inner.attempt_recovery(); + + // Validate again after recovery + match inner.validate_consistency() { + Ok(()) => { + info!("Memory awaited action database successfully recovered"); + tracing::Span::current().record("recovery_result", "success"); + Ok(()) + } + Err(post_recovery_error) => { + error!( + original_error = ?validation_error, + post_recovery_error = ?post_recovery_error, + "Failed to recover memory awaited action database" + ); + tracing::Span::current().record("recovery_result", "failed"); + Err(make_err!( + Code::Internal, + "Database recovery failed: original error: {validation_error}, post-recovery error: {post_recovery_error}" + )) + } + } + } + } + } } impl I + Clone + Send + Sync + 'static> AwaitedActionDb diff --git a/nativelink-scheduler/tests/memory_awaited_action_db_test.rs b/nativelink-scheduler/tests/memory_awaited_action_db_test.rs new file mode 100644 index 000000000..e97975a7b --- /dev/null +++ b/nativelink-scheduler/tests/memory_awaited_action_db_test.rs @@ -0,0 +1,266 @@ +// Copyright 2024 The NativeLink Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; +use std::time::Duration; + +use nativelink_config::stores::EvictionPolicy; +use nativelink_error::Error; +use nativelink_scheduler::awaited_action_db::AwaitedActionDb; +use nativelink_scheduler::memory_awaited_action_db::MemoryAwaitedActionDb; +use nativelink_util::action_messages::{ + ActionInfo, ActionUniqueKey, ActionUniqueQualifier, OperationId, +}; +use nativelink_util::common::DigestInfo; +use nativelink_util::digest_hasher::DigestHasherFunc; +use nativelink_util::instant_wrapper::{InstantWrapper, MockInstantWrapped}; +use tokio::sync::Notify; + +/// Helper function to create a test action info with unique qualifier +fn make_test_action_info(digest_str: &str, cacheable: bool) -> ActionInfo { + let digest = DigestInfo::try_new(digest_str, 32).unwrap(); + let unique_key = ActionUniqueKey { + instance_name: "test".to_string(), + digest_function: DigestHasherFunc::Sha256, + digest: DigestInfo::try_new(digest_str, 32).unwrap(), + }; + + ActionInfo { + command_digest: digest.clone(), + input_root_digest: digest, + timeout: Duration::from_secs(300), + platform_properties: Default::default(), + priority: 0, + load_timestamp: MockInstantWrapped::default().now(), + insert_timestamp: MockInstantWrapped::default().now(), + unique_qualifier: if cacheable { + ActionUniqueQualifier::Cacheable(unique_key) + } else { + ActionUniqueQualifier::Uncacheable(unique_key) + }, + } +} + +/// Test that validate_and_recover works correctly when database is consistent +#[tokio::test] +async fn test_validate_and_recover_consistent_database() -> Result<(), Error> { + let eviction_config = EvictionPolicy { + max_count: 100, + max_bytes: 1024 * 1024, + ..Default::default() + }; + + let tasks_change_notify = Arc::new(Notify::new()); + let now_fn = MockInstantWrapped::default; + + let db = MemoryAwaitedActionDb::new(&eviction_config, tasks_change_notify, now_fn); + + // Add a test action + let action_info = make_test_action_info( + "abcd1234567890abcdef1234567890abcdef1234567890abcdef1234567890ab", + true, + ); + let operation_id = OperationId::default(); + + let _subscriber = db + .add_action(operation_id.clone(), Arc::new(action_info)) + .await?; + + // Validation should pass on a consistent database + let result = db.validate_and_recover().await; + assert!( + result.is_ok(), + "Validation should pass for consistent database" + ); + + Ok(()) +} + +/// Test that the database can recover from simulated inconsistency +/// Note: This is a conceptual test - in practice we can't easily induce +/// inconsistency from the public API without internal access +#[tokio::test] +async fn test_basic_functionality_with_validation() -> Result<(), Error> { + let eviction_config = EvictionPolicy { + max_count: 100, + max_bytes: 1024 * 1024, + ..Default::default() + }; + + let tasks_change_notify = Arc::new(Notify::new()); + let now_fn = MockInstantWrapped::default; + + let db = MemoryAwaitedActionDb::new(&eviction_config, tasks_change_notify, now_fn); + + // Test adding multiple cacheable actions + let action_info1 = make_test_action_info( + "abcd1234567890abcdef1234567890abcdef1234567890abcdef1234567890ab", + true, + ); + let action_info2 = make_test_action_info( + "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef", + true, + ); + let operation_id1 = OperationId::default(); + let operation_id2 = OperationId::default(); + + let _subscriber1 = db + .add_action(operation_id1.clone(), Arc::new(action_info1)) + .await?; + let _subscriber2 = db + .add_action(operation_id2.clone(), Arc::new(action_info2)) + .await?; + + // Validate consistency after adding actions + db.validate_and_recover().await?; + + // Check that we can retrieve the actions + let retrieved1 = db.get_awaited_action_by_id(&operation_id1).await?; + let retrieved2 = db.get_awaited_action_by_id(&operation_id2).await?; + + assert!( + retrieved1.is_some(), + "Should be able to retrieve first action" + ); + assert!( + retrieved2.is_some(), + "Should be able to retrieve second action" + ); + + // Validate consistency again + db.validate_and_recover().await?; + + Ok(()) +} + +/// Test that uncacheable actions don't interfere with validation +#[tokio::test] +async fn test_uncacheable_actions_validation() -> Result<(), Error> { + let eviction_config = EvictionPolicy { + max_count: 100, + max_bytes: 1024 * 1024, + ..Default::default() + }; + + let tasks_change_notify = Arc::new(Notify::new()); + let now_fn = MockInstantWrapped::default; + + let db = MemoryAwaitedActionDb::new(&eviction_config, tasks_change_notify, now_fn); + + // Add both cacheable and uncacheable actions + let cacheable_action = make_test_action_info( + "abcd1234567890abcdef1234567890abcdef1234567890abcdef1234567890ab", + true, + ); + let uncacheable_action = make_test_action_info( + "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef", + false, + ); + + let operation_id1 = OperationId::default(); + let operation_id2 = OperationId::default(); + + let _subscriber1 = db + .add_action(operation_id1.clone(), Arc::new(cacheable_action)) + .await?; + let _subscriber2 = db + .add_action(operation_id2.clone(), Arc::new(uncacheable_action)) + .await?; + + // Validation should pass with mixed action types + db.validate_and_recover().await?; + + // Check that both actions exist + let retrieved1 = db.get_awaited_action_by_id(&operation_id1).await?; + let retrieved2 = db.get_awaited_action_by_id(&operation_id2).await?; + + assert!(retrieved1.is_some(), "Cacheable action should exist"); + assert!(retrieved2.is_some(), "Uncacheable action should exist"); + + Ok(()) +} + +/// Test duplicate cacheable action handling with validation +#[tokio::test] +async fn test_duplicate_cacheable_actions_with_validation() -> Result<(), Error> { + let eviction_config = EvictionPolicy { + max_count: 100, + max_bytes: 1024 * 1024, + ..Default::default() + }; + + let tasks_change_notify = Arc::new(Notify::new()); + let now_fn = MockInstantWrapped::default; + + let db = MemoryAwaitedActionDb::new(&eviction_config, tasks_change_notify, now_fn); + + // Create two actions with the same digest (should be deduplicated) + let action_info1 = make_test_action_info( + "abcd1234567890abcdef1234567890abcdef1234567890abcdef1234567890ab", + true, + ); + let action_info2 = make_test_action_info( + "abcd1234567890abcdef1234567890abcdef1234567890abcdef1234567890ab", + true, + ); + + let operation_id1 = OperationId::default(); + let operation_id2 = OperationId::default(); + + let _subscriber1 = db + .add_action(operation_id1.clone(), Arc::new(action_info1)) + .await?; + let _subscriber2 = db + .add_action(operation_id2.clone(), Arc::new(action_info2)) + .await?; + + // Both should succeed but subscriber2 should be for the same underlying action + // If we got here, both add_action calls succeeded (otherwise await? would have failed) + + // Validation should pass even with deduplication + db.validate_and_recover().await?; + + Ok(()) +} + +/// Test that validation works correctly during eviction scenarios +#[tokio::test] +async fn test_validation_with_eviction() -> Result<(), Error> { + let eviction_config = EvictionPolicy { + max_count: 2, // Very small to trigger eviction + max_bytes: 1024, + ..Default::default() + }; + + let tasks_change_notify = Arc::new(Notify::new()); + let now_fn = MockInstantWrapped::default; + + let db = MemoryAwaitedActionDb::new(&eviction_config, tasks_change_notify, now_fn); + + // Add more actions than the eviction limit + for i in 0..5 { + let action_info = make_test_action_info( + &format!("{:064x}", i), // Create unique digests + true, + ); + let operation_id = OperationId::default(); + + let _subscriber = db.add_action(operation_id, Arc::new(action_info)).await?; + + // Validate after each addition + db.validate_and_recover().await?; + } + + Ok(()) +}