Skip to content

[feat][sdk]Refactor txn sdk #78

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions include/dingosdk/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,15 +271,14 @@ class Transaction {
friend class TestBase;
friend class SDKTxnImplTest;

Status Begin();

// own
class TxnImpl;
using TxnImplSPtr = std::shared_ptr<TxnImpl>;
class Data;
Data* data_;

explicit Transaction(Data* data);

TxnImplSPtr impl_;
Status Begin();

explicit Transaction(TxnImplSPtr impl);
};

enum EngineType : uint8_t { kLSM, kBTree, kXDPROCKS };
Expand Down
107 changes: 18 additions & 89 deletions src/example/sdk_transaction_example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
#include <utility>
#include <vector>

#include "glog/logging.h"
#include "dingosdk/client.h"
#include "common/logging.h"
#include "dingosdk/client.h"
#include "dingosdk/status.h"
#include "glog/logging.h"

using dingodb::sdk::Status;

Expand Down Expand Up @@ -65,7 +65,7 @@ static void CreateRegion(std::string name, std::string start_key, std::string en
CHECK(start_key < end_key) << "start_key must < end_key";
CHECK(replicas > 0) << "replicas must > 0";

dingodb::sdk::RegionCreator *tmp_creator;
dingodb::sdk::RegionCreator* tmp_creator;
Status built = g_client->NewRegionCreator(&tmp_creator);
CHECK(built.IsOK()) << "dingo creator build fail";
std::shared_ptr<dingodb::sdk::RegionCreator> creator(tmp_creator);
Expand Down Expand Up @@ -102,7 +102,7 @@ static std::shared_ptr<dingodb::sdk::Transaction> NewOptimisticTransaction(dingo
options.kind = dingodb::sdk::kOptimistic;
options.keep_alive_ms = keep_alive_ms;

dingodb::sdk::Transaction *tmp;
dingodb::sdk::Transaction* tmp;
Status built = g_client->NewTransaction(options, &tmp);
CHECK(built.ok()) << "dingo txn build fail";
std::shared_ptr<dingodb::sdk::Transaction> txn(tmp);
Expand Down Expand Up @@ -138,7 +138,7 @@ static void OptimisticTxnPostClean(dingodb::sdk::TransactionIsolation isolation)
void OptimisticTxnBatch() {
auto txn = NewOptimisticTransaction(dingodb::sdk::kSnapshotIsolation);

for (const auto &key : keys) {
for (const auto& key : keys) {
std::string tmp;
Status got = txn->Get(key, tmp);
CHECK(got.IsNotFound());
Expand Down Expand Up @@ -167,7 +167,7 @@ void OptimisticTxnBatch() {
s = txn->BatchGet(keys, tmp);
CHECK(s.ok());
CHECK_EQ(tmp.size(), kvs.size());
for (const auto &kv : tmp) {
for (const auto& kv : tmp) {
CHECK_EQ(kv.value, key_values[kv.key]);
}
}
Expand All @@ -181,7 +181,7 @@ void OptimisticTxnBatch() {
s = txn->BatchGet(keys, tmp);
CHECK(s.ok());
CHECK_EQ(tmp.size(), kvs.size());
for (const auto &kv : tmp) {
for (const auto& kv : tmp) {
CHECK_EQ(kv.value, key_values[kv.key]);
}
}
Expand All @@ -206,7 +206,7 @@ void OptimisticTxnBatch() {
s = txn->BatchGet(keys, tmp);
CHECK(s.ok());
CHECK_EQ(tmp.size(), kvs.size());
for (const auto &kv : tmp) {
for (const auto& kv : tmp) {
CHECK_EQ(kv.value, key_values[kv.key]);
}
}
Expand All @@ -215,7 +215,7 @@ void OptimisticTxnBatch() {
// batch put override exist kvs, then batch delete
std::vector<dingodb::sdk::KVPair> new_kvs;
new_kvs.reserve(keys.size());
for (auto &key : keys) {
for (auto& key : keys) {
new_kvs.push_back({key, key});
}

Expand All @@ -226,7 +226,7 @@ void OptimisticTxnBatch() {
s = txn->BatchGet(keys, tmp);
CHECK(s.ok());
CHECK_EQ(tmp.size(), new_kvs.size());
for (const auto &kv : tmp) {
for (const auto& kv : tmp) {
CHECK_EQ(kv.value, kv.key);
}

Expand Down Expand Up @@ -272,7 +272,7 @@ void OptimisticTxnSingleOp() {
}

{
dingodb::sdk::Transaction *tmp;
dingodb::sdk::Transaction* tmp;
Status built = g_client->NewTransaction(options, &tmp);
CHECK(built.ok()) << "dingo txn build fail";
CHECK_NOTNULL(tmp);
Expand All @@ -284,7 +284,7 @@ void OptimisticTxnSingleOp() {
CHECK(got.ok());
CHECK_EQ(kvs.size(), 2);

for (const auto &kv : kvs) {
for (const auto& kv : kvs) {
CHECK(kv.key == put_key || kv.key == put_if_absent_key);
if (kv.key == put_key) {
CHECK_EQ(kv.value, key_values[put_key]);
Expand Down Expand Up @@ -370,7 +370,7 @@ void OptimisticTxnLockConflict() {
CHECK(got.ok());
CHECK_EQ(kvs.size(), 2);

for (const auto &kv : kvs) {
for (const auto& kv : kvs) {
CHECK(kv.key == put_key || kv.key == put_if_absent_key);
if (kv.key == put_key) {
CHECK_EQ(kv.value, key_values[put_key]);
Expand Down Expand Up @@ -445,7 +445,7 @@ void OptimisticTxnReadSnapshotAndReadCommiited() {
CHECK(got.ok());
CHECK_EQ(kvs.size(), 2);

for (const auto &kv : kvs) {
for (const auto& kv : kvs) {
CHECK(kv.key == put_key || kv.key == put_if_absent_key);
if (kv.key == put_key) {
CHECK_EQ(kv.value, key_values[put_key]);
Expand Down Expand Up @@ -545,7 +545,7 @@ void OptimisticTxnScan() {
CHECK(got.ok());
CHECK_EQ(kvs.size(), 2);

for (const auto &kv : kvs) {
for (const auto& kv : kvs) {
CHECK(kv.key == put_key || kv.key == put_if_absent_key);
DINGO_LOG(INFO) << "batch get, key:" << kv.key << ",value:" << kv.value;
if (kv.key == put_key) {
Expand All @@ -567,7 +567,7 @@ void OptimisticTxnScan() {
}
CHECK_EQ(kvs.size(), 2);

for (const auto &kv : kvs) {
for (const auto& kv : kvs) {
CHECK(kv.key == put_key || kv.key == put_if_absent_key);
if (kv.key == put_key) {
CHECK_EQ(kv.value, key_values[put_key]);
Expand Down Expand Up @@ -618,77 +618,6 @@ void OptimisticTxnScanReadSelf() {
DINGO_LOG(INFO) << "txn commit:" << commit.ToString();
}

auto read_commit_txn = NewOptimisticTransaction(dingodb::sdk::kReadCommitted);
{
std::string self_put_key = "xb02";
std::string self_put_if_absent_key = "xc02";
std::string self_delete_key = "xd02";

to_check.emplace(self_put_key);
to_check.emplace(self_put_if_absent_key);
to_check.emplace(self_delete_key);
{
{
// overwrite
read_commit_txn->Put(put_key, put_key);
read_commit_txn->PutIfAbsent(put_if_absent_key, put_if_absent_key);
read_commit_txn->Delete(delete_key);
read_commit_txn->Delete(put_keyl);
}

read_commit_txn->Put(self_put_key, self_put_key);
read_commit_txn->PutIfAbsent(self_put_if_absent_key, self_put_if_absent_key);
read_commit_txn->Delete(self_delete_key);
}

{
// scan without limit
std::vector<dingodb::sdk::KVPair> kvs;
Status scan = read_commit_txn->Scan("xa00000000", "xz00000000", 0, kvs);
DINGO_LOG(INFO) << "read_commit_txn scan:" << scan.ToString();
CHECK(scan.ok());
for (const auto &kv : kvs) {
DINGO_LOG(INFO) << "read_commit_txn scan key:" << kv.key << ", value:" << kv.value;
}
if (kvs.size() != 5) {
DINGO_LOG(WARNING) << "Internal error, expected kvs size:" << 2 << ", ectual:" << kvs.size();
}
CHECK_EQ(kvs.size(), 5);

for (const auto &kv : kvs) {
if (kv.key != put_if_absent_key && kv.key != put_keyf) {
CHECK_EQ(kv.value, kv.key);
} else {
CHECK_EQ(kv.value, key_values[kv.key]);
}
to_check.erase(kv.key);
}
CHECK_EQ(to_check.size(), 3);
CHECK(to_check.find(delete_key) != to_check.cend());
CHECK(to_check.find(self_delete_key) != to_check.cend());
CHECK(to_check.find(put_keyl) != to_check.cend());
}

{
// scan without limit
int limit = 2;
std::vector<dingodb::sdk::KVPair> kvs;
Status scan = read_commit_txn->Scan("xa00000000", "xz00000000", limit, kvs);
DINGO_LOG(INFO) << "read_commit_txn scan:" << scan.ToString();
CHECK(scan.ok());
for (const auto &kv : kvs) {
DINGO_LOG(INFO) << "read_commit_txn scan key:" << kv.key << ", value:" << kv.value;
}
CHECK_EQ(kvs.size(), 2);
// TODO: check key prefix is xb
}

Status precommit = read_commit_txn->PreCommit();
DINGO_LOG(INFO) << "read_commit_txn precommit:" << precommit.ToString();
Status commit = read_commit_txn->Commit();
DINGO_LOG(INFO) << "read_commit_txn commit:" << commit.ToString();
}

{
std::vector<std::string> keys(to_check.begin(), to_check.end());
{
Expand All @@ -715,7 +644,7 @@ void OptimisticTxnScanReadSelf() {
OptimisticTxnPostClean(dingodb::sdk::kSnapshotIsolation);
}

int main(int argc, char *argv[]) {
int main(int argc, char* argv[]) {
FLAGS_minloglevel = google::GLOG_INFO;
FLAGS_logtostdout = true;
FLAGS_colorlogtostdout = true;
Expand All @@ -733,7 +662,7 @@ int main(int argc, char *argv[]) {

CHECK(!FLAGS_addrs.empty());

dingodb::sdk::Client *tmp;
dingodb::sdk::Client* tmp;
Status built = dingodb::sdk::Client::BuildFromAddrs(FLAGS_addrs, &tmp);
if (!built.ok()) {
DINGO_LOG(ERROR) << "Fail to build client, please check parameter --addrs=" << FLAGS_addrs;
Expand Down
8 changes: 8 additions & 0 deletions src/sdk/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ set(SDK_SRCS
transaction/txn_impl.cc
transaction/txn_lock_resolver.cc
transaction/txn_region_scanner_impl.cc
transaction/txn_task/txn_task.cc
transaction/txn_task/txn_get_task.cc
transaction/txn_task/txn_batch_get_task.cc
transaction/txn_task/txn_batch_rollback_task.cc
transaction/txn_task/txn_prewrite_task.cc
transaction/txn_task/txn_commit_task.cc
transaction/txn_task/txn_check_status_task.cc
transaction/txn_task/txn_resolve_lock_task.cc
vector/vector_client.cc
vector/vector_index_cache.cc
vector/vector_index_creator.cc
Expand Down
39 changes: 20 additions & 19 deletions src/sdk/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
#include "sdk/rpc/coordinator_rpc.h"
#include "sdk/sdk_version.h"
#include "sdk/transaction/txn_impl.h"
#include "sdk/transaction/txn_internal_data.h"
#include "sdk/utils/net_util.h"
#include "sdk/vector/diskann/vector_diskann_status_by_index_task.h"
#include "sdk/vector/vector_index.h"
Expand Down Expand Up @@ -236,8 +237,8 @@ Status Client::NewRawKV(RawKV** raw_kv) {
}

Status Client::NewTransaction(const TransactionOptions& options, Transaction** txn) {
auto txn_impl = std::make_shared<Transaction::TxnImpl>(*data_->stub, options);
Transaction* tmp_txn = new Transaction(txn_impl);
auto txn_impl = std::make_shared<TxnImpl>(*data_->stub, options);
Transaction* tmp_txn = new Transaction(new Transaction::Data(*data_->stub, std::move(txn_impl)));
Status s = tmp_txn->Begin();
if (!s.ok()) {
delete tmp_txn;
Expand Down Expand Up @@ -595,46 +596,46 @@ Status RawKV::Scan(const std::string& start_key, const std::string& end_key, uin
return task.Run();
}

Transaction::Transaction(TxnImplSPtr impl) : impl_(impl) {}
Transaction::Transaction(Data* data) : data_(data) {}

Transaction::~Transaction() {} // NOLINT
Transaction::~Transaction() { delete data_; }

Status Transaction::Begin() { return impl_->Begin(); }
Status Transaction::Begin() { return data_->impl->Begin(); }

int64_t Transaction::ID() const { return impl_->ID(); }
int64_t Transaction::ID() const { return data_->impl->ID(); }

Status Transaction::Get(const std::string& key, std::string& value) { return impl_->Get(key, value); }
Status Transaction::Get(const std::string& key, std::string& value) { return data_->impl->Get(key, value); }

Status Transaction::BatchGet(const std::vector<std::string>& keys, std::vector<KVPair>& kvs) {
return impl_->BatchGet(keys, kvs);
return data_->impl->BatchGet(keys, kvs);
}

Status Transaction::Put(const std::string& key, const std::string& value) { return impl_->Put(key, value); }
Status Transaction::Put(const std::string& key, const std::string& value) { return data_->impl->Put(key, value); }

Status Transaction::BatchPut(const std::vector<KVPair>& kvs) { return impl_->BatchPut(kvs); }
Status Transaction::BatchPut(const std::vector<KVPair>& kvs) { return data_->impl->BatchPut(kvs); }

Status Transaction::PutIfAbsent(const std::string& key, const std::string& value) {
return impl_->PutIfAbsent(key, value);
return data_->impl->PutIfAbsent(key, value);
}

Status Transaction::BatchPutIfAbsent(const std::vector<KVPair>& kvs) { return impl_->BatchPutIfAbsent(kvs); }
Status Transaction::BatchPutIfAbsent(const std::vector<KVPair>& kvs) { return data_->impl->BatchPutIfAbsent(kvs); }

Status Transaction::Delete(const std::string& key) { return impl_->Delete(key); }
Status Transaction::Delete(const std::string& key) { return data_->impl->Delete(key); }

Status Transaction::BatchDelete(const std::vector<std::string>& keys) { return impl_->BatchDelete(keys); }
Status Transaction::BatchDelete(const std::vector<std::string>& keys) { return data_->impl->BatchDelete(keys); }

Status Transaction::Scan(const std::string& start_key, const std::string& end_key, uint64_t limit,
std::vector<KVPair>& kvs) {
return impl_->Scan(start_key, end_key, limit, kvs);
return data_->impl->Scan(start_key, end_key, limit, kvs);
}

Status Transaction::PreCommit() { return impl_->PreCommit(); }
Status Transaction::PreCommit() { return data_->impl->PreCommit(); }

Status Transaction::Commit() { return impl_->Commit(); }
Status Transaction::Commit() { return data_->impl->Commit(); }

Status Transaction::Rollback() { return impl_->Rollback(); }
Status Transaction::Rollback() { return data_->impl->Rollback(); }

bool Transaction::IsOnePc() const { return impl_->IsOnePc(); }
bool Transaction::IsOnePc() const { return data_->impl->IsOnePc(); }

RegionCreator::RegionCreator(Data* data) : data_(data) {}

Expand Down
Loading