diff --git a/based/Cargo.lock b/based/Cargo.lock index 76947a00f..e7be3f7cc 100644 --- a/based/Cargo.lock +++ b/based/Cargo.lock @@ -298,6 +298,21 @@ dependencies = [ "tracing", ] +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "ark-ff" version = "0.3.0" @@ -532,7 +547,7 @@ dependencies = [ "miniz_oxide", "object", "rustc-demangle", - "windows-targets", + "windows-targets 0.52.6", ] [[package]] @@ -574,6 +589,12 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb" +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.8.0" @@ -617,8 +638,17 @@ dependencies = [ name = "bop-common" version = "0.1.0" dependencies = [ + "chrono", + "crossbeam-channel", + "directories", "eyre", + "once_cell", + "quanta", + "serde", + "shared_memory", + "thiserror 2.0.11", "tokio", + "tracing", "tracing-appender", "tracing-subscriber", ] @@ -707,6 +737,20 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "wasm-bindgen", + "windows-targets 0.52.6", +] + [[package]] name = "const-hex" version = "1.14.0" @@ -901,6 +945,27 @@ dependencies = [ "subtle", ] +[[package]] +name = "directories" +version = "5.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a49173b84e034382284f27f1af4dcbbd231ffa358c0fe316541a7337f376a35" +dependencies = [ + "dirs-sys", +] + +[[package]] +name = "dirs-sys" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c" +dependencies = [ + "libc", + "option-ext", + "redox_users", + "windows-sys 0.48.0", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -1388,6 +1453,29 @@ dependencies = [ "tracing", ] +[[package]] +name = "iana-time-zone" +version = "0.1.61" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "235e081f3925a06703c2d0117ea8b91f042756fd6e7a6e5d901e8ca1a996b220" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "icu_collections" version = "1.5.0" @@ -1687,6 +1775,16 @@ version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8355be11b20d696c8f18f6cc018c4e372165b1fa8126cef092399c9951984ffa" +[[package]] +name = "libredox" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" +dependencies = [ + "bitflags 2.8.0", + "libc", +] + [[package]] name = "linux-raw-sys" version = "0.4.15" @@ -1736,6 +1834,15 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +[[package]] +name = "memoffset" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce" +dependencies = [ + "autocfg", +] + [[package]] name = "mime" version = "0.3.17" @@ -1779,6 +1886,19 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nix" +version = "0.23.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f3790c00a0150112de0f4cd161e3d7fc4b2d8a5542ffc35f099a2562aecb35c" +dependencies = [ + "bitflags 1.3.2", + "cc", + "cfg-if", + "libc", + "memoffset", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -1897,7 +2017,7 @@ version = "0.10.68" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6174bc48f102d208783c2c84bf931bb75927a617866870de8a4ea85597f871f5" dependencies = [ - "bitflags", + "bitflags 2.8.0", "cfg-if", "foreign-types", "libc", @@ -1935,6 +2055,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "option-ext" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" + [[package]] name = "overload" version = "0.1.1" @@ -1987,7 +2113,7 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "windows-targets", + "windows-targets 0.52.6", ] [[package]] @@ -2125,7 +2251,7 @@ checksum = "b4c2511913b88df1637da85cc8d96ec8e43a3f8bb8ccb71ee1ac240d6f3df58d" dependencies = [ "bit-set", "bit-vec", - "bitflags", + "bitflags 2.8.0", "lazy_static", "num-traits", "rand", @@ -2137,6 +2263,21 @@ dependencies = [ "unarray", ] +[[package]] +name = "quanta" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bd1fe6824cea6538803de3ff1bc0cf3949024db3d43c9643024bfb33a807c0e" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quick-error" version = "1.2.3" @@ -2198,13 +2339,33 @@ dependencies = [ "rand_core", ] +[[package]] +name = "raw-cpuid" +version = "11.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6928fa44c097620b706542d428957635951bade7143269085389d42c8a4927e" +dependencies = [ + "bitflags 2.8.0", +] + [[package]] name = "redox_syscall" version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03a862b389f93e68874fbf580b9de08dd02facb9a788ebadaf4a3fd33cf58834" dependencies = [ - "bitflags", + "bitflags 2.8.0", +] + +[[package]] +name = "redox_users" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba009ff324d1fc1b900bd1fdb31564febe58a8ccc8a6fdbb93b543d33b13ca43" +dependencies = [ + "getrandom", + "libredox", + "thiserror 1.0.69", ] [[package]] @@ -2404,7 +2565,7 @@ version = "0.38.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" dependencies = [ - "bitflags", + "bitflags 2.8.0", "errno", "libc", "linux-raw-sys", @@ -2509,7 +2670,7 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ - "bitflags", + "bitflags 2.8.0", "core-foundation", "core-foundation-sys", "libc", @@ -2644,6 +2805,19 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shared_memory" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba8593196da75d9dc4f69349682bd4c2099f8cde114257d1ef7ef1b33d1aba54" +dependencies = [ + "cfg-if", + "libc", + "nix", + "rand", + "win-sys", +] + [[package]] name = "shlex" version = "1.3.0" @@ -2837,7 +3011,7 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" dependencies = [ - "bitflags", + "bitflags 2.8.0", "core-foundation", "system-configuration-sys", ] @@ -3371,6 +3545,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "win-sys" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b7b128a98c1cfa201b09eb49ba285887deb3cbe7466a98850eb1adabb452be5" +dependencies = [ + "windows", +] + [[package]] name = "winapi" version = "0.3.9" @@ -3393,6 +3576,28 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45296b64204227616fdbf2614cefa4c236b98ee64dfaaaa435207ed99fe7829f" +dependencies = [ + "windows_aarch64_msvc 0.34.0", + "windows_i686_gnu 0.34.0", + "windows_i686_msvc 0.34.0", + "windows_x86_64_gnu 0.34.0", + "windows_x86_64_msvc 0.34.0", +] + +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-registry" version = "0.2.0" @@ -3401,7 +3606,7 @@ checksum = "e400001bb720a623c1c69032f8e3e4cf09984deec740f007dd2b03ec864804b0" dependencies = [ "windows-result", "windows-strings", - "windows-targets", + "windows-targets 0.52.6", ] [[package]] @@ -3410,7 +3615,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d1043d8214f791817bab27572aaa8af63732e11bf84aa21a45a78d6c317ae0e" dependencies = [ - "windows-targets", + "windows-targets 0.52.6", ] [[package]] @@ -3420,7 +3625,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4cd9b125c486025df0eabcb585e62173c6c9eddcec5d117d3b6e8c30e2ee4d10" dependencies = [ "windows-result", - "windows-targets", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.5", ] [[package]] @@ -3429,7 +3643,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets", + "windows-targets 0.52.6", ] [[package]] @@ -3438,7 +3652,22 @@ version = "0.59.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" dependencies = [ - "windows-targets", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", ] [[package]] @@ -3447,28 +3676,58 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", + "windows_aarch64_gnullvm 0.52.6", + "windows_aarch64_msvc 0.52.6", + "windows_i686_gnu 0.52.6", "windows_i686_gnullvm", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_i686_msvc 0.52.6", + "windows_x86_64_gnu 0.52.6", + "windows_x86_64_gnullvm 0.52.6", + "windows_x86_64_msvc 0.52.6", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" +[[package]] +name = "windows_aarch64_msvc" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17cffbe740121affb56fad0fc0e421804adf0ae00891205213b5cecd30db881d" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + [[package]] name = "windows_aarch64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" +[[package]] +name = "windows_i686_gnu" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2564fde759adb79129d9b4f54be42b32c89970c18ebf93124ca8870a498688ed" + +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -3481,24 +3740,66 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" +[[package]] +name = "windows_i686_msvc" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cd9d32ba70453522332c14d38814bceeb747d80b3958676007acadd7e166956" + +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + [[package]] name = "windows_i686_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" +[[package]] +name = "windows_x86_64_gnu" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfce6deae227ee8d356d19effc141a509cc503dfd1f850622ec4b0f84428e1f4" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + [[package]] name = "windows_x86_64_gnu" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" +[[package]] +name = "windows_x86_64_msvc" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d19538ccc21819d01deaf88d6a17eae6596a12e9aafdbb97916fb49896d89de9" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + [[package]] name = "windows_x86_64_msvc" version = "0.52.6" diff --git a/based/Cargo.toml b/based/Cargo.toml index 2afa09132..1e2c22c15 100644 --- a/based/Cargo.toml +++ b/based/Cargo.toml @@ -33,3 +33,18 @@ toml = "0.8.19" eyre = "0.6.12" uuid = { version = "1.12.1", features = ["v4"] } thiserror = "2.0.11" + +lazy_static = "1.5.0" +once_cell = "1.19.0" +directories = "5.0.1" + +# Time +chrono = "0.4.23" +quanta = "0.12.3" + +# ipc +shared_memory = "^0.12" + +# threading +core_affinity = "0.8.1" +crossbeam-channel = "0.5.13" diff --git a/based/crates/common/Cargo.toml b/based/crates/common/Cargo.toml index 5054956bc..3bce4ed66 100644 --- a/based/crates/common/Cargo.toml +++ b/based/crates/common/Cargo.toml @@ -6,7 +6,17 @@ edition = "2021" [dependencies] tokio.workspace = true +tracing.workspace = true tracing-subscriber.workspace = true tracing-appender.workspace = true +serde = { workspace = true, features = ["derive"] } +thiserror.workspace = true eyre.workspace = true +once_cell.workspace = true +chrono.workspace = true +quanta.workspace = true +directories.workspace = true +shared_memory.workspace = true + +crossbeam-channel.workspace = true diff --git a/based/crates/common/src/communication.rs b/based/crates/common/src/communication.rs new file mode 100644 index 000000000..f387bb65e --- /dev/null +++ b/based/crates/common/src/communication.rs @@ -0,0 +1,78 @@ +use std::{fs::read_dir, path::Path}; + +use shared_memory::ShmemError; +use thiserror::Error; + +pub mod queue; +pub mod seqlock; +pub use queue::{Consumer, Producer, Queue}; +pub use seqlock::Seqlock; +pub mod internal_message; + +pub use internal_message::InternalMessage; + +pub type Sender = crossbeam_channel::Sender>; +pub type Receiver = crossbeam_channel::Sender>; + +#[derive(Error, Debug, Copy, Clone, PartialEq)] +pub enum ReadError { + #[error("Got sped past")] + SpedPast, + #[error("Lock empty")] + Empty, +} + +#[derive(Error, Debug)] +#[repr(u8)] +pub enum Error { + #[error("Queue not initialized")] + UnInitialized, + #[error("Queue length not power of two")] + LengthNotPowerOfTwo, + #[error("Element size not power of two - 4")] + ElementSizeNotPowerTwo, + #[error("Shared memory file does not exist")] + NonExistingFile, + #[error("Preexisting shared memory too small")] + TooSmall, + #[error("Shmem error")] + ShmemError(#[from] ShmemError), +} + +pub fn clear_shmem>(path: P) { + let path = path.as_ref(); + if !path.exists() { + return; + } + let Ok(mut shmem) = shared_memory::ShmemConf::new().flink(path).open() else { + return; + }; + shmem.set_owner(true); + std::fs::remove_file(path).expect("couldn't remove file"); +} + +pub fn queues_dir_string() -> String { + let queues_dir = + directories::BaseDirs::new().expect("Couldn't retrieve home dir").data_dir().join("builder/queues"); + queues_dir.to_string_lossy().to_string() +} + +pub fn verify_or_remove_queue_files() { + let queues_dir = + directories::BaseDirs::new().expect("Couldn't retrieve home dir").data_dir().join("builder/queues"); + if queues_dir.is_file() { + let _ = std::fs::remove_file(&queues_dir); + let _ = std::fs::create_dir_all(queues_dir.as_path()); + return; + } + let Ok(files) = read_dir(&queues_dir) else { + let _ = std::fs::create_dir_all(queues_dir.as_path()); + return; + }; + for f in files.filter_map(|t| t.ok()) { + if shared_memory::ShmemConf::new().flink(f.path()).open().is_err() { + tracing::warn!("couldn't open shmem at {:?} so removing it to be recreated later", f.path()); + let _ = std::fs::remove_file(f.path()); + } + } +} diff --git a/based/crates/common/src/communication/internal_message.rs b/based/crates/common/src/communication/internal_message.rs new file mode 100644 index 000000000..dcb7af804 --- /dev/null +++ b/based/crates/common/src/communication/internal_message.rs @@ -0,0 +1,137 @@ +use std::ops::{Deref, DerefMut}; + +use serde::{Deserialize, Serialize}; + +use crate::time::{Duration, IngestionTime, Instant, Nanos}; + +#[derive(Clone, Copy, Debug, PartialEq, PartialOrd, Serialize, Deserialize, Default)] +pub struct InternalMessage { + ingestion_t: IngestionTime, + data: T, +} + +impl InternalMessage { + #[inline] + pub fn new(ingestion_t: IngestionTime, data: T) -> Self { + Self { ingestion_t, data } + } + + #[inline] + pub fn with_data(&self, data: D) -> InternalMessage { + InternalMessage::new(self.ingestion_t, data) + } + + #[inline] + pub fn data(&self) -> &T { + &self.data + } + + #[inline] + pub fn into_data(self) -> T { + self.data + } + + #[inline] + pub fn map(self, f: impl FnOnce(T) -> R) -> InternalMessage { + InternalMessage { ingestion_t: self.ingestion_t, data: f(self.data) } + } + + #[inline] + pub fn map_ref(&self, f: impl FnOnce(&T) -> R) -> InternalMessage { + InternalMessage { ingestion_t: self.ingestion_t, data: f(&self.data) } + } + + #[inline] + pub fn unpack(self) -> (IngestionTime, T) { + (self.ingestion_t, self.data) + } + + /// This is only useful within the same socket as the original tsamp + #[inline] + pub fn elapsed(&self) -> Duration { + self.ingestion_t.internal().elapsed() + } + + /// These are real nanos since unix epoc + #[inline] + pub fn elapsed_nanos(&self) -> Nanos { + self.ingestion_t.real().elapsed() + } + + #[inline] + pub fn ingestion_time(&self) -> IngestionTime { + self.ingestion_t + } +} + +impl From> for (IngestionTime, T) { + #[inline] + fn from(value: InternalMessage) -> Self { + value.unpack() + } +} + +impl From for InternalMessage { + #[inline] + fn from(value: T) -> Self { + Self::new(IngestionTime::now(), value) + } +} + +impl Deref for InternalMessage { + type Target = T; + + #[inline] + fn deref(&self) -> &Self::Target { + &self.data + } +} + +impl DerefMut for InternalMessage { + #[inline] + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.data + } +} + +impl From<&InternalMessage> for IngestionTime { + #[inline] + fn from(value: &InternalMessage) -> Self { + value.ingestion_t + } +} + +impl AsRef for InternalMessage { + #[inline] + fn as_ref(&self) -> &IngestionTime { + &self.ingestion_t + } +} + +impl From<&InternalMessage> for Instant { + #[inline] + fn from(value: &InternalMessage) -> Self { + value.ingestion_t.into() + } +} + +impl From<&InternalMessage> for Nanos { + #[inline] + fn from(value: &InternalMessage) -> Self { + value.ingestion_t.into() + } +} + +impl From> for Instant { + #[inline] + fn from(value: InternalMessage) -> Self { + value.ingestion_t.into() + } +} + +impl From> for Nanos { + #[inline] + fn from(value: InternalMessage) -> Self { + value.ingestion_t.into() + } +} diff --git a/based/crates/common/src/communication/queue.rs b/based/crates/common/src/communication/queue.rs new file mode 100644 index 000000000..b24b9eabd --- /dev/null +++ b/based/crates/common/src/communication/queue.rs @@ -0,0 +1,629 @@ +use std::{ + alloc::Layout, + borrow::Borrow, + mem::{size_of, MaybeUninit}, + ops::Deref, + path::Path, + sync::atomic::{AtomicUsize, Ordering}, +}; + +use shared_memory::ShmemConf; +use tracing::error; + +use super::{seqlock::Seqlock, Error, ReadError}; + +#[derive(Debug, Clone, Copy)] +#[repr(u8)] +pub enum QueueType { + Unknown, + MPMC, + SPMC, +} + +#[derive(Debug)] +#[repr(C, align(64))] +struct QueueHeader { + queue_type: QueueType, // 1 + is_initialized: u8, // 2 + _pad1: [u8; 6], // 8 + elsize: usize, // 16 + mask: usize, // 24 + count: AtomicUsize, // 32 +} +#[allow(dead_code)] +impl QueueHeader { + /// in bytes + pub fn size_of(&self) -> usize { + (self.mask + 1) * self.elsize + } + + pub fn len(&self) -> usize { + self.mask + 1 + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub fn from_ptr(ptr: *mut u8) -> &'static mut Self { + unsafe { &mut *(ptr as *mut Self) } + } + + pub fn is_initialized(&self) -> bool { + self.is_initialized == 1 + } + + pub fn elsize(&self) -> usize { + self.elsize + } + + pub fn open_shared>(path: S) -> &'static mut Self { + let shmem = ShmemConf::new().flink(path.as_ref()).open().unwrap(); + let ptr = shmem.as_ptr(); + std::mem::forget(shmem); + Self::from_ptr(ptr) + } +} + +//TODO: this should in reality really also implement drop and most likely return an Arc instead of +// &'static or whatever. +#[repr(C, align(64))] +pub struct InnerQueue { + header: QueueHeader, + buffer: [Seqlock], +} + +impl InnerQueue { + /// Allocs (unshared) memory and initializes a new queue from it. + /// QueueType::MPMC = multi producer multi consumer + /// QueueType::SPMC = single producer multi consumer + fn new(len: usize, queue_type: QueueType) -> Result<*const Self, Error> { + let real_len = len.next_power_of_two(); + let size = size_of::() + real_len * size_of::>(); + + unsafe { + let ptr = std::alloc::alloc_zeroed(Layout::array::(size).unwrap().align_to(64).unwrap().pad_to_align()); + // Why real len you may ask. The size of the fat pointer ONLY includes the length of the + // unsized part of the struct i.e. the buffer. + Self::from_uninitialized_ptr(ptr, real_len, queue_type) + } + } + + const fn size_of(len: usize) -> usize { + size_of::() + len.next_power_of_two() * size_of::>() + } + + fn from_uninitialized_ptr(ptr: *mut u8, len: usize, queue_type: QueueType) -> Result<*const Self, Error> { + if !len.is_power_of_two() { + return Err(Error::LengthNotPowerOfTwo); + } + unsafe { + let q = std::ptr::slice_from_raw_parts_mut(ptr, len) as *mut Self; + let elsize = size_of::>(); + if !len.is_power_of_two() { + return Err(Error::LengthNotPowerOfTwo); + } + + let mask = len - 1; + + (*q).header.queue_type = queue_type; + (*q).header.mask = mask; + (*q).header.elsize = elsize; + (*q).header.is_initialized = true as u8; + (*q).header.count = AtomicUsize::new(0); + Ok(q) + } + } + + #[allow(dead_code)] + fn from_initialized_ptr(ptr: *mut QueueHeader) -> Result<*const Self, Error> { + unsafe { + let len = (*ptr).mask + 1; + if !len.is_power_of_two() { + return Err(Error::LengthNotPowerOfTwo); + } + if (*ptr).is_initialized != true as u8 { + return Err(Error::UnInitialized); + } + // TODO: I think this is slightly wrong + Ok(std::ptr::slice_from_raw_parts_mut(ptr as *mut Seqlock, len) as *const Self) + } + } + + // Note: Calling this from anywhere that's not a producer -> false sharing + fn count(&self) -> usize { + self.header.count.load(Ordering::Relaxed) + } + + fn next_count(&self) -> usize { + match self.header.queue_type { + QueueType::Unknown => panic!("Unknown queue"), + QueueType::MPMC => self.header.count.fetch_add(1, Ordering::AcqRel), + QueueType::SPMC => { + let c = self.header.count.load(Ordering::Relaxed); + self.header.count.store(c.wrapping_add(1), Ordering::Relaxed); + c + } + } + } + + fn load(&self, pos: usize) -> &Seqlock { + unsafe { self.buffer.get_unchecked(pos) } + } + + fn cur_pos(&self) -> usize { + self.count() & self.header.mask + } + + fn version(&self) -> u32 { + (((self.count() / (self.header.mask + 1)) << 1) + 2) as u32 + } + + #[allow(dead_code)] + fn version_of(&self, pos: usize) -> u32 { + self.load(pos).version() + } + + // returns the current count + fn produce(&self, item: &T) -> usize { + let p = self.next_count(); + let lock = self.load(p & self.header.mask); + lock.write(item); + p + } + + fn consume(&self, el: &mut T, ri: usize, ri_ver: u32) -> Result<(), ReadError> { + self.load(ri).read_with_version(el, ri_ver) + } + + #[allow(dead_code)] + fn read(&self, el: &mut T, ri: usize) { + self.load(ri).read(el) + } + + fn len(&self) -> usize { + self.header.mask + 1 + } + + // This exists just to check the state of the queue for debugging purposes + #[allow(dead_code)] + fn verify(&self) { + let mut prev_v = self.load(0).version(); + let mut n_changes = 0; + for i in 1..=self.header.mask { + let lck = self.load(i); + let v = lck.version(); + if v & 1 == 1 { + panic!("odd version at {i}: {prev_v} -> {v}"); + } + if v != prev_v && v & 1 == 0 { + n_changes += 1; + println!("version change at {i}: {prev_v} -> {v}"); + prev_v = v; + } + } + if n_changes > 1 { + panic!("what") + } + } + + fn produce_first(&self, item: &T) -> usize { + match self.header.queue_type { + QueueType::Unknown => panic!("Unknown queue"), + QueueType::MPMC => self.produce(item), + QueueType::SPMC => { + let m = self.header.mask; + let c = self.count(); + let p = c & m; + let lock = self.load(p); + if lock.version() & 1 == 1 { + lock.write_unpoison(item); + p + } else { + self.produce(item) + } + } + } + } + + fn create_or_open_shared>(shmem_file: P, len: usize, typ: QueueType) -> Result<*const Self, Error> { + use shared_memory::{ShmemConf, ShmemError}; + let _ = std::fs::create_dir_all(shmem_file.as_ref().parent().unwrap()); + match ShmemConf::new().size(Self::size_of(len)).flink(&shmem_file).create() { + Ok(shmem) => { + let ptr = shmem.as_ptr(); + std::mem::forget(shmem); + Ok(Self::from_uninitialized_ptr(ptr, len, typ)?) + } + Err(ShmemError::LinkExists) => { + let v = Self::open_shared(&shmem_file)?; + if unsafe { (*v).header.len() } < len { + Err(Error::TooSmall) + } else { + Ok(v) + } + } + Err(e) => Err(e.into()), + } + } + + fn open_shared>(shmem_file: S) -> Result<*const Self, Error> { + if !shmem_file.as_ref().exists() { + return Err(Error::NonExistingFile); + } + let mut tries = 0; + let mut header = QueueHeader::open_shared(shmem_file.as_ref()); + while !header.is_initialized() { + std::thread::sleep(std::time::Duration::from_millis(1)); + header = QueueHeader::open_shared(shmem_file.as_ref()); + if tries == 10 { + return Err(Error::UnInitialized); + } + tries += 1; + } + Self::from_initialized_ptr(header) + } +} + +unsafe impl Send for InnerQueue {} +unsafe impl Sync for InnerQueue {} + +impl std::fmt::Debug for InnerQueue { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Queue:\nHeader:\n{:?}", self.header) + } +} + +#[derive(Clone, Copy, Debug)] +pub struct Queue { + inner: *const InnerQueue, +} + +impl Queue { + pub fn new(len: usize, queue_type: QueueType) -> Result { + InnerQueue::new(len, queue_type).map(|inner| Self { inner }) + } + + pub fn create_or_open_shared>( + shmem_file: P, + len: usize, + queue_type: QueueType, + ) -> Result { + InnerQueue::create_or_open_shared(shmem_file, len, queue_type).map(|inner| Self { inner }) + } + + pub fn open_shared>(shmem_file: P) -> Result { + InnerQueue::open_shared(shmem_file).map(|inner| Self { inner }) + } +} + +unsafe impl Send for Queue {} +unsafe impl Sync for Queue {} + +impl Borrow> for Queue { + fn borrow(&self) -> &InnerQueue { + unsafe { &*self.inner } + } +} +impl Deref for Queue { + type Target = InnerQueue; + + fn deref(&self) -> &Self::Target { + unsafe { &*self.inner } + } +} +impl AsRef> for Queue { + fn as_ref(&self) -> &InnerQueue { + unsafe { &*self.inner } + } +} + +/// Simply exists for the automatic produce_first +#[repr(C)] +#[derive(Clone, Copy, Debug)] +pub struct Producer { + pub produced_first: u8, // 1 + pub queue: Queue, +} + +impl From> for Producer { + fn from(queue: Queue) -> Self { + Self { produced_first: 0, queue } + } +} + +impl Producer { + pub fn produce(&mut self, msg: &T) -> usize { + if self.produced_first == 0 { + self.produced_first = 1; + self.queue.produce_first(msg) + } else { + self.queue.produce(msg) + } + } + + pub fn produce_without_first(&self, msg: &T) -> usize { + self.queue.produce(msg) + } +} + +impl AsMut> for Producer { + fn as_mut(&mut self) -> &mut Producer { + self + } +} + +#[repr(C)] +#[derive(Clone, Copy, Debug)] +pub struct ConsumerBare { + pos: usize, // 8 + mask: usize, // 16 + expected_version: u32, // 20 + is_running: u8, // 21 + _pad: [u8; 11], // 32 + queue: Queue, // 48 fat ptr: (usize, pointer) +} + +impl ConsumerBare { + #[inline] + pub fn recover_after_error(&mut self) { + self.expected_version += 2; + } + + #[inline] + fn update_pos(&mut self) { + self.pos = (self.pos + 1) & self.mask; + self.expected_version = self.expected_version.wrapping_add(2 * (self.pos == 0) as u32); + } + + /// Nonblocking consume returning either Ok(()) or a ReadError + #[inline] + pub fn try_consume(&mut self, el: &mut T) -> Result<(), ReadError> { + self.queue.consume(el, self.pos, self.expected_version)?; + self.update_pos(); + Ok(()) + } + + /// Blocking consume + #[inline] + pub fn blocking_consume(&mut self, el: &mut T) { + loop { + match self.try_consume(el) { + Ok(_) => { + return; + } + Err(ReadError::Empty) => { + #[cfg(target_arch = "x86_64")] + unsafe { + std::arch::x86_64::_mm_pause() + }; + continue; + } + Err(ReadError::SpedPast) => { + self.recover_after_error(); + } + } + } + } + + #[allow(clippy::not_unsafe_ptr_arg_deref)] + pub fn init_header(consumer_ptr: *mut ConsumerBare, queue: Queue) { + unsafe { + (*consumer_ptr).pos = queue.cur_pos(); + (*consumer_ptr).expected_version = queue.version(); + (*consumer_ptr).mask = queue.header.mask; + (*consumer_ptr).queue = queue + } + } + + #[inline] + pub fn tot_published(&self) -> usize { + (*self.queue).count() + } +} + +impl AsMut> for ConsumerBare { + fn as_mut(&mut self) -> &mut ConsumerBare { + self + } +} + +impl From> for ConsumerBare { + fn from(queue: Queue) -> Self { + let pos = queue.cur_pos(); + let expected_version = queue.version(); + Self { pos, mask: queue.header.mask, _pad: [0; 11], expected_version, is_running: 1, queue } + } +} + +#[derive(Clone, Copy, Debug)] +pub struct Consumer { + consumer: ConsumerBare, + message: T, + should_log: bool, +} + +impl Consumer { + /// Maybe consume one message in a queue with error recovery and logging, + /// and return whether one was read + #[inline] + pub fn consume(&mut self, mut f: F) -> bool + where + F: FnMut(&mut T), + { + match self.consumer.try_consume(&mut self.message) { + Ok(()) => { + f(&mut self.message); + true + } + Err(ReadError::SpedPast) => { + self.log_and_recover(); + self.consume(f) + } + Err(ReadError::Empty) => false, + } + } + + #[inline(never)] + fn log_and_recover(&mut self) { + if self.should_log { + error!( + "Consumer<{}> got sped past. Lost {} messages", + std::any::type_name::(), + self.consumer.queue.len() + ); + } + self.consumer.recover_after_error(); + } + + #[inline] + pub fn without_log(self) -> Self { + Self { should_log: false, ..self } + } + + #[inline] + pub fn tot_published(&self) -> usize { + self.consumer.tot_published() + } +} + +impl>> From for Consumer { + #[allow(clippy::uninit_assumed_init)] + fn from(queue: Q) -> Self { + Self { consumer: queue.into(), message: unsafe { MaybeUninit::uninit().assume_init() }, should_log: true } + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn headersize() { + assert_eq!(64, std::mem::size_of::()); + assert_eq!(48, std::mem::size_of::>()) + } + + #[test] + fn basic() { + for typ in [QueueType::SPMC, QueueType::MPMC] { + let q = Queue::new(16, typ).unwrap(); + let mut p = Producer::from(q); + let mut c = ConsumerBare::from(q); + p.produce(&1); + let mut m = 0; + + assert_eq!(c.try_consume(&mut m), Ok(())); + assert_eq!(m, 1); + assert!(matches!(c.try_consume(&mut m), Err(ReadError::Empty))); + for i in 0..16 { + p.produce(&i); + } + for i in 0..16 { + c.try_consume(&mut m).unwrap(); + assert_eq!(m, i); + } + + assert!(matches!(c.try_consume(&mut m), Err(ReadError::Empty))); + + for _ in 0..20 { + p.produce(&1); + } + + assert!(matches!(c.try_consume(&mut m), Err(ReadError::SpedPast))); + } + } + + fn multithread(n_writers: usize, n_readers: usize, tot_messages: usize) { + let q = Queue::new(16, QueueType::MPMC).unwrap(); + + let mut readhandles = Vec::new(); + for _ in 0..n_readers { + let mut c1 = ConsumerBare::from(q); + let cons = std::thread::spawn(move || { + let mut c = 0; + let mut m = 0; + while c < tot_messages { + c1.blocking_consume(&mut m); + c += m; + } + assert_eq!(c, (0..tot_messages).sum::()); + }); + readhandles.push(cons) + } + let mut writehandles = Vec::new(); + for n in 0..n_writers { + let mut p1 = Producer::from(q); + let prod1 = std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_millis(20)); + let mut c = n; + while c < tot_messages { + p1.produce(&c); + c += n_writers; + std::thread::yield_now(); + } + }); + writehandles.push(prod1); + } + + for h in readhandles { + let _ = h.join(); + } + for h in writehandles { + let _ = h.join(); + } + } + #[test] + fn multithread_1_2() { + multithread(1, 2, 100000); + } + #[test] + fn multithread_1_4() { + multithread(1, 4, 100000); + } + #[test] + fn multithread_2_4() { + multithread(2, 4, 100000); + } + #[test] + fn multithread_4_4() { + multithread(4, 4, 100000); + } + #[test] + fn multithread_8_8() { + multithread(8, 8, 100000); + } + #[test] + fn basic_shared() { + for typ in [QueueType::SPMC, QueueType::MPMC] { + let path = std::path::Path::new("/dev/shm/blabla_test"); + let _ = std::fs::remove_file(path); + let q = Queue::create_or_open_shared(path, 16, typ).unwrap(); + let mut p = Producer::from(q); + let mut c = ConsumerBare::from(q); + + p.produce(&1); + let mut m = 0; + + assert_eq!(c.try_consume(&mut m), Ok(())); + assert_eq!(m, 1); + assert!(matches!(c.try_consume(&mut m), Err(ReadError::Empty))); + for i in 0..16 { + p.produce(&i); + } + for i in 0..16 { + c.try_consume(&mut m).unwrap(); + assert_eq!(m, i); + } + + assert!(matches!(c.try_consume(&mut m), Err(ReadError::Empty))); + + for _ in 0..20 { + p.produce(&1); + } + + assert!(matches!(c.try_consume(&mut m), Err(ReadError::SpedPast))); + let _ = std::fs::remove_file(path); + } + } +} diff --git a/based/crates/common/src/communication/seqlock.rs b/based/crates/common/src/communication/seqlock.rs new file mode 100644 index 000000000..3f46ce938 --- /dev/null +++ b/based/crates/common/src/communication/seqlock.rs @@ -0,0 +1,355 @@ +use std::{ + cell::UnsafeCell, + fmt, + sync::atomic::{compiler_fence, AtomicU32, Ordering}, +}; + +use super::ReadError; +//TODO: Make the types more rust like. I.e. on copy types -> copy on write/read, clone types -> +// copy std::mem::forget till read etc +/// A sequential lock +#[repr(C, align(64))] +pub struct Seqlock { + pub data: UnsafeCell, // don't change this order or rust does padding till 8bytes + pub version: AtomicU32, +} +unsafe impl Send for Seqlock {} +unsafe impl Sync for Seqlock {} + +// TODO: Try 32 bit version +impl Seqlock { + /// Creates a new SeqLock with the given initial value. + #[inline] + pub const fn new(val: T) -> Seqlock { + Seqlock { version: AtomicU32::new(0), data: UnsafeCell::new(val) } + } + + pub fn version(&self) -> u32 { + self.version.load(Ordering::Relaxed) + } + + #[allow(dead_code)] + fn set_version(&self, v: u32) { + self.version.store(v, Ordering::Relaxed) + } + + #[inline(never)] + pub fn read_with_version(&self, result: &mut T, expected_version: u32) -> Result<(), ReadError> { + let v1 = self.version.load(Ordering::Acquire); + if v1 < expected_version { + return Err(ReadError::Empty); + } + + compiler_fence(Ordering::AcqRel); + *result = unsafe { (*self.data.get()).clone() }; + compiler_fence(Ordering::AcqRel); + let v2 = self.version.load(Ordering::Acquire); + if v2 == expected_version { + Ok(()) + } else { + Err(ReadError::SpedPast) + } + } + + #[inline(never)] + pub fn read(&self, result: &mut T) { + loop { + let v1 = self.version.load(Ordering::Acquire); + compiler_fence(Ordering::AcqRel); + unsafe { + *result = (*self.data.get()).clone(); + } + compiler_fence(Ordering::AcqRel); + let v2 = self.version.load(Ordering::Acquire); + if v1 == v2 && v1 & 1 == 0 { + return; + } + #[cfg(target_arch = "x86_64")] + unsafe { + std::arch::x86_64::_mm_pause() + }; + } + } + + #[inline] + #[allow(clippy::mut_from_ref)] + pub fn view_unsafe(&self) -> &mut T { + unsafe { &mut *self.data.get() } + } + + #[inline(never)] + pub fn write_lock(&self, f: F) -> R + where + F: FnOnce(&mut T) -> R, + { + // Increment the sequence number. At this point, the number will be odd, + // which will force readers to spin until we finish writing. + let v = self.version.fetch_add(1, Ordering::Release); + compiler_fence(Ordering::AcqRel); + // Make sure any writes to the data happen after incrementing the + // sequence number. What we ideally want is a store(Acquire), but the + // Acquire ordering is not available on stores. + let o = f(unsafe { &mut *self.data.get() }); + compiler_fence(Ordering::AcqRel); + // unsafe {asm!("sti");} + self.version.store(v.wrapping_add(2), Ordering::Release); + o + } + + #[inline] + pub fn write(&self, val: &T) { + self.write_lock(|data| { + *data = val.clone(); + }); + } + + #[inline(never)] + pub fn _write_unpoison(&self, f: F) + where + F: FnOnce(), + { + let v = self.version.load(Ordering::Relaxed); + self.version.store(v.wrapping_add(v.wrapping_sub(1) & 1), Ordering::Release); + // Make sure any writes to the data happen after incrementing the + // sequence number. What we ideally want is a store(Acquire), but the + // Acquire ordering is not available on stores. + compiler_fence(Ordering::AcqRel); + f(); + compiler_fence(Ordering::AcqRel); + self.version.store(v.wrapping_add(1), Ordering::Relaxed); + } + + #[inline] + pub fn write_unpoison(&self, val: &T) { + self._write_unpoison(|| { + let t = self.data.get() as *mut u8; + unsafe { t.copy_from(val as *const _ as *const u8, std::mem::size_of::()) }; + }); + } + + #[inline(always)] + #[allow(named_asm_labels)] + fn _write_multi(&self, f: F) + where + F: FnOnce(), + { + // Increment the sequence number. At this point, the number will be odd, + // which will force readers to spin until we finish writing. + let mut v = self.version.fetch_or(1, Ordering::AcqRel); + while v & 1 == 1 { + v = self.version.fetch_or(1, Ordering::AcqRel); + } + // Make sure any writes to the data happen after incrementing the + // sequence number. What we ideally want is a store(Acquire), but the + // Acquire ordering is not available on stores. + f(); + compiler_fence(Ordering::AcqRel); + self.version.store(v.wrapping_add(2), Ordering::Release); + } + + #[inline(never)] + pub fn write_multi(&self, val: &T) { + self._write_multi(|| { + unsafe { self.data.get().copy_from(val as *const T, 1) }; + }); + } +} + +impl Default for Seqlock { + #[inline] + fn default() -> Seqlock { + Seqlock::new(Default::default()) + } +} + +impl fmt::Debug for Seqlock { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "SeqLock {{ data: {:?} }}", unsafe { (*self.data.get()).clone() }) + } +} + +#[cfg(test)] +mod tests { + use std::{ + sync::atomic::AtomicBool, + time::{Duration, Instant}, + }; + + use super::*; + + #[test] + fn lock_size() { + assert_eq!(std::mem::size_of::>(), 64); + assert_eq!(std::mem::size_of::>(), 128) + } + + fn consumer_loop(lock: &Seqlock<[usize; N]>, done: &AtomicBool) { + let mut msg = [0usize; N]; + while !done.load(Ordering::Relaxed) { + lock.read(&mut msg); + let first = msg[0]; + for i in msg { + assert_eq!(first, i); + } + } + } + + fn producer_loop(lock: &Seqlock<[usize; N]>, done: &AtomicBool, multi: bool) { + let curt = Instant::now(); + let mut count = 0; + let mut msg = [0usize; N]; + while curt.elapsed() < Duration::from_secs(1) { + msg.fill(count); + if multi { + lock.write_multi(&msg); + } else { + lock.write(&msg); + } + count = count.wrapping_add(1); + } + done.store(true, Ordering::Relaxed); + } + + fn read_test() { + let lock = Seqlock::new([0usize; N]); + let done = AtomicBool::new(false); + std::thread::scope(|s| { + s.spawn(|| { + consumer_loop(&lock, &done); + }); + s.spawn(|| { + producer_loop(&lock, &done, false); + }); + }); + } + + fn read_test_multi() { + let lock = Seqlock::new([0usize; N]); + let done = AtomicBool::new(false); + std::thread::scope(|s| { + s.spawn(|| { + consumer_loop(&lock, &done); + }); + s.spawn(|| { + producer_loop(&lock, &done, true); + }); + s.spawn(|| { + producer_loop(&lock, &done, true); + }); + }); + } + + #[test] + fn read_16() { + read_test::<16>() + } + #[test] + fn read_32() { + read_test::<32>() + } + #[test] + fn read_64() { + read_test::<64>() + } + #[test] + fn read_128() { + read_test::<128>() + } + #[test] + fn read_large() { + read_test::<65536>() + } + + #[test] + fn read_16_multi() { + read_test_multi::<16>() + } + #[test] + fn read_32_multi() { + read_test_multi::<32>() + } + #[test] + fn read_64_multi() { + read_test_multi::<64>() + } + #[test] + fn read_128_multi() { + read_test_multi::<128>() + } + #[test] + fn read_large_multi() { + read_test_multi::<65536>() + } + + #[test] + fn write_unpoison() { + let lock = Seqlock::default(); + lock.set_version(1); + lock.write_unpoison(&1); + assert_eq!(lock.version(), 2); + } + + fn consumer_vec_loop_data_consistency(lock: &Seqlock>, done: &AtomicBool) { + let mut msg = vec![0; 10]; + let mut got_larger = false; + let mut got_empty = false; + let mut got_110 = false; + while !done.load(Ordering::Relaxed) { + lock.read(&mut msg); + if msg.len() == 0 { + got_empty = true; + continue; + } + if msg.len() == 110 { + got_110 = true; + assert_eq!(msg[0], 110); + } + let first = msg[0]; + if first != 0 { + got_larger = true; + } + for &i in &msg { + assert_eq!(first, i); + } + } + assert!(got_larger); + assert!(got_empty); + assert!(got_110); + } + fn producer_vec_loop(lock: &Seqlock>, done: &AtomicBool, multi: bool) { + let curt = Instant::now(); + let mut count = 0; + let mut msg = vec![0; 100]; + while curt.elapsed() < Duration::from_secs(1) { + msg.fill(count); + if multi { + lock.write_multi(&msg); + } else { + lock.write(&msg); + } + count = count.wrapping_add(1); + } + lock.write_lock(|v| v.fill(110)); + for _ in 0..10 { + lock.write_lock(|v| v.push(110)); + } + std::thread::sleep(std::time::Duration::from_secs(1)); + + lock.write_lock(|v| v.clear()); + done.store(true, Ordering::Relaxed); + } + + #[test] + fn read_vec_data_consistency_test() { + let lock = Seqlock::new(vec![0; 10]); + let done = AtomicBool::new(false); + std::thread::scope(|s| { + s.spawn(|| { + consumer_vec_loop_data_consistency(&lock, &done); + }); + s.spawn(|| { + producer_vec_loop(&lock, &done, false); + }); + }); + } +} diff --git a/based/crates/common/src/lib.rs b/based/crates/common/src/lib.rs index b5614dd82..c39a10ce9 100644 --- a/based/crates/common/src/lib.rs +++ b/based/crates/common/src/lib.rs @@ -1 +1,3 @@ +pub mod communication; +pub mod time; pub mod utils; diff --git a/based/crates/common/src/time.rs b/based/crates/common/src/time.rs new file mode 100644 index 000000000..49368a8d8 --- /dev/null +++ b/based/crates/common/src/time.rs @@ -0,0 +1,127 @@ +pub mod repeater; +pub mod timer; +pub mod types; + +use once_cell::sync::OnceCell; +pub use repeater::*; +use serde::{Deserialize, Serialize}; +pub use timer::*; +pub use types::*; +pub type Clock = quanta::Clock; + +static GLOBAL_NANOS_FOR_100: OnceCell = OnceCell::new(); +static GLOBAL_CLOCK: OnceCell = OnceCell::new(); + +#[inline] +fn global_clock() -> &'static Clock { + GLOBAL_CLOCK.get_or_init(Clock::new) +} + +#[inline] +fn nanos_for_100() -> u64 { + *GLOBAL_NANOS_FOR_100.get_or_init(|| global_clock().delta_as_nanos(0, 100)) +} + +/// Returns a high-precision timestamp counter: +/// - On x86: uses rdtscp (synchronized cycle counter) +/// - On ARM64: uses CNTVCT_EL0 (virtual timer counter) +/// - On other platforms: falls back to quanta's fastest timestamper +/// +/// Performance: ~6-9ns on x86/ARM vs ~20ns for SystemTime::now +fn rdtscp() -> u64 { + #[cfg(all(target_arch = "x86_64", not(target_arch = "wasm32")))] + #[allow(clippy::uninit_assumed_init, invalid_value)] + { + let mut c = unsafe { std::mem::MaybeUninit::uninit().assume_init() }; + let o = unsafe { ::core::arch::x86_64::__rdtscp(&mut c) }; + o | ((c & 0x0000F000) as u64) << 50 + } + #[cfg(all(target_arch = "aarch64", not(target_arch = "wasm32")))] + { + // CNTVCT_EL0 is a virtual counter that runs at a fixed frequency + // and is accessible from userspace + let value: u64; + unsafe { core::arch::asm!("mrs {}, cntvct_el0", out(reg) value) }; + value + } + #[cfg(any(not(any(target_arch = "x86_64", target_arch = "aarch64")), target_arch = "wasm32"))] + { + global_clock().raw() + } +} + +// TODO: move all of this to a more suited place +pub mod utils; + +pub use types::{duration::Duration, instant::Instant, nanos::Nanos}; +pub use utils::{vsync, vsync_with_cancel}; + +use crate::communication::internal_message::InternalMessage; + +/// A Timestamp that should be used for internal latency/performance tracking. +/// +/// Should be instantiated at the very first time that a message arrives from the +/// network on the box. It takes a "real" timestamp, i.e. nanos since unix epoch, +/// and links it to the current value of the rdtscp counter. +/// When reporting internal latency/performance, the rdtscp counter alone should be used. +/// When creating an "exiting" message, i.e. a message leaving this box over the network, +/// the "real" time + linked rdtscp counter can be used to generate an approximate outgoing +/// "real" timestamp. This is approximate, but 2x faster than Nanos::now() which has +/// the same performance as SystemTime::now. +#[derive(Clone, Copy, Debug, PartialEq, PartialOrd, Default, Serialize, Deserialize)] +pub struct IngestionTime { + real: Nanos, + internal: Instant, +} + +impl IngestionTime { + #[inline] + pub fn now() -> Self { + let real = Nanos::now(); + let internal = Instant::now(); + Self { real, internal } + } + + #[inline] + pub fn internal(&self) -> &Instant { + &self.internal + } + + #[inline] + pub fn real(&self) -> Nanos { + self.real + } + + #[inline] + pub fn to_msg(&self, data: T) -> InternalMessage { + InternalMessage::new(*self, data) + } +} + +impl From<&IngestionTime> for Instant { + #[inline] + fn from(value: &IngestionTime) -> Self { + value.internal + } +} + +impl From<&IngestionTime> for Nanos { + #[inline] + fn from(value: &IngestionTime) -> Self { + value.real + } +} + +impl From for Instant { + #[inline] + fn from(value: IngestionTime) -> Self { + value.internal + } +} + +impl From for Nanos { + #[inline] + fn from(value: IngestionTime) -> Self { + value.real + } +} diff --git a/based/crates/common/src/time/repeater.rs b/based/crates/common/src/time/repeater.rs new file mode 100644 index 000000000..b1904c6d2 --- /dev/null +++ b/based/crates/common/src/time/repeater.rs @@ -0,0 +1,77 @@ +use std::ops::{Add, AddAssign, Sub, SubAssign}; + +use super::{Duration, Instant}; + +#[derive(Clone, Copy, Debug, PartialEq, Default)] +pub struct Repeater { + interval: Duration, + last_acted: Instant, +} + +impl Repeater { + #[inline] + pub fn every(interval: Duration) -> Self { + Self { interval, last_acted: Instant::now() } + } + + #[inline] + pub fn maybe(&mut self, mut f: F) + where + F: FnMut(Duration), + { + let el = self.last_acted.elapsed(); + if el >= self.interval { + f(el); + self.last_acted = Instant::now(); + } + } + + #[inline] + pub fn fired(&mut self) -> bool { + let el = self.last_acted.elapsed(); + if el >= self.interval { + self.last_acted = Instant::now(); + true + } else { + false + } + } + + #[inline] + pub fn interval(&self) -> Duration { + self.interval + } + + #[inline] + pub fn set_interval(&mut self, interval: Duration) { + self.interval = interval + } +} + +impl Add for Repeater { + type Output = Repeater; + + fn add(self, rhs: Duration) -> Self::Output { + Repeater { interval: self.interval + rhs, ..self } + } +} + +impl Sub for Repeater { + type Output = Repeater; + + fn sub(self, rhs: Duration) -> Self::Output { + Repeater { interval: self.interval.saturating_sub(rhs), ..self } + } +} + +impl AddAssign for Repeater { + fn add_assign(&mut self, rhs: Duration) { + self.interval += rhs; + } +} + +impl SubAssign for Repeater { + fn sub_assign(&mut self, rhs: Duration) { + self.interval -= rhs; + } +} diff --git a/based/crates/common/src/time/timer.rs b/based/crates/common/src/time/timer.rs new file mode 100644 index 000000000..5e80c8884 --- /dev/null +++ b/based/crates/common/src/time/timer.rs @@ -0,0 +1,207 @@ +use std::{collections::HashMap, fmt::Display, sync::Mutex}; + +use once_cell::sync::Lazy; + +use crate::{ + communication::{ + queue::{Producer, Queue, QueueType}, + queues_dir_string, + }, + time::{Duration, Instant, InternalMessage}, +}; + +#[derive(Clone, Copy, Default, Debug)] +#[repr(C)] +pub struct TimingMessage { + pub start_t: Instant, + pub stop_t: Instant, +} + +impl TimingMessage { + #[inline(always)] + pub fn new() -> Self { + Self { start_t: Instant::now(), stop_t: Default::default() } + } + + pub fn elapsed(&self) -> Duration { + Duration(self.stop_t.0.saturating_sub(self.start_t.0)) + } + + pub fn is_valid(&self) -> bool { + self.start_t.same_socket(&self.stop_t) + } +} + +impl From for Duration { + fn from(value: TimingMessage) -> Self { + value.elapsed() + } +} + +const QUEUE_SIZE: usize = 2usize.pow(17); + +#[repr(C)] +#[derive(Clone, Copy, Debug)] +pub struct Timer { + pub curmsg: TimingMessage, + timing_producer: Producer, + latency_producer: Producer, +} + +impl Timer { + pub fn new(name: S) -> Self { + let dirstr = queues_dir_string(); + let _ = std::fs::create_dir_all(&dirstr); + + let file = format!("{dirstr}/timing-{name}"); + + let timing_queue = + Queue::create_or_open_shared(file, QUEUE_SIZE, QueueType::MPMC).expect("couldn't open timing queue"); + + let file = format!("{dirstr}/latency-{name}"); + let latency_queue = + Queue::create_or_open_shared(file, QUEUE_SIZE, QueueType::MPMC).expect("couldn't open latency queue"); + + Timer { + curmsg: Default::default(), + timing_producer: Producer::from(timing_queue), + latency_producer: Producer::from(latency_queue), + } + } +} + +unsafe impl Send for Timer {} +unsafe impl Sync for Timer {} + +impl Timer { + #[inline] + pub fn start(&mut self) { + self.set_start(Instant::now()); + } + + #[inline] + pub fn stop(&mut self) { + self.set_stop(Instant::now()); + self.send_business(); + } + + #[inline] + pub fn stop_and_latency(&mut self, ingestion_t: Instant) { + self.stop(); + self.set_stop(self.curmsg.start_t); + self.set_start(ingestion_t); + self.send_latency(); + } + + #[inline] + pub fn stop_and_latency_till_now(&mut self, ingestion_t: Instant) { + self.stop(); + self.set_start(ingestion_t); + self.send_latency(); + } + + #[inline] + pub fn latency_till_now(&mut self, ingestion_t: Instant) { + let m = TimingMessage { start_t: ingestion_t, stop_t: Instant::now() }; + if m.is_valid() { + self.latency_producer.produce(&m); + } + } + + #[inline] + fn set_stop(&mut self, stop: Instant) { + self.curmsg.stop_t = stop; + } + + #[inline] + pub fn set_start(&mut self, start: Instant) { + self.curmsg.start_t = start; + } + + #[inline] + fn send_latency(&mut self) { + if self.curmsg.is_valid() { + self.latency_producer.produce(&self.curmsg); + } + } + + #[inline] + fn send_business(&mut self) { + if self.curmsg.is_valid() { + self.timing_producer.produce(&self.curmsg); + } + } + + #[inline] + pub fn get_stop_t(&self) -> Instant { + self.curmsg.stop_t + } + + #[inline] + pub fn start_accumulate(&mut self) { + self.curmsg.start_t = Instant::ZERO; + self.curmsg.stop_t = Instant::ZERO + } + + #[inline] + pub fn accumulate(&mut self, duration: Duration) { + self.curmsg.stop_t += duration + } + + #[inline] + pub fn finish_accumulate(&mut self) { + self.send_business() + } + + #[inline] + pub fn process(&mut self, msg: InternalMessage, mut f: impl FnMut(InternalMessage) -> R) -> R { + self.start(); + let in_t = (&msg).into(); + let o = f(msg); + self.stop_and_latency(in_t); + o + } + + #[inline] + pub fn time(&mut self, f: impl FnOnce() -> R) -> R { + self.start(); + let o = f(); + self.stop(); + o + } + + #[inline] + pub fn elapsed(&self) -> Duration { + self.curmsg.elapsed() + } +} + +// Global map of timers +#[allow(dead_code)] +pub static TIMERS: Lazy>> = Lazy::new(|| Mutex::new(HashMap::new())); + +/// Macro to be used when quickly benchmarking some piece of code, should not remain in prod as it +/// is not particularly performant +#[macro_export] +macro_rules! timeit { + ($name:expr, $block:block) => {{ + use bop_common::time::timer::TIMERS; + // Initialize or retrieve the timer + let mut timer = { + let mut timers = TIMERS.lock().unwrap(); + timers.entry($name).or_insert_with(|| bop_common::time::Timer::new($name)).clone() + }; + + // Start timing + timer.start(); + + // Execute the block of code and capture the result + let result = { $block }; + + // Stop timing + timer.stop(); + + // Return the block result + result + }}; +} diff --git a/based/crates/common/src/time/types.rs b/based/crates/common/src/time/types.rs new file mode 100644 index 000000000..80a8634dc --- /dev/null +++ b/based/crates/common/src/time/types.rs @@ -0,0 +1,7 @@ +pub mod duration; +pub mod instant; +pub mod nanos; + +pub use duration::Duration; +pub use instant::Instant; +pub use nanos::Nanos; diff --git a/based/crates/common/src/time/types/duration.rs b/based/crates/common/src/time/types/duration.rs new file mode 100644 index 000000000..6defda915 --- /dev/null +++ b/based/crates/common/src/time/types/duration.rs @@ -0,0 +1,349 @@ +use std::ops::{Add, AddAssign, Div, DivAssign, Mul, MulAssign, Sub, SubAssign}; + +use serde::{Deserialize, Serialize}; + +use super::Nanos; +use crate::time::nanos_for_100; + +#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize)] +#[repr(C)] +pub struct Duration(pub u64); + +impl Duration { + pub const MAX: Self = Self(u64::MAX); + pub const MIN: Self = Self(0); + pub const ZERO: Self = Self(0); + + #[inline] + pub fn saturating_sub(self, rhs: Duration) -> Self { + Self(self.0.saturating_sub(rhs.0)) + } + + #[inline] + pub fn from_secs(s: u64) -> Self { + Self(s * 100_000_000_000 / nanos_for_100()) + } + + #[inline] + pub fn from_mins(s: u64) -> Self { + Self::from_secs(s * 60) + } + + #[inline] + pub fn from_secs_f64(s: f64) -> Self { + Self::from_secs((s * 1_000_000_000.0).round() as u64) + } + + #[inline] + pub fn from_millis(s: u64) -> Self { + Self(s * 100_000_000 / nanos_for_100()) + } + + #[inline] + pub fn from_micros(s: u64) -> Self { + Self(s * 100_000 / nanos_for_100()) + } + + #[inline] + pub fn from_nanos(s: u64) -> Self { + Self(s * 100 / nanos_for_100()) + } + + #[inline] + pub fn as_secs(&self) -> f64 { + (self.0 * nanos_for_100()) as f64 / 100_000_000_000.0 + } + + #[inline] + pub fn as_millis(&self) -> f64 { + (self.0 * nanos_for_100()) as f64 / 100_000_000.0 + } + + #[inline] + pub fn as_micros(&self) -> f64 { + (self.0 * nanos_for_100()) as f64 / 100_000.0 + } + + pub fn as_micros_u128(&self) -> u128 { + (self.0 * nanos_for_100()) as u128 / 100_000 + } +} + +impl std::fmt::Display for Duration { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Nanos::from(*self).fmt(f) + } +} + +impl From for Duration { + fn from(value: u64) -> Self { + Self(value) + } +} + +impl From for u64 { + fn from(value: Duration) -> Self { + value.0 + } +} + +impl Add for Duration { + type Output = Duration; + + #[inline] + fn add(self, rhs: Duration) -> Duration { + Duration(self.0.wrapping_add(rhs.0)) + } +} + +impl AddAssign for Duration { + #[inline] + fn add_assign(&mut self, rhs: Duration) { + *self = *self + rhs; + } +} + +impl Sub for Duration { + type Output = Duration; + + #[inline] + fn sub(self, rhs: Duration) -> Duration { + Duration(self.0.wrapping_sub(rhs.0)) + } +} + +impl SubAssign for Duration { + #[inline] + fn sub_assign(&mut self, rhs: Duration) { + *self = *self - rhs; + } +} + +impl Sub for Duration { + type Output = Duration; + + #[inline] + fn sub(self, rhs: u64) -> Duration { + Duration(self.0.wrapping_sub(rhs)) + } +} + +impl SubAssign for Duration { + #[inline] + fn sub_assign(&mut self, rhs: u64) { + *self = *self - rhs; + } +} + +impl Mul for Duration { + type Output = Duration; + + #[inline] + fn mul(self, rhs: u32) -> Duration { + Duration(self.0 * rhs as u64) + } +} + +impl Mul for Duration { + type Output = Duration; + + #[inline] + fn mul(self, rhs: usize) -> Duration { + Duration(self.0 * rhs as u64) + } +} + +impl Mul for u32 { + type Output = Duration; + + #[inline] + fn mul(self, rhs: Duration) -> Duration { + rhs * self + } +} + +impl MulAssign for Duration { + #[inline] + fn mul_assign(&mut self, rhs: u32) { + *self = *self * rhs; + } +} + +impl Div for Duration { + type Output = Duration; + + #[inline] + fn div(self, rhs: u32) -> Duration { + Duration(self.0 / rhs as u64) + } +} +impl Div for Duration { + type Output = Duration; + + #[inline] + fn div(self, rhs: usize) -> Duration { + Duration(self.0 / rhs as u64) + } +} + +impl DivAssign for Duration { + #[inline] + fn div_assign(&mut self, rhs: u32) { + *self = *self / rhs; + } +} +impl Mul for Duration { + type Output = Duration; + + #[inline] + fn mul(self, rhs: u64) -> Duration { + Duration(self.0 * rhs) + } +} + +impl Mul for u64 { + type Output = Duration; + + #[inline] + fn mul(self, rhs: Duration) -> Duration { + rhs * self + } +} + +impl MulAssign for Duration { + #[inline] + fn mul_assign(&mut self, rhs: u64) { + *self = *self * rhs; + } +} + +impl Div for Duration { + type Output = Duration; + + #[inline] + fn div(self, rhs: u64) -> Duration { + Duration(self.0 / rhs) + } +} + +impl DivAssign for Duration { + #[inline] + fn div_assign(&mut self, rhs: u64) { + *self = *self / rhs; + } +} + +impl PartialEq for Duration { + #[inline] + fn eq(&self, other: &Self) -> bool { + self.0 == other.0 + } +} +impl Eq for Duration {} + +impl PartialOrd for Duration { + #[inline] + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for Duration { + #[inline] + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.0.cmp(&other.0) + } +} + +impl From for f64 { + #[inline] + fn from(value: Duration) -> f64 { + value.0 as f64 + } +} + +impl std::iter::Sum for Duration { + #[inline] + fn sum(iter: I) -> Self + where + I: Iterator, + { + Duration(iter.map(|v| v.0).sum()) + } +} +impl<'a> std::iter::Sum<&'a Self> for Duration { + #[inline] + fn sum(iter: I) -> Self + where + I: Iterator, + { + Duration(iter.map(|v| v.0).sum()) + } +} + +impl From for Duration { + #[inline] + fn from(value: u128) -> Self { + Duration(value as u64) + } +} +impl From for Duration { + #[inline] + fn from(value: u32) -> Self { + Duration(value as u64) + } +} +impl From for Duration { + #[inline] + fn from(value: i64) -> Self { + Duration(value as u64) + } +} +impl From for Duration { + #[inline] + fn from(value: i32) -> Self { + Duration(value as u64) + } +} + +impl From for i64 { + #[inline] + fn from(val: Duration) -> Self { + val.0 as i64 + } +} + +impl From for std::time::Duration { + #[inline] + fn from(value: Duration) -> Self { + std::time::Duration::from_nanos(Nanos::from(value).0) + } +} + +impl From for Duration { + #[inline] + fn from(value: std::time::Duration) -> Self { + Self((value.as_nanos() * 100 / nanos_for_100() as u128) as u64) + } +} + +impl From for Duration { + #[inline] + fn from(value: Nanos) -> Self { + Self(value.0 * 100 / nanos_for_100()) + } +} + +impl DivAssign for Duration { + #[inline] + fn div_assign(&mut self, rhs: usize) { + self.0 /= rhs as u64 + } +} + +impl DivAssign for Duration { + #[inline] + fn div_assign(&mut self, rhs: i32) { + self.0 /= rhs as u64 + } +} diff --git a/based/crates/common/src/time/types/instant.rs b/based/crates/common/src/time/types/instant.rs new file mode 100644 index 000000000..9365ace95 --- /dev/null +++ b/based/crates/common/src/time/types/instant.rs @@ -0,0 +1,99 @@ +use std::ops::{Add, AddAssign, Sub}; + +use serde::{Deserialize, Serialize}; + +use super::{Duration, Nanos}; +use crate::time::{global_clock, nanos_for_100, rdtscp}; +// Socket is in the top 2 bits, rdtscp counter in lower 62 +#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize)] +#[repr(C)] +pub struct Instant(pub u64); +impl Instant { + pub const MAX: Self = Self(u64::MAX); + pub const ZERO: Self = Self(0); + + #[inline] + pub fn now() -> Self { + Instant(rdtscp()) + } + + #[inline] + fn remove_socket(self) -> Self { + Instant(self.0 & 0x3fffffffffffffff) + } + + #[inline] + pub fn same_socket(&self, other: &Self) -> bool { + (self.0 & 0xc000000000000000) == (other.0 & 0xc000000000000000) + } + + #[inline] + pub fn elapsed(&self) -> Duration { + let curt = Instant::now(); + curt.wrapping_sub(self) + } + + #[inline] + pub fn as_delta_nanos(&self) -> Nanos { + Nanos(global_clock().delta_as_nanos(0, self.remove_socket().0)) + } + + #[inline] + pub fn wrapping_sub(&self, other: &Instant) -> Duration { + Duration(self.0.wrapping_sub(other.0)) + } + + #[inline] + pub fn saturating_sub(&self, other: &Instant) -> Duration { + Duration(self.0.saturating_sub(other.0)) + } +} + +impl PartialEq for Instant { + fn eq(&self, other: &Self) -> bool { + self.0 == other.0 + } +} +impl Eq for Instant {} + +impl PartialOrd for Instant { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for Instant { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.0.cmp(&other.0) + } +} + +impl Sub for Instant { + type Output = Duration; + + fn sub(self, rhs: Instant) -> Duration { + Duration(self.0.saturating_sub(rhs.0)) + } +} + +impl Sub for Instant { + type Output = Instant; + + fn sub(self, rhs: Nanos) -> Instant { + Instant(self.0.saturating_sub(rhs.0) / nanos_for_100() * 100) + } +} + +impl Add for Instant { + type Output = Instant; + + fn add(self, rhs: Nanos) -> Self::Output { + Instant(self.0 + rhs.0 / nanos_for_100() * 100) + } +} + +impl AddAssign for Instant { + fn add_assign(&mut self, rhs: Duration) { + self.0 += rhs.0 + } +} diff --git a/based/crates/common/src/time/types/nanos.rs b/based/crates/common/src/time/types/nanos.rs new file mode 100644 index 000000000..fe781bc46 --- /dev/null +++ b/based/crates/common/src/time/types/nanos.rs @@ -0,0 +1,410 @@ +use std::{ + num::ParseIntError, + ops::{Add, AddAssign, Div, DivAssign, Mul, MulAssign, Sub, SubAssign}, + str::FromStr, + sync::atomic::{AtomicU64, Ordering}, + time::{SystemTime, UNIX_EPOCH}, +}; + +use chrono::Utc; +use serde::{Deserialize, Serialize}; +use tracing::warn; + +use super::Duration; +use crate::time::global_clock; + +/// Nanos since unix epoch, good till 2554 I think +#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize)] +#[repr(C)] +pub struct Nanos(pub u64); + +impl Nanos { + pub const MAX: Nanos = Nanos(u64::MAX); + pub const ZERO: Nanos = Nanos(0); + + pub const fn from_secs(s: u64) -> Self { + Nanos(s * 1_000_000_000) + } + + pub fn from_secs_f64(s: f64) -> Self { + Nanos((s * 1_000_000_000.0).round() as u64) + } + + pub fn from_millis_f64(s: f64) -> Self { + Nanos((s * 1_000_000.0).round() as u64) + } + + pub const fn from_millis(s: u64) -> Self { + Nanos(s * 1_000_000) + } + + pub const fn from_micros(s: u64) -> Self { + Nanos(s * 1_000) + } + + pub const fn from_mins(s: u64) -> Self { + Nanos(s * 60 * 1_000_000_000) + } + + pub const fn from_hours(s: u64) -> Self { + Nanos::from_mins(s * 60) + } + + pub fn from_rfc3339(datetime_str: &str) -> Option { + match chrono::DateTime::parse_from_rfc3339(datetime_str).map(|d| d.timestamp_nanos_opt().map(Self::from)) { + Ok(Some(n)) => Some(n), + Ok(None) => { + warn!("timestamp out of nanoseconds reach, using ingestion time"); + None + } + Err(e) => { + warn!("Couldn't parse timestamp {}: {e}", datetime_str); + None + } + } + } + + pub fn as_secs(&self) -> f64 { + self.0 as f64 / 1_000_000_000.0 + } + + pub fn as_millis(&self) -> f64 { + self.0 as f64 / 1_000_000.0 + } + + pub fn as_millis_u64(&self) -> u64 { + self.0 / 1_000_000 + } + + pub fn as_micros(&self) -> f64 { + self.0 as f64 / 1_000.0 + } + + pub fn now() -> Self { + SystemTime::now().into() + } + + pub fn saturating_sub(self, rhs: Nanos) -> Self { + Self(self.0.saturating_sub(rhs.0)) + } + + pub fn elapsed(&self) -> Self { + let curt = Self::now(); + Nanos(curt.0 - self.0) + } +} + +impl From for chrono::DateTime { + fn from(value: Nanos) -> Self { + chrono::DateTime::from_timestamp_nanos(value.0 as i64) + } +} + +impl std::fmt::Display for Nanos { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if *self < Nanos::from_micros(1) { + write!(f, "{}ns", self.0) + } else if *self < Nanos::from_millis(1) { + write!(f, "{}μs", self.0 as f64 / 1000.0) + } else if *self < Nanos::from_secs(1) { + write!(f, "{}ms", (self.0 / 1000) as f64 / 1000.0) + } else if *self < Nanos::from_mins(1) { + write!(f, "{}s", (self.0 / 1_000_000) as f64 / 1000.0) + } else if *self < Nanos::from_hours(1) { + let min = self.0 / Nanos::from_mins(1).0; + let s = *self - Nanos::from_mins(min); + write!(f, "{}m:{}", min, s) + } else if *self < Nanos::from_hours(24) { + let hours = self.0 / Nanos::from_hours(1).0; + let min = *self - Nanos::from_hours(hours); + write!(f, "{}h:{}", hours, min) + } else { + write!(f, "{}", chrono::DateTime::::from(*self)) + } + } +} + +impl From for u64 { + fn from(value: Nanos) -> Self { + value.0 + } +} + +impl Add for Nanos { + type Output = Nanos; + + fn add(self, rhs: Nanos) -> Nanos { + Nanos(self.0.wrapping_add(rhs.0)) + } +} + +impl AddAssign for Nanos { + fn add_assign(&mut self, rhs: Nanos) { + *self = *self + rhs; + } +} + +impl Sub for Nanos { + type Output = Nanos; + + fn sub(self, rhs: Nanos) -> Nanos { + Nanos(self.0.wrapping_sub(rhs.0)) + } +} + +impl SubAssign for Nanos { + fn sub_assign(&mut self, rhs: Nanos) { + *self = *self - rhs; + } +} + +impl Sub for Nanos { + type Output = Nanos; + + fn sub(self, rhs: u64) -> Nanos { + Nanos(self.0.wrapping_sub(rhs)) + } +} + +impl SubAssign for Nanos { + fn sub_assign(&mut self, rhs: u64) { + *self = *self - rhs; + } +} + +impl Mul for Nanos { + type Output = Nanos; + + fn mul(self, rhs: u32) -> Nanos { + Nanos(self.0 * rhs as u64) + } +} + +impl Mul for Nanos { + type Output = Nanos; + + fn mul(self, rhs: i32) -> Nanos { + Nanos(self.0 * rhs as u64) + } +} + +impl Mul for u32 { + type Output = Nanos; + + fn mul(self, rhs: Nanos) -> Nanos { + rhs * self + } +} +impl Mul for i32 { + type Output = Nanos; + + fn mul(self, rhs: Nanos) -> Nanos { + rhs * self + } +} + +impl MulAssign for Nanos { + fn mul_assign(&mut self, rhs: u32) { + *self = *self * rhs; + } +} + +impl Div for Nanos { + type Output = Nanos; + + fn div(self, rhs: u32) -> Nanos { + Nanos(self.0 / rhs as u64) + } +} +impl Div for Nanos { + type Output = Nanos; + + fn div(self, rhs: usize) -> Nanos { + Nanos(self.0 / rhs as u64) + } +} + +impl DivAssign for Nanos { + fn div_assign(&mut self, rhs: u32) { + *self = *self / rhs; + } +} +impl Mul for Nanos { + type Output = Nanos; + + fn mul(self, rhs: u64) -> Nanos { + Nanos(self.0 * rhs) + } +} + +impl Mul for u64 { + type Output = Nanos; + + fn mul(self, rhs: Nanos) -> Nanos { + rhs * self + } +} + +impl Mul for Nanos { + type Output = Nanos; + + fn mul(self, rhs: Nanos) -> Nanos { + Nanos(rhs.0 * self.0) + } +} + +impl MulAssign for Nanos { + fn mul_assign(&mut self, rhs: u64) { + *self = *self * rhs; + } +} + +impl Div for Nanos { + type Output = Nanos; + + fn div(self, rhs: u64) -> Nanos { + Nanos(self.0 / rhs) + } +} + +impl DivAssign for Nanos { + fn div_assign(&mut self, rhs: u64) { + *self = *self / rhs; + } +} + +impl Div for Nanos { + type Output = Nanos; + + fn div(self, rhs: Nanos) -> Nanos { + Nanos(self.0 / rhs.0) + } +} + +impl DivAssign for Nanos { + fn div_assign(&mut self, rhs: Nanos) { + self.0 /= rhs.0 + } +} + +impl PartialEq for Nanos { + fn eq(&self, other: &Self) -> bool { + self.0 == other.0 + } +} +impl Eq for Nanos {} + +impl PartialOrd for Nanos { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for Nanos { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.0.cmp(&other.0) + } +} + +impl std::iter::Sum for Nanos { + fn sum(iter: I) -> Self + where + I: Iterator, + { + Nanos(iter.map(|v| v.0).sum()) + } +} +impl<'a> std::iter::Sum<&'a Self> for Nanos { + fn sum(iter: I) -> Self + where + I: Iterator, + { + Nanos(iter.map(|v| v.0).sum()) + } +} + +impl From for Nanos { + fn from(value: u64) -> Self { + Nanos(value) + } +} +impl From for Nanos { + fn from(value: u128) -> Self { + Nanos(value as u64) + } +} +impl From for Nanos { + fn from(value: u32) -> Self { + Nanos(value as u64) + } +} +impl From for Nanos { + fn from(value: i64) -> Self { + Nanos(value as u64) + } +} +impl From for Nanos { + fn from(value: i32) -> Self { + Nanos(value as u64) + } +} + +impl From for i64 { + fn from(val: Nanos) -> Self { + val.0 as i64 + } +} + +impl From for Nanos { + fn from(value: SystemTime) -> Self { + Nanos(unsafe { + value.duration_since(UNIX_EPOCH).unwrap_unchecked().as_nanos() as u64 - + TIME_OFFSET_NANOS.load(Ordering::Relaxed) + }) + } +} + +impl FromStr for Nanos { + type Err = ParseIntError; + + fn from_str(s: &str) -> Result { + if !s.contains("ns") { + s.parse::().map(Nanos) + } else { + let len = s.len(); + s[..len - 2].parse::().map(Nanos) + } + } +} + +impl From for Nanos { + fn from(value: Duration) -> Self { + Nanos(global_clock().delta_as_nanos(0, value.0)) + } +} + +impl From for std::time::Duration { + fn from(value: Nanos) -> Self { + std::time::Duration::from_nanos(value.0) + } +} + +/// Sets a spoof 'now' time - this adds an offset to the reported system time in order to allow +/// time travel. +static TIME_OFFSET_NANOS: AtomicU64 = AtomicU64::new(0); +pub fn set_time_delta(duration_since_epoch: std::time::Duration) { + let since_epoch_now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); + assert!(since_epoch_now > duration_since_epoch); + let delta_nanos = (since_epoch_now.as_nanos() - duration_since_epoch.as_nanos()) as u64; + TIME_OFFSET_NANOS.store(delta_nanos, Ordering::Relaxed); +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn serde() { + assert_eq!("1", serde_json::to_string(&Nanos(1)).unwrap()); + } +} diff --git a/based/crates/common/src/time/utils.rs b/based/crates/common/src/time/utils.rs new file mode 100644 index 000000000..63a62d246 --- /dev/null +++ b/based/crates/common/src/time/utils.rs @@ -0,0 +1,91 @@ +use std::sync::atomic::{AtomicBool, Ordering}; + +use crate::time::{Duration, Instant}; + +#[inline(always)] +pub fn vsync_busy(duration: Option, f: F) -> R +where + F: FnOnce() -> R, +{ + match duration { + Some(duration) if duration != Duration(0) => { + let start_t = Instant::now(); + let out = f(); + while start_t.elapsed() < duration {} + out + } + _ => f(), + } +} + +#[inline] +pub fn vsync(duration: Option, f: F) -> R +where + F: FnOnce() -> R, +{ + match duration { + Some(duration) if duration != Duration(0) => { + let start_t = Instant::now(); + let out = f(); + let el = start_t.elapsed(); + if el < duration { + std::thread::sleep((duration - el).into()) + } + out + } + _ => f(), + } +} + +#[inline] +pub fn renderloop_60_fps(mut f: impl FnMut() -> bool) { + while vsync(Some(Duration::from_millis(16)), &mut f) {} +} + +#[inline] +pub fn vsync_with_cancel(duration: Option, cancel: &std::sync::Arc, f: F) -> R +where + F: FnOnce() -> R, +{ + let sleep_before_cancelcheck = std::time::Duration::from_millis(1); + match duration { + Some(duration) if duration != Duration(0) => { + let start_t = Instant::now(); + let out = f(); + let el = start_t.elapsed(); + while el < duration && !cancel.load(Ordering::Relaxed) { + std::thread::sleep(sleep_before_cancelcheck); + } + out + } + _ => f(), + } +} + +// TODO: does match statement impact performance? Thesis: no because of branch predicting +#[inline] +pub fn busy_sleep(duration: Option) { + match duration { + None => (), + Some(duration) if duration == Duration::ZERO => (), + Some(duration) => { + let curt = Instant::now(); + while curt.elapsed() < duration {} + } + } +} + +#[inline] +pub fn sleep(duration: Duration) { + if duration == Duration::ZERO { + std::thread::sleep(duration.into()) + } +} + +#[inline] +pub fn timeit(msg: &str, f: impl FnOnce() -> O) -> O { + let curt = Instant::now(); + let o = f(); + println!("Timing result: {msg} took {}", curt.elapsed()); + o +}