Skip to content

Commit 2a98382

Browse files
committed
feat: build PRT epoch commitment
1 parent 0b4711b commit 2a98382

File tree

25 files changed

+910
-585
lines changed

25 files changed

+910
-585
lines changed

internal/advancer/advancer.go

Lines changed: 46 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,13 @@ import (
77
"context"
88
"errors"
99
"fmt"
10-
"net/http"
1110
"os"
1211
"path"
1312
"strings"
1413

15-
"github.com/cartesi/rollups-node/internal/config"
16-
"github.com/cartesi/rollups-node/internal/inspect"
1714
"github.com/cartesi/rollups-node/internal/manager"
1815
. "github.com/cartesi/rollups-node/internal/model"
1916
"github.com/cartesi/rollups-node/internal/repository"
20-
"github.com/cartesi/rollups-node/pkg/service"
2117
)
2218

2319
var (
@@ -34,99 +30,14 @@ type AdvancerRepository interface {
3430
GetLastInput(ctx context.Context, appAddress string, epochIndex uint64) (*Input, error)
3531
StoreAdvanceResult(ctx context.Context, appID int64, ar *AdvanceResult) error
3632
UpdateEpochsInputsProcessed(ctx context.Context, nameOrAddress string) ([]uint64, error)
33+
UpdateEpochCommitment(ctx context.Context, appID int64, epochIndex uint64, commitment []byte) error
3734
UpdateApplicationState(ctx context.Context, appID int64, state ApplicationState, reason *string) error
3835
GetEpoch(ctx context.Context, nameOrAddress string, index uint64) (*Epoch, error)
3936
UpdateInputSnapshotURI(ctx context.Context, appId int64, inputIndex uint64, snapshotURI string) error
4037
GetLastSnapshot(ctx context.Context, nameOrAddress string) (*Input, error)
4138
GetLastProcessedInput(ctx context.Context, appAddress string) (*Input, error)
4239
}
4340

44-
// Service is the main advancer service that processes inputs through Cartesi machines
45-
type Service struct {
46-
service.Service
47-
snapshotsDir string
48-
repository AdvancerRepository
49-
machineManager manager.MachineProvider
50-
inspector *inspect.Inspector
51-
HTTPServer *http.Server
52-
HTTPServerFunc func() error
53-
}
54-
55-
// CreateInfo contains the configuration for creating an advancer service
56-
type CreateInfo struct {
57-
service.CreateInfo
58-
Config config.AdvancerConfig
59-
Repository repository.Repository
60-
}
61-
62-
// Create initializes a new advancer service
63-
func Create(ctx context.Context, c *CreateInfo) (*Service, error) {
64-
var err error
65-
if err = ctx.Err(); err != nil {
66-
return nil, err // This returns context.Canceled or context.DeadlineExceeded.
67-
}
68-
69-
s := &Service{}
70-
c.Impl = s
71-
72-
err = service.Create(ctx, &c.CreateInfo, &s.Service)
73-
if err != nil {
74-
return nil, err
75-
}
76-
77-
s.repository = c.Repository
78-
if s.repository == nil {
79-
return nil, fmt.Errorf("repository on advancer service Create is nil")
80-
}
81-
82-
// Create the machine manager
83-
manager := manager.NewMachineManager(
84-
ctx,
85-
c.Repository,
86-
s.Logger,
87-
c.Config.FeatureMachineHashCheckEnabled,
88-
)
89-
s.machineManager = manager
90-
91-
// Initialize the inspect service if enabled
92-
if c.Config.FeatureInspectEnabled {
93-
s.inspector, s.HTTPServer, s.HTTPServerFunc = inspect.NewInspector(
94-
c.Repository,
95-
manager,
96-
c.Config.InspectAddress,
97-
c.LogLevel,
98-
c.LogColor,
99-
)
100-
}
101-
102-
s.snapshotsDir = c.Config.SnapshotsDir
103-
104-
return s, nil
105-
}
106-
107-
// Service interface implementation
108-
func (s *Service) Alive() bool { return true }
109-
func (s *Service) Ready() bool { return true }
110-
func (s *Service) Reload() []error { return nil }
111-
func (s *Service) Tick() []error {
112-
if err := s.Step(s.Context); err != nil {
113-
return []error{err}
114-
}
115-
return []error{}
116-
}
117-
func (s *Service) Stop(b bool) []error {
118-
return nil
119-
}
120-
func (s *Service) Serve() error {
121-
if s.inspector != nil && s.HTTPServerFunc != nil {
122-
go s.HTTPServerFunc()
123-
}
124-
return s.Service.Serve()
125-
}
126-
func (s *Service) String() string {
127-
return s.Name
128-
}
129-
13041
// getUnprocessedInputs retrieves inputs that haven't been processed yet
13142
func getUnprocessedInputs(ctx context.Context, repo AdvancerRepository, appAddress string) ([]*Input, uint64, error) {
13243
f := repository.InputFilter{Status: Pointer(InputCompletionStatus_None)}
@@ -212,7 +123,7 @@ func (s *Service) processInputs(ctx context.Context, app *Application, inputs []
212123
"index", input.Index)
213124

214125
// Advance the machine with this input
215-
result, err := machine.Advance(ctx, input.RawData, input.Index)
126+
result, err := machine.Advance(ctx, input.RawData, input.EpochIndex, input.Index, app.IsDaveConsensus())
216127
if err != nil {
217128
// If there's an error, mark the application as inoperable
218129
s.Logger.Error("Error executing advance",
@@ -235,14 +146,16 @@ func (s *Service) processInputs(ctx context.Context, app *Application, inputs []
235146

236147
return err
237148
}
238-
149+
// log advance result hashes
239150
s.Logger.Info("Processing input finished",
240151
"application", app.Name,
241-
"epoch", input.EpochIndex,
242-
"index", input.Index,
152+
"epoch", result.EpochIndex,
153+
"index", result.InputIndex,
243154
"status", result.Status,
244155
"outputs", len(result.Outputs),
245156
"reports", len(result.Reports),
157+
"hashes", len(result.Hashes),
158+
"remaining_cycles", result.RemainingMetaCycles,
246159
)
247160

248161
// Store the result in the database
@@ -271,13 +184,37 @@ func (s *Service) processInputs(ctx context.Context, app *Application, inputs []
271184
return nil
272185
}
273186

274-
// handleEpochSnapshotAfterInputProcessed handles the snapshot creation after when an epoch is closed after an input was processed
275-
func (s *Service) handleEpochSnapshotAfterInputProcessed(ctx context.Context, app *Application) error {
276-
// Check if the application has a epoch snapshot policy
277-
if app.ExecutionParameters.SnapshotPolicy != SnapshotPolicy_EveryEpoch {
278-
return nil
187+
func (s *Service) isEpochLastInput(ctx context.Context, app *Application, input *Input) (bool, error) {
188+
if app == nil || input == nil {
189+
return false, fmt.Errorf("application and input must not be nil")
190+
}
191+
// Get the epoch for this input
192+
epoch, err := s.repository.GetEpoch(ctx, app.IApplicationAddress.String(), input.EpochIndex)
193+
if err != nil {
194+
return false, fmt.Errorf("failed to get epoch: %w", err)
195+
}
196+
197+
// Skip if the epoch is still open
198+
if epoch.Status == EpochStatus_Open {
199+
return false, nil
279200
}
280201

202+
// Check if this is the last input of the epoch
203+
lastInput, err := s.repository.GetLastInput(ctx, app.IApplicationAddress.String(), input.EpochIndex)
204+
if err != nil {
205+
return false, fmt.Errorf("failed to get last input: %w", err)
206+
}
207+
208+
// If this is the last input and the epoch is closed, return true
209+
if lastInput != nil && lastInput.Index == input.Index {
210+
return true, nil
211+
}
212+
213+
return false, nil
214+
}
215+
216+
// handleEpochSnapshotAfterInputProcessed handles the snapshot creation after when an epoch is closed after an input was processed
217+
func (s *Service) handleEpochSnapshotAfterInputProcessed(ctx context.Context, app *Application) error {
281218
// Get the machine instance for this application
282219
machine, exists := s.machineManager.GetMachine(app.ID)
283220
if !exists {
@@ -290,12 +227,13 @@ func (s *Service) handleEpochSnapshotAfterInputProcessed(ctx context.Context, ap
290227
return fmt.Errorf("failed to get last input: %w", err)
291228
}
292229

293-
if lastProcessedInput == nil {
294-
return nil
230+
// Check if the application has a epoch snapshot policy
231+
if lastProcessedInput != nil && app.ExecutionParameters.SnapshotPolicy == SnapshotPolicy_EveryEpoch {
232+
// Handle the snapshot
233+
return s.handleSnapshot(ctx, app, machine, lastProcessedInput)
295234
}
296235

297-
// Handle the snapshot
298-
return s.handleSnapshot(ctx, app, machine, lastProcessedInput)
236+
return nil
299237
}
300238

301239
// handleSnapshot creates a snapshot based on the application's snapshot policy
@@ -314,25 +252,12 @@ func (s *Service) handleSnapshot(ctx context.Context, app *Application, machine
314252

315253
// For EVERY_EPOCH policy, check if this is the last input of the epoch
316254
if policy == SnapshotPolicy_EveryEpoch {
317-
// Get the epoch for this input
318-
epoch, err := s.repository.GetEpoch(ctx, app.IApplicationAddress.String(), input.EpochIndex)
319-
if err != nil {
320-
return fmt.Errorf("failed to get epoch: %w", err)
321-
}
322-
323-
// Skip if the epoch is still open
324-
if epoch.Status == EpochStatus_Open {
325-
return nil
326-
}
327-
328-
// Check if this is the last input of the epoch
329-
lastInput, err := s.repository.GetLastInput(ctx, app.IApplicationAddress.String(), input.EpochIndex)
255+
// If this is the last input and the epoch is closed, create a snapshot
256+
isLastInput, err := s.isEpochLastInput(ctx, app, input)
330257
if err != nil {
331-
return fmt.Errorf("failed to get last input: %w", err)
258+
return err
332259
}
333-
334-
// If this is the last input and the epoch is closed, create a snapshot
335-
if lastInput != nil && lastInput.Index == input.Index {
260+
if isLastInput {
336261
return s.createSnapshot(ctx, app, machine, input)
337262
}
338263
}
@@ -364,7 +289,7 @@ func (s *Service) createSnapshot(ctx context.Context, app *Application, machine
364289

365290
// Ensure the parent directory exists
366291
if _, err := os.Stat(s.snapshotsDir); os.IsNotExist(err) {
367-
if err := os.MkdirAll(s.snapshotsDir, 0755); err != nil { // nolint: mnd
292+
if err := os.MkdirAll(s.snapshotsDir, 0755); err != nil { //nolint: mnd
368293
return fmt.Errorf("failed to create snapshots directory: %w", err)
369294
}
370295
}

internal/advancer/advancer_test.go

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,8 @@ func (mock *MockMachineImpl) Advance(
513513
ctx context.Context,
514514
input []byte,
515515
_ uint64,
516+
_ uint64,
517+
_ bool,
516518
) (*AdvanceResult, error) {
517519
// If AdvanceBlock is true, block until context is canceled
518520
if mock.AdvanceBlock {
@@ -605,8 +607,8 @@ type MockMachineInstance struct {
605607
}
606608

607609
// Advance implements the MachineInstance interface for testing
608-
func (m *MockMachineInstance) Advance(ctx context.Context, input []byte, index uint64) (*AdvanceResult, error) {
609-
return m.machineImpl.Advance(ctx, input, index)
610+
func (m *MockMachineInstance) Advance(ctx context.Context, input []byte, epochIndex uint64, index uint64, leafs bool) (*AdvanceResult, error) {
611+
return m.machineImpl.Advance(ctx, input, epochIndex, index, leafs)
610612
}
611613

612614
// Inspect implements the MachineInstance interface for testing
@@ -632,6 +634,12 @@ func (m *MockMachineInstance) CreateSnapshot(ctx context.Context, processInputs
632634
return nil
633635
}
634636

637+
// Retrieves the hash of the current machine state
638+
func (m *MockMachineInstance) Hash(ctx context.Context) ([32]byte, error) {
639+
// Not used in advancer tests, but needed to satisfy the interface
640+
return [32]byte{}, nil
641+
}
642+
635643
// Close implements the MachineInstance interface for testing
636644
func (m *MockMachineInstance) Close() error {
637645
// Not used in advancer tests, but needed to satisfy the interface
@@ -649,6 +657,7 @@ type MockRepository struct {
649657
UpdateApplicationStateError error
650658
UpdateEpochsError error
651659
UpdatedEpochs []uint64
660+
UpdateEpochCommitmentError error
652661
GetLastSnapshotReturn *Input
653662
GetLastSnapshotError error
654663

@@ -706,6 +715,15 @@ func (mock *MockRepository) StoreAdvanceResult(
706715
return mock.StoreAdvanceError
707716
}
708717

718+
func (mock *MockRepository) UpdateEpochCommitment(ctx context.Context, appID int64, epochIndex uint64, commitmen []byte) error {
719+
// Check for context cancellation
720+
if ctx.Err() != nil {
721+
return ctx.Err()
722+
}
723+
724+
return mock.UpdateEpochCommitmentError
725+
}
726+
709727
func (mock *MockRepository) UpdateEpochsInputsProcessed(ctx context.Context, nameOrAddress string) ([]uint64, error) {
710728
// Check for context cancellation
711729
if ctx.Err() != nil {
@@ -849,14 +867,13 @@ func randomInputs(appId int64, epochIndex uint64, size int) []*Input {
849867
}
850868

851869
func randomAdvanceResult(inputIndex uint64) *AdvanceResult {
852-
hash := randomHash()
853870
res := &AdvanceResult{
854871
InputIndex: inputIndex,
855872
Status: InputCompletionStatus_Accepted,
856873
Outputs: randomSliceOfBytes(),
857874
Reports: randomSliceOfBytes(),
858875
OutputsHash: randomHash(),
859-
MachineHash: &hash,
876+
MachineHash: randomHash(),
860877
}
861878
return res
862879
}

0 commit comments

Comments
 (0)