diff --git a/Cargo.lock b/Cargo.lock index 59d100f26..5afb77d55 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5043,6 +5043,15 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "matchers" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +dependencies = [ + "regex-automata", +] + [[package]] name = "memchr" version = "2.7.5" @@ -5175,6 +5184,12 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "mutually_exclusive_features" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e94e1e6445d314f972ff7395df2de295fe51b71821694f0b0e1e79c4f12c8577" + [[package]] name = "native-tls" version = "0.2.14" @@ -5245,6 +5260,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "nu-ansi-term" +version = "0.50.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4a28e057d01f97e61255210fcff094d74ed0466038633e95017f5beb68e4399" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "num" version = "0.2.1" @@ -5392,15 +5416,6 @@ dependencies = [ "syn 2.0.106", ] -[[package]] -name = "num_threads" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c7398b9c8b70908f6371f47ed36737907c87c52af34c268fed0bf0ceb92ead9" -dependencies = [ - "libc", -] - [[package]] name = "number_prefix" version = "0.4.0" @@ -5604,7 +5619,6 @@ dependencies = [ "sha2 0.10.9", "sha3", "simple_asn1", - "simplelog", "solana-client", "solana-sdk", "solana-system-interface", @@ -5621,6 +5635,11 @@ dependencies = [ "thiserror 2.0.16", "tokio", "tower 0.5.2", + "tracing", + "tracing-actix-web", + "tracing-appender", + "tracing-error", + "tracing-subscriber", "utoipa", "uuid 1.18.1", "validator", @@ -7484,17 +7503,6 @@ dependencies = [ "time", ] -[[package]] -name = "simplelog" -version = "0.12.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16257adbfaef1ee58b1363bdc0664c9b8e1e30aed86049635fb5f147d065a9c0" -dependencies = [ - "log", - "termcolor", - "time", -] - [[package]] name = "siphasher" version = "0.3.11" @@ -10531,9 +10539,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "83bde6f1ec10e72d583d91623c939f623002284ef622b87de38cfd546cbf2031" dependencies = [ "deranged", - "libc", "num-conv", - "num_threads", "powerfmt", "serde", "time-core", @@ -10833,6 +10839,31 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-actix-web" +version = "0.7.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5360edd490ec8dee9fedfc6a9fd83ac2f01b3e1996e3261b9ad18a61971fe064" +dependencies = [ + "actix-web", + "mutually_exclusive_features", + "pin-project", + "tracing", + "uuid 1.18.1", +] + +[[package]] +name = "tracing-appender" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf" +dependencies = [ + "crossbeam-channel", + "thiserror 1.0.69", + "time", + "tracing-subscriber", +] + [[package]] name = "tracing-attributes" version = "0.1.30" @@ -10873,15 +10904,46 @@ dependencies = [ "tracing", ] +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-serde" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1" +dependencies = [ + "serde", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2054a14f5307d601f88daf0553e1cbf472acc4f2c51afab632431cdcd72124d5" dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex-automata", + "serde", + "serde_json", "sharded-slab", + "smallvec", "thread_local", + "tracing", "tracing-core", + "tracing-log", + "tracing-serde", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 51c9b2cf0..a4a9b2dc8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,11 @@ aws-config = { version = "1.6.3", features = ["behavior-version-latest"] } aws-sdk-kms = "1.75.0" actix-web = "4" log = "0.4" -simplelog = "0.12" +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "tracing-log"] } +tracing-error = { version = "0.2" } +tracing-appender = "0.2" +tracing-actix-web = "0.7" prometheus = "0.14" lazy_static = "1.5" dotenvy = "0.15" diff --git a/src/constants/logging.rs b/src/constants/logging.rs new file mode 100644 index 000000000..d25b7535d --- /dev/null +++ b/src/constants/logging.rs @@ -0,0 +1,22 @@ +//! Logging configuration constants + +/// Default maximum log file size in bytes (1GB) +pub const DEFAULT_MAX_LOG_FILE_SIZE: u64 = 1_073_741_824; + +/// Default log level when not specified +pub const DEFAULT_LOG_LEVEL: &str = "info"; + +/// Default log format when not specified +pub const DEFAULT_LOG_FORMAT: &str = "compact"; + +/// Default log mode when not specified +pub const DEFAULT_LOG_MODE: &str = "stdout"; + +/// Default log directory for file logging +pub const DEFAULT_LOG_DIR: &str = "./logs"; + +/// Default log directory when running in Docker +pub const DOCKER_LOG_DIR: &str = "logs/"; + +/// Log file name +pub const LOG_FILE_NAME: &str = "relayer.log"; diff --git a/src/constants/mod.rs b/src/constants/mod.rs index 50a50550d..5471e11fa 100644 --- a/src/constants/mod.rs +++ b/src/constants/mod.rs @@ -37,3 +37,6 @@ pub use transactions::*; mod network_tags; pub use network_tags::*; + +mod logging; +pub use logging::*; diff --git a/src/jobs/handlers/notification_handler.rs b/src/jobs/handlers/notification_handler.rs index e8e8d3eaf..0e31d241c 100644 --- a/src/jobs/handlers/notification_handler.rs +++ b/src/jobs/handlers/notification_handler.rs @@ -3,6 +3,7 @@ //! This module implements the notification handling worker that processes //! notification jobs from the queue. +use crate::setup_job_tracing; use actix_web::web::ThinData; use apalis::prelude::{Attempt, Data, *}; use eyre::Result; @@ -29,6 +30,8 @@ pub async fn notification_handler( context: Data>, attempt: Attempt, ) -> Result<(), Error> { + setup_job_tracing!(job, attempt); + info!("handling notification: {:?}", job.data); let result = handle_request(job.data, context).await; diff --git a/src/jobs/handlers/solana_swap_request_handler.rs b/src/jobs/handlers/solana_swap_request_handler.rs index e0955b73a..6b3b9769d 100644 --- a/src/jobs/handlers/solana_swap_request_handler.rs +++ b/src/jobs/handlers/solana_swap_request_handler.rs @@ -3,6 +3,7 @@ //! This module implements the solana token swap request handling worker that processes //! notification jobs from the queue. +use crate::setup_job_tracing; use actix_web::web::ThinData; use apalis::prelude::{Attempt, Data, *}; use eyre::Result; @@ -29,6 +30,8 @@ pub async fn solana_token_swap_request_handler( context: Data>, attempt: Attempt, ) -> Result<(), Error> { + setup_job_tracing!(job, attempt); + info!("handling solana token swap request: {:?}", job.data); let result = handle_request(job.data, context).await; diff --git a/src/jobs/handlers/transaction_request_handler.rs b/src/jobs/handlers/transaction_request_handler.rs index 2c1a6e1d7..1a929315e 100644 --- a/src/jobs/handlers/transaction_request_handler.rs +++ b/src/jobs/handlers/transaction_request_handler.rs @@ -2,6 +2,7 @@ //! //! Handles the validation and preparation of transactions before they are //! submitted to the network +use crate::setup_job_tracing; use actix_web::web::ThinData; use apalis::prelude::{Attempt, Context, Data, TaskId, Worker, *}; use apalis_redis::RedisContext; @@ -23,6 +24,8 @@ pub async fn transaction_request_handler( task_id: TaskId, ctx: RedisContext, ) -> Result<(), Error> { + setup_job_tracing!(job, attempt); + info!("Handling transaction request: {:?}", job.data); info!("Attempt: {:?}", attempt); info!("Worker: {:?}", worker); diff --git a/src/jobs/handlers/transaction_status_handler.rs b/src/jobs/handlers/transaction_status_handler.rs index 34b0b32bb..d52c40a5b 100644 --- a/src/jobs/handlers/transaction_status_handler.rs +++ b/src/jobs/handlers/transaction_status_handler.rs @@ -7,6 +7,7 @@ use actix_web::web::ThinData; use apalis::prelude::{Attempt, Data, *}; +use crate::setup_job_tracing; use eyre::Result; use log::info; @@ -22,6 +23,8 @@ pub async fn transaction_status_handler( state: Data>, attempt: Attempt, ) -> Result<(), Error> { + setup_job_tracing!(job, attempt); + info!("Handling transaction status job: {:?}", job.data); let result = handle_request(job.data, state).await; diff --git a/src/jobs/handlers/transaction_submission_handler.rs b/src/jobs/handlers/transaction_submission_handler.rs index b186de97b..850dd7d6c 100644 --- a/src/jobs/handlers/transaction_submission_handler.rs +++ b/src/jobs/handlers/transaction_submission_handler.rs @@ -5,6 +5,7 @@ //! - Handles different submission commands (Submit, Cancel, Resubmit) //! - Updates transaction status after submission //! - Enqueues status monitoring jobs +use crate::setup_job_tracing; use actix_web::web::ThinData; use apalis::prelude::{Attempt, Data, *}; use eyre::Result; @@ -22,6 +23,8 @@ pub async fn transaction_submission_handler( state: Data>, attempt: Attempt, ) -> Result<(), Error> { + setup_job_tracing!(job, attempt); + info!("handling transaction submission: {:?}", job.data); let result = handle_request(job.data, state).await; diff --git a/src/jobs/job.rs b/src/jobs/job.rs index 840a4ce90..b842f3113 100644 --- a/src/jobs/job.rs +++ b/src/jobs/job.rs @@ -19,6 +19,8 @@ pub struct Job { pub timestamp: String, pub job_type: JobType, pub data: T, + #[serde(skip_serializing_if = "Option::is_none")] + pub request_id: Option, } impl Job { @@ -29,8 +31,13 @@ impl Job { timestamp: Utc::now().timestamp().to_string(), job_type, data, + request_id: None, } } + pub fn with_request_id(mut self, id: Option) -> Self { + self.request_id = id; + self + } } // Enum to represent different message types diff --git a/src/jobs/job_producer.rs b/src/jobs/job_producer.rs index 384b529bc..b253b12c9 100644 --- a/src/jobs/job_producer.rs +++ b/src/jobs/job_producer.rs @@ -11,6 +11,7 @@ use crate::{ Job, NotificationSend, Queue, TransactionRequest, TransactionSend, TransactionStatusCheck, }, models::RelayerError, + observability::request_id::get_request_id, }; use apalis::prelude::Storage; use apalis_redis::RedisError; @@ -124,7 +125,8 @@ impl JobProducerTrait for JobProducer { transaction_process_job ); let mut queue = self.queue.lock().await; - let job = Job::new(JobType::TransactionRequest, transaction_process_job); + let job = Job::new(JobType::TransactionRequest, transaction_process_job) + .with_request_id(get_request_id()); match scheduled_on { Some(scheduled_on) => { @@ -148,7 +150,8 @@ impl JobProducerTrait for JobProducer { scheduled_on: Option, ) -> Result<(), JobProducerError> { let mut queue = self.queue.lock().await; - let job = Job::new(JobType::TransactionSend, transaction_submit_job); + let job = Job::new(JobType::TransactionSend, transaction_submit_job) + .with_request_id(get_request_id()); match scheduled_on { Some(on) => { @@ -172,7 +175,8 @@ impl JobProducerTrait for JobProducer { let job = Job::new( JobType::TransactionStatusCheck, transaction_status_check_job, - ); + ) + .with_request_id(get_request_id()); match scheduled_on { Some(on) => { queue.transaction_status_queue.schedule(job, on).await?; @@ -191,7 +195,8 @@ impl JobProducerTrait for JobProducer { scheduled_on: Option, ) -> Result<(), JobProducerError> { let mut queue = self.queue.lock().await; - let job = Job::new(JobType::NotificationSend, notification_send_job); + let job = Job::new(JobType::NotificationSend, notification_send_job) + .with_request_id(get_request_id()); match scheduled_on { Some(on) => { @@ -212,7 +217,8 @@ impl JobProducerTrait for JobProducer { scheduled_on: Option, ) -> Result<(), JobProducerError> { let mut queue = self.queue.lock().await; - let job = Job::new(JobType::SolanaTokenSwapRequest, solana_swap_request_job); + let job = Job::new(JobType::SolanaTokenSwapRequest, solana_swap_request_job) + .with_request_id(get_request_id()); match scheduled_on { Some(on) => { diff --git a/src/lib.rs b/src/lib.rs index b8f033718..9b15cae44 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,6 +32,7 @@ pub mod jobs; pub mod logging; pub mod metrics; pub mod models; +pub mod observability; pub mod openapi; pub mod repositories; pub mod services; diff --git a/src/logging/mod.rs b/src/logging/mod.rs index 9c6e19e84..710c824ca 100644 --- a/src/logging/mod.rs +++ b/src/logging/mod.rs @@ -3,16 +3,24 @@ //! Environment variables used: //! - LOG_MODE: "stdout" (default) or "file" //! - LOG_LEVEL: log level ("trace", "debug", "info", "warn", "error"); default is "info" +//! - LOG_FORMAT: output format ("compact" (default), "pretty", "json") //! - LOG_DATA_DIR: when using file mode, the path of the log file (default "logs/relayer.log") use chrono::Utc; -use log::info; -use simplelog::{Config, LevelFilter, SimpleLogger, WriteLogger}; use std::{ env, fs::{create_dir_all, metadata, File, OpenOptions}, path::Path, }; +use tracing::info; +use tracing_appender::non_blocking; +use tracing_error::ErrorLayer; +use tracing_subscriber::{fmt, prelude::*, EnvFilter}; + +use crate::constants::{ + DEFAULT_LOG_DIR, DEFAULT_LOG_FORMAT, DEFAULT_LOG_LEVEL, DEFAULT_LOG_MODE, + DEFAULT_MAX_LOG_FILE_SIZE, DOCKER_LOG_DIR, LOG_FILE_NAME, +}; /// Computes the path of the rolled log file given the base file path and the date string. pub fn compute_rolled_file_path(base_file_path: &str, date_str: &str, index: u32) -> String { @@ -58,87 +66,150 @@ pub fn space_based_rolling( /// Sets up logging by reading configuration from environment variables. pub fn setup_logging() { - let log_mode = env::var("LOG_MODE").unwrap_or_else(|_| "stdout".to_string()); - let log_level = env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string()); - // Parse the log level into LevelFilter - let level_filter = match log_level.to_lowercase().as_str() { - "trace" => LevelFilter::Trace, - "debug" => LevelFilter::Debug, - "info" => LevelFilter::Info, - "warn" => LevelFilter::Warn, - "error" => LevelFilter::Error, - _ => LevelFilter::Info, - }; - - // Only run if log_mode is "file" - if log_mode.to_lowercase() == "file" { - info!("Logging to file: {}", log_level); - - // Use logs/ directly in container path, otherwise use LOG_DATA_DIR or default to logs/ for host path - let log_dir = if env::var("IN_DOCKER") - .map(|val| val == "true") - .unwrap_or(false) - { - "logs/".to_string() + // Set RUST_LOG from LOG_LEVEL if RUST_LOG is not already set + if std::env::var_os("RUST_LOG").is_none() { + if let Ok(level) = env::var("LOG_LEVEL") { + std::env::set_var("RUST_LOG", level); + } + } + + // Configure filter, format, and mode from environment + let env_filter = + EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(DEFAULT_LOG_LEVEL)); + let format = env::var("LOG_FORMAT").unwrap_or_else(|_| DEFAULT_LOG_FORMAT.to_string()); + let log_mode = env::var("LOG_MODE").unwrap_or_else(|_| DEFAULT_LOG_MODE.to_string()); + + // Set up logging based on mode + if log_mode.eq_ignore_ascii_case("file") { + // File logging setup + let log_dir = if env::var("IN_DOCKER").ok().as_deref() == Some("true") { + DOCKER_LOG_DIR.to_string() } else { - env::var("LOG_DATA_DIR").unwrap_or_else(|_| "./logs".to_string()) + env::var("LOG_DATA_DIR").unwrap_or_else(|_| DEFAULT_LOG_DIR.to_string()) }; - let log_dir = format!("{}/", log_dir.trim_end_matches('/')); - // set dates + let now = Utc::now(); let date_str = now.format("%Y-%m-%d").to_string(); + let base_file_path = format!("{}{}", log_dir, LOG_FILE_NAME); - // Get log file path from environment or use default - let base_file_path = format!("{}relayer.log", log_dir); - - // verify the log file already exists - if Path::new(&base_file_path).exists() { - info!( - "Base Log file already exists: {}. Proceeding to compute rolled log file path.", - base_file_path - ); - } - - // Time-based rolling: compute file name based on the current UTC date. - let time_based_path = time_based_rolling(&base_file_path, &date_str, 1); - - // Ensure parent directory exists. - if let Some(parent) = Path::new(&time_based_path).parent() { + if let Some(parent) = Path::new(&base_file_path).parent() { create_dir_all(parent).expect("Failed to create log directory"); } - // Space-based rolling: if an existing log file exceeds 1GB, adopt a new file name. - let max_size: u64 = env::var("LOG_MAX_SIZE") - .map(|s| { - s.parse::() - .expect("LOG_MAX_SIZE must be a valid u64 if set") - }) - .unwrap_or(1_073_741_824); - + let time_based_path = time_based_rolling(&base_file_path, &date_str, 1); + let max_size = match env::var("LOG_MAX_SIZE") { + Ok(value) => value.parse().unwrap_or_else(|_| { + panic!("LOG_MAX_SIZE must be a valid u64 if set"); + }), + Err(_) => DEFAULT_MAX_LOG_FILE_SIZE, + }; let final_path = space_based_rolling(&time_based_path, &base_file_path, &date_str, max_size); - // Open the log file. Append to it if it exists and is under threshold; otherwise, create - // it. - let log_file = if Path::new(&final_path).exists() { + let file = if Path::new(&final_path).exists() { OpenOptions::new() .append(true) .open(&final_path) - .unwrap_or_else(|e| panic!("Unable to open log file {}: {}", final_path, e)) + .expect("Failed to open log file") } else { - File::create(&final_path) - .unwrap_or_else(|e| panic!("Unable to create log file {}: {}", final_path, e)) + File::create(&final_path).expect("Failed to create log file") }; - WriteLogger::init(level_filter, Config::default(), log_file) - .expect("Failed to initialize file logger"); + + let (non_blocking_writer, guard) = non_blocking(file); + Box::leak(Box::new(guard)); // Keep guard alive for the lifetime of the program + + match format.as_str() { + "pretty" => { + tracing_subscriber::registry() + .with(env_filter) + .with(ErrorLayer::default()) + .with( + fmt::layer() + .with_writer(non_blocking_writer) + .with_ansi(false) + .pretty() + .with_thread_ids(true) + .with_file(true) + .with_line_number(true), + ) + .init(); + } + "json" => { + tracing_subscriber::registry() + .with(env_filter) + .with(ErrorLayer::default()) + .with( + fmt::layer() + .with_writer(non_blocking_writer) + .with_ansi(false) + .json() + .with_current_span(true) + .with_span_list(true) + .with_thread_ids(true) + .with_file(true) + .with_line_number(true), + ) + .init(); + } + _ => { + // compact is default + tracing_subscriber::registry() + .with(env_filter) + .with(ErrorLayer::default()) + .with( + fmt::layer() + .with_writer(non_blocking_writer) + .with_ansi(false) + .compact() + .with_target(false), + ) + .init(); + } + } } else { - // Default to stdout logging - SimpleLogger::init(level_filter, Config::default()) - .expect("Failed to initialize simple logger"); + // Stdout logging + match format.as_str() { + "pretty" => { + tracing_subscriber::registry() + .with(env_filter) + .with(ErrorLayer::default()) + .with( + fmt::layer() + .pretty() + .with_thread_ids(true) + .with_file(true) + .with_line_number(true), + ) + .init(); + } + "json" => { + tracing_subscriber::registry() + .with(env_filter) + .with(ErrorLayer::default()) + .with( + fmt::layer() + .json() + .with_current_span(true) + .with_span_list(true) + .with_thread_ids(true) + .with_file(true) + .with_line_number(true), + ) + .init(); + } + _ => { + // compact is default + tracing_subscriber::registry() + .with(env_filter) + .with(ErrorLayer::default()) + .with(fmt::layer().compact().with_target(false)) + .init(); + } + } } - info!("Logging is successfully configured (mode: {})", log_mode); + info!(mode=%log_mode, format=%format, "logging configured"); } #[cfg(test)] diff --git a/src/main.rs b/src/main.rs index 770177786..34794ce67 100644 --- a/src/main.rs +++ b/src/main.rs @@ -27,18 +27,18 @@ use std::sync::Arc; use actix_governor::{Governor, GovernorConfigBuilder}; +use actix_web::HttpResponse; use actix_web::{ dev::Service, - middleware::{self, Logger}, + middleware::{self}, web::{self}, App, HttpServer, }; use color_eyre::{eyre::WrapErr, Result}; use config::Config; +use futures::future::{ready, Either}; use metrics::middleware::MetricsMiddleware; -use actix_web::HttpResponse; - use config::ApiKeyRateLimit; use dotenvy::dotenv; use log::info; @@ -54,8 +54,10 @@ use openzeppelin_relayer::{ constants::PUBLIC_ENDPOINTS, logging::setup_logging, metrics, + observability::RequestIdMiddleware, utils::check_authorization_header, }; +use tracing_actix_web::TracingLogger; fn load_config_file(config_file_path: &str) -> Result { config::load_config(config_file_path).wrap_err("Failed to load config file") @@ -63,12 +65,14 @@ fn load_config_file(config_file_path: &str) -> Result { #[actix_web::main] async fn main() -> Result<()> { - // Initialize error reporting with eyre - color_eyre::install().wrap_err("Failed to initialize error reporting")?; - dotenv().ok(); + + // Setup logging first so ErrorLayer is available setup_logging(); + // Initialize error reporting with eyre (after logging setup) + color_eyre::install().wrap_err("Failed to initialize error reporting")?; + // Log service information at startup openzeppelin_relayer::utils::log_service_info(); @@ -114,30 +118,25 @@ async fn main() -> Result<()> { app .wrap_fn(move |req, srv| { let path = req.path(); - - let is_public_endpoint = PUBLIC_ENDPOINTS.iter().any(|prefix| path.starts_with(prefix)); - - if is_public_endpoint { - return srv.call(req); - } - - if check_authorization_header(&req, &config.api_key) { - return srv.call(req); + let is_public = PUBLIC_ENDPOINTS.iter().any(|p| path.starts_with(p)); + let authorized = is_public || check_authorization_header(&req, &config.api_key); + + if authorized { + Either::Left(srv.call(req)) + } else { + let res = HttpResponse::Unauthorized() + .body(r#"{"success":false,"code":401,"error":"Unauthorized","message":"Unauthorized"}"#) + .map_into_boxed_body(); + Either::Right(ready(Ok(req.into_response(res)))) } - Box::pin(async move { - Ok(req.into_response( - HttpResponse::Unauthorized().body( - r#"{"success": false, "code":401, "error": "Unauthorized", "message": "Unauthorized"}"#.to_string(), - ), - )) - }) }) + .wrap(RequestIdMiddleware) + .wrap(TracingLogger::default()) + .wrap(MetricsMiddleware) .wrap(Governor::new(&rate_limit_config)) - .wrap(middleware::Compress::default()) - .wrap(middleware::NormalizePath::trim()) .wrap(middleware::DefaultHeaders::new()) - .wrap(MetricsMiddleware) - .wrap(Logger::default()) + .wrap(middleware::NormalizePath::trim()) + .wrap(middleware::Compress::default()) .app_data(app_state.clone()) .service(web::scope("/api/v1").configure(api::routes::configure_routes)) } diff --git a/src/observability/job_tracing.rs b/src/observability/job_tracing.rs new file mode 100644 index 000000000..6256e12aa --- /dev/null +++ b/src/observability/job_tracing.rs @@ -0,0 +1,22 @@ +/// Macro to set up job tracing. Creates a span with job metadata, +/// sets the request ID, and maintains the span for the handler's duration. +/// +/// # Example +/// ``` +/// setup_job_tracing!(job, attempt); +/// ``` +#[macro_export] +macro_rules! setup_job_tracing { + ($job:expr, $attempt:expr) => { + let _job_request_id = $job.request_id.clone().unwrap_or_else(|| $job.message_id.clone()); + let _job_span = tracing::info_span!( + "job", + request_id=%_job_request_id, + job_message_id=%$job.message_id, + job_type=%$job.job_type.to_string(), + attempt=%$attempt.current() + ); + let _job_guard = _job_span.enter(); + $crate::observability::request_id::set_request_id(_job_request_id); + }; +} diff --git a/src/observability/middleware.rs b/src/observability/middleware.rs new file mode 100644 index 000000000..cf544c7c7 --- /dev/null +++ b/src/observability/middleware.rs @@ -0,0 +1,162 @@ +use crate::observability::request_id::set_request_id; +use actix_web::{ + dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform}, + http::header::{HeaderName, HeaderValue}, + Error, HttpMessage, +}; +use futures::future::LocalBoxFuture; +use std::future::{ready, Ready}; +use tracing_actix_web::RequestId as ActixRequestId; + +/// Middleware that adds request ID tracking to all HTTP requests +pub struct RequestIdMiddleware; + +impl Transform for RequestIdMiddleware +where + S: Service, Error = Error>, + S::Future: 'static, + B: 'static, +{ + type Response = ServiceResponse; + type Error = Error; + type InitError = (); + type Transform = RequestIdMiddlewareService; + type Future = Ready>; + + fn new_transform(&self, service: S) -> Self::Future { + ready(Ok(RequestIdMiddlewareService { service })) + } +} + +pub struct RequestIdMiddlewareService { + service: S, +} + +impl Service for RequestIdMiddlewareService +where + S: Service, Error = Error>, + S::Future: 'static, + B: 'static, +{ + type Response = ServiceResponse; + type Error = Error; + type Future = LocalBoxFuture<'static, Result>; + + forward_ready!(service); + + fn call(&self, req: ServiceRequest) -> Self::Future { + // Priority order: incoming header -> ActixRequestId -> new UUID + let rid = req + .headers() + .get("x-request-id") + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_string()) + .or_else(|| { + req.extensions() + .get::() + .map(|r| r.to_string()) + }) + .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()); + + set_request_id(rid.clone()); + + let fut = self.service.call(req); + + Box::pin(async move { + let mut res = fut.await?; + // Use safe conversion to avoid panic on invalid header values + if let Ok(header_value) = HeaderValue::from_str(&rid) { + res.headers_mut() + .insert(HeaderName::from_static("x-request-id"), header_value); + } + Ok(res) + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use actix_web::{test, web, App, HttpResponse}; + use uuid::Uuid; + + #[actix_rt::test] + async fn echoes_incoming_x_request_id_header() { + let app = test::init_service(App::new().wrap(RequestIdMiddleware).route( + "/", + web::get().to(|| async { HttpResponse::Ok().body("ok") }), + )) + .await; + + let req = test::TestRequest::get() + .uri("/") + .insert_header(("x-request-id", "test-req-id-123")) + .to_request(); + + let resp = test::call_service(&app, req).await; + assert!(resp.status().is_success()); + + let hdr = resp + .headers() + .get("x-request-id") + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_string()); + + assert_eq!(hdr.as_deref(), Some("test-req-id-123")); + } + + #[actix_rt::test] + async fn generates_uuid_when_header_absent() { + let app = test::init_service( + App::new() + .wrap(RequestIdMiddleware) + .route("/", web::get().to(|| async { HttpResponse::Ok().finish() })), + ) + .await; + + let req = test::TestRequest::get().uri("/").to_request(); + let resp = test::call_service(&app, req).await; + assert!(resp.status().is_success()); + + let hdr = resp + .headers() + .get("x-request-id") + .expect("x-request-id header should be present") + .to_str() + .expect("header should be valid ascii"); + + // Ensure it's a valid UUID (version is not strictly asserted here) + let parsed = Uuid::try_parse(hdr).expect("x-request-id should be a UUID"); + // Sanity: UUID should not be nil + assert_ne!(parsed, Uuid::nil()); + } + + #[actix_rt::test] + async fn preserves_header_on_internal_error() { + let app = test::init_service(App::new().wrap(RequestIdMiddleware).route( + "/", + web::get().to(|| async { + // Simulate 500 error + HttpResponse::InternalServerError().finish() + }), + )) + .await; + + let req = test::TestRequest::get() + .uri("/") + .insert_header(("x-request-id", "err-req-id-999")) + .to_request(); + + let resp = test::call_service(&app, req).await; + assert_eq!( + resp.status(), + actix_web::http::StatusCode::INTERNAL_SERVER_ERROR + ); + + let hdr = resp + .headers() + .get("x-request-id") + .and_then(|v| v.to_str().ok()); + assert_eq!(hdr, Some("err-req-id-999")); + } +} diff --git a/src/observability/mod.rs b/src/observability/mod.rs new file mode 100644 index 000000000..ec7b6097b --- /dev/null +++ b/src/observability/mod.rs @@ -0,0 +1,5 @@ +pub mod job_tracing; +pub mod middleware; +pub mod request_id; + +pub use middleware::RequestIdMiddleware; diff --git a/src/observability/request_id.rs b/src/observability/request_id.rs new file mode 100644 index 000000000..37630d93a --- /dev/null +++ b/src/observability/request_id.rs @@ -0,0 +1,68 @@ +use tracing::Span; +use tracing_subscriber::{registry::LookupSpan, Registry}; + +#[derive(Clone, Debug)] +pub struct RequestId(pub String); + +pub fn set_request_id(id: impl Into) { + let id = id.into(); + Span::current().with_subscriber(|(span_id, subscriber)| { + if let Some(reg) = subscriber.downcast_ref::() { + if let Some(span_ref) = reg.span(span_id) { + span_ref.extensions_mut().replace(RequestId(id)); + } + } + }); +} + +pub fn get_request_id() -> Option { + let mut out = None; + Span::current().with_subscriber(|(span_id, subscriber)| { + if let Some(reg) = subscriber.downcast_ref::() { + if let Some(span_ref) = reg.span(span_id) { + if let Some(r) = span_ref.extensions().get::() { + out = Some(r.0.clone()); + } + } + } + }); + out +} + +#[cfg(test)] +mod tests { + use super::*; + use tracing::info_span; + use tracing_subscriber::{fmt, prelude::*}; + + #[test] + fn set_and_get_request_id_within_span() { + tracing::subscriber::with_default( + tracing_subscriber::registry().with(fmt::layer()), + || { + let span = info_span!("test_span"); + let _guard = span.enter(); + + set_request_id("abc-123"); + assert_eq!(get_request_id().as_deref(), Some("abc-123")); + }, + ); + } + + #[test] + fn overwrite_request_id_replaces_value() { + tracing::subscriber::with_default( + tracing_subscriber::registry().with(fmt::layer()), + || { + let span = info_span!("test_span"); + let _guard = span.enter(); + + set_request_id("first"); + assert_eq!(get_request_id().as_deref(), Some("first")); + + set_request_id("second"); + assert_eq!(get_request_id().as_deref(), Some("second")); + }, + ); + } +}