Skip to content

Commit f1fd5e5

Browse files
committed
chore(rule): remove rule auto restart
Will simplify it and implement in another way. Do it in the rule patrol. Signed-off-by: Jiyong Huang <[email protected]>
1 parent 5834f8e commit f1fd5e5

File tree

4 files changed

+62
-92
lines changed

4 files changed

+62
-92
lines changed

internal/topo/rule/action.go

Lines changed: 28 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,8 @@
11
package rule
22

33
import (
4-
"context"
54
"errors"
65
"fmt"
7-
"math"
8-
"math/rand/v2"
9-
"strings"
10-
"time"
116

127
"github.com/lf-edge/ekuiper/v2/internal/pkg/def"
138
"github.com/lf-edge/ekuiper/v2/internal/topo"
@@ -100,18 +95,13 @@ func (s *State) doStart() error {
10095
s.topoGraph = s.topology.GetTopo()
10196
}
10297
}
103-
ctx, cancel := context.WithCancel(context.Background())
104-
s.cancelRetry = cancel
105-
go s.runTopo(ctx, s.topology, s.Rule.Options.RestartStrategy)
98+
go s.runTopo(s.topology)
10699
return nil
107100
})
108101
return err
109102
}
110103

111104
func (s *State) doStop() error {
112-
if s.cancelRetry != nil {
113-
s.cancelRetry()
114-
}
115105
if s.topology != nil {
116106
e := s.topology.GetContext().Err()
117107
s.topoGraph = s.topology.GetTopo()
@@ -144,91 +134,45 @@ func (s *State) stopOld() {
144134
return
145135
}
146136

147-
// This is called async
148-
func (s *State) runTopo(ctx context.Context, tp *topo.Topo, rs *def.RestartStrategy) {
149-
err := infra.SafeRun(func() error {
150-
count := 0
151-
d := time.Duration(rs.Delay)
152-
var er error
153-
ticker := time.NewTicker(d)
154-
defer ticker.Stop()
155-
for {
156-
select {
157-
case e := <-tp.Open():
158-
er = e
159-
if errorx.IsUnexpectedErr(er) { // Only restart Rule for errors
160-
tp.GetContext().SetError(er)
161-
s.logger.Errorf("closing Rule for error: %v", er)
162-
tp.Cancel()
163-
s.transitState(machine.Stopped, "retrying after error: "+er.Error())
164-
} else {
165-
// exit normally
166-
lastWill := "cancelled manually"
167-
if errorx.IsEOF(er) {
168-
lastWill = EOFMessage
169-
msg := er.Error()
170-
if len(msg) > 0 {
171-
lastWill = fmt.Sprintf("%s: %s", lastWill, msg)
172-
}
173-
s.updateTrigger(s.Rule.Id, false)
174-
}
175-
tp.Cancel()
176-
s.transitState(machine.Stopped, lastWill)
177-
return nil
178-
}
179-
}
180-
if count < rs.Attempts {
181-
if d > time.Duration(rs.MaxDelay) {
182-
d = time.Duration(rs.MaxDelay)
183-
}
184-
if rs.JitterFactor > 0 {
185-
d = time.Duration(math.Round(float64(d.Milliseconds())*((rand.Float64()*2-1)*rs.JitterFactor+1))) * time.Millisecond
186-
// make sure d is always in range
187-
for d <= 0 || d > time.Duration(rs.MaxDelay) {
188-
d = time.Duration(math.Round(float64(d.Milliseconds())*((rand.Float64()*2-1)*rs.JitterFactor+1))) * time.Millisecond
189-
}
190-
s.logger.Infof("Rule will restart with jitterred delay %d", d)
191-
} else {
192-
s.logger.Infof("Rule will restart with delay %d", d)
193-
}
194-
// retry after delay
195-
select {
196-
case <-ticker.C:
197-
break
198-
case <-ctx.Done():
199-
s.logger.Errorf("stop Rule retry as cancelled")
200-
return nil
201-
}
202-
count++
203-
if rs.Multiplier > 0 {
204-
d = time.Duration(rs.Delay) * time.Duration(math.Pow(rs.Multiplier, float64(count)))
205-
}
206-
} else {
207-
return er
137+
// This is called async, so do not touch state properties
138+
func (s *State) runTopo(tp *topo.Topo) {
139+
e := <-tp.Open()
140+
tp.Cancel()
141+
var lastWill string
142+
hasError := false
143+
if errorx.IsUnexpectedErr(e) { // Only restart Rule for errors
144+
tp.GetContext().SetError(e)
145+
lastWill = e.Error()
146+
hasError = true
147+
} else {
148+
// exit normally
149+
lastWill = "canceled manually"
150+
if errorx.IsEOF(e) {
151+
lastWill = EOFMessage
152+
msg := e.Error()
153+
if len(msg) > 0 {
154+
lastWill = fmt.Sprintf("%s: %s", lastWill, msg)
208155
}
156+
s.updateTrigger(s.Rule.Id, false)
209157
}
210-
})
211-
s.cleanRule(err)
158+
}
159+
s.cleanRule(hasError, lastWill)
212160
}
213161

214-
func (s *State) cleanRule(err error) {
162+
func (s *State) cleanRule(hasError bool, lastWill string) {
215163
s.ruleLock.Lock()
216164
defer s.ruleLock.Unlock()
217165
if s.topology != nil {
218166
s.topoGraph = s.topology.GetTopo()
219167
keys, values := s.topology.GetMetrics()
220168
s.stoppedMetrics = []any{keys, values}
221169
}
222-
if err != nil { // Exit after retries
223-
s.logger.Error(err)
224-
s.transitState(machine.StoppedByErr, err.Error())
225-
s.topology = nil
170+
s.topology = nil
171+
if hasError {
172+
s.transitState(machine.StoppedByErr, lastWill)
226173
s.logger.Infof("%s exit by error set tp to nil", s.Rule.Id)
227-
} else if strings.HasPrefix(s.sm.LastWill(), EOFMessage) {
228-
// Two case when err is nil; 1. Manually stop 2.EOF
229-
// Only transit status when EOF. Don't do this for manual stop because the state already changed!
230-
s.transitState(machine.Stopped, "")
231-
s.topology = nil
174+
} else {
175+
s.transitState(machine.Stopped, lastWill)
232176
s.logger.Infof("%s exit eof set tp to nil", s.Rule.Id)
233177
}
234178
}

internal/topo/rule/state.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@ package rule
1616

1717
import (
1818
"bytes"
19-
"context"
2019
"encoding/json"
2120
"fmt"
2221
"strconv"
2322
"strings"
2423
"sync"
24+
"sync/atomic"
2525
"time"
2626

2727
"github.com/lf-edge/ekuiper/contract/v2/api"
@@ -50,27 +50,31 @@ type State struct {
5050
logger api.Logger
5151
updateTrigger func(string, bool)
5252
// The physical rule instance for each **run**. control the lifecycle in State.
53-
topology *topo.Topo
54-
cancelRetry context.CancelFunc
53+
topology *topo.Topo
5554
// temporary storage for topo graph to make sure even Rule close, the graph is still available
5655
topoGraph *def.PrintableTopo
5756
// Metric RunState
5857
stoppedMetrics []any
5958
// State machine
60-
sm machine.StateMachine
59+
sm machine.StateMachine
60+
actionq chan struct{}
61+
deleted atomic.Bool
6162
}
6263

6364
// NewState provision a state instance only.
6465
// Do not plan or run as before. If the Rule is not triggered, do not plan or run.
6566
// When called by recover Rule, expect
6667
func NewState(rule *def.Rule, updateTriggerFunc func(string, bool)) *State {
6768
contextLogger := conf.Log.WithField("Rule", rule.Id)
68-
return &State{
69+
s := &State{
6970
Rule: rule,
7071
sm: machine.NewStateMachine(contextLogger),
7172
logger: contextLogger,
7273
updateTrigger: updateTriggerFunc,
74+
actionq: make(chan struct{}, 100),
7375
}
76+
go s.processActions()
77+
return s
7478
}
7579

7680
// ValidateAndRun tries to set up the rule in an atomic way
@@ -99,7 +103,6 @@ func (s *State) Bootstrap() error {
99103
// By check state, it assures only one Start function is running at any time. (thread safe)
100104
// regSchedule: whether need to handle scheduler. If call externally, set it to true
101105
func (s *State) Start() error {
102-
s.logger.Debug("start RunState")
103106
done := s.sm.TriggerAction(machine.ActionSignalStart)
104107
if done {
105108
return nil
@@ -209,6 +212,7 @@ func (s *State) Delete() (err error) {
209212
s.topology.Cancel()
210213
s.topology.WaitClose()
211214
}
215+
s.triggerAction(true)
212216
return nil
213217
}
214218

internal/topo/rule/state_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,12 @@ func TestAPIs(t *testing.T) {
103103
assert.NoError(t, e)
104104
e = st.Start()
105105
assert.NoError(t, e)
106-
time.Sleep(100 * time.Millisecond)
106+
for i := 0; i < 10; i++ {
107+
time.Sleep(100 * time.Millisecond)
108+
if st.sm.CurrentState() == machine.Running {
109+
break
110+
}
111+
}
107112
assert.Equal(t, machine.Running, st.sm.CurrentState())
108113
err = st.ResetStreamOffset("test", nil)
109114
assert.EqualError(t, err, "stream test not found in topo")

internal/topo/rule/transit.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,28 @@ package rule
22

33
import "github.com/lf-edge/ekuiper/v2/internal/topo/rule/machine"
44

5+
func (s *State) processActions() {
6+
for range s.actionq {
7+
s.nextAction()
8+
}
9+
}
10+
11+
func (s *State) triggerAction(stop bool) {
12+
if stop {
13+
s.deleted.Store(true)
14+
close(s.actionq)
15+
return
16+
}
17+
if !s.deleted.Load() {
18+
s.actionq <- struct{}{}
19+
}
20+
}
21+
522
func (s *State) transitState(newState machine.RunState, lastWill string) {
623
chainAction := false
724
defer func() {
825
if chainAction {
9-
s.nextAction()
26+
s.triggerAction(false)
1027
}
1128
}()
1229
chainAction = s.sm.Transit(newState, lastWill)

0 commit comments

Comments
 (0)