Skip to content

Commit bb76eaf

Browse files
committed
feat: WIP build PRT epoch commitment
1 parent 35ec953 commit bb76eaf

File tree

21 files changed

+846
-343
lines changed

21 files changed

+846
-343
lines changed

internal/advancer/advancer.go

Lines changed: 153 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,15 @@ import (
77
"context"
88
"errors"
99
"fmt"
10-
"net/http"
1110
"os"
1211
"path"
1312
"strings"
1413

15-
"github.com/cartesi/rollups-node/internal/config"
16-
"github.com/cartesi/rollups-node/internal/inspect"
1714
"github.com/cartesi/rollups-node/internal/manager"
15+
"github.com/cartesi/rollups-node/internal/merkle"
1816
. "github.com/cartesi/rollups-node/internal/model"
1917
"github.com/cartesi/rollups-node/internal/repository"
20-
"github.com/cartesi/rollups-node/pkg/service"
18+
pkgm "github.com/cartesi/rollups-node/pkg/machine"
2119
)
2220

2321
var (
@@ -33,6 +31,7 @@ type AdvancerRepository interface {
3331
ListInputs(ctx context.Context, nameOrAddress string, f repository.InputFilter, p repository.Pagination, descending bool) ([]*Input, uint64, error)
3432
GetLastInput(ctx context.Context, appAddress string, epochIndex uint64) (*Input, error)
3533
StoreAdvanceResult(ctx context.Context, appID int64, ar *AdvanceResult) error
34+
UpdateEpochCommitment(ctx context.Context, appID int64, epochIndex uint64, commitment []byte) error
3635
UpdateEpochsInputsProcessed(ctx context.Context, nameOrAddress string) (int64, error)
3736
UpdateApplicationState(ctx context.Context, appID int64, state ApplicationState, reason *string) error
3837
GetEpoch(ctx context.Context, nameOrAddress string, index uint64) (*Epoch, error)
@@ -41,90 +40,11 @@ type AdvancerRepository interface {
4140
GetLastProcessedInput(ctx context.Context, appAddress string) (*Input, error)
4241
}
4342

44-
// Service is the main advancer service that processes inputs through Cartesi machines
45-
type Service struct {
46-
service.Service
47-
snapshotsDir string
48-
repository AdvancerRepository
49-
machineManager manager.MachineProvider
50-
inspector *inspect.Inspector
51-
HTTPServer *http.Server
52-
HTTPServerFunc func() error
53-
}
54-
55-
// CreateInfo contains the configuration for creating an advancer service
56-
type CreateInfo struct {
57-
service.CreateInfo
58-
Config config.AdvancerConfig
59-
Repository repository.Repository
60-
}
61-
62-
// Create initializes a new advancer service
63-
func Create(ctx context.Context, c *CreateInfo) (*Service, error) {
64-
var err error
65-
if err = ctx.Err(); err != nil {
66-
return nil, err // This returns context.Canceled or context.DeadlineExceeded.
67-
}
68-
69-
s := &Service{}
70-
c.Impl = s
71-
72-
err = service.Create(ctx, &c.CreateInfo, &s.Service)
73-
if err != nil {
74-
return nil, err
75-
}
76-
77-
s.repository = c.Repository
78-
if s.repository == nil {
79-
return nil, fmt.Errorf("repository on advancer service Create is nil")
80-
}
81-
82-
// Create the machine manager
83-
manager := manager.NewMachineManager(
84-
ctx,
85-
c.Repository,
86-
s.Logger,
87-
c.Config.FeatureMachineHashCheckEnabled,
88-
)
89-
s.machineManager = manager
90-
91-
// Initialize the inspect service if enabled
92-
if c.Config.FeatureInspectEnabled {
93-
s.inspector, s.HTTPServer, s.HTTPServerFunc = inspect.NewInspector(
94-
c.Repository,
95-
manager,
96-
c.Config.InspectAddress,
97-
c.LogLevel,
98-
c.LogColor,
99-
)
100-
}
101-
102-
s.snapshotsDir = c.Config.SnapshotsDir
103-
104-
return s, nil
105-
}
106-
107-
// Service interface implementation
108-
func (s *Service) Alive() bool { return true }
109-
func (s *Service) Ready() bool { return true }
110-
func (s *Service) Reload() []error { return nil }
111-
func (s *Service) Tick() []error {
112-
if err := s.Step(s.Context); err != nil {
113-
return []error{err}
114-
}
115-
return []error{}
116-
}
117-
func (s *Service) Stop(b bool) []error {
118-
return nil
119-
}
120-
func (s *Service) Serve() error {
121-
if s.inspector != nil && s.HTTPServerFunc != nil {
122-
go s.HTTPServerFunc()
123-
}
124-
return s.Service.Serve()
125-
}
126-
func (s *Service) String() string {
127-
return s.Name
43+
// temporary struct
44+
type EpochCommitment struct {
45+
EpochIndex uint64
46+
EpochInputCount uint64
47+
CommitmentBuilder merkle.Builder
12848
}
12949

13050
// getUnprocessedInputs retrieves inputs that haven't been processed yet
@@ -212,7 +132,7 @@ func (s *Service) processInputs(ctx context.Context, app *Application, inputs []
212132
"index", input.Index)
213133

214134
// Advance the machine with this input
215-
result, err := machine.Advance(ctx, input.RawData, input.Index)
135+
result, err := machine.Advance(ctx, input.RawData, input.EpochIndex, input.Index, app.IsDaveConsensus())
216136
if err != nil {
217137
// If there's an error, mark the application as inoperable
218138
s.Logger.Error("Error executing advance",
@@ -235,14 +155,16 @@ func (s *Service) processInputs(ctx context.Context, app *Application, inputs []
235155

236156
return err
237157
}
238-
158+
// log advance result hashes
239159
s.Logger.Info("Processing input finished",
240160
"application", app.Name,
241-
"epoch", input.EpochIndex,
242-
"index", input.Index,
161+
"epoch", result.EpochIndex,
162+
"index", result.InputIndex,
243163
"status", result.Status,
244164
"outputs", len(result.Outputs),
245165
"reports", len(result.Reports),
166+
"hashes", len(result.Hashes),
167+
"remaining_cycles", result.RemainingMetaCycles,
246168
)
247169

248170
// Store the result in the database
@@ -266,18 +188,135 @@ func (s *Service) processInputs(ctx context.Context, app *Application, inputs []
266188
// Continue processing even if snapshot creation fails
267189
}
268190
}
191+
192+
err = s.addCommimtmentLeafs(ctx, app, input, result)
193+
if err != nil {
194+
s.Logger.Error("Failed to add commitment leafs",
195+
"application", app.Name,
196+
"index", input.Index,
197+
"error", err)
198+
}
199+
200+
err = s.buildCommitment(ctx, app, input, machine)
201+
if err != nil {
202+
s.Logger.Error("Failed to build commitment",
203+
"application", app.Name,
204+
"index", input.Index,
205+
"error", err)
206+
return err
207+
}
269208
}
270209

271210
return nil
272211
}
273212

274-
// handleEpochSnapshotAfterInputProcessed handles the snapshot creation after when an epoch is closed after an input was processed
275-
func (s *Service) handleEpochSnapshotAfterInputProcessed(ctx context.Context, app *Application) error {
276-
// Check if the application has a epoch snapshot policy
277-
if app.ExecutionParameters.SnapshotPolicy != SnapshotPolicy_EveryEpoch {
278-
return nil
213+
func (s *Service) addCommimtmentLeafs(ctx context.Context, app *Application, input *Input, result *AdvanceResult) error {
214+
if app.IsDaveConsensus() {
215+
s.Logger.Debug("DaveConsensus: Adding input computation leafs",
216+
"application", app.Name,
217+
"epoch", input.EpochIndex,
218+
"index", input.Index)
219+
epochCommitment, exists := s.appCommitment[input.EpochApplicationID]
220+
if !exists {
221+
epochCommitment = &EpochCommitment{EpochIndex: input.EpochIndex}
222+
s.appCommitment[input.EpochApplicationID] = epochCommitment
223+
}
224+
for i, hash := range result.Hashes {
225+
s.Logger.Info("Advance result hash",
226+
"application", app.Name,
227+
"epoch", input.EpochIndex,
228+
"index", input.Index,
229+
"leaf_index", i,
230+
"hash", fmt.Sprintf("%x", hash),
231+
"repetitions", 1)
232+
epochCommitment.CommitmentBuilder.Append(merkle.TreeLeaf(hash))
233+
}
234+
s.Logger.Info("Advance result hash",
235+
"application", app.Name,
236+
"epoch", input.EpochIndex,
237+
"index", input.Index,
238+
"leaf_count", len(result.Hashes),
239+
"hash", result.MachineHash.String(),
240+
"repetitions", result.RemainingMetaCycles)
241+
epochCommitment.CommitmentBuilder.AppendRepeatedUint64(merkle.TreeLeaf(result.MachineHash), result.RemainingMetaCycles)
242+
epochCommitment.EpochInputCount++
243+
}
244+
return nil
245+
}
246+
247+
func (s *Service) buildCommitment(ctx context.Context, app *Application, input *Input, machine manager.MachineInstance) error {
248+
if app.IsDaveConsensus() {
249+
epochCommitment, exists := s.appCommitment[input.EpochApplicationID]
250+
if !exists {
251+
return nil
252+
}
253+
// If this is the last input and the epoch is closed, create a snapshot
254+
isLastInput, err := s.isEpochLastInput(ctx, app, input)
255+
if err != nil {
256+
return err
257+
}
258+
if isLastInput {
259+
s.Logger.Debug("DaveConsensus: Building commitment for last input of epoch",
260+
"application", app.Name,
261+
"epoch", input.EpochIndex,
262+
"index", input.Index)
263+
delete(s.appCommitment, input.EpochApplicationID)
264+
265+
remainingInputs := pkgm.InputsPerEpoch - epochCommitment.EpochInputCount
266+
remainingStrides := remainingInputs << pkgm.Log2StridesPerInput
267+
if remainingStrides > 0 {
268+
machineHash, err := machine.Hash(ctx)
269+
if err != nil {
270+
return err
271+
}
272+
273+
epochCommitment.CommitmentBuilder.AppendRepeatedUint64(merkle.TreeLeaf(machineHash), remainingStrides)
274+
}
275+
276+
epochCommitmentTree := epochCommitment.CommitmentBuilder.Build()
277+
commitment := epochCommitmentTree.GetRootHash()
278+
s.Logger.Info("DaveConsensus: Epoch commitment built",
279+
"application", app.Name,
280+
"epoch", input.EpochIndex,
281+
"commitment", fmt.Sprintf("%x", commitment))
282+
283+
err = s.repository.UpdateEpochCommitment(ctx, app.ID, input.EpochIndex, commitment.Bytes())
284+
if err != nil {
285+
return fmt.Errorf("failed to store epoch commitment: %w", err)
286+
}
287+
}
288+
}
289+
return nil
290+
}
291+
292+
func (s *Service) isEpochLastInput(ctx context.Context, app *Application, input *Input) (bool, error) {
293+
// Get the epoch for this input
294+
epoch, err := s.repository.GetEpoch(ctx, app.IApplicationAddress.String(), input.EpochIndex)
295+
if err != nil {
296+
return false, fmt.Errorf("failed to get epoch: %w", err)
297+
}
298+
299+
// Skip if the epoch is still open
300+
if epoch.Status == EpochStatus_Open {
301+
return false, nil
302+
}
303+
304+
// Check if this is the last input of the epoch
305+
lastInput, err := s.repository.GetLastInput(ctx, app.IApplicationAddress.String(), input.EpochIndex)
306+
if err != nil {
307+
return false, fmt.Errorf("failed to get last input: %w", err)
308+
}
309+
310+
// If this is the last input and the epoch is closed, return true
311+
if lastInput != nil && lastInput.Index == input.Index {
312+
return true, nil
279313
}
280314

315+
return false, nil
316+
}
317+
318+
// handleEpochSnapshotAfterInputProcessed handles the snapshot creation after when an epoch is closed after an input was processed
319+
func (s *Service) handleEpochSnapshotAfterInputProcessed(ctx context.Context, app *Application) error {
281320
// Get the machine instance for this application
282321
machine, exists := s.machineManager.GetMachine(app.ID)
283322
if !exists {
@@ -293,6 +332,19 @@ func (s *Service) handleEpochSnapshotAfterInputProcessed(ctx context.Context, ap
293332
if lastProcessedInput == nil {
294333
return nil
295334
}
335+
err = s.buildCommitment(ctx, app, lastProcessedInput, machine)
336+
if err != nil {
337+
s.Logger.Error("Failed to build commitment",
338+
"application", app.Name,
339+
"index", lastProcessedInput.Index,
340+
"error", err)
341+
return err
342+
}
343+
344+
// Check if the application has a epoch snapshot policy
345+
if app.ExecutionParameters.SnapshotPolicy != SnapshotPolicy_EveryEpoch {
346+
return nil
347+
}
296348

297349
// Handle the snapshot
298350
return s.handleSnapshot(ctx, app, machine, lastProcessedInput)
@@ -314,25 +366,12 @@ func (s *Service) handleSnapshot(ctx context.Context, app *Application, machine
314366

315367
// For EVERY_EPOCH policy, check if this is the last input of the epoch
316368
if policy == SnapshotPolicy_EveryEpoch {
317-
// Get the epoch for this input
318-
epoch, err := s.repository.GetEpoch(ctx, app.IApplicationAddress.String(), input.EpochIndex)
319-
if err != nil {
320-
return fmt.Errorf("failed to get epoch: %w", err)
321-
}
322-
323-
// Skip if the epoch is still open
324-
if epoch.Status == EpochStatus_Open {
325-
return nil
326-
}
327-
328-
// Check if this is the last input of the epoch
329-
lastInput, err := s.repository.GetLastInput(ctx, app.IApplicationAddress.String(), input.EpochIndex)
369+
// If this is the last input and the epoch is closed, create a snapshot
370+
isLastInput, err := s.isEpochLastInput(ctx, app, input)
330371
if err != nil {
331-
return fmt.Errorf("failed to get last input: %w", err)
372+
return err
332373
}
333-
334-
// If this is the last input and the epoch is closed, create a snapshot
335-
if lastInput != nil && lastInput.Index == input.Index {
374+
if isLastInput {
336375
return s.createSnapshot(ctx, app, machine, input)
337376
}
338377
}

0 commit comments

Comments
 (0)