Skip to content

Commit 7a6ae0f

Browse files
committed
[feat][sdk]Refactor txn sdk
1 parent bdaba4d commit 7a6ae0f

25 files changed

+1851
-949
lines changed

include/dingosdk/client.h

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -271,15 +271,14 @@ class Transaction {
271271
friend class TestBase;
272272
friend class SDKTxnImplTest;
273273

274-
Status Begin();
275-
276274
// own
277-
class TxnImpl;
278-
using TxnImplSPtr = std::shared_ptr<TxnImpl>;
275+
class Data;
276+
Data* data_;
277+
278+
explicit Transaction(Data* data);
279279

280-
TxnImplSPtr impl_;
280+
Status Begin();
281281

282-
explicit Transaction(TxnImplSPtr impl);
283282
};
284283

285284
enum EngineType : uint8_t { kLSM, kBTree, kXDPROCKS };

src/example/sdk_transaction_example.cc

Lines changed: 18 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222
#include <utility>
2323
#include <vector>
2424

25-
#include "glog/logging.h"
26-
#include "dingosdk/client.h"
2725
#include "common/logging.h"
26+
#include "dingosdk/client.h"
2827
#include "dingosdk/status.h"
28+
#include "glog/logging.h"
2929

3030
using dingodb::sdk::Status;
3131

@@ -65,7 +65,7 @@ static void CreateRegion(std::string name, std::string start_key, std::string en
6565
CHECK(start_key < end_key) << "start_key must < end_key";
6666
CHECK(replicas > 0) << "replicas must > 0";
6767

68-
dingodb::sdk::RegionCreator *tmp_creator;
68+
dingodb::sdk::RegionCreator* tmp_creator;
6969
Status built = g_client->NewRegionCreator(&tmp_creator);
7070
CHECK(built.IsOK()) << "dingo creator build fail";
7171
std::shared_ptr<dingodb::sdk::RegionCreator> creator(tmp_creator);
@@ -102,7 +102,7 @@ static std::shared_ptr<dingodb::sdk::Transaction> NewOptimisticTransaction(dingo
102102
options.kind = dingodb::sdk::kOptimistic;
103103
options.keep_alive_ms = keep_alive_ms;
104104

105-
dingodb::sdk::Transaction *tmp;
105+
dingodb::sdk::Transaction* tmp;
106106
Status built = g_client->NewTransaction(options, &tmp);
107107
CHECK(built.ok()) << "dingo txn build fail";
108108
std::shared_ptr<dingodb::sdk::Transaction> txn(tmp);
@@ -138,7 +138,7 @@ static void OptimisticTxnPostClean(dingodb::sdk::TransactionIsolation isolation)
138138
void OptimisticTxnBatch() {
139139
auto txn = NewOptimisticTransaction(dingodb::sdk::kSnapshotIsolation);
140140

141-
for (const auto &key : keys) {
141+
for (const auto& key : keys) {
142142
std::string tmp;
143143
Status got = txn->Get(key, tmp);
144144
CHECK(got.IsNotFound());
@@ -167,7 +167,7 @@ void OptimisticTxnBatch() {
167167
s = txn->BatchGet(keys, tmp);
168168
CHECK(s.ok());
169169
CHECK_EQ(tmp.size(), kvs.size());
170-
for (const auto &kv : tmp) {
170+
for (const auto& kv : tmp) {
171171
CHECK_EQ(kv.value, key_values[kv.key]);
172172
}
173173
}
@@ -181,7 +181,7 @@ void OptimisticTxnBatch() {
181181
s = txn->BatchGet(keys, tmp);
182182
CHECK(s.ok());
183183
CHECK_EQ(tmp.size(), kvs.size());
184-
for (const auto &kv : tmp) {
184+
for (const auto& kv : tmp) {
185185
CHECK_EQ(kv.value, key_values[kv.key]);
186186
}
187187
}
@@ -206,7 +206,7 @@ void OptimisticTxnBatch() {
206206
s = txn->BatchGet(keys, tmp);
207207
CHECK(s.ok());
208208
CHECK_EQ(tmp.size(), kvs.size());
209-
for (const auto &kv : tmp) {
209+
for (const auto& kv : tmp) {
210210
CHECK_EQ(kv.value, key_values[kv.key]);
211211
}
212212
}
@@ -215,7 +215,7 @@ void OptimisticTxnBatch() {
215215
// batch put override exist kvs, then batch delete
216216
std::vector<dingodb::sdk::KVPair> new_kvs;
217217
new_kvs.reserve(keys.size());
218-
for (auto &key : keys) {
218+
for (auto& key : keys) {
219219
new_kvs.push_back({key, key});
220220
}
221221

@@ -226,7 +226,7 @@ void OptimisticTxnBatch() {
226226
s = txn->BatchGet(keys, tmp);
227227
CHECK(s.ok());
228228
CHECK_EQ(tmp.size(), new_kvs.size());
229-
for (const auto &kv : tmp) {
229+
for (const auto& kv : tmp) {
230230
CHECK_EQ(kv.value, kv.key);
231231
}
232232

@@ -272,7 +272,7 @@ void OptimisticTxnSingleOp() {
272272
}
273273

274274
{
275-
dingodb::sdk::Transaction *tmp;
275+
dingodb::sdk::Transaction* tmp;
276276
Status built = g_client->NewTransaction(options, &tmp);
277277
CHECK(built.ok()) << "dingo txn build fail";
278278
CHECK_NOTNULL(tmp);
@@ -284,7 +284,7 @@ void OptimisticTxnSingleOp() {
284284
CHECK(got.ok());
285285
CHECK_EQ(kvs.size(), 2);
286286

287-
for (const auto &kv : kvs) {
287+
for (const auto& kv : kvs) {
288288
CHECK(kv.key == put_key || kv.key == put_if_absent_key);
289289
if (kv.key == put_key) {
290290
CHECK_EQ(kv.value, key_values[put_key]);
@@ -370,7 +370,7 @@ void OptimisticTxnLockConflict() {
370370
CHECK(got.ok());
371371
CHECK_EQ(kvs.size(), 2);
372372

373-
for (const auto &kv : kvs) {
373+
for (const auto& kv : kvs) {
374374
CHECK(kv.key == put_key || kv.key == put_if_absent_key);
375375
if (kv.key == put_key) {
376376
CHECK_EQ(kv.value, key_values[put_key]);
@@ -445,7 +445,7 @@ void OptimisticTxnReadSnapshotAndReadCommiited() {
445445
CHECK(got.ok());
446446
CHECK_EQ(kvs.size(), 2);
447447

448-
for (const auto &kv : kvs) {
448+
for (const auto& kv : kvs) {
449449
CHECK(kv.key == put_key || kv.key == put_if_absent_key);
450450
if (kv.key == put_key) {
451451
CHECK_EQ(kv.value, key_values[put_key]);
@@ -545,7 +545,7 @@ void OptimisticTxnScan() {
545545
CHECK(got.ok());
546546
CHECK_EQ(kvs.size(), 2);
547547

548-
for (const auto &kv : kvs) {
548+
for (const auto& kv : kvs) {
549549
CHECK(kv.key == put_key || kv.key == put_if_absent_key);
550550
DINGO_LOG(INFO) << "batch get, key:" << kv.key << ",value:" << kv.value;
551551
if (kv.key == put_key) {
@@ -567,7 +567,7 @@ void OptimisticTxnScan() {
567567
}
568568
CHECK_EQ(kvs.size(), 2);
569569

570-
for (const auto &kv : kvs) {
570+
for (const auto& kv : kvs) {
571571
CHECK(kv.key == put_key || kv.key == put_if_absent_key);
572572
if (kv.key == put_key) {
573573
CHECK_EQ(kv.value, key_values[put_key]);
@@ -618,77 +618,6 @@ void OptimisticTxnScanReadSelf() {
618618
DINGO_LOG(INFO) << "txn commit:" << commit.ToString();
619619
}
620620

621-
auto read_commit_txn = NewOptimisticTransaction(dingodb::sdk::kReadCommitted);
622-
{
623-
std::string self_put_key = "xb02";
624-
std::string self_put_if_absent_key = "xc02";
625-
std::string self_delete_key = "xd02";
626-
627-
to_check.emplace(self_put_key);
628-
to_check.emplace(self_put_if_absent_key);
629-
to_check.emplace(self_delete_key);
630-
{
631-
{
632-
// overwrite
633-
read_commit_txn->Put(put_key, put_key);
634-
read_commit_txn->PutIfAbsent(put_if_absent_key, put_if_absent_key);
635-
read_commit_txn->Delete(delete_key);
636-
read_commit_txn->Delete(put_keyl);
637-
}
638-
639-
read_commit_txn->Put(self_put_key, self_put_key);
640-
read_commit_txn->PutIfAbsent(self_put_if_absent_key, self_put_if_absent_key);
641-
read_commit_txn->Delete(self_delete_key);
642-
}
643-
644-
{
645-
// scan without limit
646-
std::vector<dingodb::sdk::KVPair> kvs;
647-
Status scan = read_commit_txn->Scan("xa00000000", "xz00000000", 0, kvs);
648-
DINGO_LOG(INFO) << "read_commit_txn scan:" << scan.ToString();
649-
CHECK(scan.ok());
650-
for (const auto &kv : kvs) {
651-
DINGO_LOG(INFO) << "read_commit_txn scan key:" << kv.key << ", value:" << kv.value;
652-
}
653-
if (kvs.size() != 5) {
654-
DINGO_LOG(WARNING) << "Internal error, expected kvs size:" << 2 << ", ectual:" << kvs.size();
655-
}
656-
CHECK_EQ(kvs.size(), 5);
657-
658-
for (const auto &kv : kvs) {
659-
if (kv.key != put_if_absent_key && kv.key != put_keyf) {
660-
CHECK_EQ(kv.value, kv.key);
661-
} else {
662-
CHECK_EQ(kv.value, key_values[kv.key]);
663-
}
664-
to_check.erase(kv.key);
665-
}
666-
CHECK_EQ(to_check.size(), 3);
667-
CHECK(to_check.find(delete_key) != to_check.cend());
668-
CHECK(to_check.find(self_delete_key) != to_check.cend());
669-
CHECK(to_check.find(put_keyl) != to_check.cend());
670-
}
671-
672-
{
673-
// scan without limit
674-
int limit = 2;
675-
std::vector<dingodb::sdk::KVPair> kvs;
676-
Status scan = read_commit_txn->Scan("xa00000000", "xz00000000", limit, kvs);
677-
DINGO_LOG(INFO) << "read_commit_txn scan:" << scan.ToString();
678-
CHECK(scan.ok());
679-
for (const auto &kv : kvs) {
680-
DINGO_LOG(INFO) << "read_commit_txn scan key:" << kv.key << ", value:" << kv.value;
681-
}
682-
CHECK_EQ(kvs.size(), 2);
683-
// TODO: check key prefix is xb
684-
}
685-
686-
Status precommit = read_commit_txn->PreCommit();
687-
DINGO_LOG(INFO) << "read_commit_txn precommit:" << precommit.ToString();
688-
Status commit = read_commit_txn->Commit();
689-
DINGO_LOG(INFO) << "read_commit_txn commit:" << commit.ToString();
690-
}
691-
692621
{
693622
std::vector<std::string> keys(to_check.begin(), to_check.end());
694623
{
@@ -715,7 +644,7 @@ void OptimisticTxnScanReadSelf() {
715644
OptimisticTxnPostClean(dingodb::sdk::kSnapshotIsolation);
716645
}
717646

718-
int main(int argc, char *argv[]) {
647+
int main(int argc, char* argv[]) {
719648
FLAGS_minloglevel = google::GLOG_INFO;
720649
FLAGS_logtostdout = true;
721650
FLAGS_colorlogtostdout = true;
@@ -733,7 +662,7 @@ int main(int argc, char *argv[]) {
733662

734663
CHECK(!FLAGS_addrs.empty());
735664

736-
dingodb::sdk::Client *tmp;
665+
dingodb::sdk::Client* tmp;
737666
Status built = dingodb::sdk::Client::BuildFromAddrs(FLAGS_addrs, &tmp);
738667
if (!built.ok()) {
739668
DINGO_LOG(ERROR) << "Fail to build client, please check parameter --addrs=" << FLAGS_addrs;

src/sdk/CMakeLists.txt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,14 @@ set(SDK_SRCS
4444
transaction/txn_impl.cc
4545
transaction/txn_lock_resolver.cc
4646
transaction/txn_region_scanner_impl.cc
47+
transaction/txn_task/txn_task.cc
48+
transaction/txn_task/txn_get_task.cc
49+
transaction/txn_task/txn_batch_get_task.cc
50+
transaction/txn_task/txn_batch_rollback_task.cc
51+
transaction/txn_task/txn_prewrite_task.cc
52+
transaction/txn_task/txn_commit_task.cc
53+
transaction/txn_task/txn_check_status_task.cc
54+
transaction/txn_task/txn_resolve_lock_task.cc
4755
vector/vector_client.cc
4856
vector/vector_index_cache.cc
4957
vector/vector_index_creator.cc

src/sdk/client.cc

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
#include "sdk/rpc/coordinator_rpc.h"
5858
#include "sdk/sdk_version.h"
5959
#include "sdk/transaction/txn_impl.h"
60+
#include "sdk/transaction/txn_internal_data.h"
6061
#include "sdk/utils/net_util.h"
6162
#include "sdk/vector/diskann/vector_diskann_status_by_index_task.h"
6263
#include "sdk/vector/vector_index.h"
@@ -236,8 +237,8 @@ Status Client::NewRawKV(RawKV** raw_kv) {
236237
}
237238

238239
Status Client::NewTransaction(const TransactionOptions& options, Transaction** txn) {
239-
auto txn_impl = std::make_shared<Transaction::TxnImpl>(*data_->stub, options);
240-
Transaction* tmp_txn = new Transaction(txn_impl);
240+
auto txn_impl = std::make_shared<TxnImpl>(*data_->stub, options);
241+
Transaction* tmp_txn = new Transaction(new Transaction::Data(*data_->stub, std::move(txn_impl)));
241242
Status s = tmp_txn->Begin();
242243
if (!s.ok()) {
243244
delete tmp_txn;
@@ -595,46 +596,46 @@ Status RawKV::Scan(const std::string& start_key, const std::string& end_key, uin
595596
return task.Run();
596597
}
597598

598-
Transaction::Transaction(TxnImplSPtr impl) : impl_(impl) {}
599+
Transaction::Transaction(Data* data) : data_(data) {}
599600

600-
Transaction::~Transaction() {} // NOLINT
601+
Transaction::~Transaction() { delete data_; }
601602

602-
Status Transaction::Begin() { return impl_->Begin(); }
603+
Status Transaction::Begin() { return data_->impl->Begin(); }
603604

604-
int64_t Transaction::ID() const { return impl_->ID(); }
605+
int64_t Transaction::ID() const { return data_->impl->ID(); }
605606

606-
Status Transaction::Get(const std::string& key, std::string& value) { return impl_->Get(key, value); }
607+
Status Transaction::Get(const std::string& key, std::string& value) { return data_->impl->Get(key, value); }
607608

608609
Status Transaction::BatchGet(const std::vector<std::string>& keys, std::vector<KVPair>& kvs) {
609-
return impl_->BatchGet(keys, kvs);
610+
return data_->impl->BatchGet(keys, kvs);
610611
}
611612

612-
Status Transaction::Put(const std::string& key, const std::string& value) { return impl_->Put(key, value); }
613+
Status Transaction::Put(const std::string& key, const std::string& value) { return data_->impl->Put(key, value); }
613614

614-
Status Transaction::BatchPut(const std::vector<KVPair>& kvs) { return impl_->BatchPut(kvs); }
615+
Status Transaction::BatchPut(const std::vector<KVPair>& kvs) { return data_->impl->BatchPut(kvs); }
615616

616617
Status Transaction::PutIfAbsent(const std::string& key, const std::string& value) {
617-
return impl_->PutIfAbsent(key, value);
618+
return data_->impl->PutIfAbsent(key, value);
618619
}
619620

620-
Status Transaction::BatchPutIfAbsent(const std::vector<KVPair>& kvs) { return impl_->BatchPutIfAbsent(kvs); }
621+
Status Transaction::BatchPutIfAbsent(const std::vector<KVPair>& kvs) { return data_->impl->BatchPutIfAbsent(kvs); }
621622

622-
Status Transaction::Delete(const std::string& key) { return impl_->Delete(key); }
623+
Status Transaction::Delete(const std::string& key) { return data_->impl->Delete(key); }
623624

624-
Status Transaction::BatchDelete(const std::vector<std::string>& keys) { return impl_->BatchDelete(keys); }
625+
Status Transaction::BatchDelete(const std::vector<std::string>& keys) { return data_->impl->BatchDelete(keys); }
625626

626627
Status Transaction::Scan(const std::string& start_key, const std::string& end_key, uint64_t limit,
627628
std::vector<KVPair>& kvs) {
628-
return impl_->Scan(start_key, end_key, limit, kvs);
629+
return data_->impl->Scan(start_key, end_key, limit, kvs);
629630
}
630631

631-
Status Transaction::PreCommit() { return impl_->PreCommit(); }
632+
Status Transaction::PreCommit() { return data_->impl->PreCommit(); }
632633

633-
Status Transaction::Commit() { return impl_->Commit(); }
634+
Status Transaction::Commit() { return data_->impl->Commit(); }
634635

635-
Status Transaction::Rollback() { return impl_->Rollback(); }
636+
Status Transaction::Rollback() { return data_->impl->Rollback(); }
636637

637-
bool Transaction::IsOnePc() const { return impl_->IsOnePc(); }
638+
bool Transaction::IsOnePc() const { return data_->impl->IsOnePc(); }
638639

639640
RegionCreator::RegionCreator(Data* data) : data_(data) {}
640641

0 commit comments

Comments
 (0)