Skip to content

Commit 9878597

Browse files
authored
refactor: expose internal error for hybrid cache (#1060)
1 parent 8d58728 commit 9878597

File tree

10 files changed

+151
-42
lines changed

10 files changed

+151
-42
lines changed

examples/hybrid_full.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,16 @@
1414

1515
use std::{hash::BuildHasherDefault, num::NonZeroUsize, sync::Arc};
1616

17-
use anyhow::Result;
1817
use chrono::Datelike;
1918
use foyer::{
2019
AdmitAllPicker, DirectFsDeviceOptions, Engine, FifoPicker, HybridCache, HybridCacheBuilder, HybridCachePolicy,
21-
IopsCounter, LargeEngineOptions, LruConfig, RecoverMode, RejectAllPicker, RuntimeOptions, SmallEngineOptions,
22-
Throttle, TokioRuntimeOptions, TombstoneLogConfigBuilder,
20+
IopsCounter, LargeEngineOptions, LruConfig, RecoverMode, RejectAllPicker, Result, RuntimeOptions,
21+
SmallEngineOptions, Throttle, TokioRuntimeOptions, TombstoneLogConfigBuilder,
2322
};
2423
use tempfile::tempdir;
2524

2625
#[tokio::main]
27-
async fn main() -> Result<()> {
26+
async fn main() -> anyhow::Result<()> {
2827
let dir = tempdir()?;
2928

3029
let hybrid: HybridCache<u64, String> = HybridCacheBuilder::new()
@@ -110,7 +109,8 @@ async fn main() -> Result<()> {
110109
async fn mock() -> Result<String> {
111110
let now = chrono::Utc::now();
112111
if format!("{}{}{}", now.year(), now.month(), now.day()) == "20230512" {
113-
return Err(anyhow::anyhow!("Hi, time traveler!"));
112+
let e: Box<dyn std::error::Error + Send + Sync + 'static> = "Hi, time traveler!".into();
113+
return Err(e.into());
114114
}
115115
Ok("Hello, foyer.".to_string())
116116
}

foyer-memory/src/cache.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ use foyer_common::{
2626
use mixtrics::{metrics::BoxedRegistry, registry::noop::NoopMetricsRegistry};
2727
use pin_project::pin_project;
2828
use serde::{Deserialize, Serialize};
29-
use tokio::sync::oneshot;
3029

3130
use crate::{
31+
error::Error,
3232
eviction::{
3333
fifo::{Fifo, FifoConfig},
3434
lfu::{Lfu, LfuConfig},
@@ -945,7 +945,7 @@ impl<K, V, ER, S, P> Future for Fetch<K, V, ER, S, P>
945945
where
946946
K: Key,
947947
V: Value,
948-
ER: From<oneshot::error::RecvError>,
948+
ER: From<Error>,
949949
S: HashBuilder,
950950
P: Properties,
951951
{
@@ -1165,7 +1165,7 @@ mod tests {
11651165
let entry = cache
11661166
.fetch(i, || async move {
11671167
tokio::time::sleep(Duration::from_micros(10)).await;
1168-
Ok::<_, tokio::sync::oneshot::error::RecvError>(i)
1168+
Ok::<_, Error>(i)
11691169
})
11701170
.await
11711171
.unwrap();

foyer-memory/src/error.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,21 @@ pub enum Error {
2323
/// Config error.
2424
#[error("config error: {0}")]
2525
ConfigError(String),
26+
/// Wait error.
27+
#[error("wait for concurrent fetch result error: {0}")]
28+
Wait(Box<dyn std::error::Error + Send + Sync + 'static>),
2629
}
2730

2831
impl Error {
2932
/// Combine multiple errors into one error.
3033
pub fn multiple(errs: Vec<Error>) -> Self {
3134
Self::Multiple(MultipleError(errs))
3235
}
36+
37+
/// Error on waiting for concurrrent fetch result.
38+
pub fn wait(err: impl std::error::Error + Send + Sync + 'static) -> Self {
39+
Self::Wait(Box::new(err))
40+
}
3341
}
3442

3543
#[derive(thiserror::Error, Debug)]

foyer-memory/src/raw.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -988,7 +988,7 @@ where
988988
impl<E, ER, S, I> Future for RawFetchInner<E, ER, S, I>
989989
where
990990
E: Eviction,
991-
ER: From<oneshot::error::RecvError>,
991+
ER: From<Error>,
992992
S: HashBuilder,
993993
I: Indexer<Eviction = E>,
994994
{
@@ -997,7 +997,7 @@ where
997997
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
998998
match self.project() {
999999
RawFetchInnerProj::Hit(opt) => Poll::Ready(Ok(opt.take().unwrap()).into()),
1000-
RawFetchInnerProj::Wait(waiter) => waiter.poll(cx).map_err(|err| err.into()).map(Diversion::from),
1000+
RawFetchInnerProj::Wait(waiter) => waiter.poll(cx).map_err(|e| Error::wait(e).into()).map(Diversion::from),
10011001
RawFetchInnerProj::Miss(handle) => handle.poll(cx).map(|join| join.unwrap()),
10021002
}
10031003
}

foyer/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ strict_assertions = [
3434
]
3535

3636
[dependencies]
37-
anyhow = { workspace = true }
3837
equivalent = { workspace = true }
3938
fastrace = { workspace = true, optional = true }
4039
foyer-common = { workspace = true }
@@ -43,6 +42,7 @@ foyer-storage = { workspace = true }
4342
mixtrics = { workspace = true }
4443
pin-project = { workspace = true }
4544
serde = { workspace = true }
45+
thiserror = { workspace = true }
4646
tracing = { workspace = true }
4747

4848
[target.'cfg(madsim)'.dependencies]

foyer/src/hybrid/builder.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@ use foyer_memory::{Cache, CacheBuilder, EvictionConfig, Weighter};
2525
use foyer_storage::{AdmissionPicker, Compression, DeviceOptions, Engine, RecoverMode, RuntimeOptions, StoreBuilder};
2626
use mixtrics::{metrics::BoxedRegistry, registry::noop::NoopMetricsRegistry};
2727

28-
use super::cache::{HybridCacheOptions, HybridCachePipe};
29-
use crate::{HybridCache, HybridCachePolicy, HybridCacheProperties};
28+
use crate::hybrid::{
29+
cache::{HybridCache, HybridCacheOptions, HybridCachePipe, HybridCachePolicy, HybridCacheProperties},
30+
error::Result,
31+
};
3032

3133
/// Hybrid cache builder.
3234
pub struct HybridCacheBuilder<K, V> {
@@ -304,7 +306,7 @@ where
304306
}
305307

306308
/// Build and open the hybrid cache with the given configurations.
307-
pub async fn build(self) -> anyhow::Result<HybridCache<K, V, S>> {
309+
pub async fn build(self) -> Result<HybridCache<K, V, S>> {
308310
let builder = self.builder;
309311

310312
let piped = !builder.is_noop() && self.options.policy == HybridCachePolicy::WriteOnEviction;

foyer/src/hybrid/cache.rs

Lines changed: 25 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,12 @@ use foyer_memory::{Cache, CacheEntry, Fetch, FetchContext, FetchState, Piece, Pi
4242
use foyer_storage::{IoThrottler, Load, Statistics, Store};
4343
use pin_project::pin_project;
4444
use serde::{Deserialize, Serialize};
45-
use tokio::sync::oneshot;
4645

47-
use super::writer::HybridCacheStorageWriter;
48-
use crate::{HybridCacheBuilder, HybridCacheWriter};
46+
use crate::hybrid::{
47+
builder::HybridCacheBuilder,
48+
error::{Error, Result},
49+
writer::{HybridCacheStorageWriter, HybridCacheWriter},
50+
};
4951

5052
#[cfg(feature = "tracing")]
5153
macro_rules! root_span {
@@ -316,7 +318,7 @@ where
316318
memory: Cache<K, V, S, HybridCacheProperties>,
317319
storage: Store<K, V, S, HybridCacheProperties>,
318320
flush_on_close: bool,
319-
) -> anyhow::Result<()> {
321+
) -> Result<()> {
320322
if closed.fetch_or(true, Ordering::Relaxed) {
321323
return Ok(());
322324
}
@@ -333,7 +335,7 @@ where
333335
Ok(())
334336
}
335337

336-
async fn close(&self) -> anyhow::Result<()> {
338+
async fn close(&self) -> Result<()> {
337339
Self::close_inner(
338340
self.closed.clone(),
339341
self.memory.clone(),
@@ -571,7 +573,7 @@ where
571573
}
572574

573575
/// Get cached entry with the given key from the hybrid cache.
574-
pub async fn get<Q>(&self, key: &Q) -> anyhow::Result<Option<HybridCacheEntry<K, V, S>>>
576+
pub async fn get<Q>(&self, key: &Q) -> Result<Option<HybridCacheEntry<K, V, S>>>
575577
where
576578
Q: Hash + Equivalent<K> + Send + Sync + 'static + Clone,
577579
{
@@ -650,7 +652,7 @@ where
650652
///
651653
/// `obtain` is always supposed to be used instead of `get` if the overhead of getting the ownership of the given
652654
/// key is acceptable.
653-
pub async fn obtain(&self, key: K) -> anyhow::Result<Option<HybridCacheEntry<K, V, S>>>
655+
pub async fn obtain(&self, key: K) -> Result<Option<HybridCacheEntry<K, V, S>>>
654656
where
655657
K: Clone,
656658
{
@@ -667,7 +669,7 @@ where
667669
|| {
668670
let store = self.inner.storage.clone();
669671
async move {
670-
match store.load(&key).await.map_err(anyhow::Error::from) {
672+
match store.load(&key).await.map_err(Error::from) {
671673
Ok(Load::Entry {
672674
key: _,
673675
value,
@@ -681,7 +683,7 @@ where
681683
},
682684
Ok(Load::Throttled) => Err(ObtainFetchError::Throttled).into(),
683685
Ok(Load::Miss) => Err(ObtainFetchError::NotExist).into(),
684-
Err(e) => Err(ObtainFetchError::Err(e)).into(),
686+
Err(e) => Err(ObtainFetchError::Other(e)).into(),
685687
}
686688
}
687689
},
@@ -720,11 +722,7 @@ where
720722
try_cancel!(self, span, record_hybrid_obtain_threshold);
721723
Ok(None)
722724
}
723-
Err(ObtainFetchError::RecvError(_)) => {
724-
try_cancel!(self, span, record_hybrid_obtain_threshold);
725-
Ok(None)
726-
}
727-
Err(ObtainFetchError::Err(e)) => {
725+
Err(ObtainFetchError::Other(e)) => {
728726
try_cancel!(self, span, record_hybrid_obtain_threshold);
729727
Err(e)
730728
}
@@ -766,7 +764,7 @@ where
766764
}
767765

768766
/// Clear the hybrid cache.
769-
pub async fn clear(&self) -> anyhow::Result<()> {
767+
pub async fn clear(&self) -> Result<()> {
770768
self.inner.memory.clear();
771769
self.inner.storage.destroy().await?;
772770
Ok(())
@@ -781,7 +779,7 @@ where
781779
/// For more details, please refer to [`super::builder::HybridCacheBuilder::with_flush_on_close()`].
782780
///
783781
/// If `close` is not called explicitly, the hybrid cache will be closed when its last copy is dropped.
784-
pub async fn close(&self) -> anyhow::Result<()> {
782+
pub async fn close(&self) -> Result<()> {
785783
self.inner.close().await
786784
}
787785

@@ -819,13 +817,12 @@ where
819817
enum ObtainFetchError {
820818
Throttled,
821819
NotExist,
822-
RecvError(oneshot::error::RecvError),
823-
Err(anyhow::Error),
820+
Other(Error),
824821
}
825822

826-
impl From<oneshot::error::RecvError> for ObtainFetchError {
827-
fn from(e: oneshot::error::RecvError) -> Self {
828-
Self::RecvError(e)
823+
impl From<foyer_memory::Error> for ObtainFetchError {
824+
fn from(e: foyer_memory::Error) -> Self {
825+
Self::Other(e.into())
829826
}
830827
}
831828

@@ -846,7 +843,7 @@ where
846843
S: HashBuilder + Debug,
847844
{
848845
#[pin]
849-
inner: Fetch<K, V, anyhow::Error, S, HybridCacheProperties>,
846+
inner: Fetch<K, V, Error, S, HybridCacheProperties>,
850847
policy: HybridCachePolicy,
851848
storage: Store<K, V, S, HybridCacheProperties>,
852849
}
@@ -857,7 +854,7 @@ where
857854
V: StorageValue,
858855
S: HashBuilder + Debug,
859856
{
860-
type Output = anyhow::Result<CacheEntry<K, V, S, HybridCacheProperties>>;
857+
type Output = Result<CacheEntry<K, V, S, HybridCacheProperties>>;
861858

862859
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
863860
let mut this = self.project();
@@ -885,7 +882,7 @@ where
885882
V: StorageValue,
886883
S: HashBuilder + Debug,
887884
{
888-
type Target = Fetch<K, V, anyhow::Error, S, HybridCacheProperties>;
885+
type Target = Fetch<K, V, Error, S, HybridCacheProperties>;
889886

890887
fn deref(&self) -> &Self::Target {
891888
&self.inner
@@ -905,7 +902,7 @@ where
905902
pub fn fetch<F, FU>(&self, key: K, fetch: F) -> HybridFetch<K, V, S>
906903
where
907904
F: FnOnce() -> FU,
908-
FU: Future<Output = anyhow::Result<V>> + Send + 'static,
905+
FU: Future<Output = Result<V>> + Send + 'static,
909906
{
910907
self.fetch_inner(key, HybridCacheProperties::default(), fetch)
911908
}
@@ -922,15 +919,15 @@ where
922919
) -> HybridFetch<K, V, S>
923920
where
924921
F: FnOnce() -> FU,
925-
FU: Future<Output = anyhow::Result<V>> + Send + 'static,
922+
FU: Future<Output = Result<V>> + Send + 'static,
926923
{
927924
self.fetch_inner(key, properties, fetch)
928925
}
929926

930927
fn fetch_inner<F, FU>(&self, key: K, properties: HybridCacheProperties, fetch: F) -> HybridFetch<K, V, S>
931928
where
932929
F: FnOnce() -> FU,
933-
FU: Future<Output = anyhow::Result<V>> + Send + 'static,
930+
FU: Future<Output = Result<V>> + Send + 'static,
934931
{
935932
root_span!(self, span, "foyer::hybrid::cache::fetch");
936933

@@ -950,7 +947,7 @@ where
950947
let runtime = self.storage().runtime().clone();
951948

952949
async move {
953-
let throttled = match store.load(&key).await.map_err(anyhow::Error::from) {
950+
let throttled = match store.load(&key).await.map_err(Error::from) {
954951
Ok(Load::Entry {
955952
key: _,
956953
value,

0 commit comments

Comments
 (0)