Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions orchestrator/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ bazel-*
user.bazelrc

# make stuff
result/
generator
orchestrator
/result/
/generator
/orchestrator
/orchestrator_service

# Binaries for programs and plugins
*.exe
Expand Down
69 changes: 69 additions & 0 deletions orchestrator/WEBHOOK_USAGE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Webhook Integration for Orchestrator Service

The orchestrator service now supports webhook notifications to notify external systems when experiments complete or fail.

## How to Use

### 1. Include Webhook URL in Experiment Request

When creating an experiment via the `/api/v0/experiment/run` endpoint, include an optional `webhook_url` parameter:

```json
{
"experiment_name": "my-test-experiment",
"webhook_url": "https://your-server.com/webhook/endpoint",
"base_tps": 10.0,
"rounds": 3,
// ... other experiment parameters
}
```

### 2. Webhook Payload Format

When the experiment completes (either successfully or with an error), the orchestrator will send a POST request to your webhook URL with the following JSON payload:

#### Success Notification
```json
{
"experiment_name": "my-test-experiment",
"success": true,
"warnings": ["Optional warning messages"]
}
```

#### Error Notification
```json
{
"experiment_name": "my-test-experiment",
"success": false,
"error": "Detailed error message",
"warnings": ["Optional warning messages"]
}
```

### 3. Webhook Request Headers

The webhook request will include the following headers:
- `Content-Type: application/json`
- `User-Agent: mina-orchestrator/1.0`

### 4. Webhook Endpoint Requirements

Your webhook endpoint should:
- Accept POST requests
- Return HTTP status code 200-299 for successful processing
- Process the request within 30 seconds (request timeout)

## Error Handling

- If the webhook URL is unreachable or returns an error status code, the notification will fail but the experiment will continue normally
- Webhook notifications are sent asynchronously and do not affect experiment execution
- Failed webhook notifications are logged but do not cause the experiment to fail
- The webhook request has a 30-second timeout

## Troubleshooting

- Check orchestrator service logs for webhook-related errors
- Verify your webhook endpoint is accessible from the orchestrator service
- Ensure your webhook endpoint returns appropriate HTTP status codes
- Test with a simple webhook testing service first before implementing custom logic
7 changes: 7 additions & 0 deletions orchestrator/shell.nix
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
with import <nixpkgs> {};
{
devEnv = stdenv.mkDerivation {
name = "dev";
buildInputs = [ stdenv go_1_23 glibc ];
};
}
161 changes: 159 additions & 2 deletions orchestrator/src/encode.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,179 @@
package itn_orchestrator

import (
"encoding/json"
"fmt"
"io"
"os"
"strings"

logging "github.com/ipfs/go-log/v2"
)

// RoundInfo holds information about a single round
type RoundInfo struct {
PaymentCount int `json:"payment_count"`
ZkappCount int `json:"zkapp_count"`
PaymentTps float64 `json:"payment_tps"`
ZkappTps float64 `json:"zkapp_tps"`
DurationMinutes int `json:"duration_minutes"`
MaxCost bool `json:"max_cost"`
}

// ExperimentInfo holds information about the entire experiment
type ExperimentInfo []RoundInfo

func fund(p FundParams) GeneratedCommand {
return GeneratedCommand{Action: FundAction{}.Name(), Params: p}
}

func Encode(p *GenParams, writeCommand func(GeneratedCommand), writeComment func(string)) {
// EncodeToWriter encodes experiment parameters to a writer using JSON encoding
// Returns experiment info and error
// isService: if true, uses secure logging that doesn't expose passwords
func EncodeToWriter(p *GenParams, writer io.Writer, setup interface{}) (ExperimentInfo, error) {
encoder := json.NewEncoder(writer)
var errors []string

writeComment := func(comment string) {
if err := encoder.Encode(comment); err != nil {
errors = append(errors, fmt.Sprintf("Error writing comment: %v", err))
}
}
writeCommand := func(cmd GeneratedCommand) {
if comment := cmd.Comment(); comment != "" {
writeComment(comment)
}
if err := encoder.Encode(cmd); err != nil {
errors = append(errors, fmt.Sprintf("Error writing command: %v", err))
}
}

experimentInfo := EncodeWithContext(p, writeCommand, writeComment, setup)

if len(errors) > 0 {
return ExperimentInfo{}, fmt.Errorf("encoding errors: %s", strings.Join(errors, "; "))
}
return experimentInfo, nil
}

// printComment prints a comment to both log and stderr
func printComment(comment string, log logging.StandardLogger) {
log.Info(comment)
fmt.Fprintln(os.Stderr, comment)
}

writeComment("Generated with: " + strings.Join(os.Args, " "))
// RunExperiment executes an experiment from a JSON decoder with the given configuration
// Returns error if execution fails, nil on success
func RunExperiment(inDecoder *json.Decoder, config Config, log logging.StandardLogger) error {
outCache := EmptyOutputCache()
rconfig := ResolutionConfig{
OutputCache: outCache,
}
step := 0
var prevAction BatchAction
var actionAccum []ActionIO
var batchStartStep int // Track the starting step of current batch
var preBatchComments []string // Comments to print before current batch executes
var postBatchComments []string // Comments accumulated after batch started

handlePrevAction := func() error {
// Print pre-batch comments before executing the action
for _, comment := range preBatchComments {
printComment(comment, log)
}
preBatchComments = nil

// Calculate correct step range for the batch
batchEndStep := batchStartStep + len(actionAccum) - 1
if len(actionAccum) == 1 {
log.Infof("Performing step %s (%d)", prevAction.Name(), batchStartStep)
} else {
log.Infof("Performing steps %s (%d-%d)", prevAction.Name(), batchStartStep, batchEndStep)
}
err := prevAction.RunMany(config, actionAccum)
if err != nil {
batchEndStep := batchStartStep + len(actionAccum) - 1
if len(actionAccum) == 1 {
return &OrchestratorError{
Message: fmt.Sprintf("Error running step %d: %v", batchStartStep, err),
Code: 9,
}
} else {
return &OrchestratorError{
Message: fmt.Sprintf("Error running steps %d-%d: %v", batchStartStep, batchEndStep, err),
Code: 9,
}
}
}
prevAction = nil
actionAccum = nil

// Move post-batch comments to pre-batch for next action
preBatchComments = postBatchComments
postBatchComments = nil
return nil
}

err := RunActions(inDecoder, config, outCache, log, step,
handlePrevAction, &actionAccum, rconfig, &prevAction, &preBatchComments, &postBatchComments, &batchStartStep)
if err != nil {
if orchErr, ok := err.(*OrchestratorError); ok {
log.Errorf("Experiment finished with error: %v", orchErr)
return orchErr
}
return err
}

if prevAction != nil {
if err := handlePrevAction(); err != nil {
log.Errorf("Error running action: %s due to: %v", prevAction.Name(), err)
// Check if context is canceled
if config.Ctx.Err() != nil {
return config.Ctx.Err()
}
return &OrchestratorError{
Message: fmt.Sprintf("Error running previous action: %v", err),
Code: 9,
}
}
}

// Print any remaining accumulated comments at the end
for _, comment := range preBatchComments {
printComment(comment, log)
}
for _, comment := range postBatchComments {
printComment(comment, log)
}
return nil
}

func Encode(p *GenParams, writeCommand func(GeneratedCommand), writeComment func(string)) ExperimentInfo {
return EncodeWithContext(p, writeCommand, writeComment, false)
}

func EncodeWithContext(p *GenParams, writeCommand func(GeneratedCommand), writeComment func(string), setup interface{}) ExperimentInfo {
// For orchestrator service, show experiment setup instead of command args (which contain passwords)
// For standalone generator, show command args as before
if setup != nil {
setupJSON, err := json.Marshal(setup)
if err != nil {
writeComment("Generated by orchestrator service (error serializing setup)")
} else {
writeComment("Generated by orchestrator service with setup: " + string(setupJSON))
}
} else {
writeComment("Generated with: " + strings.Join(os.Args, " "))
}
if p.ZkappSoftLimit > -2 {
writeCommand(Discovery(DiscoveryParams{}))
writeComment(fmt.Sprintf("Setting zkapp soft limit to %d", p.ZkappSoftLimit))
writeCommand(ZkappSoftLimit(-1, "participant", p.ZkappSoftLimit))
}
cmds := []GeneratedCommand{}
fundCmds := []FundParams{}
var rounds []RoundInfo

writeComment("Funding keys for the experiment")
for r := 0; r < p.Rounds; r++ {
round := p.Generate(r)
Expand All @@ -30,6 +184,8 @@ func Encode(p *GenParams, writeCommand func(GeneratedCommand), writeComment func
if round.ZkappFundCommand != nil {
fundCmds = append(fundCmds, *round.ZkappFundCommand)
}
// Collect round info from the generated round
rounds = append(rounds, round.RoundInfo)
}
privkeys := p.Privkeys
if p.GenerateFundKeys > 0 {
Expand Down Expand Up @@ -74,4 +230,5 @@ func Encode(p *GenParams, writeCommand func(GeneratedCommand), writeComment func
for _, cmd := range cmds {
writeCommand(cmd)
}
return rounds
}
9 changes: 9 additions & 0 deletions orchestrator/src/fund.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,16 @@ func fundImpl(config Config, ctx context.Context, daemonPort string, params Fund
}

func fundRunImpl(config Config, ctx context.Context, daemonPortIx int, params FundParams, output OutputF) error {
// The amount per key should be the total amount divided by number of accounts
// But we need to ensure each account gets enough to cover the fee
amountPerKey := params.Amount / uint64(params.Num)

// Check if the amount per key is sufficient to cover the fee
if amountPerKey < params.Fee {
return fmt.Errorf("insufficient funds per account: each account would get %d nanomina but fee is %d nanomina (need at least %d nanomina total for %d accounts)",
amountPerKey, params.Fee, params.Fee*uint64(params.Num), params.Num)
}

password := ""
if params.PasswordEnv != "" {
password, _ = os.LookupEnv(params.PasswordEnv)
Expand Down
Loading
Loading