diff --git a/.gitignore b/.gitignore index e63e218a3bf..dbf24932ea3 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,6 @@ genesis.ssz # VSCode /.vscode + +# Environment variables +.env \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index af5d63e97f4..0d494ae2ec6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -118,6 +118,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", + "getrandom 0.2.15", "once_cell", "version_check", "zerocopy 0.7.35", @@ -659,9 +660,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.87" +version = "0.1.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d556ec1359574147ec0c4fc5eb525f3f23263a592b1a9c07e0a75b427de55c97" +checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5" dependencies = [ "proc-macro2", "quote", @@ -692,6 +693,15 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "atoi" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" +dependencies = [ + "num-traits", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -1905,9 +1915,9 @@ dependencies = [ [[package]] name = "crc" -version = "3.2.1" +version = "3.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69e6e4d7b33a94f0991c26729976b10ebde1d34c3ee82408fb536164fa10d636" +checksum = "9710d3b3739c2e349eb44fe848ad0b7c8cb1e42bd87ee49371df2f7acaf3e675" dependencies = [ "crc-catalog", ] @@ -1991,6 +2001,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -2518,6 +2537,12 @@ dependencies = [ "validator_store", ] +[[package]] +name = "dotenvy" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" + [[package]] name = "downcast" version = "0.11.0" @@ -2632,6 +2657,9 @@ name = "either" version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +dependencies = [ + "serde", +] [[package]] name = "elliptic-curve" @@ -2794,6 +2822,17 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "etcetera" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "136d1b5283a1ab77bd9257427ffd09d8667ced0570b6f938942bc7568ed5b943" +dependencies = [ + "cfg-if", + "home", + "windows-sys 0.48.0", +] + [[package]] name = "eth-keystore" version = "0.5.0" @@ -3633,6 +3672,17 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "futures-intrusive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d930c203dd0b6ff06e0201a4a2fe9149b43c684fd4420555b26d21b1a02956f" +dependencies = [ + "futures-core", + "lock_api", + "parking_lot 0.12.3", +] + [[package]] name = "futures-io" version = "0.3.31" @@ -4047,6 +4097,9 @@ name = "heck" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" +dependencies = [ + "unicode-segmentation", +] [[package]] name = "heck" @@ -5940,6 +5993,16 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest 0.10.7", +] + [[package]] name = "mdbx-sys" version = "0.11.6-4" @@ -8805,6 +8868,141 @@ dependencies = [ "der 0.7.9", ] +[[package]] +name = "sqlformat" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bba3a93db0cc4f7bdece8bb09e77e2e785c20bfebf79eb8340ed80708048790" +dependencies = [ + "nom", + "unicode_categories", +] + +[[package]] +name = "sqlx" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9a2ccff1a000a5a59cd33da541d9f2fdcd9e6e8229cc200565942bff36d0aaa" +dependencies = [ + "sqlx-core", + "sqlx-macros", + "sqlx-postgres", +] + +[[package]] +name = "sqlx-core" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24ba59a9342a3d9bab6c56c118be528b27c9b60e490080e9711a04dccac83ef6" +dependencies = [ + "ahash", + "atoi", + "byteorder", + "bytes", + "crc", + "crossbeam-queue", + "either", + "event-listener 2.5.3", + "futures-channel", + "futures-core", + "futures-intrusive", + "futures-io", + "futures-util", + "hashlink 0.8.4", + "hex", + "indexmap 2.8.0", + "log", + "memchr", + "once_cell", + "paste", + "percent-encoding", + "serde", + "serde_json", + "sha2 0.10.8", + "smallvec", + "sqlformat", + "thiserror 1.0.69", + "tokio", + "tokio-stream", + "tracing", + "url", +] + +[[package]] +name = "sqlx-macros" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ea40e2345eb2faa9e1e5e326db8c34711317d2b5e08d0d5741619048a803127" +dependencies = [ + "proc-macro2", + "quote", + "sqlx-core", + "sqlx-macros-core", + "syn 1.0.109", +] + +[[package]] +name = "sqlx-macros-core" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5833ef53aaa16d860e92123292f1f6a3d53c34ba8b1969f152ef1a7bb803f3c8" +dependencies = [ + "dotenvy", + "either", + "heck 0.4.1", + "hex", + "once_cell", + "proc-macro2", + "quote", + "serde", + "serde_json", + "sha2 0.10.8", + "sqlx-core", + "sqlx-postgres", + "syn 1.0.109", + "tempfile", + "tokio", + "url", +] + +[[package]] +name = "sqlx-postgres" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c824eb80b894f926f89a0b9da0c7f435d27cdd35b8c655b114e58223918577e" +dependencies = [ + "atoi", + "base64 0.21.7", + "bitflags 2.9.0", + "byteorder", + "crc", + "dotenvy", + "etcetera", + "futures-channel", + "futures-core", + "futures-io", + "futures-util", + "hex", + "hkdf", + "hmac 0.12.1", + "home", + "itoa", + "log", + "md-5", + "memchr", + "once_cell", + "rand 0.8.5", + "serde", + "serde_json", + "sha2 0.10.8", + "smallvec", + "sqlx-core", + "stringprep", + "thiserror 1.0.69", + "tracing", + "whoami", +] + [[package]] name = "ssz_types" version = "0.11.0" @@ -8877,6 +9075,7 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" name = "store" version = "0.2.0" dependencies = [ + "async-trait", "beacon_chain", "bls", "criterion", @@ -8895,10 +9094,12 @@ dependencies = [ "safe_arith", "serde", "smallvec", + "sqlx", "state_processing", "strum", "superstruct", "tempfile", + "tokio", "tracing", "tracing-subscriber", "types", @@ -8906,6 +9107,17 @@ dependencies = [ "zstd 0.13.3", ] +[[package]] +name = "stringprep" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b4df3d392d81bd458a8a621b8bffbd2302a12ffe288a9d931670948749463b1" +dependencies = [ + "unicode-bidi", + "unicode-normalization", + "unicode-properties", +] + [[package]] name = "strsim" version = "0.10.0" @@ -9819,6 +10031,12 @@ version = "2.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539" +[[package]] +name = "unicode-bidi" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c1cb5db39152898a79168971543b1cb5020dff7fe43c8dc468b0885f5e29df5" + [[package]] name = "unicode-ident" version = "1.0.18" @@ -9834,6 +10052,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-properties" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0" + [[package]] name = "unicode-segmentation" version = "1.12.0" @@ -9846,6 +10070,12 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" +[[package]] +name = "unicode_categories" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" + [[package]] name = "universal-hash" version = "0.5.1" @@ -10255,6 +10485,12 @@ dependencies = [ "wit-bindgen-rt", ] +[[package]] +name = "wasite" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" + [[package]] name = "wasm-bindgen" version = "0.2.100" @@ -10422,6 +10658,16 @@ dependencies = [ "rustix 0.38.44", ] +[[package]] +name = "whoami" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6994d13118ab492c3c80c1f81928718159254c53c472bf9ce36f8dae4add02a7" +dependencies = [ + "redox_syscall 0.5.10", + "wasite", +] + [[package]] name = "widestring" version = "0.4.3" diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index 456376e79bd..30d9e13e2b2 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -15,6 +15,7 @@ path = "src/lib.rs" write_ssz_files = [ "beacon_chain/write_ssz_files", ] # Writes debugging .ssz files to /tmp during block processing. +postgres = ["store/postgres"] [dependencies] account_utils = { workspace = true } diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index eb27a03552b..ef2286c73e3 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -1630,6 +1630,14 @@ pub fn cli_app() -> Command { .action(ArgAction::Set) .display_order(0) ) + .arg( + Arg::new("postgres-url") + .long("postgres-url") + .value_name("URL") + .help("The Postgres connection URL used when `--beacon-node-backend=postgres`.") + .action(ArgAction::Set) + .display_order(0) + ) .arg( Arg::new("delay-block-publishing") .long("delay-block-publishing") diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml index 13df83efabb..6a8ed083d3b 100644 --- a/beacon_node/store/Cargo.toml +++ b/beacon_node/store/Cargo.toml @@ -5,11 +5,13 @@ authors = ["Paul Hauner "] edition = { workspace = true } [features] -default = ["leveldb"] +default = ["postgres"] leveldb = ["dep:leveldb"] redb = ["dep:redb"] +postgres = ["dep:sqlx"] [dependencies] +async-trait = "0.1.88" bls = { workspace = true } db-key = "0.0.5" directory = { workspace = true } @@ -25,9 +27,11 @@ redb = { version = "2.1.3", optional = true } safe_arith = { workspace = true } serde = { workspace = true } smallvec = { workspace = true } +sqlx = { version = "0.7.3", optional = true, default-features = false, features = ["postgres", "runtime-tokio", "macros"]} state_processing = { workspace = true } strum = { workspace = true } superstruct = { workspace = true } +tokio = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } types = { workspace = true } diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs index c16573df5e4..3952cc53fef 100644 --- a/beacon_node/store/src/config.rs +++ b/beacon_node/store/src/config.rs @@ -11,6 +11,8 @@ use types::non_zero_usize::new_non_zero_usize; use types::EthSpec; use zstd::Encoder; +#[cfg(all(feature = "postgres", not(feature = "redb"), not(feature = "leveldb")))] +pub const DEFAULT_BACKEND: DatabaseBackend = DatabaseBackend::PostgresDB; #[cfg(all(feature = "redb", not(feature = "leveldb")))] pub const DEFAULT_BACKEND: DatabaseBackend = DatabaseBackend::Redb; #[cfg(feature = "leveldb")] @@ -267,4 +269,6 @@ pub enum DatabaseBackend { LevelDb, #[cfg(feature = "redb")] Redb, + #[cfg(feature = "postgres")] + PostgresDB } diff --git a/beacon_node/store/src/database.rs b/beacon_node/store/src/database.rs index 2232f73c5cc..2ac5df214f0 100644 --- a/beacon_node/store/src/database.rs +++ b/beacon_node/store/src/database.rs @@ -3,3 +3,7 @@ pub mod interface; pub mod leveldb_impl; #[cfg(feature = "redb")] pub mod redb_impl; +#[cfg(feature = "postgres")] +pub mod postgres_impl; + +pub mod async_interface; \ No newline at end of file diff --git a/beacon_node/store/src/database/async_interface.rs b/beacon_node/store/src/database/async_interface.rs new file mode 100644 index 00000000000..bcadbdb389d --- /dev/null +++ b/beacon_node/store/src/database/async_interface.rs @@ -0,0 +1,14 @@ +// use crate::{DBColumn, Error}; +// use async_trait::async_trait; +// use types::EthSpec; + +// #[async_trait] +// pub trait AsyncKeyValueStore: Send + Sync { +// async fn get_bytes(&self, column: DBColumn, key: &[u8]) -> Result>, Error>; +// async fn put_bytes(&self, column: DBColumn, key: &[u8], value: &[u8]) -> Result<(), Error>; +// // async fn put_bytes_sync(&self, column: DBColumn, key: &[u8], value: &[u8]) -> Result<(), Error>; +// // async fn sync(&self) -> Result<(), Error>; +// async fn key_exists(&self, column: DBColumn, key: &[u8]) -> Result; +// // async fn key_delete(&self, column: DBColumn, key: &[u8]) -> Result<(), Error>; +// // async fn do_atomically(&self, batch: Vec) -> Result<(), Error>; +// } \ No newline at end of file diff --git a/beacon_node/store/src/database/interface.rs b/beacon_node/store/src/database/interface.rs index e405c6227d3..12a14b0f28c 100644 --- a/beacon_node/store/src/database/interface.rs +++ b/beacon_node/store/src/database/interface.rs @@ -2,17 +2,22 @@ use crate::database::leveldb_impl; #[cfg(feature = "redb")] use crate::database::redb_impl; +#[cfg(feature = "postgres")] +use crate::database::postgres_impl; use crate::{config::DatabaseBackend, KeyValueStoreOp, StoreConfig}; use crate::{metrics, ColumnIter, ColumnKeyIter, DBColumn, Error, ItemStore, Key, KeyValueStore}; use std::collections::HashSet; use std::path::Path; use types::EthSpec; +use tokio::runtime::Handle; pub enum BeaconNodeBackend { #[cfg(feature = "leveldb")] LevelDb(leveldb_impl::LevelDB), #[cfg(feature = "redb")] Redb(redb_impl::Redb), + #[cfg(feature = "postgres")] + PostgresDB(postgres_impl::PostgresDB) } impl ItemStore for BeaconNodeBackend {} @@ -24,6 +29,19 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::get_bytes(txn, column, key), #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::get_bytes(txn, column, key), + #[cfg(feature = "postgres")] + BeaconNodeBackend::PostgresDB(txn) => { + // Try to use the current Tokio runtime if available + match Handle::try_current() { + Ok(handle) => handle.block_on(txn.get_bytes(column, key)), + Err(_) => { + // Fallback to creating a new runtime if not in async context + let rt = tokio::runtime::Runtime::new() + .map_err(|e| Error::DBError { message: e.to_string() })?; + rt.block_on(txn.get_bytes(column, key)) + } + } + } } } @@ -45,6 +63,16 @@ impl KeyValueStore for BeaconNodeBackend { value, txn.write_options(), ), + #[cfg(feature = "postgres")] + BeaconNodeBackend::PostgresDB(ref txn) => { + match tokio::runtime::Handle::try_current() { + Ok(handle) => handle.block_on(txn.put_bytes(column, key, value)), + Err(_) => { + let rt = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime"); + rt.block_on(txn.put_bytes(column, key, value)) + } + } + } } } @@ -66,6 +94,10 @@ impl KeyValueStore for BeaconNodeBackend { value, txn.write_options_sync(), ), + #[cfg(feature = "postgres")] + BeaconNodeBackend::PostgresDB(_) => { + todo!("postgres put_bytes_sync called, which should not happen!"); + } } } @@ -75,6 +107,10 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::sync(txn), #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::sync(txn), + #[cfg(feature = "postgres")] + BeaconNodeBackend::PostgresDB(_) => { + todo!("Implement PostgresDB logic"); + } } } @@ -84,6 +120,17 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::key_exists(txn, column, key), #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::key_exists(txn, column, key), + #[cfg(feature = "postgres")] + BeaconNodeBackend::PostgresDB(txn) => { + match tokio::runtime::Handle::try_current() { + Ok(handle) => handle.block_on(txn.key_exists(column, key)), + Err(_) => { + let rt = tokio::runtime::Runtime::new() + .map_err(|e| Error::DBError { message: e.to_string() })?; + rt.block_on(txn.key_exists(column, key)) + } + } + } } } @@ -93,6 +140,17 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::key_delete(txn, column, key), #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::key_delete(txn, column, key), + #[cfg(feature = "postgres")] + BeaconNodeBackend::PostgresDB(txn) => { + match tokio::runtime::Handle::try_current() { + Ok(handle) => handle.block_on(txn.key_delete(column, key)), + Err(_) => { + let rt = tokio::runtime::Runtime::new() + .map_err(|e| Error::DBError { message: e.to_string() })?; + rt.block_on(txn.key_delete(column, key)) + } + } + } } } @@ -102,6 +160,10 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::do_atomically(txn, batch), #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::do_atomically(txn, batch), + #[cfg(feature = "postgres")] + BeaconNodeBackend::PostgresDB(_) => { + todo!("Implement PostgresDB logic"); + } } } @@ -111,6 +173,15 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::compact(txn), #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::compact(txn), + #[cfg(feature = "postgres")] + BeaconNodeBackend::PostgresDB(txn) => match tokio::runtime::Handle::try_current() { + Ok(handle) => handle.block_on(txn.compact()), + Err(_) => { + let rt = tokio::runtime::Runtime::new() + .map_err(|e| Error::DBError { message: e.to_string() })?; + rt.block_on(txn.compact()) + } + }, } } @@ -128,6 +199,15 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::Redb(txn) => { redb_impl::Redb::iter_column_keys_from(txn, _column, from) } + #[cfg(feature = "postgres")] + BeaconNodeBackend::PostgresDB(txn) => match tokio::runtime::Handle::try_current() { + Ok(handle) => handle.block_on(txn.iter_column_keys_from(column, start)), + Err(_) => { + let rt = tokio::runtime::Runtime::new() + .map_err(|e| Error::DBError { message: e.to_string() })?; + rt.block_on(txn.iter_column_keys_from(column, start)) + } + }, } } @@ -137,6 +217,15 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::iter_column_keys(txn, column), #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::iter_column_keys(txn, column), + #[cfg(feature = "postgres")] + BeaconNodeBackend::PostgresDB(txn) => match tokio::runtime::Handle::try_current() { + Ok(handle) => handle.block_on(txn.iter_column_keys(column)), + Err(_) => { + let rt = tokio::runtime::Runtime::new() + .map_err(|e| Error::DBError { message: e.to_string() })?; + rt.block_on(txn.iter_column_keys(column)) + } + }, } } @@ -148,6 +237,15 @@ impl KeyValueStore for BeaconNodeBackend { } #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::iter_column_from(txn, column, from), + #[cfg(feature = "postgres")] + BeaconNodeBackend::PostgresDB(txn) => match tokio::runtime::Handle::try_current() { + Ok(handle) => handle.block_on(txn.iter_column_from(column, start)), + Err(_) => { + let rt = tokio::runtime::Runtime::new() + .map_err(|e| Error::DBError { message: e.to_string() })?; + rt.block_on(txn.iter_column_from(column, start)) + } + }, } } @@ -157,6 +255,15 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::compact_column(txn, _column), #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::compact(txn), + #[cfg(feature = "postgres")] + BeaconNodeBackend::PostgresDB(txn) => match tokio::runtime::Handle::try_current() { + Ok(handle) => handle.block_on(txn.compact_column(column)), + Err(_) => { + let rt = tokio::runtime::Runtime::new() + .map_err(|e| Error::DBError { message: e.to_string() })?; + rt.block_on(txn.compact_column(column)) + } + }, } } @@ -166,6 +273,15 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::delete_batch(txn, col, ops), #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::delete_batch(txn, col, ops), + #[cfg(feature = "postgres")] + BeaconNodeBackend::PostgresDB(txn) => match tokio::runtime::Handle::try_current() { + Ok(handle) => handle.block_on(txn.delete_batch(column, keys)), + Err(_) => { + let rt = tokio::runtime::Runtime::new() + .map_err(|e| Error::DBError { message: e.to_string() })?; + rt.block_on(txn.delete_batch(column, keys)) + } + }, } } @@ -179,6 +295,15 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::delete_if(txn, column, f), #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::delete_if(txn, column, f), + #[cfg(feature = "postgres")] + BeaconNodeBackend::PostgresDB(txn) => match tokio::runtime::Handle::try_current() { + Ok(handle) => handle.block_on(txn.delete_if(column, predicate)), + Err(_) => { + let rt = tokio::runtime::Runtime::new() + .map_err(|e| Error::DBError { message: e.to_string() })?; + rt.block_on(txn.delete_if(column, predicate)) + } + }, } } } @@ -193,6 +318,43 @@ impl BeaconNodeBackend { } #[cfg(feature = "redb")] DatabaseBackend::Redb => redb_impl::Redb::open(path).map(BeaconNodeBackend::Redb), + #[cfg(feature = "postgres")] + DatabaseBackend::PostgresDB => { + let db_url = "postgres://postgres:admin@localhost:5432/postgres"; + let db_result = match tokio::runtime::Handle::try_current() { + Ok(handle) => handle.block_on(postgres_impl::PostgresDB::open(db_url)), + Err(_) => { + let rt = tokio::runtime::Runtime::new() + .map_err(|e| Error::DBError { message: e.to_string() })?; + rt.block_on(postgres_impl::PostgresDB::open(db_url)) + } + }; + + let db = db_result.map_err(|e| Error::DBError { + message: format!("Failed to init PostgresDB: {:?}", e) + })?; + Ok(BeaconNodeBackend::PostgresDB(db)) + } + } + } + + #[cfg(feature = "postgres")] + pub async fn open_async(config: &StoreConfig, path: &Path) -> Result { + metrics::inc_counter_vec(&metrics::DISK_DB_TYPE, &[&config.backend.to_string()]); + match config.backend { + DatabaseBackend::PostgresDB => { + let db_url = "postgres://postgres:admin@localhost:5432/postgres"; + let db = postgres_impl::PostgresDB::open(db_url) + .await + .map_err(|e| Error::DBError { + message: format!("Failed to init PostgresDB: {:?}", e) + })?; + Ok(BeaconNodeBackend::PostgresDB(db)) + } + _ => { + // fallback to the sync version for other backends + Self::open(config, path) + } } } } diff --git a/beacon_node/store/src/database/postgres_impl.rs b/beacon_node/store/src/database/postgres_impl.rs new file mode 100644 index 00000000000..58b224180b0 --- /dev/null +++ b/beacon_node/store/src/database/postgres_impl.rs @@ -0,0 +1,223 @@ +use crate::{DBColumn, Error}; +use sqlx::{PgPool, postgres::PgPoolOptions, Row}; +use types::EthSpec; +use std::{marker::PhantomData, time::Duration}; + +#[derive(Clone)] +pub struct PostgresDB { + db: PgPool, + _phantom: PhantomData +} + +impl PostgresDB { + pub async fn open(database_url: &str) -> Result { + let db = PgPoolOptions::new() + .max_connections(100) + .acquire_timeout(Duration::from_secs(10)) + .connect(database_url) + .await + .map_err(|e| Error::DBError { message: e.to_string() })?; + + println!("Using Postgres backend🎯!!!"); + + Ok(Self { + db, + _phantom: PhantomData + }) + } + + pub async fn get_bytes(&self, column: DBColumn, key: &[u8]) -> Result>, Error> { + let table = get_table_name(column); + let key = key.to_vec(); + + let query = format!("SELECT value FROM {} WHERE key = $1", table); + let row = sqlx::query(&query) + .bind(key) + .fetch_optional(&self.db) + .await + .map_err(|e| Error::DBError { message: e.to_string() })?; + + Ok(row.map(|r| r.get::, _>("value"))) + } + + pub async fn put_bytes(&self, column: DBColumn, key: &[u8], value: &[u8]) -> Result<(), Error> { + let table = get_table_name(column); + let key = key.to_vec(); + let value = value.to_vec(); + + let query = format!( + "INSERT INTO {} (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value", + table + ); + + sqlx::query(&query) + .bind(key) + .bind(value) + .execute(&self.db) + .await + .map(|_| ()) + .map_err(|e| Error::DBError { message: e.to_string() }) + } + + pub async fn key_exists(&self, column: DBColumn, key: &[u8]) -> Result { + let table = get_table_name(column); + let key = key.to_vec(); + + let query = format!("SELECT EXISTS (SELECT 1 FROM {} WHERE key = $1)", table); + let exists: (bool, ) = sqlx::query_as(&query) + .bind(key) + .fetch_one(&self.db) + .await + .map_err(|e| Error::DBError { message: e.to_string() })?; + Ok(exists.0) + } + + pub async fn key_delete(&self, column: DBColumn, key: &[u8]) -> Result<(), Error> { + let table = get_table_name(column); + let key = key.to_vec(); + let query = format!("DELETE FROM {} WHERE key = $1", table); + + sqlx::query(&query) + .bind(key) + .execute(&self.db) + .await + .map(|_| ()) + .map_err(|e| Error::DBError { message: e.to_string() }) + } + + // pub async fn do_atomically(){} + + pub async fn compact(&self) -> Result<(), Error> { + // No-op for Postgres, but we can run VACUUM FULL + sqlx::query("VACUUM FULL") + .execute(&self.db) + .await + .map(|_| ()) + .map_err(|e| Error::DBError { message: e.to_string() }) + } + + pub async fn compact_column(&self, column: DBColumn) -> Result<(), Error> { + let table = get_table_name(column); + let query = format!("VACUUM FULL {}", table); + sqlx::query(&query) + .execute(&self.db) + .await + .map(|_| ()) + .map_err(|e| Error::DBError { message: e.to_string() }) + } + + pub async fn iter_column_keys_from(&self, column: DBColumn, start: &[u8]) -> Result>, Error> { + let table = get_table_name(column); + let query = format!("SELECT key FROM {} WHERE key >= $1 ORDER BY key ASC", table); + let rows = sqlx::query(&query) + .bind(start.to_vec()) + .fetch_all(&self.db) + .await + .map_err(|e| Error::DBError { message: e.to_string() })?; + Ok(rows.into_iter().map(|r| r.get::, _>("key")).collect()) + } + + pub async fn iter_column_keys(&self, column: DBColumn) -> Result>, Error> { + let table = get_table_name(column); + let query = format!("SELECT key FROM {} ORDER BY key ASC", table); + let rows = sqlx::query(&query) + .fetch_all(&self.db) + .await + .map_err(|e| Error::DBError { message: e.to_string() })?; + Ok(rows.into_iter().map(|r| r.get::, _>("key")).collect()) + } + + pub async fn iter_column_from(&self, column: DBColumn, start: &[u8]) -> Result, Vec)>, Error> { + let table = get_table_name(column); + let query = format!("SELECT key, value FROM {} WHERE key >= $1 ORDER BY key ASC", table); + let rows = sqlx::query(&query) + .bind(start.to_vec()) + .fetch_all(&self.db) + .await + .map_err(|e| Error::DBError { message: e.to_string() })?; + Ok(rows.into_iter().map(|r| (r.get("key"), r.get("value"))).collect()) + } + + pub async fn delete_batch(&self, column: DBColumn, keys: &[Vec]) -> Result<(), Error> { + let table = get_table_name(column); + let mut tx = self.db.begin().await + .map_err(|e| Error::DBError { message: e.to_string() })?; + let query = format!("DELETE FROM {} WHERE key = $1", table); + for key in keys { + sqlx::query(&query) + .bind(key.clone()) + .execute(&mut *tx) + .await + .map_err(|e| Error::DBError { message: e.to_string() })?; + } + tx.commit().await + .map_err(|e| Error::DBError { message: e.to_string() }) + } + + pub async fn delete_if(&self, column: DBColumn, predicate: F) -> Result<(), Error> + where + F: Fn(&[u8], &[u8]) -> bool + { + let table = get_table_name(column); + let rows = sqlx::query(&format!("SELECT key, value FROM {}", table)) + .fetch_all(&self.db) + .await + .map_err(|e| Error::DBError { message: e.to_string() })?; + let mut tx = self.db.begin().await + .map_err(|e| Error::DBError { message: e.to_string() })?; + for row in rows { + let key: Vec = row.get("key"); + let value: Vec = row.get("value"); + if predicate(&key, &value) { + sqlx::query(&format!("DELETE FROM {} WHERE key = $1", table)) + .bind(key) + .execute(&mut *tx) + .await + .map_err(|e| Error::DBError { message: e.to_string() })?; + } + } + tx.commit().await + .map_err(|e| Error::DBError { message: e.to_string() }) + } +} + + +pub fn get_table_name(column: DBColumn) -> &'static str { + match column { + DBColumn::BeaconMeta => "beacon_meta", + DBColumn::BeaconBlock => "beacon_block", + DBColumn::BeaconBlob => "beacon_blob", + DBColumn::BeaconDataColumn => "beacon_data_column", + DBColumn::BeaconState => "beacon_state", + DBColumn::BeaconStateHotDiff => "beacon_state_hot_diff", + DBColumn::BeaconStateHotSnapshot => "beacon_state_hot_snapshot", + DBColumn::BeaconStateSnapshot => "beacon_state_snapshot", + DBColumn::BeaconStateDiff => "beacon_state_diff", + DBColumn::BeaconStateSummary => "beacon_state_summary", + DBColumn::BeaconStateHotSummary => "beacon_state_hot_summary", + DBColumn::BeaconColdStateSummary => "beacon_cold_state_summary", + DBColumn::BeaconStateTemporary => "beacon_state_temporary", + DBColumn::ExecPayload => "executive_payload", + DBColumn::BeaconChain => "beacon_chain", + DBColumn::OpPool => "op_pool", + DBColumn::Eth1Cache => "eth1_cache", + DBColumn::ForkChoice => "fork_choice", + DBColumn::PubkeyCache => "pubkey_cache", + DBColumn::BeaconRestorePoint => "beacon_restore_point", + DBColumn::BeaconStateRoots => "beacon_state_roots", + DBColumn::BeaconStateRootsChunked => "beacon_state_roots_chunked", + DBColumn::BeaconBlockRoots => "beacon_block_roots", + DBColumn::BeaconBlockRootsChunked => "beacon_block_roots_chunked", + DBColumn::BeaconHistoricalRoots => "beacon_historical_roots", + DBColumn::BeaconRandaoMixes => "beacon_randao_mixes", + DBColumn::DhtEnrs => "dht_enrs", + DBColumn::CustodyContext => "custody_context", + DBColumn::OptimisticTransitionBlock => "optimistic_transition_block", + DBColumn::BeaconHistoricalSummaries => "beacon_historical_summaries", + DBColumn::OverflowLRUCache => "overflow_lru_cache", + DBColumn::LightClientUpdate => "light_client_update", + DBColumn::SyncCommitteeBranch => "sync_committe_branch", + DBColumn::SyncCommittee => "sync_committe", + DBColumn::Dummy => "dummy", + } +} \ No newline at end of file diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index e996b47b723..0bdd8248e67 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -492,7 +492,7 @@ mod tests { } } - fn test_impl(store: impl ItemStore) { + async fn test_impl(store: impl ItemStore) { let key = Hash256::random(); let item = StorableThing { a: 1, b: 42 }; @@ -512,11 +512,11 @@ mod tests { assert_eq!(store.get::(&key).unwrap(), None); } - #[test] - fn simplediskdb() { + #[tokio::test] + async fn simplediskdb() { let dir = tempdir().unwrap(); let path = dir.path(); - let store = BeaconNodeBackend::open(&StoreConfig::default(), path).unwrap(); + let store = BeaconNodeBackend::open_async(&StoreConfig::default(), path).await.unwrap(); test_impl(store); } diff --git a/beacon_node/store/src/migrations/20250626130431_init.sql b/beacon_node/store/src/migrations/20250626130431_init.sql new file mode 100644 index 00000000000..b32c801458f --- /dev/null +++ b/beacon_node/store/src/migrations/20250626130431_init.sql @@ -0,0 +1,181 @@ +-- create table if not exists store ( +-- col text not null, +-- key bytea not null, +-- value bytea not null, +-- primary key (col, key) +-- ); + +CREATE TABLE beacon_meta ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_block ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_blob ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_data_column ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_state ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_state_hot_diff ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_state_hot_snapshot ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_state_snapshot ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_state_diff ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_state_summary ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_state_hot_summary ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_cold_state_summary ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_state_temporary ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE executive_payload ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_chain ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE op_pool ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE eth1_cache ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE fork_choice ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE pubkey_cache ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_restore_point ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_state_roots ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_state_roots_chunked ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_block_roots ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_block_roots_chunked ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_historical_roots ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_randao_mixes ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE dht_enrs ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE custody_context ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE optimistic_transition_block ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_historical_summaries ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE overflow_lru_cache ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE light_client_update ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE sync_committe_branch ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE sync_committe ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE dummy ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); \ No newline at end of file diff --git a/beacon_node/tests/test.rs b/beacon_node/tests/test.rs deleted file mode 100644 index ab78b65ae93..00000000000 --- a/beacon_node/tests/test.rs +++ /dev/null @@ -1,60 +0,0 @@ -#![cfg(test)] - -use beacon_chain::StateSkipConfig; -use node_test_rig::{ - environment::{Environment, EnvironmentBuilder}, - eth2::types::StateId, - testing_client_config, LocalBeaconNode, -}; -use types::{EthSpec, MinimalEthSpec, Slot}; - -fn env_builder() -> EnvironmentBuilder { - EnvironmentBuilder::minimal() -} - -fn build_node(env: &mut Environment) -> LocalBeaconNode { - let context = env.core_context(); - env.runtime() - .block_on(LocalBeaconNode::production( - context, - testing_client_config(), - )) - .expect("should block until node created") -} - -#[test] -fn http_server_genesis_state() { - let mut env = env_builder() - .multi_threaded_tokio_runtime() - .expect("should start tokio runtime") - .build() - .expect("environment should build"); - - // build a runtime guard - - let node = build_node(&mut env); - - let remote_node = node.remote_node().expect("should produce remote node"); - - let api_state = env - .runtime() - .block_on(remote_node.get_debug_beacon_states(StateId::Slot(Slot::new(0)))) - .expect("should fetch state from http api") - .unwrap() - .into_data(); - - let mut db_state = node - .client - .beacon_chain() - .expect("client should have beacon chain") - .state_at_slot(Slot::new(0), StateSkipConfig::WithStateRoots) - .expect("should find state"); - db_state.drop_all_caches().unwrap(); - - assert_eq!( - api_state, db_state, - "genesis state from api should match that from the DB" - ); - - env.fire_signal(); -} diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 884e5eddeba..c53bc0238a9 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -2762,6 +2762,21 @@ fn beacon_node_backend_override() { }); } +#[test] +#[cfg(feature = "postgres")] +fn beacon_node_backend_postgres() { + CommandLineTest::new() + .flag("beacon-node-backend", Some("postgres")) + .flag("postgres-url", Some("postgres://user:pass@localhost/db")) + .run_with_zero_port() + .with_config(|config| { + assert_eq!(config.store.backend, BeaconNodeBackend::Postgres); + assert_eq!(config.store.postgres_config.as_ref().unwrap().url, + "postgres://user:pass@localhost/db" + ); + }); +} + #[test] fn block_publishing_delay_for_testing() { CommandLineTest::new()