2 changes: 2 additions & 0 deletions CHANGELOG_PENDING.md
@@ -12,6 +12,8 @@

#### Orchestrator

* [#3857](https://github.com/livepeer/go-livepeer/pull/3857) byoc: fix orchestrator streaming reserve capacity (@ad-astra-video)

#### Transcoder

### Bug Fixes 🐞
19 changes: 11 additions & 8 deletions byoc/job_gateway.go
@@ -260,7 +260,7 @@ func (bsg *BYOCGatewayServer) setupGatewayJob(ctx context.Context, jobReqHdr str
var orchs []JobToken

clog.Infof(ctx, "processing job request req=%v", jobReqHdr)
jobReq, err := bsg.verifyJobCreds(ctx, jobReqHdr, true)
jobReq, err := bsg.verifyJobCreds(jobReqHdr)
if err != nil {
return nil, errors.New(fmt.Sprintf("Unable to parse job request, err=%v", err))
}
@@ -321,7 +321,7 @@ func getOrchSearchTimeouts(ctx context.Context, searchTimeoutHdr, respTimeoutHdr
return timeout, respTimeout
}

func (bsg *BYOCGatewayServer) verifyJobCreds(ctx context.Context, jobCreds string, reserveCapacity bool) (*JobRequest, error) {
func (bsg *BYOCGatewayServer) verifyJobCreds(jobCreds string) (*JobRequest, error) {
//Gateway needs JobRequest parsed and verification of required fields
jobData, err := parseJobRequest(jobCreds)
if err != nil {
@@ -392,13 +392,13 @@ func getJobOrchestrators(ctx context.Context, node *core.LivepeerNode, capabilit
}

var jobTokens []JobToken
timedOut := false
nbResp := 0
numAvailableOrchs := node.OrchestratorPool.Size()
tokenCh := make(chan JobToken, numAvailableOrchs)
errCh := make(chan error, numAvailableOrchs)

tokensCtx, cancel := context.WithTimeout(clog.Clone(context.Background(), ctx), timeout)
defer cancel()
// Shuffle and get job tokens
for _, i := range rand.Perm(len(orchs)) {
//do not send to excluded Orchestrators
@@ -412,22 +412,25 @@
continue
}

go getOrchJobToken(ctx, orchs[i].URL, *reqSender, 500*time.Millisecond, tokenCh, errCh)
go getOrchJobToken(ctx, orchs[i].URL, *reqSender, respTimeout, tokenCh, errCh)
}

for nbResp < numAvailableOrchs && len(jobTokens) < numAvailableOrchs && !timedOut {
for nbResp < numAvailableOrchs && len(jobTokens) < numAvailableOrchs {
select {
case token := <-tokenCh:
jobTokens = append(jobTokens, token)
if token.AvailableCapacity > 0 {
jobTokens = append(jobTokens, token)
}
nbResp++
case <-errCh:
nbResp++
case <-tokensCtx.Done():
break
//searchTimeout reached, return tokens received
return jobTokens, nil
}
}
cancel()

// received enough tokens or all responses arrived
return jobTokens, nil
}

92 changes: 45 additions & 47 deletions byoc/job_orchestrator.go
@@ -147,61 +147,59 @@ func (bso *BYOCOrchestratorServer) GetJobToken() http.Handler {
w.Header().Set("Content-Type", "application/json")
jobToken := JobToken{SenderAddress: nil, TicketParams: nil, Balance: 0, Price: nil}

if !orch.CheckExternalCapabilityCapacity(jobCapsHdr) {
//send response indicating no capacity available
w.WriteHeader(http.StatusServiceUnavailable)
} else {
senderAddr := ethcommon.HexToAddress(jobSenderAddr.Addr)
capacity := orch.CheckExternalCapabilityCapacity(jobCapsHdr)

jobPrice, err := orch.JobPriceInfo(senderAddr, jobCapsHdr)
if err != nil {
statusCode := http.StatusBadRequest
if err.Error() == "insufficient sender reserve" {
statusCode = http.StatusServiceUnavailable
}
glog.Errorf("could not get price err=%v", err.Error())
http.Error(w, fmt.Sprintf("Could not get price err=%v", err.Error()), statusCode)
return
}
ticketParams, err := orch.TicketParams(senderAddr, jobPrice)
if err != nil {
glog.Errorf("could not get ticket params err=%v", err.Error())
http.Error(w, fmt.Sprintf("Could not get ticket params err=%v", err.Error()), http.StatusBadRequest)
return
}
senderAddr := ethcommon.HexToAddress(jobSenderAddr.Addr)

capBal := orch.Balance(senderAddr, core.ManifestID(jobCapsHdr))
if capBal != nil {
capBal, err = common.PriceToInt64(capBal)
if err != nil {
clog.Errorf(context.TODO(), "could not convert balance to int64 sender=%v capability=%v err=%v", senderAddr.Hex(), jobCapsHdr, err.Error())
capBal = big.NewRat(0, 1)
}
} else {
capBal = big.NewRat(0, 1)
}
//convert to int64. Note: returns with 000 more digits to allow for precision of 3 decimal places.
capBalInt, err := common.PriceToFixed(capBal)
if err != nil {
glog.Errorf("could not convert balance to int64 sender=%v capability=%v err=%v", senderAddr.Hex(), jobCapsHdr, err.Error())
capBalInt = 0
} else {
// Remove the last three digits from capBalInt
capBalInt = capBalInt / 1000
jobPrice, err := orch.JobPriceInfo(senderAddr, jobCapsHdr)
if err != nil {
statusCode := http.StatusBadRequest
if err.Error() == "insufficient sender reserve" {
statusCode = http.StatusServiceUnavailable
}
glog.Errorf("could not get price err=%v", err.Error())
http.Error(w, fmt.Sprintf("Could not get price err=%v", err.Error()), statusCode)
return
}
ticketParams, err := orch.TicketParams(senderAddr, jobPrice)
if err != nil {
glog.Errorf("could not get ticket params err=%v", err.Error())
http.Error(w, fmt.Sprintf("Could not get ticket params err=%v", err.Error()), http.StatusBadRequest)
return
}

jobToken = JobToken{
SenderAddress: jobSenderAddr,
TicketParams: ticketParams,
Balance: capBalInt,
Price: jobPrice,
ServiceAddr: orch.ServiceURI().String(),
capBal := orch.Balance(senderAddr, core.ManifestID(jobCapsHdr))
if capBal != nil {
capBal, err = common.PriceToInt64(capBal)
if err != nil {
clog.Errorf(context.TODO(), "could not convert balance to int64 sender=%v capability=%v err=%v", senderAddr.Hex(), jobCapsHdr, err.Error())
capBal = big.NewRat(0, 1)
}
} else {
capBal = big.NewRat(0, 1)
}
//convert to int64. Note: returns with 000 more digits to allow for precision of 3 decimal places.
capBalInt, err := common.PriceToFixed(capBal)
if err != nil {
glog.Errorf("could not convert balance to int64 sender=%v capability=%v err=%v", senderAddr.Hex(), jobCapsHdr, err.Error())
capBalInt = 0
} else {
// Remove the last three digits from capBalInt
capBalInt = capBalInt / 1000
}

//send response indicating compatible
w.WriteHeader(http.StatusOK)
jobToken = JobToken{
SenderAddress: jobSenderAddr,
TicketParams: ticketParams,
Balance: capBalInt,
Price: jobPrice,
ServiceAddr: orch.ServiceURI().String(),
AvailableCapacity: capacity,
@eliteprox (Collaborator) commented on Jan 19, 2026:

I think it's worth noting that JobTokens generally can't be changed during the active stream, so this value will always reflect the capacity available when the orchestrator was selected for the job. Is that correct?

Not sure if a code comment is needed or anything, just adding context here

@ad-astra-video (Collaborator, PR author) replied:

Yes, each JobToken is a point-in-time representation of information from the Orchestrator. Tokens are requested and generated on demand when a stream starts, and refreshed when failing over to another Orchestrator so the information stays current. The stream stop request gets a fresh token as well. Status and Update requests do not use the token, since they operate on a stream that is already running and paid for.
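
A minimal sketch of how a gateway might act on that snapshot, under stated assumptions: it mirrors the filtering this PR adds in getJobOrchestrators (only tokens with AvailableCapacity > 0 are kept) and the refresh-on-failover behavior described above. selectWithFailover, requestFreshToken, and sendStartStream are illustration-only names, not functions in this PR, and the JobToken here is trimmed to the two fields the sketch uses.

```go
package sketch

import (
	"context"
	"errors"
)

// JobToken is trimmed to the fields this sketch uses; the real struct in
// byoc/types.go also carries ticket params, price, balance, and sender address.
type JobToken struct {
	ServiceAddr       string
	AvailableCapacity int64
}

// Hypothetical stand-ins for the gateway's real token-request and
// stream-start calls.
var (
	requestFreshToken func(ctx context.Context, serviceAddr string) (*JobToken, error)
	sendStartStream   func(ctx context.Context, tok JobToken) error
)

// selectWithFailover tries orchestrators in order. Each token is a snapshot
// taken when it was issued, so tokens that reported zero capacity are skipped
// and a fresh token is requested before failing over to another orchestrator.
func selectWithFailover(ctx context.Context, tokens []JobToken) (*JobToken, error) {
	for i, tok := range tokens {
		if i > 0 {
			// Failover path: the stored snapshot may be stale by now.
			fresh, err := requestFreshToken(ctx, tok.ServiceAddr)
			if err != nil {
				continue
			}
			tok = *fresh
		}
		if tok.AvailableCapacity <= 0 {
			continue // no free capacity when the token was issued
		}
		if err := sendStartStream(ctx, tok); err == nil {
			return &tok, nil
		}
	}
	return nil, errors.New("no orchestrator accepted the stream")
}
```

The capacity filter and the fresh-token-on-failover step are the two places where the snapshot nature matters; status and update calls, as noted above, skip the token entirely.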

}

//send response indicating compatible
w.WriteHeader(http.StatusOK)

json.NewEncoder(w).Encode(jobToken)
})
}
38 changes: 29 additions & 9 deletions byoc/job_orchestrator_test.go
@@ -45,7 +45,7 @@ type mockJobOrchestrator struct {
registerExternalCapability func(string) (*core.ExternalCapability, error)
unregisterExternalCapability func(string) error
verifySignature func(ethcommon.Address, string, []byte) bool
checkExternalCapabilityCapacity func(string) bool
checkExternalCapabilityCapacity func(string) int64
reserveCapacity func(string) error
getUrlForCapability func(string) string
balance func(ethcommon.Address, core.ManifestID) *big.Rat
@@ -142,9 +142,9 @@ func (r *mockJobOrchestrator) RemoveExternalCapability(extCapability string) err

return nil
}
func (r *mockJobOrchestrator) CheckExternalCapabilityCapacity(extCap string) bool {
func (r *mockJobOrchestrator) CheckExternalCapabilityCapacity(extCap string) int64 {
if r.checkExternalCapabilityCapacity == nil {
return true
return 1
} else {
return r.checkExternalCapabilityCapacity(extCap)
}
@@ -486,18 +486,33 @@ func TestGetJobToken_NoCapacity(t *testing.T) {
mockVerifySig := func(addr ethcommon.Address, msg string, sig []byte) bool {
return true
}
mockCheckExternalCapabilityCapacity := func(extCap string) bool {
return false
mockCheckExternalCapabilityCapacity := func(extCap string) int64 {
return 0
}

mockReserveCapacity := func(cap string) error {
return errors.New("no capacity")
mockJobPriceInfo := func(addr ethcommon.Address, cap string) (*net.PriceInfo, error) {
return &net.PriceInfo{
PricePerUnit: 10,
PixelsPerUnit: 1,
}, nil
}

mockTicketParams := func(addr ethcommon.Address, price *net.PriceInfo) (*net.TicketParams, error) {
return &net.TicketParams{
Recipient: ethcommon.HexToAddress("0x1111111111111111111111111111111111111111").Bytes(),
FaceValue: big.NewInt(1000).Bytes(),
WinProb: big.NewInt(1).Bytes(),
RecipientRandHash: []byte("hash"),
Seed: big.NewInt(1234).Bytes(),
ExpirationBlock: big.NewInt(100000).Bytes(),
}, nil
}

mockJobOrch := newMockJobOrchestrator()
mockJobOrch.verifySignature = mockVerifySig
mockJobOrch.checkExternalCapabilityCapacity = mockCheckExternalCapabilityCapacity
mockJobOrch.reserveCapacity = mockReserveCapacity
mockJobOrch.jobPriceInfo = mockJobPriceInfo
mockJobOrch.ticketParams = mockTicketParams

bso := &BYOCOrchestratorServer{
node: mockJobLivepeerNode(),
@@ -523,7 +523,11 @@
handler.ServeHTTP(w, req)

resp := w.Result()
assert.Equal(t, http.StatusServiceUnavailable, resp.StatusCode)
assert.Equal(t, http.StatusOK, resp.StatusCode)
body, _ := io.ReadAll(resp.Body)
var jobToken JobToken
json.Unmarshal(body, &jobToken)
assert.Equal(t, int64(0), jobToken.AvailableCapacity)
}

func TestGetJobToken_JobPriceInfoError(t *testing.T) {
@@ -854,5 +873,6 @@ func createMockJobToken(hostUrl string) *JobToken {
PricePerUnit: 100,
PixelsPerUnit: 1,
},
AvailableCapacity: 1,
}
}
3 changes: 1 addition & 2 deletions byoc/stream_orchestrator.go
@@ -19,14 +19,13 @@ import (

var getNewTokenTimeout = 3 * time.Second

// StartStream handles the POST /stream/start endpoint for the Orchestrator
func (bso *BYOCOrchestratorServer) StartStream() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
orch := bso.orch
remoteAddr := getRemoteAddr(r)
ctx := clog.AddVal(r.Context(), clog.ClientIP, remoteAddr)

orchJob, err := bso.setupOrchJob(ctx, r, false)
orchJob, err := bso.setupOrchJob(ctx, r, true)
if err != nil {
code := http.StatusBadRequest
if err == errInsufficientBalance {
75 changes: 75 additions & 0 deletions byoc/stream_test.go
@@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"io"
"math/big"
"math/rand/v2"
"net/http"
"net/http/httptest"
@@ -2042,3 +2043,77 @@ func TestStartStreamError_OtherScenarios(t *testing.T) {
})
})
}

func TestStartStreamHandler_ReserveCapacity(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
var reserveCapacityCalled bool
node := mockJobLivepeerNode()

// Create mock orchestrator with ReserveExternalCapabilityCapacity expectation
mockOrch := newMockJobOrchestrator()
mockOrch.verifySignature = func(addr ethcommon.Address, msg string, sig []byte) bool {
return true
}
mockOrch.reserveCapacity = func(extCap string) error {
reserveCapacityCalled = true
return nil
}
mockOrch.freeCapacity = func(extCap string) error {
return nil
}
mockOrch.getUrlForCapability = func(capability string) string {
return "http://127.0.0.1:1234"
}
mockOrch.jobPriceInfo = func(sender ethcommon.Address, jobCapability string) (*net.PriceInfo, error) {
return &net.PriceInfo{PricePerUnit: 0, PixelsPerUnit: 1}, nil
}
mockOrch.balance = func(addr ethcommon.Address, manifestID core.ManifestID) *big.Rat {
return new(big.Rat).SetInt64(0)
}

// Set up an lphttp-based orchestrator test server with trickle endpoints
mux := http.NewServeMux()
bso := &BYOCOrchestratorServer{orch: mockOrch, httpMux: mux, node: node}

// Configure trickle server on the mux
bso.trickleSrv = trickle.ConfigureServer(trickle.TrickleServerConfig{
Mux: mux,
BasePath: "/ai/trickle/",
Autocreate: true,
})

// Setup Orch server stub
mux.HandleFunc("/process/token", orchTokenHandler)
mux.HandleFunc("/ai/stream/start", orchAIStreamStartNoUrlsHandler)

server := httptest.NewServer(mux)
defer server.Close()

node.OrchestratorPool = newStubOrchestratorPool(node, []string{server.URL})
drivers.NodeStorage = drivers.NewMemoryDriver(nil)

// Prepare a valid StartRequest body
startReq := StartRequest{
Stream: "teststream",
RtmpOutput: "rtmp://output",
StreamId: "streamid",
Params: "{}",
}
body, _ := json.Marshal(startReq)
req := httptest.NewRequest(http.MethodPost, "/ai/stream/start", bytes.NewBuffer(body))
req.Header.Set("Content-Type", "application/json")

req.Header.Set("Livepeer", base64TestJobRequest(10, true, true, true))

w := httptest.NewRecorder()

handler := bso.StartStream()
handler.ServeHTTP(w, req)

// Verify ReserveExternalCapabilityCapacity was called
assert.True(t, reserveCapacityCalled, "ReserveExternalCapabilityCapacity should have been called")

// no stream created
assert.Zero(t, len(mockOrch.node.ExternalCapabilities.Streams))
})
}
13 changes: 7 additions & 6 deletions byoc/types.go
@@ -55,7 +55,7 @@ type Orchestrator interface {
JobPriceInfo(sender ethcommon.Address, capability string) (*net.PriceInfo, error)
TicketParams(sender ethcommon.Address, priceInfo *net.PriceInfo) (*net.TicketParams, error)
Balance(sender ethcommon.Address, manifestID core.ManifestID) *big.Rat
CheckExternalCapabilityCapacity(capability string) bool
CheckExternalCapabilityCapacity(capability string) int64
RemoveExternalCapability(extCapName string) error
RegisterExternalCapability(extCapSettings string) (*core.ExternalCapability, error)
FreeExternalCapabilityCapacity(capability string) error
@@ -132,11 +132,12 @@ type JobOrchestratorsFilter struct {
}

type JobToken struct {
SenderAddress *JobSender `json:"sender_address,omitempty"`
TicketParams *net.TicketParams `json:"ticket_params,omitempty"`
Balance int64 `json:"balance,omitempty"`
Price *net.PriceInfo `json:"price,omitempty"`
ServiceAddr string `json:"service_addr,omitempty"`
SenderAddress *JobSender `json:"sender_address,omitempty"`
TicketParams *net.TicketParams `json:"ticket_params,omitempty"`
Balance int64 `json:"balance,omitempty"`
Price *net.PriceInfo `json:"price,omitempty"`
ServiceAddr string `json:"service_addr,omitempty"`
AvailableCapacity int64 `json:"available_capacity,omitempty"`

LastNonce uint32
}