Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions chain/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ func (t *TransactionData) Marshal(p *codec.Packer) error {
return t.marshal(p)
}

func (*TransactionData) Priority() uint64 { return 0 }

func (t *TransactionData) marshal(p *codec.Packer) error {
t.Base.Marshal(p)
return t.Actions.MarshalInto(p)
Expand Down
9 changes: 8 additions & 1 deletion internal/list/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

package list

import "github.com/ava-labs/avalanchego/ids"
import (
"github.com/ava-labs/avalanchego/ids"
)

// Item defines an interface accepted by [List].
//
Expand All @@ -13,6 +15,7 @@ import "github.com/ava-labs/avalanchego/ids"
type Item interface {
GetID() ids.ID // method for returning an id of the item
GetExpiry() int64 // method for returning this items timestamp
Priority() uint64
}

// List implements a double-linked list. It offers
Expand Down Expand Up @@ -64,6 +67,10 @@ func (e *Element[T]) GetExpiry() int64 {
return e.value.GetExpiry()
}

func (e *Element[T]) Priority() uint64 {
return e.value.Priority()
}

func (l *List[T]) First() *Element[T] {
if l.size == 0 {
return nil
Expand Down
4 changes: 4 additions & 0 deletions internal/list/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func (mti *TestItem) GetExpiry() int64 {
return mti.timestamp
}

func (*TestItem) Priority() uint64 {
return 0
}

func GenerateTestItem(str string) *TestItem {
id := ids.GenerateTestID()
return &TestItem{
Expand Down
39 changes: 39 additions & 0 deletions internal/mempool/abstract.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package mempool

import (
"context"
"time"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/trace"
)

// NewGeneralMempool is creating a mempool using a FIFO queue.
type AbstractMempoolFactory[T Item] func(
tracer trace.Tracer,
maxSize int,
maxSponsorSize int,
) AbstractMempool[T]

type AbstractMempool[T Item] interface {
Has(ctx context.Context, itemID ids.ID) bool
Add(ctx context.Context, items []T)
PeekNext(ctx context.Context) (T, bool)
PopNext(ctx context.Context) (T, bool)
Remove(ctx context.Context, items []T)
Len(ctx context.Context) int
Size(context.Context) int
SetMinTimestamp(ctx context.Context, t int64) []T
Top(
ctx context.Context,
targetDuration time.Duration,
f func(context.Context, T) (cont bool, restore bool, err error),
) error
StartStreaming(_ context.Context)
PrepareStream(ctx context.Context, count int)
Stream(ctx context.Context, count int) []T
FinishStreaming(ctx context.Context, restorable []T) int
}
130 changes: 85 additions & 45 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,22 @@ import (
"github.com/ava-labs/hypersdk/codec"
"github.com/ava-labs/hypersdk/internal/eheap"
"github.com/ava-labs/hypersdk/internal/list"
"github.com/ava-labs/hypersdk/internal/mempool/queue"
)

const maxPrealloc = 4_096

type GeneralMempool[T Item] Mempool[T, *list.Element[T]]

type Item interface {
eheap.Item

Sponsor() codec.Address
Size() int
Priority() uint64
}

type Mempool[T Item] struct {
type Mempool[T Item, E queue.Item] struct {
tracer trace.Tracer

mu sync.RWMutex
Expand All @@ -37,8 +41,9 @@ type Mempool[T Item] struct {
maxSize int
maxSponsorSize int // Maximum items allowed by a single sponsor

queue *list.List[T]
eh *eheap.ExpiryHeap[*list.Element[T]]
// queue *list.List[T, E]
queue queue.Queue[T, E]
eh *eheap.ExpiryHeap[E]

// owned tracks the number of items in the mempool owned by a single
// [Sponsor]
Expand All @@ -52,27 +57,25 @@ type Mempool[T Item] struct {
nextStreamFetched bool
}

// New creates a new [Mempool]. [maxSize] must be > 0 or else the
// implementation may panic.
func New[T Item](
// NewGeneralMempool is creating a mempool using a FIFO queue.
func NewGeneralMempool[T Item](
tracer trace.Tracer,
maxSize int,
maxSponsorSize int,
) *Mempool[T] {
return &Mempool[T]{
tracer: tracer,

maxSize: maxSize,
maxSponsorSize: maxSponsorSize,

queue: &list.List[T]{},
eh: eheap.New[*list.Element[T]](min(maxSize, maxPrealloc)),
) AbstractMempool[T] {
return newGeneralMempool[T](tracer, maxSize, maxSponsorSize)
}

owned: map[codec.Address]int{},
}
// NewPriorityMempool is creating a mempool using a Priority queue.
func NewPriorityMempool[T Item](
tracer trace.Tracer,
maxSize int,
maxSponsorSize int,
) AbstractMempool[T] {
return newPriorityMempool[T](tracer, maxSize, maxSponsorSize)
}

func (m *Mempool[T]) removeFromOwned(item T) {
func (m *Mempool[T, E]) removeFromOwned(item T) {
sender := item.Sponsor()
items, ok := m.owned[sender]
if !ok {
Expand All @@ -87,7 +90,7 @@ func (m *Mempool[T]) removeFromOwned(item T) {
}

// Has returns if the eh of [m] contains [itemID]
func (m *Mempool[T]) Has(ctx context.Context, itemID ids.ID) bool {
func (m *Mempool[T, E]) Has(ctx context.Context, itemID ids.ID) bool {
_, span := m.tracer.Start(ctx, "Mempool.Has")
defer span.End()

Expand All @@ -101,7 +104,7 @@ func (m *Mempool[T]) Has(ctx context.Context, itemID ids.ID) bool {
// the item sponsor is not exempt and their items in the mempool exceed m.maxSponsorSize.
// If the size of m exceeds m.maxSize, Add pops the lowest value item
// from m.eh.
func (m *Mempool[T]) Add(ctx context.Context, items []T) {
func (m *Mempool[T, E]) Add(ctx context.Context, items []T) {
_, span := m.tracer.Start(ctx, "Mempool.Add")
defer span.End()

Expand All @@ -111,7 +114,7 @@ func (m *Mempool[T]) Add(ctx context.Context, items []T) {
m.add(items, false)
}

func (m *Mempool[T]) add(items []T, front bool) {
func (m *Mempool[T, E]) add(items []T, front bool) {
for _, item := range items {
sender := item.Sponsor()

Expand All @@ -136,11 +139,11 @@ func (m *Mempool[T]) add(items []T, front bool) {
}

// Add to mempool
var elem *list.Element[T]
var elem E
if !front {
elem = m.queue.PushBack(item)
elem = m.queue.Push(item)
} else {
elem = m.queue.PushFront(item)
elem = m.queue.Restore(item)
}
m.eh.Add(elem)
m.owned[sender]++
Expand All @@ -150,23 +153,23 @@ func (m *Mempool[T]) add(items []T, front bool) {

// PeekNext returns the highest valued item in m.eh.
// Assumes there is non-zero items in [Mempool]
func (m *Mempool[T]) PeekNext(ctx context.Context) (T, bool) {
func (m *Mempool[T, E]) PeekNext(ctx context.Context) (T, bool) {
_, span := m.tracer.Start(ctx, "Mempool.PeekNext")
defer span.End()

m.mu.RLock()
defer m.mu.RUnlock()

first := m.queue.First()
if first == nil {
firstValue, ok := m.queue.FirstValue()
if !ok {
return *new(T), false
}
return first.Value(), true
return firstValue, true
}

// PopNext removes and returns the highest valued item in m.eh.
// Assumes there is non-zero items in [Mempool]
func (m *Mempool[T]) PopNext(ctx context.Context) (T, bool) { // O(log N)
func (m *Mempool[T, E]) PopNext(ctx context.Context) (T, bool) { // O(log N)
_, span := m.tracer.Start(ctx, "Mempool.PopNext")
defer span.End()

Expand All @@ -176,9 +179,9 @@ func (m *Mempool[T]) PopNext(ctx context.Context) (T, bool) { // O(log N)
return m.popNext()
}

func (m *Mempool[T]) popNext() (T, bool) {
first := m.queue.First()
if first == nil {
func (m *Mempool[T, E]) popNext() (T, bool) {
first, ok := m.queue.First()
if !ok {
return *new(T), false
}
v := m.queue.Remove(first)
Expand All @@ -189,7 +192,7 @@ func (m *Mempool[T]) popNext() (T, bool) {
}

// Remove removes [items] from m.
func (m *Mempool[T]) Remove(ctx context.Context, items []T) {
func (m *Mempool[T, E]) Remove(ctx context.Context, items []T) {
_, span := m.tracer.Start(ctx, "Mempool.Remove")
defer span.End()

Expand All @@ -208,7 +211,7 @@ func (m *Mempool[T]) Remove(ctx context.Context, items []T) {
}

// Len returns the number of items in m.
func (m *Mempool[T]) Len(ctx context.Context) int {
func (m *Mempool[T, E]) Len(ctx context.Context) int {
_, span := m.tracer.Start(ctx, "Mempool.Len")
defer span.End()

Expand All @@ -219,15 +222,15 @@ func (m *Mempool[T]) Len(ctx context.Context) int {
}

// Size returns the size (in bytes) of items in m.
func (m *Mempool[T]) Size(context.Context) int {
func (m *Mempool[T, E]) Size(context.Context) int {
m.mu.RLock()
defer m.mu.RUnlock()

return m.pendingSize
}

// SetMinTimestamp removes and returns all items with a lower expiry than [t] from m.
func (m *Mempool[T]) SetMinTimestamp(ctx context.Context, t int64) []T {
// SetMinTimestamp removes and returns all items with a lower expiry than [T, E] from m.
func (m *Mempool[T, E]) SetMinTimestamp(ctx context.Context, t int64) []T {
_, span := m.tracer.Start(ctx, "Mempool.SetMinTimesamp")
defer span.End()

Expand All @@ -237,8 +240,7 @@ func (m *Mempool[T]) SetMinTimestamp(ctx context.Context, t int64) []T {
removedElems := m.eh.SetMin(t)
removed := make([]T, len(removedElems))
for i, remove := range removedElems {
m.queue.Remove(remove)
v := remove.Value()
v := m.queue.Remove(remove)
m.removeFromOwned(v)
m.pendingSize -= v.Size()
removed[i] = v
Expand All @@ -247,7 +249,7 @@ func (m *Mempool[T]) SetMinTimestamp(ctx context.Context, t int64) []T {
}

// Top iterates over the highest-valued items in the mempool.
func (m *Mempool[T]) Top(
func (m *Mempool[T, E]) Top(
ctx context.Context,
targetDuration time.Duration,
f func(context.Context, T) (cont bool, restore bool, err error),
Expand Down Expand Up @@ -289,7 +291,7 @@ func (m *Mempool[T]) Top(
// Streaming is useful for block building because we can get a feed of the
// best txs to build without holding the lock during the duration of the build
// process. Streaming in batches allows for various state prefetching operations.
func (m *Mempool[T]) StartStreaming(_ context.Context) {
func (m *Mempool[T, E]) StartStreaming(_ context.Context) {
m.mu.Lock()
defer m.mu.Unlock()

Expand All @@ -299,7 +301,7 @@ func (m *Mempool[T]) StartStreaming(_ context.Context) {

// PrepareStream prefetches the next [count] items from the mempool to
// reduce the latency of calls to [StreamItems].
func (m *Mempool[T]) PrepareStream(ctx context.Context, count int) {
func (m *Mempool[T, E]) PrepareStream(ctx context.Context, count int) {
_, span := m.tracer.Start(ctx, "Mempool.PrepareStream")
defer span.End()

Expand All @@ -312,7 +314,7 @@ func (m *Mempool[T]) PrepareStream(ctx context.Context, count int) {

// Stream gets the next highest-valued [count] items from the mempool, not
// including what has already been streamed.
func (m *Mempool[T]) Stream(ctx context.Context, count int) []T {
func (m *Mempool[T, E]) Stream(ctx context.Context, count int) []T {
_, span := m.tracer.Start(ctx, "Mempool.Stream")
defer span.End()

Expand All @@ -328,7 +330,7 @@ func (m *Mempool[T]) Stream(ctx context.Context, count int) []T {
return m.streamItems(count)
}

func (m *Mempool[T]) streamItems(count int) []T {
func (m *Mempool[T, E]) streamItems(count int) []T {
txs := make([]T, 0, count)
for len(txs) < count {
item, ok := m.popNext()
Expand All @@ -343,7 +345,7 @@ func (m *Mempool[T]) streamItems(count int) []T {

// FinishStreaming restores [restorable] items to the mempool and clears
// the set of all previously streamed items.
func (m *Mempool[T]) FinishStreaming(ctx context.Context, restorable []T) int {
func (m *Mempool[T, E]) FinishStreaming(ctx context.Context, restorable []T) int {
_, span := m.tracer.Start(ctx, "Mempool.FinishStreaming")
defer span.End()

Expand All @@ -366,3 +368,41 @@ func (m *Mempool[T]) FinishStreaming(ctx context.Context, restorable []T) int {
m.streamLock.Unlock()
return restored
}

// newGeneralMempool is creating a mempool using a FIFO queue.
func newGeneralMempool[T Item](
tracer trace.Tracer,
maxSize int,
maxSponsorSize int,
) *Mempool[T, *list.Element[T]] {
return &Mempool[T, *list.Element[T]]{
tracer: tracer,

maxSize: maxSize,
maxSponsorSize: maxSponsorSize,

queue: queue.NewList[T](),
eh: eheap.New[*list.Element[T]](min(maxSize, maxPrealloc)),

owned: map[codec.Address]int{},
}
}

// NewPriorityMempool is creating a mempool using a Priority queue.
func newPriorityMempool[T Item](
tracer trace.Tracer,
maxSize int,
maxSponsorSize int,
) *Mempool[T, T] {
return &Mempool[T, T]{
tracer: tracer,

maxSize: maxSize,
maxSponsorSize: maxSponsorSize,

queue: queue.NewPriorityQueue[T](),
eh: eheap.New[T](min(maxSize, maxPrealloc)),

owned: map[codec.Address]int{},
}
}
Loading