From 0929707204c3d42a99882f1854e9ff54ed709bd9 Mon Sep 17 00:00:00 2001 From: Will Winder Date: Sat, 9 Aug 2025 10:17:22 -0400 Subject: [PATCH 1/2] Verifier main loop. --- .../libmodsec/internal/verifier/handler.go | 40 ++++++ .../libmodsec/internal/verifier/main.go | 100 ++++++++++++++ .../libmodsec/internal/verifier/options.go | 0 .../libmodsec/internal/verifier/verifier.go | 129 ++++++++++++++++++ 4 files changed, 269 insertions(+) create mode 100644 chainlink-modsec/libmodsec/internal/verifier/handler.go create mode 100644 chainlink-modsec/libmodsec/internal/verifier/main.go create mode 100644 chainlink-modsec/libmodsec/internal/verifier/options.go create mode 100644 chainlink-modsec/libmodsec/internal/verifier/verifier.go diff --git a/chainlink-modsec/libmodsec/internal/verifier/handler.go b/chainlink-modsec/libmodsec/internal/verifier/handler.go new file mode 100644 index 0000000000..3d8990933d --- /dev/null +++ b/chainlink-modsec/libmodsec/internal/verifier/handler.go @@ -0,0 +1,40 @@ +// Package verifier implements a service that monitors a channel for incoming +// messages and sends attestations to a results channel. +package verifier + +import ( + "context" + + "github.com/smartcontractkit/chainlink-modsec/libmodsec/pkg/modsectypes" +) + +type Work struct { + // Define your work payload here + Data []byte // maybe nil if there were no messages for a particular block num + + // Block information. The same block data may be sent multiple times if there are multiple messages. + SourceChain uint64 + BlockNum uint64 // why do they use big int... at 100k bps it would take 500k years to overflow uint64. + BlockHash []byte +} + +type HandlerPayload struct { + Work Work + Message modsectypes.Message + //TODO encoded message as well. +} + +type Result struct { + // Define your result payload here + Data []byte +} + +// Handler is a function that processes incoming work from the work channel and sends results to the result channel. +// There may be no results for a particular piece of work, in which case the result channel should not be written to. +// For example, the verifier may have a rule to skip processing until a certain block depth is reached. In that case, +// the work may be received, but no result will be produced until later. +// +// The verifier should respect the context for cancellation. If the context is cancelled, the verifier should stop. +// +// TODO: should verifier be a full service with start/stop/cancel, a cache, maybe a db connection, etc? +type Handler func(ctx context.Context, payload HandlerPayload, result chan<- Result) diff --git a/chainlink-modsec/libmodsec/internal/verifier/main.go b/chainlink-modsec/libmodsec/internal/verifier/main.go new file mode 100644 index 0000000000..6138ab9972 --- /dev/null +++ b/chainlink-modsec/libmodsec/internal/verifier/main.go @@ -0,0 +1,100 @@ +package verifier + +import ( + "context" + "fmt" + "log" + "sync" +) + +// run is the verifier main loop. +// +// Implementation of a pipeline: +// - Read events from blockchain. +// - Transform events to the verifier payload. +// - * Write raw message to storage. +// - Dispatch payloads to verifiers. +// - Collect results from verifiers as they are available. +// - Write verifier attestations to storage. +func (s *Verifier) run() { + // Start reader + s.started = true + if s.poller == nil { + log.Println("No poller configured, verifier will not process any work.") + // TODO: surface an error. + return + } + if s.transformer == nil { + log.Println("No transformer configured, verifier will not process any work.") + // TODO: surface an error. + return + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var wg sync.WaitGroup + + workCh := make(chan Work) + messageCh := make(chan StoredMessage) + handlerPayloadCh := make(chan HandlerPayload) + resultCh := make(chan Result) + + // produce data + go func() { + wg.Add(1) + defer wg.Done() + + for true { + select { + case <-ctx.Done(): + log.Println("Verifier shutting down poller...") + return + case data := <-s.poller.Watch(ctx): + fmt.Println(data) + workCh <- data + } + } + }() + + // transform data + go func() { + wg.Add(1) + defer wg.Done() + + for true { + select { + case <-ctx.Done(): + log.Println("Verifier shutting down transformer...") + return + case work := <-workCh: + payload := s.transformer.Transform(work) + fmt.Println("Payload transformed:", payload) + handlerPayloadCh <- payload + // TODO:: send StoredMessage to 'messageCh', it can be written ahead of verification. + } + } + }() + + // run handlers + go func() { + wg.Add(1) + defer wg.Done() + + for true { + select { + case <-ctx.Done(): + log.Println("Verifier shutting down handlers...") + return + case payload := <-handlerPayloadCh: + log.Printf("Running handlers for payload: %v\n", payload) + // Dispatch payload to each handler. + for _, handler := range s.handlers { + go handler(ctx, payload, resultCh) + } + } + } + }() + + // write results +} diff --git a/chainlink-modsec/libmodsec/internal/verifier/options.go b/chainlink-modsec/libmodsec/internal/verifier/options.go new file mode 100644 index 0000000000..e69de29bb2 diff --git a/chainlink-modsec/libmodsec/internal/verifier/verifier.go b/chainlink-modsec/libmodsec/internal/verifier/verifier.go new file mode 100644 index 0000000000..c414531181 --- /dev/null +++ b/chainlink-modsec/libmodsec/internal/verifier/verifier.go @@ -0,0 +1,129 @@ +package verifier + +import ( + "context" + + "github.com/smartcontractkit/chainlink-modsec/libmodsec/pkg/modsectypes" +) + +// Poller is an interface that defines how to poll for new work to be processed. +type Poller interface { + // Next returns a channel that will yield the next piece of work to be processed. + Next(ctx context.Context) <-chan Work + + // Watch returns a channel that will yield all work to be processed. + Watch(ctx context.Context) <-chan Work +} + +// Transformer is an interface that defines how to transform a Work item into a modsec message. +// TODO: tranform from source to verifier and again from verifier to destination? +type Transformer interface { + Transform(work Work) HandlerPayload +} + +// StoredMessage represents a message that will be attested. It is the object written to the storage layer. +type StoredMessage struct { + ID string + Message modsectypes.Message + Encoded []byte + // block info? +} + +// MessageAttestation represents the verifier's attestation of a message. +type MessageAttestation struct { + Attestor string + Sig []byte +} + +// AttestationWriter is an interface that defines how to write attestations and messages to a storage layer. +// The writer is responsible for deciding how to store the messages and attestations, including +// how to avoid duplicates, and how to structure the storage for efficient scanning. +type AttestationWriter interface { + // StoreMessage stores the message that is being attested. + // The writer implementation is responsible for deciding how to store the message. + // It should consider things like how it will be retrieved later, and how to avoid duplicates. + // For example, it may decide to store messages in a hierarchy based on the time, block number, sequence number, etc. + StoreMessage(ctx context.Context, msg StoredMessage) error + + // StoreAttestation stores the attestation for a message. + // The writer implementation is responsible for deciding how to store the attestation. + // It should consider things like how it will be retrieved later, and how to avoid duplicates. + StoreAttestation(ctx context.Context, msg MessageAttestation) error +} + +// Verifier is the main verifier service. It manages the lifecycle of data +// fetching, handling, and writing a resulting attestation. +type Verifier struct { + workCh chan Work + stopCh chan struct{} + + // handlers are registered by name. + handlers []Handler + + // state + started bool + + // configurable services + signer modsectypes.Signer + poller Poller + transformer Transformer + writer []AttestationWriter + // Add more configurable fields as needed +} + +// Option is the Verifier functional option type +type Option func(*Verifier) + +// WithSigner sets a custom signer for the verifier +func WithSigner(signer modsectypes.Signer) Option { + return func(v *Verifier) { + v.signer = signer + } +} + +// WithPoller sets a custom poller for the verifier +func WithPoller(poller Poller) Option { + return func(v *Verifier) { + v.poller = poller + } +} + +// WithTransformer sets the transformer for the verifier +func WithTransformer(transformer Transformer) Option { + return func(v *Verifier) { + v.transformer = transformer + } +} + +// WithWriter adds a writer for the verifier +func WithWriter(writer AttestationWriter) Option { + return func(v *Verifier) { + v.writer = append(v.writer, writer) + } +} + +// WithHandler adds a handler function for a given name. Only one handler can be registered per name. +func WithHandler(handler Handler) Option { + return func(v *Verifier) { + v.handlers = append(v.handlers, handler) + } +} + +func NewVerifier(opts ...Option) *Verifier { + v := &Verifier{ + stopCh: make(chan struct{}), + } + // Apply all options + for _, opt := range opts { + opt(v) + } + return v +} + +func (s *Verifier) Start() { + go s.run() +} + +func (s *Verifier) Stop() { + close(s.stopCh) +} From 5ecde9e872722299f7b5d22a2e50e06608646d25 Mon Sep 17 00:00:00 2001 From: Will Winder Date: Sun, 10 Aug 2025 10:04:17 -0400 Subject: [PATCH 2/2] Finish verifier main loop. --- .../libmodsec/internal/verifier/doc.go | 3 + .../libmodsec/internal/verifier/handler.go | 40 --------- .../libmodsec/internal/verifier/interfaces.go | 51 +++++++++++ .../libmodsec/internal/verifier/main.go | 85 ++++++++++++++----- .../libmodsec/internal/verifier/options.go | 0 .../libmodsec/internal/verifier/types.go | 47 ++++++++++ .../libmodsec/internal/verifier/verifier.go | 70 ++------------- 7 files changed, 174 insertions(+), 122 deletions(-) create mode 100644 chainlink-modsec/libmodsec/internal/verifier/doc.go delete mode 100644 chainlink-modsec/libmodsec/internal/verifier/handler.go create mode 100644 chainlink-modsec/libmodsec/internal/verifier/interfaces.go delete mode 100644 chainlink-modsec/libmodsec/internal/verifier/options.go create mode 100644 chainlink-modsec/libmodsec/internal/verifier/types.go diff --git a/chainlink-modsec/libmodsec/internal/verifier/doc.go b/chainlink-modsec/libmodsec/internal/verifier/doc.go new file mode 100644 index 0000000000..9af914b740 --- /dev/null +++ b/chainlink-modsec/libmodsec/internal/verifier/doc.go @@ -0,0 +1,3 @@ +// Package verifier implements a service that monitors a channel for incoming +// messages and sends attestations to a results channel. +package verifier diff --git a/chainlink-modsec/libmodsec/internal/verifier/handler.go b/chainlink-modsec/libmodsec/internal/verifier/handler.go deleted file mode 100644 index 3d8990933d..0000000000 --- a/chainlink-modsec/libmodsec/internal/verifier/handler.go +++ /dev/null @@ -1,40 +0,0 @@ -// Package verifier implements a service that monitors a channel for incoming -// messages and sends attestations to a results channel. -package verifier - -import ( - "context" - - "github.com/smartcontractkit/chainlink-modsec/libmodsec/pkg/modsectypes" -) - -type Work struct { - // Define your work payload here - Data []byte // maybe nil if there were no messages for a particular block num - - // Block information. The same block data may be sent multiple times if there are multiple messages. - SourceChain uint64 - BlockNum uint64 // why do they use big int... at 100k bps it would take 500k years to overflow uint64. - BlockHash []byte -} - -type HandlerPayload struct { - Work Work - Message modsectypes.Message - //TODO encoded message as well. -} - -type Result struct { - // Define your result payload here - Data []byte -} - -// Handler is a function that processes incoming work from the work channel and sends results to the result channel. -// There may be no results for a particular piece of work, in which case the result channel should not be written to. -// For example, the verifier may have a rule to skip processing until a certain block depth is reached. In that case, -// the work may be received, but no result will be produced until later. -// -// The verifier should respect the context for cancellation. If the context is cancelled, the verifier should stop. -// -// TODO: should verifier be a full service with start/stop/cancel, a cache, maybe a db connection, etc? -type Handler func(ctx context.Context, payload HandlerPayload, result chan<- Result) diff --git a/chainlink-modsec/libmodsec/internal/verifier/interfaces.go b/chainlink-modsec/libmodsec/internal/verifier/interfaces.go new file mode 100644 index 0000000000..9e20bd35b5 --- /dev/null +++ b/chainlink-modsec/libmodsec/internal/verifier/interfaces.go @@ -0,0 +1,51 @@ +package verifier + +import ( + "context" +) + +// Reader is an interface for reading the blockchain for new work. +type Reader interface { + // Next returns a channel that will yield the next piece of work to be processed. + Next(ctx context.Context) <-chan Work + + // Watch returns a channel that will yield all work to be processed. + Watch(ctx context.Context) <-chan Work +} + +// Transformer is an interface that defines how to transform a Work item into +// the generic modsec.Message format and encoded into a destination specific +// payload. This interface encapsulates most of the chain agnostic services +// required by the 1.6 implementation. +type Transformer interface { + Transform(work Work) HandlerPayload +} + +// Writer is an interface that defines how to write attestations and messages to a storage layer. +// The writer is responsible for deciding how to store the messages and attestations, including +// how to avoid duplicates, and how to structure the storage for efficient scanning. +// +// TODO: standard io.Writer interface could be used to simplify testing. +// Modex extensions would look like "modsec.NewS3Writer(path)" +type Writer interface { + // WriteMessage stores the message that is being attested. + // The writer implementation is responsible for deciding how to store the message. + // It should consider things like how it will be retrieved later, and how to avoid duplicates. + // For example, it may decide to store messages in a hierarchy based on the time, block number, sequence number, etc. + WriteMessage(ctx context.Context, msg HandlerPayload) error + + // WriteAttestation stores the attestation for a message. + // The writer implementation is responsible for deciding how to store the attestation. + // It should consider things like how it will be retrieved later, and how to avoid duplicates. + WriteAttestation(ctx context.Context, msg Attestation) error +} + +// Handler is a function that processes incoming work from the work channel and sends results to the result channel. +// There may be no results for a particular piece of work, in which case the result channel should not be written to. +// For example, the verifier may have a rule to skip processing until a certain block depth is reached. In that case, +// the work may be received, but no result will be produced until later. +// +// The verifier should respect the context for cancellation. If the context is cancelled, the verifier should stop. +// +// TODO: should verifier be a full service with start/stop/cancel, a cache, maybe a db connection, etc? +type Handler func(ctx context.Context, payload HandlerPayload, result chan<- Attestation) diff --git a/chainlink-modsec/libmodsec/internal/verifier/main.go b/chainlink-modsec/libmodsec/internal/verifier/main.go index 6138ab9972..0acc647277 100644 --- a/chainlink-modsec/libmodsec/internal/verifier/main.go +++ b/chainlink-modsec/libmodsec/internal/verifier/main.go @@ -3,7 +3,6 @@ package verifier import ( "context" "fmt" - "log" "sync" ) @@ -17,15 +16,18 @@ import ( // - Collect results from verifiers as they are available. // - Write verifier attestations to storage. func (s *Verifier) run() { - // Start reader - s.started = true - if s.poller == nil { - log.Println("No poller configured, verifier will not process any work.") + if s.writer == nil { + fmt.Println("No writer configured, verifier will not process any work.") // TODO: surface an error. return } if s.transformer == nil { - log.Println("No transformer configured, verifier will not process any work.") + fmt.Println("No transformer configured, verifier will not process any work.") + // TODO: surface an error. + return + } + if s.signer == nil { + fmt.Println("No signer configured, verifier will not process any work.") // TODO: surface an error. return } @@ -35,12 +37,12 @@ func (s *Verifier) run() { var wg sync.WaitGroup - workCh := make(chan Work) - messageCh := make(chan StoredMessage) + transformCh := make(chan Work) handlerPayloadCh := make(chan HandlerPayload) - resultCh := make(chan Result) + writePayloadCh := make(chan HandlerPayload) + writeAttestationCh := make(chan Attestation) - // produce data + // Read blockchain data for new work. go func() { wg.Add(1) defer wg.Done() @@ -48,16 +50,16 @@ func (s *Verifier) run() { for true { select { case <-ctx.Done(): - log.Println("Verifier shutting down poller...") + fmt.Println("Verifier shutting down poller...") return - case data := <-s.poller.Watch(ctx): + case data := <-s.reader.Next(ctx): fmt.Println(data) - workCh <- data + transformCh <- data // to transformer } } }() - // transform data + // Transform work into the handler payload. go func() { wg.Add(1) defer wg.Done() @@ -65,13 +67,13 @@ func (s *Verifier) run() { for true { select { case <-ctx.Done(): - log.Println("Verifier shutting down transformer...") + fmt.Println("Verifier shutting down transformer...") return - case work := <-workCh: + case work := <-transformCh: payload := s.transformer.Transform(work) fmt.Println("Payload transformed:", payload) - handlerPayloadCh <- payload - // TODO:: send StoredMessage to 'messageCh', it can be written ahead of verification. + handlerPayloadCh <- payload // to handler dispatch. + writePayloadCh <- payload // to writer. } } }() @@ -84,17 +86,58 @@ func (s *Verifier) run() { for true { select { case <-ctx.Done(): - log.Println("Verifier shutting down handlers...") + fmt.Println("Verifier shutting down handlers...") return case payload := <-handlerPayloadCh: - log.Printf("Running handlers for payload: %v\n", payload) + fmt.Printf("Running handlers for payload: %v\n", payload) // Dispatch payload to each handler. for _, handler := range s.handlers { - go handler(ctx, payload, resultCh) + // TODO: signing needs to fit in before writing. + go handler(ctx, payload, writeAttestationCh) } } } }() // write results + go func() { + wg.Add(1) + defer wg.Done() + + for true { + select { + case <-ctx.Done(): + fmt.Println("Verifier shutting down handlers...") + return + case payload := <-writePayloadCh: + for _, w := range s.writer { + if err := w.WriteMessage(ctx, payload); err != nil { + // TODO: surface an error. + fmt.Println("Error writing payload.") + } + } + case attestation := <-writeAttestationCh: + for _, w := range s.writer { + if err := w.WriteAttestation(ctx, attestation); err != nil { + // TODO: surface an error. + fmt.Println("Error writing payload.") + } + } + } + } + }() + + // Wait for service to stop. + done := false + for !done { + select { + case <-s.stopCh: + cancel() + done = true + case <-ctx.Done(): + done = true + } + } + + wg.Wait() } diff --git a/chainlink-modsec/libmodsec/internal/verifier/options.go b/chainlink-modsec/libmodsec/internal/verifier/options.go deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/chainlink-modsec/libmodsec/internal/verifier/types.go b/chainlink-modsec/libmodsec/internal/verifier/types.go new file mode 100644 index 0000000000..7530a19a0b --- /dev/null +++ b/chainlink-modsec/libmodsec/internal/verifier/types.go @@ -0,0 +1,47 @@ +package verifier + +import ( + "github.com/smartcontractkit/chainlink-modsec/libmodsec/pkg/modsectypes" +) + +// BlockInfo contains basic block metadata. +type BlockInfo struct { + SourceChain uint64 + BlockNum uint64 // why do they use big int... at 100k bps it would take 500k years to overflow uint64. + BlockHash []byte + Finalized bool // if the chain supports a finalized tag. +} + +// Work is generated from the blockchain reader. It contains all raw data +// required to verify a message. Work for one verification may be split +// across multiple work items. For example, a finalized message verifier +// would receive the initial work item containing message data but wait +// to emit an attestation until the block is finalized before emitting an +// attestation. +type Work struct { + Data []byte // maybe nil if there were no messages for a particular block num + BlockInfo +} + +// HandlerPayload is generated by the Transformer. It contains the Work data +// in a processed format. Processing includes: +// * Decoding the data into an AnyToAny object. +// * Encoding the data into a destination byte format (for verifier signing). +type HandlerPayload struct { + ID string + Message modsectypes.Message + DestData []byte + BlockInfo BlockInfo +} + +// VerifierInfo metadata needed by the writer. +type VerifierInfo struct { + ID string +} + +// Attestation for a message. +type Attestation struct { + VerifierInfo VerifierInfo + Message modsectypes.Message + Sig []byte +} diff --git a/chainlink-modsec/libmodsec/internal/verifier/verifier.go b/chainlink-modsec/libmodsec/internal/verifier/verifier.go index c414531181..7f51c57fb7 100644 --- a/chainlink-modsec/libmodsec/internal/verifier/verifier.go +++ b/chainlink-modsec/libmodsec/internal/verifier/verifier.go @@ -1,73 +1,21 @@ package verifier import ( - "context" - "github.com/smartcontractkit/chainlink-modsec/libmodsec/pkg/modsectypes" ) -// Poller is an interface that defines how to poll for new work to be processed. -type Poller interface { - // Next returns a channel that will yield the next piece of work to be processed. - Next(ctx context.Context) <-chan Work - - // Watch returns a channel that will yield all work to be processed. - Watch(ctx context.Context) <-chan Work -} - -// Transformer is an interface that defines how to transform a Work item into a modsec message. -// TODO: tranform from source to verifier and again from verifier to destination? -type Transformer interface { - Transform(work Work) HandlerPayload -} - -// StoredMessage represents a message that will be attested. It is the object written to the storage layer. -type StoredMessage struct { - ID string - Message modsectypes.Message - Encoded []byte - // block info? -} - -// MessageAttestation represents the verifier's attestation of a message. -type MessageAttestation struct { - Attestor string - Sig []byte -} - -// AttestationWriter is an interface that defines how to write attestations and messages to a storage layer. -// The writer is responsible for deciding how to store the messages and attestations, including -// how to avoid duplicates, and how to structure the storage for efficient scanning. -type AttestationWriter interface { - // StoreMessage stores the message that is being attested. - // The writer implementation is responsible for deciding how to store the message. - // It should consider things like how it will be retrieved later, and how to avoid duplicates. - // For example, it may decide to store messages in a hierarchy based on the time, block number, sequence number, etc. - StoreMessage(ctx context.Context, msg StoredMessage) error - - // StoreAttestation stores the attestation for a message. - // The writer implementation is responsible for deciding how to store the attestation. - // It should consider things like how it will be retrieved later, and how to avoid duplicates. - StoreAttestation(ctx context.Context, msg MessageAttestation) error -} - // Verifier is the main verifier service. It manages the lifecycle of data // fetching, handling, and writing a resulting attestation. type Verifier struct { workCh chan Work stopCh chan struct{} - // handlers are registered by name. - handlers []Handler - - // state - started bool - - // configurable services + // configurable components + handlers []Handler signer modsectypes.Signer - poller Poller + reader Reader transformer Transformer - writer []AttestationWriter + writer []Writer // Add more configurable fields as needed } @@ -81,10 +29,10 @@ func WithSigner(signer modsectypes.Signer) Option { } } -// WithPoller sets a custom poller for the verifier -func WithPoller(poller Poller) Option { +// WithReader sets a reader for the verifier +func WithReader(reader Reader) Option { return func(v *Verifier) { - v.poller = poller + v.reader = reader } } @@ -96,13 +44,13 @@ func WithTransformer(transformer Transformer) Option { } // WithWriter adds a writer for the verifier -func WithWriter(writer AttestationWriter) Option { +func WithWriter(writer Writer) Option { return func(v *Verifier) { v.writer = append(v.writer, writer) } } -// WithHandler adds a handler function for a given name. Only one handler can be registered per name. +// WithHandler adds a handler function, there can be more than one. func WithHandler(handler Handler) Option { return func(v *Verifier) { v.handlers = append(v.handlers, handler)