Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
341 changes: 321 additions & 20 deletions based/Cargo.lock

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions based/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,18 @@ toml = "0.8.19"
eyre = "0.6.12"
uuid = { version = "1.12.1", features = ["v4"] }
thiserror = "2.0.11"

lazy_static = "1.5.0"
once_cell = "1.19.0"
directories = "5.0.1"

# Time
chrono = "0.4.23"
quanta = "0.12.3"

# ipc
shared_memory = "^0.12"

# threading
core_affinity = "0.8.1"
crossbeam-channel = "0.5.13"
10 changes: 10 additions & 0 deletions based/crates/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,17 @@ edition = "2021"
[dependencies]

tokio.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
tracing-appender.workspace = true

serde = { workspace = true, features = ["derive"] }
thiserror.workspace = true
eyre.workspace = true
once_cell.workspace = true
chrono.workspace = true
quanta.workspace = true
directories.workspace = true
shared_memory.workspace = true

crossbeam-channel.workspace = true
78 changes: 78 additions & 0 deletions based/crates/common/src/communication.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use std::{fs::read_dir, path::Path};

use shared_memory::ShmemError;
use thiserror::Error;

pub mod queue;
pub mod seqlock;
pub use queue::{Consumer, Producer, Queue};
pub use seqlock::Seqlock;
pub mod internal_message;

pub use internal_message::InternalMessage;

pub type Sender<T> = crossbeam_channel::Sender<InternalMessage<T>>;
pub type Receiver<T> = crossbeam_channel::Sender<InternalMessage<T>>;

#[derive(Error, Debug, Copy, Clone, PartialEq)]
pub enum ReadError {
#[error("Got sped past")]
SpedPast,
#[error("Lock empty")]
Empty,
}

#[derive(Error, Debug)]
#[repr(u8)]
pub enum Error {
#[error("Queue not initialized")]
UnInitialized,
#[error("Queue length not power of two")]
LengthNotPowerOfTwo,
#[error("Element size not power of two - 4")]
ElementSizeNotPowerTwo,
#[error("Shared memory file does not exist")]
NonExistingFile,
#[error("Preexisting shared memory too small")]
TooSmall,
#[error("Shmem error")]
ShmemError(#[from] ShmemError),
}

pub fn clear_shmem<P: AsRef<Path>>(path: P) {
let path = path.as_ref();
if !path.exists() {
return;
}
let Ok(mut shmem) = shared_memory::ShmemConf::new().flink(path).open() else {
return;
};
shmem.set_owner(true);
std::fs::remove_file(path).expect("couldn't remove file");
}

pub fn queues_dir_string() -> String {
let queues_dir =
directories::BaseDirs::new().expect("Couldn't retrieve home dir").data_dir().join("builder/queues");
queues_dir.to_string_lossy().to_string()
}

pub fn verify_or_remove_queue_files() {
let queues_dir =
directories::BaseDirs::new().expect("Couldn't retrieve home dir").data_dir().join("builder/queues");
if queues_dir.is_file() {
let _ = std::fs::remove_file(&queues_dir);
let _ = std::fs::create_dir_all(queues_dir.as_path());
return;
}
let Ok(files) = read_dir(&queues_dir) else {
let _ = std::fs::create_dir_all(queues_dir.as_path());
return;
};
for f in files.filter_map(|t| t.ok()) {
if shared_memory::ShmemConf::new().flink(f.path()).open().is_err() {
tracing::warn!("couldn't open shmem at {:?} so removing it to be recreated later", f.path());
let _ = std::fs::remove_file(f.path());
}
}
}
137 changes: 137 additions & 0 deletions based/crates/common/src/communication/internal_message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
use std::ops::{Deref, DerefMut};

use serde::{Deserialize, Serialize};

use crate::time::{Duration, IngestionTime, Instant, Nanos};

#[derive(Clone, Copy, Debug, PartialEq, PartialOrd, Serialize, Deserialize, Default)]
pub struct InternalMessage<T> {
ingestion_t: IngestionTime,
data: T,
}

impl<T> InternalMessage<T> {
#[inline]
pub fn new(ingestion_t: IngestionTime, data: T) -> Self {
Self { ingestion_t, data }
}

#[inline]
pub fn with_data<D>(&self, data: D) -> InternalMessage<D> {
InternalMessage::new(self.ingestion_t, data)
}

#[inline]
pub fn data(&self) -> &T {
&self.data
}

#[inline]
pub fn into_data(self) -> T {
self.data
}

#[inline]
pub fn map<R>(self, f: impl FnOnce(T) -> R) -> InternalMessage<R> {
InternalMessage { ingestion_t: self.ingestion_t, data: f(self.data) }
}

#[inline]
pub fn map_ref<R>(&self, f: impl FnOnce(&T) -> R) -> InternalMessage<R> {
InternalMessage { ingestion_t: self.ingestion_t, data: f(&self.data) }
}

#[inline]
pub fn unpack(self) -> (IngestionTime, T) {
(self.ingestion_t, self.data)
}

/// This is only useful within the same socket as the original tsamp
#[inline]
pub fn elapsed(&self) -> Duration {
self.ingestion_t.internal().elapsed()
}

/// These are real nanos since unix epoc
#[inline]
pub fn elapsed_nanos(&self) -> Nanos {
self.ingestion_t.real().elapsed()
}

#[inline]
pub fn ingestion_time(&self) -> IngestionTime {
self.ingestion_t
}
}

impl<T> From<InternalMessage<T>> for (IngestionTime, T) {
#[inline]
fn from(value: InternalMessage<T>) -> Self {
value.unpack()
}
}

impl<T> From<T> for InternalMessage<T> {
#[inline]
fn from(value: T) -> Self {
Self::new(IngestionTime::now(), value)
}
}

impl<T> Deref for InternalMessage<T> {
type Target = T;

#[inline]
fn deref(&self) -> &Self::Target {
&self.data
}
}

impl<T> DerefMut for InternalMessage<T> {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.data
}
}

impl<T> From<&InternalMessage<T>> for IngestionTime {
#[inline]
fn from(value: &InternalMessage<T>) -> Self {
value.ingestion_t
}
}

impl<T> AsRef<IngestionTime> for InternalMessage<T> {
#[inline]
fn as_ref(&self) -> &IngestionTime {
&self.ingestion_t
}
}

impl<T> From<&InternalMessage<T>> for Instant {
#[inline]
fn from(value: &InternalMessage<T>) -> Self {
value.ingestion_t.into()
}
}

impl<T> From<&InternalMessage<T>> for Nanos {
#[inline]
fn from(value: &InternalMessage<T>) -> Self {
value.ingestion_t.into()
}
}

impl<T> From<InternalMessage<T>> for Instant {
#[inline]
fn from(value: InternalMessage<T>) -> Self {
value.ingestion_t.into()
}
}

impl<T> From<InternalMessage<T>> for Nanos {
#[inline]
fn from(value: InternalMessage<T>) -> Self {
value.ingestion_t.into()
}
}
Loading
Loading