Skip to content
1 change: 1 addition & 0 deletions byoc/job_orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func (bso *BYOCOrchestratorServer) GetJobToken() http.Handler {
})
}


func (bso *BYOCOrchestratorServer) ProcessJob() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
Expand Down
1 change: 1 addition & 0 deletions core/ai_orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1186,6 +1186,7 @@ func (orch *orchestrator) FreeExternalCapabilityCapacity(extCapability string) e
}
}


func (orch *orchestrator) JobPriceInfo(sender ethcommon.Address, jobCapability string) (*net.PriceInfo, error) {
if orch.node == nil || orch.node.Recipient == nil {
//return a price of zero for offhain mode
Expand Down
83 changes: 80 additions & 3 deletions server/remote_signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const HTTPStatusRefreshSession = 480
const HTTPStatusPriceExceeded = 481
const HTTPStatusNoTickets = 482
const RemoteType_LiveVideoToVideo = "lv2v"
const RemoteType_BYOC = "byoc"

// SignOrchestratorInfo handles signing GetOrchestratorInfo requests for multiple orchestrators
func (ls *LivepeerServer) SignOrchestratorInfo(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -67,11 +68,56 @@ func (ls *LivepeerServer) SignOrchestratorInfo(w http.ResponseWriter, r *http.Re
_ = json.NewEncoder(w).Encode(results)
}

// SignBYOCJobRequest signs a BYOC job request (request + parameters) for authentication
type SignBYOCJobRequestInput struct {
Request string `json:"request"`
Parameters string `json:"parameters"`
}

type SignBYOCJobRequestResponse struct {
Sender string `json:"sender"`
Signature string `json:"signature"`
}

func (ls *LivepeerServer) SignBYOCJobRequest(w http.ResponseWriter, r *http.Request) {
ctx := clog.AddVal(r.Context(), "request_id", string(core.RandomManifestID()))
remoteAddr := getRemoteAddr(r)
clog.Info(ctx, "BYOC job signing request", "ip", remoteAddr)

gw := core.NewBroadcaster(ls.LivepeerNode)

var req SignBYOCJobRequestInput
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
clog.Errorf(ctx, "Failed to decode SignBYOCJobRequest err=%q", err)
respondJsonError(ctx, w, err, http.StatusBadRequest)
return
}

// Sign the request + parameters (same as Go gateway does for BYOC)
dataToSign := req.Request + req.Parameters
sig, err := gw.Sign([]byte(dataToSign))
if err != nil {
clog.Errorf(ctx, "Failed to sign BYOC job request err=%q", err)
respondJsonError(ctx, w, err, http.StatusInternalServerError)
return
}

response := SignBYOCJobRequestResponse{
Sender: gw.Address().Hex(),
Signature: "0x" + hex.EncodeToString(sig),
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_ = json.NewEncoder(w).Encode(response)
}

// StartRemoteSignerServer starts the HTTP server for remote signer mode
func StartRemoteSignerServer(ls *LivepeerServer, bind string) error {
// Register the remote signer endpoint
ls.HTTPMux.Handle("POST /sign-orchestrator-info", http.HandlerFunc(ls.SignOrchestratorInfo))
ls.HTTPMux.Handle("POST /generate-live-payment", http.HandlerFunc(ls.GenerateLivePayment))
ls.HTTPMux.Handle("POST /sign-byoc-job", http.HandlerFunc(ls.SignBYOCJobRequest))

// Start the HTTP server
glog.Info("Starting Remote Signer server on ", bind)
Expand Down Expand Up @@ -152,16 +198,19 @@ type RemotePaymentRequest struct {
Orchestrator []byte `json:"orchestrator"`

// Set if an ID is needed to tie into orch accounting for a session. Optional
ManifestID string
ManifestID string `json:"manifestId,omitempty"`

// Number of pixels to generate a ticket for. Required if `type` is not set.
InPixels int64 `json:"inPixels"`

// Job type to automatically calculate payments. Valid values: `lv2v`. Optional.
// Job type to automatically calculate payments. Valid values: `lv2v`, `byoc`. Optional.
Type string `json:"type"`

// Capabilities to include in the ticket. Optional; may be set for the lv2v job type.
Capabilities []byte `json:"capabilities"`

// Capability name for BYOC job type. Required when Type is "byoc".
Capability string `json:"capability,omitempty"`
}

// Returned by the remote signer and includes a new payment plus updated state.
Expand Down Expand Up @@ -286,7 +335,12 @@ func (ls *LivepeerServer) GenerateLivePayment(w http.ResponseWriter, r *http.Req
respondJsonError(ctx, w, err, http.StatusBadRequest)
return
}
manifestID = string(core.RandomManifestID())
if req.Type == RemoteType_BYOC && req.Capability != "" {
// For BYOC, use capability name as manifest ID for shared balance tracking
manifestID = req.Capability
} else {
manifestID = string(core.RandomManifestID())
}
}
ctx = clog.AddVal(ctx, "manifest_id", manifestID)

Expand Down Expand Up @@ -370,6 +424,29 @@ func (ls *LivepeerServer) GenerateLivePayment(w http.ResponseWriter, r *http.Req
pixelsPerSec := float64(info.Height) * float64(info.Width) * float64(info.FPS)
secSinceLastProcessed := now.Sub(lastUpdate).Seconds()
pixels = int64(pixelsPerSec * secSinceLastProcessed)
} else if req.Type == RemoteType_BYOC {
// BYOC uses time-based pricing: price per unit of time (typically seconds)
// The pixelsPerUnit in the price info represents the time scaling factor
if req.Capability == "" {
err = errors.New("missing capability for BYOC job type")
respondJsonError(ctx, w, err, http.StatusBadRequest)
return
}
now := time.Now()
lastUpdate := state.LastUpdate
if lastUpdate.IsZero() {
// Preload with 120 seconds (2 minutes) of data by default.
// The orchestrator requires minimum 60 seconds balance, so we use 2 minutes
// to have a buffer (matching the Go gateway's approach).
lastUpdate = now.Add(-120 * time.Second)
}
secSinceLastProcessed := now.Sub(lastUpdate).Seconds()
// For BYOC, "pixels" represents time units; pixelsPerUnit is typically 1 (per second)
// We calculate units as: seconds × pixelsPerUnit (which is typically 1)
pixels = int64(secSinceLastProcessed * float64(priceInfo.PixelsPerUnit))
if pixels < priceInfo.PixelsPerUnit {
pixels = priceInfo.PixelsPerUnit // Minimum 1 unit
}
} else if req.Type != "" {
err = errors.New("invalid job type")
respondJsonError(ctx, w, err, http.StatusBadRequest)
Expand Down
Loading