diff --git a/orchestrator/.gitignore b/orchestrator/.gitignore index 9525d76..a927865 100644 --- a/orchestrator/.gitignore +++ b/orchestrator/.gitignore @@ -3,9 +3,10 @@ bazel-* user.bazelrc # make stuff -result/ -generator -orchestrator +/result/ +/generator +/orchestrator +/orchestrator_service # Binaries for programs and plugins *.exe diff --git a/orchestrator/WEBHOOK_USAGE.md b/orchestrator/WEBHOOK_USAGE.md new file mode 100644 index 0000000..13fc289 --- /dev/null +++ b/orchestrator/WEBHOOK_USAGE.md @@ -0,0 +1,69 @@ +# Webhook Integration for Orchestrator Service + +The orchestrator service now supports webhook notifications to notify external systems when experiments complete or fail. + +## How to Use + +### 1. Include Webhook URL in Experiment Request + +When creating an experiment via the `/api/v0/experiment/run` endpoint, include an optional `webhook_url` parameter: + +```json +{ + "experiment_name": "my-test-experiment", + "webhook_url": "https://your-server.com/webhook/endpoint", + "base_tps": 10.0, + "rounds": 3, + // ... other experiment parameters +} +``` + +### 2. Webhook Payload Format + +When the experiment completes (either successfully or with an error), the orchestrator will send a POST request to your webhook URL with the following JSON payload: + +#### Success Notification +```json +{ + "experiment_name": "my-test-experiment", + "success": true, + "warnings": ["Optional warning messages"] +} +``` + +#### Error Notification +```json +{ + "experiment_name": "my-test-experiment", + "success": false, + "error": "Detailed error message", + "warnings": ["Optional warning messages"] +} +``` + +### 3. Webhook Request Headers + +The webhook request will include the following headers: +- `Content-Type: application/json` +- `User-Agent: mina-orchestrator/1.0` + +### 4. Webhook Endpoint Requirements + +Your webhook endpoint should: +- Accept POST requests +- Return HTTP status code 200-299 for successful processing +- Process the request within 30 seconds (request timeout) + +## Error Handling + +- If the webhook URL is unreachable or returns an error status code, the notification will fail but the experiment will continue normally +- Webhook notifications are sent asynchronously and do not affect experiment execution +- Failed webhook notifications are logged but do not cause the experiment to fail +- The webhook request has a 30-second timeout + +## Troubleshooting + +- Check orchestrator service logs for webhook-related errors +- Verify your webhook endpoint is accessible from the orchestrator service +- Ensure your webhook endpoint returns appropriate HTTP status codes +- Test with a simple webhook testing service first before implementing custom logic diff --git a/orchestrator/shell.nix b/orchestrator/shell.nix new file mode 100644 index 0000000..370e1a0 --- /dev/null +++ b/orchestrator/shell.nix @@ -0,0 +1,7 @@ +with import {}; +{ + devEnv = stdenv.mkDerivation { + name = "dev"; + buildInputs = [ stdenv go_1_23 glibc ]; + }; +} diff --git a/orchestrator/src/encode.go b/orchestrator/src/encode.go index 003ccdf..b710560 100644 --- a/orchestrator/src/encode.go +++ b/orchestrator/src/encode.go @@ -1,18 +1,170 @@ package itn_orchestrator import ( + "encoding/json" "fmt" + "io" "os" "strings" + + logging "github.com/ipfs/go-log/v2" ) +// RoundInfo holds information about a single round +type RoundInfo struct { + PaymentCount int `json:"payment_count"` + ZkappCount int `json:"zkapp_count"` + PaymentTps float64 `json:"payment_tps"` + ZkappTps float64 `json:"zkapp_tps"` + DurationMinutes int `json:"duration_minutes"` + MaxCost bool `json:"max_cost"` +} + +// ExperimentInfo holds information about the entire experiment +type ExperimentInfo []RoundInfo + func fund(p FundParams) GeneratedCommand { return GeneratedCommand{Action: FundAction{}.Name(), Params: p} } -func Encode(p *GenParams, writeCommand func(GeneratedCommand), writeComment func(string)) { +// EncodeToWriter encodes experiment parameters to a writer using JSON encoding +// Returns experiment info and error +// isService: if true, uses secure logging that doesn't expose passwords +func EncodeToWriter(p *GenParams, writer io.Writer, setup interface{}) (ExperimentInfo, error) { + encoder := json.NewEncoder(writer) + var errors []string + + writeComment := func(comment string) { + if err := encoder.Encode(comment); err != nil { + errors = append(errors, fmt.Sprintf("Error writing comment: %v", err)) + } + } + writeCommand := func(cmd GeneratedCommand) { + if comment := cmd.Comment(); comment != "" { + writeComment(comment) + } + if err := encoder.Encode(cmd); err != nil { + errors = append(errors, fmt.Sprintf("Error writing command: %v", err)) + } + } + + experimentInfo := EncodeWithContext(p, writeCommand, writeComment, setup) + + if len(errors) > 0 { + return ExperimentInfo{}, fmt.Errorf("encoding errors: %s", strings.Join(errors, "; ")) + } + return experimentInfo, nil +} + +// printComment prints a comment to both log and stderr +func printComment(comment string, log logging.StandardLogger) { + log.Info(comment) + fmt.Fprintln(os.Stderr, comment) +} - writeComment("Generated with: " + strings.Join(os.Args, " ")) +// RunExperiment executes an experiment from a JSON decoder with the given configuration +// Returns error if execution fails, nil on success +func RunExperiment(inDecoder *json.Decoder, config Config, log logging.StandardLogger) error { + outCache := EmptyOutputCache() + rconfig := ResolutionConfig{ + OutputCache: outCache, + } + step := 0 + var prevAction BatchAction + var actionAccum []ActionIO + var batchStartStep int // Track the starting step of current batch + var preBatchComments []string // Comments to print before current batch executes + var postBatchComments []string // Comments accumulated after batch started + + handlePrevAction := func() error { + // Print pre-batch comments before executing the action + for _, comment := range preBatchComments { + printComment(comment, log) + } + preBatchComments = nil + + // Calculate correct step range for the batch + batchEndStep := batchStartStep + len(actionAccum) - 1 + if len(actionAccum) == 1 { + log.Infof("Performing step %s (%d)", prevAction.Name(), batchStartStep) + } else { + log.Infof("Performing steps %s (%d-%d)", prevAction.Name(), batchStartStep, batchEndStep) + } + err := prevAction.RunMany(config, actionAccum) + if err != nil { + batchEndStep := batchStartStep + len(actionAccum) - 1 + if len(actionAccum) == 1 { + return &OrchestratorError{ + Message: fmt.Sprintf("Error running step %d: %v", batchStartStep, err), + Code: 9, + } + } else { + return &OrchestratorError{ + Message: fmt.Sprintf("Error running steps %d-%d: %v", batchStartStep, batchEndStep, err), + Code: 9, + } + } + } + prevAction = nil + actionAccum = nil + + // Move post-batch comments to pre-batch for next action + preBatchComments = postBatchComments + postBatchComments = nil + return nil + } + + err := RunActions(inDecoder, config, outCache, log, step, + handlePrevAction, &actionAccum, rconfig, &prevAction, &preBatchComments, &postBatchComments, &batchStartStep) + if err != nil { + if orchErr, ok := err.(*OrchestratorError); ok { + log.Errorf("Experiment finished with error: %v", orchErr) + return orchErr + } + return err + } + + if prevAction != nil { + if err := handlePrevAction(); err != nil { + log.Errorf("Error running action: %s due to: %v", prevAction.Name(), err) + // Check if context is canceled + if config.Ctx.Err() != nil { + return config.Ctx.Err() + } + return &OrchestratorError{ + Message: fmt.Sprintf("Error running previous action: %v", err), + Code: 9, + } + } + } + + // Print any remaining accumulated comments at the end + for _, comment := range preBatchComments { + printComment(comment, log) + } + for _, comment := range postBatchComments { + printComment(comment, log) + } + return nil +} + +func Encode(p *GenParams, writeCommand func(GeneratedCommand), writeComment func(string)) ExperimentInfo { + return EncodeWithContext(p, writeCommand, writeComment, false) +} + +func EncodeWithContext(p *GenParams, writeCommand func(GeneratedCommand), writeComment func(string), setup interface{}) ExperimentInfo { + // For orchestrator service, show experiment setup instead of command args (which contain passwords) + // For standalone generator, show command args as before + if setup != nil { + setupJSON, err := json.Marshal(setup) + if err != nil { + writeComment("Generated by orchestrator service (error serializing setup)") + } else { + writeComment("Generated by orchestrator service with setup: " + string(setupJSON)) + } + } else { + writeComment("Generated with: " + strings.Join(os.Args, " ")) + } if p.ZkappSoftLimit > -2 { writeCommand(Discovery(DiscoveryParams{})) writeComment(fmt.Sprintf("Setting zkapp soft limit to %d", p.ZkappSoftLimit)) @@ -20,6 +172,8 @@ func Encode(p *GenParams, writeCommand func(GeneratedCommand), writeComment func } cmds := []GeneratedCommand{} fundCmds := []FundParams{} + var rounds []RoundInfo + writeComment("Funding keys for the experiment") for r := 0; r < p.Rounds; r++ { round := p.Generate(r) @@ -30,6 +184,8 @@ func Encode(p *GenParams, writeCommand func(GeneratedCommand), writeComment func if round.ZkappFundCommand != nil { fundCmds = append(fundCmds, *round.ZkappFundCommand) } + // Collect round info from the generated round + rounds = append(rounds, round.RoundInfo) } privkeys := p.Privkeys if p.GenerateFundKeys > 0 { @@ -74,4 +230,5 @@ func Encode(p *GenParams, writeCommand func(GeneratedCommand), writeComment func for _, cmd := range cmds { writeCommand(cmd) } + return rounds } diff --git a/orchestrator/src/fund.go b/orchestrator/src/fund.go index abc1e1f..b2730ae 100644 --- a/orchestrator/src/fund.go +++ b/orchestrator/src/fund.go @@ -74,7 +74,16 @@ func fundImpl(config Config, ctx context.Context, daemonPort string, params Fund } func fundRunImpl(config Config, ctx context.Context, daemonPortIx int, params FundParams, output OutputF) error { + // The amount per key should be the total amount divided by number of accounts + // But we need to ensure each account gets enough to cover the fee amountPerKey := params.Amount / uint64(params.Num) + + // Check if the amount per key is sufficient to cover the fee + if amountPerKey < params.Fee { + return fmt.Errorf("insufficient funds per account: each account would get %d nanomina but fee is %d nanomina (need at least %d nanomina total for %d accounts)", + amountPerKey, params.Fee, params.Fee*uint64(params.Num), params.Num) + } + password := "" if params.PasswordEnv != "" { password, _ = os.LookupEnv(params.PasswordEnv) diff --git a/orchestrator/src/generate.go b/orchestrator/src/generate.go index bc1e135..bb2ac01 100644 --- a/orchestrator/src/generate.go +++ b/orchestrator/src/generate.go @@ -9,8 +9,6 @@ import ( "sort" "strconv" "strings" - - "gorm.io/datatypes" ) type GenParams struct { @@ -26,7 +24,7 @@ type GenParams struct { RotationKeys, RotationServers []string RotationPermutation bool RotationRatio float64 - MixMaxCostTpsRatio float64 + MaxCostMixedTpsRatio float64 LargePauseEveryNRounds, LargePauseMin int MinBalanceChange, MaxBalanceChange, DeploymentFee uint64 PaymentAmount, MinZkappFee, MaxZkappFee, FundFee uint64 @@ -34,20 +32,12 @@ type GenParams struct { ZkappSoftLimit int } -func (p *GenParams) ToJSON() (datatypes.JSON, error) { - data, err := json.Marshal(p) - if err != nil { - return nil, fmt.Errorf("Failed to marshal setup to JSON: %v", err) - } - return data, nil -} - func DefaultGenParams() GenParams { return GenParams{ MinTps: 0.01, BaseTps: 0.3, StressTps: 1, - SenderRatio: 0.5, + SenderRatio: 1, ZkappRatio: 0.5, NewAccountRatio: 0, StopCleanRatio: 0.1, @@ -56,7 +46,7 @@ func DefaultGenParams() GenParams { RoundDurationMin: 30, PauseMin: 15, Rounds: 4, - StopsPerRound: 2, + StopsPerRound: 0, Gap: 180, SendFromNonBpsOnly: false, StopOnlyBps: false, @@ -66,14 +56,14 @@ func DefaultGenParams() GenParams { PasswordEnv: "", FundKeyPrefix: "./fund_keys", Privkeys: []string{}, - PaymentReceiver: "B62qn7v4x5g3Z1h8k2j6f9c5z5v5v5v5v5v5v5v5v5v5v5v5v5", - PrivkeysPerFundCmd: 1, + PaymentReceiver: "B62qiy32p8kAKnny8ZFwoMhYpBppM1DWVCqAPBYNcXnsAHhnfAAuXgg", + PrivkeysPerFundCmd: 2, GenerateFundKeys: 20, RotationKeys: []string{}, RotationServers: []string{}, RotationPermutation: false, RotationRatio: 0.3, - MixMaxCostTpsRatio: 0.0, + MaxCostMixedTpsRatio: 0.0, LargePauseEveryNRounds: 8, LargePauseMin: 0, MinBalanceChange: 0, @@ -103,6 +93,7 @@ type GeneratedRound struct { Commands []GeneratedCommand PaymentFundCommand *FundParams ZkappFundCommand *FundParams + RoundInfo RoundInfo } func withComment(comment string, cmd GeneratedCommand) GeneratedCommand { @@ -312,16 +303,43 @@ func except(groupRef int, groupName string, exceptRef int, exceptName string) Ge Except: except, }} } + +func roundInfo(paymentParams PaymentSubParams, zkappParams ZkappSubParams, onlyPayments bool, onlyZkapps bool, roundDurationMin int) RoundInfo { + // Calculate round information + var paymentCount, zkappCount int + var paymentTps, zkappTps_ float64 + var maxCost_ bool + + if !onlyZkapps { + paymentCount = int(paymentParams.Tps * float64(paymentParams.DurationMin) * 60) + paymentTps = paymentParams.Tps + } + if !onlyPayments { + zkappCount = int(zkappParams.Tps * float64(zkappParams.DurationMin) * 60) + zkappTps_ = zkappParams.Tps + maxCost_ = zkappParams.MaxCost + } + + return RoundInfo{ + PaymentCount: paymentCount, + ZkappCount: zkappCount, + PaymentTps: paymentTps, + ZkappTps: zkappTps_, + DurationMinutes: roundDurationMin, + MaxCost: maxCost_, + } +} + func (p *GenParams) Generate(round int) GeneratedRound { zkappsKeysDir := fmt.Sprintf("%s/%s/round-%d/zkapps", p.FundKeyPrefix, p.ExperimentName, round) paymentsKeysDir := fmt.Sprintf("%s/%s/round-%d/payments", p.FundKeyPrefix, p.ExperimentName, round) tps := SampleTps(p.BaseTps, p.StressTps) maxCost := p.MaxCost zkappRatio := p.ZkappRatio - if p.MixMaxCostTpsRatio > 1e-3 && (round&1) == 1 { + if p.MaxCostMixedTpsRatio > 1e-3 && (round&1) == 1 { maxCost = true zkappRatio = 1 - tps *= p.MixMaxCostTpsRatio + tps *= p.MaxCostMixedTpsRatio } experimentName := fmt.Sprintf("%s-%d", p.ExperimentName, round) onlyZkapps := math.Abs(1-zkappRatio) < 1e-3 @@ -470,7 +488,8 @@ func (p *GenParams) Generate(round int) GeneratedRound { cmds = append(cmds, withComment(comment3, waitMin(p.LargePauseMin))) } } - res := GeneratedRound{Commands: cmds} + + res := GeneratedRound{Commands: cmds, RoundInfo: roundInfo(paymentParams, zkappParams, onlyPayments, onlyZkapps, p.RoundDurationMin)} if !onlyPayments { _, _, _, initBalance := ZkappBalanceRequirements(zkappTps, zkappParams) zkappKeysNum, zkappAmount := ZkappKeygenRequirements(initBalance, zkappParams) diff --git a/orchestrator/src/generate_test.go b/orchestrator/src/generate_test.go index ff82080..0c16046 100644 --- a/orchestrator/src/generate_test.go +++ b/orchestrator/src/generate_test.go @@ -26,7 +26,7 @@ func someParams() GenParams { PaymentReceiver: "B62qpPita1s7Dbnr7MVb3UK8fdssZixL1a4536aeMYxbTJEtRGGyS8U", PrivkeysPerFundCmd: 2, GenerateFundKeys: 20, - MixMaxCostTpsRatio: 0.7, + MaxCostMixedTpsRatio: 0.7, LargePauseEveryNRounds: 8, LargePauseMin: 240, MinBalanceChange: 1e3, diff --git a/orchestrator/src/generate_validation.go b/orchestrator/src/generate_validation.go index 3fbda19..9f31eb4 100644 --- a/orchestrator/src/generate_validation.go +++ b/orchestrator/src/generate_validation.go @@ -41,12 +41,12 @@ func ValidationSteps(p *GenParams) []ValidationStep { simpleRangeCheck(p.MinStopRatio, "min stop ratio"), simpleRangeCheck(p.MaxStopRatio, "max stop ratio"), simpleRangeCheck(p.StopCleanRatio, "stop-clean ratio"), - simpleRangeCheck(p.MixMaxCostTpsRatio, "max-cost-mixed ratio"), + simpleRangeCheck(p.MaxCostMixedTpsRatio, "max-cost-mixed ratio"), simpleRangeCheck(p.RotationRatio, "rotation ratio"), { ErrorMsg: "both max-cost-mixed and max-cost specified", Check: func(p *GenParams) bool { - return p.MaxCost && p.MixMaxCostTpsRatio > 1e-3 + return p.MaxCost && p.MaxCostMixedTpsRatio > 1e-3 }, ExitCode: 2, }, diff --git a/orchestrator/src/generator/main.go b/orchestrator/src/generator/main.go index 8a5107b..3ea96c9 100644 --- a/orchestrator/src/generator/main.go +++ b/orchestrator/src/generator/main.go @@ -1,7 +1,6 @@ package main import ( - "encoding/json" "flag" "fmt" "os" @@ -10,7 +9,7 @@ import ( lib "itn_orchestrator" ) -const mixMaxCostTpsRatioHelp = "when provided, specifies ratio of tps (proportional to total tps) for max cost transactions to be used every other round, zkapps ratio for these rounds is set to 100%" +const maxCostMixedTpsRatioHelp = "when provided, specifies ratio of tps (proportional to total tps) for max cost transactions to be used every other round, zkapps ratio for these rounds is set to 100%" func main() { var rotateKeys, rotateServers string @@ -50,7 +49,7 @@ func main() { flag.BoolVar(&p.RotationPermutation, "rotate-permutation", defaults.RotationPermutation, "Whether to generate only permutation mappings for rotation") flag.IntVar(&p.LargePauseMin, "large-pause", defaults.LargePauseMin, "duration of the large pause, minutes") flag.IntVar(&p.LargePauseEveryNRounds, "large-pause-every", defaults.LargePauseEveryNRounds, "number of rounds in between large pauses") - flag.Float64Var(&p.MixMaxCostTpsRatio, "max-cost-mixed", defaults.MixMaxCostTpsRatio, mixMaxCostTpsRatioHelp) + flag.Float64Var(&p.MaxCostMixedTpsRatio, "max-cost-mixed", defaults.MaxCostMixedTpsRatio, maxCostMixedTpsRatioHelp) flag.Uint64Var(&p.MaxBalanceChange, "max-balance-change", defaults.MaxBalanceChange, "Max balance change for zkapp account update") flag.Uint64Var(&p.MinBalanceChange, "min-balance-change", defaults.MinBalanceChange, "Min balance change for zkapp account update") flag.Uint64Var(&p.DeploymentFee, "deployment-fee", defaults.DeploymentFee, "Zkapp deployment fee") @@ -91,23 +90,8 @@ func main() { os.Exit(1) } - encoder := json.NewEncoder(os.Stdout) - writeComment := func(comment string) { - if err := encoder.Encode(comment); err != nil { - fmt.Fprintf(os.Stderr, "Error writing comment: %v\n", err) - os.Exit(3) - } - } - - writeCommand := func(cmd lib.GeneratedCommand) { - comment := cmd.Comment() - if comment != "" { - writeComment(comment) - } - if err := encoder.Encode(cmd); err != nil { - fmt.Fprintf(os.Stderr, "Error writing command: %v\n", err) - os.Exit(3) - } + if _, err := lib.EncodeToWriter(&p, os.Stdout, nil); err != nil { + fmt.Fprintf(os.Stderr, "Error encoding: %v\n", err) + os.Exit(3) } - lib.Encode(&p, writeCommand, writeComment) } diff --git a/orchestrator/src/itn_orchestrator/main.go b/orchestrator/src/itn_orchestrator/main.go index 13a0838..7964164 100644 --- a/orchestrator/src/itn_orchestrator/main.go +++ b/orchestrator/src/itn_orchestrator/main.go @@ -23,39 +23,9 @@ func run(configFilename string) error { log := logging.Logger("itn orchestrator") log.Infof("Launching logging: %v", logging.GetSubsystems()) config := lib.SetupConfig(context.Background(), orchestratorConfig, log) - outCache := lib.EmptyOutputCache() - rconfig := lib.ResolutionConfig{ - OutputCache: outCache, - } inDecoder := json.NewDecoder(os.Stdin) - step := 0 - var prevAction lib.BatchAction - var actionAccum []lib.ActionIO - handlePrevAction := func() error { - log.Infof("Performing steps %s (%d-%d)", prevAction.Name(), step, len(actionAccum)-step) - err := prevAction.RunMany(config, actionAccum) - if err != nil { - return &lib.OrchestratorError{ - Message: fmt.Sprintf("Error running steps %d-%d: %v", step, len(actionAccum)-step, err), - Code: 9, - } - } - prevAction = nil - actionAccum = nil - return nil - } - lib.RunActions(inDecoder, config, outCache, log, step, - handlePrevAction, &actionAccum, rconfig, &prevAction) - if prevAction != nil { - if err := handlePrevAction(); err != nil { - return &lib.OrchestratorError{ - Message: fmt.Sprintf("Error running previous action: %v", err), - Code: 9, - } - } - } - return nil + return lib.RunExperiment(inDecoder, config, log) } func main() { diff --git a/orchestrator/src/misc.go b/orchestrator/src/misc.go index c8ca4fd..042342c 100644 --- a/orchestrator/src/misc.go +++ b/orchestrator/src/misc.go @@ -204,8 +204,19 @@ func (SampleAction) Name() string { return "sample" } var _ Action = SampleAction{} func selectNodes(tps, minTps float64, nodes []NodeAddress) (float64, []NodeAddress) { + if len(nodes) == 0 { + return tps, nodes // Return empty slice if no nodes available + } + nodesF := math.Floor(tps / minTps) nodesMax := int(nodesF) + + // Ensure we always select at least one node if nodes are available + if nodesMax == 0 && len(nodes) > 0 { + nodesMax = 1 + nodesF = 1.0 + } + if nodesMax >= len(nodes) { return tps / float64(len(nodes)), nodes } diff --git a/orchestrator/src/orchestrator.go b/orchestrator/src/orchestrator.go index 31b8f79..1b627af 100644 --- a/orchestrator/src/orchestrator.go +++ b/orchestrator/src/orchestrator.go @@ -246,7 +246,7 @@ func EmptyOutputCache() outCacheT { } func RunActions(inDecoder *json.Decoder, config Config, outCache outCacheT, log logging.StandardLogger, step int, - handlePrevAction func() error, actionAccum *[]ActionIO, rconfig ResolutionConfig, prevAction *BatchAction) error { + handlePrevAction func() error, actionAccum *[]ActionIO, rconfig ResolutionConfig, prevAction *BatchAction, preBatchComments *[]string, postBatchComments *[]string, batchStartStep *int) error { for { select { @@ -267,8 +267,14 @@ func RunActions(inDecoder *json.Decoder, config Config, outCache outCacheT, log break } if commandOrComment.command == nil { - log.Info(commandOrComment.comment) - fmt.Fprintln(os.Stderr, commandOrComment.comment) + // Accumulate comments based on current state + if *prevAction == nil { + // No current batch - these comments go to pre-batch + *preBatchComments = append(*preBatchComments, commandOrComment.comment) + } else { + // Current batch exists - these comments go to post-batch + *postBatchComments = append(*postBatchComments, commandOrComment.comment) + } continue } cmd := *commandOrComment.command @@ -298,12 +304,22 @@ func RunActions(inDecoder *json.Decoder, config Config, outCache outCacheT, log Code: 1, } } + // If this is the start of a new batch, record the starting step + if len(*actionAccum) == 0 { + *batchStartStep = step + } *prevAction = batchAction *actionAccum = append(*actionAccum, ActionIO{ Params: params, Output: outputF(outCache, log, step), }) } else { + // Print accumulated comments before executing immediate action + for _, comment := range *preBatchComments { + printComment(comment, log) + } + *preBatchComments = nil + log.Infof("Performing step %s (%d)", cmd.Action, step) err = action.Run(config, params, outputF(outCache, log, step)) if err != nil { diff --git a/orchestrator/src/orchestrator_service/cancel_handler.go b/orchestrator/src/orchestrator_service/cancel_handler.go new file mode 100644 index 0000000..fca29b8 --- /dev/null +++ b/orchestrator/src/orchestrator_service/cancel_handler.go @@ -0,0 +1,36 @@ +package main + +import ( + "net/http" + + service "itn_orchestrator/service" +) + +// CancelHandler handles experiment cancellation requests +type CancelHandler struct { + Store *service.Store +} + +// Handle processes the cancel request with well-typed input/output +// This function cancels the currently running experiment by stopping its execution +// and updating its status to cancelled. +func (h *CancelHandler) Handle() error { + if err := h.Store.Cancel(); err != nil { + return err + } + return nil +} + +// ServeHTTP implements the http.Handler interface +func (h *CancelHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if err := h.Handle(); err != nil { + writeResponse(w, http.StatusConflict, APIResponse{ + Errors: []string{err.Error()}, + Result: "error", + }) + return + } + writeResponse(w, http.StatusOK, APIResponse{ + Result: "canceled", + }) +} diff --git a/orchestrator/src/orchestrator_service/common.go b/orchestrator/src/orchestrator_service/common.go new file mode 100644 index 0000000..c603f4a --- /dev/null +++ b/orchestrator/src/orchestrator_service/common.go @@ -0,0 +1,82 @@ +package main + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + + service_inputs "itn_orchestrator/service/inputs" +) + +// APIResponse represents the standard API response format +type APIResponse struct { + Errors []string `json:"errors,omitempty"` + Result string `json:"result,omitempty"` +} + +// writeResponse writes a unified response with the given status code and APIResponse +func writeResponse(w http.ResponseWriter, statusCode int, resp APIResponse) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(statusCode) + if err := json.NewEncoder(w).Encode(resp); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } +} + +// writeJSONResponse writes a JSON response with the given data (for non-APIResponse data) +func writeJSONResponse(w http.ResponseWriter, data interface{}) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + if err := json.NewEncoder(w).Encode(struct { + Result interface{} `json:"result,omitempty"` + }{Result: data}); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } +} + +// validateContentLength validates the request content length +func validateContentLength(r *http.Request, maxSize int64) error { + if r.ContentLength > maxSize { + return fmt.Errorf("request body too large: %d bytes (max %d)", r.ContentLength, maxSize) + } + return nil +} + +// parseExperimentSetup parses the experiment setup from request body +func parseExperimentSetup(r *http.Request) (*service_inputs.GeneratorInputData, error) { + // Limit request body size to prevent abuse + const maxRequestSize = 1024 * 1024 // 1MB + if err := validateContentLength(r, maxRequestSize); err != nil { + return nil, err + } + + var experimentSetup service_inputs.GeneratorInputData + limitedReader := io.LimitReader(r.Body, maxRequestSize) + if err := json.NewDecoder(limitedReader).Decode(&experimentSetup); err != nil { + return nil, fmt.Errorf("failed to decode request body: %v", err) + } + + return &experimentSetup, nil +} + +// Legacy API response functions for backward compatibility +func ValidationError(validationErrors []string, w http.ResponseWriter) { + writeResponse(w, http.StatusBadRequest, APIResponse{ + Errors: validationErrors, + Result: "invalid", + }) +} + +func Error(errors []string, w http.ResponseWriter) { + writeResponse(w, http.StatusBadRequest, APIResponse{ + Errors: errors, + Result: "error", + }) +} + +func Success(w http.ResponseWriter) { + writeResponse(w, http.StatusOK, APIResponse{ + Result: "success", + }) +} diff --git a/orchestrator/src/orchestrator_service/create_experiment_handler.go b/orchestrator/src/orchestrator_service/create_experiment_handler.go new file mode 100644 index 0000000..a614d66 --- /dev/null +++ b/orchestrator/src/orchestrator_service/create_experiment_handler.go @@ -0,0 +1,128 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + "time" + + logging "github.com/ipfs/go-log/v2" + + lib "itn_orchestrator" + service "itn_orchestrator/service" + service_inputs "itn_orchestrator/service/inputs" +) + +// CreateExperimentHandler handles experiment creation requests +type CreateExperimentHandler struct { + Store *service.Store + Config *lib.OrchestratorConfig + App *App +} + +// Handle processes the create experiment request with well-typed input/output +// This function creates a new experiment based on the provided setup parameters, +// validates the input, generates the experiment configuration, and starts the experiment execution. +// Returns (statusCode, errors) where statusCode indicates the type of response. +func (h *CreateExperimentHandler) Handle(setup *service_inputs.GeneratorInputData) (int, []string, *InfoExperimentResponse) { + var p lib.GenParams + setup.ApplyWithDefaults(&p) + + validationErrors := lib.ValidateAndCollectErrors(&p) + if len(validationErrors) > 0 { + return http.StatusBadRequest, validationErrors, nil + } + + var experimentScript string + var experimentInfo lib.ExperimentInfo + { + var result strings.Builder + var err error + if experimentInfo, err = lib.EncodeToWriter(&p, &result, setup); err != nil { + return http.StatusInternalServerError, []string{err.Error()}, nil + } + experimentScript = result.String() + } + + var webhookURL string + if setup.WebhookURL != nil { + webhookURL = *setup.WebhookURL + } + + job := &service.ExperimentState{ + Name: *setup.ExperimentName, + Status: "running", + CreatedAt: time.Now(), + Setup: p, + WebhookURL: webhookURL, + } + + ctx, cancel := context.WithCancel(context.Background()) + + orchestratorConfig := *h.Config + log := service.StoreLogging{Store: h.Store, Log: logging.Logger("orchestrator")} + config := lib.SetupConfig(ctx, orchestratorConfig, log) + + if err := h.Store.Add(job, cancel); err != nil { + return http.StatusConflict, []string{fmt.Sprintf("failed to add experiment: %v", err)}, nil + } + + if err := h.Store.WriteExperimentToDB(*job); err != nil { + return http.StatusInternalServerError, []string{fmt.Sprintf("failed to write experiment to database: %v", err)}, nil + } + + decoder := json.NewDecoder(strings.NewReader(experimentScript)) + go h.App.loadRun(decoder, config, log) + + return http.StatusOK, []string{}, &InfoExperimentResponse{ + Setup: p, + Rounds: experimentInfo, + Script: experimentScript, + } +} + +// ServeHTTP implements the http.Handler interface +func (h *CreateExperimentHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + experimentSetup, err := parseExperimentSetup(r) + if err != nil { + writeResponse(w, http.StatusBadRequest, APIResponse{ + Errors: []string{err.Error()}, + Result: "error", + }) + return + } + if !h.Store.NameIsUnique(*experimentSetup.ExperimentName) { + writeResponse(w, http.StatusBadRequest, APIResponse{ + Errors: []string{"experiment with the same name already exists"}, + Result: "error", + }) + return + } + + statusCode, errors, info := h.Handle(experimentSetup) + + // Determine result based on status code + var result string + switch statusCode { + case http.StatusOK: + { + writeJSONResponse(w, info) + return + } + case http.StatusBadRequest: + // Check if it's validation errors (from Handle method) + if len(errors) > 0 { + result = "invalid" + } else { + result = "error" + } + default: + result = "error" + } + writeResponse(w, statusCode, APIResponse{ + Errors: errors, + Result: result, + }) +} diff --git a/orchestrator/src/orchestrator_service/info_experiment_handler.go b/orchestrator/src/orchestrator_service/info_experiment_handler.go new file mode 100644 index 0000000..5d4c06b --- /dev/null +++ b/orchestrator/src/orchestrator_service/info_experiment_handler.go @@ -0,0 +1,95 @@ +package main + +import ( + "fmt" + "net/http" + "strings" + + lib "itn_orchestrator" + service "itn_orchestrator/service" + service_inputs "itn_orchestrator/service/inputs" +) + +// InfoExperimentHandler handles experiment info requests +type InfoExperimentHandler struct { + Store *service.Store +} + +// InfoExperimentResponse represents the response for experiment info endpoint +type InfoExperimentResponse struct { + Setup lib.GenParams `json:"setup"` + Rounds []lib.RoundInfo `json:"rounds"` + Script string `json:"script"` +} + +// Handle processes the experiment info request with well-typed input/output +// This function validates the experiment setup parameters and returns detailed information +// about the experiment configuration including setup JSON and round information. +func (h *InfoExperimentHandler) Handle(setup *service_inputs.GeneratorInputData) (*InfoExperimentResponse, error) { + var p lib.GenParams + setup.ApplyWithDefaults(&p) + + validationErrors := lib.ValidateAndCollectErrors(&p) + if len(validationErrors) > 0 { + return nil, fmt.Errorf("validation failed: %v", validationErrors) + } + + // Get experiment info directly from EncodeToWriter + var result strings.Builder + experimentInfo, err := lib.EncodeToWriter(&p, &result, setup) + if err != nil { + return nil, fmt.Errorf("encoding errors: %v", err) + } + + return &InfoExperimentResponse{ + Setup: p, + Rounds: experimentInfo, + Script: result.String(), + }, nil +} + +// ServeHTTP implements the http.Handler interface +func (h *InfoExperimentHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + experimentSetup, err := parseExperimentSetup(r) + if err != nil { + writeResponse(w, http.StatusBadRequest, APIResponse{ + Errors: []string{err.Error()}, + Result: "error", + }) + return + } + + if expName := *experimentSetup.ExperimentName; !h.Store.NameIsUnique(expName) { + writeResponse(w, http.StatusBadRequest, APIResponse{ + Errors: []string{ + fmt.Sprintf("experiment with name %s already exists", expName), + }, + Result: "error", + }) + return + } + + response, err := h.Handle(experimentSetup) + if err != nil { + // Check if it's a validation error + if strings.Contains(err.Error(), "validation failed") { + // Extract validation errors from the error message + errorMsg := err.Error() + if strings.Contains(errorMsg, "validation failed: ") { + validationErrorsStr := strings.TrimPrefix(errorMsg, "validation failed: ") + writeResponse(w, http.StatusBadRequest, APIResponse{ + Errors: []string{validationErrorsStr}, + Result: "invalid", + }) + return + } + } + writeResponse(w, http.StatusBadRequest, APIResponse{ + Errors: []string{err.Error()}, + Result: "error", + }) + return + } + + writeJSONResponse(w, response) +} diff --git a/orchestrator/src/orchestrator_service/main.go b/orchestrator/src/orchestrator_service/main.go index 1cc014c..dd082e2 100644 --- a/orchestrator/src/orchestrator_service/main.go +++ b/orchestrator/src/orchestrator_service/main.go @@ -8,10 +8,6 @@ import ( "log" "net/http" "os" - "regexp" - "strconv" - "strings" - "time" "github.com/gorilla/mux" "gorm.io/driver/postgres" @@ -21,26 +17,41 @@ import ( service "itn_orchestrator/service" - service_inputs "itn_orchestrator/service/inputs" - lib "itn_orchestrator" ) // App holds application-wide dependencies type App struct { - Router *mux.Router - Store *service.Store - Config *lib.OrchestratorConfig + Router *mux.Router + Store *service.Store + Config *lib.OrchestratorConfig + WebhookNotifier *WebhookNotifier } func (a *App) initializeRoutes() { log.Println("Registering routes...") - a.Router.HandleFunc("/api/v0/experiment/run", a.createExperimentHandler()).Methods(http.MethodPost) - a.Router.HandleFunc("/api/v0/experiment/test", a.infoExperimentHandler).Methods(http.MethodPost) - a.Router.HandleFunc("/api/v0/experiment/status", a.statusHandler).Methods(http.MethodGet) - a.Router.HandleFunc("/api/v0/experiment/cancel", a.cancelHandler()).Methods(http.MethodPost) + // Initialize handlers + createHandler := &CreateExperimentHandler{ + Store: a.Store, + Config: a.Config, + App: a, + } + infoHandler := &InfoExperimentHandler{ + Store: a.Store, + } + statusHandler := &StatusHandler{ + Store: a.Store, + } + cancelHandler := &CancelHandler{ + Store: a.Store, + } + // Register routes with new handlers + a.Router.Handle("/api/v0/experiment/run", createHandler).Methods(http.MethodPost) + a.Router.Handle("/api/v0/experiment/test", infoHandler).Methods(http.MethodPost) + a.Router.Handle("/api/v0/experiment/status", statusHandler).Methods(http.MethodGet) + a.Router.Handle("/api/v0/experiment/cancel", cancelHandler).Methods(http.MethodPost) } // Initialize opens the DB and sets up routes @@ -58,8 +69,9 @@ func (a *App) Initialize(connStr string, config lib.OrchestratorConfig) { log.Fatalf("Cannot connect to DB: %v", err) } a.Router = mux.NewRouter() - a.Store = &service.Store{DB: db} + a.Store = service.NewStore(db) a.Config = &config + a.WebhookNotifier = NewWebhookNotifier(logging.Logger("webhook")) a.initializeRoutes() } @@ -73,357 +85,37 @@ func (a *App) Run(address string) { } } -func (a *App) infoExperimentHandler(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - - experimentSetup, err := a.GetExperimentSetup(*r) - - if err != nil { - Error([]string{err.Error()}, w) - return - } - - var p lib.GenParams - experimentSetup.ApplyWithDefaults(&p) - - validationErrors := lib.ValidateAndCollectErrors(&p) - - if len(validationErrors) > 0 { - ValidationError(validationErrors, w) - return - } - - var errors []string - - type Round struct { - No int - PaymentsRate float64 - ZkappRate float64 - } - - var rounds []Round - var result strings.Builder - - encoder := json.NewEncoder(&result) - writeComment := func(comment string) { - if err := encoder.Encode(comment); err != nil { - errors = append(errors, fmt.Sprintf("Error writing comment: %v", err)) - } - } - writeCommand := func(cmd lib.GeneratedCommand) { - comment := cmd.Comment() - if comment != "" { - writeComment(comment) - } - if err := encoder.Encode(cmd); err != nil { - errors = append(errors, fmt.Sprintf("Error writing command: %v", err)) - } - } - - if len(errors) > 0 { - Error(errors, w) - return - } - - lib.Encode(&p, writeCommand, writeComment) - - setup_json, err := p.ToJSON() - if err != nil { - Error([]string{fmt.Sprintf("Error converting to JSON: %v", err)}, w) - return - } - - for _, line := range strings.Split(result.String(), "\n") { - - re := regexp.MustCompile(`Starting round (\d), .*`) - - if re.MatchString(line) { - m := re.FindStringSubmatch(line) - if len(m) == 2 { - rounds = append(rounds, Round{ - No: func() int { - no, err := strconv.Atoi(m[1]) - if err != nil { - errors = append(errors, fmt.Sprintf("Error parsing round number: %v", err)) - return 0 - } - return no - }(), - }) - } - } - - re = regexp.MustCompile(`\b\d+\s+(zkapp|payments)\b.*?\(([\d.]+)\s*txs\/min\)`) - - if re.MatchString(line) { - m := re.FindStringSubmatch(line) - if len(m) == 3 { - round := &rounds[len(rounds)-1] - rate, err := strconv.ParseFloat(m[2], 64) - if err != nil { - errors = append(errors, fmt.Sprintf("Error parsing rate: %v", err)) - return - } - switch m[1] { - case "zkapp": - round.ZkappRate = rate - case "payments": - round.PaymentsRate = rate - } - } - } - } - - if len(errors) > 0 { - Error(errors, w) - return - } - - w.WriteHeader(http.StatusOK) - if err := json.NewEncoder(w).Encode( - map[string]interface{}{ - "setup": setup_json, - "rounds": rounds, - }, - ); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } -} - func (a *App) loadRun(inDecoder *json.Decoder, config lib.Config, log logging.StandardLogger) { - - outCache := lib.EmptyOutputCache() - rconfig := lib.ResolutionConfig{ - OutputCache: outCache, - } - step := 0 - var prevAction lib.BatchAction - var actionAccum []lib.ActionIO - handlePrevAction := func() error { - var start int - if step-len(actionAccum) > 0 { - start = step - len(actionAccum) - } else { - start = 0 - } - var end int - if step-1 > 0 { - end = step - 1 - } else { - end = 0 - } - log.Infof("Performing steps %s (%d-%d)", prevAction.Name(), start, end) - err := prevAction.RunMany(config, actionAccum) - if err != nil { - return &lib.OrchestratorError{ - Message: fmt.Sprintf("Error running steps %d-%d: %v", start, end, err), + if err := lib.RunExperiment(inDecoder, config, log); err != nil { + var orchErr *lib.OrchestratorError + var ok bool + if orchErr, ok = err.(*lib.OrchestratorError); !ok { + errMsg := fmt.Sprintf("Experiment failed: %v", err) + orchErr = &lib.OrchestratorError{ + Message: errMsg, Code: 9, } } - prevAction = nil - actionAccum = nil - return nil - } - err := lib.RunActions(inDecoder, config, outCache, log, step, - handlePrevAction, &actionAccum, rconfig, &prevAction) - if err != nil { - if err, ok := err.(*lib.OrchestratorError); ok { - log.Errorf("Experiment finished with error: %v", err) - a.Store.FinishWithError(err) - return + if experiment := a.Store.FinishWithError(orchErr); experiment.WebhookURL != "" { + // Send error webhook notification + go a.WebhookNotifier.SendErrorNotification( + context.Background(), + experiment.WebhookURL, + experiment.Name, + orchErr.Message, + experiment.Warnings, + ) } - } - - if prevAction != nil { - if err := handlePrevAction(); err != nil { - log.Errorf("Error running action: %s due to: %v", prevAction.Name(), err) - // If context is canceled, we don't want to finish with error - // because it means the user canceled the experiment - if config.Ctx.Err() == nil { - a.Store.FinishWithError(&lib.OrchestratorError{ - Message: fmt.Sprintf("Error running previous action: %v", err), - Code: 9, - }) - } - return - - } - } - a.Store.FinishWithSuccess() - return -} - -// statusHandler returns the current job's status -func (a *App) statusHandler(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - job := a.Store.AtomicGet() - if job == nil { - http.Error(w, "No experiment running", http.StatusNotFound) return } - json.NewEncoder(w).Encode(job) -} - -// cancelHandler stops a running job -func (a *App) cancelHandler() http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - if err := a.Store.Cancel(); err != nil { - http.Error(w, err.Error(), http.StatusConflict) - return - } - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(map[string]string{"result": "canceled"}) - } -} - -type APIResponse struct { - Errors []string `json:"errors,omitempty"` - ValidationErrors []string `json:"validation_errors,omitempty"` - Result string `json:"result,omitempty"` -} - -func ValidationError(validationErrors []string, w http.ResponseWriter) { - w.WriteHeader(http.StatusBadRequest) - if err := json.NewEncoder(w).Encode( - APIResponse{ - Errors: []string{}, - ValidationErrors: validationErrors, - Result: "validation_failed", - }, - ); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } -} - -func Error(errors []string, w http.ResponseWriter) { - w.WriteHeader(http.StatusBadRequest) - if err := json.NewEncoder(w).Encode( - APIResponse{ - Errors: errors, - ValidationErrors: []string{}, - Result: "error", - }, - ); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } -} - -func Success(w http.ResponseWriter) { - w.WriteHeader(http.StatusOK) - if err := json.NewEncoder(w).Encode( - APIResponse{ - Errors: []string{}, - ValidationErrors: []string{}, - Result: "success", - }, - ); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } -} - -func (a *App) GetExperimentSetup(r http.Request) (*service_inputs.GeneratorInputData, error) { - - var input service_inputs.Input - if err := json.NewDecoder(r.Body).Decode(&input); err != nil { - return nil, fmt.Errorf("Failed to decode request body: %v", err) - } - - experimentSetup := input.ExperimentSetup - - if input.ExperimentSetup == nil { - return nil, fmt.Errorf("Experiment setup is required") - } - - if !a.Store.CheckExperimentIsUnique(*experimentSetup.ExperimentName) { - return nil, fmt.Errorf("Experiment with the same name already exists") - } - return experimentSetup, nil -} - -func (a *App) createExperimentHandler() func(w http.ResponseWriter, r *http.Request) { - - return func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - - var input service_inputs.Input - - experimentSetup, err := a.GetExperimentSetup(*r) - - if err != nil { - Error([]string{err.Error()}, w) - return - } - - var p lib.GenParams - experimentSetup.ApplyWithDefaults(&p) - - validationErrors := lib.ValidateAndCollectErrors(&p) - - if len(validationErrors) > 0 { - ValidationError(validationErrors, w) - return - } - - var errors []string - var result strings.Builder - - encoder := json.NewEncoder(&result) - writeComment := func(comment string) { - if err := encoder.Encode(comment); err != nil { - errors = append(errors, fmt.Sprintf("Error writing comment: %v", err)) - } - } - writeCommand := func(cmd lib.GeneratedCommand) { - comment := cmd.Comment() - if comment != "" { - writeComment(comment) - } - if err := encoder.Encode(cmd); err != nil { - errors = append(errors, fmt.Sprintf("Error writing command: %v", err)) - } - } - - if len(errors) > 0 { - Error(errors, w) - return - } - - lib.Encode(&p, writeCommand, writeComment) - - setup_json, err := p.ToJSON() - if err != nil { - Error([]string{fmt.Sprintf("Error converting to JSON: %v", err)}, w) - return - } - - job := &service.ExperimentState{Name: *experimentSetup.ExperimentName, Status: "running", CreatedAt: time.Now(), - SetupJSON: setup_json, - } - - ctx, cancel := context.WithCancel(context.Background()) - - orchestratorConfig := input.GetOrchestratorConfig(a.Config) - - log := service.StoreLogging{Store: a.Store, Log: logging.Logger("orchestrator")} - config := lib.SetupConfig(ctx, orchestratorConfig, log) - - if err := a.Store.Add(job, cancel); err != nil { - http.Error(w, err.Error(), http.StatusConflict) - return - } - - if err := a.Store.WriteExperimentToDB(*job); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - decoder := json.NewDecoder(strings.NewReader(result.String())) - - println("Starting experiment with setup: ", result.String()) - - go a.loadRun(decoder, config, log) - - Success(w) + if experiment := a.Store.FinishWithSuccess(); experiment.WebhookURL != "" { + // Send success webhook notification + go a.WebhookNotifier.SendSuccessNotification( + context.Background(), + experiment.WebhookURL, + experiment.Name, + experiment.Warnings, + ) } } diff --git a/orchestrator/src/orchestrator_service/orchestrator_service b/orchestrator/src/orchestrator_service/orchestrator_service new file mode 100755 index 0000000..7be09ae Binary files /dev/null and b/orchestrator/src/orchestrator_service/orchestrator_service differ diff --git a/orchestrator/src/orchestrator_service/status_handler.go b/orchestrator/src/orchestrator_service/status_handler.go new file mode 100644 index 0000000..7941dc9 --- /dev/null +++ b/orchestrator/src/orchestrator_service/status_handler.go @@ -0,0 +1,37 @@ +package main + +import ( + "fmt" + "net/http" + + service "itn_orchestrator/service" +) + +// StatusHandler handles experiment status requests +type StatusHandler struct { + Store *service.Store +} + +// Handle processes the status request with well-typed input/output +// This function returns the current status of the running experiment, +// including all experiment details, current step, logs, errors, and warnings. +func (h *StatusHandler) Handle() (*service.ExperimentState, error) { + job := h.Store.AtomicGet() + if job == nil { + return nil, fmt.Errorf("no experiment running") + } + return job, nil +} + +// ServeHTTP implements the http.Handler interface +func (h *StatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + job, err := h.Handle() + if err != nil { + writeResponse(w, http.StatusNotFound, APIResponse{ + Errors: []string{err.Error()}, + Result: "error", + }) + return + } + writeJSONResponse(w, job) +} diff --git a/orchestrator/src/orchestrator_service/webhook.go b/orchestrator/src/orchestrator_service/webhook.go new file mode 100644 index 0000000..13ee0a0 --- /dev/null +++ b/orchestrator/src/orchestrator_service/webhook.go @@ -0,0 +1,98 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "time" + + logging "github.com/ipfs/go-log/v2" +) + +// WebhookPayload represents the data sent to the webhook endpoint +type WebhookPayload struct { + ExperimentName string `json:"experiment_name"` + Success bool `json:"success"` + Error string `json:"error,omitempty"` + Warnings []string `json:"warnings,omitempty"` +} + +// WebhookNotifier handles webhook notifications +type WebhookNotifier struct { + client *http.Client + log logging.StandardLogger +} + +// NewWebhookNotifier creates a new webhook notifier +func NewWebhookNotifier(log logging.StandardLogger) *WebhookNotifier { + return &WebhookNotifier{ + client: &http.Client{ + Timeout: 30 * time.Second, + }, + log: log, + } +} + +// SendNotification sends a webhook notification to the specified URL +func (w *WebhookNotifier) SendNotification(ctx context.Context, webhookURL string, payload WebhookPayload) error { + if webhookURL == "" { + return nil // No webhook URL provided, skip notification + } + + w.log.Infof("Sending webhook notification to %s for experiment %s", webhookURL, payload.ExperimentName) + + jsonPayload, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("failed to marshal webhook payload: %v", err) + } + + req, err := http.NewRequestWithContext(ctx, "POST", webhookURL, bytes.NewBuffer(jsonPayload)) + if err != nil { + return fmt.Errorf("failed to create webhook request: %v", err) + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", "mina-orchestrator/1.0") + + resp, err := w.client.Do(req) + if err != nil { + return fmt.Errorf("failed to send webhook request: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("webhook returned non-success status code: %d", resp.StatusCode) + } + + w.log.Infof("Successfully sent webhook notification to %s for experiment %s", webhookURL, payload.ExperimentName) + return nil +} + +// SendSuccessNotification sends a success notification +func (w *WebhookNotifier) SendSuccessNotification(ctx context.Context, webhookURL, experimentName string, warnings []string) { + payload := WebhookPayload{ + ExperimentName: experimentName, + Success: true, + Warnings: warnings, + } + + if err := w.SendNotification(ctx, webhookURL, payload); err != nil { + w.log.Errorf("Failed to send success webhook notification: %v", err) + } +} + +// SendErrorNotification sends an error notification +func (w *WebhookNotifier) SendErrorNotification(ctx context.Context, webhookURL, experimentName, errorMessage string, warnings []string) { + payload := WebhookPayload{ + ExperimentName: experimentName, + Success: false, + Error: errorMessage, + Warnings: warnings, + } + + if err := w.SendNotification(ctx, webhookURL, payload); err != nil { + w.log.Errorf("Failed to send error webhook notification: %v", err) + } +} diff --git a/orchestrator/src/payments.go b/orchestrator/src/payments.go index 9267d50..333bdb7 100644 --- a/orchestrator/src/payments.go +++ b/orchestrator/src/payments.go @@ -36,6 +36,12 @@ func PaymentKeygenRequirements(gap int, params PaymentSubParams) (int, uint64) { totalTxs := uint64(math.Ceil(float64(params.DurationMin) * 60 * params.Tps)) balance := 3 * txCost * totalTxs keys := maxParticipants + int(tpsGap)*2 + + // Add funding fees for account creation (1 MINA per account by default) + // This ensures we have enough funds to cover both the account balances AND the creation fees + fundingFees := uint64(keys) * 1e9 // 1 MINA per account creation + balance += fundingFees + return keys, balance } @@ -63,6 +69,10 @@ func schedulePaymentsDo(config Config, params PaymentSubParams, nodeAddress Node func SchedulePayments(config Config, params PaymentParams, output func(ScheduledPaymentsReceipt)) error { tps, nodes := selectNodes(params.Tps, params.MinTps, params.Nodes) + if len(nodes) == 0 { + return fmt.Errorf("no nodes selected for payment execution (tps=%.6f, minTps=%.6f, available nodes=%d)", + params.Tps, params.MinTps, len(params.Nodes)) + } feePayersPerNode := len(params.FeePayers) / len(nodes) successfulNodes := make([]NodeAddress, 0, len(nodes)) remTps := params.Tps diff --git a/orchestrator/src/service/inputs/generator.go b/orchestrator/src/service/inputs/generator.go index 4cb3b53..e41fb1e 100644 --- a/orchestrator/src/service/inputs/generator.go +++ b/orchestrator/src/service/inputs/generator.go @@ -11,7 +11,7 @@ type GeneratorInputData struct { BaseTps *float64 `json:"base_tps,omitempty"` StressTps *float64 `json:"stress_tps,omitempty"` MinTps *float64 `json:"min_tps,omitempty"` - MixMaxCostTpsRatio *float64 `json:"mix_max_cost_tps_ratio,omitempty"` + MaxCostMixedTpsRatio *float64 `json:"max_cost_mixed_tps_ratio,omitempty"` MinStopRatio *float64 `json:"min_stop_ratio,omitempty"` MaxStopRatio *float64 `json:"max_stop_ratio,omitempty"` SenderRatio *float64 `json:"sender_ratio,omitempty"` @@ -28,7 +28,6 @@ type GeneratorInputData struct { StopsPerRound *int `json:"stops_per_round,omitempty"` Gap *int `json:"gap,omitempty"` ZkappSoftLimit *int `json:"zkapp_soft_limit,omitempty"` - Mode *string `json:"mode,omitempty"` FundKeyPrefix *string `json:"fund_key_prefix,omitempty"` PasswordEnv *string `json:"password_env,omitempty"` PaymentReceiver *itn_json_types.MinaPublicKey `json:"payment_receiver,omitempty"` @@ -44,6 +43,7 @@ type GeneratorInputData struct { MaxBalanceChange *uint64 `json:"max_balance_change,omitempty"` MinBalanceChange *uint64 `json:"min_balance_change,omitempty"` PaymentAmount *uint64 `json:"payment_amount,omitempty"` + WebhookURL *string `json:"webhook_url,omitempty"` Privkeys []string `json:"priv_keys,omitempty"` Fees struct { Deployment *uint64 `json:"deployment,omitempty"` @@ -55,8 +55,6 @@ type GeneratorInputData struct { } `json:"fees,omitempty"` } -const mixMaxCostTpsRatioHelp = "when provided, specifies ratio of tps (proportional to total tps) for max cost transactions to be used every other round, zkapps ratio for these rounds is set to 100%" - func (inputData *GeneratorInputData) ApplyWithDefaults(p *lib.GenParams) { var defaults = lib.DefaultGenParams() @@ -67,7 +65,7 @@ func (inputData *GeneratorInputData) ApplyWithDefaults(p *lib.GenParams) { lib.SetOrDefault(inputData.BaseTps, &p.BaseTps, defaults.BaseTps) lib.SetOrDefault(inputData.StressTps, &p.StressTps, defaults.StressTps) lib.SetOrDefault(inputData.MinTps, &p.MinTps, defaults.MinTps) - lib.SetOrDefault(inputData.MixMaxCostTpsRatio, &p.MixMaxCostTpsRatio, defaults.MixMaxCostTpsRatio) + lib.SetOrDefault(inputData.MaxCostMixedTpsRatio, &p.MaxCostMixedTpsRatio, defaults.MaxCostMixedTpsRatio) lib.SetOrDefault(inputData.MinStopRatio, &p.MinStopRatio, defaults.MinStopRatio) lib.SetOrDefault(inputData.MaxStopRatio, &p.MaxStopRatio, defaults.MaxStopRatio) lib.SetOrDefault(inputData.SenderRatio, &p.SenderRatio, defaults.SenderRatio) diff --git a/orchestrator/src/service/inputs/input.go b/orchestrator/src/service/inputs/input.go deleted file mode 100644 index 893010f..0000000 --- a/orchestrator/src/service/inputs/input.go +++ /dev/null @@ -1,28 +0,0 @@ -package inputs - -import lib "itn_orchestrator" - -type Input struct { - ExperimentSetup *GeneratorInputData `json:"experiment_setup"` - OrchestratorConfig *OrchestratorInputConfig `json:"orchestrator_config"` -} - -func (input *Input) GetOrchestratorConfig(defaults *lib.OrchestratorConfig) lib.OrchestratorConfig { - if input.OrchestratorConfig == nil { - return *defaults - } else { - config := lib.OrchestratorConfig{} - - lib.SetOrDefault(&config.LogFile, &input.OrchestratorConfig.LogFile, defaults.LogFile) - lib.SetOrDefault(&config.Key, &input.OrchestratorConfig.Key, defaults.Key) - lib.SetOrDefault(&config.OnlineURL, &input.OrchestratorConfig.OnlineURL, defaults.OnlineURL) - lib.SetOrDefault(&config.FundDaemonPorts, &input.OrchestratorConfig.FundDaemonPorts, defaults.FundDaemonPorts) - lib.SetOrDefault(&config.MinaExec, &input.OrchestratorConfig.MinaExec, defaults.MinaExec) - lib.SetOrDefault(&config.SlotDurationMs, &input.OrchestratorConfig.SlotDurationMs, defaults.SlotDurationMs) - lib.SetOrDefault(&config.GenesisTimestamp, &input.OrchestratorConfig.GenesisTimestamp, defaults.GenesisTimestamp) - lib.SetOrDefault(&config.UrlOverrides, &input.OrchestratorConfig.URLOverrides, defaults.UrlOverrides) - - return config - } - -} diff --git a/orchestrator/src/service/state.go b/orchestrator/src/service/state.go index a39aca4..e612be3 100644 --- a/orchestrator/src/service/state.go +++ b/orchestrator/src/service/state.go @@ -2,6 +2,7 @@ package service import ( "context" + "encoding/json" "fmt" lib "itn_orchestrator" "log" @@ -13,7 +14,6 @@ import ( logging "github.com/ipfs/go-log/v2" "github.com/lib/pq" - "gorm.io/datatypes" ) type ExperimentStatus string @@ -36,10 +36,11 @@ type ExperimentState struct { Comment *string `json:"comment,omitempty"` CurrentStepNo int `json:"step"` CurrentStepName string `json:"step_name"` - SetupJSON datatypes.JSON `json:"setup_json"` + Setup lib.GenParams `gorm:"column:setup_json;type:jsonb" json:"setup_json"` Warnings pq.StringArray `gorm:"type:text[]" json:"warnings,omitempty"` Errors pq.StringArray `gorm:"type:text[]" json:"errors,omitempty"` Logs pq.StringArray `gorm:"type:text[]" json:"logs,omitempty"` + WebhookURL string `json:"webhook_url,omitempty"` } func (ExperimentState) TableName() string { @@ -54,16 +55,24 @@ type Store struct { } func NewStore(db *gorm.DB) *Store { + // Auto-migrate the schema + log.Printf("Starting auto-migration for ExperimentState table...") + err := db.AutoMigrate(&ExperimentState{}) + if err != nil { + log.Printf("Error auto-migrating ExperimentState table: %v", err) + } else { + log.Printf("Auto-migration completed successfully") + } return &Store{ DB: db, } } -func (a *Store) CheckExperimentIsUnique(name string) bool { +func (a *Store) NameIsUnique(name string) bool { var count int64 err := a.DB.Where("name = ?", name).Model(&ExperimentState{}).Count(&count).Error if err != nil { - log.Printf("Error checking experiment uniqueness: %v", err) + log.Printf("Error checking experiment uniqueness for name '%s': %v", name, err) return false } return count == 0 @@ -79,17 +88,24 @@ func (a *Store) WriteExperimentToDB(state ExperimentState) error { } func (a *Store) updateExperimentInDB(state *ExperimentState) error { + // Convert Setup to JSON bytes to avoid GORM serialization issues + setupJSON, err := json.Marshal(state.Setup) + if err != nil { + log.Printf("Error marshaling setup JSON: %v", err) + return err + } - err := a.DB.Model(&ExperimentState{}).Where("name = ?", state.Name).Updates(map[string]interface{}{ + err = a.DB.Model(&ExperimentState{}).Where("name = ?", state.Name).Updates(map[string]interface{}{ "updated_at": state.UpdatedAt, "ended_at": state.EndedAt, "status": state.Status, - "setup_json": state.SetupJSON, + "setup_json": string(setupJSON), "current_step_no": state.CurrentStepNo, "current_step_name": state.CurrentStepName, "warnings": state.Warnings, "errors": state.Errors, "logs": state.Logs, + "webhook_url": state.WebhookURL, }).Error if err != nil { log.Printf("Error updating experiment in DB: %v", err) @@ -99,7 +115,7 @@ func (a *Store) updateExperimentInDB(state *ExperimentState) error { } -func (s *Store) AtomicSet(f func(experiment *ExperimentState)) { +func (s *Store) AtomicSet(f func(experiment *ExperimentState)) *ExperimentState { s.mu.Lock() defer s.mu.Unlock() if s.experiment != nil { @@ -115,6 +131,7 @@ func (s *Store) AtomicSet(f func(experiment *ExperimentState)) { log.Printf("Error updating experiment in DB: %v", err) } } + return s.experiment } func (s *Store) AtomicGet() *ExperimentState { @@ -127,8 +144,8 @@ func (s *Store) AtomicGet() *ExperimentState { } // FinishWithError sets the experiment status to "error" and appends the error message -func (s *Store) FinishWithError(err *lib.OrchestratorError) { - s.AtomicSet(func(experiment *ExperimentState) { +func (s *Store) FinishWithError(err *lib.OrchestratorError) *ExperimentState { + return s.AtomicSet(func(experiment *ExperimentState) { experiment.Status = "error" experiment.Errors = append(experiment.Errors, err.Message) experiment.EndedAt = &time.Time{} @@ -136,8 +153,8 @@ func (s *Store) FinishWithError(err *lib.OrchestratorError) { } // FinishWithSuccess sets the experiment status to "success" and marks it as completed -func (s *Store) FinishWithSuccess() { - s.AtomicSet(func(experiment *ExperimentState) { +func (s *Store) FinishWithSuccess() *ExperimentState { + return s.AtomicSet(func(experiment *ExperimentState) { experiment.Status = "success" experiment.EndedAt = &time.Time{} }) diff --git a/orchestrator/src/zkapp.go b/orchestrator/src/zkapp.go index c6d4525..c1399f0 100644 --- a/orchestrator/src/zkapp.go +++ b/orchestrator/src/zkapp.go @@ -70,6 +70,12 @@ func ZkappKeygenRequirements(initZkappBalance uint64, params ZkappSubParams) (in txCost := params.MaxBalanceChange*8 + params.MaxFee totalTxs := uint64(math.Ceil(float64(params.DurationMin) * 60 * params.Tps)) balance := uint64(keys)*zkappsToDeployPerKey*(initZkappBalance+params.DeploymentFee)*2 + 3*txCost*totalTxs + + // Add funding fees for account creation (1 MINA per account by default) + // This ensures we have enough funds to cover both the account balances AND the creation fees + fundingFees := uint64(keys) * 1e9 // 1 MINA per account creation + balance += fundingFees + return keys, balance } @@ -124,6 +130,10 @@ func SendZkappCommands(config Config, params ZkappCommandParams, output func(Sch return errors.New("no nodes specified") } tps, nodes := selectNodes(params.Tps, params.MinTps, params.Nodes) + if len(nodes) == 0 { + return fmt.Errorf("no nodes selected for zkapp execution (tps=%.6f, minTps=%.6f, available nodes=%d)", + params.Tps, params.MinTps, len(params.Nodes)) + } feePayersPerNode := len(params.FeePayers) / len(nodes) successfulNodes := make([]NodeAddress, 0, len(nodes)) remTps := params.Tps