Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 28 additions & 4 deletions cmd/ctrlc/root/run/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package exec

import (
"fmt"
"runtime"

"github.com/MakeNowJust/heredoc/v2"
"github.com/charmbracelet/log"
"github.com/ctrlplanedev/cli/internal/api"
"github.com/ctrlplanedev/cli/pkg/jobagent"
Expand All @@ -11,23 +13,41 @@ import (
)

func NewRunExecCmd() *cobra.Command {
return &cobra.Command{
var name string
var jobAgentType = "exec-bash"
if runtime.GOOS == "windows" {
jobAgentType = "exec-powershell"
}

cmd := &cobra.Command{
Use: "exec",
Short: "Execute commands directly when a job is received",
Example: heredoc.Doc(`
$ ctrlc run exec --name "my-script-agent" --workspace 123e4567-e89b-12d3-a456-426614174000
`),
RunE: func(cmd *cobra.Command, args []string) error {
apiURL := viper.GetString("url")
apiKey := viper.GetString("api-key")
workspaceId := viper.GetString("workspace")
client, err := api.NewAPIKeyClientWithResponses(apiURL, apiKey)
if err != nil {
return fmt.Errorf("failed to create API client: %w", err)
}
if name == "" {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could move input validation above the client creation?

return fmt.Errorf("name is required")
}
if workspaceId == "" {
return fmt.Errorf("workspace is required")
}

ja, err := jobagent.NewJobAgent(
client,
api.UpsertJobAgentJSONRequestBody{
Name: "exec",
Type: "exec",
Name: name,
Type: jobAgentType,
WorkspaceId: workspaceId,
},
&ExecRunner{},
NewExecRunner(),
)
if err != nil {
return fmt.Errorf("failed to create job agent: %w", err)
Expand All @@ -41,4 +61,8 @@ func NewRunExecCmd() *cobra.Command {
return nil
},
}

cmd.Flags().StringVar(&name, "name", "", "Name of the job agent")
cmd.MarkFlagRequired("name")
return cmd
}
75 changes: 61 additions & 14 deletions cmd/ctrlc/root/run/exec/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,43 +9,68 @@ import (
"os/exec"
"runtime"
"strconv"
"sync"
"syscall"

"github.com/charmbracelet/log"
"github.com/ctrlplanedev/cli/internal/api"
"github.com/ctrlplanedev/cli/pkg/jobagent"
)

var _ jobagent.Runner = &ExecRunner{}

type ExecRunner struct{}
type ExecRunner struct {
mu sync.Mutex
finished map[int]error
}

func NewExecRunner() *ExecRunner {
return &ExecRunner{
finished: make(map[int]error),
}
}

type ExecConfig struct {
WorkingDir string `json:"workingDir,omitempty"`
Script string `json:"script"`
}

func (r *ExecRunner) Status(job api.Job) (api.JobStatus, string) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the pid is missing is that considered sucessful? what if the pid is now a different process?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its not clear how to keep track of these processes, maybe we assum they are not deamon so if the command closes so does the children. This means we won't need this status check and instead create a goroutine that updates the job when it exists

externalId, err := strconv.Atoi(*job.ExternalId)
if job.ExternalId == nil {
return api.JobStatusExternalRunNotFound, fmt.Sprintf("external ID is nil: %v", job.ExternalId)
}

pid, err := strconv.Atoi(*job.ExternalId)
if err != nil {
return api.JobStatusExternalRunNotFound, fmt.Sprintf("invalid process id: %v", err)
}

process, err := os.FindProcess(externalId)
// Check if we've recorded a finished result for this process
r.mu.Lock()
finishedErr, exists := r.finished[pid]
r.mu.Unlock()
if exists {
if finishedErr != nil {
return api.JobStatusFailure, fmt.Sprintf("process exited with error: %v", finishedErr)
}
return api.JobStatusSuccessful, "process completed successfully"
}

// If not finished yet, try to check if the process is still running.
process, err := os.FindProcess(pid)
if err != nil {
return api.JobStatusExternalRunNotFound, fmt.Sprintf("failed to find process: %v", err)
}

// On Unix systems, FindProcess always succeeds, so we need to send signal 0
// to check if process exists
// On Unix, Signal 0 will error if the process is not running.
err = process.Signal(syscall.Signal(0))
if err != nil {
return api.JobStatusSuccessful, fmt.Sprintf("process not running: %v", err)
// Process is not running but we haven't recorded its result.
return api.JobStatusFailure, fmt.Sprintf("process not running: %v", err)
}

return api.JobStatusInProgress, fmt.Sprintf("process running with pid %d", externalId)
return api.JobStatusInProgress, fmt.Sprintf("process running with pid %d", pid)
}

func (r *ExecRunner) Start(job api.Job) (string, error) {
func (r *ExecRunner) Start(job api.Job, jobDetails map[string]interface{}) (string, error) {
// Create temp script file
ext := ".sh"
if runtime.GOOS == "windows" {
Expand All @@ -56,7 +81,6 @@ func (r *ExecRunner) Start(job api.Job) (string, error) {
if err != nil {
return "", fmt.Errorf("failed to create temp script file: %w", err)
}
defer os.Remove(tmpFile.Name())

config := ExecConfig{}
jsonBytes, err := json.Marshal(job.JobAgentConfig)
Expand All @@ -73,7 +97,7 @@ func (r *ExecRunner) Start(job api.Job) (string, error) {
}

buf := new(bytes.Buffer)
if err := templatedScript.Execute(buf, job); err != nil {
if err := templatedScript.Execute(buf, jobDetails); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Avoid potential code injection in the script.
Passing jobDetails directly into the template and executing as a script can result in code injection if jobDetails includes malicious input. Consider sanitizing or validating fields in jobDetails and config.Script to reduce security risks.

return "", fmt.Errorf("failed to execute script template: %w", err)
}
script := buf.String()
Expand Down Expand Up @@ -104,9 +128,32 @@ func (r *ExecRunner) Start(job api.Job) (string, error) {
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr

if err := cmd.Run(); err != nil {
return "", fmt.Errorf("failed to execute script: %w", err)
if err := cmd.Start(); err != nil {
os.Remove(tmpFile.Name())
return "", fmt.Errorf("failed to start process: %w", err)
}

pid := cmd.Process.Pid

// Launch a goroutine to wait for process completion and store the result.
go func(pid int, scriptPath string) {
err := cmd.Wait()
// Ensure the map is not nil; if there's any chance ExecRunner is used as a zero-value, initialize it.
r.mu.Lock()
if r.finished == nil {
r.finished = make(map[int]error)
}
r.finished[pid] = err
r.mu.Unlock()

if err != nil {
log.Error("Process execution failed", "pid", pid, "error", err)
} else {
log.Info("Process execution succeeded", "pid", pid)
}

os.Remove(scriptPath)
}(pid, tmpFile.Name())

return strconv.Itoa(cmd.Process.Pid), nil
}
35 changes: 35 additions & 0 deletions pkg/jobagent/job_details.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package jobagent

import (
"context"
"encoding/json"
"fmt"

"github.com/ctrlplanedev/cli/internal/api"
"github.com/spf13/viper"
)

func fetchJobDetails(ctx context.Context, jobID string) (map[string]interface{}, error) {
client, err := api.NewAPIKeyClientWithResponses(viper.GetString("url"), viper.GetString("api-key"))
if err != nil {
return nil, fmt.Errorf("failed to create API client for job details: %w", err)
}

resp, err := client.GetJobWithResponse(ctx, jobID)
if err != nil {
return nil, fmt.Errorf("failed to get job details: %w", err)
}
if resp.JSON200 == nil {
return nil, fmt.Errorf("received empty response from job details API")
}

var details map[string]interface{}
detailsBytes, err := json.Marshal(resp.JSON200)
if err != nil {
return nil, fmt.Errorf("failed to marshal job response: %w", err)
}
if err := json.Unmarshal(detailsBytes, &details); err != nil {
return nil, fmt.Errorf("failed to unmarshal job details: %w", err)
}
return details, nil
}
13 changes: 9 additions & 4 deletions pkg/jobagent/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

type Runner interface {
Start(job api.Job) (string, error)
Start(job api.Job, jobDetails map[string]interface{}) (string, error)
Status(job api.Job) (api.JobStatus, string)
}

Expand All @@ -29,10 +29,10 @@ func NewJobAgent(
}

ja := &JobAgent{
client: client,

client: client,
id: agent.JSON200.Id,
workspaceId: config.WorkspaceId,
runner: runner,
}

return ja, nil
Expand Down Expand Up @@ -66,9 +66,14 @@ func (a *JobAgent) RunQueuedJobs() error {
var wg sync.WaitGroup
for _, job := range *jobs.JSON200.Jobs {
wg.Add(1)
jobDetails, err := fetchJobDetails(context.Background(), job.Id.String())
if err != nil {
log.Error("Failed to fetch job details", "error", err, "jobId", job.Id.String())
continue
}
go func(job api.Job) {
defer wg.Done()
externalId, err := a.runner.Start(job)
externalId, err := a.runner.Start(job, jobDetails)
if err != nil {
status := api.JobStatusInProgress
message := fmt.Sprintf("Failed to start job: %s", err.Error())
Expand Down