Skip to content

Commit 7e81900

Browse files
committed
add optional priority mempool
1 parent 61cabd6 commit 7e81900

File tree

13 files changed

+517
-96
lines changed

13 files changed

+517
-96
lines changed

chain/transaction.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ func (t *TransactionData) Marshal(p *codec.Packer) error {
112112
return t.marshal(p)
113113
}
114114

115+
func (*TransactionData) Priority() uint64 { return 0 }
116+
115117
func (t *TransactionData) marshal(p *codec.Packer) error {
116118
t.Base.Marshal(p)
117119
return t.Actions.MarshalInto(p)

internal/list/list.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
type Item interface {
1616
GetID() ids.ID // method for returning an id of the item
1717
GetExpiry() int64 // method for returning this items timestamp
18+
Priority() uint64
1819
}
1920

2021
// List implements a double-linked list. It offers

internal/list/list_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ func (mti *TestItem) GetExpiry() int64 {
2929
return mti.timestamp
3030
}
3131

32+
func (*TestItem) Priority() uint64 {
33+
return 0
34+
}
35+
3236
func GenerateTestItem(str string) *TestItem {
3337
id := ids.GenerateTestID()
3438
return &TestItem{

internal/mempool/abstract.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
2+
// See the file LICENSE for licensing terms.
3+
4+
package mempool
5+
6+
import (
7+
"context"
8+
"time"
9+
10+
"github.com/ava-labs/avalanchego/ids"
11+
"github.com/ava-labs/avalanchego/trace"
12+
)
13+
14+
// NewGeneralMempool is creating a mempool using a FIFO queue.
15+
type AbstractMempoolFactory[T Item] func(
16+
tracer trace.Tracer,
17+
maxSize int,
18+
maxSponsorSize int,
19+
) AbstractMempool[T]
20+
21+
type AbstractMempool[T Item] interface {
22+
Has(ctx context.Context, itemID ids.ID) bool
23+
Add(ctx context.Context, items []T)
24+
PeekNext(ctx context.Context) (T, bool)
25+
PopNext(ctx context.Context) (T, bool)
26+
Remove(ctx context.Context, items []T)
27+
Len(ctx context.Context) int
28+
Size(context.Context) int
29+
SetMinTimestamp(ctx context.Context, t int64) []T
30+
Top(
31+
ctx context.Context,
32+
targetDuration time.Duration,
33+
f func(context.Context, T) (cont bool, restore bool, err error),
34+
) error
35+
StartStreaming(_ context.Context)
36+
PrepareStream(ctx context.Context, count int)
37+
Stream(ctx context.Context, count int) []T
38+
FinishStreaming(ctx context.Context, restorable []T) int
39+
}

internal/mempool/list_queue.go

Lines changed: 0 additions & 34 deletions
This file was deleted.

internal/mempool/mempool.go

Lines changed: 82 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,17 @@ import (
2121

2222
const maxPrealloc = 4_096
2323

24+
type GeneralMempool[T Item] Mempool[T, *list.Element[T]]
25+
2426
type Item interface {
2527
eheap.Item
2628

2729
Sponsor() codec.Address
2830
Size() int
31+
Priority() uint64
2932
}
3033

31-
type Mempool[T Item] struct {
34+
type Mempool[T Item, E eheap.Item] struct {
3235
tracer trace.Tracer
3336

3437
mu sync.RWMutex
@@ -38,9 +41,9 @@ type Mempool[T Item] struct {
3841
maxSize int
3942
maxSponsorSize int // Maximum items allowed by a single sponsor
4043

41-
// queue *list.List[T]
42-
queue queue.Queue[T, *list.Element[T]]
43-
eh *eheap.ExpiryHeap[*list.Element[T]]
44+
// queue *list.List[T, E]
45+
queue queue.Queue[T, E]
46+
eh *eheap.ExpiryHeap[E]
4447

4548
// owned tracks the number of items in the mempool owned by a single
4649
// [Sponsor]
@@ -54,27 +57,25 @@ type Mempool[T Item] struct {
5457
nextStreamFetched bool
5558
}
5659

57-
// New creates a new [Mempool]. [maxSize] must be > 0 or else the
58-
// implementation may panic.
59-
func New[T Item](
60+
// NewGeneralMempool is creating a mempool using a FIFO queue.
61+
func NewGeneralMempool[T Item](
6062
tracer trace.Tracer,
6163
maxSize int,
6264
maxSponsorSize int,
63-
) *Mempool[T] {
64-
return &Mempool[T]{
65-
tracer: tracer,
66-
67-
maxSize: maxSize,
68-
maxSponsorSize: maxSponsorSize,
69-
70-
queue: NewList[T](),
71-
eh: eheap.New[*list.Element[T]](min(maxSize, maxPrealloc)),
65+
) AbstractMempool[T] {
66+
return newGeneralMempool[T](tracer, maxSize, maxSponsorSize)
67+
}
7268

73-
owned: map[codec.Address]int{},
74-
}
69+
// NewPriorityMempool is creating a mempool using a Priority queue.
70+
func NewPriorityMempool[T Item](
71+
tracer trace.Tracer,
72+
maxSize int,
73+
maxSponsorSize int,
74+
) AbstractMempool[T] {
75+
return newPriorityMempool[T](tracer, maxSize, maxSponsorSize)
7576
}
7677

77-
func (m *Mempool[T]) removeFromOwned(item T) {
78+
func (m *Mempool[T, E]) removeFromOwned(item T) {
7879
sender := item.Sponsor()
7980
items, ok := m.owned[sender]
8081
if !ok {
@@ -89,7 +90,7 @@ func (m *Mempool[T]) removeFromOwned(item T) {
8990
}
9091

9192
// Has returns if the eh of [m] contains [itemID]
92-
func (m *Mempool[T]) Has(ctx context.Context, itemID ids.ID) bool {
93+
func (m *Mempool[T, E]) Has(ctx context.Context, itemID ids.ID) bool {
9394
_, span := m.tracer.Start(ctx, "Mempool.Has")
9495
defer span.End()
9596

@@ -103,7 +104,7 @@ func (m *Mempool[T]) Has(ctx context.Context, itemID ids.ID) bool {
103104
// the item sponsor is not exempt and their items in the mempool exceed m.maxSponsorSize.
104105
// If the size of m exceeds m.maxSize, Add pops the lowest value item
105106
// from m.eh.
106-
func (m *Mempool[T]) Add(ctx context.Context, items []T) {
107+
func (m *Mempool[T, E]) Add(ctx context.Context, items []T) {
107108
_, span := m.tracer.Start(ctx, "Mempool.Add")
108109
defer span.End()
109110

@@ -113,7 +114,7 @@ func (m *Mempool[T]) Add(ctx context.Context, items []T) {
113114
m.add(items, false)
114115
}
115116

116-
func (m *Mempool[T]) add(items []T, front bool) {
117+
func (m *Mempool[T, E]) add(items []T, front bool) {
117118
for _, item := range items {
118119
sender := item.Sponsor()
119120

@@ -138,7 +139,7 @@ func (m *Mempool[T]) add(items []T, front bool) {
138139
}
139140

140141
// Add to mempool
141-
var elem *list.Element[T]
142+
var elem E
142143
if !front {
143144
elem = m.queue.Push(item)
144145
} else {
@@ -152,23 +153,23 @@ func (m *Mempool[T]) add(items []T, front bool) {
152153

153154
// PeekNext returns the highest valued item in m.eh.
154155
// Assumes there is non-zero items in [Mempool]
155-
func (m *Mempool[T]) PeekNext(ctx context.Context) (T, bool) {
156+
func (m *Mempool[T, E]) PeekNext(ctx context.Context) (T, bool) {
156157
_, span := m.tracer.Start(ctx, "Mempool.PeekNext")
157158
defer span.End()
158159

159160
m.mu.RLock()
160161
defer m.mu.RUnlock()
161162

162-
first := m.queue.First()
163-
if first == nil {
163+
firstValue, ok := m.queue.FirstValue()
164+
if !ok {
164165
return *new(T), false
165166
}
166-
return first.Value(), true
167+
return firstValue, true
167168
}
168169

169170
// PopNext removes and returns the highest valued item in m.eh.
170171
// Assumes there is non-zero items in [Mempool]
171-
func (m *Mempool[T]) PopNext(ctx context.Context) (T, bool) { // O(log N)
172+
func (m *Mempool[T, E]) PopNext(ctx context.Context) (T, bool) { // O(log N)
172173
_, span := m.tracer.Start(ctx, "Mempool.PopNext")
173174
defer span.End()
174175

@@ -178,9 +179,9 @@ func (m *Mempool[T]) PopNext(ctx context.Context) (T, bool) { // O(log N)
178179
return m.popNext()
179180
}
180181

181-
func (m *Mempool[T]) popNext() (T, bool) {
182-
first := m.queue.First()
183-
if first == nil {
182+
func (m *Mempool[T, E]) popNext() (T, bool) {
183+
first, ok := m.queue.First()
184+
if !ok {
184185
return *new(T), false
185186
}
186187
v := m.queue.Remove(first)
@@ -191,7 +192,7 @@ func (m *Mempool[T]) popNext() (T, bool) {
191192
}
192193

193194
// Remove removes [items] from m.
194-
func (m *Mempool[T]) Remove(ctx context.Context, items []T) {
195+
func (m *Mempool[T, E]) Remove(ctx context.Context, items []T) {
195196
_, span := m.tracer.Start(ctx, "Mempool.Remove")
196197
defer span.End()
197198

@@ -210,7 +211,7 @@ func (m *Mempool[T]) Remove(ctx context.Context, items []T) {
210211
}
211212

212213
// Len returns the number of items in m.
213-
func (m *Mempool[T]) Len(ctx context.Context) int {
214+
func (m *Mempool[T, E]) Len(ctx context.Context) int {
214215
_, span := m.tracer.Start(ctx, "Mempool.Len")
215216
defer span.End()
216217

@@ -221,15 +222,15 @@ func (m *Mempool[T]) Len(ctx context.Context) int {
221222
}
222223

223224
// Size returns the size (in bytes) of items in m.
224-
func (m *Mempool[T]) Size(context.Context) int {
225+
func (m *Mempool[T, E]) Size(context.Context) int {
225226
m.mu.RLock()
226227
defer m.mu.RUnlock()
227228

228229
return m.pendingSize
229230
}
230231

231-
// SetMinTimestamp removes and returns all items with a lower expiry than [t] from m.
232-
func (m *Mempool[T]) SetMinTimestamp(ctx context.Context, t int64) []T {
232+
// SetMinTimestamp removes and returns all items with a lower expiry than [T, E] from m.
233+
func (m *Mempool[T, E]) SetMinTimestamp(ctx context.Context, t int64) []T {
233234
_, span := m.tracer.Start(ctx, "Mempool.SetMinTimesamp")
234235
defer span.End()
235236

@@ -239,8 +240,7 @@ func (m *Mempool[T]) SetMinTimestamp(ctx context.Context, t int64) []T {
239240
removedElems := m.eh.SetMin(t)
240241
removed := make([]T, len(removedElems))
241242
for i, remove := range removedElems {
242-
m.queue.Remove(remove)
243-
v := remove.Value()
243+
v := m.queue.Remove(remove)
244244
m.removeFromOwned(v)
245245
m.pendingSize -= v.Size()
246246
removed[i] = v
@@ -249,7 +249,7 @@ func (m *Mempool[T]) SetMinTimestamp(ctx context.Context, t int64) []T {
249249
}
250250

251251
// Top iterates over the highest-valued items in the mempool.
252-
func (m *Mempool[T]) Top(
252+
func (m *Mempool[T, E]) Top(
253253
ctx context.Context,
254254
targetDuration time.Duration,
255255
f func(context.Context, T) (cont bool, restore bool, err error),
@@ -291,7 +291,7 @@ func (m *Mempool[T]) Top(
291291
// Streaming is useful for block building because we can get a feed of the
292292
// best txs to build without holding the lock during the duration of the build
293293
// process. Streaming in batches allows for various state prefetching operations.
294-
func (m *Mempool[T]) StartStreaming(_ context.Context) {
294+
func (m *Mempool[T, E]) StartStreaming(_ context.Context) {
295295
m.mu.Lock()
296296
defer m.mu.Unlock()
297297

@@ -301,7 +301,7 @@ func (m *Mempool[T]) StartStreaming(_ context.Context) {
301301

302302
// PrepareStream prefetches the next [count] items from the mempool to
303303
// reduce the latency of calls to [StreamItems].
304-
func (m *Mempool[T]) PrepareStream(ctx context.Context, count int) {
304+
func (m *Mempool[T, E]) PrepareStream(ctx context.Context, count int) {
305305
_, span := m.tracer.Start(ctx, "Mempool.PrepareStream")
306306
defer span.End()
307307

@@ -314,7 +314,7 @@ func (m *Mempool[T]) PrepareStream(ctx context.Context, count int) {
314314

315315
// Stream gets the next highest-valued [count] items from the mempool, not
316316
// including what has already been streamed.
317-
func (m *Mempool[T]) Stream(ctx context.Context, count int) []T {
317+
func (m *Mempool[T, E]) Stream(ctx context.Context, count int) []T {
318318
_, span := m.tracer.Start(ctx, "Mempool.Stream")
319319
defer span.End()
320320

@@ -330,7 +330,7 @@ func (m *Mempool[T]) Stream(ctx context.Context, count int) []T {
330330
return m.streamItems(count)
331331
}
332332

333-
func (m *Mempool[T]) streamItems(count int) []T {
333+
func (m *Mempool[T, E]) streamItems(count int) []T {
334334
txs := make([]T, 0, count)
335335
for len(txs) < count {
336336
item, ok := m.popNext()
@@ -345,7 +345,7 @@ func (m *Mempool[T]) streamItems(count int) []T {
345345

346346
// FinishStreaming restores [restorable] items to the mempool and clears
347347
// the set of all previously streamed items.
348-
func (m *Mempool[T]) FinishStreaming(ctx context.Context, restorable []T) int {
348+
func (m *Mempool[T, E]) FinishStreaming(ctx context.Context, restorable []T) int {
349349
_, span := m.tracer.Start(ctx, "Mempool.FinishStreaming")
350350
defer span.End()
351351

@@ -368,3 +368,41 @@ func (m *Mempool[T]) FinishStreaming(ctx context.Context, restorable []T) int {
368368
m.streamLock.Unlock()
369369
return restored
370370
}
371+
372+
// newGeneralMempool is creating a mempool using a FIFO queue.
373+
func newGeneralMempool[T Item](
374+
tracer trace.Tracer,
375+
maxSize int,
376+
maxSponsorSize int,
377+
) *Mempool[T, *list.Element[T]] {
378+
return &Mempool[T, *list.Element[T]]{
379+
tracer: tracer,
380+
381+
maxSize: maxSize,
382+
maxSponsorSize: maxSponsorSize,
383+
384+
queue: queue.NewList[T](),
385+
eh: eheap.New[*list.Element[T]](min(maxSize, maxPrealloc)),
386+
387+
owned: map[codec.Address]int{},
388+
}
389+
}
390+
391+
// NewPriorityMempool is creating a mempool using a Priority queue.
392+
func newPriorityMempool[T Item](
393+
tracer trace.Tracer,
394+
maxSize int,
395+
maxSponsorSize int,
396+
) *Mempool[T, T] {
397+
return &Mempool[T, T]{
398+
tracer: tracer,
399+
400+
maxSize: maxSize,
401+
maxSponsorSize: maxSponsorSize,
402+
403+
queue: queue.NewPriorityQueue[T](),
404+
eh: eheap.New[T](min(maxSize, maxPrealloc)),
405+
406+
owned: map[codec.Address]int{},
407+
}
408+
}

0 commit comments

Comments
 (0)