Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
92e13c0
*: warn linger import sst files
overvenus Feb 21, 2024
60c1f11
resolved_ts: add ingest mediator and observer
overvenus Feb 21, 2024
9316cc1
sst_importer,resolved_ts: move ingest mediator and observer
overvenus Feb 23, 2024
d2474d0
--wip-- [skip ci]
overvenus Feb 26, 2024
3b1c67e
*: clean up SstImporter::new_
overvenus Feb 27, 2024
dfd0569
resolved_ts, import: fix test_resolved_ts_basic
overvenus Feb 27, 2024
fdd19b3
import: refactor ingest mod
overvenus Feb 28, 2024
030a004
sst_importer: add test_ingest_lease
overvenus Feb 28, 2024
25d831b
tests: fix import tests
overvenus Feb 28, 2024
bc8ac6f
tests: add ingest test_lease_expire
overvenus Feb 28, 2024
7442e72
tests: add two tests test_lease_renew and test_lease_invalid_uuid
overvenus Feb 28, 2024
4672965
tests: add three tests
overvenus Feb 29, 2024
78492d4
*: hold sst lease throughout RPC lifetime
overvenus Feb 29, 2024
6f6085f
*: check sst lease before advancing resolved ts
overvenus Mar 1, 2024
3bf56e1
resolved_ts,test_raftstore: simplify test code
overvenus Mar 1, 2024
ba827a5
resolved_ts: add sst lease tests
overvenus Mar 1, 2024
3c6b63a
resolved_ts,test_sst_importer: simpify test code
overvenus Mar 2, 2024
a26c7dc
test_sst_importer: remove duplicated ingest sst
overvenus Mar 4, 2024
87e3f54
update kvproto
overvenus Mar 4, 2024
50f9496
sst_importer: gc mediator periodically
overvenus Mar 4, 2024
c6e6f0f
sst_importer: add comments and adjust names
overvenus Mar 5, 2024
edcabf9
make format
overvenus Mar 5, 2024
e249e85
Cargo: update mio dependency. (#16598)
afeinberg Mar 4, 2024
a948e89
fix cargo-deny check
overvenus Mar 5, 2024
80a9bc3
fix test compile
overvenus Mar 5, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 36 additions & 34 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ paste = "1.0"
pd_client = { workspace = true }
pin-project = "1.0"
pnet_datalink = "0.23"
pprof = { version = "0.11", default-features = false, features = [
pprof = { version = "0.13", default-features = false, features = [
"flamegraph",
"protobuf-codec",
] }
Expand Down Expand Up @@ -249,6 +249,8 @@ procinfo = { git = "https://github.com/tikv/procinfo-rs", rev = "7693954bd1dd86e
# After the PR to kvproto is merged, remember to comment this out and run `cargo update -p kvproto`.
# [patch.'https://github.com/pingcap/kvproto']
# kvproto = { git = "https://github.com/your_github_id/kvproto", branch = "your_branch" }
[patch.'https://github.com/pingcap/kvproto']
kvproto = { git = "https://github.com/overvenus/kvproto", branch = "resolved-ts-compat" }
#
# After the PR to rust-rocksdb is merged, remember to comment this out and run `cargo update -p rocksdb`.
# [patch.'https://github.com/tikv/rust-rocksdb']
Expand Down
1 change: 1 addition & 0 deletions components/backup-stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ resolved_ts = { workspace = true }
security = { path = "../security" }
slog = { workspace = true }
slog-global = { workspace = true }
sst_importer = { workspace = true }
thiserror = "1"
tidb_query_datatype = { workspace = true }
tikv = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions components/cdc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ security = { workspace = true }
semver = "1.0"
slog = { workspace = true }
slog-global = { workspace = true }
sst_importer = { workspace = true }
thiserror = "1.0"
tikv = { workspace = true }
tikv_kv = { workspace = true }
Expand Down
6 changes: 4 additions & 2 deletions components/error_code/src/sst_importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ define_error_codes!(
TTL_LEN_NOT_EQUALS_TO_PAIRS => ("TtlLenNotEqualsToPairs", "", ""),
INCOMPATIBLE_API_VERSION => ("IncompatibleApiVersion", "", ""),
INVALID_KEY_MODE => ("InvalidKeyMode", "", ""),
RESOURCE_NOT_ENOUTH => ("ResourceNotEnough", "", ""),
RESOURCE_NOT_ENOUGH => ("ResourceNotEnough", "", ""),
SUSPENDED => ("Suspended",
"this request has been suspended.",
"Probably there are some export tools don't support exporting data inserted by `ingest`(say, snapshot backup). Check the user manual and stop them."),
REQUEST_TOO_NEW => ("RequestTooNew", "", ""),
REQUEST_TOO_OLD => ("RequestTooOld", "", "")
REQUEST_TOO_OLD => ("RequestTooOld", "", ""),
LEASE_EXPIRED => ("LeaseExpired", "", ""),
INVALID_LEASE => ("InvalidLease", "", "")
);
16 changes: 14 additions & 2 deletions components/raftstore-v2/src/operation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,28 @@ pub mod test_util {
use kvproto::{kvrpcpb::ApiVersion, metapb::RegionEpoch, raft_cmdpb::RaftRequestHeader};
use raft::prelude::{Entry, EntryType};
use raftstore::store::simple_write::SimpleWriteEncoder;
use sst_importer::SstImporter;
use sst_importer::{IngestMediator, IngestObserver, Mediator, SstImporter};
use tempfile::TempDir;

use super::{CatchUpLogs, SimpleWriteReqEncoder};
use crate::{fsm::ApplyResReporter, router::ApplyRes};

pub fn create_tmp_importer<E: KvEngine>() -> (TempDir, Arc<SstImporter<E>>) {
let mut ingest_mediator = IngestMediator::default();
let ingest_observer = Arc::new(IngestObserver::default());
ingest_mediator.register(ingest_observer.clone());
let dir = TempDir::new().unwrap();
let importer = Arc::new(
SstImporter::new(&Default::default(), dir.path(), None, ApiVersion::V1, true).unwrap(),
SstImporter::new(
&Default::default(),
dir.path(),
None,
ApiVersion::V1,
true,
Arc::new(ingest_mediator),
ingest_observer,
)
.unwrap(),
);
(dir, importer)
}
Expand Down
7 changes: 6 additions & 1 deletion components/raftstore-v2/tests/integrations/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use resource_control::{ResourceController, ResourceGroupManager};
use resource_metering::CollectorRegHandle;
use service::service_manager::GrpcServiceManager;
use slog::{debug, o, Logger};
use sst_importer::SstImporter;
use sst_importer::{IngestMediator, IngestObserver, Mediator, SstImporter};
use tempfile::TempDir;
use test_pd::mocker::Service;
use tikv_util::{
Expand Down Expand Up @@ -326,13 +326,18 @@ impl RunningState {
.unwrap();
let coprocessor_host =
CoprocessorHost::new(router.store_router().clone(), cop_cfg.value().clone());
let mut ingest_mediator = IngestMediator::default();
let ingest_observer = Arc::new(IngestObserver::default());
ingest_mediator.register(ingest_observer.clone());
let importer = Arc::new(
SstImporter::new(
&Default::default(),
path.join("importer"),
key_manager.clone(),
ApiVersion::V1,
true,
Arc::new(ingest_mediator),
ingest_observer,
)
.unwrap(),
);
Expand Down
7 changes: 6 additions & 1 deletion components/raftstore/src/store/fsm/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5037,7 +5037,7 @@ mod tests {
};
use protobuf::Message;
use raft::eraftpb::{ConfChange, ConfChangeV2};
use sst_importer::Config as ImportConfig;
use sst_importer::{Config as ImportConfig, IngestMediator, IngestObserver, Mediator};
use tempfile::{Builder, TempDir};
use test_sst_importer::*;
use tikv_util::{
Expand Down Expand Up @@ -5074,6 +5074,9 @@ mod tests {
}

pub fn create_tmp_importer(path: &str) -> (TempDir, Arc<SstImporter<KvTestEngine>>) {
let mut ingest_mediator = IngestMediator::default();
let ingest_observer = Arc::new(IngestObserver::default());
ingest_mediator.register(ingest_observer.clone());
let dir = Builder::new().prefix(path).tempdir().unwrap();
let importer = Arc::new(
SstImporter::new(
Expand All @@ -5082,6 +5085,8 @@ mod tests {
None,
ApiVersion::V1,
false,
Arc::new(ingest_mediator),
ingest_observer,
)
.unwrap(),
);
Expand Down
Loading