From 3d07691a3b3d1b7dbbd38012e8f424b9a5accd32 Mon Sep 17 00:00:00 2001 From: Owanikin Date: Tue, 1 Jul 2025 14:10:25 +0100 Subject: [PATCH 01/13] feat(store): Add PostgresDB backend with basic get/put and tests --- .gitignore | 3 + Cargo.lock | 262 ++++++++++++++++++ beacon_node/store/Cargo.toml | 5 + beacon_node/store/src/database.rs | 4 + .../store/src/database/async_interface.rs | 14 + .../store/src/database/postgres_impl.rs | 138 +++++++++ beacon_node/store/src/lib.rs | 1 + .../src/migrations/20250626130431_init.sql | 9 + beacon_node/store/tests/postgres_store.rs | 27 ++ 9 files changed, 463 insertions(+) create mode 100644 beacon_node/store/src/database/async_interface.rs create mode 100644 beacon_node/store/src/database/postgres_impl.rs create mode 100644 beacon_node/store/src/migrations/20250626130431_init.sql create mode 100644 beacon_node/store/tests/postgres_store.rs diff --git a/.gitignore b/.gitignore index e63e218a3bf..dbf24932ea3 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,6 @@ genesis.ssz # VSCode /.vscode + +# Environment variables +.env \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 5e22c9742ad..ca515949aec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -118,6 +118,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" dependencies = [ "cfg-if", + "getrandom 0.3.2", "once_cell", "version_check", "zerocopy", @@ -659,6 +660,15 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "atoi" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" +dependencies = [ + "num-traits", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -1871,6 +1881,21 @@ dependencies = [ "sha2 0.10.9", ] +[[package]] +name = "crc" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9710d3b3739c2e349eb44fe848ad0b7c8cb1e42bd87ee49371df2f7acaf3e675" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" + [[package]] name = "crc32fast" version = "1.4.2" @@ -1944,6 +1969,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -2461,6 +2495,12 @@ dependencies = [ "validator_store", ] +[[package]] +name = "dotenvy" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" + [[package]] name = "downcast" version = "0.11.0" @@ -2575,6 +2615,9 @@ name = "either" version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +dependencies = [ + "serde", +] [[package]] name = "elliptic-curve" @@ -2737,6 +2780,17 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "etcetera" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "136d1b5283a1ab77bd9257427ffd09d8667ced0570b6f938942bc7568ed5b943" +dependencies = [ + "cfg-if", + "home", + "windows-sys 0.48.0", +] + [[package]] name = "eth-keystore" version = "0.5.0" @@ -3576,6 +3630,17 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "futures-intrusive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d930c203dd0b6ff06e0201a4a2fe9149b43c684fd4420555b26d21b1a02956f" +dependencies = [ + "futures-core", + "lock_api", + "parking_lot 0.12.3", +] + [[package]] name = "futures-io" version = "0.3.31" @@ -3992,6 +4057,9 @@ name = "heck" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" +dependencies = [ + "unicode-segmentation", +] [[package]] name = "heck" @@ -5802,6 +5870,16 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest 0.10.7", +] + [[package]] name = "mdbx-sys" version = "0.11.6-4" @@ -8671,6 +8749,141 @@ dependencies = [ "der 0.7.10", ] +[[package]] +name = "sqlformat" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bba3a93db0cc4f7bdece8bb09e77e2e785c20bfebf79eb8340ed80708048790" +dependencies = [ + "nom", + "unicode_categories", +] + +[[package]] +name = "sqlx" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9a2ccff1a000a5a59cd33da541d9f2fdcd9e6e8229cc200565942bff36d0aaa" +dependencies = [ + "sqlx-core", + "sqlx-macros", + "sqlx-postgres", +] + +[[package]] +name = "sqlx-core" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24ba59a9342a3d9bab6c56c118be528b27c9b60e490080e9711a04dccac83ef6" +dependencies = [ + "ahash", + "atoi", + "byteorder", + "bytes", + "crc", + "crossbeam-queue", + "either", + "event-listener 2.5.3", + "futures-channel", + "futures-core", + "futures-intrusive", + "futures-io", + "futures-util", + "hashlink 0.8.4", + "hex", + "indexmap 2.9.0", + "log", + "memchr", + "once_cell", + "paste", + "percent-encoding", + "serde", + "serde_json", + "sha2 0.10.9", + "smallvec", + "sqlformat", + "thiserror 1.0.69", + "tokio", + "tokio-stream", + "tracing", + "url", +] + +[[package]] +name = "sqlx-macros" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ea40e2345eb2faa9e1e5e326db8c34711317d2b5e08d0d5741619048a803127" +dependencies = [ + "proc-macro2", + "quote", + "sqlx-core", + "sqlx-macros-core", + "syn 1.0.109", +] + +[[package]] +name = "sqlx-macros-core" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5833ef53aaa16d860e92123292f1f6a3d53c34ba8b1969f152ef1a7bb803f3c8" +dependencies = [ + "dotenvy", + "either", + "heck 0.4.1", + "hex", + "once_cell", + "proc-macro2", + "quote", + "serde", + "serde_json", + "sha2 0.10.9", + "sqlx-core", + "sqlx-postgres", + "syn 1.0.109", + "tempfile", + "tokio", + "url", +] + +[[package]] +name = "sqlx-postgres" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c824eb80b894f926f89a0b9da0c7f435d27cdd35b8c655b114e58223918577e" +dependencies = [ + "atoi", + "base64 0.21.7", + "bitflags 2.9.0", + "byteorder", + "crc", + "dotenvy", + "etcetera", + "futures-channel", + "futures-core", + "futures-io", + "futures-util", + "hex", + "hkdf", + "hmac 0.12.1", + "home", + "itoa", + "log", + "md-5", + "memchr", + "once_cell", + "rand 0.8.5", + "serde", + "serde_json", + "sha2 0.10.9", + "smallvec", + "sqlx-core", + "stringprep", + "thiserror 1.0.69", + "tracing", + "whoami", +] + [[package]] name = "ssz_types" version = "0.10.1" @@ -8743,11 +8956,13 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" name = "store" version = "0.2.0" dependencies = [ + "async-trait", "beacon_chain", "bls", "criterion", "db-key", "directory", + "dotenvy", "ethereum_ssz", "ethereum_ssz_derive", "itertools 0.10.5", @@ -8761,10 +8976,12 @@ dependencies = [ "safe_arith", "serde", "smallvec", + "sqlx", "state_processing", "strum", "superstruct", "tempfile", + "tokio", "tracing", "tracing-subscriber", "types", @@ -8772,6 +8989,17 @@ dependencies = [ "zstd 0.13.3", ] +[[package]] +name = "stringprep" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b4df3d392d81bd458a8a621b8bffbd2302a12ffe288a9d931670948749463b1" +dependencies = [ + "unicode-bidi", + "unicode-normalization", + "unicode-properties", +] + [[package]] name = "strsim" version = "0.10.0" @@ -9684,6 +9912,12 @@ version = "2.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539" +[[package]] +name = "unicode-bidi" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c1cb5db39152898a79168971543b1cb5020dff7fe43c8dc468b0885f5e29df5" + [[package]] name = "unicode-ident" version = "1.0.18" @@ -9699,6 +9933,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-properties" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0" + [[package]] name = "unicode-segmentation" version = "1.12.0" @@ -9711,6 +9951,12 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" +[[package]] +name = "unicode_categories" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" + [[package]] name = "universal-hash" version = "0.5.1" @@ -10120,6 +10366,12 @@ dependencies = [ "wit-bindgen-rt", ] +[[package]] +name = "wasite" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" + [[package]] name = "wasm-bindgen" version = "0.2.100" @@ -10287,6 +10539,16 @@ dependencies = [ "rustix 0.38.44", ] +[[package]] +name = "whoami" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6994d13118ab492c3c80c1f81928718159254c53c472bf9ce36f8dae4add02a7" +dependencies = [ + "redox_syscall 0.5.12", + "wasite", +] + [[package]] name = "widestring" version = "0.4.3" diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml index 13df83efabb..ec735203eda 100644 --- a/beacon_node/store/Cargo.toml +++ b/beacon_node/store/Cargo.toml @@ -8,8 +8,10 @@ edition = { workspace = true } default = ["leveldb"] leveldb = ["dep:leveldb"] redb = ["dep:redb"] +postgres = ["dep:sqlx"] [dependencies] +async-trait = "0.1.88" bls = { workspace = true } db-key = "0.0.5" directory = { workspace = true } @@ -25,14 +27,17 @@ redb = { version = "2.1.3", optional = true } safe_arith = { workspace = true } serde = { workspace = true } smallvec = { workspace = true } +sqlx = { version = "0.7.3", optional = true, default-features = false, features = ["postgres", "runtime-tokio", "macros"]} state_processing = { workspace = true } strum = { workspace = true } superstruct = { workspace = true } +tokio = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } types = { workspace = true } xdelta3 = { workspace = true } zstd = { workspace = true } +dotenvy = "0.15.7" [dev-dependencies] beacon_chain = { workspace = true } diff --git a/beacon_node/store/src/database.rs b/beacon_node/store/src/database.rs index 2232f73c5cc..2ac5df214f0 100644 --- a/beacon_node/store/src/database.rs +++ b/beacon_node/store/src/database.rs @@ -3,3 +3,7 @@ pub mod interface; pub mod leveldb_impl; #[cfg(feature = "redb")] pub mod redb_impl; +#[cfg(feature = "postgres")] +pub mod postgres_impl; + +pub mod async_interface; \ No newline at end of file diff --git a/beacon_node/store/src/database/async_interface.rs b/beacon_node/store/src/database/async_interface.rs new file mode 100644 index 00000000000..69829b7cac1 --- /dev/null +++ b/beacon_node/store/src/database/async_interface.rs @@ -0,0 +1,14 @@ +use crate::{DBColumn, Error}; +use async_trait::async_trait; +use types::EthSpec; + +#[async_trait] +pub trait AsyncKeyValueStore: Send + Sync { + async fn get_bytes(&self, column: DBColumn, key: &[u8]) -> Result>, Error>; + async fn put_bytes(&self, column: DBColumn, key: &[u8], value: &[u8]) -> Result<(), Error>; + // async fn put_bytes_sync(&self, column: DBColumn, key: &[u8], value: &[u8]) -> Result<(), Error>; + // async fn sync(&self) -> Result<(), Error>; + // async fn key_exists(&self, column: DBColumn, key: &[u8]) -> Result; + // async fn key_delete(&self, column: DBColumn, key: &[u8]) -> Result<(), Error>; + // async fn do_atomically(&self, batch: Vec) -> Result<(), Error>; +} \ No newline at end of file diff --git a/beacon_node/store/src/database/postgres_impl.rs b/beacon_node/store/src/database/postgres_impl.rs new file mode 100644 index 00000000000..30e7a2c47a2 --- /dev/null +++ b/beacon_node/store/src/database/postgres_impl.rs @@ -0,0 +1,138 @@ +use crate::{DBColumn, Error, KeyValueStoreOp, AsyncKeyValueStore}; +use sqlx::{PgPool, postgres::PgPoolOptions}; +use types::EthSpec; +use std::marker::PhantomData; +use super::interface::WriteOptions; +use async_trait::async_trait; +use std::env; + +#[derive(Clone)] +pub struct PostgresDB { + db: PgPool, + _phantom: PhantomData +} + +impl PostgresDB { + pub async fn new(database_url: &str) -> Result { + let db = PgPoolOptions::new() + .max_connections(10) + .connect(database_url) + .await + .map_err(|e| Error::DBError { message: e.to_string() })?; + + Ok(Self { + db, + _phantom: PhantomData + }) + } +} + +#[async_trait] +impl AsyncKeyValueStore for PostgresDB { + async fn get_bytes(&self, column: DBColumn, key: &[u8]) -> Result>, Error> { + let col = column.as_str(); + let key = key.to_vec(); + + sqlx::query_scalar!( + r#"SELECT value FROM store WHERE col = $1 AND key = $2"#, + col, + key + ) + .fetch_optional(&self.db) + .await + .map_err(|e| Error::DBError { message: e.to_string() }) + } + + async fn put_bytes(&self, column: DBColumn, key: &[u8], value: &[u8]) -> Result<(), Error> { + let col = column.as_str(); + let key = key.to_vec(); + let value = value.to_vec(); + + sqlx::query!( + r#" + INSERT INTO store (col, key, value) + VALUES ($1, $2, $3) + ON CONFLICT (col, key) DO UPDATE SET value = EXCLUDED.value + "#, + col, + key, + value + ) + .execute(&self.db) + .await + .map(|_| ()) + .map_err(|e| Error::DBError { message: e.to_string() }) + } + + // async fn put_bytes_sync(&self, column: DBColumn, key: &[u8], value: &[u8]) -> Result<(), Error> { + // self.put_bytes(column, key, value).await + // } + + // async fn sync(&self) -> Result<(), Error> { + // Ok(()) + // } + + // async fn key_exists(&self, column: DBColumn, key: &[u8]) -> Result { + // let col = column.as_str(); + // let key = key.to_vec(); + + // let exists = sqlx::query_scalar!( + // r#"SELECT EXISTS(SELECT 1 FROM store WHERE col = $1 AND key = $2)"#, + // col, + // key + // ) + // .fetch_one(&self.db) + // .await + // .map_err(|e| Error::DBError { message: e.to_string() })?; + + // Ok(exists) + // } + + // async fn key_delete(&self, column: DBColumn, key: &[u8]) -> Result<(), Error> { + // let col = column.as_str(); + // let key = key.to_vec(); + + // sqlx::query!( + // r#"DELETE FROM store WHERE col = $1 AND key = $2"#, + // col, + // key + // ) + // .execute(&self.db) + // .await + // .map(|_| ()) + // .map_err(|e| Error::DBError { message: e.to_string() })?; + // } + + // async fn do_atomically(&self, batch: Vec) -> Result<(), Error> { + // let mut tx = self.db.begin().await.map_err(|e| Error::DBError { message: e.to_string() })?; + + // for op in batch { + // match op { + // KeyValueStoreOp::PutKeyValue(col, key, value) => { + // sqlx::query!( + // r#"INSERT INTO store (col, key, value) + // VALUES ($1, $2, $3) + // ON CONFLICT (col, key) DO UPDATE SET VALUE = EXCLUDED.value"#, + // col.as_str(), + // key, + // value + // ) + // .execute(&mut tx) + // .await + // .map_err(|e| Error::DBError { message: e.to_string() })?; + // } + // KeyValueStoreOp::DeleteKey(col, key) => { + // sqlx::query!( + // r#"DELETE FROM store WHERE col = $1 AND key = $2"#, + // col.as_str(), + // key + // ) + // .execute(&mut tx) + // .await + // .map_err(|e| Error::DBError { message: e.to_string() })?; + // } + // } + // } + // tx.commit().await.map_err(|e| Error::DBError { message: e.to_string() }) + // } +} \ No newline at end of file diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index e996b47b723..2e45e97f4e5 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -41,6 +41,7 @@ use std::collections::HashSet; use std::sync::Arc; use strum::{EnumIter, EnumString, IntoStaticStr}; pub use types::*; +pub use database::async_interface::AsyncKeyValueStore; const DATA_COLUMN_DB_KEY_SIZE: usize = 32 + 8; diff --git a/beacon_node/store/src/migrations/20250626130431_init.sql b/beacon_node/store/src/migrations/20250626130431_init.sql new file mode 100644 index 00000000000..39b03b56496 --- /dev/null +++ b/beacon_node/store/src/migrations/20250626130431_init.sql @@ -0,0 +1,9 @@ +create table if not exists store ( + col text not null, + key bytea not null, + value bytea not null, + primary key (col, key) +); + +INSERT INTO store (col, key, value) +VALUES ('example', decode('aabbcc', 'hex'), decode('112233', 'hex')); \ No newline at end of file diff --git a/beacon_node/store/tests/postgres_store.rs b/beacon_node/store/tests/postgres_store.rs new file mode 100644 index 00000000000..d9e5fb1883b --- /dev/null +++ b/beacon_node/store/tests/postgres_store.rs @@ -0,0 +1,27 @@ +use store::database::postgres_impl::PostgresDB; +use store::{AsyncKeyValueStore, DBColumn}; +use types::MainnetEthSpec; + +#[tokio::test] +async fn test_postgres_store() { + dotenvy::dotenv().ok(); + + let db_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set"); + + let db = PostgresDB::::new(&db_url) + .await + .expect("failed to connect"); + + let key = b"test_key"; + let value = b"test_value"; + + db.put_bytes(DBColumn::BeaconBlock, key, value) + .await + .expect("put failed"); + + let result = db.get_bytes(DBColumn::BeaconBlock, key) + .await + .expect("get failed"); + + assert_eq!(result, Some(value.to_vec())); +} \ No newline at end of file From a08e2379fb21885626d6fbab39abb5907cd74b25 Mon Sep 17 00:00:00 2001 From: Owanikin Date: Mon, 7 Jul 2025 13:11:13 +0100 Subject: [PATCH 02/13] Add PostgresDB to DatabaseBackend --- beacon_node/store/src/config.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs index c16573df5e4..e87c5ac029e 100644 --- a/beacon_node/store/src/config.rs +++ b/beacon_node/store/src/config.rs @@ -267,4 +267,6 @@ pub enum DatabaseBackend { LevelDb, #[cfg(feature = "redb")] Redb, + #[cfg(feature = "postgres")] + PostgresDB } From da21ccb8f6408c34fa9095eb92cb2defd9164888 Mon Sep 17 00:00:00 2001 From: Owanikin Date: Mon, 7 Jul 2025 15:36:31 +0100 Subject: [PATCH 03/13] draft: wire PostgresDB into BeaconNodeBackend --- beacon_node/store/src/database/interface.rs | 28 +++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/beacon_node/store/src/database/interface.rs b/beacon_node/store/src/database/interface.rs index e405c6227d3..3c9e9cd8e31 100644 --- a/beacon_node/store/src/database/interface.rs +++ b/beacon_node/store/src/database/interface.rs @@ -2,6 +2,8 @@ use crate::database::leveldb_impl; #[cfg(feature = "redb")] use crate::database::redb_impl; +#[cfg(feature = "postgres")] +use crate::database::postgres_impl; use crate::{config::DatabaseBackend, KeyValueStoreOp, StoreConfig}; use crate::{metrics, ColumnIter, ColumnKeyIter, DBColumn, Error, ItemStore, Key, KeyValueStore}; use std::collections::HashSet; @@ -13,6 +15,8 @@ pub enum BeaconNodeBackend { LevelDb(leveldb_impl::LevelDB), #[cfg(feature = "redb")] Redb(redb_impl::Redb), + #[cfg(feature = "postgres")] + PostgresDB(postgres_impl::PostgresDB) } impl ItemStore for BeaconNodeBackend {} @@ -24,6 +28,11 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::get_bytes(txn, column, key), #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::get_bytes(txn, column, key), + #[cfg(feature = "postgres")] + BeaconNodeBackend::PostgresDB(db) => { + let rt = tokio::runtime::Runtime::new().expect("failed to build tokio runtime"); + rt.block_on(db.get_bytes(column, key)) + } } } @@ -45,6 +54,11 @@ impl KeyValueStore for BeaconNodeBackend { value, txn.write_options(), ), + #[cfg(feature = "postgres")] + BeaconNodeBackend::PostgresDB(db) => { + let rt = tokio::runtime::Runtime::new().expect("Failed to block tokio runtime"); + rt.block_on(db.put_bytes(column, key, value)) + } } } @@ -193,6 +207,20 @@ impl BeaconNodeBackend { } #[cfg(feature = "redb")] DatabaseBackend::Redb => redb_impl::Redb::open(path).map(BeaconNodeBackend::Redb), + #[cfg(feature = "postgres")] + DatabaseBackend::PostgresDB => { + // Read the DATABASE_URL from the "path", assuming it's a file containing the URL + + use std::fs; + + use tokio::fs; + let file_path = path.join("postgres_url.txt"); + let db_url = fs::read_to_string(&file_path).map_err(|e| Error::Confiq(format!("Failed to read postgres_url.txt: {}", e)))?; + let rt = tokio::runtime::Runtime::new().map_err(|e| Error::Config(format!("Failed to create tokio runtime: {}", e)))?; + let db = rt..block_on(PostgresDB::new(db_url.trim())).map_err(|e| Error::Config(format!("Failed to init PostgresDB: {:?}", e)))?; + + Ok(BeaconNodeBackend::PostgresDB(db)) + } } } } From 10ae3477147da4fc9b4a3783f45e7d360eee147f Mon Sep 17 00:00:00 2001 From: Owanikin Date: Mon, 14 Jul 2025 15:22:01 +0100 Subject: [PATCH 04/13] Implement PostgresDB backend integration in BeaconNode store --- beacon_node/src/cli.rs | 8 ++ beacon_node/src/config.rs | 5 ++ beacon_node/store/src/config.rs | 5 ++ beacon_node/store/src/database/interface.rs | 80 ++++++++++++++++--- .../store/src/database/postgres_impl.rs | 4 +- .../src/migrations/20250626130431_init.sql | 5 +- beacon_node/tests/test.rs | 60 -------------- 7 files changed, 89 insertions(+), 78 deletions(-) delete mode 100644 beacon_node/tests/test.rs diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index f3f9aa97a20..b1c8049d40c 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -1630,6 +1630,14 @@ pub fn cli_app() -> Command { .action(ArgAction::Set) .display_order(0) ) + .arg( + Arg::new("postgres-url") + .long("postgres-url") + .value_name("URL") + .help("The Postgres connection URL used when `--beacon-node-backend=postgres`.") + .action(ArgAction::Set) + .display_order(0) + ) .arg( Arg::new("delay-block-publishing") .long("delay-block-publishing") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 3c6339c03e3..9ced812974b 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -459,6 +459,11 @@ pub fn get_config( client_config.store.blob_prune_margin_epochs = blob_prune_margin_epochs; } + #[cfg(feature = "postgres")] + if let Some(url) = cli_args.get_one::("postgres-url") { + client_config.store.postgres_url = Some(url.to_string()); + } + if let Some(malicious_withhold_count) = clap_utils::parse_optional(cli_args, "malicious-withhold-count")? { diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs index e87c5ac029e..d088005446f 100644 --- a/beacon_node/store/src/config.rs +++ b/beacon_node/store/src/config.rs @@ -64,6 +64,9 @@ pub struct StoreConfig { /// The margin for blob pruning in epochs. The oldest blobs are pruned up until /// data_availability_boundary - blob_prune_margin_epochs. Default: 0. pub blob_prune_margin_epochs: u64, + /// Postgres database connection URL + #[cfg(feature = "postgres")] + pub postgres_url: Option, } /// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params. @@ -120,6 +123,8 @@ impl Default for StoreConfig { prune_blobs: true, epochs_per_blob_prune: DEFAULT_EPOCHS_PER_BLOB_PRUNE, blob_prune_margin_epochs: DEFAULT_BLOB_PUNE_MARGIN_EPOCHS, + #[cfg(feature = "postgres")] + postgres_url: None, } } } diff --git a/beacon_node/store/src/database/interface.rs b/beacon_node/store/src/database/interface.rs index 3c9e9cd8e31..d52d57c6358 100644 --- a/beacon_node/store/src/database/interface.rs +++ b/beacon_node/store/src/database/interface.rs @@ -5,6 +5,7 @@ use crate::database::redb_impl; #[cfg(feature = "postgres")] use crate::database::postgres_impl; use crate::{config::DatabaseBackend, KeyValueStoreOp, StoreConfig}; +use crate::database::async_interface::AsyncKeyValueStore; use crate::{metrics, ColumnIter, ColumnKeyIter, DBColumn, Error, ItemStore, Key, KeyValueStore}; use std::collections::HashSet; use std::path::Path; @@ -29,9 +30,9 @@ impl KeyValueStore for BeaconNodeBackend { #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::get_bytes(txn, column, key), #[cfg(feature = "postgres")] - BeaconNodeBackend::PostgresDB(db) => { + BeaconNodeBackend::PostgresDB(ref txn) => { let rt = tokio::runtime::Runtime::new().expect("failed to build tokio runtime"); - rt.block_on(db.get_bytes(column, key)) + rt.block_on(txn.get_bytes(column, key)) } } } @@ -55,7 +56,7 @@ impl KeyValueStore for BeaconNodeBackend { txn.write_options(), ), #[cfg(feature = "postgres")] - BeaconNodeBackend::PostgresDB(db) => { + BeaconNodeBackend::PostgresDB(ref db) => { let rt = tokio::runtime::Runtime::new().expect("Failed to block tokio runtime"); rt.block_on(db.put_bytes(column, key, value)) } @@ -80,6 +81,10 @@ impl KeyValueStore for BeaconNodeBackend { value, txn.write_options_sync(), ), + #[cfg(feature = "postgres")] + BeaconNodeBackend::PostgresDB(_) => { + todo!("Implement PostgresDB logic"); + } } } @@ -89,6 +94,10 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::sync(txn), #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::sync(txn), + #[cfg(feature = "postgres")] + BeaconNodeBackend::PostgresDB(_) => { + todo!("Implement PostgresDB logic"); + } } } @@ -98,6 +107,10 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::key_exists(txn, column, key), #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::key_exists(txn, column, key), + #[cfg(feature = "postgres")] + BeaconNodeBackend::PostgresDB(_) => { + todo!("Implement PostgresDB logic"); + } } } @@ -107,6 +120,10 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::key_delete(txn, column, key), #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::key_delete(txn, column, key), + #[cfg(feature = "postgres")] + BeaconNodeBackend::PostgresDB(_) => { + todo!("Implement PostgresDB logic"); + } } } @@ -116,6 +133,10 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::do_atomically(txn, batch), #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::do_atomically(txn, batch), + #[cfg(feature = "postgres")] + BeaconNodeBackend::PostgresDB(_) => { + todo!("Implement PostgresDB logic"); + } } } @@ -125,6 +146,10 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::compact(txn), #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::compact(txn), + #[cfg(feature = "postgres")] + BeaconNodeBackend::PostgresDB(_) => { + todo!("Implement PostgresDB logic"); + } } } @@ -142,6 +167,10 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::Redb(txn) => { redb_impl::Redb::iter_column_keys_from(txn, _column, from) } + #[cfg(feature = "postgres")] + BeaconNodeBackend::PostgresDB(_) => { + todo!("Implement PostgresDB logic"); + } } } @@ -151,6 +180,10 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::iter_column_keys(txn, column), #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::iter_column_keys(txn, column), + #[cfg(feature = "postgres")] + BeaconNodeBackend::PostgresDB(_) => { + todo!("Implement PostgresDB logic"); + } } } @@ -162,6 +195,10 @@ impl KeyValueStore for BeaconNodeBackend { } #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::iter_column_from(txn, column, from), + #[cfg(feature = "postgres")] + BeaconNodeBackend::PostgresDB(_) => { + todo!("Implement PostgresDB logic"); + } } } @@ -171,6 +208,10 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::compact_column(txn, _column), #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::compact(txn), + #[cfg(feature = "postgres")] + BeaconNodeBackend::PostgresDB(_) => { + todo!("Implement PostgresDB logic"); + } } } @@ -180,6 +221,10 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::delete_batch(txn, col, ops), #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::delete_batch(txn, col, ops), + #[cfg(feature = "postgres")] + BeaconNodeBackend::PostgresDB(_) => { + todo!("Implement PostgresDB logic"); + } } } @@ -193,6 +238,10 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::delete_if(txn, column, f), #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::delete_if(txn, column, f), + #[cfg(feature = "postgres")] + BeaconNodeBackend::PostgresDB(_) => { + todo!("Implement PostgresDB logic"); + } } } } @@ -209,15 +258,24 @@ impl BeaconNodeBackend { DatabaseBackend::Redb => redb_impl::Redb::open(path).map(BeaconNodeBackend::Redb), #[cfg(feature = "postgres")] DatabaseBackend::PostgresDB => { - // Read the DATABASE_URL from the "path", assuming it's a file containing the URL - - use std::fs; + use crate::{database::postgres_impl::PostgresDB}; + + let db_url = config + .postgres_url + .as_ref() + .ok_or_else(|| Error::DBError { + message: "Missing Postgres URL".into(), + })?; - use tokio::fs; - let file_path = path.join("postgres_url.txt"); - let db_url = fs::read_to_string(&file_path).map_err(|e| Error::Confiq(format!("Failed to read postgres_url.txt: {}", e)))?; - let rt = tokio::runtime::Runtime::new().map_err(|e| Error::Config(format!("Failed to create tokio runtime: {}", e)))?; - let db = rt..block_on(PostgresDB::new(db_url.trim())).map_err(|e| Error::Config(format!("Failed to init PostgresDB: {:?}", e)))?; + // Create a Tokio runtime for sync context + let rt = tokio::runtime::Runtime::new() + .map_err(|e| Error::DBError { + message: format!("Failed to create tokio runtime: {}", e), + })?; + let db = rt.block_on(PostgresDB::new(db_url)) + .map_err(|e| Error::DBError { + message: format!("Failed to init PostgresDB: {:?}", e), + })?; Ok(BeaconNodeBackend::PostgresDB(db)) } diff --git a/beacon_node/store/src/database/postgres_impl.rs b/beacon_node/store/src/database/postgres_impl.rs index 30e7a2c47a2..60bb8785375 100644 --- a/beacon_node/store/src/database/postgres_impl.rs +++ b/beacon_node/store/src/database/postgres_impl.rs @@ -1,10 +1,8 @@ -use crate::{DBColumn, Error, KeyValueStoreOp, AsyncKeyValueStore}; +use crate::{DBColumn, Error, AsyncKeyValueStore}; use sqlx::{PgPool, postgres::PgPoolOptions}; use types::EthSpec; use std::marker::PhantomData; -use super::interface::WriteOptions; use async_trait::async_trait; -use std::env; #[derive(Clone)] pub struct PostgresDB { diff --git a/beacon_node/store/src/migrations/20250626130431_init.sql b/beacon_node/store/src/migrations/20250626130431_init.sql index 39b03b56496..f6bcdecc44e 100644 --- a/beacon_node/store/src/migrations/20250626130431_init.sql +++ b/beacon_node/store/src/migrations/20250626130431_init.sql @@ -3,7 +3,4 @@ create table if not exists store ( key bytea not null, value bytea not null, primary key (col, key) -); - -INSERT INTO store (col, key, value) -VALUES ('example', decode('aabbcc', 'hex'), decode('112233', 'hex')); \ No newline at end of file +); \ No newline at end of file diff --git a/beacon_node/tests/test.rs b/beacon_node/tests/test.rs deleted file mode 100644 index ab78b65ae93..00000000000 --- a/beacon_node/tests/test.rs +++ /dev/null @@ -1,60 +0,0 @@ -#![cfg(test)] - -use beacon_chain::StateSkipConfig; -use node_test_rig::{ - environment::{Environment, EnvironmentBuilder}, - eth2::types::StateId, - testing_client_config, LocalBeaconNode, -}; -use types::{EthSpec, MinimalEthSpec, Slot}; - -fn env_builder() -> EnvironmentBuilder { - EnvironmentBuilder::minimal() -} - -fn build_node(env: &mut Environment) -> LocalBeaconNode { - let context = env.core_context(); - env.runtime() - .block_on(LocalBeaconNode::production( - context, - testing_client_config(), - )) - .expect("should block until node created") -} - -#[test] -fn http_server_genesis_state() { - let mut env = env_builder() - .multi_threaded_tokio_runtime() - .expect("should start tokio runtime") - .build() - .expect("environment should build"); - - // build a runtime guard - - let node = build_node(&mut env); - - let remote_node = node.remote_node().expect("should produce remote node"); - - let api_state = env - .runtime() - .block_on(remote_node.get_debug_beacon_states(StateId::Slot(Slot::new(0)))) - .expect("should fetch state from http api") - .unwrap() - .into_data(); - - let mut db_state = node - .client - .beacon_chain() - .expect("client should have beacon chain") - .state_at_slot(Slot::new(0), StateSkipConfig::WithStateRoots) - .expect("should find state"); - db_state.drop_all_caches().unwrap(); - - assert_eq!( - api_state, db_state, - "genesis state from api should match that from the DB" - ); - - env.fire_signal(); -} From e445094599abd8b8c0a69a10e6b5567fbe9ebfa1 Mon Sep 17 00:00:00 2001 From: Owanikin Date: Thu, 17 Jul 2025 14:26:09 +0100 Subject: [PATCH 05/13] remove dotenvy and store test --- beacon_node/store/Cargo.toml | 1 - beacon_node/store/tests/postgres_store.rs | 27 ----------------------- 2 files changed, 28 deletions(-) delete mode 100644 beacon_node/store/tests/postgres_store.rs diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml index ec735203eda..11ece73022c 100644 --- a/beacon_node/store/Cargo.toml +++ b/beacon_node/store/Cargo.toml @@ -37,7 +37,6 @@ tracing-subscriber = { workspace = true } types = { workspace = true } xdelta3 = { workspace = true } zstd = { workspace = true } -dotenvy = "0.15.7" [dev-dependencies] beacon_chain = { workspace = true } diff --git a/beacon_node/store/tests/postgres_store.rs b/beacon_node/store/tests/postgres_store.rs deleted file mode 100644 index d9e5fb1883b..00000000000 --- a/beacon_node/store/tests/postgres_store.rs +++ /dev/null @@ -1,27 +0,0 @@ -use store::database::postgres_impl::PostgresDB; -use store::{AsyncKeyValueStore, DBColumn}; -use types::MainnetEthSpec; - -#[tokio::test] -async fn test_postgres_store() { - dotenvy::dotenv().ok(); - - let db_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set"); - - let db = PostgresDB::::new(&db_url) - .await - .expect("failed to connect"); - - let key = b"test_key"; - let value = b"test_value"; - - db.put_bytes(DBColumn::BeaconBlock, key, value) - .await - .expect("put failed"); - - let result = db.get_bytes(DBColumn::BeaconBlock, key) - .await - .expect("get failed"); - - assert_eq!(result, Some(value.to_vec())); -} \ No newline at end of file From 05c779b3a2fee6083adafe9d924c1dbb59441a07 Mon Sep 17 00:00:00 2001 From: Owanikin Date: Thu, 17 Jul 2025 14:35:09 +0100 Subject: [PATCH 06/13] WIP: prep for rebase --- Cargo.lock | 1 - 1 file changed, 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index bade56709a2..00c2c47ca1c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9090,7 +9090,6 @@ dependencies = [ "criterion", "db-key", "directory", - "dotenvy", "ethereum_ssz", "ethereum_ssz_derive", "itertools 0.10.5", From e3531ec2efb664770dff15a5ecff1bf7a54eda01 Mon Sep 17 00:00:00 2001 From: Owanikin Date: Mon, 21 Jul 2025 13:51:56 +0100 Subject: [PATCH 07/13] feat(store): add working PostgresDB backend behind feature flag --- Cargo.lock | 25 ++++++++----------------- beacon_node/Cargo.toml | 1 + lighthouse/tests/beacon_node.rs | 15 +++++++++++++++ 3 files changed, 24 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 00c2c47ca1c..0d494ae2ec6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -118,7 +118,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", - "getrandom 0.3.2", + "getrandom 0.2.15", "once_cell", "version_check", "zerocopy 0.7.35", @@ -660,9 +660,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.87" +version = "0.1.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d556ec1359574147ec0c4fc5eb525f3f23263a592b1a9c07e0a75b427de55c97" +checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5" dependencies = [ "proc-macro2", "quote", @@ -1913,15 +1913,6 @@ dependencies = [ "sha2 0.10.8", ] -[[package]] -name = "crc" -version = "3.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69e6e4d7b33a94f0991c26729976b10ebde1d34c3ee82408fb536164fa10d636" -dependencies = [ - "crc-catalog", -] - [[package]] name = "crc" version = "3.3.0" @@ -8919,7 +8910,7 @@ dependencies = [ "futures-util", "hashlink 0.8.4", "hex", - "indexmap 2.9.0", + "indexmap 2.8.0", "log", "memchr", "once_cell", @@ -8927,7 +8918,7 @@ dependencies = [ "percent-encoding", "serde", "serde_json", - "sha2 0.10.9", + "sha2 0.10.8", "smallvec", "sqlformat", "thiserror 1.0.69", @@ -8965,7 +8956,7 @@ dependencies = [ "quote", "serde", "serde_json", - "sha2 0.10.9", + "sha2 0.10.8", "sqlx-core", "sqlx-postgres", "syn 1.0.109", @@ -9003,7 +8994,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", - "sha2 0.10.9", + "sha2 0.10.8", "smallvec", "sqlx-core", "stringprep", @@ -10673,7 +10664,7 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6994d13118ab492c3c80c1f81928718159254c53c472bf9ce36f8dae4add02a7" dependencies = [ - "redox_syscall 0.5.12", + "redox_syscall 0.5.10", "wasite", ] diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index 456376e79bd..30d9e13e2b2 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -15,6 +15,7 @@ path = "src/lib.rs" write_ssz_files = [ "beacon_chain/write_ssz_files", ] # Writes debugging .ssz files to /tmp during block processing. +postgres = ["store/postgres"] [dependencies] account_utils = { workspace = true } diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 884e5eddeba..c53bc0238a9 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -2762,6 +2762,21 @@ fn beacon_node_backend_override() { }); } +#[test] +#[cfg(feature = "postgres")] +fn beacon_node_backend_postgres() { + CommandLineTest::new() + .flag("beacon-node-backend", Some("postgres")) + .flag("postgres-url", Some("postgres://user:pass@localhost/db")) + .run_with_zero_port() + .with_config(|config| { + assert_eq!(config.store.backend, BeaconNodeBackend::Postgres); + assert_eq!(config.store.postgres_config.as_ref().unwrap().url, + "postgres://user:pass@localhost/db" + ); + }); +} + #[test] fn block_publishing_delay_for_testing() { CommandLineTest::new() From 6e1cf1e513ed11e270af4f087ea3c8f8a1a7859e Mon Sep 17 00:00:00 2001 From: Owanikin Date: Tue, 22 Jul 2025 13:55:44 +0100 Subject: [PATCH 08/13] redesign postgres schema --- .../src/migrations/20250626130431_init.sql | 135 +++++++++++++++++- 1 file changed, 130 insertions(+), 5 deletions(-) diff --git a/beacon_node/store/src/migrations/20250626130431_init.sql b/beacon_node/store/src/migrations/20250626130431_init.sql index f6bcdecc44e..cfc46d89164 100644 --- a/beacon_node/store/src/migrations/20250626130431_init.sql +++ b/beacon_node/store/src/migrations/20250626130431_init.sql @@ -1,6 +1,131 @@ -create table if not exists store ( - col text not null, - key bytea not null, - value bytea not null, - primary key (col, key) +-- create table if not exists store ( +-- col text not null, +-- key bytea not null, +-- value bytea not null, +-- primary key (col, key) +-- ); + +CREATE TABLE beacon_meta ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_block ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_blob ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_data_column ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_state_hot_diff ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_state_hot_snapshot ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_state_snapshot ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_state_diff ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_state_hot_summary ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_cold_state_summary ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE executive_payload ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_chain ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE op_pool ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE fork_choice ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE pubkey_cache ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_state_roots ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_block_roots ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_randao_mixes ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE dht_enrs ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE custody_context ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE overflow_lru_cache ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE light_client_update ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE sync_committe_branch ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE sync_committe ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE dummy ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL ); \ No newline at end of file From d8fdf2d5e78d10bdf94a01ccb9cc6f1e883ed049 Mon Sep 17 00:00:00 2001 From: Owanikin Date: Mon, 28 Jul 2025 12:15:50 +0100 Subject: [PATCH 09/13] refactor to use tokio block_ons --- beacon_node/store/src/database/interface.rs | 29 +++++++++++++++++---- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/beacon_node/store/src/database/interface.rs b/beacon_node/store/src/database/interface.rs index d52d57c6358..5a74d99776d 100644 --- a/beacon_node/store/src/database/interface.rs +++ b/beacon_node/store/src/database/interface.rs @@ -10,6 +10,7 @@ use crate::{metrics, ColumnIter, ColumnKeyIter, DBColumn, Error, ItemStore, Key, use std::collections::HashSet; use std::path::Path; use types::EthSpec; +use tokio::runtime::Handle; pub enum BeaconNodeBackend { #[cfg(feature = "leveldb")] @@ -31,8 +32,19 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::Redb(txn) => redb_impl::Redb::get_bytes(txn, column, key), #[cfg(feature = "postgres")] BeaconNodeBackend::PostgresDB(ref txn) => { - let rt = tokio::runtime::Runtime::new().expect("failed to build tokio runtime"); - rt.block_on(txn.get_bytes(column, key)) + // let rt = tokio::runtime::Runtime::new().expect("failed to build tokio runtime"); + // rt.block_on(txn.get_bytes(column, key)) + + // Try to use the current Tokio runtime if available + match Handle::try_current() { + Ok(handle) => handle.block_on(txn.get_bytes(column, key)), + Err(_) => { + // Fallback to creating a new runtime if not in async context + let rt = tokio::runtime::Runtime::new() + .map_err(|e| Error::DBError { message: e.to_string() })?; + rt.block_on(txn.get_bytes(column, key)) + } + } } } } @@ -56,9 +68,16 @@ impl KeyValueStore for BeaconNodeBackend { txn.write_options(), ), #[cfg(feature = "postgres")] - BeaconNodeBackend::PostgresDB(ref db) => { - let rt = tokio::runtime::Runtime::new().expect("Failed to block tokio runtime"); - rt.block_on(db.put_bytes(column, key, value)) + BeaconNodeBackend::PostgresDB(ref txn) => { + // let rt = tokio::runtime::Runtime::new().expect("Failed to block tokio runtime"); + // rt.block_on(db.put_bytes(column, key, value)) + match tokio::runtime::Handle::try_current() { + Ok(handle) => handle.block_on(txn.put_bytes(column, key, value)), + Err(_) => { + let rt = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime"); + rt.block_on(txn.put_bytes(column, key, value)) + } + } } } } From b77ca6dc8c71271b0ab1042145ba87d6868cbeb9 Mon Sep 17 00:00:00 2001 From: Owanikin Date: Mon, 28 Jul 2025 15:16:37 +0100 Subject: [PATCH 10/13] refactor postgresdb to use seperate tables per dbcolumn --- .../store/src/database/postgres_impl.rs | 158 +++++++----------- .../src/migrations/20250626130431_init.sql | 50 ++++++ 2 files changed, 112 insertions(+), 96 deletions(-) diff --git a/beacon_node/store/src/database/postgres_impl.rs b/beacon_node/store/src/database/postgres_impl.rs index 60bb8785375..2395fd62e54 100644 --- a/beacon_node/store/src/database/postgres_impl.rs +++ b/beacon_node/store/src/database/postgres_impl.rs @@ -1,5 +1,5 @@ use crate::{DBColumn, Error, AsyncKeyValueStore}; -use sqlx::{PgPool, postgres::PgPoolOptions}; +use sqlx::{PgPool, postgres::PgPoolOptions, Row}; use types::EthSpec; use std::marker::PhantomData; use async_trait::async_trait; @@ -28,109 +28,75 @@ impl PostgresDB { #[async_trait] impl AsyncKeyValueStore for PostgresDB { async fn get_bytes(&self, column: DBColumn, key: &[u8]) -> Result>, Error> { - let col = column.as_str(); + let table = get_table_name(column); let key = key.to_vec(); - sqlx::query_scalar!( - r#"SELECT value FROM store WHERE col = $1 AND key = $2"#, - col, - key - ) - .fetch_optional(&self.db) - .await - .map_err(|e| Error::DBError { message: e.to_string() }) + let query = format!("SELECT value FROM {} WHERE key = $1", table); + let row = sqlx::query(&query) + .bind(key) + .fetch_optional(&self.db) + .await + .map_err(|e| Error::DBError { message: e.to_string() })?; + + Ok(row.map(|r| r.get::, _>("value"))) } async fn put_bytes(&self, column: DBColumn, key: &[u8], value: &[u8]) -> Result<(), Error> { - let col = column.as_str(); + let table = get_table_name(column); let key = key.to_vec(); let value = value.to_vec(); - - sqlx::query!( - r#" - INSERT INTO store (col, key, value) - VALUES ($1, $2, $3) - ON CONFLICT (col, key) DO UPDATE SET value = EXCLUDED.value - "#, - col, - key, - value - ) - .execute(&self.db) - .await - .map(|_| ()) - .map_err(|e| Error::DBError { message: e.to_string() }) - } - - // async fn put_bytes_sync(&self, column: DBColumn, key: &[u8], value: &[u8]) -> Result<(), Error> { - // self.put_bytes(column, key, value).await - // } - - // async fn sync(&self) -> Result<(), Error> { - // Ok(()) - // } - - // async fn key_exists(&self, column: DBColumn, key: &[u8]) -> Result { - // let col = column.as_str(); - // let key = key.to_vec(); - // let exists = sqlx::query_scalar!( - // r#"SELECT EXISTS(SELECT 1 FROM store WHERE col = $1 AND key = $2)"#, - // col, - // key - // ) - // .fetch_one(&self.db) - // .await - // .map_err(|e| Error::DBError { message: e.to_string() })?; + let query = format!( + "INSERT INTO {} (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value", + table + ); - // Ok(exists) - // } - - // async fn key_delete(&self, column: DBColumn, key: &[u8]) -> Result<(), Error> { - // let col = column.as_str(); - // let key = key.to_vec(); - - // sqlx::query!( - // r#"DELETE FROM store WHERE col = $1 AND key = $2"#, - // col, - // key - // ) - // .execute(&self.db) - // .await - // .map(|_| ()) - // .map_err(|e| Error::DBError { message: e.to_string() })?; - // } - - // async fn do_atomically(&self, batch: Vec) -> Result<(), Error> { - // let mut tx = self.db.begin().await.map_err(|e| Error::DBError { message: e.to_string() })?; + sqlx::query(&query) + .bind(key) + .bind(value) + .execute(&self.db) + .await + .map(|_| ()) + .map_err(|e| Error::DBError { message: e.to_string() }) + } +} - // for op in batch { - // match op { - // KeyValueStoreOp::PutKeyValue(col, key, value) => { - // sqlx::query!( - // r#"INSERT INTO store (col, key, value) - // VALUES ($1, $2, $3) - // ON CONFLICT (col, key) DO UPDATE SET VALUE = EXCLUDED.value"#, - // col.as_str(), - // key, - // value - // ) - // .execute(&mut tx) - // .await - // .map_err(|e| Error::DBError { message: e.to_string() })?; - // } - // KeyValueStoreOp::DeleteKey(col, key) => { - // sqlx::query!( - // r#"DELETE FROM store WHERE col = $1 AND key = $2"#, - // col.as_str(), - // key - // ) - // .execute(&mut tx) - // .await - // .map_err(|e| Error::DBError { message: e.to_string() })?; - // } - // } - // } - // tx.commit().await.map_err(|e| Error::DBError { message: e.to_string() }) - // } +pub fn get_table_name(column: DBColumn) -> &'static str { + match column { + DBColumn::BeaconMeta => "beacon_meta", + DBColumn::BeaconBlock => "beacon_block", + DBColumn::BeaconBlob => "beacon_blob", + DBColumn::BeaconDataColumn => "beacon_data_column", + DBColumn::BeaconState => "beacon_state", + DBColumn::BeaconStateHotDiff => "beacon_state_hot_diff", + DBColumn::BeaconStateHotSnapshot => "beacon_state_hot_snapshot", + DBColumn::BeaconStateSnapshot => "beacon_state_snapshot", + DBColumn::BeaconStateDiff => "beacon_state_diff", + DBColumn::BeaconStateSummary => "beacon_state_summary", + DBColumn::BeaconStateHotSummary => "beacon_state_hot_summary", + DBColumn::BeaconColdStateSummary => "beacon_cold_state_summary", + DBColumn::BeaconStateTemporary => "beacon_state_temporary", + DBColumn::ExecPayload => "executive_payload", + DBColumn::BeaconChain => "beacon_chain", + DBColumn::OpPool => "op_pool", + DBColumn::Eth1Cache => "eth1_cache", + DBColumn::ForkChoice => "fork_choice", + DBColumn::PubkeyCache => "pubkey_cache", + DBColumn::BeaconRestorePoint => "beacon_restore_point", + DBColumn::BeaconStateRoots => "beacon_state_roots", + DBColumn::BeaconStateRootsChunked => "beacon_state_roots_chunked", + DBColumn::BeaconBlockRoots => "beacon_block_roots", + DBColumn::BeaconBlockRootsChunked => "beacon_block_roots_chunked", + DBColumn::BeaconHistoricalRoots => "beacon_historical_roots", + DBColumn::BeaconRandaoMixes => "beacon_randao_mixes", + DBColumn::DhtEnrs => "dht_enrs", + DBColumn::CustodyContext => "custody_context", + DBColumn::OptimisticTransitionBlock => "optimistic_transition_block", + DBColumn::BeaconHistoricalSummaries => "beacon_historical_summaries", + DBColumn::OverflowLRUCache => "overflow_lru_cache", + DBColumn::LightClientUpdate => "light_client_update", + DBColumn::SyncCommitteeBranch => "sync_committe_branch", + DBColumn::SyncCommittee => "sync_committe", + DBColumn::Dummy => "dummy", + } } \ No newline at end of file diff --git a/beacon_node/store/src/migrations/20250626130431_init.sql b/beacon_node/store/src/migrations/20250626130431_init.sql index cfc46d89164..b32c801458f 100644 --- a/beacon_node/store/src/migrations/20250626130431_init.sql +++ b/beacon_node/store/src/migrations/20250626130431_init.sql @@ -25,6 +25,11 @@ CREATE TABLE beacon_data_column ( value BYTEA NOT NULL ); +CREATE TABLE beacon_state ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + CREATE TABLE beacon_state_hot_diff ( key BYTEA PRIMARY KEY, value BYTEA NOT NULL @@ -45,6 +50,11 @@ CREATE TABLE beacon_state_diff ( value BYTEA NOT NULL ); +CREATE TABLE beacon_state_summary ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + CREATE TABLE beacon_state_hot_summary ( key BYTEA PRIMARY KEY, value BYTEA NOT NULL @@ -55,6 +65,11 @@ CREATE TABLE beacon_cold_state_summary ( value BYTEA NOT NULL ); +CREATE TABLE beacon_state_temporary ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + CREATE TABLE executive_payload ( key BYTEA PRIMARY KEY, value BYTEA NOT NULL @@ -70,6 +85,11 @@ CREATE TABLE op_pool ( value BYTEA NOT NULL ); +CREATE TABLE eth1_cache ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + CREATE TABLE fork_choice ( key BYTEA PRIMARY KEY, value BYTEA NOT NULL @@ -80,16 +100,36 @@ CREATE TABLE pubkey_cache ( value BYTEA NOT NULL ); +CREATE TABLE beacon_restore_point ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + CREATE TABLE beacon_state_roots ( key BYTEA PRIMARY KEY, value BYTEA NOT NULL ); +CREATE TABLE beacon_state_roots_chunked ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + CREATE TABLE beacon_block_roots ( key BYTEA PRIMARY KEY, value BYTEA NOT NULL ); +CREATE TABLE beacon_block_roots_chunked ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_historical_roots ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + CREATE TABLE beacon_randao_mixes ( key BYTEA PRIMARY KEY, value BYTEA NOT NULL @@ -105,6 +145,16 @@ CREATE TABLE custody_context ( value BYTEA NOT NULL ); +CREATE TABLE optimistic_transition_block ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + +CREATE TABLE beacon_historical_summaries ( + key BYTEA PRIMARY KEY, + value BYTEA NOT NULL +); + CREATE TABLE overflow_lru_cache ( key BYTEA PRIMARY KEY, value BYTEA NOT NULL From f987258916547b317b0cdf6ed43f69d04c293f92 Mon Sep 17 00:00:00 2001 From: Owanikin Date: Fri, 1 Aug 2025 11:16:27 +0100 Subject: [PATCH 11/13] impl: put_bytes_sync --- beacon_node/store/src/database/async_interface.rs | 2 +- beacon_node/store/src/database/postgres_impl.rs | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/beacon_node/store/src/database/async_interface.rs b/beacon_node/store/src/database/async_interface.rs index 69829b7cac1..57f0ff1a4f9 100644 --- a/beacon_node/store/src/database/async_interface.rs +++ b/beacon_node/store/src/database/async_interface.rs @@ -6,7 +6,7 @@ use types::EthSpec; pub trait AsyncKeyValueStore: Send + Sync { async fn get_bytes(&self, column: DBColumn, key: &[u8]) -> Result>, Error>; async fn put_bytes(&self, column: DBColumn, key: &[u8], value: &[u8]) -> Result<(), Error>; - // async fn put_bytes_sync(&self, column: DBColumn, key: &[u8], value: &[u8]) -> Result<(), Error>; + async fn put_bytes_sync(&self, column: DBColumn, key: &[u8], value: &[u8]) -> Result<(), Error>; // async fn sync(&self) -> Result<(), Error>; // async fn key_exists(&self, column: DBColumn, key: &[u8]) -> Result; // async fn key_delete(&self, column: DBColumn, key: &[u8]) -> Result<(), Error>; diff --git a/beacon_node/store/src/database/postgres_impl.rs b/beacon_node/store/src/database/postgres_impl.rs index 2395fd62e54..2c299035dee 100644 --- a/beacon_node/store/src/database/postgres_impl.rs +++ b/beacon_node/store/src/database/postgres_impl.rs @@ -59,6 +59,10 @@ impl AsyncKeyValueStore for PostgresDB { .map(|_| ()) .map_err(|e| Error::DBError { message: e.to_string() }) } + + async fn put_bytes_sync(&self, column: DBColumn, key: &[u8], value: &[u8]) -> Result<(), Error> { + self.put_bytes(column, key, value).await + } } pub fn get_table_name(column: DBColumn) -> &'static str { From a6a78607c1f512c37f0fa4c54a9634dcf9366e21 Mon Sep 17 00:00:00 2001 From: Owanikin Date: Tue, 12 Aug 2025 11:28:24 +0100 Subject: [PATCH 12/13] refactor: remove redundant , implement the rest of the methods minimally for test to pass --- beacon_node/src/config.rs | 5 - beacon_node/store/Cargo.toml | 2 +- beacon_node/store/src/config.rs | 7 +- .../store/src/database/async_interface.rs | 26 ++-- beacon_node/store/src/database/interface.rs | 144 ++++++++++++------ .../store/src/database/postgres_impl.rs | 143 +++++++++++++++-- beacon_node/store/src/lib.rs | 9 +- 7 files changed, 245 insertions(+), 91 deletions(-) diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index e14be007df0..f55b91d58c3 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -459,11 +459,6 @@ pub fn get_config( client_config.store.blob_prune_margin_epochs = blob_prune_margin_epochs; } - #[cfg(feature = "postgres")] - if let Some(url) = cli_args.get_one::("postgres-url") { - client_config.store.postgres_url = Some(url.to_string()); - } - if let Some(malicious_withhold_count) = clap_utils::parse_optional(cli_args, "malicious-withhold-count")? { diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml index 11ece73022c..6a8ed083d3b 100644 --- a/beacon_node/store/Cargo.toml +++ b/beacon_node/store/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Paul Hauner "] edition = { workspace = true } [features] -default = ["leveldb"] +default = ["postgres"] leveldb = ["dep:leveldb"] redb = ["dep:redb"] postgres = ["dep:sqlx"] diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs index d088005446f..3952cc53fef 100644 --- a/beacon_node/store/src/config.rs +++ b/beacon_node/store/src/config.rs @@ -11,6 +11,8 @@ use types::non_zero_usize::new_non_zero_usize; use types::EthSpec; use zstd::Encoder; +#[cfg(all(feature = "postgres", not(feature = "redb"), not(feature = "leveldb")))] +pub const DEFAULT_BACKEND: DatabaseBackend = DatabaseBackend::PostgresDB; #[cfg(all(feature = "redb", not(feature = "leveldb")))] pub const DEFAULT_BACKEND: DatabaseBackend = DatabaseBackend::Redb; #[cfg(feature = "leveldb")] @@ -64,9 +66,6 @@ pub struct StoreConfig { /// The margin for blob pruning in epochs. The oldest blobs are pruned up until /// data_availability_boundary - blob_prune_margin_epochs. Default: 0. pub blob_prune_margin_epochs: u64, - /// Postgres database connection URL - #[cfg(feature = "postgres")] - pub postgres_url: Option, } /// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params. @@ -123,8 +122,6 @@ impl Default for StoreConfig { prune_blobs: true, epochs_per_blob_prune: DEFAULT_EPOCHS_PER_BLOB_PRUNE, blob_prune_margin_epochs: DEFAULT_BLOB_PUNE_MARGIN_EPOCHS, - #[cfg(feature = "postgres")] - postgres_url: None, } } } diff --git a/beacon_node/store/src/database/async_interface.rs b/beacon_node/store/src/database/async_interface.rs index 57f0ff1a4f9..bcadbdb389d 100644 --- a/beacon_node/store/src/database/async_interface.rs +++ b/beacon_node/store/src/database/async_interface.rs @@ -1,14 +1,14 @@ -use crate::{DBColumn, Error}; -use async_trait::async_trait; -use types::EthSpec; +// use crate::{DBColumn, Error}; +// use async_trait::async_trait; +// use types::EthSpec; -#[async_trait] -pub trait AsyncKeyValueStore: Send + Sync { - async fn get_bytes(&self, column: DBColumn, key: &[u8]) -> Result>, Error>; - async fn put_bytes(&self, column: DBColumn, key: &[u8], value: &[u8]) -> Result<(), Error>; - async fn put_bytes_sync(&self, column: DBColumn, key: &[u8], value: &[u8]) -> Result<(), Error>; - // async fn sync(&self) -> Result<(), Error>; - // async fn key_exists(&self, column: DBColumn, key: &[u8]) -> Result; - // async fn key_delete(&self, column: DBColumn, key: &[u8]) -> Result<(), Error>; - // async fn do_atomically(&self, batch: Vec) -> Result<(), Error>; -} \ No newline at end of file +// #[async_trait] +// pub trait AsyncKeyValueStore: Send + Sync { +// async fn get_bytes(&self, column: DBColumn, key: &[u8]) -> Result>, Error>; +// async fn put_bytes(&self, column: DBColumn, key: &[u8], value: &[u8]) -> Result<(), Error>; +// // async fn put_bytes_sync(&self, column: DBColumn, key: &[u8], value: &[u8]) -> Result<(), Error>; +// // async fn sync(&self) -> Result<(), Error>; +// async fn key_exists(&self, column: DBColumn, key: &[u8]) -> Result; +// // async fn key_delete(&self, column: DBColumn, key: &[u8]) -> Result<(), Error>; +// // async fn do_atomically(&self, batch: Vec) -> Result<(), Error>; +// } \ No newline at end of file diff --git a/beacon_node/store/src/database/interface.rs b/beacon_node/store/src/database/interface.rs index 5a74d99776d..6a61db398e4 100644 --- a/beacon_node/store/src/database/interface.rs +++ b/beacon_node/store/src/database/interface.rs @@ -5,7 +5,6 @@ use crate::database::redb_impl; #[cfg(feature = "postgres")] use crate::database::postgres_impl; use crate::{config::DatabaseBackend, KeyValueStoreOp, StoreConfig}; -use crate::database::async_interface::AsyncKeyValueStore; use crate::{metrics, ColumnIter, ColumnKeyIter, DBColumn, Error, ItemStore, Key, KeyValueStore}; use std::collections::HashSet; use std::path::Path; @@ -31,10 +30,7 @@ impl KeyValueStore for BeaconNodeBackend { #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::get_bytes(txn, column, key), #[cfg(feature = "postgres")] - BeaconNodeBackend::PostgresDB(ref txn) => { - // let rt = tokio::runtime::Runtime::new().expect("failed to build tokio runtime"); - // rt.block_on(txn.get_bytes(column, key)) - + BeaconNodeBackend::PostgresDB(txn) => { // Try to use the current Tokio runtime if available match Handle::try_current() { Ok(handle) => handle.block_on(txn.get_bytes(column, key)), @@ -69,8 +65,6 @@ impl KeyValueStore for BeaconNodeBackend { ), #[cfg(feature = "postgres")] BeaconNodeBackend::PostgresDB(ref txn) => { - // let rt = tokio::runtime::Runtime::new().expect("Failed to block tokio runtime"); - // rt.block_on(db.put_bytes(column, key, value)) match tokio::runtime::Handle::try_current() { Ok(handle) => handle.block_on(txn.put_bytes(column, key, value)), Err(_) => { @@ -102,7 +96,7 @@ impl KeyValueStore for BeaconNodeBackend { ), #[cfg(feature = "postgres")] BeaconNodeBackend::PostgresDB(_) => { - todo!("Implement PostgresDB logic"); + todo!("postgres put_bytes_sync called, which should not happen!"); } } } @@ -127,8 +121,15 @@ impl KeyValueStore for BeaconNodeBackend { #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::key_exists(txn, column, key), #[cfg(feature = "postgres")] - BeaconNodeBackend::PostgresDB(_) => { - todo!("Implement PostgresDB logic"); + BeaconNodeBackend::PostgresDB(txn) => { + match tokio::runtime::Handle::try_current() { + Ok(handle) => handle.block_on(txn.key_exists(column, key)), + Err(_) => { + let rt = tokio::runtime::Runtime::new() + .map_err(|e| Error::DBError { message: e.to_string() })?; + rt.block_on(txn.key_exists(column, key)) + } + } } } } @@ -140,8 +141,15 @@ impl KeyValueStore for BeaconNodeBackend { #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::key_delete(txn, column, key), #[cfg(feature = "postgres")] - BeaconNodeBackend::PostgresDB(_) => { - todo!("Implement PostgresDB logic"); + BeaconNodeBackend::PostgresDB(txn) => { + match tokio::runtime::Handle::try_current() { + Ok(handle) => handle.block_on(txn.key_delete(column, key)), + Err(_) => { + let rt = tokio::runtime::Runtime::new() + .map_err(|e| Error::DBError { message: e.to_string() })?; + rt.block_on(txn.key_delete(column, key)) + } + } } } } @@ -166,9 +174,14 @@ impl KeyValueStore for BeaconNodeBackend { #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::compact(txn), #[cfg(feature = "postgres")] - BeaconNodeBackend::PostgresDB(_) => { - todo!("Implement PostgresDB logic"); - } + BeaconNodeBackend::PostgresDB(txn) => match tokio::runtime::Handle::try_current() { + Ok(handle) => handle.block_on(txn.compact()), + Err(_) => { + let rt = tokio::runtime::Runtime::new() + .map_err(|e| Error::DBError { message: e.to_string() })?; + rt.block_on(txn.compact()) + } + }, } } @@ -187,9 +200,14 @@ impl KeyValueStore for BeaconNodeBackend { redb_impl::Redb::iter_column_keys_from(txn, _column, from) } #[cfg(feature = "postgres")] - BeaconNodeBackend::PostgresDB(_) => { - todo!("Implement PostgresDB logic"); - } + BeaconNodeBackend::PostgresDB(txn) => match tokio::runtime::Handle::try_current() { + Ok(handle) => handle.block_on(txn.iter_column_keys_from(column, start)), + Err(_) => { + let rt = tokio::runtime::Runtime::new() + .map_err(|e| Error::DBError { message: e.to_string() })?; + rt.block_on(txn.iter_column_keys_from(column, start)) + } + }, } } @@ -200,9 +218,14 @@ impl KeyValueStore for BeaconNodeBackend { #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::iter_column_keys(txn, column), #[cfg(feature = "postgres")] - BeaconNodeBackend::PostgresDB(_) => { - todo!("Implement PostgresDB logic"); - } + BeaconNodeBackend::PostgresDB(txn) => match tokio::runtime::Handle::try_current() { + Ok(handle) => handle.block_on(txn.iter_column_keys(column)), + Err(_) => { + let rt = tokio::runtime::Runtime::new() + .map_err(|e| Error::DBError { message: e.to_string() })?; + rt.block_on(txn.iter_column_keys(column)) + } + }, } } @@ -215,9 +238,14 @@ impl KeyValueStore for BeaconNodeBackend { #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::iter_column_from(txn, column, from), #[cfg(feature = "postgres")] - BeaconNodeBackend::PostgresDB(_) => { - todo!("Implement PostgresDB logic"); - } + BeaconNodeBackend::PostgresDB(txn) => match tokio::runtime::Handle::try_current() { + Ok(handle) => handle.block_on(txn.iter_column_from(column, start)), + Err(_) => { + let rt = tokio::runtime::Runtime::new() + .map_err(|e| Error::DBError { message: e.to_string() })?; + rt.block_on(txn.iter_column_from(column, start)) + } + }, } } @@ -228,9 +256,14 @@ impl KeyValueStore for BeaconNodeBackend { #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::compact(txn), #[cfg(feature = "postgres")] - BeaconNodeBackend::PostgresDB(_) => { - todo!("Implement PostgresDB logic"); - } + BeaconNodeBackend::PostgresDB(txn) => match tokio::runtime::Handle::try_current() { + Ok(handle) => handle.block_on(txn.compact_column(column)), + Err(_) => { + let rt = tokio::runtime::Runtime::new() + .map_err(|e| Error::DBError { message: e.to_string() })?; + rt.block_on(txn.compact_column(column)) + } + }, } } @@ -241,9 +274,14 @@ impl KeyValueStore for BeaconNodeBackend { #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::delete_batch(txn, col, ops), #[cfg(feature = "postgres")] - BeaconNodeBackend::PostgresDB(_) => { - todo!("Implement PostgresDB logic"); - } + BeaconNodeBackend::PostgresDB(txn) => match tokio::runtime::Handle::try_current() { + Ok(handle) => handle.block_on(txn.delete_batch(column, keys)), + Err(_) => { + let rt = tokio::runtime::Runtime::new() + .map_err(|e| Error::DBError { message: e.to_string() })?; + rt.block_on(txn.delete_batch(column, keys)) + } + }, } } @@ -258,9 +296,14 @@ impl KeyValueStore for BeaconNodeBackend { #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::delete_if(txn, column, f), #[cfg(feature = "postgres")] - BeaconNodeBackend::PostgresDB(_) => { - todo!("Implement PostgresDB logic"); - } + BeaconNodeBackend::PostgresDB(txn) => match tokio::runtime::Handle::try_current() { + Ok(handle) => handle.block_on(txn.delete_if(column, predicate)), + Err(_) => { + let rt = tokio::runtime::Runtime::new() + .map_err(|e| Error::DBError { message: e.to_string() })?; + rt.block_on(txn.delete_if(column, predicate)) + } + }, } } } @@ -277,27 +320,30 @@ impl BeaconNodeBackend { DatabaseBackend::Redb => redb_impl::Redb::open(path).map(BeaconNodeBackend::Redb), #[cfg(feature = "postgres")] DatabaseBackend::PostgresDB => { - use crate::{database::postgres_impl::PostgresDB}; - - let db_url = config - .postgres_url - .as_ref() - .ok_or_else(|| Error::DBError { - message: "Missing Postgres URL".into(), - })?; + Err(Error::DBError { + message: "PostgresDB requires async initialization> Use open_async()".into(), + }) + } + } + } - // Create a Tokio runtime for sync context - let rt = tokio::runtime::Runtime::new() - .map_err(|e| Error::DBError { - message: format!("Failed to create tokio runtime: {}", e), - })?; - let db = rt.block_on(PostgresDB::new(db_url)) + #[cfg(feature = "postgres")] + pub async fn open_async(config: &StoreConfig, path: &Path) -> Result { + metrics::inc_counter_vec(&metrics::DISK_DB_TYPE, &[&config.backend.to_string()]); + match config.backend { + DatabaseBackend::PostgresDB => { + let db_url = "postgres://postgres:admin@localhost:5432/postgres"; + let db = postgres_impl::PostgresDB::open(db_url) + .await .map_err(|e| Error::DBError { - message: format!("Failed to init PostgresDB: {:?}", e), + message: format!("Failed to init PostgresDB: {:?}", e) })?; - Ok(BeaconNodeBackend::PostgresDB(db)) } + _ => { + // fallback to the sync version for other backends + Self::open(config, path) + } } } } diff --git a/beacon_node/store/src/database/postgres_impl.rs b/beacon_node/store/src/database/postgres_impl.rs index 2c299035dee..58b224180b0 100644 --- a/beacon_node/store/src/database/postgres_impl.rs +++ b/beacon_node/store/src/database/postgres_impl.rs @@ -1,8 +1,7 @@ -use crate::{DBColumn, Error, AsyncKeyValueStore}; +use crate::{DBColumn, Error}; use sqlx::{PgPool, postgres::PgPoolOptions, Row}; use types::EthSpec; -use std::marker::PhantomData; -use async_trait::async_trait; +use std::{marker::PhantomData, time::Duration}; #[derive(Clone)] pub struct PostgresDB { @@ -11,23 +10,23 @@ pub struct PostgresDB { } impl PostgresDB { - pub async fn new(database_url: &str) -> Result { + pub async fn open(database_url: &str) -> Result { let db = PgPoolOptions::new() - .max_connections(10) + .max_connections(100) + .acquire_timeout(Duration::from_secs(10)) .connect(database_url) .await .map_err(|e| Error::DBError { message: e.to_string() })?; + println!("Using Postgres backend🎯!!!"); + Ok(Self { db, _phantom: PhantomData }) } -} - -#[async_trait] -impl AsyncKeyValueStore for PostgresDB { - async fn get_bytes(&self, column: DBColumn, key: &[u8]) -> Result>, Error> { + + pub async fn get_bytes(&self, column: DBColumn, key: &[u8]) -> Result>, Error> { let table = get_table_name(column); let key = key.to_vec(); @@ -41,7 +40,7 @@ impl AsyncKeyValueStore for PostgresDB { Ok(row.map(|r| r.get::, _>("value"))) } - async fn put_bytes(&self, column: DBColumn, key: &[u8], value: &[u8]) -> Result<(), Error> { + pub async fn put_bytes(&self, column: DBColumn, key: &[u8], value: &[u8]) -> Result<(), Error> { let table = get_table_name(column); let key = key.to_vec(); let value = value.to_vec(); @@ -60,11 +59,129 @@ impl AsyncKeyValueStore for PostgresDB { .map_err(|e| Error::DBError { message: e.to_string() }) } - async fn put_bytes_sync(&self, column: DBColumn, key: &[u8], value: &[u8]) -> Result<(), Error> { - self.put_bytes(column, key, value).await + pub async fn key_exists(&self, column: DBColumn, key: &[u8]) -> Result { + let table = get_table_name(column); + let key = key.to_vec(); + + let query = format!("SELECT EXISTS (SELECT 1 FROM {} WHERE key = $1)", table); + let exists: (bool, ) = sqlx::query_as(&query) + .bind(key) + .fetch_one(&self.db) + .await + .map_err(|e| Error::DBError { message: e.to_string() })?; + Ok(exists.0) + } + + pub async fn key_delete(&self, column: DBColumn, key: &[u8]) -> Result<(), Error> { + let table = get_table_name(column); + let key = key.to_vec(); + let query = format!("DELETE FROM {} WHERE key = $1", table); + + sqlx::query(&query) + .bind(key) + .execute(&self.db) + .await + .map(|_| ()) + .map_err(|e| Error::DBError { message: e.to_string() }) + } + + // pub async fn do_atomically(){} + + pub async fn compact(&self) -> Result<(), Error> { + // No-op for Postgres, but we can run VACUUM FULL + sqlx::query("VACUUM FULL") + .execute(&self.db) + .await + .map(|_| ()) + .map_err(|e| Error::DBError { message: e.to_string() }) + } + + pub async fn compact_column(&self, column: DBColumn) -> Result<(), Error> { + let table = get_table_name(column); + let query = format!("VACUUM FULL {}", table); + sqlx::query(&query) + .execute(&self.db) + .await + .map(|_| ()) + .map_err(|e| Error::DBError { message: e.to_string() }) + } + + pub async fn iter_column_keys_from(&self, column: DBColumn, start: &[u8]) -> Result>, Error> { + let table = get_table_name(column); + let query = format!("SELECT key FROM {} WHERE key >= $1 ORDER BY key ASC", table); + let rows = sqlx::query(&query) + .bind(start.to_vec()) + .fetch_all(&self.db) + .await + .map_err(|e| Error::DBError { message: e.to_string() })?; + Ok(rows.into_iter().map(|r| r.get::, _>("key")).collect()) + } + + pub async fn iter_column_keys(&self, column: DBColumn) -> Result>, Error> { + let table = get_table_name(column); + let query = format!("SELECT key FROM {} ORDER BY key ASC", table); + let rows = sqlx::query(&query) + .fetch_all(&self.db) + .await + .map_err(|e| Error::DBError { message: e.to_string() })?; + Ok(rows.into_iter().map(|r| r.get::, _>("key")).collect()) + } + + pub async fn iter_column_from(&self, column: DBColumn, start: &[u8]) -> Result, Vec)>, Error> { + let table = get_table_name(column); + let query = format!("SELECT key, value FROM {} WHERE key >= $1 ORDER BY key ASC", table); + let rows = sqlx::query(&query) + .bind(start.to_vec()) + .fetch_all(&self.db) + .await + .map_err(|e| Error::DBError { message: e.to_string() })?; + Ok(rows.into_iter().map(|r| (r.get("key"), r.get("value"))).collect()) + } + + pub async fn delete_batch(&self, column: DBColumn, keys: &[Vec]) -> Result<(), Error> { + let table = get_table_name(column); + let mut tx = self.db.begin().await + .map_err(|e| Error::DBError { message: e.to_string() })?; + let query = format!("DELETE FROM {} WHERE key = $1", table); + for key in keys { + sqlx::query(&query) + .bind(key.clone()) + .execute(&mut *tx) + .await + .map_err(|e| Error::DBError { message: e.to_string() })?; + } + tx.commit().await + .map_err(|e| Error::DBError { message: e.to_string() }) + } + + pub async fn delete_if(&self, column: DBColumn, predicate: F) -> Result<(), Error> + where + F: Fn(&[u8], &[u8]) -> bool + { + let table = get_table_name(column); + let rows = sqlx::query(&format!("SELECT key, value FROM {}", table)) + .fetch_all(&self.db) + .await + .map_err(|e| Error::DBError { message: e.to_string() })?; + let mut tx = self.db.begin().await + .map_err(|e| Error::DBError { message: e.to_string() })?; + for row in rows { + let key: Vec = row.get("key"); + let value: Vec = row.get("value"); + if predicate(&key, &value) { + sqlx::query(&format!("DELETE FROM {} WHERE key = $1", table)) + .bind(key) + .execute(&mut *tx) + .await + .map_err(|e| Error::DBError { message: e.to_string() })?; + } + } + tx.commit().await + .map_err(|e| Error::DBError { message: e.to_string() }) } } + pub fn get_table_name(column: DBColumn) -> &'static str { match column { DBColumn::BeaconMeta => "beacon_meta", diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 2e45e97f4e5..0bdd8248e67 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -41,7 +41,6 @@ use std::collections::HashSet; use std::sync::Arc; use strum::{EnumIter, EnumString, IntoStaticStr}; pub use types::*; -pub use database::async_interface::AsyncKeyValueStore; const DATA_COLUMN_DB_KEY_SIZE: usize = 32 + 8; @@ -493,7 +492,7 @@ mod tests { } } - fn test_impl(store: impl ItemStore) { + async fn test_impl(store: impl ItemStore) { let key = Hash256::random(); let item = StorableThing { a: 1, b: 42 }; @@ -513,11 +512,11 @@ mod tests { assert_eq!(store.get::(&key).unwrap(), None); } - #[test] - fn simplediskdb() { + #[tokio::test] + async fn simplediskdb() { let dir = tempdir().unwrap(); let path = dir.path(); - let store = BeaconNodeBackend::open(&StoreConfig::default(), path).unwrap(); + let store = BeaconNodeBackend::open_async(&StoreConfig::default(), path).await.unwrap(); test_impl(store); } From 5f57e7d1856c93d8c402e99b946c5e542db7932d Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Wed, 20 Aug 2025 11:22:55 -0700 Subject: [PATCH 13/13] Update db open to handle postgres by introducing block_on --- beacon_node/store/src/database/interface.rs | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/beacon_node/store/src/database/interface.rs b/beacon_node/store/src/database/interface.rs index 6a61db398e4..12a14b0f28c 100644 --- a/beacon_node/store/src/database/interface.rs +++ b/beacon_node/store/src/database/interface.rs @@ -320,9 +320,20 @@ impl BeaconNodeBackend { DatabaseBackend::Redb => redb_impl::Redb::open(path).map(BeaconNodeBackend::Redb), #[cfg(feature = "postgres")] DatabaseBackend::PostgresDB => { - Err(Error::DBError { - message: "PostgresDB requires async initialization> Use open_async()".into(), - }) + let db_url = "postgres://postgres:admin@localhost:5432/postgres"; + let db_result = match tokio::runtime::Handle::try_current() { + Ok(handle) => handle.block_on(postgres_impl::PostgresDB::open(db_url)), + Err(_) => { + let rt = tokio::runtime::Runtime::new() + .map_err(|e| Error::DBError { message: e.to_string() })?; + rt.block_on(postgres_impl::PostgresDB::open(db_url)) + } + }; + + let db = db_result.map_err(|e| Error::DBError { + message: format!("Failed to init PostgresDB: {:?}", e) + })?; + Ok(BeaconNodeBackend::PostgresDB(db)) } } }