-
Notifications
You must be signed in to change notification settings - Fork 24
Verifier main loop. #1135
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Verifier main loop. #1135
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| // Package verifier implements a service that monitors a channel for incoming | ||
| // messages and sends attestations to a results channel. | ||
| package verifier |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,51 @@ | ||
| package verifier | ||
|
|
||
| import ( | ||
| "context" | ||
| ) | ||
|
|
||
| // Reader is an interface for reading the blockchain for new work. | ||
| type Reader interface { | ||
| // Next returns a channel that will yield the next piece of work to be processed. | ||
| Next(ctx context.Context) <-chan Work | ||
|
|
||
| // Watch returns a channel that will yield all work to be processed. | ||
| Watch(ctx context.Context) <-chan Work | ||
| } | ||
|
|
||
| // Transformer is an interface that defines how to transform a Work item into | ||
| // the generic modsec.Message format and encoded into a destination specific | ||
| // payload. This interface encapsulates most of the chain agnostic services | ||
| // required by the 1.6 implementation. | ||
| type Transformer interface { | ||
| Transform(work Work) HandlerPayload | ||
| } | ||
|
Comment on lines
+16
to
+22
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Encapsulated in the interface above - could make things more generic if we think its useful though.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think its important to split reading and re-encoding. This layer will have chain family specific re-encoding details. So may as well split it up from the blockchain reading so that each has a single responsibility. |
||
|
|
||
| // Writer is an interface that defines how to write attestations and messages to a storage layer. | ||
| // The writer is responsible for deciding how to store the messages and attestations, including | ||
| // how to avoid duplicates, and how to structure the storage for efficient scanning. | ||
| // | ||
| // TODO: standard io.Writer interface could be used to simplify testing. | ||
| // Modex extensions would look like "modsec.NewS3Writer(path)" | ||
| type Writer interface { | ||
| // WriteMessage stores the message that is being attested. | ||
| // The writer implementation is responsible for deciding how to store the message. | ||
| // It should consider things like how it will be retrieved later, and how to avoid duplicates. | ||
| // For example, it may decide to store messages in a hierarchy based on the time, block number, sequence number, etc. | ||
| WriteMessage(ctx context.Context, msg HandlerPayload) error | ||
|
|
||
| // WriteAttestation stores the attestation for a message. | ||
| // The writer implementation is responsible for deciding how to store the attestation. | ||
| // It should consider things like how it will be retrieved later, and how to avoid duplicates. | ||
| WriteAttestation(ctx context.Context, msg Attestation) error | ||
| } | ||
|
Comment on lines
+24
to
+41
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What I landed on: // AttestationWriter defines the interface for storing attestations.
type AttestationWriter interface {
Store(ctx context.Context, att Attestation) error
}The
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it could be useful to split up the Message, Attestation(s) and maybe even an optional AttestationData for verifiers that produce something like a CCTP attestation. OffRamp.Execute should accept these objects as the arguments. |
||
|
|
||
| // Handler is a function that processes incoming work from the work channel and sends results to the result channel. | ||
| // There may be no results for a particular piece of work, in which case the result channel should not be written to. | ||
| // For example, the verifier may have a rule to skip processing until a certain block depth is reached. In that case, | ||
| // the work may be received, but no result will be produced until later. | ||
| // | ||
| // The verifier should respect the context for cancellation. If the context is cancelled, the verifier should stop. | ||
| // | ||
| // TODO: should verifier be a full service with start/stop/cancel, a cache, maybe a db connection, etc? | ||
| type Handler func(ctx context.Context, payload HandlerPayload, result chan<- Attestation) | ||
|
Comment on lines
+50
to
+51
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have the verifier logic encapsulated in an interface: // Verifier is responsible for performing the specific verification logic for a message.
type Verifier interface {
// Verify tries to verify the given message and returns nil if the message is valid.
// For example, the commit verifier can check that the message is sufficiently deeply buried in the blockchain,
// or is finalized/safe.
Verify(ctx context.Context, message CCIPMessage) error
}Method on it like that is synchronous, though we could have an async version or make it fully async with the channel input.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this assumes that IMO this is way too complex for a component we hope for 3rd parties to implement. |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,143 @@ | ||
| package verifier | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "sync" | ||
| ) | ||
|
|
||
| // run is the verifier main loop. | ||
| // | ||
| // Implementation of a pipeline: | ||
| // - Read events from blockchain. | ||
| // - Transform events to the verifier payload. | ||
| // - * Write raw message to storage. | ||
| // - Dispatch payloads to verifiers. | ||
| // - Collect results from verifiers as they are available. | ||
| // - Write verifier attestations to storage. | ||
| func (s *Verifier) run() { | ||
| if s.writer == nil { | ||
| fmt.Println("No writer configured, verifier will not process any work.") | ||
| // TODO: surface an error. | ||
| return | ||
| } | ||
| if s.transformer == nil { | ||
| fmt.Println("No transformer configured, verifier will not process any work.") | ||
| // TODO: surface an error. | ||
| return | ||
| } | ||
| if s.signer == nil { | ||
| fmt.Println("No signer configured, verifier will not process any work.") | ||
| // TODO: surface an error. | ||
| return | ||
| } | ||
|
|
||
| ctx, cancel := context.WithCancel(context.Background()) | ||
| defer cancel() | ||
|
|
||
| var wg sync.WaitGroup | ||
|
|
||
| transformCh := make(chan Work) | ||
| handlerPayloadCh := make(chan HandlerPayload) | ||
| writePayloadCh := make(chan HandlerPayload) | ||
| writeAttestationCh := make(chan Attestation) | ||
|
|
||
| // Read blockchain data for new work. | ||
| go func() { | ||
| wg.Add(1) | ||
| defer wg.Done() | ||
|
|
||
| for true { | ||
| select { | ||
| case <-ctx.Done(): | ||
| fmt.Println("Verifier shutting down poller...") | ||
| return | ||
| case data := <-s.reader.Next(ctx): | ||
| fmt.Println(data) | ||
| transformCh <- data // to transformer | ||
| } | ||
| } | ||
| }() | ||
|
|
||
| // Transform work into the handler payload. | ||
| go func() { | ||
| wg.Add(1) | ||
| defer wg.Done() | ||
|
|
||
| for true { | ||
| select { | ||
| case <-ctx.Done(): | ||
| fmt.Println("Verifier shutting down transformer...") | ||
| return | ||
| case work := <-transformCh: | ||
| payload := s.transformer.Transform(work) | ||
| fmt.Println("Payload transformed:", payload) | ||
| handlerPayloadCh <- payload // to handler dispatch. | ||
| writePayloadCh <- payload // to writer. | ||
| } | ||
| } | ||
| }() | ||
|
|
||
| // run handlers | ||
| go func() { | ||
| wg.Add(1) | ||
| defer wg.Done() | ||
|
|
||
| for true { | ||
| select { | ||
| case <-ctx.Done(): | ||
| fmt.Println("Verifier shutting down handlers...") | ||
| return | ||
| case payload := <-handlerPayloadCh: | ||
| fmt.Printf("Running handlers for payload: %v\n", payload) | ||
| // Dispatch payload to each handler. | ||
| for _, handler := range s.handlers { | ||
| // TODO: signing needs to fit in before writing. | ||
| go handler(ctx, payload, writeAttestationCh) | ||
| } | ||
| } | ||
| } | ||
| }() | ||
|
|
||
| // write results | ||
| go func() { | ||
| wg.Add(1) | ||
| defer wg.Done() | ||
|
|
||
| for true { | ||
| select { | ||
| case <-ctx.Done(): | ||
| fmt.Println("Verifier shutting down handlers...") | ||
| return | ||
| case payload := <-writePayloadCh: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems kinda overkill to make message encoding an async thing |
||
| for _, w := range s.writer { | ||
| if err := w.WriteMessage(ctx, payload); err != nil { | ||
| // TODO: surface an error. | ||
| fmt.Println("Error writing payload.") | ||
| } | ||
| } | ||
| case attestation := <-writeAttestationCh: | ||
| for _, w := range s.writer { | ||
| if err := w.WriteAttestation(ctx, attestation); err != nil { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Message encoding is required for attestation; if you'd want to make it a full pipeline you'd have to encode first then attest second |
||
| // TODO: surface an error. | ||
| fmt.Println("Error writing payload.") | ||
| } | ||
| } | ||
| } | ||
| } | ||
| }() | ||
|
|
||
| // Wait for service to stop. | ||
| done := false | ||
| for !done { | ||
| select { | ||
| case <-s.stopCh: | ||
| cancel() | ||
| done = true | ||
| case <-ctx.Done(): | ||
| done = true | ||
| } | ||
| } | ||
|
|
||
| wg.Wait() | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,47 @@ | ||
| package verifier | ||
|
|
||
| import ( | ||
| "github.com/smartcontractkit/chainlink-modsec/libmodsec/pkg/modsectypes" | ||
| ) | ||
|
|
||
| // BlockInfo contains basic block metadata. | ||
| type BlockInfo struct { | ||
| SourceChain uint64 | ||
| BlockNum uint64 // why do they use big int... at 100k bps it would take 500k years to overflow uint64. | ||
| BlockHash []byte | ||
| Finalized bool // if the chain supports a finalized tag. | ||
| } | ||
|
|
||
| // Work is generated from the blockchain reader. It contains all raw data | ||
| // required to verify a message. Work for one verification may be split | ||
| // across multiple work items. For example, a finalized message verifier | ||
| // would receive the initial work item containing message data but wait | ||
| // to emit an attestation until the block is finalized before emitting an | ||
| // attestation. | ||
| type Work struct { | ||
| Data []byte // maybe nil if there were no messages for a particular block num | ||
| BlockInfo | ||
| } | ||
|
|
||
| // HandlerPayload is generated by the Transformer. It contains the Work data | ||
| // in a processed format. Processing includes: | ||
| // * Decoding the data into an AnyToAny object. | ||
| // * Encoding the data into a destination byte format (for verifier signing). | ||
| type HandlerPayload struct { | ||
| ID string | ||
| Message modsectypes.Message | ||
| DestData []byte | ||
| BlockInfo BlockInfo | ||
| } | ||
|
|
||
| // VerifierInfo metadata needed by the writer. | ||
| type VerifierInfo struct { | ||
| ID string | ||
| } | ||
|
|
||
| // Attestation for a message. | ||
| type Attestation struct { | ||
| VerifierInfo VerifierInfo | ||
| Message modsectypes.Message | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need a message in the attestation itself? Shouldn't the msg hash be enough here? |
||
| Sig []byte | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,77 @@ | ||
| package verifier | ||
|
|
||
| import ( | ||
| "github.com/smartcontractkit/chainlink-modsec/libmodsec/pkg/modsectypes" | ||
| ) | ||
|
|
||
| // Verifier is the main verifier service. It manages the lifecycle of data | ||
| // fetching, handling, and writing a resulting attestation. | ||
| type Verifier struct { | ||
| workCh chan Work | ||
| stopCh chan struct{} | ||
|
|
||
| // configurable components | ||
| handlers []Handler | ||
| signer modsectypes.Signer | ||
| reader Reader | ||
| transformer Transformer | ||
| writer []Writer | ||
| // Add more configurable fields as needed | ||
| } | ||
|
|
||
| // Option is the Verifier functional option type | ||
| type Option func(*Verifier) | ||
|
|
||
| // WithSigner sets a custom signer for the verifier | ||
| func WithSigner(signer modsectypes.Signer) Option { | ||
| return func(v *Verifier) { | ||
| v.signer = signer | ||
| } | ||
| } | ||
|
|
||
| // WithReader sets a reader for the verifier | ||
| func WithReader(reader Reader) Option { | ||
| return func(v *Verifier) { | ||
| v.reader = reader | ||
| } | ||
| } | ||
|
|
||
| // WithTransformer sets the transformer for the verifier | ||
| func WithTransformer(transformer Transformer) Option { | ||
| return func(v *Verifier) { | ||
| v.transformer = transformer | ||
| } | ||
| } | ||
|
|
||
| // WithWriter adds a writer for the verifier | ||
| func WithWriter(writer Writer) Option { | ||
| return func(v *Verifier) { | ||
| v.writer = append(v.writer, writer) | ||
| } | ||
| } | ||
|
|
||
| // WithHandler adds a handler function, there can be more than one. | ||
| func WithHandler(handler Handler) Option { | ||
| return func(v *Verifier) { | ||
| v.handlers = append(v.handlers, handler) | ||
| } | ||
| } | ||
|
|
||
| func NewVerifier(opts ...Option) *Verifier { | ||
| v := &Verifier{ | ||
| stopCh: make(chan struct{}), | ||
| } | ||
| // Apply all options | ||
| for _, opt := range opts { | ||
| opt(v) | ||
| } | ||
| return v | ||
| } | ||
|
|
||
| func (s *Verifier) Start() { | ||
| go s.run() | ||
| } | ||
|
|
||
| func (s *Verifier) Stop() { | ||
| close(s.stopCh) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needed with the
SourceReaderinterface already defined?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd leave out
GetMessagesand especiallyquery. If we need to rollback the reader for some reason it should be as simple as possible, boil it down to which round to start on and simply stream everything again from that point.