
Commit 8662143

andrewjcg authored and facebook-github-bot committed
Add a library to sync conda environments (#628)
Summary: This adds a crate to do rsync-style syncing of a source conda env to a destination, as per:

```
$ rsync --archive --update --no-dirtimes --delete --delete-excluded ...
```

As `rsync` (and this crate) uses mtimes for syncing, it'll work even with the differences in prefixes between source and destination conda envs. The notable differences from rsync are:

1) In current flows, when users "activate" their conda env locally, prefixes in the env will be quickly re-written, changing mtimes to be ahead of the destination (e.g. on MAST, the conda env is unpacked into its expected prefix location). This crate uses the `pack-meta/history.jsonl` file to detect prefix update windows and uses them to filter out these spurious mtime updates.

2) Files that are copied have in-place prefix replacement applied (TODO: not finished yet).

Differential Revision: D78424481
1 parent 53a1b4a commit 8662143
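
The core of the flow described above can be sketched with the APIs this commit adds in `monarch_conda/src/diff.rs`. The `should_copy` helper below is hypothetical, added only for illustration: the crate's real walk/copy logic presumably lives in its `sync` module (declared in `lib.rs` but not shown in this excerpt). It shows how the prefix-aware comparator stands in for a plain mtime comparison:

```
use std::cmp::Ordering;
use std::path::Path;

use anyhow::Result;
use monarch_conda::diff::CondaFingerprint;
use tokio::fs;

/// Hypothetical driver sketch: decide whether one file in `src` should be
/// copied over its counterpart in `dst`, rsync `--update`-style.
async fn should_copy(src: &Path, dst: &Path, rel: &Path) -> Result<bool> {
    // Fingerprint both envs; the comparator verifies (via the pack-meta
    // history) that they were packed from the same prefix, and filters out
    // mtime bumps caused by prefix rewriting.
    let src_fp = CondaFingerprint::from_env(src).await?;
    let dst_fp = CondaFingerprint::from_env(dst).await?;
    let newer = CondaFingerprint::mtime_comparator(&src_fp, &dst_fp)?;

    // Copy only when the source file is "really" newer than the destination.
    let src_mtime = fs::metadata(src.join(rel)).await?.modified()?;
    let dst_mtime = fs::metadata(dst.join(rel)).await?.modified()?;
    Ok(newer(&src_mtime, &dst_mtime) == Ordering::Greater)
}
```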

File tree: 6 files changed, +1836 -0 lines changed


monarch_conda/Cargo.toml

Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
# @generated by autocargo from //monarch/monarch_conda:[conda-sync-cli,monarch_conda]

[package]
name = "monarch_conda"
version = "0.0.0"
authors = ["Meta"]
edition = "2021"
license = "BSD-3-Clause"

[[bin]]
name = "conda_sync_cli"
path = "src/main.rs"

[dependencies]
anyhow = "1.0.98"
async-tempfile = "0.7.0"
bincode = "1.3.3"
chrono = { version = "0.4.41", features = ["clock", "serde", "std"], default-features = false }
clap = { version = "4.5.38", features = ["derive", "env", "string", "unicode", "wrap_help"] }
dashmap = { version = "5.5.3", features = ["rayon", "serde"] }
digest = "0.10"
filetime = "0.2.25"
futures = { version = "0.3.30", features = ["async-await", "compat"] }
globset = { version = "0.4.13", features = ["serde1"] }
ignore = "0.4"
itertools = "0.14.0"
memchr = "2.5.0"
memmap2 = "0.9.5"
rattler_conda_types = "0.28.3"
serde = { version = "1.0.185", features = ["derive", "rc"] }
serde_json = { version = "1.0.140", features = ["alloc", "float_roundtrip", "unbounded_depth"] }
sha2 = "0.10.6"
tokio = { version = "1.46.1", features = ["full", "test-util", "tracing"] }
tokio-util = { version = "0.7.15", features = ["full"] }
walkdir = "2.3"

[dev-dependencies]
tempfile = "3.15"

monarch_conda/src/diff.rs

Lines changed: 158 additions & 0 deletions
@@ -0,0 +1,158 @@
/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */

use std::collections::HashMap;
use std::path::Path;
use std::path::PathBuf;
use std::time::Duration;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;

use anyhow::Context;
use anyhow::Result;
use anyhow::ensure;
use chrono::DateTime;
use chrono::Utc;
use digest::Digest;
use digest::Output;
use rattler_conda_types::PrefixRecord;
use rattler_conda_types::prefix_record::PathsEntry;
use serde::Deserialize;
use serde::Serialize;
use serde_json;
use sha2::Sha256;
use tokio::fs;
use walkdir::WalkDir;

use crate::hash_utils;
use crate::pack_meta::History;
use crate::pack_meta::Offsets;

/// Fingerprint of the conda-meta directory, used by `CondaFingerprint` below.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct CondaMetaFingerprint {
    // TODO(agallagher): It might be worth storing more information about installed
    // packages, so that we could print better error messages when we detect two
    // envs are not equivalent.
    hash: Output<Sha256>,
}

impl CondaMetaFingerprint {
    async fn from_env(path: &Path) -> Result<Self> {
        let mut hasher = Sha256::new();
        hash_utils::hash_directory_tree(&path.join("conda-meta"), &mut hasher).await?;
        Ok(Self {
            hash: hasher.finalize(),
        })
    }
}

/// Fingerprint of the pack-meta directory, used by `CondaFingerprint` below.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct PackMetaFingerprint {
    offsets: Output<Sha256>,
    pub history: History,
}

impl PackMetaFingerprint {
    async fn from_env(path: &Path) -> Result<Self> {
        let pack_meta = path.join("pack-meta");

        // Read the full history.jsonl file.
        let contents = fs::read_to_string(pack_meta.join("history.jsonl")).await?;
        let history = History::from_contents(&contents)?;

        // Read the entire offsets.jsonl file, but avoid hashing the offsets themselves, which can change.
        let mut hasher = Sha256::new();
        let contents = fs::read_to_string(pack_meta.join("offsets.jsonl")).await?;
        let offsets = Offsets::from_contents(&contents)?;
        for ent in offsets.entries {
            let contents = bincode::serialize(&(ent.path, ent.mode, ent.offsets.len()))?;
            hasher.update(contents.len().to_le_bytes());
            hasher.update(&contents);
        }
        let offsets = hasher.finalize();

        Ok(Self { history, offsets })
    }
}

/// A fingerprint of a conda environment, used to detect if two envs are similar enough to
/// facilitate mtime-based conda syncing.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct CondaFingerprint {
    pub conda_meta: CondaMetaFingerprint,
    pub pack_meta: PackMetaFingerprint,
}

impl CondaFingerprint {
    pub async fn from_env(path: &Path) -> Result<Self> {
        Ok(Self {
            conda_meta: CondaMetaFingerprint::from_env(path).await?,
            pack_meta: PackMetaFingerprint::from_env(path).await?,
        })
    }

    /// Create a comparator to compare the mtimes of files from two "equivalent" conda envs.
    /// In particular, this comparator will be aware of spurious mtime changes that occur from
    /// prefix replacement (via `meta-pack`), and will filter them out.
    pub fn mtime_comparator(
        a: &Self,
        b: &Self,
    ) -> Result<Box<dyn Fn(&SystemTime, &SystemTime) -> std::cmp::Ordering + Send + Sync>> {
        let (a_prefix, a_base) = a.pack_meta.history.first()?;
        let (b_prefix, b_base) = b.pack_meta.history.first()?;
        ensure!(a_prefix == b_prefix);

        // NOTE(agallagher): There appears to be some mtime drift on some files after fbpkg creation,
        // so account for that here.
        let slop = Duration::from_secs(5 * 60);

        // We load the timestamp from the first history entry, and use this to see if any
        // files have been updated since the env was created.
        let a_base = UNIX_EPOCH + Duration::from_secs(a_base) + slop;
        let b_base = UNIX_EPOCH + Duration::from_secs(b_base) + slop;

        // We also load the last prefix update window for each, as any mtimes from this window
        // should be ignored.
        let a_window = a
            .pack_meta
            .history
            .prefix_and_last_update_window()?
            .1
            .map(|(s, e)| {
                (
                    UNIX_EPOCH + Duration::from_secs(s),
                    UNIX_EPOCH + Duration::from_secs(e + 1),
                )
            });
        let b_window = b
            .pack_meta
            .history
            .prefix_and_last_update_window()?
            .1
            .map(|(s, e)| {
                (
                    UNIX_EPOCH + Duration::from_secs(s),
                    UNIX_EPOCH + Duration::from_secs(e + 1),
                )
            });

        Ok(Box::new(move |a: &SystemTime, b: &SystemTime| {
            match (
                *a > a_base && a_window.is_none_or(|(s, e)| *a < s || *a > e),
                *b > b_base && b_window.is_none_or(|(s, e)| *b < s || *b > e),
            ) {
                (true, false) => std::cmp::Ordering::Greater,
                (false, true) => std::cmp::Ordering::Less,
                (false, false) => std::cmp::Ordering::Equal,
                (true, true) => a.cmp(b),
            }
        }))
    }
}
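
To make the comparator's rule concrete, here is a minimal, self-contained sketch of the same decision logic with made-up timestamps rather than the crate's types: an env created at t = 1000 s whose last prefix-rewrite window spans roughly [2000 s, 2010 s].

```
use std::time::{Duration, SystemTime, UNIX_EPOCH};

fn main() {
    // Made-up numbers: env created at t = 1000s, last prefix-rewrite window
    // [2000s, 2010s] (padded by one second, as in `mtime_comparator`), plus
    // the same 5-minute slop the crate uses.
    let slop = Duration::from_secs(5 * 60);
    let base = UNIX_EPOCH + Duration::from_secs(1_000) + slop;
    let window = Some((
        UNIX_EPOCH + Duration::from_secs(2_000),
        UNIX_EPOCH + Duration::from_secs(2_011),
    ));

    // A file counts as "really updated" only if its mtime is past the base
    // and outside the prefix-rewrite window.
    let is_real_update = |t: &SystemTime| -> bool {
        *t > base && window.map_or(true, |(s, e)| *t < s || *t > e)
    };

    // An mtime bumped only by prefix rewriting lands inside the window and is
    // treated as unchanged...
    let rewritten = UNIX_EPOCH + Duration::from_secs(2_005);
    assert!(!is_real_update(&rewritten));

    // ...while a genuine edit after the window is kept, and would make the
    // comparator return `Ordering::Greater` for that side.
    let edited = UNIX_EPOCH + Duration::from_secs(3_000);
    assert!(is_real_update(&edited));
}
```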

monarch_conda/src/hash_utils.rs

Lines changed: 162 additions & 0 deletions
@@ -0,0 +1,162 @@
/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */

use std::path::Path;

use anyhow::Result;
use anyhow::bail;
use digest::Digest;
use tokio::fs;
use walkdir::WalkDir;

/// Compute a hash of a directory tree using the provided hasher.
///
/// This function traverses the directory tree deterministically (sorted by file name)
/// and includes both file paths and file contents in the hash computation.
///
/// # Arguments
/// * `dir` - The directory to hash
/// * `hasher` - A hasher implementing the Digest trait (e.g., Sha256::new())
///
/// # Returns
/// () - The hasher is updated with the directory tree data
pub async fn hash_directory_tree<D: Digest>(dir: &Path, hasher: &mut D) -> Result<()> {
    // Iterate entries with deterministic ordering
    for entry in WalkDir::new(dir).sort_by_file_name().into_iter() {
        let entry = entry?;
        let path = entry.path();
        let relative_path = path.strip_prefix(dir)?;

        // Hash the relative path (normalized to use forward slashes)
        let path_str = relative_path.to_string_lossy().replace('\\', "/");
        hasher.update(path_str.as_bytes());
        hasher.update(b"\0"); // null separator

        if entry.file_type().is_file() {
            // Hash file type marker, size, and contents
            hasher.update(b"FILE:");
            let contents = fs::read(path).await?;
            hasher.update(contents.len().to_le_bytes());
            hasher.update(&contents);
        } else if entry.file_type().is_dir() {
            // For directories, hash a type marker
            hasher.update(b"DIR:");
        } else if entry.file_type().is_symlink() {
            // For symlinks, hash type marker, target size, and target
            hasher.update(b"SYMLINK:");
            let target = fs::read_link(path).await?;
            let target_string = target.to_string_lossy().into_owned();
            let target_bytes = target_string.as_bytes();
            hasher.update(target_bytes.len().to_le_bytes());
            hasher.update(target_bytes);
        } else {
            // Unexpected file type
            bail!("Unexpected file type for path: {}", path.display());
        }

        hasher.update(b"\n"); // entry separator
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    use sha2::Sha256;
    use tempfile::TempDir;
    use tokio::fs;

    use super::*;

    #[tokio::test]
    async fn test_hash_directory_tree() -> Result<()> {
        // Create a temporary directory with some test files
        let temp_dir = TempDir::new()?;
        let dir_path = temp_dir.path();

        // Create test files
        fs::write(dir_path.join("file1.txt"), "Hello, world!").await?;
        fs::write(dir_path.join("file2.txt"), "Another file").await?;
        fs::create_dir(dir_path.join("subdir")).await?;
        fs::write(dir_path.join("subdir").join("file3.txt"), "Nested file").await?;

        // Hash the directory
        let mut hasher1 = Sha256::new();
        let mut hasher2 = Sha256::new();
        hash_directory_tree(dir_path, &mut hasher1).await?;
        hash_directory_tree(dir_path, &mut hasher2).await?;

        let hash1 = hasher1.finalize();
        let hash2 = hasher2.finalize();

        // Should be deterministic
        assert_eq!(hash1, hash2);
        assert_eq!(hash1.len(), 32); // SHA256 raw bytes length

        Ok(())
    }

    #[tokio::test]
    async fn test_no_hash_collision_between_file_and_dir() -> Result<()> {
        // Test that a file containing "DIR:" and an empty directory don't collide
        let temp_dir1 = TempDir::new()?;
        let temp_dir2 = TempDir::new()?;

        // Create a file with content that could collide with directory marker
        fs::write(temp_dir1.path().join("test"), "DIR:").await?;

        // Create an empty directory with the same name
        fs::create_dir(temp_dir2.path().join("test")).await?;

        // Hash both scenarios
        let mut hasher_file = Sha256::new();
        let mut hasher_dir = Sha256::new();
        hash_directory_tree(temp_dir1.path(), &mut hasher_file).await?;
        hash_directory_tree(temp_dir2.path(), &mut hasher_dir).await?;

        let hash_file = hasher_file.finalize();
        let hash_dir = hasher_dir.finalize();

        // Should be different due to type prefixes
        assert_ne!(hash_file, hash_dir);

        Ok(())
    }

    #[tokio::test]
    async fn test_no_structural_marker_collision() -> Result<()> {
        // Test that files containing our structural markers don't cause collisions
        let temp_dir1 = TempDir::new()?;
        let temp_dir2 = TempDir::new()?;

        // Create a file that could potentially collide without size prefixes:
        // Path: "test1", Content: "foo\n"
        // Without size prefixes: test1\0FILE:foo\n\n
        fs::write(temp_dir1.path().join("test1"), "foo\n").await?;

        // Create a file with a path that includes our structural markers:
        // Path: "test1\nFILE:", Content: "foo\n"
        // Without size prefixes: test1\nFILE:\0FILE:foo\n\n
        // This could potentially collide with the above
        fs::write(temp_dir2.path().join("test1\nFILE:"), "foo\n").await?;

        // Hash both scenarios
        let mut hasher1 = Sha256::new();
        let mut hasher2 = Sha256::new();
        hash_directory_tree(temp_dir1.path(), &mut hasher1).await?;
        hash_directory_tree(temp_dir2.path(), &mut hasher2).await?;

        let hash1 = hasher1.finalize();
        let hash2 = hasher2.finalize();

        // Should be different - size prefixes prevent structural marker confusion
        assert_ne!(hash1, hash2);

        Ok(())
    }
}
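
As a point of reference, a minimal sketch of how this helper is used elsewhere in the crate: fingerprint the `conda-meta` directories of two envs and compare the digests, which is essentially what `CondaMetaFingerprint::from_env` in `diff.rs` does. The `conda_meta_matches` helper and the two env paths are hypothetical, for illustration only.

```
use std::path::Path;

use anyhow::Result;
use sha2::{Digest, Sha256};

use monarch_conda::hash_utils::hash_directory_tree;

/// Returns true if the two envs have identical `conda-meta` trees.
async fn conda_meta_matches(a: &Path, b: &Path) -> Result<bool> {
    let mut ha = Sha256::new();
    let mut hb = Sha256::new();
    hash_directory_tree(&a.join("conda-meta"), &mut ha).await?;
    hash_directory_tree(&b.join("conda-meta"), &mut hb).await?;
    // Identical trees (same relative paths, entry types, sizes, and contents)
    // produce the same digest; any difference changes it.
    Ok(ha.finalize() == hb.finalize())
}
```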

monarch_conda/src/lib.rs

Lines changed: 14 additions & 0 deletions
@@ -0,0 +1,14 @@
/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */

#![feature(once_cell_try)]

pub mod diff;
pub mod hash_utils;
pub mod pack_meta;
pub mod sync;