@@ -384,6 +384,19 @@ class DramKVEmbeddingCache : public kv_db::EmbeddingKVDB {
384
384
local_write_allocate_total_duration +=
385
385
facebook::WallClockUtil::NowInUsecFast () -
386
386
before_alloc_ts;
387
+ if (feature_evict_config_.has_value () &&
388
+ feature_evict_config_.value ()->trigger_mode_ !=
389
+ EvictTriggerMode::DISABLED &&
390
+ feature_evict_) {
391
+ auto * feature_score_evict = dynamic_cast <
392
+ FeatureScoreBasedEvict<weight_type>*>(
393
+ feature_evict_.get ());
394
+ if (feature_score_evict) {
395
+ feature_score_evict
396
+ ->update_feature_score_statistics (
397
+ block, 0 , shard_id, true );
398
+ }
399
+ }
387
400
}
388
401
if (feature_evict_config_.has_value () &&
389
402
feature_evict_config_.value ()->trigger_mode_ !=
@@ -705,16 +718,21 @@ class DramKVEmbeddingCache : public kv_db::EmbeddingKVDB {
705
718
auto it = wlmap->find (id);
706
719
if (it != wlmap->end ()) {
707
720
block = it->second ;
721
+ feature_score_evict
722
+ ->update_feature_score_statistics (
723
+ block, engege_rate, shard_id, false );
708
724
} else {
709
725
// Key doesn't exist, allocate new block and
710
726
// insert.
711
727
block = pool->template allocate_t <weight_type>();
712
728
FixedBlockPool::set_key (block, id);
729
+ FixedBlockPool::set_feature_score_rate (
730
+ block, engege_rate);
713
731
wlmap->insert ({id, block});
732
+ feature_score_evict
733
+ ->update_feature_score_statistics (
734
+ block, 0 , shard_id, true );
714
735
}
715
-
716
- feature_score_evict->update_feature_score_statistics (
717
- block, engege_rate);
718
736
updated_id_count++;
719
737
}
720
738
}
@@ -1489,61 +1507,75 @@ class DramKVEmbeddingCache : public kv_db::EmbeddingKVDB {
1489
1507
const auto indexes = iter->second ;
1490
1508
auto f =
1491
1509
folly::via (executor_.get ())
1492
- .thenValue (
1493
- [this , shard_id, indexes, &indices, &weights_with_metaheader](
1494
- folly::Unit) {
1495
- FBGEMM_DISPATCH_INTEGRAL_TYPES (
1496
- indices.scalar_type (),
1497
- " dram_kv_set_with_metaheader" ,
1498
- [this ,
1499
- shard_id,
1500
- indexes,
1501
- &indices,
1502
- &weights_with_metaheader] {
1503
- using index_t = scalar_t ;
1504
- CHECK (indices.is_contiguous ());
1505
- CHECK (weights_with_metaheader.is_contiguous ());
1506
- CHECK_EQ (
1507
- indices.size (0 ), weights_with_metaheader.size (0 ));
1508
- {
1509
- auto wlmap = kv_store_.by (shard_id).wlock ();
1510
- auto * pool = kv_store_.pool_by (shard_id);
1511
- int64_t stride = weights_with_metaheader.size (1 );
1512
- auto indices_data_ptr = indices.data_ptr <index_t >();
1513
- auto weights_data_ptr =
1514
- weights_with_metaheader.data_ptr <weight_type>();
1515
- for (auto index_iter = indexes.begin ();
1516
- index_iter != indexes.end ();
1517
- index_iter++) {
1518
- const auto & id_index = *index_iter;
1519
- auto id = int64_t (indices_data_ptr[id_index]);
1520
- // Defensive programming
1521
- // used is false shouldn't occur under normal
1522
- // circumstances
1523
- FixedBlockPool::set_used (
1524
- weights_data_ptr + id_index * stride, true );
1525
-
1526
- // use mempool
1527
- weight_type* block = nullptr ;
1528
- // First check if the key already exists
1529
- auto it = wlmap->find (id);
1530
- if (it != wlmap->end ()) {
1531
- block = it->second ;
1532
- } else {
1533
- // Key doesn't exist, allocate new block and
1534
- // insert.
1535
- block =
1536
- pool->template allocate_t <weight_type>();
1537
- wlmap->insert ({id, block});
1510
+ .thenValue ([this ,
1511
+ shard_id,
1512
+ indexes,
1513
+ &indices,
1514
+ &weights_with_metaheader](folly::Unit) {
1515
+ FBGEMM_DISPATCH_INTEGRAL_TYPES (
1516
+ indices.scalar_type (),
1517
+ " dram_kv_set_with_metaheader" ,
1518
+ [this ,
1519
+ shard_id,
1520
+ indexes,
1521
+ &indices,
1522
+ &weights_with_metaheader] {
1523
+ using index_t = scalar_t ;
1524
+ CHECK (indices.is_contiguous ());
1525
+ CHECK (weights_with_metaheader.is_contiguous ());
1526
+ CHECK_EQ (
1527
+ indices.size (0 ), weights_with_metaheader.size (0 ));
1528
+ {
1529
+ auto wlmap = kv_store_.by (shard_id).wlock ();
1530
+ auto * pool = kv_store_.pool_by (shard_id);
1531
+ int64_t stride = weights_with_metaheader.size (1 );
1532
+ auto indices_data_ptr = indices.data_ptr <index_t >();
1533
+ auto weights_data_ptr =
1534
+ weights_with_metaheader.data_ptr <weight_type>();
1535
+ for (auto index_iter = indexes.begin ();
1536
+ index_iter != indexes.end ();
1537
+ index_iter++) {
1538
+ const auto & id_index = *index_iter;
1539
+ auto id = int64_t (indices_data_ptr[id_index]);
1540
+ // Defensive programming
1541
+ // used is false shouldn't occur under normal
1542
+ // circumstances
1543
+ FixedBlockPool::set_used (
1544
+ weights_data_ptr + id_index * stride, true );
1545
+
1546
+ // use mempool
1547
+ weight_type* block = nullptr ;
1548
+ // First check if the key already exists
1549
+ auto it = wlmap->find (id);
1550
+ if (it != wlmap->end ()) {
1551
+ block = it->second ;
1552
+ } else {
1553
+ // Key doesn't exist, allocate new block and
1554
+ // insert.
1555
+ block = pool->template allocate_t <weight_type>();
1556
+ wlmap->insert ({id, block});
1557
+ if (feature_evict_config_.has_value () &&
1558
+ feature_evict_config_.value ()->trigger_mode_ !=
1559
+ EvictTriggerMode::DISABLED &&
1560
+ feature_evict_) {
1561
+ auto * feature_score_evict = dynamic_cast <
1562
+ FeatureScoreBasedEvict<weight_type>*>(
1563
+ feature_evict_.get ());
1564
+ if (feature_score_evict) {
1565
+ feature_score_evict
1566
+ ->update_feature_score_statistics (
1567
+ block, 0 , shard_id, true );
1538
1568
}
1539
- std::copy (
1540
- weights_data_ptr + id_index * stride,
1541
- weights_data_ptr + (id_index + 1 ) * stride,
1542
- block);
1543
1569
}
1544
1570
}
1545
- });
1546
- });
1571
+ std::copy (
1572
+ weights_data_ptr + id_index * stride,
1573
+ weights_data_ptr + (id_index + 1 ) * stride,
1574
+ block);
1575
+ }
1576
+ }
1577
+ });
1578
+ });
1547
1579
futures.push_back (std::move (f));
1548
1580
}
1549
1581
return folly::collect (futures);
0 commit comments