diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 3e3ac497c9..1a83cd93c6 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -12,6 +12,8 @@ #### Orchestrator +* [#3857](https://github.com/livepeer/go-livepeer/pull/3857) byoc: fix orchestrator streaming reserve capacity (@ad-astra-video) + #### Transcoder ### Bug Fixes 🐞 diff --git a/byoc/job_gateway.go b/byoc/job_gateway.go index 095ce5e8ce..f217442008 100644 --- a/byoc/job_gateway.go +++ b/byoc/job_gateway.go @@ -260,7 +260,7 @@ func (bsg *BYOCGatewayServer) setupGatewayJob(ctx context.Context, jobReqHdr str var orchs []JobToken clog.Infof(ctx, "processing job request req=%v", jobReqHdr) - jobReq, err := bsg.verifyJobCreds(ctx, jobReqHdr, true) + jobReq, err := bsg.verifyJobCreds(jobReqHdr) if err != nil { return nil, errors.New(fmt.Sprintf("Unable to parse job request, err=%v", err)) } @@ -321,7 +321,7 @@ func getOrchSearchTimeouts(ctx context.Context, searchTimeoutHdr, respTimeoutHdr return timeout, respTimeout } -func (bsg *BYOCGatewayServer) verifyJobCreds(ctx context.Context, jobCreds string, reserveCapacity bool) (*JobRequest, error) { +func (bsg *BYOCGatewayServer) verifyJobCreds(jobCreds string) (*JobRequest, error) { //Gateway needs JobRequest parsed and verification of required fields jobData, err := parseJobRequest(jobCreds) if err != nil { @@ -392,13 +392,13 @@ func getJobOrchestrators(ctx context.Context, node *core.LivepeerNode, capabilit } var jobTokens []JobToken - timedOut := false nbResp := 0 numAvailableOrchs := node.OrchestratorPool.Size() tokenCh := make(chan JobToken, numAvailableOrchs) errCh := make(chan error, numAvailableOrchs) tokensCtx, cancel := context.WithTimeout(clog.Clone(context.Background(), ctx), timeout) + defer cancel() // Shuffle and get job tokens for _, i := range rand.Perm(len(orchs)) { //do not send to excluded Orchestrators @@ -412,22 +412,25 @@ func getJobOrchestrators(ctx context.Context, node *core.LivepeerNode, capabilit continue } - go getOrchJobToken(ctx, orchs[i].URL, *reqSender, 500*time.Millisecond, tokenCh, errCh) + go getOrchJobToken(ctx, orchs[i].URL, *reqSender, respTimeout, tokenCh, errCh) } - for nbResp < numAvailableOrchs && len(jobTokens) < numAvailableOrchs && !timedOut { + for nbResp < numAvailableOrchs && len(jobTokens) < numAvailableOrchs { select { case token := <-tokenCh: - jobTokens = append(jobTokens, token) + if token.AvailableCapacity > 0 { + jobTokens = append(jobTokens, token) + } nbResp++ case <-errCh: nbResp++ case <-tokensCtx.Done(): - break + //searchTimeout reached, return tokens received + return jobTokens, nil } } - cancel() + // received enough tokens or all responses arrived return jobTokens, nil } diff --git a/byoc/job_orchestrator.go b/byoc/job_orchestrator.go index f7cab9390b..159f9d83d3 100644 --- a/byoc/job_orchestrator.go +++ b/byoc/job_orchestrator.go @@ -147,61 +147,59 @@ func (bso *BYOCOrchestratorServer) GetJobToken() http.Handler { w.Header().Set("Content-Type", "application/json") jobToken := JobToken{SenderAddress: nil, TicketParams: nil, Balance: 0, Price: nil} - if !orch.CheckExternalCapabilityCapacity(jobCapsHdr) { - //send response indicating no capacity available - w.WriteHeader(http.StatusServiceUnavailable) - } else { - senderAddr := ethcommon.HexToAddress(jobSenderAddr.Addr) + capacity := orch.CheckExternalCapabilityCapacity(jobCapsHdr) - jobPrice, err := orch.JobPriceInfo(senderAddr, jobCapsHdr) - if err != nil { - statusCode := http.StatusBadRequest - if err.Error() == "insufficient sender reserve" { - statusCode = http.StatusServiceUnavailable - } - glog.Errorf("could not get price err=%v", err.Error()) - http.Error(w, fmt.Sprintf("Could not get price err=%v", err.Error()), statusCode) - return - } - ticketParams, err := orch.TicketParams(senderAddr, jobPrice) - if err != nil { - glog.Errorf("could not get ticket params err=%v", err.Error()) - http.Error(w, fmt.Sprintf("Could not get ticket params err=%v", err.Error()), http.StatusBadRequest) - return - } + senderAddr := ethcommon.HexToAddress(jobSenderAddr.Addr) - capBal := orch.Balance(senderAddr, core.ManifestID(jobCapsHdr)) - if capBal != nil { - capBal, err = common.PriceToInt64(capBal) - if err != nil { - clog.Errorf(context.TODO(), "could not convert balance to int64 sender=%v capability=%v err=%v", senderAddr.Hex(), jobCapsHdr, err.Error()) - capBal = big.NewRat(0, 1) - } - } else { - capBal = big.NewRat(0, 1) - } - //convert to int64. Note: returns with 000 more digits to allow for precision of 3 decimal places. - capBalInt, err := common.PriceToFixed(capBal) - if err != nil { - glog.Errorf("could not convert balance to int64 sender=%v capability=%v err=%v", senderAddr.Hex(), jobCapsHdr, err.Error()) - capBalInt = 0 - } else { - // Remove the last three digits from capBalInt - capBalInt = capBalInt / 1000 + jobPrice, err := orch.JobPriceInfo(senderAddr, jobCapsHdr) + if err != nil { + statusCode := http.StatusBadRequest + if err.Error() == "insufficient sender reserve" { + statusCode = http.StatusServiceUnavailable } + glog.Errorf("could not get price err=%v", err.Error()) + http.Error(w, fmt.Sprintf("Could not get price err=%v", err.Error()), statusCode) + return + } + ticketParams, err := orch.TicketParams(senderAddr, jobPrice) + if err != nil { + glog.Errorf("could not get ticket params err=%v", err.Error()) + http.Error(w, fmt.Sprintf("Could not get ticket params err=%v", err.Error()), http.StatusBadRequest) + return + } - jobToken = JobToken{ - SenderAddress: jobSenderAddr, - TicketParams: ticketParams, - Balance: capBalInt, - Price: jobPrice, - ServiceAddr: orch.ServiceURI().String(), + capBal := orch.Balance(senderAddr, core.ManifestID(jobCapsHdr)) + if capBal != nil { + capBal, err = common.PriceToInt64(capBal) + if err != nil { + clog.Errorf(context.TODO(), "could not convert balance to int64 sender=%v capability=%v err=%v", senderAddr.Hex(), jobCapsHdr, err.Error()) + capBal = big.NewRat(0, 1) } + } else { + capBal = big.NewRat(0, 1) + } + //convert to int64. Note: returns with 000 more digits to allow for precision of 3 decimal places. + capBalInt, err := common.PriceToFixed(capBal) + if err != nil { + glog.Errorf("could not convert balance to int64 sender=%v capability=%v err=%v", senderAddr.Hex(), jobCapsHdr, err.Error()) + capBalInt = 0 + } else { + // Remove the last three digits from capBalInt + capBalInt = capBalInt / 1000 + } - //send response indicating compatible - w.WriteHeader(http.StatusOK) + jobToken = JobToken{ + SenderAddress: jobSenderAddr, + TicketParams: ticketParams, + Balance: capBalInt, + Price: jobPrice, + ServiceAddr: orch.ServiceURI().String(), + AvailableCapacity: capacity, } + //send response indicating compatible + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(jobToken) }) } diff --git a/byoc/job_orchestrator_test.go b/byoc/job_orchestrator_test.go index c6cccf4e44..6e9c799a60 100644 --- a/byoc/job_orchestrator_test.go +++ b/byoc/job_orchestrator_test.go @@ -45,7 +45,7 @@ type mockJobOrchestrator struct { registerExternalCapability func(string) (*core.ExternalCapability, error) unregisterExternalCapability func(string) error verifySignature func(ethcommon.Address, string, []byte) bool - checkExternalCapabilityCapacity func(string) bool + checkExternalCapabilityCapacity func(string) int64 reserveCapacity func(string) error getUrlForCapability func(string) string balance func(ethcommon.Address, core.ManifestID) *big.Rat @@ -142,9 +142,9 @@ func (r *mockJobOrchestrator) RemoveExternalCapability(extCapability string) err return nil } -func (r *mockJobOrchestrator) CheckExternalCapabilityCapacity(extCap string) bool { +func (r *mockJobOrchestrator) CheckExternalCapabilityCapacity(extCap string) int64 { if r.checkExternalCapabilityCapacity == nil { - return true + return 1 } else { return r.checkExternalCapabilityCapacity(extCap) } @@ -486,18 +486,33 @@ func TestGetJobToken_NoCapacity(t *testing.T) { mockVerifySig := func(addr ethcommon.Address, msg string, sig []byte) bool { return true } - mockCheckExternalCapabilityCapacity := func(extCap string) bool { - return false + mockCheckExternalCapabilityCapacity := func(extCap string) int64 { + return 0 } - mockReserveCapacity := func(cap string) error { - return errors.New("no capacity") + mockJobPriceInfo := func(addr ethcommon.Address, cap string) (*net.PriceInfo, error) { + return &net.PriceInfo{ + PricePerUnit: 10, + PixelsPerUnit: 1, + }, nil + } + + mockTicketParams := func(addr ethcommon.Address, price *net.PriceInfo) (*net.TicketParams, error) { + return &net.TicketParams{ + Recipient: ethcommon.HexToAddress("0x1111111111111111111111111111111111111111").Bytes(), + FaceValue: big.NewInt(1000).Bytes(), + WinProb: big.NewInt(1).Bytes(), + RecipientRandHash: []byte("hash"), + Seed: big.NewInt(1234).Bytes(), + ExpirationBlock: big.NewInt(100000).Bytes(), + }, nil } mockJobOrch := newMockJobOrchestrator() mockJobOrch.verifySignature = mockVerifySig mockJobOrch.checkExternalCapabilityCapacity = mockCheckExternalCapabilityCapacity - mockJobOrch.reserveCapacity = mockReserveCapacity + mockJobOrch.jobPriceInfo = mockJobPriceInfo + mockJobOrch.ticketParams = mockTicketParams bso := &BYOCOrchestratorServer{ node: mockJobLivepeerNode(), @@ -523,7 +538,11 @@ func TestGetJobToken_NoCapacity(t *testing.T) { handler.ServeHTTP(w, req) resp := w.Result() - assert.Equal(t, http.StatusServiceUnavailable, resp.StatusCode) + assert.Equal(t, http.StatusOK, resp.StatusCode) + body, _ := io.ReadAll(resp.Body) + var jobToken JobToken + json.Unmarshal(body, &jobToken) + assert.Equal(t, int64(0), jobToken.AvailableCapacity) } func TestGetJobToken_JobPriceInfoError(t *testing.T) { @@ -854,5 +873,6 @@ func createMockJobToken(hostUrl string) *JobToken { PricePerUnit: 100, PixelsPerUnit: 1, }, + AvailableCapacity: 1, } } diff --git a/byoc/stream_orchestrator.go b/byoc/stream_orchestrator.go index a10ad6add0..14a8b97f73 100644 --- a/byoc/stream_orchestrator.go +++ b/byoc/stream_orchestrator.go @@ -19,14 +19,13 @@ import ( var getNewTokenTimeout = 3 * time.Second -// StartStream handles the POST /stream/start endpoint for the Orchestrator func (bso *BYOCOrchestratorServer) StartStream() http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { orch := bso.orch remoteAddr := getRemoteAddr(r) ctx := clog.AddVal(r.Context(), clog.ClientIP, remoteAddr) - orchJob, err := bso.setupOrchJob(ctx, r, false) + orchJob, err := bso.setupOrchJob(ctx, r, true) if err != nil { code := http.StatusBadRequest if err == errInsufficientBalance { diff --git a/byoc/stream_test.go b/byoc/stream_test.go index 330b06e5db..6e2ff0dd83 100644 --- a/byoc/stream_test.go +++ b/byoc/stream_test.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "io" + "math/big" "math/rand/v2" "net/http" "net/http/httptest" @@ -2042,3 +2043,77 @@ func TestStartStreamError_OtherScenarios(t *testing.T) { }) }) } + +func TestStartStreamHandler_ReserveCapacity(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var reserveCapacityCalled bool + node := mockJobLivepeerNode() + + // Create mock orchestrator with ReserveExternalCapabilityCapacity expectation + mockOrch := newMockJobOrchestrator() + mockOrch.verifySignature = func(addr ethcommon.Address, msg string, sig []byte) bool { + return true + } + mockOrch.reserveCapacity = func(extCap string) error { + reserveCapacityCalled = true + return nil + } + mockOrch.freeCapacity = func(extCap string) error { + return nil + } + mockOrch.getUrlForCapability = func(capability string) string { + return "http://127.0.0.1:1234" + } + mockOrch.jobPriceInfo = func(sender ethcommon.Address, jobCapability string) (*net.PriceInfo, error) { + return &net.PriceInfo{PricePerUnit: 0, PixelsPerUnit: 1}, nil + } + mockOrch.balance = func(addr ethcommon.Address, manifestID core.ManifestID) *big.Rat { + return new(big.Rat).SetInt64(0) + } + + // Set up an lphttp-based orchestrator test server with trickle endpoints + mux := http.NewServeMux() + bso := &BYOCOrchestratorServer{orch: mockOrch, httpMux: mux, node: node} + + // Configure trickle server on the mux + bso.trickleSrv = trickle.ConfigureServer(trickle.TrickleServerConfig{ + Mux: mux, + BasePath: "/ai/trickle/", + Autocreate: true, + }) + + // Setup Orch server stub + mux.HandleFunc("/process/token", orchTokenHandler) + mux.HandleFunc("/ai/stream/start", orchAIStreamStartNoUrlsHandler) + + server := httptest.NewServer(mux) + defer server.Close() + + node.OrchestratorPool = newStubOrchestratorPool(node, []string{server.URL}) + drivers.NodeStorage = drivers.NewMemoryDriver(nil) + + // Prepare a valid StartRequest body + startReq := StartRequest{ + Stream: "teststream", + RtmpOutput: "rtmp://output", + StreamId: "streamid", + Params: "{}", + } + body, _ := json.Marshal(startReq) + req := httptest.NewRequest(http.MethodPost, "/ai/stream/start", bytes.NewBuffer(body)) + req.Header.Set("Content-Type", "application/json") + + req.Header.Set("Livepeer", base64TestJobRequest(10, true, true, true)) + + w := httptest.NewRecorder() + + handler := bso.StartStream() + handler.ServeHTTP(w, req) + + // Verify ReserveExternalCapabilityCapacity was called + assert.True(t, reserveCapacityCalled, "ReserveExternalCapabilityCapacity should have been called") + + // no stream created + assert.Zero(t, len(mockOrch.node.ExternalCapabilities.Streams)) + }) +} diff --git a/byoc/types.go b/byoc/types.go index dbb653b4c4..fa9b4a5e8b 100644 --- a/byoc/types.go +++ b/byoc/types.go @@ -55,7 +55,7 @@ type Orchestrator interface { JobPriceInfo(sender ethcommon.Address, capability string) (*net.PriceInfo, error) TicketParams(sender ethcommon.Address, priceInfo *net.PriceInfo) (*net.TicketParams, error) Balance(sender ethcommon.Address, manifestID core.ManifestID) *big.Rat - CheckExternalCapabilityCapacity(capability string) bool + CheckExternalCapabilityCapacity(capability string) int64 RemoveExternalCapability(extCapName string) error RegisterExternalCapability(extCapSettings string) (*core.ExternalCapability, error) FreeExternalCapabilityCapacity(capability string) error @@ -132,11 +132,12 @@ type JobOrchestratorsFilter struct { } type JobToken struct { - SenderAddress *JobSender `json:"sender_address,omitempty"` - TicketParams *net.TicketParams `json:"ticket_params,omitempty"` - Balance int64 `json:"balance,omitempty"` - Price *net.PriceInfo `json:"price,omitempty"` - ServiceAddr string `json:"service_addr,omitempty"` + SenderAddress *JobSender `json:"sender_address,omitempty"` + TicketParams *net.TicketParams `json:"ticket_params,omitempty"` + Balance int64 `json:"balance,omitempty"` + Price *net.PriceInfo `json:"price,omitempty"` + ServiceAddr string `json:"service_addr,omitempty"` + AvailableCapacity int64 `json:"available_capacity,omitempty"` LastNonce uint32 } diff --git a/core/ai_orchestrator.go b/core/ai_orchestrator.go index 1fd48988d0..0155fe85da 100644 --- a/core/ai_orchestrator.go +++ b/core/ai_orchestrator.go @@ -1148,14 +1148,14 @@ func (orch *orchestrator) GetUrlForCapability(extCapability string) string { return "" } -func (orch *orchestrator) CheckExternalCapabilityCapacity(extCapability string) bool { +func (orch *orchestrator) CheckExternalCapabilityCapacity(extCapability string) int64 { if cap, ok := orch.node.ExternalCapabilities.Capabilities[extCapability]; !ok { - return false + return 0 } else { if cap.Load < cap.Capacity { - return true + return int64(cap.Capacity - cap.Load) } else { - return false + return 0 } } } diff --git a/server/rpc.go b/server/rpc.go index 98c2848b19..411b97b2fd 100644 --- a/server/rpc.go +++ b/server/rpc.go @@ -87,7 +87,7 @@ type Orchestrator interface { RegisterExternalCapability(extCapability string) (*core.ExternalCapability, error) RemoveExternalCapability(extCapability string) error GetUrlForCapability(extCapability string) string - CheckExternalCapabilityCapacity(extCapability string) bool + CheckExternalCapabilityCapacity(extCapability string) int64 ReserveExternalCapabilityCapacity(extCapability string) error FreeExternalCapabilityCapacity(extCapability string) error JobPriceInfo(sender ethcommon.Address, jobCapabiliy string) (*net.PriceInfo, error) diff --git a/server/rpc_test.go b/server/rpc_test.go index 1c2fd52699..4268e35665 100644 --- a/server/rpc_test.go +++ b/server/rpc_test.go @@ -266,8 +266,8 @@ func (r *stubOrchestrator) RegisterExternalCapability(extCapabilitySettings stri func (r *stubOrchestrator) RemoveExternalCapability(extCapability string) error { return nil } -func (r *stubOrchestrator) CheckExternalCapabilityCapacity(extCap string) bool { - return true +func (r *stubOrchestrator) CheckExternalCapabilityCapacity(extCap string) int64 { + return 1 } func (r *stubOrchestrator) ReserveExternalCapabilityCapacity(extCap string) error { return nil @@ -1623,8 +1623,8 @@ func (o *mockOrchestrator) RegisterExternalCapability(extCapabilitySettings stri func (o *mockOrchestrator) RemoveExternalCapability(extCapability string) error { return nil } -func (o *mockOrchestrator) CheckExternalCapabilityCapacity(extCap string) bool { - return true +func (o *mockOrchestrator) CheckExternalCapabilityCapacity(extCap string) int64 { + return 1 } func (o *mockOrchestrator) ReserveExternalCapabilityCapacity(extCap string) error { return nil