Skip to content

Commit 8d58728

Browse files
authored
fix: move some storage metrics to store instead of each engine (#1057)
1 parent b55dba1 commit 8d58728

File tree

4 files changed

+66
-38
lines changed

4 files changed

+66
-38
lines changed

foyer-common/src/metrics.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,14 @@ pub struct Metrics {
3636
pub storage_enqueue: BoxedCounter,
3737
pub storage_hit: BoxedCounter,
3838
pub storage_miss: BoxedCounter,
39+
pub storage_throttled: BoxedCounter,
3940
pub storage_delete: BoxedCounter,
4041
pub storage_error: BoxedCounter,
4142

4243
pub storage_enqueue_duration: BoxedHistogram,
4344
pub storage_hit_duration: BoxedHistogram,
4445
pub storage_miss_duration: BoxedHistogram,
46+
pub storage_throttled_duration: BoxedHistogram,
4547
pub storage_delete_duration: BoxedHistogram,
4648

4749
pub storage_queue_rotate: BoxedCounter,
@@ -217,11 +219,13 @@ impl Metrics {
217219
let storage_hit = foyer_storage_op_total.counter(&[name.clone(), "hit".into()]);
218220
let storage_miss = foyer_storage_op_total.counter(&[name.clone(), "miss".into()]);
219221
let storage_delete = foyer_storage_op_total.counter(&[name.clone(), "delete".into()]);
222+
let storage_throttled = foyer_storage_op_total.counter(&[name.clone(), "throttled".into()]);
220223
let storage_error = foyer_storage_op_total.counter(&[name.clone(), "error".into()]);
221224

222225
let storage_enqueue_duration = foyer_storage_op_duration.histogram(&[name.clone(), "enqueue".into()]);
223226
let storage_hit_duration = foyer_storage_op_duration.histogram(&[name.clone(), "hit".into()]);
224227
let storage_miss_duration = foyer_storage_op_duration.histogram(&[name.clone(), "miss".into()]);
228+
let storage_throttled_duration = foyer_storage_op_duration.histogram(&[name.clone(), "throttled".into()]);
225229
let storage_delete_duration = foyer_storage_op_duration.histogram(&[name.clone(), "delete".into()]);
226230

227231
let storage_queue_rotate = foyer_storage_inner_op_total.counter(&[name.clone(), "queue_rotate".into()]);
@@ -303,11 +307,13 @@ impl Metrics {
303307
storage_enqueue,
304308
storage_hit,
305309
storage_miss,
310+
storage_throttled,
306311
storage_delete,
307312
storage_error,
308313
storage_enqueue_duration,
309314
storage_hit_duration,
310315
storage_miss_duration,
316+
storage_throttled_duration,
311317
storage_delete_duration,
312318
storage_queue_rotate,
313319
storage_queue_rotate_duration,

foyer-storage/src/large/generic.rs

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -355,8 +355,6 @@ where
355355
fn load(&self, hash: u64) -> impl Future<Output = Result<Load<K, V>>> + Send + 'static {
356356
tracing::trace!(hash, "[lodc]: load");
357357

358-
let now = Instant::now();
359-
360358
let indexer = self.inner.indexer.clone();
361359
let metrics = self.inner.metrics.clone();
362360
let region_manager = self.inner.region_manager.clone();
@@ -365,8 +363,6 @@ where
365363
let addr = match indexer.get(hash) {
366364
Some(addr) => addr,
367365
None => {
368-
metrics.storage_miss.increase(1);
369-
metrics.storage_miss_duration.record(now.elapsed().as_secs_f64());
370366
return Ok(Load::Miss);
371367
}
372368
};
@@ -381,13 +377,10 @@ where
381377
Err(e @ Error::InvalidIoRange { .. }) => {
382378
tracing::warn!(?e, "[lodc load]: invalid io range, remove this entry and skip");
383379
indexer.remove(hash);
384-
metrics.storage_miss.increase(1);
385-
metrics.storage_miss_duration.record(now.elapsed().as_secs_f64());
386380
return Ok(Load::Miss);
387381
}
388382
Err(e) => {
389383
tracing::error!(hash, ?addr, ?e, "[lodc load]: load error");
390-
metrics.storage_error.increase(1);
391384
return Err(e);
392385
}
393386
}
@@ -406,13 +399,10 @@ where
406399
"[lodc load]: deserialize read buffer raise error, remove this entry and skip"
407400
);
408401
indexer.remove(hash);
409-
metrics.storage_miss.increase(1);
410-
metrics.storage_miss_duration.record(now.elapsed().as_secs_f64());
411402
return Ok(Load::Miss);
412403
}
413404
Err(e) => {
414405
tracing::error!(hash, ?addr, ?e, "[lodc load]: load error");
415-
metrics.storage_error.increase(1);
416406
return Err(e);
417407
}
418408
};
@@ -439,14 +429,10 @@ where
439429
"[lodc load]: deserialize read buffer raise error, remove this entry and skip"
440430
);
441431
indexer.remove(hash);
442-
metrics.storage_miss.increase(1);
443-
metrics.storage_miss_duration.record(now.elapsed().as_secs_f64());
444-
metrics.storage_error.increase(1);
445432
return Ok(Load::Miss);
446433
}
447434
Err(e) => {
448435
tracing::error!(hash, ?addr, ?header, ?e, "[lodc load]: load error");
449-
metrics.storage_error.increase(1);
450436
return Err(e);
451437
}
452438
};
@@ -456,9 +442,6 @@ where
456442
res
457443
};
458444

459-
metrics.storage_hit.increase(1);
460-
metrics.storage_hit_duration.record(now.elapsed().as_secs_f64());
461-
462445
let age = match region.statistics().probation.load(Ordering::Relaxed) {
463446
true => Age::Old,
464447
false => Age::Young,
@@ -476,8 +459,6 @@ where
476459
}
477460

478461
fn delete(&self, hash: u64) {
479-
let now = Instant::now();
480-
481462
if !self.inner.active.load(Ordering::Relaxed) {
482463
tracing::warn!("cannot delete entry after closed");
483464
return;
@@ -500,12 +481,6 @@ where
500481
stats,
501482
});
502483
});
503-
504-
self.inner.metrics.storage_delete.increase(1);
505-
self.inner
506-
.metrics
507-
.storage_miss_duration
508-
.record(now.elapsed().as_secs_f64());
509484
}
510485

511486
fn may_contains(&self, hash: u64) -> bool {

foyer-storage/src/small/generic.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ where
9292

9393
active: AtomicBool,
9494

95-
metrics: Arc<Metrics>,
95+
_metrics: Arc<Metrics>,
9696
_runtime: Runtime,
9797
}
9898

@@ -155,7 +155,7 @@ where
155155
device: config.device,
156156
set_manager,
157157
active: AtomicBool::new(true),
158-
metrics,
158+
_metrics: metrics,
159159
_runtime: config.runtime,
160160
};
161161
let inner = Arc::new(inner);
@@ -189,15 +189,13 @@ where
189189

190190
fn load(&self, hash: u64) -> impl Future<Output = Result<Load<K, V>>> + Send + 'static {
191191
let set_manager = self.inner.set_manager.clone();
192-
let metrics = self.inner.metrics.clone();
193192

194193
async move {
195194
set_manager
196195
.load(hash)
197196
.await
198197
.inspect_err(|e| {
199198
tracing::error!(hash, ?e, "[sodc load]: fail to load");
200-
metrics.storage_error.increase(1);
201199
})
202200
.map(|o| match o {
203201
Some((key, value)) => Load::Entry {

foyer-storage/src/store.rs

Lines changed: 58 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -215,10 +215,17 @@ where
215215
where
216216
Q: Hash + Equivalent<K> + ?Sized + Send + Sync + 'static,
217217
{
218+
let now = Instant::now();
219+
218220
let hash = self.inner.hasher.hash_one(key);
219221

220222
#[cfg(feature = "test_utils")]
221223
if self.inner.load_throttle_switch.is_throttled() {
224+
self.inner.metrics.storage_throttled.increase(1);
225+
self.inner
226+
.metrics
227+
.storage_throttled_duration
228+
.record(now.elapsed().as_secs_f64());
222229
return Ok(Load::Throttled);
223230
}
224231

@@ -228,8 +235,18 @@ where
228235
Pick::Reject => unreachable!(),
229236
Pick::Throttled(_) => {
230237
if self.inner.engine.may_contains(hash) {
238+
self.inner.metrics.storage_throttled.increase(1);
239+
self.inner
240+
.metrics
241+
.storage_throttled_duration
242+
.record(now.elapsed().as_secs_f64());
231243
return Ok(Load::Throttled);
232244
} else {
245+
self.inner.metrics.storage_miss.increase(1);
246+
self.inner
247+
.metrics
248+
.storage_miss_duration
249+
.record(now.elapsed().as_secs_f64());
233250
return Ok(Load::Miss);
234251
}
235252
}
@@ -242,14 +259,38 @@ where
242259
key: k,
243260
value: v,
244261
populated: p,
245-
}) if key.equivalent(&k) => Ok(Load::Entry {
246-
key: k,
247-
value: v,
248-
populated: p,
249-
}),
250-
Ok(Load::Entry { .. }) | Ok(Load::Miss) => Ok(Load::Miss),
251-
Ok(Load::Throttled) => Ok(Load::Throttled),
252-
Err(e) => Err(e),
262+
}) if key.equivalent(&k) => {
263+
self.inner.metrics.storage_hit.increase(1);
264+
self.inner
265+
.metrics
266+
.storage_hit_duration
267+
.record(now.elapsed().as_secs_f64());
268+
Ok(Load::Entry {
269+
key: k,
270+
value: v,
271+
populated: p,
272+
})
273+
}
274+
Ok(Load::Entry { .. }) | Ok(Load::Miss) => {
275+
self.inner.metrics.storage_miss.increase(1);
276+
self.inner
277+
.metrics
278+
.storage_miss_duration
279+
.record(now.elapsed().as_secs_f64());
280+
Ok(Load::Miss)
281+
}
282+
Ok(Load::Throttled) => {
283+
self.inner.metrics.storage_throttled.increase(1);
284+
self.inner
285+
.metrics
286+
.storage_throttled_duration
287+
.record(now.elapsed().as_secs_f64());
288+
Ok(Load::Throttled)
289+
}
290+
Err(e) => {
291+
self.inner.metrics.storage_error.increase(1);
292+
Err(e)
293+
}
253294
}
254295
}
255296

@@ -258,8 +299,16 @@ where
258299
where
259300
Q: Hash + Equivalent<K> + ?Sized,
260301
{
302+
let now = Instant::now();
303+
261304
let hash = self.inner.hasher.hash_one(key);
262-
self.inner.engine.delete(hash)
305+
self.inner.engine.delete(hash);
306+
307+
self.inner.metrics.storage_delete.increase(1);
308+
self.inner
309+
.metrics
310+
.storage_delete_duration
311+
.record(now.elapsed().as_secs_f64());
263312
}
264313

265314
/// Check if the disk cache contains a cached entry with the given key.

0 commit comments

Comments
 (0)