Skip to content

Commit 66c0ad4

Browse files
committed
refactor(rule): extract state machine
Signed-off-by: Jiyong Huang <[email protected]>
1 parent d3139e3 commit 66c0ad4

File tree

10 files changed

+274
-202
lines changed

10 files changed

+274
-202
lines changed

internal/server/rpc.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import (
3232
"github.com/lf-edge/ekuiper/v2/internal/io/sink"
3333
"github.com/lf-edge/ekuiper/v2/internal/pkg/def"
3434
"github.com/lf-edge/ekuiper/v2/internal/pkg/model"
35-
"github.com/lf-edge/ekuiper/v2/internal/topo/rule"
35+
"github.com/lf-edge/ekuiper/v2/internal/topo/rule/machine"
3636
"github.com/lf-edge/ekuiper/v2/pkg/cast"
3737
"github.com/lf-edge/ekuiper/v2/pkg/infra"
3838
)
@@ -119,7 +119,7 @@ func stopQuery() {
119119
func (t *Server) GetQueryResult(_ string, reply *string) error {
120120
if rs, ok := registry.load(QueryRuleId); ok {
121121
st := rs.GetState()
122-
if st == rule.Stopped || st == rule.StoppedByErr {
122+
if st == machine.Stopped || st == machine.StoppedByErr {
123123
return fmt.Errorf("query rule is stopped: %s", rs.GetLastWill())
124124
}
125125
}

internal/server/rule_init.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727

2828
"github.com/lf-edge/ekuiper/v2/internal/conf"
2929
"github.com/lf-edge/ekuiper/v2/internal/pkg/schedule"
30-
"github.com/lf-edge/ekuiper/v2/internal/topo/rule"
30+
"github.com/lf-edge/ekuiper/v2/internal/topo/rule/machine"
3131
"github.com/lf-edge/ekuiper/v2/metrics"
3232
"github.com/lf-edge/ekuiper/v2/pkg/ast"
3333
"github.com/lf-edge/ekuiper/v2/pkg/timex"
@@ -208,10 +208,10 @@ func handleAllRuleStatusMetrics(rs []ruleWrapper) {
208208
for _, r := range rs {
209209
id := r.rule.Id
210210
switch r.state {
211-
case rule.Running:
211+
case machine.Running:
212212
runningCount++
213213
v = RuleRunning
214-
case rule.StoppedByErr:
214+
case machine.StoppedByErr:
215215
stopCount++
216216
v = RuleStoppedByError
217217
default:
@@ -303,7 +303,7 @@ func scheduleCronRuleAction(now time.Time, rw ruleWrapper) scheduleRuleAction {
303303
}
304304

305305
} else {
306-
if rw.state == rule.Running && !rw.startTime.IsZero() && now.Sub(rw.startTime) >= d {
306+
if rw.state == machine.Running && !rw.startTime.IsZero() && now.Sub(rw.startTime) >= d {
307307
return doStop
308308
}
309309
}

internal/server/rule_manager.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/lf-edge/ekuiper/v2/internal/processor"
3131
"github.com/lf-edge/ekuiper/v2/internal/topo/planner"
3232
"github.com/lf-edge/ekuiper/v2/internal/topo/rule"
33+
"github.com/lf-edge/ekuiper/v2/internal/topo/rule/machine"
3334
"github.com/lf-edge/ekuiper/v2/internal/xsql"
3435
"github.com/lf-edge/ekuiper/v2/metrics"
3536
"github.com/lf-edge/ekuiper/v2/pkg/ast"
@@ -311,7 +312,7 @@ func (rr *RuleRegistry) GetAllRulesWithStatus() ([]map[string]any, error) {
311312
if err != nil {
312313
str = fmt.Sprintf("error: %s", err)
313314
} else {
314-
str = rule.StateName[s]
315+
str = machine.StateName[s]
315316
}
316317
trace := false
317318
if str == "running" {
@@ -474,8 +475,8 @@ func getRuleExceptionStatus(name string) (ruleExceptionStatus, error) {
474475
}
475476
if rs, ok := registry.load(name); ok {
476477
st := rs.GetState()
477-
s.Status = rule.StateName[st]
478-
if st == rule.Running {
478+
s.Status = machine.StateName[st]
479+
if st == machine.Running {
479480
keys, values := rs.GetMetrics()
480481
for i, key := range keys {
481482
if strings.Contains(key, "last_exception_time") {
@@ -518,7 +519,7 @@ type ruleExceptionStatus struct {
518519

519520
type ruleWrapper struct {
520521
rule *def.Rule
521-
state rule.RunState
522+
state machine.RunState
522523
startTime time.Time
523524
}
524525

@@ -539,11 +540,11 @@ func getAllRulesWithState() ([]ruleWrapper, error) {
539540
return rules, nil
540541
}
541542

542-
func getRuleState(name string) (rule.RunState, error) {
543+
func getRuleState(name string) (machine.RunState, error) {
543544
if rs, ok := registry.load(name); ok {
544545
return rs.GetState(), nil
545546
} else {
546-
return rule.Stopped, fmt.Errorf("Rule %s is not found in registry", name)
547+
return machine.Stopped, fmt.Errorf("Rule %s is not found in registry", name)
547548
}
548549
}
549550

internal/server/rule_state.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222
"github.com/gorilla/mux"
2323
"github.com/pingcap/failpoint"
2424

25-
"github.com/lf-edge/ekuiper/v2/internal/topo/rule"
25+
"github.com/lf-edge/ekuiper/v2/internal/topo/rule/machine"
2626
"github.com/lf-edge/ekuiper/v2/pkg/cast"
2727
)
2828

@@ -79,16 +79,16 @@ func updateRuleOffset(ruleID string, param map[string]interface{}) error {
7979
switch val.(int) {
8080
case 1:
8181
StateErr = nil
82-
s = rule.Running
82+
s = machine.Running
8383
case 2:
8484
StateErr = nil
85-
s = rule.Stopped
85+
s = machine.Stopped
8686
}
8787
})
8888
if StateErr != nil {
8989
return StateErr
9090
}
91-
if s != rule.Running {
91+
if s != machine.Running {
9292
return fmt.Errorf("rule %v should be running when modify state", ruleID)
9393
}
9494

internal/server/server_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727

2828
"github.com/lf-edge/ekuiper/v2/internal/pkg/def"
2929
"github.com/lf-edge/ekuiper/v2/internal/pkg/schedule"
30-
"github.com/lf-edge/ekuiper/v2/internal/topo/rule"
30+
"github.com/lf-edge/ekuiper/v2/internal/topo/rule/machine"
3131
"github.com/lf-edge/ekuiper/v2/pkg/cast"
3232
)
3333

@@ -43,7 +43,7 @@ func TestHandleScheduleRule(t *testing.T) {
4343
testcases := []struct {
4444
Options *def.RuleOption
4545
startTime time.Time
46-
state rule.RunState
46+
state machine.RunState
4747
action scheduleRuleAction
4848
}{
4949
{
@@ -53,7 +53,7 @@ func TestHandleScheduleRule(t *testing.T) {
5353
},
5454
action: doStop,
5555
startTime: now.Add(-time.Hour),
56-
state: rule.Running,
56+
state: machine.Running,
5757
},
5858
{
5959
Options: &def.RuleOption{

internal/topo/rule/action.go

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ import (
1212
"github.com/lf-edge/ekuiper/v2/internal/pkg/def"
1313
"github.com/lf-edge/ekuiper/v2/internal/topo"
1414
"github.com/lf-edge/ekuiper/v2/internal/topo/planner"
15+
"github.com/lf-edge/ekuiper/v2/internal/topo/rule/machine"
1516
"github.com/lf-edge/ekuiper/v2/pkg/errorx"
1617
"github.com/lf-edge/ekuiper/v2/pkg/infra"
17-
"github.com/lf-edge/ekuiper/v2/pkg/timex"
1818
)
1919

2020
const EOFMessage = "done"
@@ -102,8 +102,6 @@ func (s *State) doStart() error {
102102
}
103103
ctx, cancel := context.WithCancel(context.Background())
104104
s.cancelRetry = cancel
105-
s.lastStartTimestamp = timex.GetNowInMilli()
106-
s.lastWill = ""
107105
go s.runTopo(ctx, s.topology, s.Rule.Options.RestartStrategy)
108106
return nil
109107
})
@@ -130,19 +128,19 @@ func (s *State) doStop() error {
130128
}
131129

132130
func (s *State) stopOld() {
133-
s.logger.Debug("stop RunState")
134-
done := s.triggerAction(ActionSignalStop)
131+
done := s.sm.TriggerAction(machine.ActionSignalStop)
135132
if done {
136133
return
137134
}
138135
// do stop, stopping action and starting action are mutual exclusive. No concurrent problem here
139136
s.logger.Infof("stopping rule %s", s.Rule.Id)
137+
lastWill := "stopped by update"
140138
err := s.doStop()
141-
if err == nil {
142-
err = errors.New("stopped by update")
139+
if err != nil {
140+
lastWill = fmt.Sprintf("stopped by update with error: %v", err)
143141
}
144142
// currentState may be accessed concurrently
145-
s.transit(Stopped, err)
143+
s.transitState(machine.Stopped, lastWill)
146144
return
147145
}
148146

@@ -162,20 +160,22 @@ func (s *State) runTopo(ctx context.Context, tp *topo.Topo, rs *def.RestartStrat
162160
tp.GetContext().SetError(er)
163161
s.logger.Errorf("closing Rule for error: %v", er)
164162
tp.Cancel()
165-
} else { // exit normally
163+
s.transitState(machine.Stopped, "retrying after error: "+er.Error())
164+
} else {
165+
// exit normally
166+
lastWill := "cancelled manually"
166167
if errorx.IsEOF(er) {
167-
s.lastWill = EOFMessage
168+
lastWill = EOFMessage
168169
msg := er.Error()
169170
if len(msg) > 0 {
170-
s.lastWill = fmt.Sprintf("%s: %s", s.lastWill, msg)
171+
lastWill = fmt.Sprintf("%s: %s", lastWill, msg)
171172
}
172173
s.updateTrigger(s.Rule.Id, false)
173174
}
174175
tp.Cancel()
176+
s.transitState(machine.Stopped, lastWill)
175177
return nil
176178
}
177-
// Although it is stopped, it is still retrying, so the status is still RUNNING
178-
s.lastWill = "retrying after error: " + er.Error()
179179
}
180180
if count < rs.Attempts {
181181
if d > time.Duration(rs.MaxDelay) {
@@ -208,20 +208,26 @@ func (s *State) runTopo(ctx context.Context, tp *topo.Topo, rs *def.RestartStrat
208208
}
209209
}
210210
})
211+
s.cleanRule(err)
212+
}
213+
214+
func (s *State) cleanRule(err error) {
215+
s.ruleLock.Lock()
216+
defer s.ruleLock.Unlock()
211217
if s.topology != nil {
212218
s.topoGraph = s.topology.GetTopo()
213219
keys, values := s.topology.GetMetrics()
214220
s.stoppedMetrics = []any{keys, values}
215221
}
216222
if err != nil { // Exit after retries
217223
s.logger.Error(err)
218-
s.transit(StoppedByErr, err)
224+
s.transitState(machine.StoppedByErr, err.Error())
219225
s.topology = nil
220226
s.logger.Infof("%s exit by error set tp to nil", s.Rule.Id)
221-
} else if strings.HasPrefix(s.lastWill, EOFMessage) {
227+
} else if strings.HasPrefix(s.sm.LastWill(), EOFMessage) {
222228
// Two case when err is nil; 1. Manually stop 2.EOF
223229
// Only transit status when EOF. Don't do this for manual stop because the state already changed!
224-
s.transit(Stopped, nil)
230+
s.transitState(machine.Stopped, "")
225231
s.topology = nil
226232
s.logger.Infof("%s exit eof set tp to nil", s.Rule.Id)
227233
}

0 commit comments

Comments
 (0)