Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion chainlink-modsec/libmodsec/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.24.2
require (
github.com/ethereum/go-ethereum v1.16.2
github.com/stretchr/testify v1.10.0
golang.org/x/sync v0.12.0
)

require (
Expand Down Expand Up @@ -84,7 +85,6 @@ require (
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
golang.org/x/crypto v0.36.0 // indirect
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect
golang.org/x/sync v0.12.0 // indirect
golang.org/x/sys v0.31.0 // indirect
golang.org/x/text v0.23.0 // indirect
golang.org/x/time v0.9.0 // indirect
Expand Down
41 changes: 36 additions & 5 deletions chainlink-modsec/libmodsec/internal/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@ package executor

import (
"fmt"
"time"

"github.com/smartcontractkit/chainlink-modsec/libmodsec/pkg/modsectypes"
)

type Executor struct {
stopCh chan struct{}
messageCh chan modsectypes.Message

// timedMessageCh is an optional timed message channel for configurable tick-based message delivery
timedMessageCh modsectypes.TimedMessageChannel

// List of sources which can potentially provide messages to be executed. We use an array here in case
// custom executors want to subscribe directly to a verifier storage
// Example could be a kafka topic, CL commit verifier, etc.
Expand All @@ -23,8 +29,11 @@ type Executor struct {
// Used for encoding/decoding messages
messageCodec modsectypes.MessageCodec

// peerId used to determine whether it's this executor's turn to process a message
// Node's peerId used to determine whether it's this executor's turn to process a message
peerId [32]byte

// leaderElection is used to determine if it's this executor's turn to process a message
leaderElection LeaderElection
}

// Option is the Executor functional option type
Expand Down Expand Up @@ -95,6 +104,29 @@ func WithDestChainTransmitter(chain uint64, writer modsectypes.ContractTransmitt
}
}

// WithLeaderElection adds a leader election algorithm to the executor
func WithLeaderElection(leaderElection LeaderElection) Option {
return func(e *Executor) error {
if leaderElection == nil {
return fmt.Errorf("cannot add nil leader election to executor")
}
e.leaderElection = leaderElection
return nil
}
}

// WithTimedMessageChannel adds a timed message channel to the executor
// This allows messages to be sent with configurable tick delays
func WithTimedMessageChannel(tmc modsectypes.TimedMessageChannel) Option {
return func(e *Executor) error {
if tmc == nil {
return fmt.Errorf("cannot add nil timed message channel to executor")
}
e.timedMessageCh = tmc
return nil
}
}

func (e *Executor) Start() {
go e.run()
}
Expand All @@ -103,8 +135,7 @@ func (e *Executor) Stop() {
close(e.stopCh)
}

func (e *Executor) isMyTurn(msg modsectypes.Message) bool {
// Placeholder for logic to determine if it's this executor's turn to process the message
// This could be based on some consensus mechanism, round-robin, or other criteria
return true // For now, we assume it's always our turn
func (e *Executor) isMyTurn(msg modsectypes.Message) (bool, time.Duration) {
// Check if the message is the leader for the destination chain
return e.leaderElection.IsLeader(msg.Header.MessageID, msg.Header.DestChainSelector)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package executor
import (
"context"
"fmt"
"time"

indexer "github.com/smartcontractkit/chainlink-modsec/libmodsec/internal/attestation-indexer"
"github.com/smartcontractkit/chainlink-modsec/libmodsec/pkg/modsectypes"
"time"
)

type InMemoryAttestationReader struct {
Expand All @@ -17,7 +18,7 @@ func NewInMemoryAttestationReader() *InMemoryAttestationReader {
return &InMemoryAttestationReader{}
}

func (at *InMemoryAttestationReader) GetAttestations(ctx context.Context, msg modsectypes.Message) ([]Attestation, error) {
func (at *InMemoryAttestationReader) GetAttestations(ctx context.Context, msg modsectypes.Message) ([]modsectypes.Attestation, error) {
startTime := time.Now()
timeoutCh := make(chan struct{})
for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package executor

import (
"context"

"github.com/smartcontractkit/chainlink-modsec/libmodsec/pkg/modsectypes"
)

Expand All @@ -22,5 +23,5 @@ type AttestationReader interface {
// we will need this at minimum for manual execution
// TODO: do returned attestations still need to be associated with its verifierID?
// If not, we can just return []byte instead of creating the Attestation struct
GetAttestations(ctx context.Context, msg modsectypes.Message) ([]Attestation, error)
GetAttestations(ctx context.Context, msg modsectypes.Message) ([]modsectypes.Attestation, error)
}
117 changes: 117 additions & 0 deletions chainlink-modsec/libmodsec/internal/executor/leader_election.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package executor

import (
"hash/fnv"
"slices"
"sync"
"time"
)

// LeaderElection provides a way to determine if an executor should be currently executing a given message.
// This is used to prevent multiple executors from executing the same message concurrently.
type LeaderElection interface {
IsLeader(msgId [32]byte, destChainSelector uint64) (bool, time.Duration)
SelfParticipations() []uint64
IsParticipant(destChainSelector uint64) bool
GetParticipants(destChainSelector uint64) []string
SetParticipants(destChainSelector uint64, participants []string)
}

type ModuloLeaderElection struct {
mu *sync.RWMutex
deltaStage time.Duration
selfParticipations []uint64
chainParticipants map[uint64][]string
self string
}

func NewModuloLeaderElection(self string, chainParticipants map[uint64][]string, deltaStage time.Duration) LeaderElection {
le := &ModuloLeaderElection{
mu: &sync.RWMutex{},
deltaStage: deltaStage,
chainParticipants: chainParticipants,
self: self,
}

// Initialize self participations
le.writeSelfParticipations()

return le
}

func (s *ModuloLeaderElection) SelfParticipations() []uint64 {
s.mu.RLock()
defer s.mu.RUnlock()

return s.selfParticipations
}

func (s *ModuloLeaderElection) IsLeader(msgId [32]byte, destChainSelector uint64) (bool, time.Duration) {
// If the executor is not a participant, it cannot be the leader
if !s.IsParticipant(destChainSelector) {
return false, 0 * time.Second
}

participants := s.GetParticipants(destChainSelector)

// Concatenates each participant with the msgId and the participantId together and hashes the resulting value into a uint64
// Given multiple participants, this will have an equal chance of resulting in the offset value, and will therefore be the leader

// Currently this is using FNV-1a which is a non-cryptographic hash function, but it's fast and has a good distribution
// However there is a non-zero chance of collisions

transmissionSchedule := []uint64{}
selfHash := fnv.New64a()
selfHash.Write([]byte(s.self + string(msgId[:])))
selfHashValue := selfHash.Sum64()

for _, participant := range participants {
h := fnv.New64a()
h.Write([]byte(participant + string(msgId[:])))
transmissionSchedule = append(transmissionSchedule, h.Sum64())
}

// Sort the numerical values outputted by the hash function
slices.Sort(transmissionSchedule)

// find the index where selfHashValue is in the sorted hashes
index := slices.Index(transmissionSchedule, selfHashValue)

// If my hash matches the offset, I am the leader
return index == 0, s.deltaStage * time.Duration(index)
}

func (s *ModuloLeaderElection) IsParticipant(destChainSelector uint64) bool {
s.mu.RLock()
defer s.mu.RUnlock()

return slices.Contains(s.selfParticipations, destChainSelector)
}

func (s *ModuloLeaderElection) GetParticipants(destChainSelector uint64) []string {
s.mu.RLock()
defer s.mu.RUnlock()

return s.chainParticipants[destChainSelector]
}

func (s *ModuloLeaderElection) SetParticipants(destChainSelector uint64, participants []string) {
s.mu.Lock()
defer s.mu.Unlock()

s.chainParticipants[destChainSelector] = participants
s.writeSelfParticipations()
}

func (s *ModuloLeaderElection) writeSelfParticipations() {
participations := []uint64{}
for chainSelector, participants := range s.chainParticipants {
for _, participant := range participants {
if participant == s.self {
participations = append(participations, chainSelector)
}
}
}

s.selfParticipations = participations
}
Loading
Loading