Skip to content

Commit d1b1d96

Browse files
committed
refactor: store Task's dependents in the single key
1 parent 9808a4b commit d1b1d96

File tree

8 files changed

+314
-128
lines changed

8 files changed

+314
-128
lines changed

src/meta/app/src/principal/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ pub use task::ScheduleType;
9797
pub use task::State;
9898
pub use task::Status;
9999
pub use task::Task;
100-
pub use task::TaskDependent;
100+
pub use task::TaskDependentKey;
101+
pub use task::TaskDependentValue;
101102
pub use task::TaskMessage;
102103
pub use task::TaskRun;
103104
pub use task::TaskState;

src/meta/app/src/principal/task.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use std::collections::BTreeMap;
16+
use std::collections::HashSet;
1617

1718
use chrono::DateTime;
1819
use chrono::Utc;
@@ -185,33 +186,34 @@ pub enum DependentType {
185186
}
186187

187188
#[derive(Debug, Clone, PartialEq)]
188-
pub struct TaskDependent {
189+
pub struct TaskDependentKey {
189190
pub ty: DependentType,
190191
pub source: String,
191-
pub target: String,
192192
}
193193

194-
impl TaskDependent {
195-
pub fn new(ty: DependentType, source: String, target: String) -> Self {
196-
Self { ty, source, target }
194+
impl TaskDependentKey {
195+
pub fn new(ty: DependentType, source: String) -> Self {
196+
Self { ty, source }
197197
}
198198
}
199199

200+
#[derive(Debug, Clone, PartialEq)]
201+
pub struct TaskDependentValue(pub HashSet<String>);
202+
200203
mod kvapi_key_impl {
201204
use databend_common_meta_kvapi::kvapi;
202205
use databend_common_meta_kvapi::kvapi::KeyError;
203206

204207
use crate::principal::DependentType;
205-
use crate::principal::TaskDependent;
208+
use crate::principal::TaskDependentKey;
206209

207-
impl kvapi::KeyCodec for TaskDependent {
210+
impl kvapi::KeyCodec for TaskDependentKey {
208211
fn encode_key(&self, b: kvapi::KeyBuilder) -> kvapi::KeyBuilder {
209212
match self.ty {
210213
DependentType::After => b.push_str("After"),
211214
DependentType::Before => b.push_str("Before"),
212215
}
213216
.push_str(self.source.as_str())
214-
.push_str(self.target.as_str())
215217
}
216218

217219
fn decode_key(parser: &mut kvapi::KeyParser) -> Result<Self, kvapi::KeyError> {
@@ -226,9 +228,8 @@ mod kvapi_key_impl {
226228
}
227229
};
228230
let source = parser.next_str()?;
229-
let target = parser.next_str()?;
230231

231-
Ok(Self { ty, source, target })
232+
Ok(Self { ty, source })
232233
}
233234
}
234235
}

src/meta/app/src/principal/task_dependent_ident.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,16 @@
1414

1515
use crate::tenant_key::ident::TIdent;
1616

17-
pub type TaskDependentIdent = TIdent<TaskDependentResource, TaskDependent>;
17+
pub type TaskDependentIdent = TIdent<TaskDependentResource, TaskDependentKey>;
1818

1919
pub use kvapi_impl::TaskDependentResource;
2020

21-
use crate::principal::TaskDependent;
21+
use crate::principal::TaskDependentKey;
2222

2323
mod kvapi_impl {
2424
use databend_common_meta_kvapi::kvapi;
2525

26-
use crate::principal::task::TaskDependent;
26+
use crate::principal::task::TaskDependentValue;
2727
use crate::principal::task_dependent_ident::TaskDependentIdent;
2828
use crate::tenant_key::resource::TenantResource;
2929

@@ -32,10 +32,10 @@ mod kvapi_impl {
3232
const PREFIX: &'static str = "__fd_task_dependents";
3333
const TYPE: &'static str = "TaskDependentIdent";
3434
const HAS_TENANT: bool = true;
35-
type ValueType = TaskDependent;
35+
type ValueType = TaskDependentValue;
3636
}
3737

38-
impl kvapi::Value for TaskDependent {
38+
impl kvapi::Value for TaskDependentValue {
3939
type KeyType = TaskDependentIdent;
4040

4141
fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator<Item = String> {

src/meta/proto-conv/src/task_from_to_protobuf_impl.rs

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashSet;
16+
1517
use chrono::DateTime;
1618
use chrono::Utc;
1719
use databend_common_meta_app::principal as mt;
@@ -197,8 +199,8 @@ impl FromToProto for mt::TaskMessage {
197199
}
198200
}
199201

200-
impl FromToProto for mt::TaskDependent {
201-
type PB = pb::TaskDependent;
202+
impl FromToProto for mt::TaskDependentKey {
203+
type PB = pb::TaskDependentKey;
202204

203205
fn get_pb_ver(p: &Self::PB) -> u64 {
204206
p.ver
@@ -213,22 +215,41 @@ impl FromToProto for mt::TaskDependent {
213215
_ => return Err(Incompatible::new(format!("invalid task type {}", p.ty))),
214216
},
215217
source: p.source,
216-
target: p.target,
217218
})
218219
}
219220

220221
fn to_pb(&self) -> Result<Self::PB, Incompatible> {
221-
Ok(pb::TaskDependent {
222+
Ok(pb::TaskDependentKey {
222223
ver: VER,
223224
min_reader_ver: MIN_READER_VER,
224225

225226
source: self.source.clone(),
226-
target: self.target.clone(),
227227
ty: self.ty as i32,
228228
})
229229
}
230230
}
231231

232+
impl FromToProto for mt::TaskDependentValue {
233+
type PB = pb::TaskDependentValue;
234+
235+
fn get_pb_ver(p: &Self::PB) -> u64 {
236+
p.ver
237+
}
238+
239+
fn from_pb(p: Self::PB) -> Result<Self, Incompatible>
240+
where Self: Sized {
241+
Ok(Self(HashSet::from_iter(p.names)))
242+
}
243+
244+
fn to_pb(&self) -> Result<Self::PB, Incompatible> {
245+
Ok(pb::TaskDependentValue {
246+
ver: VER,
247+
min_reader_ver: MIN_READER_VER,
248+
names: Vec::from_iter(self.0.iter().cloned()),
249+
})
250+
}
251+
}
252+
232253
impl FromToProto for mt::TaskState {
233254
type PB = pb::TaskState;
234255

src/meta/proto-conv/tests/it/v141_task_state.rs

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashSet;
16+
1517
use databend_common_meta_app::principal as mt;
1618
use fastrace::func_name;
1719

@@ -30,15 +32,31 @@ fn test_decode_v141_task_state() -> anyhow::Result<()> {
3032

3133
#[test]
3234
fn test_decode_v141_task_dependent() -> anyhow::Result<()> {
33-
let task_dependent_v141 = vec![10, 1, 97, 18, 1, 99, 160, 6, 141, 1, 168, 6, 24];
34-
35-
let want = || mt::TaskDependent {
36-
ty: mt::DependentType::After,
37-
source: s("a"),
38-
target: s("c"),
39-
};
40-
common::test_pb_from_to(func_name!(), want())?;
41-
common::test_load_old(func_name!(), task_dependent_v141.as_slice(), 141, want())?;
35+
{
36+
let task_dependent_key_v141 = vec![10, 1, 97, 160, 6, 141, 1, 168, 6, 24];
37+
let want = || mt::TaskDependentKey {
38+
ty: mt::DependentType::After,
39+
source: s("a"),
40+
};
41+
common::test_pb_from_to(func_name!(), want())?;
42+
common::test_load_old(
43+
func_name!(),
44+
task_dependent_key_v141.as_slice(),
45+
141,
46+
want(),
47+
)?;
48+
}
49+
{
50+
let task_dependent_value_v141 = vec![10, 1, 97, 10, 1, 98, 160, 6, 141, 1, 168, 6, 24];
51+
let want = || mt::TaskDependentValue(HashSet::from([s("a"), s("b")]));
52+
common::test_pb_from_to(func_name!(), want())?;
53+
common::test_load_old(
54+
func_name!(),
55+
task_dependent_value_v141.as_slice(),
56+
141,
57+
want(),
58+
)?;
59+
}
4260

4361
Ok(())
4462
}

src/meta/protos/proto/task.proto

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ message TaskState {
8686
bool is_succeeded = 1;
8787
}
8888

89-
message TaskDependent {
89+
message TaskDependentKey {
9090
uint64 ver = 100;
9191
uint64 min_reader_ver = 101;
9292

@@ -95,6 +95,12 @@ message TaskDependent {
9595
Before = 1;
9696
}
9797
string source = 1;
98-
string target = 2;
9998
DependentType ty = 3;
10099
}
100+
101+
message TaskDependentValue {
102+
uint64 ver = 100;
103+
uint64 min_reader_ver = 101;
104+
105+
repeated string names = 1;
106+
}

0 commit comments

Comments
 (0)