Skip to content

Commit 0980386

Browse files
andrewjcgfacebook-github-bot
authored andcommitted
Add a library to sync conda environments
Summary: This adds a crate to do rsync-style syncing of a source conda env to a destination, as per: ``` $ rsync --archive --update --no-dirtimes --delete --delete-excluded ... ``` As `rsync` (and this crate) uses mtimes for syncing, it'll work even with the differences in prefixes between source and destination conda envs. The notable differences with rsync are: 1) In current flows, when users "activate" their conda env locally, prefixes in the env will be quickly re-written, changing mtimes to be ahead of the destination (e.g. on MAST, the conda env is unpacked into it's expected prefix location). This crate uses the `pack-meta/history.jsonl` file to detect prefix update windows and use that to filter out these sprurious mtime updates. 2) Files that are copied perform in-place prefix replacement (TODO: not finished yet). Differential Revision: D78424481
1 parent 354b66c commit 0980386

File tree

6 files changed

+1965
-0
lines changed

6 files changed

+1965
-0
lines changed

monarch_conda/Cargo.toml

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# @generated by autocargo from //monarch/monarch_conda:[conda-sync-cli,monarch_conda]
2+
3+
[package]
4+
name = "monarch_conda"
5+
version = "0.0.0"
6+
authors = ["Meta"]
7+
edition = "2021"
8+
license = "BSD-3-Clause"
9+
10+
[[bin]]
11+
name = "conda_sync_cli"
12+
path = "src/main.rs"
13+
14+
[dependencies]
15+
anyhow = "1.0.98"
16+
async-tempfile = "0.7.0"
17+
bincode = "1.3.3"
18+
chrono = { version = "0.4.41", features = ["clock", "serde", "std"], default-features = false }
19+
clap = { version = "4.5.38", features = ["derive", "env", "string", "unicode", "wrap_help"] }
20+
dashmap = { version = "5.5.3", features = ["rayon", "serde"] }
21+
digest = "0.10"
22+
filetime = "0.2.25"
23+
futures = { version = "0.3.30", features = ["async-await", "compat"] }
24+
globset = { version = "0.4.13", features = ["serde1"] }
25+
ignore = "0.4"
26+
rattler_conda_types = "0.28.3"
27+
serde = { version = "1.0.185", features = ["derive", "rc"] }
28+
serde_json = { version = "1.0.140", features = ["alloc", "float_roundtrip", "unbounded_depth"] }
29+
sha2 = "0.10.6"
30+
tokio = { version = "1.45.0", features = ["full", "test-util", "tracing"] }
31+
tokio-util = { version = "0.7.15", features = ["full"] }
32+
walkdir = "2.3"
33+
34+
[dev-dependencies]
35+
tempfile = "3.15"

monarch_conda/src/diff.rs

Lines changed: 289 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,289 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
use std::collections::HashMap;
10+
use std::path::Path;
11+
use std::path::PathBuf;
12+
use std::time::Duration;
13+
use std::time::SystemTime;
14+
use std::time::UNIX_EPOCH;
15+
16+
use anyhow::Context;
17+
use anyhow::Result;
18+
use anyhow::ensure;
19+
use chrono::DateTime;
20+
use chrono::Utc;
21+
use digest::Digest;
22+
use digest::Output;
23+
use rattler_conda_types::PrefixRecord;
24+
use rattler_conda_types::prefix_record::PathsEntry;
25+
use serde::Deserialize;
26+
use serde::Serialize;
27+
use serde_json;
28+
use sha2::Sha256;
29+
use tokio::fs;
30+
use walkdir::WalkDir;
31+
32+
use crate::hash_utils;
33+
use crate::pack_meta::History;
34+
use crate::pack_meta::Offsets;
35+
36+
/// Fingerprint of the conda environment, used to detect if two envs are using the same
37+
/// "base" env.
38+
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
39+
pub struct CondaMetaFingerprint {
40+
hash: Output<Sha256>,
41+
}
42+
43+
impl CondaMetaFingerprint {
44+
async fn from_env(path: &Path) -> Result<Self> {
45+
let mut hasher = Sha256::new();
46+
hash_utils::hash_directory_tree(&path.join("conda-meta"), &mut hasher).await?;
47+
Ok(Self {
48+
hash: hasher.finalize(),
49+
})
50+
}
51+
52+
fn ensure_can_sync_to(&self, other: &CondaMetaFingerprint) -> Result<()> {
53+
ensure!(self.hash == other.hash);
54+
Ok(())
55+
}
56+
}
57+
58+
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
59+
pub struct PackMetaFingerprint {
60+
offsets: Output<Sha256>,
61+
pub history: History,
62+
}
63+
64+
impl PackMetaFingerprint {
65+
async fn from_env(path: &Path) -> Result<Self> {
66+
let pack_meta = path.join("pack-meta");
67+
68+
// Read first line of history.jsonl
69+
let contents = fs::read_to_string(pack_meta.join("history.jsonl")).await?;
70+
let history = History::from_contents(&contents)?;
71+
72+
// Read entire offsets.jsonl file
73+
let mut hasher = Sha256::new();
74+
let contents = fs::read_to_string(pack_meta.join("offsets.jsonl")).await?;
75+
let offsets = Offsets::from_contents(&contents)?;
76+
for ent in offsets.entries {
77+
let contents = bincode::serialize(&(ent.path, ent.mode, ent.offsets.len()))?;
78+
hasher.update(contents.len().to_le_bytes());
79+
hasher.update(&contents);
80+
}
81+
let offsets = hasher.finalize();
82+
83+
Ok(Self { history, offsets })
84+
}
85+
86+
fn ensure_can_sync_to(&self, other: &PackMetaFingerprint) -> Result<()> {
87+
ensure!(
88+
self.history.entries.first().context("empty history")?
89+
== other.history.entries.first().context("empty history")?
90+
);
91+
ensure!(
92+
self.history
93+
.entries
94+
.last()
95+
.context("empty history")?
96+
.timestamp
97+
<= other
98+
.history
99+
.entries
100+
.last()
101+
.context("empty history")?
102+
.timestamp
103+
);
104+
105+
ensure!(self.offsets == other.offsets);
106+
107+
Ok(())
108+
}
109+
}
110+
111+
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
112+
pub struct CondaFingerprint {
113+
pub conda_meta: CondaMetaFingerprint,
114+
pub pack_meta: PackMetaFingerprint,
115+
}
116+
117+
impl CondaFingerprint {
118+
pub async fn from_env(path: &Path) -> Result<Self> {
119+
Ok(Self {
120+
conda_meta: CondaMetaFingerprint::from_env(path).await?,
121+
pack_meta: PackMetaFingerprint::from_env(path).await?,
122+
})
123+
}
124+
125+
pub fn ensure_can_sync_to(&self, other: &CondaFingerprint) -> Result<()> {
126+
self.conda_meta
127+
.ensure_can_sync_to(&other.conda_meta)
128+
.context("conda-meta")?;
129+
self.pack_meta
130+
.ensure_can_sync_to(&other.pack_meta)
131+
.context("pack-meta")?;
132+
Ok(())
133+
}
134+
135+
pub fn mtime_comparator(
136+
a: &Self,
137+
b: &Self,
138+
) -> Result<Box<dyn Fn(&SystemTime, &SystemTime) -> std::cmp::Ordering + Send + Sync>> {
139+
let (a_prefix, a_base) = a.pack_meta.history.first()?;
140+
let (b_prefix, b_base) = b.pack_meta.history.first()?;
141+
ensure!(a_prefix == b_prefix);
142+
let a_base = UNIX_EPOCH + Duration::from_secs(a_base + 1);
143+
let b_base = UNIX_EPOCH + Duration::from_secs(b_base + 1);
144+
145+
let a_window = a
146+
.pack_meta
147+
.history
148+
.prefix_and_last_update_window()?
149+
.1
150+
.map(|(s, e)| {
151+
(
152+
UNIX_EPOCH + Duration::from_secs(s),
153+
UNIX_EPOCH + Duration::from_secs(e + 1),
154+
)
155+
});
156+
let b_window = b
157+
.pack_meta
158+
.history
159+
.prefix_and_last_update_window()?
160+
.1
161+
.map(|(s, e)| {
162+
(
163+
UNIX_EPOCH + Duration::from_secs(s),
164+
UNIX_EPOCH + Duration::from_secs(e + 1),
165+
)
166+
});
167+
Ok(Box::new(move |a: &SystemTime, b: &SystemTime| {
168+
match (
169+
*a > a_base && a_window.is_none_or(|(s, e)| *a < s || *a > e),
170+
*b > b_base && b_window.is_none_or(|(s, e)| *b < s || *b > e),
171+
) {
172+
(true, false) => std::cmp::Ordering::Greater,
173+
(false, true) => std::cmp::Ordering::Less,
174+
(false, false) => std::cmp::Ordering::Equal,
175+
(true, true) => a.cmp(b),
176+
}
177+
}))
178+
}
179+
}
180+
181+
async fn load_metadata(env: &Path) -> Result<Vec<PrefixRecord>> {
182+
let conda_meta_path = env.join("conda-meta");
183+
let mut records = Vec::new();
184+
185+
for entry in WalkDir::new(&conda_meta_path)
186+
.min_depth(1)
187+
.into_iter()
188+
.filter_map(|e| e.ok())
189+
.filter(|e| e.file_name() != "history")
190+
{
191+
let content = fs::read_to_string(entry.path())
192+
.await
193+
.with_context(|| format!("reading {:?}", entry.path()))?;
194+
let record: PrefixRecord = serde_json::from_str(&content)?;
195+
records.push(record);
196+
}
197+
198+
Ok(records)
199+
}
200+
201+
fn index_metadata(records: &[PrefixRecord]) -> HashMap<PathBuf, (&PathsEntry, &PrefixRecord)> {
202+
let mut path_map = HashMap::new();
203+
204+
for record in records {
205+
// Add all paths from paths_data to the map
206+
for paths_entry in &record.paths_data.paths {
207+
path_map.insert(paths_entry.relative_path.clone(), (paths_entry, record));
208+
}
209+
}
210+
211+
path_map
212+
}
213+
214+
pub enum Change {
215+
Missing,
216+
Added,
217+
Modified,
218+
}
219+
220+
pub async fn diff(env: &Path) -> Result<Vec<(PathBuf, Change)>> {
221+
let metadata = load_metadata(env).await?;
222+
let mut index = index_metadata(&metadata);
223+
let mut changes = Vec::new();
224+
225+
for entry in WalkDir::new(env)
226+
.into_iter()
227+
.filter_map(|e| e.ok())
228+
.filter(|e| e.file_type().is_file())
229+
.filter(|e| !e.path().starts_with(env.join("conda-meta")))
230+
{
231+
let file_path = entry.path();
232+
let relative_path = file_path.strip_prefix(env)?;
233+
234+
if let Some((relative_path, (paths_entry, prefix_record))) =
235+
index.remove_entry(relative_path)
236+
{
237+
// File is tracked in metadata, check if it's modified
238+
let file_metadata = fs::symlink_metadata(file_path).await?;
239+
let file_size = file_metadata.len();
240+
let file_mtime = file_metadata.modified()?;
241+
242+
let mut is_modified = false;
243+
244+
// Compare size if available (now we have direct access to PathsEntry)
245+
if !is_modified {
246+
if let Some(size_in_bytes) = paths_entry.size_in_bytes {
247+
is_modified = file_size != size_in_bytes;
248+
}
249+
}
250+
251+
// Compare mtime with package record timestamp
252+
// If package timestamp > file mtime, then file is not modified
253+
if !is_modified {
254+
let package_timestamp = prefix_record
255+
.repodata_record
256+
.package_record
257+
.timestamp
258+
.context("no timestamp")?;
259+
260+
// Convert file mtime to chrono DateTime for comparison
261+
let duration_since_epoch = file_mtime.duration_since(UNIX_EPOCH)?;
262+
let file_datetime = DateTime::<Utc>::from_timestamp(
263+
duration_since_epoch.as_secs() as i64,
264+
duration_since_epoch.subsec_nanos(),
265+
)
266+
.context("failed to convert mtime to DateTime")?;
267+
268+
// If file was modified after package installation, it's been modified
269+
if file_datetime > package_timestamp {
270+
is_modified = true;
271+
}
272+
}
273+
274+
if is_modified {
275+
changes.push((relative_path, Change::Modified));
276+
}
277+
} else {
278+
// File exists but is not tracked in metadata
279+
changes.push((relative_path.to_path_buf(), Change::Added));
280+
}
281+
}
282+
283+
// Remaining paths in index are missing files
284+
for path in index.into_keys() {
285+
changes.push((path, Change::Missing));
286+
}
287+
288+
Ok(changes)
289+
}

0 commit comments

Comments
 (0)