Skip to content

Commit bc9bde1

Browse files
committed
WIP
1 parent dc382dc commit bc9bde1

File tree

8 files changed

+278
-0
lines changed

8 files changed

+278
-0
lines changed

pastebin/Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
[package]
2+
name = "pastebin"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
7+
8+
[dependencies]

pastebin/src/main.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
fn main() {
2+
println!("Hello, world!");
3+
}

watchdir/Cargo.toml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
[package]
2+
name = "iroh-watchdir"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
7+
8+
[dependencies]
9+
anyhow = "1.0.83"
10+
bytes = "1.6.0"
11+
crossbeam-channel = "0.5.12"
12+
futures = "0.3.30"
13+
iroh = "0.16.2"
14+
notify = "6.1.1"
15+
tokio = "1.37.0"

watchdir/src/.iroh/blobs/blobs.db

1.52 MB
Binary file not shown.

watchdir/src/.iroh/docs.redb

1.52 MB
Binary file not shown.

watchdir/src/.iroh/keypair

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
-----BEGIN OPENSSH PRIVATE KEY-----
2+
b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW
3+
QyNTUxOQAAACDrTh4ZE+pAgHz4L6uo5iQAkTziBU6tZAVJJu/lG4bbAwAAAIjuDB6V7gwe
4+
lQAAAAtzc2gtZWQyNTUxOQAAACDrTh4ZE+pAgHz4L6uo5iQAkTziBU6tZAVJJu/lG4bbAw
5+
AAAEC5Hd1ge70iFWPQ4Qq0MeiMwkB055gqcFDuUWD3T3aMJutOHhkT6kCAfPgvq6jmJACR
6+
POIFTq1kBUkm7+UbhtsDAAAAAAECAwQF
7+
-----END OPENSSH PRIVATE KEY-----

watchdir/src/.iroh/peers.postcard

150 Bytes
Binary file not shown.

watchdir/src/main.rs

Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
use anyhow::Result;
2+
use futures::{
3+
// channel::mpsc::channel,
4+
SinkExt,
5+
StreamExt,
6+
TryStreamExt,
7+
};
8+
use iroh::{docs::DocTicket, util::path};
9+
use notify::{
10+
poll::ScanEvent, Config, Event, PollWatcher, RecommendedWatcher, RecursiveMode, Watcher,
11+
};
12+
use std::{
13+
path::{Component, Path},
14+
str::FromStr,
15+
sync::mpsc::channel,
16+
};
17+
18+
/// Async, futures channel based event watching
19+
#[tokio::main]
20+
async fn main() -> Result<()> {
21+
let path = std::env::args()
22+
.nth(1)
23+
.expect("Argument 1 needs to be a path");
24+
25+
let mut doc_ticket = None;
26+
if let Some(ticket) = std::env::args().nth(2) {
27+
let ticket = iroh::docs::DocTicket::from_str(ticket.as_str())?;
28+
doc_ticket = Some(ticket);
29+
}
30+
println!("watching {}", path);
31+
32+
let path = std::path::PathBuf::from(&path);
33+
let iroh_path = path.join(".iroh");
34+
let node = iroh::node::Node::persistent(iroh_path)
35+
.await?
36+
.spawn()
37+
.await?;
38+
39+
let doc = open_or_create_document(&node, doc_ticket).await?;
40+
let author = node.authors.create().await?;
41+
42+
tokio::spawn(async_watch(path, doc, author)).await??;
43+
44+
Ok(())
45+
}
46+
47+
async fn open_or_create_document(
48+
node: &iroh::client::MemIroh,
49+
ticket: Option<DocTicket>,
50+
) -> anyhow::Result<iroh::client::MemDoc> {
51+
let doc = match ticket {
52+
Some(ticket) => {
53+
println!("importing ticket: {:?}", ticket);
54+
node.docs.import(ticket).await?
55+
}
56+
None => {
57+
let doc = node.docs.create().await?;
58+
let ticket = doc
59+
.share(
60+
iroh::client::docs::ShareMode::Write,
61+
iroh::base::node_addr::AddrInfoOptions::Relay,
62+
)
63+
.await?;
64+
println!(
65+
"created new doc {:?} write ticket:\n{:?}",
66+
doc.id(),
67+
ticket.to_string()
68+
);
69+
doc
70+
}
71+
};
72+
Ok(doc)
73+
}
74+
75+
enum Message {
76+
Event(notify::Result<notify::Event>),
77+
Scan(ScanEvent),
78+
}
79+
80+
async fn async_watcher() -> notify::Result<(PollWatcher, std::sync::mpsc::Receiver<Message>)> {
81+
// let (tx, rx) = tokio::sync::mpsc::channel(1000);
82+
let (tx, rx) = channel();
83+
84+
let tx_2 = tx.clone();
85+
let tx_3 = tx.clone();
86+
// use the pollwatcher and set a callback for the scanning events
87+
let watcher = PollWatcher::with_initial_scan(
88+
move |watch_event| {
89+
let tx_2 = tx_2.clone();
90+
// tx_2.blocking_send(Message::Event(watch_event)).unwrap();
91+
tokio::task::spawn(async move {
92+
println!("watch_event: {:?}", &watch_event);
93+
tx_2.send(Message::Event(watch_event)).unwrap();
94+
});
95+
},
96+
Config::default(),
97+
move |scan_event| {
98+
// tx_3.blocking_send(Message::Scan(scan_event));
99+
let tx_3 = tx_3.clone();
100+
println!("scan_event: {:?}", &scan_event);
101+
tokio::task::spawn(async move {
102+
if let Err(err) = tx_3.send(Message::Scan(scan_event)) {
103+
println!("send error: {:?}", err);
104+
}
105+
});
106+
},
107+
)?;
108+
109+
// let (tx, rx) = std::sync::mpsc::channel();
110+
// let (mut tx, rx) = tokio::sync::mpsc::channel(1);
111+
112+
// let tx_c = tx.clone();
113+
// // use the pollwatcher and set a callback for the scanning events
114+
// let mut watcher = PollWatcher::with_initial_scan(
115+
// move |watch_event| {
116+
// (|| async {
117+
// tx_c.send(Message::Event(watch_event)).await.unwrap();
118+
// });
119+
// // tokio::task::spawn_blocking(move || async {
120+
// // tx_c.send(Message::Event(watch_event)).await.unwrap();
121+
// // });
122+
// },
123+
// Config::default(),
124+
// move |scan_event| {
125+
// tokio::task::block_in_place(|| async {
126+
// tx.send(Message::Scan(scan_event)).await.unwrap();
127+
// });
128+
// },
129+
// )?;
130+
131+
// // Automatically select the best implementation for your platform.
132+
// // You can also access each implementation directly e.g. INotifyWatcher.
133+
// let watcher = RecommendedWatcher::new(
134+
// move |res| {
135+
// futures::executor::block_on(async {
136+
// tx.send(res).await.unwrap();
137+
// })
138+
// },
139+
// Config::default(),
140+
// )?;
141+
142+
Ok((watcher, rx))
143+
}
144+
145+
// fn block_on<F: Future<Output = T>, T>(rt: &tokio::runtime::Handle, fut: F) -> T {
146+
// tokio::task::block_in_place(move || match tokio::runtime::Handle::try_current() {
147+
// Ok(handle) => handle.block_on(fut),
148+
// Err(_) => rt.block_on(fut),
149+
// })
150+
// }
151+
152+
async fn async_watch<P: AsRef<Path>>(
153+
path: P,
154+
doc: iroh::client::MemDoc,
155+
author: iroh::docs::AuthorId,
156+
) -> anyhow::Result<()> {
157+
let (mut watcher, mut rx) = async_watcher().await?;
158+
159+
// Add a path to be watched. All files and directories at that path and
160+
// below will be monitored for changes.
161+
watcher.watch(path.as_ref(), RecursiveMode::Recursive)?;
162+
163+
while let res = rx.recv()? {
164+
match res {
165+
Message::Event(Ok(event)) => {
166+
println!("event: {:?}", event);
167+
for path in event.paths {
168+
let path = path.canonicalize()?;
169+
170+
if path.is_file() {
171+
// let key = canonicalized_path_to_bytes(&path, true)?;
172+
let key = bytes::Bytes::from(path.display().to_string());
173+
doc.import_file(author, key, &path, true)
174+
.await?
175+
.finish()
176+
.await?;
177+
}
178+
}
179+
}
180+
Message::Event(Err(e)) => {
181+
println!("watch error: {:?}", e);
182+
}
183+
Message::Scan(event) => {
184+
println!("scan: {:?}", event);
185+
let path = event?.canonicalize()?;
186+
if path.is_file() {
187+
// let key = canonicalized_path_to_bytes(&path, true)?;
188+
let key = bytes::Bytes::from(path.display().to_string());
189+
doc.import_file(author, key, &path, true)
190+
.await?
191+
.finish()
192+
.await?;
193+
}
194+
}
195+
}
196+
}
197+
198+
Ok(())
199+
}
200+
201+
/// This function converts an already canonicalized path to a string.
202+
///
203+
/// If `must_be_relative` is true, the function will fail if any component of the path is
204+
/// `Component::RootDir`
205+
///
206+
/// This function will also fail if the path is non canonical, i.e. contains
207+
/// `..` or `.`, or if the path components contain any windows or unix path
208+
/// separators.
209+
pub fn canonicalized_path_to_bytes(
210+
path: impl AsRef<Path>,
211+
must_be_relative: bool,
212+
) -> anyhow::Result<bytes::Bytes> {
213+
let mut path_str = String::new();
214+
let parts = path
215+
.as_ref()
216+
.components()
217+
.filter_map(|c| match c {
218+
Component::Normal(x) => {
219+
let c = match x.to_str() {
220+
Some(c) => c,
221+
None => return Some(Err(anyhow::anyhow!("invalid character in path"))),
222+
};
223+
224+
if !c.contains('/') && !c.contains('\\') {
225+
Some(Ok(c))
226+
} else {
227+
Some(Err(anyhow::anyhow!("invalid path component {:?}", c)))
228+
}
229+
}
230+
Component::RootDir => {
231+
if must_be_relative {
232+
Some(Err(anyhow::anyhow!("invalid path component {:?}", c)))
233+
} else {
234+
path_str.push('/');
235+
None
236+
}
237+
}
238+
_ => Some(Err(anyhow::anyhow!("invalid path component {:?}", c))),
239+
})
240+
.collect::<anyhow::Result<Vec<_>>>()?;
241+
let parts = parts.join("/");
242+
path_str.push_str(&parts);
243+
let path_bytes = bytes::Bytes::from(path_str);
244+
Ok(path_bytes)
245+
}

0 commit comments

Comments
 (0)