Skip to content

Commit 355e624

Browse files
committed
Add custom-protocol example from 0.35
A few changes: - client side is not creating a router - allow specifying secret key - don't publish discovery info for client
1 parent 8eb5733 commit 355e624

File tree

1 file changed

+354
-0
lines changed

1 file changed

+354
-0
lines changed

examples/custom-protocol.rs

Lines changed: 354 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,354 @@
1+
//! Example for adding a custom protocol to an iroh node.
2+
//!
3+
//! We are building a very simple custom protocol here, and make our iroh nodes speak this protocol
4+
//! in addition to a protocol that is provided by number0, iroh-blobs.
5+
//!
6+
//! Our custom protocol allows querying the blob store of other nodes for text matches. For
7+
//! this, we keep a very primitive index of the UTF-8 text of our blobs.
8+
//!
9+
//! The example is contrived - we only use memory nodes, and our database is a hashmap in a mutex,
10+
//! and our queries just match if the query string appears as-is in a blob.
11+
//! Nevertheless, this shows how powerful systems can be built with custom protocols by also using
12+
//! the existing iroh protocols (blobs in this case).
13+
//!
14+
//! ## Usage
15+
//!
16+
//! In one terminal, run
17+
//!
18+
//! cargo run --example custom-protocol -- listen "hello-world" "foo-bar" "hello-moon"
19+
//!
20+
//! This spawns an iroh node with three blobs. It will print the node's node id.
21+
//!
22+
//! In another terminal, run
23+
//!
24+
//! cargo run --example custom-protocol -- query <node-id> hello
25+
//!
26+
//! Replace <node-id> with the node id from above. This will connect to the listening node with our
27+
//! custom protocol and query for the string `hello`. The listening node will return a list of
28+
//! blob hashes that contain `hello`. We will then download all these blobs with iroh-blobs,
29+
//! and then print a list of the hashes with their content.
30+
//!
31+
//! For this example, this will print:
32+
//!
33+
//! 7b54d6be55: hello-moon
34+
//! c92dabdf91: hello-world
35+
//!
36+
//! That's it! Follow along in the code below, we added a bunch of comments to explain things.
37+
38+
use std::{
39+
collections::HashMap,
40+
sync::{Arc, Mutex},
41+
};
42+
43+
use anyhow::{Context, Result};
44+
use clap::Parser;
45+
use iroh::{
46+
discovery::pkarr::PkarrResolver,
47+
endpoint::Connection,
48+
protocol::{AcceptError, ProtocolHandler, Router},
49+
Endpoint, NodeId, SecretKey,
50+
};
51+
use iroh_blobs::{api::Store, store::mem::MemStore, BlobsProtocol, Hash};
52+
use tracing_subscriber::{prelude::*, EnvFilter};
53+
54+
/// Command line arguments for this example.
#[derive(Debug, Parser)]
pub struct Cli {
    /// Which mode to run in: `listen` (serve blobs) or `query` (search a remote node).
    #[clap(subcommand)]
    command: Command,
}
59+
60+
/// The two modes of the example: serve and index blobs, or query a remote node.
#[derive(Debug, Parser)]
pub enum Command {
    /// Spawn a node in listening mode.
    Listen {
        /// Each text string will be imported as a blob and inserted into the search database.
        text: Vec<String>,
    },
    /// Query a remote node for data and print the results.
    Query {
        /// The node id of the node we want to query.
        node_id: NodeId,
        /// The text we want to match.
        query: String,
    },
}
75+
76+
/// Each custom protocol is identified by its ALPN string.
///
/// The ALPN, or application-layer protocol negotiation, is exchanged in the connection handshake,
/// and the connection is aborted unless both nodes pass the same bytestring.
///
/// Including a version suffix (here `/0`) makes it possible to evolve the wire format later
/// without breaking older peers.
const ALPN: &[u8] = b"iroh-example/text-search/0";
81+
82+
async fn listen(text: Vec<String>) -> Result<()> {
83+
// allow the user to provide a secret so we can have a stable node id.
84+
// This is only needed for the listen side.
85+
let secret_key = get_or_generate_secret_key()?;
86+
// Use an in-memory store for this example. You would use a persistent store in production code.
87+
let store = MemStore::new();
88+
// Create an endpoint with the secret key and discovery publishing to the n0 dns server enabled.
89+
let endpoint = Endpoint::builder()
90+
.secret_key(secret_key)
91+
.discovery_n0()
92+
.bind()
93+
.await?;
94+
// Build our custom protocol handler. The `builder` exposes access to various subsystems in the
95+
// iroh node. In our case, we need a blobs client and the endpoint.
96+
let proto = BlobSearch::new(&store);
97+
// Insert the text strings as blobs and index them.
98+
for text in text.into_iter() {
99+
proto.insert_and_index(text).await?;
100+
}
101+
// Build the iroh-blobs protocol handler, which is used to download blobs.
102+
let blobs = BlobsProtocol::new(&store, endpoint.clone(), None);
103+
104+
// create a router that handles both our custom protocol and the iroh-blobs protocol.
105+
let node = Router::builder(endpoint)
106+
.accept(ALPN, proto.clone())
107+
.accept(iroh_blobs::ALPN, blobs.clone())
108+
.spawn();
109+
110+
// Print our node id, so clients know how to connect to us.
111+
let node_id = node.endpoint().node_id();
112+
println!("our node id: {node_id}");
113+
114+
// Wait for Ctrl-C to be pressed.
115+
tokio::signal::ctrl_c().await?;
116+
node.shutdown().await?;
117+
Ok(())
118+
}
119+
120+
async fn query(node_id: NodeId, query: String) -> Result<()> {
121+
// Build a in-memory node. For production code, you'd want a persistent node instead usually.
122+
let store = MemStore::new();
123+
// Create an endpoint with a random secret key and no discovery publishing.
124+
// For a client we just need discovery resolution via the n0 dns server, which
125+
// the PkarrResolver provides.
126+
let endpoint = Endpoint::builder()
127+
.add_discovery(PkarrResolver::n0_dns())
128+
.bind()
129+
.await?;
130+
// Query the remote node.
131+
// This will send the query over our custom protocol, read hashes on the reply stream,
132+
// and download each hash over iroh-blobs.
133+
let hashes = query_remote(&endpoint, &store, node_id, &query).await?;
134+
135+
// Print out our query results.
136+
for hash in hashes {
137+
read_and_print(&store, hash).await?;
138+
}
139+
140+
// Close the endpoint and shutdown the store.
141+
// Shutting down the store is not needed for a memory store, but would be important for persistent stores
142+
// to allow them to flush their data to disk.
143+
endpoint.close().await;
144+
store.shutdown().await?;
145+
146+
Ok(())
147+
}
148+
149+
#[tokio::main]
150+
async fn main() -> Result<()> {
151+
setup_logging();
152+
let args = Cli::parse();
153+
154+
match args.command {
155+
Command::Listen { text } => {
156+
listen(text).await?;
157+
}
158+
Command::Query {
159+
node_id,
160+
query: query_text,
161+
} => {
162+
query(node_id, query_text).await?;
163+
}
164+
}
165+
166+
Ok(())
167+
}
168+
169+
/// Handler for our custom text-search protocol.
///
/// Derives `Clone` so the same handler can be shared (it is also wrapped in an `Arc`
/// by [`BlobSearch::new`]).
#[derive(Debug, Clone)]
struct BlobSearch {
    // Handle to the iroh-blobs store holding the actual blob data.
    blobs: Store,
    // Primitive text index: maps the full UTF-8 content of an indexed blob to its hash.
    index: Arc<Mutex<HashMap<String, Hash>>>,
}
174+
175+
impl ProtocolHandler for BlobSearch {
176+
/// The `accept` method is called for each incoming connection for our ALPN.
177+
///
178+
/// The returned future runs on a newly spawned tokio task, so it can run as long as
179+
/// the connection lasts.
180+
async fn accept(&self, connection: Connection) -> std::result::Result<(), AcceptError> {
181+
let this = self.clone();
182+
// We can get the remote's node id from the connection.
183+
let node_id = connection.remote_node_id()?;
184+
println!("accepted connection from {node_id}");
185+
186+
// Our protocol is a simple request-response protocol, so we expect the
187+
// connecting peer to open a single bi-directional stream.
188+
let (mut send, mut recv) = connection.accept_bi().await?;
189+
190+
// We read the query from the receive stream, while enforcing a max query length.
191+
let query_bytes = recv.read_to_end(64).await.map_err(AcceptError::from_err)?;
192+
193+
// Now, we can perform the actual query on our local database.
194+
let query = String::from_utf8(query_bytes).map_err(AcceptError::from_err)?;
195+
let hashes = this.query_local(&query);
196+
println!("query: {query}, found {} results", hashes.len());
197+
198+
// We want to return a list of hashes. We do the simplest thing possible, and just send
199+
// one hash after the other. Because the hashes have a fixed size of 32 bytes, this is
200+
// very easy to parse on the other end.
201+
for hash in hashes {
202+
send.write_all(hash.as_bytes())
203+
.await
204+
.map_err(AcceptError::from_err)?;
205+
}
206+
207+
// By calling `finish` on the send stream we signal that we will not send anything
208+
// further, which makes the receive stream on the other end terminate.
209+
send.finish()?;
210+
connection.closed().await;
211+
Ok(())
212+
}
213+
}
214+
215+
impl BlobSearch {
216+
/// Create a new protocol handler.
217+
pub fn new(blobs: &Store) -> Arc<Self> {
218+
Arc::new(Self {
219+
blobs: blobs.clone(),
220+
index: Default::default(),
221+
})
222+
}
223+
224+
/// Query the local database.
225+
///
226+
/// Returns the list of hashes of blobs which contain `query` literally.
227+
pub fn query_local(&self, query: &str) -> Vec<Hash> {
228+
let db = self.index.lock().unwrap();
229+
db.iter()
230+
.filter_map(|(text, hash)| text.contains(query).then_some(*hash))
231+
.collect::<Vec<_>>()
232+
}
233+
234+
/// Insert a text string into the database.
235+
///
236+
/// This first imports the text as a blob into the iroh blob store, and then inserts a
237+
/// reference to that hash in our (primitive) text database.
238+
pub async fn insert_and_index(&self, text: String) -> Result<Hash> {
239+
let hash = self.blobs.add_bytes(text.into_bytes()).await?.hash;
240+
self.add_to_index(hash).await?;
241+
Ok(hash)
242+
}
243+
244+
/// Index a blob which is already in our blob store.
245+
///
246+
/// This only indexes complete blobs that are smaller than 1MiB.
247+
///
248+
/// Returns `true` if the blob was indexed.
249+
async fn add_to_index(&self, hash: Hash) -> Result<bool> {
250+
let bitfield = self.blobs.observe(hash).await?;
251+
if !bitfield.is_complete() || bitfield.size() > 1024 * 1024 {
252+
// If the blob is not complete or too large, we do not index it.
253+
return Ok(false);
254+
}
255+
let data = self.blobs.get_bytes(hash).await?;
256+
match String::from_utf8(data.to_vec()) {
257+
Ok(text) => {
258+
let mut db = self.index.lock().unwrap();
259+
db.insert(text, hash);
260+
Ok(true)
261+
}
262+
Err(_err) => Ok(false),
263+
}
264+
}
265+
}
266+
267+
/// Query a remote node, download all matching blobs and print the results.
268+
pub async fn query_remote(
269+
endpoint: &Endpoint,
270+
store: &Store,
271+
node_id: NodeId,
272+
query: &str,
273+
) -> Result<Vec<Hash>> {
274+
// Establish a connection to our node.
275+
// We use the default node discovery in iroh, so we can connect by node id without
276+
// providing further information.
277+
let conn = endpoint.connect(node_id, ALPN).await?;
278+
let blobs_conn = endpoint.connect(node_id, iroh_blobs::ALPN).await?;
279+
280+
// Open a bi-directional in our connection.
281+
let (mut send, mut recv) = conn.open_bi().await?;
282+
283+
// Send our query.
284+
send.write_all(query.as_bytes()).await?;
285+
286+
// Finish the send stream, signalling that no further data will be sent.
287+
// This makes the `read_to_end` call on the accepting side terminate.
288+
send.finish()?;
289+
290+
// In this example, we simply collect all results into a vector.
291+
// For real protocols, you'd usually want to return a stream of results instead.
292+
let mut out = vec![];
293+
294+
// The response is sent as a list of 32-byte long hashes.
295+
// We simply read one after the other into a byte buffer.
296+
let mut hash_bytes = [0u8; 32];
297+
loop {
298+
// Read 32 bytes from the stream.
299+
match recv.read_exact(&mut hash_bytes).await {
300+
// FinishedEarly means that the remote side did not send further data,
301+
// so in this case we break our loop.
302+
Err(iroh::endpoint::ReadExactError::FinishedEarly(_)) => break,
303+
// Other errors are connection errors, so we bail.
304+
Err(err) => return Err(err.into()),
305+
Ok(_) => {}
306+
};
307+
// Upcast the raw bytes to the `Hash` type.
308+
let hash = Hash::from_bytes(hash_bytes);
309+
// Download the content via iroh-blobs.
310+
store.remote().fetch(blobs_conn.clone(), hash).await?;
311+
out.push(hash);
312+
}
313+
conn.close(0u32.into(), b"done");
314+
blobs_conn.close(0u32.into(), b"done");
315+
Ok(out)
316+
}
317+
318+
/// Read a blob from the local blob store and print it to STDOUT.
319+
async fn read_and_print(store: &Store, hash: Hash) -> Result<()> {
320+
let content = store.get_bytes(hash).await?;
321+
let message = String::from_utf8(content.to_vec())?;
322+
println!("{}: {message}", hash.fmt_short());
323+
Ok(())
324+
}
325+
326+
/// Set the RUST_LOG env var to one of {debug,info,warn} to see logging.
327+
fn setup_logging() {
328+
tracing_subscriber::registry()
329+
.with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr))
330+
.with(EnvFilter::from_default_env())
331+
.try_init()
332+
.ok();
333+
}
334+
335+
/// Gets a secret key from the IROH_SECRET environment variable or generates a new random one.
336+
/// If the environment variable is set, it must be a valid string representation of a secret key.
337+
pub fn get_or_generate_secret_key() -> Result<SecretKey> {
338+
use std::{env, str::FromStr};
339+
340+
use rand::thread_rng;
341+
if let Ok(secret) = env::var("IROH_SECRET") {
342+
// Parse the secret key from string
343+
SecretKey::from_str(&secret).context("Invalid secret key format")
344+
} else {
345+
// Generate a new random key
346+
let secret_key = SecretKey::generate(&mut thread_rng());
347+
println!(
348+
"Generated new secret key: {}",
349+
hex::encode(secret_key.to_bytes())
350+
);
351+
println!("To reuse this key, set the IROH_SECRET environment variable to this value");
352+
Ok(secret_key)
353+
}
354+
}

0 commit comments

Comments
 (0)