From e57cef55b731d9bf7dfefc866232ff648cbfa3dc Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 5 Sep 2025 14:45:28 +0000 Subject: [PATCH 1/2] Initial plan From 2034eb84c8d0e0a264c2aac3274782fee25e7fd1 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 5 Sep 2025 14:53:18 +0000 Subject: [PATCH 2/2] Implement configurable notifications with Slack webhook support Co-authored-by: wschenk <12232+wschenk@users.noreply.github.com> --- README.md | 73 ++++++++++++++++++++- main.go | 174 +++++++++++++++++++++++++++++++++++++++++++-------- main_test.go | 103 ++++++++++++++++++++++++++++++ 3 files changed, 322 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index 1baba25..95e0749 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,9 @@ This is a simple, file-backed job queue server written in Go. Each job runs as a - Jobs are executed as child processes - Each job logs `stderr` and stores `stdout` as the final result - Persistent metadata and logs saved to the filesystem -- Webhook support to notify external services on job completion +- **Configurable notifications** with support for: + - Traditional webhooks on job completion + - Slack webhooks with rich formatting for job start and/or completion events - REST API for job submission, status tracking, result fetching, and cancellation - **Static file serving** from `public/` directory at the root path @@ -79,6 +81,7 @@ Serving static files from: /path/to/public ### 3. Submit a Job +**Basic job submission:** ```bash curl -X POST http://localhost:8080/jobs \ -H 'Content-Type: application/json' \ @@ -89,6 +92,27 @@ curl -X POST http://localhost:8080/jobs \ }' ``` +**With Slack notifications:** +```bash +curl -X POST http://localhost:8080/jobs \ + -H 'Content-Type: application/json' \ + -d '{ + "args": ["python", "my_script.py"], + "mime_type": "text/plain", + "slack_webhook": "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK", + "notify_on_start": true, + "notify_on_finish": true + }' +``` + +**Job submission parameters:** +- `args` (required): Array of command and arguments to execute +- `mime_type` (optional): MIME type for the result content +- `webhook` (optional): URL for traditional webhook notifications (triggered on job completion) +- `slack_webhook` (optional): Slack webhook URL for rich notifications +- `notify_on_start` (optional): Send Slack notification when job starts (default: false) +- `notify_on_finish` (optional): Send Slack notification when job completes (default: false) + ### 4. Check Status ```bash @@ -134,6 +158,9 @@ curl -X PUT http://localhost:8080/jobs//cancel "args": ["echo", "Hello, world!"], "mime_type": "text/plain", "webhook": "https://webhook.site/your-id", + "slack_webhook": "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK", + "notify_on_start": true, + "notify_on_finish": true, "status": "IN_QUEUE", "enqueued_at": "2024-01-01T12:00:00Z", "status_url": "/jobs/550e8400-e29b-41d4-a716-446655440000/status", @@ -144,6 +171,50 @@ curl -X PUT http://localhost:8080/jobs//cancel --- +## 🔔 Notifications + +The job queue supports two types of webhook notifications: + +### Traditional Webhooks +- Triggered only when jobs complete (success, failure, or cancellation) +- Simple JSON payload with job metadata +- Backward compatible with existing integrations + +**Payload format:** +```json +{ + "id": "job-id", + "status": "COMPLETED|FAILED|CANCELED", + "result_url": "/jobs/job-id/result", + "log_url": "/jobs/job-id/log" +} +``` + +### Slack Webhooks +- Rich formatting with job details and status colors +- Configurable for job start and/or completion events +- Includes clickable links to logs and results + +**Slack notification features:** +- **Start notifications**: Alert when a job begins execution +- **Completion notifications**: Alert when a job finishes with status-specific formatting +- **Rich formatting**: Command details, timing information, and direct links +- **Status colors**: Green for success, red for failure, blue for cancellation + +**Example Slack notification setup:** +```json +{ + "args": ["python", "data_processor.py"], + "slack_webhook": "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK", + "notify_on_start": true, + "notify_on_finish": true +} +``` + +Both notification types can be used simultaneously for the same job. + +--- + ## 🐳 Docker ### Build diff --git a/main.go b/main.go index 5e3c186..4de811c 100644 --- a/main.go +++ b/main.go @@ -20,18 +20,21 @@ import ( ) type JobMeta struct { - ID string `json:"id"` - Args []string `json:"args"` - MimeType string `json:"mime_type,omitempty"` - Webhook string `json:"webhook,omitempty"` - Status string `json:"status"` - PID int `json:"pid,omitempty"` - EnqueuedAt time.Time `json:"enqueued_at"` - StartedAt time.Time `json:"started_at,omitempty"` - CompletedAt time.Time `json:"completed_at,omitempty"` - StatusURL string `json:"status_url,omitempty"` - ResultURL string `json:"result_url,omitempty"` - LogURL string `json:"log_url,omitempty"` + ID string `json:"id"` + Args []string `json:"args"` + MimeType string `json:"mime_type,omitempty"` + Webhook string `json:"webhook,omitempty"` + SlackWebhook string `json:"slack_webhook,omitempty"` + NotifyOnStart bool `json:"notify_on_start,omitempty"` + NotifyOnFinish bool `json:"notify_on_finish,omitempty"` + Status string `json:"status"` + PID int `json:"pid,omitempty"` + EnqueuedAt time.Time `json:"enqueued_at"` + StartedAt time.Time `json:"started_at,omitempty"` + CompletedAt time.Time `json:"completed_at,omitempty"` + StatusURL string `json:"status_url,omitempty"` + ResultURL string `json:"result_url,omitempty"` + LogURL string `json:"log_url,omitempty"` } type queuedJob struct { @@ -165,9 +168,12 @@ func jobsHandler(w http.ResponseWriter, r *http.Request, fixedArgs []string) { func submitJob(w http.ResponseWriter, r *http.Request, fixedArgs []string) { var req struct { - Args []string `json:"args"` - MimeType string `json:"mime_type,omitempty"` - Webhook string `json:"webhook,omitempty"` + Args []string `json:"args"` + MimeType string `json:"mime_type,omitempty"` + Webhook string `json:"webhook,omitempty"` + SlackWebhook string `json:"slack_webhook,omitempty"` + NotifyOnStart bool `json:"notify_on_start,omitempty"` + NotifyOnFinish bool `json:"notify_on_finish,omitempty"` } dec := json.NewDecoder(r.Body) if err := dec.Decode(&req); err != nil { @@ -197,12 +203,15 @@ func submitJob(w http.ResponseWriter, r *http.Request, fixedArgs []string) { } meta := &JobMeta{ - ID: id, - Args: args, - MimeType: req.MimeType, - Webhook: req.Webhook, - Status: "IN_QUEUE", - EnqueuedAt: time.Now(), + ID: id, + Args: args, + MimeType: req.MimeType, + Webhook: req.Webhook, + SlackWebhook: req.SlackWebhook, + NotifyOnStart: req.NotifyOnStart, + NotifyOnFinish: req.NotifyOnFinish, + Status: "IN_QUEUE", + EnqueuedAt: time.Now(), } baseURL := os.Getenv("BASE_URL") statusPath := "/jobs/" + id + "/status" @@ -325,6 +334,9 @@ func runJob(meta *JobMeta, inputFilePath string) { runningJobs[meta.ID] = &RunningJob{Cmd: cmd, Meta: meta, Cancel: cancel} mu.Unlock() + // Send start notification + sendNotifications(meta, "start") + err := cmd.Wait() meta.CompletedAt = time.Now() @@ -353,12 +365,8 @@ func runJob(meta *JobMeta, inputFilePath string) { fmt.Fprintf(os.Stderr, "[DEBUG] Job finished: id=%s status=%s\n", meta.ID, meta.Status) } - if meta.Webhook != "" { - if os.Getenv("DEBUG") == "1" { - fmt.Fprintf(os.Stderr, "[DEBUG] Triggering webhook: url=%s id=%s status=%s\n", meta.Webhook, meta.ID, meta.Status) - } - go sendWebhook(meta) - } + // Send finish notifications + sendNotifications(meta, "finish") } func saveMeta(meta *JobMeta) { @@ -383,11 +391,123 @@ func sendWebhook(meta *JobMeta) { "id": meta.ID, "status": meta.Status, "result_url": "/jobs/" + meta.ID + "/result", + "log_url": "/jobs/" + meta.ID + "/log", } data, _ := json.Marshal(payload) http.Post(meta.Webhook, "application/json", bytes.NewReader(data)) } +func sendSlackNotification(meta *JobMeta, eventType string) { + if meta.SlackWebhook == "" { + return + } + + var color, title, text string + var fields []map[string]interface{} + + baseURL := os.Getenv("BASE_URL") + logURL := "/jobs/" + meta.ID + "/log" + resultURL := "/jobs/" + meta.ID + "/result" + statusURL := "/jobs/" + meta.ID + "/status" + + if baseURL != "" { + logURL = baseURL + logURL + resultURL = baseURL + resultURL + statusURL = baseURL + statusURL + } + + switch eventType { + case "start": + color = "warning" + title = "Job Started" + text = fmt.Sprintf("Job `%s` has started execution", meta.ID) + fields = []map[string]interface{}{ + {"title": "Command", "value": fmt.Sprintf("`%s`", strings.Join(meta.Args, " ")), "short": false}, + {"title": "Started At", "value": meta.StartedAt.Format(time.RFC3339), "short": true}, + {"title": "PID", "value": fmt.Sprintf("%d", meta.PID), "short": true}, + } + case "finish": + switch meta.Status { + case "COMPLETED": + color = "good" + title = "Job Completed Successfully" + case "FAILED": + color = "danger" + title = "Job Failed" + case "CANCELED": + color = "#439FE0" + title = "Job Canceled" + default: + color = "warning" + title = "Job Finished" + } + + duration := "" + if !meta.StartedAt.IsZero() && !meta.CompletedAt.IsZero() { + duration = meta.CompletedAt.Sub(meta.StartedAt).String() + } + + text = fmt.Sprintf("Job `%s` finished with status: *%s*", meta.ID, meta.Status) + fields = []map[string]interface{}{ + {"title": "Command", "value": fmt.Sprintf("`%s`", strings.Join(meta.Args, " ")), "short": false}, + {"title": "Duration", "value": duration, "short": true}, + {"title": "Completed At", "value": meta.CompletedAt.Format(time.RFC3339), "short": true}, + } + } + + // Add links + fields = append(fields, + map[string]interface{}{"title": "Links", "value": fmt.Sprintf("<%s|View Logs> | <%s|View Status>", logURL, statusURL), "short": false}, + ) + + if meta.Status == "COMPLETED" { + lastField := fields[len(fields)-1] + lastField["value"] = fmt.Sprintf("<%s|View Result> | <%s|View Logs> | <%s|View Status>", resultURL, logURL, statusURL) + } + + slackPayload := map[string]interface{}{ + "attachments": []map[string]interface{}{ + { + "color": color, + "title": title, + "text": text, + "fields": fields, + "footer": "Shell Job Queue", + "ts": time.Now().Unix(), + "mrkdwn_in": []string{"text", "fields"}, + }, + }, + } + + data, _ := json.Marshal(slackPayload) + http.Post(meta.SlackWebhook, "application/json", bytes.NewReader(data)) +} + +func sendNotifications(meta *JobMeta, eventType string) { + // Send traditional webhook if configured + if meta.Webhook != "" && eventType == "finish" { + if os.Getenv("DEBUG") == "1" { + fmt.Fprintf(os.Stderr, "[DEBUG] Triggering webhook: url=%s id=%s status=%s\n", meta.Webhook, meta.ID, meta.Status) + } + go sendWebhook(meta) + } + + // Send Slack notification if configured + shouldNotify := false + if eventType == "start" && meta.NotifyOnStart { + shouldNotify = true + } else if eventType == "finish" && meta.NotifyOnFinish { + shouldNotify = true + } + + if shouldNotify && meta.SlackWebhook != "" { + if os.Getenv("DEBUG") == "1" { + fmt.Fprintf(os.Stderr, "[DEBUG] Triggering Slack notification: url=%s id=%s event=%s\n", meta.SlackWebhook, meta.ID, eventType) + } + go sendSlackNotification(meta, eventType) + } +} + func listJobs(w http.ResponseWriter, r *http.Request) { // Check for orphaned jobs before listing checkForOrphanedJobs() diff --git a/main_test.go b/main_test.go index 39a3709..6e1574d 100644 --- a/main_test.go +++ b/main_test.go @@ -1,9 +1,12 @@ package main import ( + "encoding/json" "os" + "strings" "syscall" "testing" + "time" ) func TestIsProcessRunning(t *testing.T) { @@ -38,3 +41,103 @@ func TestProcessSignalHandling(t *testing.T) { t.Errorf("Failed to send signal 0 to current process: %v", err) } } + +func TestJobMetaNotificationFields(t *testing.T) { + // Test that the JobMeta struct correctly handles notification fields + meta := &JobMeta{ + ID: "test-job-1", + Args: []string{"echo", "hello"}, + SlackWebhook: "https://hooks.slack.com/test", + NotifyOnStart: true, + NotifyOnFinish: true, + Status: "IN_QUEUE", + EnqueuedAt: time.Now(), + } + + // Test JSON marshaling/unmarshaling to ensure fields are preserved + data, err := json.Marshal(meta) + if err != nil { + t.Fatalf("Failed to marshal JobMeta: %v", err) + } + + var unmarshaledMeta JobMeta + err = json.Unmarshal(data, &unmarshaledMeta) + if err != nil { + t.Fatalf("Failed to unmarshal JobMeta: %v", err) + } + + if unmarshaledMeta.SlackWebhook != meta.SlackWebhook { + t.Errorf("SlackWebhook not preserved: got %s, want %s", unmarshaledMeta.SlackWebhook, meta.SlackWebhook) + } + if unmarshaledMeta.NotifyOnStart != meta.NotifyOnStart { + t.Errorf("NotifyOnStart not preserved: got %v, want %v", unmarshaledMeta.NotifyOnStart, meta.NotifyOnStart) + } + if unmarshaledMeta.NotifyOnFinish != meta.NotifyOnFinish { + t.Errorf("NotifyOnFinish not preserved: got %v, want %v", unmarshaledMeta.NotifyOnFinish, meta.NotifyOnFinish) + } +} + +func TestSlackNotificationPayload(t *testing.T) { + // Create a test job meta + meta := &JobMeta{ + ID: "test-job-123", + Args: []string{"echo", "test message"}, + Status: "COMPLETED", + StartedAt: time.Now().Add(-5 * time.Minute), + CompletedAt: time.Now(), + PID: 12345, + } + + // Test that sendSlackNotification would be called correctly + // (We can't easily test the actual HTTP call without a mock server) + + // Test notification logic for different event types + testCases := []struct { + eventType string + notifyOnStart bool + notifyOnFinish bool + slackWebhook string + shouldNotify bool + }{ + {"start", true, false, "https://hooks.slack.com/test", true}, + {"start", false, true, "https://hooks.slack.com/test", false}, + {"finish", false, true, "https://hooks.slack.com/test", true}, + {"finish", true, false, "https://hooks.slack.com/test", false}, + {"start", true, false, "", false}, // no webhook + } + + for _, tc := range testCases { + meta.NotifyOnStart = tc.notifyOnStart + meta.NotifyOnFinish = tc.notifyOnFinish + meta.SlackWebhook = tc.slackWebhook + + shouldNotify := false + if tc.eventType == "start" && meta.NotifyOnStart { + shouldNotify = true + } else if tc.eventType == "finish" && meta.NotifyOnFinish { + shouldNotify = true + } + shouldNotify = shouldNotify && meta.SlackWebhook != "" + + if shouldNotify != tc.shouldNotify { + t.Errorf("For event %s with notifyOnStart=%v, notifyOnFinish=%v, webhook=%s: got shouldNotify=%v, want %v", + tc.eventType, tc.notifyOnStart, tc.notifyOnFinish, tc.slackWebhook, shouldNotify, tc.shouldNotify) + } + } +} + +func TestCommandArgumentHandling(t *testing.T) { + // Test that command arguments are properly formatted in notifications + testArgs := [][]string{ + {"echo", "hello world"}, + {"ls", "-la", "/tmp"}, + {"python3", "-c", "print('test')"}, + } + + for _, args := range testArgs { + joined := strings.Join(args, " ") + if !strings.Contains(joined, args[0]) { + t.Errorf("Command formatting failed for args: %v", args) + } + } +}