@@ -23,14 +23,15 @@ use databend_common_ast::ast::ScheduleOptions;
2323use  databend_common_meta_api:: kv_pb_api:: KVPbApi ; 
2424use  databend_common_meta_api:: kv_pb_api:: UpsertPB ; 
2525use  databend_common_meta_api:: txn_cond_eq_seq; 
26+ use  databend_common_meta_api:: txn_op_del; 
2627use  databend_common_meta_api:: util:: txn_put_pb; 
2728use  databend_common_meta_api:: SchemaApi ; 
2829use  databend_common_meta_app:: principal:: task; 
2930use  databend_common_meta_app:: principal:: task:: TaskMessage ; 
30- use  databend_common_meta_app:: principal:: task:: TaskStateValue ; 
31+ use  databend_common_meta_app:: principal:: task:: TaskSucceededStateValue ; 
3132use  databend_common_meta_app:: principal:: task_dependent_ident:: TaskDependentIdent ; 
3233use  databend_common_meta_app:: principal:: task_message_ident:: TaskMessageIdent ; 
33- use  databend_common_meta_app:: principal:: task_state_ident:: TaskStateIdent ; 
34+ use  databend_common_meta_app:: principal:: task_state_ident:: TaskSucceededStateIdent ; 
3435use  databend_common_meta_app:: principal:: ScheduleType ; 
3536use  databend_common_meta_app:: principal:: Status ; 
3637use  databend_common_meta_app:: principal:: Task ; 
@@ -40,16 +41,16 @@ use databend_common_meta_app::tenant::Tenant;
4041use  databend_common_meta_kvapi:: kvapi; 
4142use  databend_common_meta_kvapi:: kvapi:: DirName ; 
4243use  databend_common_meta_kvapi:: kvapi:: Key ; 
44+ use  databend_common_meta_types:: txn_condition:: Target ; 
45+ use  databend_common_meta_types:: ConditionResult ; 
4346use  databend_common_meta_types:: MatchSeq ; 
4447use  databend_common_meta_types:: MetaError ; 
4548use  databend_common_meta_types:: TxnCondition ; 
4649use  databend_common_meta_types:: TxnOp ; 
4750use  databend_common_meta_types:: TxnRequest ; 
4851use  databend_common_meta_types:: With ; 
49- use  databend_common_proto_conv:: FromToProto ; 
5052use  futures:: StreamExt ; 
5153use  futures:: TryStreamExt ; 
52- use  prost:: Message ; 
5354use  seq_marked:: SeqValue ; 
5455
5556use  crate :: task:: errors:: TaskApiError ; 
@@ -407,7 +408,7 @@ impl TaskMgr {
407408        ) ) ; 
408409        let  mut  stream = self 
409410            . kv_api 
410-             . list_pb_keys ( & DirName :: new ( TaskStateIdent :: new ( 
411+             . list_pb_keys ( & DirName :: new ( TaskSucceededStateIdent :: new ( 
411412                & self . tenant , 
412413                task_name, 
413414                "" , 
@@ -456,10 +457,6 @@ impl TaskMgr {
456457        task_name :  & str , 
457458    )  -> Result < Result < Vec < String > ,  TaskError > ,  TaskApiError >  { 
458459        let  task_before_ident = TaskDependentIdent :: new_before ( & self . tenant ,  task_name) ; 
459-         let  succeeded_value = TaskStateValue  {  is_succeeded :  true  } ; 
460-         let  not_succeeded_value = TaskStateValue  { 
461-             is_succeeded :  false , 
462-         } ; 
463460
464461        let  Some ( task_before_dependent)  = self . kv_api . get_pb ( & task_before_ident) . await ? else  { 
465462            return  Ok ( Ok ( Vec :: new ( ) ) ) ; 
@@ -478,25 +475,24 @@ impl TaskMgr {
478475            } ; 
479476            let  target_after = & target_after_ident. name ( ) . source ; 
480477            let  this_task_to_target_state =
481-                 TaskStateIdent :: new ( & self . tenant ,  task_name,  target_after) ; 
478+                 TaskSucceededStateIdent :: new ( & self . tenant ,  task_name,  target_after) ; 
482479            let  mut  request = TxnRequest :: new ( vec ! [ ] ,  vec ! [ ] ) . with_else ( vec ! [ txn_put_pb( 
483480                & this_task_to_target_state, 
484-                 & succeeded_value , 
481+                 & TaskSucceededStateValue , 
485482            ) ?] ) ; 
486483
487484            for  before_target_after in  target_after_dependent. 0 . iter ( )  { 
488485                let  task_ident =
489-                     TaskStateIdent :: new ( & self . tenant ,  before_target_after,  target_after) ; 
486+                     TaskSucceededStateIdent :: new ( & self . tenant ,  before_target_after,  target_after) ; 
490487                // Only care about the predecessors of this task's successor tasks, excluding this task itself. 
491488                if  before_target_after != task_name { 
492-                     request. condition . push ( TxnCondition :: eq_value ( 
493-                         task_ident. to_string_key ( ) , 
494-                         succeeded_value. to_pb ( ) ?. encode_to_vec ( ) , 
495-                     ) ) ; 
489+                     request. condition . push ( TxnCondition  { 
490+                         key :  task_ident. to_string_key ( ) , 
491+                         expected :  ConditionResult :: Gt  as  i32 , 
492+                         target :  Some ( Target :: Seq ( 0 ) ) , 
493+                     } ) ; 
496494                } 
497-                 request
498-                     . if_then 
499-                     . push ( txn_put_pb ( & task_ident,  & not_succeeded_value) ?) ; 
495+                 request. if_then . push ( txn_op_del ( & task_ident) ) ; 
500496            } 
501497            let  reply = self . kv_api . transaction ( request) . await ?; 
502498
0 commit comments