diff --git a/hyperactor_mesh/Cargo.toml b/hyperactor_mesh/Cargo.toml
index 22738f4f9..55f6dbcdb 100644
--- a/hyperactor_mesh/Cargo.toml
+++ b/hyperactor_mesh/Cargo.toml
@@ -59,7 +59,6 @@ nix = { version = "0.30.1", features = ["dir", "event", "hostname", "inotify", "
 pin-project = "1.1.10"
 preempt_rwlock = { version = "0.0.0", path = "../preempt_rwlock" }
 rand = { version = "0.8", features = ["small_rng"] }
-regex = "1.11.1"
 serde = { version = "1.0.219", features = ["derive", "rc"] }
 serde_bytes = "0.11"
 serde_json = { version = "1.0.140", features = ["alloc", "float_roundtrip", "unbounded_depth"] }
diff --git a/hyperactor_mesh/src/logging.rs b/hyperactor_mesh/src/logging.rs
index d31653161..a73ebdb75 100644
--- a/hyperactor_mesh/src/logging.rs
+++ b/hyperactor_mesh/src/logging.rs
@@ -46,7 +46,6 @@ use hyperactor::message::Bindings;
 use hyperactor::message::Unbind;
 use hyperactor_telemetry::env;
 use hyperactor_telemetry::log_file_path;
-use regex::Regex;
 use serde::Deserialize;
 use serde::Serialize;
 use tokio::io;
@@ -62,172 +61,146 @@ use line_prefixing_writer::LinePrefixingWriter;
 
 const DEFAULT_AGGREGATE_WINDOW_SEC: u64 = 5;
 
-#[derive(Debug, Clone, PartialEq)]
-/// Token represents a single token in a log line. It can be either a string or a number.
-enum Token {
-    String(String),
-    Number((f64, f64)), // (min, max)
-}
-
-impl Token {
-    // Helper function to create a Number token with the same min and max value
-    pub fn number(value: f64) -> Self {
-        Token::Number((value, value))
-    }
-}
-
-#[derive(Debug, Clone)]
-/// LogLine represents a single log line. It contains a list of tokens and a count of how many times the same
-/// log line has been recorded. Here "same" is defined in PartialEq.
-struct LogLine {
-    tokens: Vec<Token>,
-    pub count: u64,
-}
-
-impl LogLine {
-    pub fn try_merge(&mut self, other: LogLine) -> anyhow::Result<()> {
-        // Check if they have the same length
-        if self.tokens.len() != other.tokens.len() {
-            return Err(anyhow::anyhow!(
-                "cannot merge LogLines with different lengths"
-            ));
-        }
-        // Check each token and merge if possible
-        for (i, (self_token, other_token)) in
-            self.tokens.iter_mut().zip(other.tokens.iter()).enumerate()
-        {
-            match (self_token, other_token) {
-                // For String tokens, the strings must match
-                (Token::String(self_string), Token::String(other_string)) => {
-                    if self_string != other_string {
-                        return Err(anyhow::anyhow!(
-                            "cannot merge LogLines with different String tokens at position {}: {}, {}",
-                            i,
-                            self_string,
-                            other_string
-                        ));
-                    }
-                }
-                // For Number tokens, update min and max
-                (Token::Number(self_num), Token::Number(other_num)) => {
-                    // Update min (take the smaller of the two mins)
-                    self_num.0 = self_num.0.min(other_num.0);
-                    // Update max (take the larger of the two maxes)
-                    self_num.1 = self_num.1.max(other_num.1);
-                }
-                // If one is String and the other is Number, they're not mergeable
-                _ => {
-                    return Err(anyhow::anyhow!(
-                        "cannot merge LogLines with different token types at position {}",
-                        i
-                    ));
-                }
-            }
-        }
+/// Calculate the Levenshtein distance between two strings
+fn levenshtein_distance(left: &str, right: &str) -> usize {
+    let left_chars: Vec<char> = left.chars().collect();
+    let right_chars: Vec<char> = right.chars().collect();
 
-        // Increment the count when merging
-        self.count += other.count;
+    let left_len = left_chars.len();
+    let right_len = right_chars.len();
 
-        Ok(())
+    // Handle edge cases
+    if left_len == 0 {
+        return right_len;
+    }
+    if right_len == 0 {
+        return left_len;
     }
-}
-
-fn parse_line(line: &str) -> LogLine {
-    let mut result_tokens = Vec::new();
-
-    // Regex to match number followed by optional string: captures number and remaining string separately
-    let number_string_regex = Regex::new(r"^(-?\d+(?:\.\d+)?)(.*)$").unwrap();
-    // Split by whitespace first
-    for token in line.split_whitespace() {
-        if let Some(captures) = number_string_regex.captures(token) {
-            let number_part = captures.get(1).unwrap().as_str();
-            let string_part = captures.get(2).unwrap().as_str();
+    // Create a matrix of size (left_len+1) x (right_len+1)
+    let mut matrix = vec![vec![0; right_len + 1]; left_len + 1];
 
-            // Parse the number part
-            if let Ok(n) = number_part.parse::<f64>() {
-                result_tokens.push(Token::number(n));
+    // Initialize the first row and column
+    for (i, row) in matrix.iter_mut().enumerate().take(left_len + 1) {
+        row[0] = i;
+    }
+    for (j, cell) in matrix[0].iter_mut().enumerate().take(right_len + 1) {
+        *cell = j;
+    }
 
-                // Add string part if it's not empty
-                if !string_part.is_empty() {
-                    result_tokens.push(Token::String(string_part.to_string()));
-                }
+    // Fill the matrix
+    for i in 1..=left_len {
+        for j in 1..=right_len {
+            let cost = if left_chars[i - 1] == right_chars[j - 1] {
+                0
             } else {
-                // Fallback: treat entire token as string
-                result_tokens.push(Token::String(token.to_string()));
-            }
-        } else {
-            // No number at start, treat as string
-            result_tokens.push(Token::String(token.to_string()));
+                1
+            };
+
+            matrix[i][j] = std::cmp::min(
+                std::cmp::min(
+                    matrix[i - 1][j] + 1, // deletion
+                    matrix[i][j - 1] + 1, // insertion
+                ),
+                matrix[i - 1][j - 1] + cost, // substitution
+            );
         }
     }
 
-    LogLine {
-        tokens: result_tokens,
-        count: 1, // Initialize count to 1 for the first conversion
+    // Return the bottom-right cell
+    matrix[left_len][right_len]
+}
+
+/// Calculate the normalized edit distance between two strings (0.0 to 1.0)
+fn normalized_edit_distance(left: &str, right: &str) -> f64 {
+    let distance = levenshtein_distance(left, right) as f64;
+    let max_len = std::cmp::max(left.len(), right.len()) as f64;
+
+    if max_len == 0.0 {
+        0.0 // Both strings are empty, so they're identical
+    } else {
+        distance / max_len
     }
 }
 
 #[derive(Debug, Clone)]
-/// Aggregator is a struct that holds a list of LogLines and a start time. It can aggregate new log lines to
-/// existing ones if they are "similar" (same strings, different numbers).
-struct Aggregator {
-    lines: Vec<LogLine>,
-    start_time: SystemTime,
+/// LogLine represents a single log line with its content and count
+struct LogLine {
+    content: String,
+    pub count: u64,
 }
 
-impl fmt::Display for Token {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        match self {
-            Token::String(s) => write!(f, "{}", s),
-            Token::Number((min, max)) => write!(f, "<<{}, {}>>", min, max),
-        }
+impl LogLine {
+    fn new(content: String) -> Self {
+        Self { content, count: 1 }
     }
 }
 
 impl fmt::Display for LogLine {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        let tokens_str: Vec<String> = self.tokens.iter().map(|t| t.to_string()).collect();
         write!(
             f,
-            "\x1b[33m[{} processes]\x1b[0m {}",
-            self.count,
-            tokens_str.join(" ")
+            "\x1b[33m[{} similar log lines]\x1b[0m {}",
+            self.count, self.content
         )
     }
 }
 
+#[derive(Debug, Clone)]
+/// Aggregator is a struct that holds a list of LogLines and a start time.
+/// It can aggregate new log lines to existing ones if they are "similar" based on edit distance.
+struct Aggregator {
+    lines: Vec<LogLine>,
+    start_time: SystemTime,
+    similarity_threshold: f64, // Threshold for considering two strings similar (0.0 to 1.0)
+}
+
 impl Aggregator {
-    pub fn new() -> Self {
+    fn new() -> Self {
+        // Default threshold: strings with normalized edit distance < 0.15 are considered similar
+        Self::new_with_threshold(0.15)
+    }
+
+    fn new_with_threshold(threshold: f64) -> Self {
         Aggregator {
             lines: vec![],
             start_time: RealClock.system_time_now(),
+            similarity_threshold: threshold,
         }
     }
 
-    pub fn reset(&mut self) {
+    fn reset(&mut self) {
         self.lines.clear();
         self.start_time = RealClock.system_time_now();
     }
 
-    pub fn add_line(&mut self, line: &str) -> anyhow::Result<()> {
-        // 1. Convert the string into a LogLine
-        let new_line = parse_line(line);
+    fn add_line(&mut self, line: &str) -> anyhow::Result<()> {
+        // Find the most similar existing line
+        let mut best_match_idx = None;
+        let mut best_similarity = f64::MAX;
+
+        for (idx, existing_line) in self.lines.iter().enumerate() {
+            let distance = normalized_edit_distance(&existing_line.content, line);
 
-        // 2. Iterate through existing lines and try to merge
-        for existing_line in &mut self.lines {
-            // Try to merge directly without checking equality first
-            if existing_line.try_merge(new_line.clone()).is_ok() {
-                return Ok(());
+            // If this line is more similar than our current best match
+            if distance < best_similarity && distance < self.similarity_threshold {
+                best_match_idx = Some(idx);
+                best_similarity = distance;
             }
         }
 
-        // 3. If no merge succeeds, append the new line
-        self.lines.push(new_line);
+        // If we found a similar enough line, increment its count
+        if let Some(idx) = best_match_idx {
+            self.lines[idx].count += 1;
+        } else {
+            // Otherwise, add a new line
+            self.lines.push(LogLine::new(line.to_string()));
+        }
+
         Ok(())
     }
 
-    pub fn is_empty(&self) -> bool {
+    fn is_empty(&self) -> bool {
        self.lines.is_empty()
    }
 }
@@ -1113,115 +1086,20 @@ mod tests {
     }
 
     #[test]
-    fn test_try_merge_successful_with_same_strings_and_numbers() {
-        let mut line1 = LogLine {
-            tokens: vec![
-                Token::String("ERROR".to_string()),
-                Token::Number((10.0, 15.0)),
-                Token::String("timeout".to_string()),
-            ],
-            count: 1,
-        };
-        let line2 = LogLine {
-            tokens: vec![
-                Token::String("ERROR".to_string()),
-                Token::Number((12.0, 20.0)),
-                Token::String("timeout".to_string()),
-            ],
-            count: 1,
-        };
+    fn test_string_similarity() {
+        // Test exact match
+        assert_eq!(normalized_edit_distance("hello", "hello"), 0.0);
 
-        let result = line1.try_merge(line2);
-        assert!(result.is_ok());
+        // Test completely different strings
+        assert_eq!(normalized_edit_distance("hello", "i'mdiff"), 1.0);
 
-        // Check that the number range was updated correctly
-        if let Token::Number((min, max)) = &line1.tokens[1] {
-            assert_eq!(*min, 10.0); // min of 10.0 and 12.0
-            assert_eq!(*max, 20.0); // max of 15.0 and 20.0
-        } else {
-            panic!("expected number token");
-        }
-
-        // Check that the count was incremented
-        assert_eq!(line1.count, 2);
-    }
-
-    #[test]
-    fn test_try_merge_fails_with_different_lengths() {
-        let mut line1 = LogLine {
-            tokens: vec![Token::String("ERROR".to_string())],
-            count: 1,
-        };
-        let line2 = LogLine {
-            tokens: vec![
-                Token::String("ERROR".to_string()),
-                Token::String("timeout".to_string()),
-            ],
-            count: 1,
-        };
-
-        let result = line1.try_merge(line2);
-        assert!(result.is_err());
-        assert!(
-            result
-                .unwrap_err()
-                .to_string()
-                .contains("different lengths")
-        );
-    }
+        // Test similar strings
+        assert!(normalized_edit_distance("hello", "helo") < 0.5);
+        assert!(normalized_edit_distance("hello", "hello!") < 0.5);
 
-    #[test]
-    fn test_try_merge_fails_with_different_strings() {
-        let mut line1 = LogLine {
-            tokens: vec![
-                Token::String("ERROR".to_string()),
-                Token::String("timeout".to_string()),
-            ],
-            count: 1,
-        };
-        let line2 = LogLine {
-            tokens: vec![
-                Token::String("ERROR".to_string()),
-                Token::String("connection".to_string()),
-            ],
-            count: 1,
-        };
-
-        let result = line1.try_merge(line2);
-        assert!(result.is_err());
-        assert!(
-            result
-                .unwrap_err()
-                .to_string()
-                .contains("different String tokens")
-        );
-    }
-
-    #[test]
-    fn test_try_merge_fails_with_mixed_token_types() {
-        let mut line1 = LogLine {
-            tokens: vec![
-                Token::String("ERROR".to_string()),
-                Token::Number((10.0, 10.0)),
-            ],
-            count: 1,
-        };
-        let line2 = LogLine {
-            tokens: vec![
-                Token::String("ERROR".to_string()),
-                Token::String("timeout".to_string()),
-            ],
-            count: 1,
-        };
-
-        let result = line1.try_merge(line2);
-        assert!(result.is_err());
-        assert!(
-            result
-                .unwrap_err()
-                .to_string()
-                .contains("different token types")
-        );
+        // Test empty strings
+        assert_eq!(normalized_edit_distance("", ""), 0.0);
+        assert_eq!(normalized_edit_distance("hello", ""), 1.0);
     }
 
     #[test]
@@ -1231,182 +1109,50 @@
         assert!(result.is_ok());
 
         assert_eq!(aggregator.lines.len(), 1);
-        assert_eq!(aggregator.lines[0].tokens.len(), 4);
+        assert_eq!(aggregator.lines[0].content, "ERROR 404 not found");
+        assert_eq!(aggregator.lines[0].count, 1);
     }
 
     #[test]
-    fn test_add_line_merges_with_existing_line() {
-        let mut aggregator = Aggregator::new();
+    fn test_add_line_merges_with_similar_line() {
+        let mut aggregator = Aggregator::new_with_threshold(0.2);
 
         // Add first line
         aggregator.add_line("ERROR 404 timeout").unwrap();
         assert_eq!(aggregator.lines.len(), 1);
 
-        // Add second line that should merge (same strings, different number)
+        // Add second line that should merge (similar enough)
         aggregator.add_line("ERROR 500 timeout").unwrap();
         assert_eq!(aggregator.lines.len(), 1); // Should still be 1 line after merge
-
-        // Check that the number range was updated
-        if let Token::Number((min, max)) = &aggregator.lines[0].tokens[1] {
-            assert_eq!(*min, 404.0);
-            assert_eq!(*max, 500.0);
-        } else {
-            panic!("expected number token");
-        }
-
-        // Check that the count was incremented
         assert_eq!(aggregator.lines[0].count, 2);
-    }
 
-    #[test]
-    fn test_add_line_creates_separate_line_when_no_merge_possible() {
-        let mut aggregator = Aggregator::new();
-
-        // Add first line
-        aggregator.add_line("ERROR 404 timeout").unwrap();
-        assert_eq!(aggregator.lines.len(), 1);
-
-        // Add second line that cannot merge (different string)
-        aggregator.add_line("ERROR 404 connection").unwrap();
+        // Add third line that's too different
+        aggregator
+            .add_line("WARNING database connection failed")
+            .unwrap();
         assert_eq!(aggregator.lines.len(), 2); // Should be 2 lines now
 
-        // Add third line that merges with first
-        aggregator.add_line("ERROR 500 timeout").unwrap();
+        // Add fourth line similar to third
+        aggregator
+            .add_line("WARNING database connection timed out")
+            .unwrap();
         assert_eq!(aggregator.lines.len(), 2); // Should still be 2 lines
-
-        // Check that first line's number range was updated
-        if let Token::Number((min, max)) = &aggregator.lines[0].tokens[1] {
-            assert_eq!(*min, 404.0);
-            assert_eq!(*max, 500.0);
-        } else {
-            panic!("expected number token");
-        }
-
-        // Check that the count was incremented for the first line
-        assert_eq!(aggregator.lines[0].count, 2);
-        // Check that the second line still has count 1
-        assert_eq!(aggregator.lines[1].count, 1);
-    }
-
-    #[test]
-    fn test_parse_line_with_mixed_tokens() {
-        let line = parse_line("ERROR 404 not found");
-
-        assert_eq!(line.tokens.len(), 4);
-
-        // Check each token
-        match &line.tokens[0] {
-            Token::String(s) => assert_eq!(s, "ERROR"),
-            _ => panic!("expected string token"),
-        }
-
-        match &line.tokens[1] {
-            Token::Number((min, max)) => {
-                assert_eq!(*min, 404.0);
-                assert_eq!(*max, 404.0);
-            }
-            _ => panic!("expected number token"),
-        }
-
-        match &line.tokens[2] {
-            Token::String(s) => assert_eq!(s, "not"),
-            _ => panic!("expected string token"),
-        }
-
-        match &line.tokens[3] {
-            Token::String(s) => assert_eq!(s, "found"),
-            _ => panic!("expected string token"),
-        }
-
-        // Check that count is initialized to 1
-        assert_eq!(line.count, 1);
-    }
-
-    #[test]
-    fn test_parse_line_with_only_numbers() {
-        let line = parse_line("123 456.78 -9.0");
-
-        assert_eq!(line.tokens.len(), 3);
-
-        match &line.tokens[0] {
-            Token::Number((min, max)) => {
-                assert_eq!(*min, 123.0);
-                assert_eq!(*max, 123.0);
-            }
-            _ => panic!("expected number token"),
-        }
-
-        match &line.tokens[1] {
-            Token::Number((min, max)) => {
-                assert_eq!(*min, 456.78);
-                assert_eq!(*max, 456.78);
-            }
-            _ => panic!("expected number token"),
-        }
-
-        match &line.tokens[2] {
-            Token::Number((min, max)) => {
-                assert_eq!(*min, -9.0);
-                assert_eq!(*max, -9.0);
-            }
-            _ => panic!("expected number token"),
-        }
+        assert_eq!(aggregator.lines[1].count, 2); // Second group has 2 lines
     }
 
     #[test]
-    fn test_parse_line_with_only_strings() {
-        let line = parse_line("hello world test");
+    fn test_aggregation_of_similar_log_lines() {
+        let mut aggregator = Aggregator::new_with_threshold(0.2);
 
-        assert_eq!(line.tokens.len(), 3);
+        // Add the provided log lines with small differences
+        aggregator.add_line("[1 similar log lines] WARNING <<2025, 2025>> -07-30 <<0, 0>> :41:45,366 conda-unpack-fb:292] Found invalid offsets for share/terminfo/i/ims-ansi, falling back to search/replace to update prefixes for this file.").unwrap();
+        aggregator.add_line("[1 similar log lines] WARNING <<2025, 2025>> -07-30 <<0, 0>> :41:45,351 conda-unpack-fb:292] Found invalid offsets for lib/pkgconfig/ncursesw.pc, falling back to search/replace to update prefixes for this file.").unwrap();
+        aggregator.add_line("[1 similar log lines] WARNING <<2025, 2025>> -07-30 <<0, 0>> :41:45,366 conda-unpack-fb:292] Found invalid offsets for share/terminfo/k/kt7, falling back to search/replace to update prefixes for this file.").unwrap();
 
-        for (i, expected) in ["hello", "world", "test"].iter().enumerate() {
-            match &line.tokens[i] {
-                Token::String(s) => assert_eq!(s, expected),
-                _ => panic!("expected string token"),
-            }
-        }
-    }
-
-    #[test]
-    fn test_parse_line_empty_string() {
-        let line = parse_line("");
-        assert_eq!(line.tokens.len(), 0);
-    }
-
-    #[test]
-    fn test_aggregation_of_log_lines_with_pids() {
-        let mut aggregator = Aggregator::new();
-
-        // Add the provided log lines
-        aggregator.add_line("[devvm880.ldc0.facebook.com 2290566] test_actor - INFO - LogMessage from logger: Hey there, from a test!! pid: 2290566").unwrap();
-        aggregator.add_line("[devvm880.ldc0.facebook.com 2290555] test_actor - INFO - LogMessage from logger: Hey there, from a test!! pid: 2290555").unwrap();
-        aggregator.add_line("[devvm880.ldc0.facebook.com 2290564] test_actor - INFO - LogMessage from logger: Hey there, from a test!! pid: 2290564").unwrap();
pid: 2290564").unwrap(); - aggregator.add_line("[devvm880.ldc0.facebook.com 2290557] test_actor - INFO - LogMessage from logger: Hey there, from a test!! pid: 2290557").unwrap(); - - // Check that we have only one aggregated line + // Check that we have only one aggregated line due to similarity assert_eq!(aggregator.lines.len(), 1); - // Check that the count is 4 - assert_eq!(aggregator.lines[0].count, 4); - - // Print the tokens to see how they were parsed - println!("Tokens: {:?}", aggregator.lines[0].tokens); - - // Check that the PIDs have been aggregated as numbers - if let Token::Number((min, max)) = &aggregator.lines[0].tokens[1] { - assert_eq!(*min, 2290555.0); // Min of all PIDs - assert_eq!(*max, 2290566.0); // Max of all PIDs - } else { - panic!("expected number token for PID"); - } - - // Check that the closing bracket was correctly separated - assert_eq!( - aggregator.lines[0].tokens[2], - Token::String("]".to_string()) - ); - - // Print the aggregated line to see what it looks like - println!("Aggregated log line: {}", aggregator); + // Check that the count is 3 + assert_eq!(aggregator.lines[0].count, 3); } } diff --git a/python/tests/test_python_actors.py b/python/tests/test_python_actors.py index 5df210b64..6f218c6ae 100644 --- a/python/tests/test_python_actors.py +++ b/python/tests/test_python_actors.py @@ -612,41 +612,42 @@ async def test_actor_log_streaming() -> None: # Has a leading context so we can distinguish between streamed log and # the log directly printed by the child processes as they share the same stdout/stderr assert not re.search( - r"processes.*no print streaming", stdout_content + r"similar log lines.*no print streaming", stdout_content ), stdout_content assert not re.search( - r"processes.*no print streaming", stderr_content + r"similar log lines.*no print streaming", stderr_content ), stderr_content assert not re.search( - r"processes.*no log streaming", stdout_content + r"similar log lines.*no log streaming", stdout_content ), stdout_content assert not re.search( - r"processes.*no log streaming", stderr_content + r"similar log lines.*no log streaming", stderr_content ), stderr_content assert not re.search( - r"processes.*no log streaming due to level mismatch", stdout_content + r"similar log lines.*no log streaming due to level mismatch", stdout_content ), stdout_content assert not re.search( - r"processes.*no log streaming due to level mismatch", stderr_content + r"similar log lines.*no log streaming due to level mismatch", stderr_content ), stderr_content assert re.search( - r"processes.*has print streaming", stdout_content + r"similar log lines.*has print streaming", stdout_content ), stdout_content assert not re.search( - r"processes.*has print streaming", stderr_content + r"similar log lines.*has print streaming", stderr_content ), stderr_content assert re.search( - r"processes.*has print streaming too", stdout_content + r"similar log lines.*has print streaming too", stdout_content ), stdout_content assert not re.search( - r"processes.*has print streaming too", stderr_content + r"similar log lines.*has print streaming too", stderr_content ), stderr_content assert not re.search( - r"processes.*log streaming as level matched", stdout_content + r"similar log lines.*log streaming as level matched", stdout_content ), stdout_content assert re.search( - r"processes.*log streaming as level matched", stderr_content + r"similar log lines.*log streaming as level matched", + stderr_content, ), stderr_content finally: @@ -723,14 +724,18 @@ async def 
         os.unlink(stderr_path)
 
         # Assertions on the captured output
-        assert re.search(r"processes.*print streaming", stdout_content), stdout_content
+        assert re.search(
+            r"similar log lines.*print streaming", stdout_content
+        ), stdout_content
         assert not re.search(
-            r"processes.*print streaming", stderr_content
+            r"similar log lines.*print streaming", stderr_content
         ), stderr_content
         assert not re.search(
-            r"processes.*log streaming", stdout_content
+            r"similar log lines.*log streaming", stdout_content
         ), stdout_content
-        assert re.search(r"processes.*log streaming", stderr_content), stderr_content
+        assert re.search(
+            r"similar log lines.*log streaming", stderr_content
+        ), stderr_content
 
     finally:
         # Ensure file descriptors are restored even if something goes wrong
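
A quick sanity check on the thresholds used above: "ERROR 404 timeout" and "ERROR 500 timeout" differ by three substitutions over 17 characters, a normalized distance of about 0.176, so they merge under the 0.2 threshold the tests pass explicitly but would stay separate under the 0.15 default. The sketch below is a minimal, standalone re-implementation of the same idea (the names levenshtein and normalized are placeholders for this illustration; the patch's own helpers are private to hyperactor_mesh::logging), useful for trying candidate thresholds against sample log lines:

// Standalone sketch, not part of the diff above: re-implements normalized
// edit distance so candidate similarity thresholds can be sanity-checked.
fn levenshtein(left: &str, right: &str) -> usize {
    let l: Vec<char> = left.chars().collect();
    let r: Vec<char> = right.chars().collect();
    // prev[j] holds the distance between the first i chars of `l`
    // and the first j chars of `r` from the previous row.
    let mut prev: Vec<usize> = (0..=r.len()).collect();
    for (i, lc) in l.iter().enumerate() {
        let mut cur = Vec::with_capacity(r.len() + 1);
        cur.push(i + 1);
        for (j, rc) in r.iter().enumerate() {
            let cost = if lc == rc { 0 } else { 1 };
            // deletion, insertion, substitution
            cur.push((prev[j + 1] + 1).min(cur[j] + 1).min(prev[j] + cost));
        }
        prev = cur;
    }
    prev[r.len()]
}

fn normalized(left: &str, right: &str) -> f64 {
    let max_len = left.chars().count().max(right.chars().count());
    if max_len == 0 {
        0.0
    } else {
        levenshtein(left, right) as f64 / max_len as f64
    }
}

fn main() {
    // Pairs taken from the tests above; compare the printed values
    // against the 0.15 default and the 0.2 threshold used in the tests.
    let pairs = [
        ("ERROR 404 timeout", "ERROR 500 timeout"),
        ("WARNING database connection failed", "WARNING database connection timed out"),
        ("ERROR 404 timeout", "WARNING database connection failed"),
    ];
    for (l, r) in pairs {
        println!("{:.3}  {:?} vs {:?}", normalized(l, r), l, r);
    }
}

Note also the cost profile this implies: each add_line scans every existing group and runs an O(len * len) dynamic program, so aggregation cost grows with the number of distinct groups held at once; the aggregator's reset and the DEFAULT_AGGREGATE_WINDOW_SEC flush presumably keep that bounded in practice.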