Skip to content

feat(go/genkit): Background Model implementation along with Veo integration #3262

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/ai/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ type ModelSupports struct {
SystemRole bool `json:"systemRole,omitempty"`
ToolChoice bool `json:"toolChoice,omitempty"`
Tools bool `json:"tools,omitempty"`
LongRunning bool `json:"longRunning,omitempty"`
}

type ConstrainedSupport string
Expand Down
166 changes: 166 additions & 0 deletions go/ai/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"slices"
"strings"
"time"

"github.com/firebase/genkit/go/core"
"github.com/firebase/genkit/go/core/logger"
Expand Down Expand Up @@ -92,6 +93,12 @@ type (
toolResponse *Part
interrupt *Part
}

StartModelFunc = core.BackgroundStartFunc[*ModelRequest, *ModelResponse] // Function to start a model background operation
CheckModelFunc = core.BackgroundCheckFunc[*ModelResponse] // Function to check model background operation status
CancelModelFunc = core.BackgroundCancelFunc[*ModelResponse] // Function to cancel model background operation

BackgroundAction = core.BackgroundAction[*ModelRequest, *ModelResponse] // Background action for model operations
)

// DefineGenerateAction defines a utility generate action.
Expand Down Expand Up @@ -133,6 +140,7 @@ func DefineModel(r *registry.Registry, provider, name string, info *ModelInfo, f
"tools": info.Supports.Tools,
"toolChoice": info.Supports.ToolChoice,
"constrained": info.Supports.Constrained,
"longRunning": info.Supports.LongRunning,
},
"versions": info.Versions,
"stage": info.Stage,
Expand Down Expand Up @@ -171,6 +179,12 @@ func LookupModel(r *registry.Registry, provider, name string) Model {
return (*model)(action)
}

// LookupBackgroundModel looks up a BackgroundAction registered by [DefineBackgroundModel].
// It returns nil if the background model was not found.
func LookupBackgroundModel(r *registry.Registry, provider, name string) BackgroundAction {
return core.LookupBackgroundAction[*ModelRequest, *ModelResponse](r, provider, name)
}

// LookupModelByName looks up a [Model] registered by [DefineModel].
// It will try to resolve the model dynamically if the model is not found.
// It returns an error if the model was not resolved.
Expand Down Expand Up @@ -1053,3 +1067,155 @@ func handleResumeOption(ctx context.Context, r *registry.Registry, genOpts *Gene
toolMessage: toolMessage,
}, nil
}

// DefineBackgroundModel defines a new model that runs in the background
func DefineBackgroundModel(
r *registry.Registry,
provider, name string,
options *core.BackgroundModelOptions,
start StartModelFunc,
check CheckModelFunc,
cancel CancelModelFunc,
) BackgroundAction {
label := options.Label
if label == "" {
label = name
}

// Prepare metadata
metadata := map[string]any{
"model": map[string]any{
"label": label,
"versions": options.Versions,
"supports": options.Supports,
},
}

if options.ConfigSchema != nil {
metadata["model"].(map[string]any)["customOptions"] = options.ConfigSchema
}
startFunc := func(ctx context.Context, request *ModelRequest) (*core.Operation, error) {
startTime := time.Now()

// Call the user's start function
operation, err := start(ctx, request)
if err != nil {
return nil, err
}

// Add latency metadata
if operation.Metadata == nil {
operation.Metadata = make(map[string]any)
}
operation.Metadata["latencyMs"] = float64(time.Since(startTime).Nanoseconds()) / 1e6

return operation, nil
}

bgAction := core.DefineBackgroundAction[*ModelRequest, *ModelResponse](r, provider, name, *options, metadata, startFunc,
func(ctx context.Context, operation *core.Operation) (*core.Operation, error) {
return check(ctx, operation)
},
func(ctx context.Context, operation *core.Operation) (*core.Operation, error) {
if cancel == nil {
return nil, core.NewError(core.UNIMPLEMENTED, "cancel not implemented")
}
return cancel(ctx, operation)
},
)

return bgAction
}

// GenerateOperation generates a model response as a long-running operation based on the provided options.
// It returns an error if the model does not support long-running operations.
//
// This is a beta feature and requires the model to explicitly support long-running operations.
func GenerateOperation(ctx context.Context, r *registry.Registry, opts ...GenerateOption) (*core.Operation, error) {
genOpts := &generateOptions{}
for _, opt := range opts {
if err := opt.applyGenerate(genOpts); err != nil {
return nil, core.NewError(core.INVALID_ARGUMENT, "ai.Generate: error applying options: %v", err)
}
}

var modelName string
if genOpts.Model != nil {
modelName = genOpts.Model.Name()
} else {
modelName = genOpts.ModelName
}

provider, name, _ := strings.Cut(modelName, "/")
backAction := LookupBackgroundModel(r, provider, name)
if backAction == nil {
return nil, core.NewError(core.NOT_FOUND, "background model %q not found", modelName)
}

var messages []*Message
if genOpts.SystemFn != nil {
system, err := genOpts.SystemFn(ctx, nil)
if err != nil {
return nil, err
}

messages = append(messages, NewSystemTextMessage(system))
}
if genOpts.MessagesFn != nil {
msgs, err := genOpts.MessagesFn(ctx, nil)
if err != nil {
return nil, err
}

messages = append(messages, msgs...)
}
if genOpts.PromptFn != nil {
prompt, err := genOpts.PromptFn(ctx, nil)
if err != nil {
return nil, err
}

messages = append(messages, NewUserTextMessage(prompt))
}

if modelRef, ok := genOpts.Model.(ModelRef); ok && genOpts.Config == nil {
genOpts.Config = modelRef.Config()
}

startOps, err := backAction.Start(ctx, &ModelRequest{Messages: messages, Config: genOpts.Config})
if err != nil {
return nil, err
}

currentOp := startOps
for {
// Check if operation completed with error
if currentOp.Error != "" {
return nil, core.NewError(core.INTERNAL, "operation failed: %s", currentOp.Error)
}

// Check if operation is complete
if currentOp.Done {
break
}

// Wait before polling again (avoid busy waiting)
select {
case <-ctx.Done():
logger.FromContext(ctx).Debug("Context cancelled, stopping operation polling", "operationId", currentOp.ID)
return nil, ctx.Err()
case <-time.After(1 * time.Second): // Poll every second
}

// Check operation status
updatedOp, err := backAction.Check(ctx, currentOp)
if err != nil {
return nil, fmt.Errorf("failed to check operation status: %w", err)
}

currentOp = updatedOp
}

// Operation completed, return the final result
return currentOp, nil
}
3 changes: 3 additions & 0 deletions go/core/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ const (
ActionTypeTool ActionType = "tool"
ActionTypeUtil ActionType = "util"
ActionTypeCustom ActionType = "custom"
ActionTypeBackgroundModel ActionType = "background-model"
ActionTypeBackgroundCheck ActionType = "check-operation"
ActionTypeBackgroundCancel ActionType = "cancel-operation"
)

// An ActionDef is a named, observable operation that underlies all Genkit primitives.
Expand Down
Loading
Loading