Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 156 additions & 0 deletions ants_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
package ants_test

import (
"math/rand"
"runtime"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -226,3 +227,158 @@ func BenchmarkParallelAntsMultiPoolThroughput(b *testing.B) {
}
})
}

// cpuTask simulates a CPU-intensive task.
func cpuTask() {
n := 0
for i := 0; i < 1000; i++ {
n += i * i
}
_ = n
}

// ioTask simulates an IO-intensive task with a short sleep.
func ioTask() {
time.Sleep(time.Millisecond)
}

// mixedTask simulates uneven task durations to stress load-balancing decisions.
func mixedTask() {
if time.Now().UnixNano()%5 == 0 {
time.Sleep(10 * time.Millisecond)
} else {
time.Sleep(time.Millisecond)
}
}

func benchmarkMultiPoolLBS(b *testing.B, lb ants.LoadBalancer, task func()) {
p, _ := ants.NewMultiPoolWithLB(10, PoolCap/10, lb, ants.WithExpiryDuration(DefaultExpiredTime))
defer p.ReleaseTimeout(DefaultExpiredTime) //nolint:errcheck

b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_ = p.Submit(task)
}
})
}

// CPU-intensive task benchmarks across LBS strategies.

func BenchmarkMultiPool_RoundRobin_CPUThroughput(b *testing.B) {
benchmarkMultiPoolLBS(b, ants.NewRoundRobinLB(), cpuTask)
}

func BenchmarkMultiPool_LeastTasks_CPUThroughput(b *testing.B) {
benchmarkMultiPoolLBS(b, ants.NewLeastTasksLB(), cpuTask)
}

func BenchmarkMultiPool_LeastWaiting_CPUThroughput(b *testing.B) {
benchmarkMultiPoolLBS(b, ants.NewLeastWaitingLB(), cpuTask)
}

// IO-intensive task benchmarks across LBS strategies.

func BenchmarkMultiPool_RoundRobin_IOThroughput(b *testing.B) {
benchmarkMultiPoolLBS(b, ants.NewRoundRobinLB(), ioTask)
}

func BenchmarkMultiPool_LeastTasks_IOThroughput(b *testing.B) {
benchmarkMultiPoolLBS(b, ants.NewLeastTasksLB(), ioTask)
}

func BenchmarkMultiPool_LeastWaiting_IOThroughput(b *testing.B) {
benchmarkMultiPoolLBS(b, ants.NewLeastWaitingLB(), ioTask)
}

// Mixed (uneven duration) task benchmarks across LBS strategies.

func BenchmarkMultiPool_RoundRobin_MixedThroughput(b *testing.B) {
benchmarkMultiPoolLBS(b, ants.NewRoundRobinLB(), mixedTask)
}

func BenchmarkMultiPool_LeastTasks_MixedThroughput(b *testing.B) {
benchmarkMultiPoolLBS(b, ants.NewLeastTasksLB(), mixedTask)
}

func BenchmarkMultiPool_LeastWaiting_MixedThroughput(b *testing.B) {
benchmarkMultiPoolLBS(b, ants.NewLeastWaitingLB(), mixedTask)
}

// randomLB is a custom LoadBalancer that picks a pool at random,
// demonstrating how users can plug in their own strategy via NewMultiPoolWithLB.
type randomLB struct{}

func newRandomLB() *randomLB {
return &randomLB{}
}

func (r *randomLB) Pick(pools []ants.PoolMetrics) int {
return rand.Intn(len(pools))
}

func (r *randomLB) FallBack(_ []ants.PoolMetrics) int {
return -1
}

// Custom random LB benchmarks across task types.

func BenchmarkMultiPool_Random_CPUThroughput(b *testing.B) {
benchmarkMultiPoolLBS(b, newRandomLB(), cpuTask)
}

func BenchmarkMultiPool_Random_IOThroughput(b *testing.B) {
benchmarkMultiPoolLBS(b, newRandomLB(), ioTask)
}

func BenchmarkMultiPool_Random_MixedThroughput(b *testing.B) {
benchmarkMultiPoolLBS(b, newRandomLB(), mixedTask)
}

func benchmarkMultiPoolWithFuncLBSThroughput(b *testing.B, lb ants.LoadBalancer) {
p, _ := ants.NewMultiPoolWithFuncAndLB(10, PoolCap/10, demoPoolFunc, lb, ants.WithExpiryDuration(DefaultExpiredTime))
defer p.ReleaseTimeout(DefaultExpiredTime) //nolint:errcheck

b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_ = p.Invoke(BenchParam)
}
})
}

func BenchmarkMultiPoolWithFunc_RoundRobin_Throughput(b *testing.B) {
benchmarkMultiPoolWithFuncLBSThroughput(b, ants.NewRoundRobinLB())
}

func BenchmarkMultiPoolWithFunc_LeastTasks_Throughput(b *testing.B) {
benchmarkMultiPoolWithFuncLBSThroughput(b, ants.NewLeastTasksLB())
}

func BenchmarkMultiPoolWithFunc_LeastWaiting_Throughput(b *testing.B) {
benchmarkMultiPoolWithFuncLBSThroughput(b, ants.NewLeastWaitingLB())
}

func benchmarkMultiPoolWithFuncGenericLBSThroughput(b *testing.B, lb ants.LoadBalancer) {
p, _ := ants.NewMultiPoolWithFuncGenericAndLB(10, PoolCap/10, demoPoolFuncInt, lb, ants.WithExpiryDuration(DefaultExpiredTime))
defer p.ReleaseTimeout(DefaultExpiredTime) //nolint:errcheck

b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_ = p.Invoke(BenchParam)
}
})
}

func BenchmarkMultiPoolWithFuncGeneric_RoundRobin_Throughput(b *testing.B) {
benchmarkMultiPoolWithFuncGenericLBSThroughput(b, ants.NewRoundRobinLB())
}

func BenchmarkMultiPoolWithFuncGeneric_LeastTasks_Throughput(b *testing.B) {
benchmarkMultiPoolWithFuncGenericLBSThroughput(b, ants.NewLeastTasksLB())
}

func BenchmarkMultiPoolWithFuncGeneric_LeastWaiting_Throughput(b *testing.B) {
benchmarkMultiPoolWithFuncGenericLBSThroughput(b, ants.NewLeastWaitingLB())
}
210 changes: 210 additions & 0 deletions ants_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1815,3 +1815,213 @@ func TestRebootNewPoolWithPreAllocCalc(t *testing.T) {
wg.Wait()
require.EqualValues(t, 499500, sum, "The result should be 499500")
}

func TestMultiPoolWithLB_RoundRobin(t *testing.T) {
_, err := ants.NewMultiPoolWithLB(-1, 5, ants.NewRoundRobinLB())
require.ErrorIs(t, err, ants.ErrInvalidMultiPoolSize)
_, err = ants.NewMultiPoolWithLB(10, 5, nil)
require.ErrorIs(t, err, ants.ErrInvalidLoadBalancingStrategy)
_, err = ants.NewMultiPoolWithLB(10, 5, ants.NewRoundRobinLB(), ants.WithExpiryDuration(-1))
require.ErrorIs(t, err, ants.ErrInvalidPoolExpiry)

mp, err := ants.NewMultiPoolWithLB(10, 5, ants.NewRoundRobinLB())
require.NoError(t, err)
testFn := func() {
for i := 0; i < 50; i++ {
err = mp.Submit(longRunningFunc)
require.NoError(t, err)
}
require.EqualValues(t, 50, mp.Running())
require.EqualValues(t, 50, mp.Cap())
require.EqualValues(t, 0, mp.Free())
require.False(t, mp.IsClosed())
atomic.StoreInt32(&stopLongRunningFunc, 1)
require.NoError(t, mp.ReleaseTimeout(3*time.Second))
require.ErrorIs(t, mp.Submit(nil), ants.ErrPoolClosed)
require.True(t, mp.IsClosed())
atomic.StoreInt32(&stopLongRunningFunc, 0)
}
testFn()
mp.Reboot()
testFn()
}

func TestMultiPoolWithLB_LeastTasks(t *testing.T) {
mp, err := ants.NewMultiPoolWithLB(10, 5, ants.NewLeastTasksLB(), ants.WithNonblocking(true))
require.NoError(t, err)
testFn := func() {
for i := 0; i < 50; i++ {
_ = mp.Submit(longRunningFunc)
}
require.False(t, mp.IsClosed())
atomic.StoreInt32(&stopLongRunningFunc, 1)
require.NoError(t, mp.ReleaseTimeout(3*time.Second))
require.ErrorIs(t, mp.Submit(nil), ants.ErrPoolClosed)
require.True(t, mp.IsClosed())
atomic.StoreInt32(&stopLongRunningFunc, 0)
}
testFn()
mp.Reboot()
testFn()
}

func TestMultiPoolWithLB_LeastWaiting(t *testing.T) {
mp, err := ants.NewMultiPoolWithLB(10, 5, ants.NewLeastWaitingLB(), ants.WithNonblocking(true))
require.NoError(t, err)
testFn := func() {
for i := 0; i < 50; i++ {
_ = mp.Submit(longRunningFunc)
}
require.False(t, mp.IsClosed())
atomic.StoreInt32(&stopLongRunningFunc, 1)
require.NoError(t, mp.ReleaseTimeout(3*time.Second))
require.ErrorIs(t, mp.Submit(nil), ants.ErrPoolClosed)
require.True(t, mp.IsClosed())
atomic.StoreInt32(&stopLongRunningFunc, 0)
}
testFn()
mp.Reboot()
testFn()
}

func TestMultiPoolWithFuncAndLB_RoundRobin(t *testing.T) {
_, err := ants.NewMultiPoolWithFuncAndLB(-1, 5, longRunningPoolFunc, ants.NewRoundRobinLB())
require.ErrorIs(t, err, ants.ErrInvalidMultiPoolSize)
_, err = ants.NewMultiPoolWithFuncAndLB(10, 5, longRunningPoolFunc, nil)
require.ErrorIs(t, err, ants.ErrInvalidLoadBalancingStrategy)
_, err = ants.NewMultiPoolWithFuncAndLB(10, 5, longRunningPoolFunc, ants.NewRoundRobinLB(), ants.WithExpiryDuration(-1))
require.ErrorIs(t, err, ants.ErrInvalidPoolExpiry)

ch := make(chan struct{})
mp, err := ants.NewMultiPoolWithFuncAndLB(10, 5, longRunningPoolFunc, ants.NewRoundRobinLB())
require.NoError(t, err)
testFn := func() {
for i := 0; i < 50; i++ {
err = mp.Invoke(ch)
require.NoError(t, err)
}
require.EqualValues(t, 50, mp.Running())
require.EqualValues(t, 50, mp.Cap())
require.EqualValues(t, 0, mp.Free())
require.False(t, mp.IsClosed())
close(ch)
require.NoError(t, mp.ReleaseTimeout(3*time.Second))
require.ErrorIs(t, mp.Invoke(nil), ants.ErrPoolClosed)
require.True(t, mp.IsClosed())
ch = make(chan struct{})
}
testFn()
mp.Reboot()
testFn()
}

func TestMultiPoolWithFuncAndLB_LeastTasks(t *testing.T) {
ch := make(chan struct{})
mp, err := ants.NewMultiPoolWithFuncAndLB(10, 5, longRunningPoolFunc, ants.NewLeastTasksLB(), ants.WithNonblocking(true))
require.NoError(t, err)
testFn := func() {
for i := 0; i < 50; i++ {
_ = mp.Invoke(ch)
}
require.False(t, mp.IsClosed())
close(ch)
require.NoError(t, mp.ReleaseTimeout(3*time.Second))
require.ErrorIs(t, mp.Invoke(nil), ants.ErrPoolClosed)
require.True(t, mp.IsClosed())
ch = make(chan struct{})
}
testFn()
mp.Reboot()
testFn()
}

func TestMultiPoolWithFuncAndLB_LeastWaiting(t *testing.T) {
ch := make(chan struct{})
mp, err := ants.NewMultiPoolWithFuncAndLB(10, 5, longRunningPoolFunc, ants.NewLeastWaitingLB(), ants.WithNonblocking(true))
require.NoError(t, err)
testFn := func() {
for i := 0; i < 50; i++ {
_ = mp.Invoke(ch)
}
require.False(t, mp.IsClosed())
close(ch)
require.NoError(t, mp.ReleaseTimeout(3*time.Second))
require.ErrorIs(t, mp.Invoke(nil), ants.ErrPoolClosed)
require.True(t, mp.IsClosed())
ch = make(chan struct{})
}
testFn()
mp.Reboot()
testFn()
}

func TestMultiPoolWithFuncGenericAndLB_RoundRobin(t *testing.T) {
_, err := ants.NewMultiPoolWithFuncGenericAndLB(-1, 5, longRunningPoolFuncCh, ants.NewRoundRobinLB())
require.ErrorIs(t, err, ants.ErrInvalidMultiPoolSize)
_, err = ants.NewMultiPoolWithFuncGenericAndLB(10, 5, longRunningPoolFuncCh, nil)
require.ErrorIs(t, err, ants.ErrInvalidLoadBalancingStrategy)
_, err = ants.NewMultiPoolWithFuncGenericAndLB(10, 5, longRunningPoolFuncCh, ants.NewRoundRobinLB(), ants.WithExpiryDuration(-1))
require.ErrorIs(t, err, ants.ErrInvalidPoolExpiry)

ch := make(chan struct{})
mp, err := ants.NewMultiPoolWithFuncGenericAndLB(10, 5, longRunningPoolFuncCh, ants.NewRoundRobinLB())
require.NoError(t, err)
testFn := func() {
for i := 0; i < 50; i++ {
err = mp.Invoke(ch)
require.NoError(t, err)
}
require.EqualValues(t, 50, mp.Running())
require.EqualValues(t, 50, mp.Cap())
require.EqualValues(t, 0, mp.Free())
require.False(t, mp.IsClosed())
close(ch)
require.NoError(t, mp.ReleaseTimeout(3*time.Second))
require.ErrorIs(t, mp.Invoke(nil), ants.ErrPoolClosed)
require.True(t, mp.IsClosed())
ch = make(chan struct{})
}
testFn()
mp.Reboot()
testFn()
}

func TestMultiPoolWithFuncGenericAndLB_LeastTasks(t *testing.T) {
ch := make(chan struct{})
mp, err := ants.NewMultiPoolWithFuncGenericAndLB(10, 5, longRunningPoolFuncCh, ants.NewLeastTasksLB(), ants.WithNonblocking(true))
require.NoError(t, err)
testFn := func() {
for i := 0; i < 50; i++ {
_ = mp.Invoke(ch)
}
require.False(t, mp.IsClosed())
close(ch)
require.NoError(t, mp.ReleaseTimeout(3*time.Second))
require.ErrorIs(t, mp.Invoke(nil), ants.ErrPoolClosed)
require.True(t, mp.IsClosed())
ch = make(chan struct{})
}
testFn()
mp.Reboot()
testFn()
}

func TestMultiPoolWithFuncGenericAndLB_LeastWaiting(t *testing.T) {
ch := make(chan struct{})
mp, err := ants.NewMultiPoolWithFuncGenericAndLB(10, 5, longRunningPoolFuncCh, ants.NewLeastWaitingLB(), ants.WithNonblocking(true))
require.NoError(t, err)
testFn := func() {
for i := 0; i < 50; i++ {
_ = mp.Invoke(ch)
}
require.False(t, mp.IsClosed())
close(ch)
require.NoError(t, mp.ReleaseTimeout(3*time.Second))
require.ErrorIs(t, mp.Invoke(nil), ants.ErrPoolClosed)
require.True(t, mp.IsClosed())
ch = make(chan struct{})
}
testFn()
mp.Reboot()
testFn()
}
Loading
Loading