Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: 1.75
toolchain: 1.92
override: true
components: rustfmt, clippy
- name: Set up cargo cache
Expand Down
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ clap = { version = "4.5.26", features = ["derive"] }
env_logger = "0.11.6"
futures = "0.3.31"
lapin = "2.5.0"
libp2p = { version = "0.51.3", features = ["dns", "gossipsub", "macros", "noise", "rsa", "serde", "tcp", "tokio", "yamux"] }
libp2p = { version = "0.51.3", features = ["dns", "gossipsub", "macros", "noise", "rsa", "serde", "tcp", "tokio", "yamux", "metrics"] }
libp2p-mplex = "0.39.0"
log = "0.4.22"
# Note: by default, the sentry package requires OpenSSL. We use rustls instead to avoid depending on system packages.
Expand All @@ -25,4 +25,5 @@ serde_yaml = "0.9.34"
tokio = { version = "1.43.0", features = ["full"] }
tokio-stream = { version = "0.1.17", features = ["time"] }
tracing-subscriber = { version = "0.3.19", features = ["fmt"] }
prometheus-client = { version = "0.23.0" }
void = "1.0.2"
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl Default for P2PConfig {
"/dns/api2.aleph.im/tcp/4025/p2p/QmZkurbY2G2hWay59yiTgQNaQxHSNzKZFt2jbnwJhQcKgV"
.parse()
.expect(PEER_MULTIADDR_ERROR_MESSAGE),
"/dns/api3.aleph.im/tcp/4025/p2p/Qmb5b2ZwJm9pVWrppf3D3iMF1bXbjZhbJTwGvKEBMZNxa2",
"/dns/api3.aleph.im/tcp/4025/p2p/Qmb5b2ZwJm9pVWrppf3D3iMF1bXbjZhbJTwGvKEBMZNxa2"
.parse()
.expect(PEER_MULTIADDR_ERROR_MESSAGE),
],
Expand Down
30 changes: 30 additions & 0 deletions src/http/endpoints/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,33 @@
pub mod dial;
mod error;
pub mod identity;

use actix_web::{web, HttpResponse, Result};
use prometheus_client::encoding::text::encode;
use crate::AppState;

pub async fn metrics(data: web::Data<AppState>) -> Result<HttpResponse> {
data.metrics.http_requests_total.inc();
data.metrics.update_memory_usage();

let mut buffer = String::new();
if let Err(e) = encode(&mut buffer, &data.metrics.registry) {
return Ok(HttpResponse::InternalServerError().body(format!("Failed to encode metrics: {}", e)));
}

Ok(HttpResponse::Ok()
.content_type("text/plain; version=0.0.4; charset=utf-8")
.body(buffer))
}

pub async fn health(data: web::Data<AppState>) -> Result<HttpResponse> {
data.metrics.http_requests_total.inc();

let health_status = serde_json::json!({
"status": "healthy",
"peer_id": data.peer_id.to_string(),
"timestamp": chrono::Utc::now().to_rfc3339()
});

Ok(HttpResponse::Ok().json(health_status))
}
4 changes: 3 additions & 1 deletion src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,7 @@ pub fn config(cfg: &mut web::ServiceConfig) {
.route("/identify", web::get().to(endpoints::identity::identify))
.route("/dial", web::post().to(endpoints::dial::dial)),
),
);
)
.route("/metrics", web::get().to(endpoints::metrics))
.route("/health", web::get().to(endpoints::health));
}
95 changes: 66 additions & 29 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@ use log::{debug, error, info, warn};
use crate::config::AppConfig;
use crate::gossipsub::Message as GossipsubMessage;
use crate::message_queue::RabbitMqClient;
use crate::metrics::Metrics;
use crate::p2p::network::P2PClient;

mod config;
mod http;
mod message_queue;
mod metrics;
mod p2p;
use serde_json::{Value};
use std::str;

#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
Expand All @@ -34,19 +38,21 @@ struct CliArgs {
}

fn read_config(config_file: &PathBuf) -> AppConfig {
let f = std::fs::File::open(config_file).expect("could not open config.yml");
serde_yaml::from_reader(&f).expect("invalid YAML content")
let f = std::fs::File::open(config_file)
.unwrap_or_else(|e| panic!("Could not open config file {:?}: {}", config_file, e));
serde_yaml::from_reader(&f)
.unwrap_or_else(|e| panic!("Invalid YAML content in {:?}: {}", config_file, e))
}

fn load_p2p_private_key(private_key_path: &PathBuf) -> identity::Keypair {
// Note for later: translate RSA PEM key with:
// openssl pkcs8 -topk8 -inform PEM -outform DER -in node-secret.key -out node-secret.pkcs8.der -nocrypt

// let private_key_path = std::path::Path::new(private_key_file);
let mut private_key_bytes =
std::fs::read(private_key_path).expect("could not load private key file");
let mut private_key_bytes = std::fs::read(private_key_path)
.unwrap_or_else(|e| panic!("Could not load private key file {:?}: {}", private_key_path, e));
let rsa_keypair = identity::rsa::Keypair::try_decode_pkcs8(private_key_bytes.as_mut())
.expect("could not decode private key");
.unwrap_or_else(|e| panic!("Could not decode private key from {:?}: {}", private_key_path, e));

identity::Keypair::from(rsa_keypair)
}
Expand Down Expand Up @@ -80,6 +86,7 @@ async fn dial_bootstrap_peers(network_client: &mut P2PClient, peers: &[Multiaddr

async fn subscribe_to_topics(network_client: &mut P2PClient, topics: &Vec<String>) {
for topic in topics {
info!("Subscribing to topic: {}", topic);
let topic = gossipsub::IdentTopic::new(topic);
network_client
.subscribe(&topic)
Expand All @@ -88,54 +95,78 @@ async fn subscribe_to_topics(network_client: &mut P2PClient, topics: &Vec<String
}
}

async fn publish_message(network_client: &mut P2PClient, delivery: &Delivery) {
async fn publish_message(network_client: &mut P2PClient, delivery: &Delivery, metrics: &Metrics) {
let topic = gossipsub::IdentTopic::new(delivery.routing_key.as_str());
let publish_result = network_client.publish(&topic, &delivery.data).await;

if let Err(e) = publish_result {
error!("Could not publish to P2P topic {}: {}", topic, e);
info!("Publishing message on topic: {}", topic);
match network_client.publish(&topic, &delivery.data).await {
Ok(_) => {
metrics.total_messages_sent.inc();
metrics.increment_event("message_published");
}
Err(e) => {
error!("Could not publish to P2P topic {}: {}", topic, e);
metrics.increment_event("publish_error");
}
}
}

async fn forward_p2p_message(mq_client: &mut RabbitMqClient, message: GossipsubMessage) {
async fn forward_p2p_message(mq_client: &mut RabbitMqClient, message: GossipsubMessage, metrics: &Metrics) {
match message.source {
None => {
warn!("Received pubsub message from an unspecified sender. Discarding.");
}
Some(peer_id) => {
let routing_key = format!("{}.{}.{}", "p2p", message.topic, peer_id);
mq_client
.publish(&routing_key, &message.data)
.await
.unwrap();

let item_hash = str::from_utf8(&message.data)
.ok()
.and_then(|s| serde_json::from_str::<Value>(s).ok())
.and_then(|v| v.get("item_hash").and_then(|h| h.as_str().map(String::from)));

info!(
"Forwarding p2p message from peer_id: {} on topic: {} routing_key: {} item_hash: {}",
&peer_id,
&message.topic,
&routing_key,
item_hash.as_deref().unwrap_or("N/A")
);

match mq_client.publish(&routing_key, &message.data).await {
Ok(_) => {
metrics.total_messages_received.inc();
metrics.increment_event("message_forwarded");
}
Err(e) => {
error!("Failed to forward message to RabbitMQ: {}", e);
metrics.increment_event("forward_error");
}
}
}
}
}

async fn mq_to_p2p_loop(mut mq_client: RabbitMqClient, mut network_client: P2PClient) {
async fn mq_to_p2p_loop(mut mq_client: RabbitMqClient, mut network_client: P2PClient, metrics: Metrics) {
while let Some(delivery) = mq_client.next().await {
if let Ok(delivery) = delivery {
publish_message(&mut network_client, &delivery).await;
delivery
.ack(BasicAckOptions::default())
.await
.expect("RabbitMQ message ack should succeed");
publish_message(&mut network_client, &delivery, &metrics).await;
if let Err(e) = delivery.ack(BasicAckOptions::default()).await {
error!("Failed to acknowledge RabbitMQ message: {}", e);
}
}
}
}

async fn p2p_to_mq_loop(
mut mq_client: RabbitMqClient,
mut network_events: impl StreamExt<Item = p2p::network::Event> + Unpin,
metrics: Metrics,
) {
while let Some(network_event) = network_events.next().await {
match network_event {
p2p::network::Event::PubsubMessage {
propagation_source: _,
message_id: _,
message,
message, ..
} => {
forward_p2p_message(&mut mq_client, message).await;
forward_p2p_message(&mut mq_client, message, &metrics).await;
}
}
}
Expand All @@ -145,7 +176,9 @@ async fn p2p_to_mq_loop(
fn configure_logging() {
let mut log_builder = env_logger::builder();
let logger = sentry::integrations::log::SentryLogger::with_dest(log_builder.build());
log::set_boxed_logger(Box::new(logger)).expect("setting global logger should succeed");
if let Err(e) = log::set_boxed_logger(Box::new(logger)) {
eprintln!("Failed to set global logger: {}", e);
}
log::set_max_level(log::LevelFilter::Info);
}

Expand All @@ -165,6 +198,7 @@ pub struct AppState {
pub app_config: AppConfig,
pub p2p_client: tokio::sync::Mutex<P2PClient>,
pub peer_id: PeerId,
pub metrics: Metrics,
}

#[actix_web::main]
Expand All @@ -183,8 +217,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let peer_id = PeerId::from(id_keys.public());
info!("Peer ID: {:?}", peer_id);

let metrics = Metrics::new();

let (mut network_client, network_events, network_event_loop) =
p2p::network::new(id_keys).await?;
p2p::network::new(id_keys, metrics.connected_peers.clone()).await?;

// Spawn the network task and run it in the background.
let p2p_event_loop_handle = tokio::spawn(network_event_loop.run());
Expand All @@ -211,13 +247,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create RabbitMQ exchanges/queues
let mq_client = message_queue::new(&app_config).await?;

let mq_to_p2p_handle = tokio::spawn(mq_to_p2p_loop(mq_client.clone(), network_client.clone()));
let p2p_to_mq_handle = tokio::spawn(p2p_to_mq_loop(mq_client, network_events));
let mq_to_p2p_handle = tokio::spawn(mq_to_p2p_loop(mq_client.clone(), network_client.clone(), metrics.clone()));
let p2p_to_mq_handle = tokio::spawn(p2p_to_mq_loop(mq_client, network_events, metrics.clone()));

let app_data = Data::new(AppState {
app_config: app_config.clone(),
p2p_client: tokio::sync::Mutex::new(network_client.clone()),
peer_id,
metrics: metrics.clone(),
});

let http_server_bind_address = format!("0.0.0.0:{}", &app_config.p2p.control_port.0);
Expand Down
2 changes: 0 additions & 2 deletions src/message_queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use std::process::exit;
pub struct RabbitMqClient {
channel: Channel,
pub_consumer: Consumer,
pub pub_exchange: String,
pub sub_exchange: String,
}

Expand Down Expand Up @@ -139,7 +138,6 @@ pub async fn new(app_config: &AppConfig) -> Result<RabbitMqClient, Box<dyn std::
Ok(RabbitMqClient {
channel,
pub_consumer,
pub_exchange: pub_exchange_name.to_owned(),
sub_exchange: sub_exchange_name.to_owned(),
})
}
Loading
Loading