diff --git a/chainlink-modsec/libmodsec/internal/verifier/doc.go b/chainlink-modsec/libmodsec/internal/verifier/doc.go new file mode 100644 index 0000000000..9af914b740 --- /dev/null +++ b/chainlink-modsec/libmodsec/internal/verifier/doc.go @@ -0,0 +1,3 @@ +// Package verifier implements a service that monitors a channel for incoming +// messages and sends attestations to a results channel. +package verifier diff --git a/chainlink-modsec/libmodsec/internal/verifier/interfaces.go b/chainlink-modsec/libmodsec/internal/verifier/interfaces.go new file mode 100644 index 0000000000..9e20bd35b5 --- /dev/null +++ b/chainlink-modsec/libmodsec/internal/verifier/interfaces.go @@ -0,0 +1,51 @@ +package verifier + +import ( + "context" +) + +// Reader is an interface for reading the blockchain for new work. +type Reader interface { + // Next returns a channel that will yield the next piece of work to be processed. + Next(ctx context.Context) <-chan Work + + // Watch returns a channel that will yield all work to be processed. + Watch(ctx context.Context) <-chan Work +} + +// Transformer is an interface that defines how to transform a Work item into +// the generic modsec.Message format and encoded into a destination specific +// payload. This interface encapsulates most of the chain agnostic services +// required by the 1.6 implementation. +type Transformer interface { + Transform(work Work) HandlerPayload +} + +// Writer is an interface that defines how to write attestations and messages to a storage layer. +// The writer is responsible for deciding how to store the messages and attestations, including +// how to avoid duplicates, and how to structure the storage for efficient scanning. +// +// TODO: standard io.Writer interface could be used to simplify testing. +// Modex extensions would look like "modsec.NewS3Writer(path)" +type Writer interface { + // WriteMessage stores the message that is being attested. + // The writer implementation is responsible for deciding how to store the message. + // It should consider things like how it will be retrieved later, and how to avoid duplicates. + // For example, it may decide to store messages in a hierarchy based on the time, block number, sequence number, etc. + WriteMessage(ctx context.Context, msg HandlerPayload) error + + // WriteAttestation stores the attestation for a message. + // The writer implementation is responsible for deciding how to store the attestation. + // It should consider things like how it will be retrieved later, and how to avoid duplicates. + WriteAttestation(ctx context.Context, msg Attestation) error +} + +// Handler is a function that processes incoming work from the work channel and sends results to the result channel. +// There may be no results for a particular piece of work, in which case the result channel should not be written to. +// For example, the verifier may have a rule to skip processing until a certain block depth is reached. In that case, +// the work may be received, but no result will be produced until later. +// +// The verifier should respect the context for cancellation. If the context is cancelled, the verifier should stop. +// +// TODO: should verifier be a full service with start/stop/cancel, a cache, maybe a db connection, etc? +type Handler func(ctx context.Context, payload HandlerPayload, result chan<- Attestation) diff --git a/chainlink-modsec/libmodsec/internal/verifier/main.go b/chainlink-modsec/libmodsec/internal/verifier/main.go new file mode 100644 index 0000000000..0acc647277 --- /dev/null +++ b/chainlink-modsec/libmodsec/internal/verifier/main.go @@ -0,0 +1,143 @@ +package verifier + +import ( + "context" + "fmt" + "sync" +) + +// run is the verifier main loop. +// +// Implementation of a pipeline: +// - Read events from blockchain. +// - Transform events to the verifier payload. +// - * Write raw message to storage. +// - Dispatch payloads to verifiers. +// - Collect results from verifiers as they are available. +// - Write verifier attestations to storage. +func (s *Verifier) run() { + if s.writer == nil { + fmt.Println("No writer configured, verifier will not process any work.") + // TODO: surface an error. + return + } + if s.transformer == nil { + fmt.Println("No transformer configured, verifier will not process any work.") + // TODO: surface an error. + return + } + if s.signer == nil { + fmt.Println("No signer configured, verifier will not process any work.") + // TODO: surface an error. + return + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var wg sync.WaitGroup + + transformCh := make(chan Work) + handlerPayloadCh := make(chan HandlerPayload) + writePayloadCh := make(chan HandlerPayload) + writeAttestationCh := make(chan Attestation) + + // Read blockchain data for new work. + go func() { + wg.Add(1) + defer wg.Done() + + for true { + select { + case <-ctx.Done(): + fmt.Println("Verifier shutting down poller...") + return + case data := <-s.reader.Next(ctx): + fmt.Println(data) + transformCh <- data // to transformer + } + } + }() + + // Transform work into the handler payload. + go func() { + wg.Add(1) + defer wg.Done() + + for true { + select { + case <-ctx.Done(): + fmt.Println("Verifier shutting down transformer...") + return + case work := <-transformCh: + payload := s.transformer.Transform(work) + fmt.Println("Payload transformed:", payload) + handlerPayloadCh <- payload // to handler dispatch. + writePayloadCh <- payload // to writer. + } + } + }() + + // run handlers + go func() { + wg.Add(1) + defer wg.Done() + + for true { + select { + case <-ctx.Done(): + fmt.Println("Verifier shutting down handlers...") + return + case payload := <-handlerPayloadCh: + fmt.Printf("Running handlers for payload: %v\n", payload) + // Dispatch payload to each handler. + for _, handler := range s.handlers { + // TODO: signing needs to fit in before writing. + go handler(ctx, payload, writeAttestationCh) + } + } + } + }() + + // write results + go func() { + wg.Add(1) + defer wg.Done() + + for true { + select { + case <-ctx.Done(): + fmt.Println("Verifier shutting down handlers...") + return + case payload := <-writePayloadCh: + for _, w := range s.writer { + if err := w.WriteMessage(ctx, payload); err != nil { + // TODO: surface an error. + fmt.Println("Error writing payload.") + } + } + case attestation := <-writeAttestationCh: + for _, w := range s.writer { + if err := w.WriteAttestation(ctx, attestation); err != nil { + // TODO: surface an error. + fmt.Println("Error writing payload.") + } + } + } + } + }() + + // Wait for service to stop. + done := false + for !done { + select { + case <-s.stopCh: + cancel() + done = true + case <-ctx.Done(): + done = true + } + } + + wg.Wait() +} diff --git a/chainlink-modsec/libmodsec/internal/verifier/types.go b/chainlink-modsec/libmodsec/internal/verifier/types.go new file mode 100644 index 0000000000..7530a19a0b --- /dev/null +++ b/chainlink-modsec/libmodsec/internal/verifier/types.go @@ -0,0 +1,47 @@ +package verifier + +import ( + "github.com/smartcontractkit/chainlink-modsec/libmodsec/pkg/modsectypes" +) + +// BlockInfo contains basic block metadata. +type BlockInfo struct { + SourceChain uint64 + BlockNum uint64 // why do they use big int... at 100k bps it would take 500k years to overflow uint64. + BlockHash []byte + Finalized bool // if the chain supports a finalized tag. +} + +// Work is generated from the blockchain reader. It contains all raw data +// required to verify a message. Work for one verification may be split +// across multiple work items. For example, a finalized message verifier +// would receive the initial work item containing message data but wait +// to emit an attestation until the block is finalized before emitting an +// attestation. +type Work struct { + Data []byte // maybe nil if there were no messages for a particular block num + BlockInfo +} + +// HandlerPayload is generated by the Transformer. It contains the Work data +// in a processed format. Processing includes: +// * Decoding the data into an AnyToAny object. +// * Encoding the data into a destination byte format (for verifier signing). +type HandlerPayload struct { + ID string + Message modsectypes.Message + DestData []byte + BlockInfo BlockInfo +} + +// VerifierInfo metadata needed by the writer. +type VerifierInfo struct { + ID string +} + +// Attestation for a message. +type Attestation struct { + VerifierInfo VerifierInfo + Message modsectypes.Message + Sig []byte +} diff --git a/chainlink-modsec/libmodsec/internal/verifier/verifier.go b/chainlink-modsec/libmodsec/internal/verifier/verifier.go new file mode 100644 index 0000000000..7f51c57fb7 --- /dev/null +++ b/chainlink-modsec/libmodsec/internal/verifier/verifier.go @@ -0,0 +1,77 @@ +package verifier + +import ( + "github.com/smartcontractkit/chainlink-modsec/libmodsec/pkg/modsectypes" +) + +// Verifier is the main verifier service. It manages the lifecycle of data +// fetching, handling, and writing a resulting attestation. +type Verifier struct { + workCh chan Work + stopCh chan struct{} + + // configurable components + handlers []Handler + signer modsectypes.Signer + reader Reader + transformer Transformer + writer []Writer + // Add more configurable fields as needed +} + +// Option is the Verifier functional option type +type Option func(*Verifier) + +// WithSigner sets a custom signer for the verifier +func WithSigner(signer modsectypes.Signer) Option { + return func(v *Verifier) { + v.signer = signer + } +} + +// WithReader sets a reader for the verifier +func WithReader(reader Reader) Option { + return func(v *Verifier) { + v.reader = reader + } +} + +// WithTransformer sets the transformer for the verifier +func WithTransformer(transformer Transformer) Option { + return func(v *Verifier) { + v.transformer = transformer + } +} + +// WithWriter adds a writer for the verifier +func WithWriter(writer Writer) Option { + return func(v *Verifier) { + v.writer = append(v.writer, writer) + } +} + +// WithHandler adds a handler function, there can be more than one. +func WithHandler(handler Handler) Option { + return func(v *Verifier) { + v.handlers = append(v.handlers, handler) + } +} + +func NewVerifier(opts ...Option) *Verifier { + v := &Verifier{ + stopCh: make(chan struct{}), + } + // Apply all options + for _, opt := range opts { + opt(v) + } + return v +} + +func (s *Verifier) Start() { + go s.run() +} + +func (s *Verifier) Stop() { + close(s.stopCh) +}