diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 8b027d1..649b5eb 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -23,7 +23,7 @@ jobs: ref: ${{ github.event.pull_request.head.sha }} persist-credentials: false # otherwise, the token used is the GITHUB_TOKEN, instead of your personal access token. fetch-depth: 0 # otherwise, there would be errors pushing refs to the destination repository. - + - name: Setup go uses: actions/setup-go@v4 with: @@ -31,8 +31,7 @@ jobs: - name: Run Test run: | - go test -count=100 -timeout=1800s -v ./... -covermode=count -coverprofile=coverage.out - + go test -count=150 -timeout=3600s -v ./... -covermode=count -coverprofile=coverage.out - name: Upload coverage to Codecov uses: codecov/codecov-action@v4 with: diff --git a/benchmark/benchmark_test.go b/benchmark/benchmark_test.go index 8e27bad..86e6a2a 100644 --- a/benchmark/benchmark_test.go +++ b/benchmark/benchmark_test.go @@ -7,7 +7,7 @@ import ( gotaskflow "github.com/noneback/go-taskflow" ) -var executor = gotaskflow.NewExecutor(6400) +var executor = gotaskflow.NewExecutor(1) func BenchmarkC32(b *testing.B) { tf := gotaskflow.NewTaskFlow("G") diff --git a/taskflow_test.go b/taskflow_test.go index 653e7c7..f7f6651 100644 --- a/taskflow_test.go +++ b/taskflow_test.go @@ -721,3 +721,45 @@ func TestSequencialTaskingPanic(t *testing.T) { t.Fail() } } +func TestDeadlock(t *testing.T) { + // BUG: https://github.com/noneback/go-taskflow/issues/99 + tf := gotaskflow.NewTaskFlow("G1") + exe := gotaskflow.NewExecutor(1) + N := 100 + prev := tf.NewTask("N0", func() {}) + for i := 1; i < 32; i++ { + next := tf.NewTask(fmt.Sprintf("N%d", i), func() {}) + prev.Precede(next) + prev = next + } + + for i := 0; i < N; i++ { + exe.Run(tf).Wait() + } + + tf = gotaskflow.NewTaskFlow("G2") + + layersCount := 8 + layerNodesCount := 8 + + var curLayer, upperLayer []*gotaskflow.Task + + for i := 0; i < layersCount; i++ { + for j := 0; j < layerNodesCount; j++ { + task := tf.NewTask(fmt.Sprintf("N%d", i*layersCount+j), func() {}) + + for i := range upperLayer { + upperLayer[i].Precede(task) + } + + curLayer = append(curLayer, task) + } + + upperLayer = curLayer + curLayer = []*gotaskflow.Task{} + } + + for i := 0; i < N; i++ { + exe.Run(tf).Wait() + } +} diff --git a/utils/copool.go b/utils/copool.go index 2579ce6..effffde 100644 --- a/utils/copool.go +++ b/utils/copool.go @@ -24,7 +24,7 @@ type Copool struct { cap uint taskQ *Queue[*cotask] corun atomic.Int32 - coworker atomic.Int32 + coworker uint mu *sync.Mutex taskObjPool *ObjectPool[*cotask] } @@ -36,7 +36,7 @@ func NewCopool(cap uint) *Copool { taskQ: NewQueue[*cotask](false), cap: cap, corun: atomic.Int32{}, - coworker: atomic.Int32{}, + coworker: 0, mu: &sync.Mutex{}, taskObjPool: NewObjectPool(func() *cotask { return &cotask{} @@ -74,16 +74,15 @@ func (cp *Copool) CtxGo(ctx *context.Context, f func()) { cp.mu.Lock() cp.taskQ.Put(task) - if cp.coworker.Load() == 0 || cp.taskQ.Len() != 0 && uint(cp.coworker.Load()) < uint(cp.cap) { + if cp.coworker == 0 || cp.taskQ.Len() != 0 && cp.coworker < cp.cap { + cp.coworker++ cp.mu.Unlock() - cp.coworker.Add(1) go func() { - defer cp.coworker.Add(-1) - for { cp.mu.Lock() if cp.taskQ.Len() == 0 { + cp.coworker-- cp.mu.Unlock() return } diff --git a/visualizer_dot.go b/visualizer_dot.go index 078113e..086bdbc 100644 --- a/visualizer_dot.go +++ b/visualizer_dot.go @@ -32,7 +32,7 @@ type DotEdge struct { attributes map[string]string } -func NewDotGraph(name string) *DotGraph { +func newDotGraph(name string) *DotGraph { return &DotGraph{ name: name, isSubgraph: false, @@ -230,7 +230,7 @@ func (v *dotVizer) visualizeG(g *eGraph, parentGraph *DotGraph) error { // Visualize generates raw dag text in dot format and writes to writer func (v *dotVizer) Visualize(tf *TaskFlow, writer io.Writer) error { - graph := NewDotGraph(tf.graph.name) + graph := newDotGraph(tf.graph.name) err := v.visualizeG(tf.graph, graph) if err != nil { return fmt.Errorf("visualize %v -> %w", tf.graph.name, err) diff --git a/visualizer_dot_test.go b/visualizer_dot_test.go index fc93fca..4dbd517 100644 --- a/visualizer_dot_test.go +++ b/visualizer_dot_test.go @@ -7,7 +7,7 @@ import ( ) func TestDotGraph_String(t *testing.T) { - graph := NewDotGraph("test_graph") + graph := newDotGraph("test_graph") graph.attributes["rankdir"] = "LR" nodeA := graph.CreateNode("A") @@ -38,7 +38,7 @@ func TestDotGraph_String(t *testing.T) { } func TestDotGraph_SubGraph(t *testing.T) { - graph := NewDotGraph("main_graph") + graph := newDotGraph("main_graph") nodeA := graph.CreateNode("A") nodeB := graph.CreateNode("B")