@@ -31,7 +31,7 @@ use nativelink_util::instant_wrapper::InstantWrapper;
3131use nativelink_util:: spawn;
3232use nativelink_util:: task:: JoinHandleDropGuard ;
3333use tokio:: sync:: { Notify , mpsc, watch} ;
34- use tracing:: { debug, error} ;
34+ use tracing:: { debug, error, info , instrument , warn } ;
3535
3636use crate :: awaited_action_db:: {
3737 AwaitedAction , AwaitedActionDb , AwaitedActionSubscriber , CLIENT_KEEPALIVE_DURATION ,
@@ -549,24 +549,29 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
549549 ActionUniqueQualifier :: Cacheable ( action_key) => {
550550 let maybe_awaited_action =
551551 action_info_hash_key_to_awaited_action. remove ( action_key) ;
552- match maybe_awaited_action {
553- Some ( removed_operation_id) => {
554- if & removed_operation_id != new_awaited_action. operation_id ( ) {
555- error ! (
556- ?removed_operation_id,
557- ?new_awaited_action,
558- ?action_key,
559- "action_info_hash_key_to_awaited_action and operation_id_to_awaited_action are out of sync" ,
560- ) ;
561- }
562- }
563- None => {
552+ if let Some ( removed_operation_id) = maybe_awaited_action {
553+ if & removed_operation_id != new_awaited_action. operation_id ( ) {
564554 error ! (
555+ ?removed_operation_id,
565556 ?new_awaited_action,
566557 ?action_key,
567- "action_info_hash_key_to_awaited_action out of sync, it should have had the unique_key " ,
558+ "action_info_hash_key_to_awaited_action and operation_id_to_awaited_action are out of sync " ,
568559 ) ;
569560 }
561+ } else {
562+ warn ! (
563+ ?new_awaited_action,
564+ ?action_key,
565+ "action_info_hash_key_to_awaited_action missing key for finished action - attempting recovery"
566+ ) ;
567+ // This is a recoverable inconsistency - the action finished but
568+ // its key mapping was already removed or never existed.
569+ // Since the action is finished, we don't need to restore the mapping,
570+ // but we should check for data corruption indicators.
571+ Self :: log_consistency_diagnostic (
572+ action_info_hash_key_to_awaited_action,
573+ new_awaited_action,
574+ ) ;
570575 }
571576 }
572577 ActionUniqueQualifier :: Uncacheable ( _action_key) => {
@@ -577,6 +582,52 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
577582 }
578583 }
579584
585+ /// Logs diagnostic information when data structure inconsistency is detected.
586+ /// This helps with debugging and provides actionable information for recovery.
587+ #[ instrument(
588+ name = "memory_awaited_action_db.log_consistency_diagnostic" ,
589+ skip_all,
590+ fields(
591+ operation_id = %awaited_action. operation_id( ) ,
592+ action_stage = ?awaited_action. state( ) . stage,
593+ hash_key_count = action_info_hash_key_to_awaited_action. len( ) ,
594+ corruption_detected = tracing:: field:: Empty ,
595+ )
596+ ) ]
597+ fn log_consistency_diagnostic (
598+ action_info_hash_key_to_awaited_action : & HashMap < ActionUniqueKey , OperationId > ,
599+ awaited_action : & AwaitedAction ,
600+ ) {
601+ let operation_id = awaited_action. operation_id ( ) ;
602+ let stage = & awaited_action. state ( ) . stage ;
603+
604+ // Count how many hash key mappings exist for diagnostic purposes
605+ let hash_key_count = action_info_hash_key_to_awaited_action. len ( ) ;
606+
607+ // Check if this operation_id appears elsewhere in the hash map (shouldn't happen)
608+ let operation_id_appears_elsewhere = action_info_hash_key_to_awaited_action
609+ . values ( )
610+ . any ( |id| id == operation_id) ;
611+
612+ if operation_id_appears_elsewhere {
613+ error ! (
614+ ?operation_id,
615+ ?stage,
616+ hash_key_count,
617+ "CRITICAL: Operation ID found in hash key map with different key - potential data corruption"
618+ ) ;
619+ tracing:: Span :: current ( ) . record ( "corruption_detected" , true ) ;
620+ } else {
621+ debug ! (
622+ ?operation_id,
623+ ?stage,
624+ hash_key_count,
625+ "Hash key mapping missing for finished action - likely harmless race condition"
626+ ) ;
627+ tracing:: Span :: current ( ) . record ( "corruption_detected" , false ) ;
628+ }
629+ }
630+
580631 fn update_awaited_action (
581632 & mut self ,
582633 mut new_awaited_action : AwaitedAction ,
@@ -629,8 +680,27 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
629680 . is_same_stage ( & new_awaited_action. state ( ) . stage ) ;
630681
631682 if !is_same_stage {
632- self . sorted_action_info_hash_keys
633- . process_state_changes ( & old_awaited_action, & new_awaited_action) ?;
683+ // Try to process state changes and validate consistency on error
684+ if let Err ( e) = self
685+ . sorted_action_info_hash_keys
686+ . process_state_changes ( & old_awaited_action, & new_awaited_action)
687+ {
688+ warn ! (
689+ error = ?e,
690+ ?old_awaited_action,
691+ ?new_awaited_action,
692+ "State change processing failed, validating consistency"
693+ ) ;
694+ // Don't fail on validation errors during error recovery
695+ if let Err ( validation_err) = self . validate_consistency ( ) {
696+ error ! (
697+ validation_error = ?validation_err,
698+ "Data structure consistency validation failed after state change error"
699+ ) ;
700+ }
701+ return Err ( e) ;
702+ }
703+
634704 Self :: process_state_changes_for_hash_key_map (
635705 & mut self . action_info_hash_key_to_awaited_action ,
636706 & new_awaited_action,
@@ -818,6 +888,140 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
818888 self . now_fn . clone ( ) ,
819889 ) ) )
820890 }
891+
892+ /// Validates consistency between the three main data structures.
893+ /// This is primarily used for debugging and can be called periodically
894+ /// or after errors to detect data corruption.
895+ #[ instrument(
896+ name = "memory_awaited_action_db.validate_consistency" ,
897+ skip_all,
898+ fields(
899+ hash_key_count = self . action_info_hash_key_to_awaited_action. len( ) ,
900+ operation_count = self . operation_id_to_awaited_action. len( ) ,
901+ connected_clients_count = self . connected_clients_for_operation_id. len( ) ,
902+ )
903+ ) ]
904+ fn validate_consistency ( & self ) -> Result < ( ) , Error > {
905+ // Check that all entries in action_info_hash_key_to_awaited_action
906+ // have corresponding entries in operation_id_to_awaited_action
907+ for ( action_key, operation_id) in & self . action_info_hash_key_to_awaited_action {
908+ if !self
909+ . operation_id_to_awaited_action
910+ . contains_key ( operation_id)
911+ {
912+ return Err ( make_err ! (
913+ Code :: Internal ,
914+ "Hash key map contains operation_id {operation_id} for key {action_key:?} but operation_id_to_awaited_action does not"
915+ ) ) ;
916+ }
917+ }
918+
919+ // Check that all cacheable non-finished actions in operation_id_to_awaited_action
920+ // have corresponding entries in action_info_hash_key_to_awaited_action
921+ for ( operation_id, tx) in & self . operation_id_to_awaited_action {
922+ let awaited_action = tx. borrow ( ) ;
923+ if let ActionUniqueQualifier :: Cacheable ( action_key) =
924+ & awaited_action. action_info ( ) . unique_qualifier
925+ {
926+ if !awaited_action. state ( ) . stage . is_finished ( ) {
927+ match self . action_info_hash_key_to_awaited_action . get ( action_key) {
928+ Some ( mapped_operation_id) => {
929+ if mapped_operation_id != operation_id {
930+ return Err ( make_err ! (
931+ Code :: Internal ,
932+ "Hash key map has incorrect operation_id mapping: key {action_key:?} maps to {mapped_operation_id} but should map to {operation_id}"
933+ ) ) ;
934+ }
935+ }
936+ None => {
937+ return Err ( make_err ! (
938+ Code :: Internal ,
939+ "Non-finished cacheable action {operation_id} with key {action_key:?} missing from hash key map"
940+ ) ) ;
941+ }
942+ }
943+ }
944+ }
945+ }
946+
947+ // Check that connected_clients_for_operation_id is consistent
948+ for operation_id in self . connected_clients_for_operation_id . keys ( ) {
949+ if !self
950+ . operation_id_to_awaited_action
951+ . contains_key ( operation_id)
952+ {
953+ return Err ( make_err ! (
954+ Code :: Internal ,
955+ "connected_clients_for_operation_id contains {operation_id} but operation_id_to_awaited_action does not"
956+ ) ) ;
957+ }
958+ }
959+
960+ Ok ( ( ) )
961+ }
962+
963+ /// Attempts to recover from data structure inconsistencies by rebuilding
964+ /// the hash key mapping from the `operation_id_to_awaited_action` map.
965+ /// This is a self-healing mechanism for when the maps get out of sync.
966+ #[ instrument(
967+ name = "memory_awaited_action_db.attempt_recovery" ,
968+ skip_all,
969+ fields(
970+ original_hash_key_count = self . action_info_hash_key_to_awaited_action. len( ) ,
971+ operation_count = self . operation_id_to_awaited_action. len( ) ,
972+ )
973+ ) ]
974+ fn attempt_recovery ( & mut self ) {
975+ let mut recovered_mappings = 0 ;
976+
977+ // First, rebuild action_info_hash_key_to_awaited_action from scratch
978+ let mut new_hash_key_map = HashMap :: new ( ) ;
979+
980+ for ( operation_id, tx) in & self . operation_id_to_awaited_action {
981+ let awaited_action = tx. borrow ( ) ;
982+ if let ActionUniqueQualifier :: Cacheable ( action_key) =
983+ & awaited_action. action_info ( ) . unique_qualifier
984+ {
985+ // Only add non-finished actions to the hash key map
986+ if !awaited_action. state ( ) . stage . is_finished ( ) {
987+ if let Some ( existing_operation_id) =
988+ new_hash_key_map. insert ( action_key. clone ( ) , operation_id. clone ( ) )
989+ {
990+ warn ! (
991+ ?action_key,
992+ ?operation_id,
993+ ?existing_operation_id,
994+ "Duplicate cacheable action detected during recovery - keeping newer entry"
995+ ) ;
996+ } else {
997+ recovered_mappings += 1 ;
998+ }
999+ }
1000+ }
1001+ }
1002+
1003+ // Count how many stale mappings we're removing
1004+ let removed_stale_mappings = self
1005+ . action_info_hash_key_to_awaited_action
1006+ . len ( )
1007+ . saturating_sub ( new_hash_key_map. len ( ) ) ;
1008+
1009+ // Replace the hash key map with the rebuilt version
1010+ self . action_info_hash_key_to_awaited_action = new_hash_key_map;
1011+
1012+ warn ! (
1013+ recovered_mappings,
1014+ removed_stale_mappings, "Data structure recovery completed"
1015+ ) ;
1016+
1017+ // Record recovery metrics for OpenTelemetry
1018+ tracing:: Span :: current ( ) . record ( "recovered_mappings" , recovered_mappings) ;
1019+ tracing:: Span :: current ( ) . record ( "removed_stale_mappings" , removed_stale_mappings) ;
1020+ tracing:: Span :: current ( ) . record (
1021+ "final_hash_key_count" ,
1022+ self . action_info_hash_key_to_awaited_action . len ( ) ,
1023+ ) ;
1024+ }
8211025}
8221026
8231027#[ derive( Debug , MetricsComponent ) ]
@@ -868,6 +1072,63 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync + 'static>
8681072 } ) ,
8691073 }
8701074 }
1075+
1076+ /// Validates data structure consistency and attempts recovery if needed.
1077+ /// This is a public method that can be called by external monitoring
1078+ /// or debugging tools to check and repair the database state.
1079+ #[ instrument(
1080+ name = "memory_awaited_action_db.validate_and_recover" ,
1081+ skip_all,
1082+ fields(
1083+ validation_result = tracing:: field:: Empty ,
1084+ recovery_attempted = false ,
1085+ recovery_result = tracing:: field:: Empty ,
1086+ )
1087+ ) ]
1088+ pub async fn validate_and_recover ( & self ) -> Result < ( ) , Error > {
1089+ let mut inner = self . inner . lock ( ) . await ;
1090+
1091+ // First attempt validation
1092+ match inner. validate_consistency ( ) {
1093+ Ok ( ( ) ) => {
1094+ debug ! ( "Memory awaited action database consistency validation passed" ) ;
1095+ tracing:: Span :: current ( ) . record ( "validation_result" , "passed" ) ;
1096+ Ok ( ( ) )
1097+ }
1098+ Err ( validation_error) => {
1099+ warn ! (
1100+ error = ?validation_error,
1101+ "Memory awaited action database consistency validation failed, attempting recovery"
1102+ ) ;
1103+ tracing:: Span :: current ( ) . record ( "validation_result" , "failed" ) ;
1104+ tracing:: Span :: current ( ) . record ( "recovery_attempted" , true ) ;
1105+
1106+ // Attempt recovery
1107+ inner. attempt_recovery ( ) ;
1108+
1109+ // Validate again after recovery
1110+ match inner. validate_consistency ( ) {
1111+ Ok ( ( ) ) => {
1112+ info ! ( "Memory awaited action database successfully recovered" ) ;
1113+ tracing:: Span :: current ( ) . record ( "recovery_result" , "success" ) ;
1114+ Ok ( ( ) )
1115+ }
1116+ Err ( post_recovery_error) => {
1117+ error ! (
1118+ original_error = ?validation_error,
1119+ post_recovery_error = ?post_recovery_error,
1120+ "Failed to recover memory awaited action database"
1121+ ) ;
1122+ tracing:: Span :: current ( ) . record ( "recovery_result" , "failed" ) ;
1123+ Err ( make_err ! (
1124+ Code :: Internal ,
1125+ "Database recovery failed: original error: {validation_error}, post-recovery error: {post_recovery_error}"
1126+ ) )
1127+ }
1128+ }
1129+ }
1130+ }
1131+ }
8711132}
8721133
8731134impl < I : InstantWrapper , NowFn : Fn ( ) -> I + Clone + Send + Sync + ' static > AwaitedActionDb
0 commit comments