Skip to content

Commit ac9d26d

Browse files
committed
feat: add i/o limit
1 parent 59e6e88 commit ac9d26d

File tree

2 files changed

+41
-5
lines changed

2 files changed

+41
-5
lines changed

pbuf/pooler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func (bufpooler[USRDAT]) Parse(obj any, pooled UserBuffer[USRDAT]) UserBuffer[US
6767
func (bufpooler[USRDAT]) Reset(item *UserBuffer[USRDAT]) {
6868
// See https://golang.org/issue/23199
6969
const maxSize = 1 << 16
70-
if item.Cap() > maxSize { // drop large buffer
70+
if item.Cap() >= maxSize { // drop large buffer
7171
*item = UserBuffer[USRDAT]{}
7272
return
7373
}

pool.go

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,15 @@ import (
1111
type Pool[T any] struct {
1212
countin int32
1313
countout int32
14-
pooler Pooler[T]
15-
pool sync.Pool
14+
// 64 bit align
15+
16+
outlim int32
17+
inlim int32
18+
// 64 bit align
19+
20+
pool sync.Pool
21+
pooler Pooler[T]
22+
1623
noputbak bool
1724
}
1825

@@ -23,6 +30,9 @@ func NewPool[T any](pooler Pooler[T]) *Pool[T] {
2330
p.pool.New = func() any {
2431
return &Item[T]{pool: p}
2532
}
33+
// default limit
34+
p.outlim = 4096
35+
p.inlim = 4096
2636
return p
2737
}
2838

@@ -33,6 +43,24 @@ func (pool *Pool[T]) SetNoPutBack(on bool) {
3343
pool.noputbak = on
3444
}
3545

46+
// LimitOutput will automatically set new item no-autodestroy
47+
// if countout > outlim.
48+
func (pool *Pool[T]) LimitOutput(n int32) {
49+
if n <= 0 {
50+
panic("n must > 0")
51+
}
52+
pool.outlim = n
53+
}
54+
55+
// LimitInputwill automatically set new item no-autodestroy
56+
// if countout > inlim.
57+
func (pool *Pool[T]) LimitInput(n int32) {
58+
if n <= 0 {
59+
panic("n must > 0")
60+
}
61+
pool.inlim = n
62+
}
63+
3664
func (pool *Pool[T]) incin() {
3765
atomic.AddInt32(&pool.countin, 1)
3866
}
@@ -51,10 +79,17 @@ func (pool *Pool[T]) decout() {
5179

5280
func (pool *Pool[T]) newempty() *Item[T] {
5381
item := pool.pool.Get().(*Item[T])
54-
if item.stat.hasdestroyed() { // is recycled
82+
isrecycled := item.stat.hasdestroyed()
83+
if isrecycled {
5584
pool.decin()
5685
}
5786
item.stat = status(0)
87+
isfull := atomic.LoadInt32(&pool.countin) > pool.inlim ||
88+
atomic.LoadInt32(&pool.countout) > pool.outlim
89+
if isfull {
90+
// no out log, no reuse
91+
return item
92+
}
5893
pool.incout()
5994
return item.setautodestroy()
6095
}
@@ -66,7 +101,8 @@ func (pool *Pool[T]) put(item *Item[T]) {
66101

67102
item.stat.setdestroyed(true)
68103

69-
if pool.noputbak {
104+
if pool.noputbak ||
105+
atomic.LoadInt32(&pool.countin) > pool.inlim {
70106
return
71107
}
72108
pool.pool.Put(item)

0 commit comments

Comments
 (0)