diff --git a/byoc/job_orchestrator.go b/byoc/job_orchestrator.go index 159f9d83d3..cdd6e6eff9 100644 --- a/byoc/job_orchestrator.go +++ b/byoc/job_orchestrator.go @@ -204,6 +204,7 @@ func (bso *BYOCOrchestratorServer) GetJobToken() http.Handler { }) } + func (bso *BYOCOrchestratorServer) ProcessJob() http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() diff --git a/core/ai_orchestrator.go b/core/ai_orchestrator.go index 0155fe85da..a03cdb2fe5 100644 --- a/core/ai_orchestrator.go +++ b/core/ai_orchestrator.go @@ -1186,6 +1186,7 @@ func (orch *orchestrator) FreeExternalCapabilityCapacity(extCapability string) e } } + func (orch *orchestrator) JobPriceInfo(sender ethcommon.Address, jobCapability string) (*net.PriceInfo, error) { if orch.node == nil || orch.node.Recipient == nil { //return a price of zero for offhain mode diff --git a/server/remote_signer.go b/server/remote_signer.go index a45928a97e..ea2854e247 100644 --- a/server/remote_signer.go +++ b/server/remote_signer.go @@ -28,6 +28,7 @@ const HTTPStatusRefreshSession = 480 const HTTPStatusPriceExceeded = 481 const HTTPStatusNoTickets = 482 const RemoteType_LiveVideoToVideo = "lv2v" +const RemoteType_BYOC = "byoc" // SignOrchestratorInfo handles signing GetOrchestratorInfo requests for multiple orchestrators func (ls *LivepeerServer) SignOrchestratorInfo(w http.ResponseWriter, r *http.Request) { @@ -67,11 +68,56 @@ func (ls *LivepeerServer) SignOrchestratorInfo(w http.ResponseWriter, r *http.Re _ = json.NewEncoder(w).Encode(results) } +// SignBYOCJobRequest signs a BYOC job request (request + parameters) for authentication +type SignBYOCJobRequestInput struct { + Request string `json:"request"` + Parameters string `json:"parameters"` +} + +type SignBYOCJobRequestResponse struct { + Sender string `json:"sender"` + Signature string `json:"signature"` +} + +func (ls *LivepeerServer) SignBYOCJobRequest(w http.ResponseWriter, r *http.Request) { + ctx := clog.AddVal(r.Context(), "request_id", string(core.RandomManifestID())) + remoteAddr := getRemoteAddr(r) + clog.Info(ctx, "BYOC job signing request", "ip", remoteAddr) + + gw := core.NewBroadcaster(ls.LivepeerNode) + + var req SignBYOCJobRequestInput + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + clog.Errorf(ctx, "Failed to decode SignBYOCJobRequest err=%q", err) + respondJsonError(ctx, w, err, http.StatusBadRequest) + return + } + + // Sign the request + parameters (same as Go gateway does for BYOC) + dataToSign := req.Request + req.Parameters + sig, err := gw.Sign([]byte(dataToSign)) + if err != nil { + clog.Errorf(ctx, "Failed to sign BYOC job request err=%q", err) + respondJsonError(ctx, w, err, http.StatusInternalServerError) + return + } + + response := SignBYOCJobRequestResponse{ + Sender: gw.Address().Hex(), + Signature: "0x" + hex.EncodeToString(sig), + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(response) +} + // StartRemoteSignerServer starts the HTTP server for remote signer mode func StartRemoteSignerServer(ls *LivepeerServer, bind string) error { // Register the remote signer endpoint ls.HTTPMux.Handle("POST /sign-orchestrator-info", http.HandlerFunc(ls.SignOrchestratorInfo)) ls.HTTPMux.Handle("POST /generate-live-payment", http.HandlerFunc(ls.GenerateLivePayment)) + ls.HTTPMux.Handle("POST /sign-byoc-job", http.HandlerFunc(ls.SignBYOCJobRequest)) // Start the HTTP server glog.Info("Starting Remote Signer server on ", bind) @@ -152,16 +198,19 @@ type RemotePaymentRequest struct { Orchestrator []byte `json:"orchestrator"` // Set if an ID is needed to tie into orch accounting for a session. Optional - ManifestID string + ManifestID string `json:"manifestId,omitempty"` // Number of pixels to generate a ticket for. Required if `type` is not set. InPixels int64 `json:"inPixels"` - // Job type to automatically calculate payments. Valid values: `lv2v`. Optional. + // Job type to automatically calculate payments. Valid values: `lv2v`, `byoc`. Optional. Type string `json:"type"` // Capabilities to include in the ticket. Optional; may be set for the lv2v job type. Capabilities []byte `json:"capabilities"` + + // Capability name for BYOC job type. Required when Type is "byoc". + Capability string `json:"capability,omitempty"` } // Returned by the remote signer and includes a new payment plus updated state. @@ -286,7 +335,12 @@ func (ls *LivepeerServer) GenerateLivePayment(w http.ResponseWriter, r *http.Req respondJsonError(ctx, w, err, http.StatusBadRequest) return } - manifestID = string(core.RandomManifestID()) + if req.Type == RemoteType_BYOC && req.Capability != "" { + // For BYOC, use capability name as manifest ID for shared balance tracking + manifestID = req.Capability + } else { + manifestID = string(core.RandomManifestID()) + } } ctx = clog.AddVal(ctx, "manifest_id", manifestID) @@ -370,6 +424,29 @@ func (ls *LivepeerServer) GenerateLivePayment(w http.ResponseWriter, r *http.Req pixelsPerSec := float64(info.Height) * float64(info.Width) * float64(info.FPS) secSinceLastProcessed := now.Sub(lastUpdate).Seconds() pixels = int64(pixelsPerSec * secSinceLastProcessed) + } else if req.Type == RemoteType_BYOC { + // BYOC uses time-based pricing: price per unit of time (typically seconds) + // The pixelsPerUnit in the price info represents the time scaling factor + if req.Capability == "" { + err = errors.New("missing capability for BYOC job type") + respondJsonError(ctx, w, err, http.StatusBadRequest) + return + } + now := time.Now() + lastUpdate := state.LastUpdate + if lastUpdate.IsZero() { + // Preload with 120 seconds (2 minutes) of data by default. + // The orchestrator requires minimum 60 seconds balance, so we use 2 minutes + // to have a buffer (matching the Go gateway's approach). + lastUpdate = now.Add(-120 * time.Second) + } + secSinceLastProcessed := now.Sub(lastUpdate).Seconds() + // For BYOC, "pixels" represents time units; pixelsPerUnit is typically 1 (per second) + // We calculate units as: seconds × pixelsPerUnit (which is typically 1) + pixels = int64(secSinceLastProcessed * float64(priceInfo.PixelsPerUnit)) + if pixels < priceInfo.PixelsPerUnit { + pixels = priceInfo.PixelsPerUnit // Minimum 1 unit + } } else if req.Type != "" { err = errors.New("invalid job type") respondJsonError(ctx, w, err, http.StatusBadRequest)