Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 72 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ This is a simple, file-backed job queue server written in Go. Each job runs as a
- Jobs are executed as child processes
- Each job logs `stderr` and stores `stdout` as the final result
- Persistent metadata and logs saved to the filesystem
- Webhook support to notify external services on job completion
- **Configurable notifications** with support for:
- Traditional webhooks on job completion
- Slack webhooks with rich formatting for job start and/or completion events
- REST API for job submission, status tracking, result fetching, and cancellation
- **Static file serving** from `public/` directory at the root path

Expand Down Expand Up @@ -79,6 +81,7 @@ Serving static files from: /path/to/public

### 3. Submit a Job

**Basic job submission:**
```bash
curl -X POST http://localhost:8080/jobs \
-H 'Content-Type: application/json' \
Expand All @@ -89,6 +92,27 @@ curl -X POST http://localhost:8080/jobs \
}'
```

**With Slack notifications:**
```bash
curl -X POST http://localhost:8080/jobs \
-H 'Content-Type: application/json' \
-d '{
"args": ["python", "my_script.py"],
"mime_type": "text/plain",
"slack_webhook": "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK",
"notify_on_start": true,
"notify_on_finish": true
}'
```

**Job submission parameters:**
- `args` (required): Array of command and arguments to execute
- `mime_type` (optional): MIME type for the result content
- `webhook` (optional): URL for traditional webhook notifications (triggered on job completion)
- `slack_webhook` (optional): Slack webhook URL for rich notifications
- `notify_on_start` (optional): Send Slack notification when job starts (default: false)
- `notify_on_finish` (optional): Send Slack notification when job completes (default: false)

### 4. Check Status

```bash
Expand Down Expand Up @@ -134,6 +158,9 @@ curl -X PUT http://localhost:8080/jobs/<job-id>/cancel
"args": ["echo", "Hello, world!"],
"mime_type": "text/plain",
"webhook": "https://webhook.site/your-id",
"slack_webhook": "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK",
"notify_on_start": true,
"notify_on_finish": true,
"status": "IN_QUEUE",
"enqueued_at": "2024-01-01T12:00:00Z",
"status_url": "/jobs/550e8400-e29b-41d4-a716-446655440000/status",
Expand All @@ -144,6 +171,50 @@ curl -X PUT http://localhost:8080/jobs/<job-id>/cancel

---

## 🔔 Notifications

The job queue supports two types of webhook notifications:

### Traditional Webhooks
- Triggered only when jobs complete (success, failure, or cancellation)
- Simple JSON payload with job metadata
- Backward compatible with existing integrations

**Payload format:**
```json
{
"id": "job-id",
"status": "COMPLETED|FAILED|CANCELED",
"result_url": "/jobs/job-id/result",
"log_url": "/jobs/job-id/log"
}
```

### Slack Webhooks
- Rich formatting with job details and status colors
- Configurable for job start and/or completion events
- Includes clickable links to logs and results

**Slack notification features:**
- **Start notifications**: Alert when a job begins execution
- **Completion notifications**: Alert when a job finishes with status-specific formatting
- **Rich formatting**: Command details, timing information, and direct links
- **Status colors**: Green for success, red for failure, blue for cancellation

**Example Slack notification setup:**
```json
{
"args": ["python", "data_processor.py"],
"slack_webhook": "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK",
"notify_on_start": true,
"notify_on_finish": true
}
```

Both notification types can be used simultaneously for the same job.

---

## 🐳 Docker

### Build
Expand Down
174 changes: 147 additions & 27 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,21 @@ import (
)

type JobMeta struct {
ID string `json:"id"`
Args []string `json:"args"`
MimeType string `json:"mime_type,omitempty"`
Webhook string `json:"webhook,omitempty"`
Status string `json:"status"`
PID int `json:"pid,omitempty"`
EnqueuedAt time.Time `json:"enqueued_at"`
StartedAt time.Time `json:"started_at,omitempty"`
CompletedAt time.Time `json:"completed_at,omitempty"`
StatusURL string `json:"status_url,omitempty"`
ResultURL string `json:"result_url,omitempty"`
LogURL string `json:"log_url,omitempty"`
ID string `json:"id"`
Args []string `json:"args"`
MimeType string `json:"mime_type,omitempty"`
Webhook string `json:"webhook,omitempty"`
SlackWebhook string `json:"slack_webhook,omitempty"`
NotifyOnStart bool `json:"notify_on_start,omitempty"`
NotifyOnFinish bool `json:"notify_on_finish,omitempty"`
Status string `json:"status"`
PID int `json:"pid,omitempty"`
EnqueuedAt time.Time `json:"enqueued_at"`
StartedAt time.Time `json:"started_at,omitempty"`
CompletedAt time.Time `json:"completed_at,omitempty"`
StatusURL string `json:"status_url,omitempty"`
ResultURL string `json:"result_url,omitempty"`
LogURL string `json:"log_url,omitempty"`
}

type queuedJob struct {
Expand Down Expand Up @@ -165,9 +168,12 @@ func jobsHandler(w http.ResponseWriter, r *http.Request, fixedArgs []string) {

func submitJob(w http.ResponseWriter, r *http.Request, fixedArgs []string) {
var req struct {
Args []string `json:"args"`
MimeType string `json:"mime_type,omitempty"`
Webhook string `json:"webhook,omitempty"`
Args []string `json:"args"`
MimeType string `json:"mime_type,omitempty"`
Webhook string `json:"webhook,omitempty"`
SlackWebhook string `json:"slack_webhook,omitempty"`
NotifyOnStart bool `json:"notify_on_start,omitempty"`
NotifyOnFinish bool `json:"notify_on_finish,omitempty"`
}
dec := json.NewDecoder(r.Body)
if err := dec.Decode(&req); err != nil {
Expand Down Expand Up @@ -197,12 +203,15 @@ func submitJob(w http.ResponseWriter, r *http.Request, fixedArgs []string) {
}

meta := &JobMeta{
ID: id,
Args: args,
MimeType: req.MimeType,
Webhook: req.Webhook,
Status: "IN_QUEUE",
EnqueuedAt: time.Now(),
ID: id,
Args: args,
MimeType: req.MimeType,
Webhook: req.Webhook,
SlackWebhook: req.SlackWebhook,
NotifyOnStart: req.NotifyOnStart,
NotifyOnFinish: req.NotifyOnFinish,
Status: "IN_QUEUE",
EnqueuedAt: time.Now(),
}
baseURL := os.Getenv("BASE_URL")
statusPath := "/jobs/" + id + "/status"
Expand Down Expand Up @@ -325,6 +334,9 @@ func runJob(meta *JobMeta, inputFilePath string) {
runningJobs[meta.ID] = &RunningJob{Cmd: cmd, Meta: meta, Cancel: cancel}
mu.Unlock()

// Send start notification
sendNotifications(meta, "start")

err := cmd.Wait()
meta.CompletedAt = time.Now()

Expand Down Expand Up @@ -353,12 +365,8 @@ func runJob(meta *JobMeta, inputFilePath string) {
fmt.Fprintf(os.Stderr, "[DEBUG] Job finished: id=%s status=%s\n", meta.ID, meta.Status)
}

if meta.Webhook != "" {
if os.Getenv("DEBUG") == "1" {
fmt.Fprintf(os.Stderr, "[DEBUG] Triggering webhook: url=%s id=%s status=%s\n", meta.Webhook, meta.ID, meta.Status)
}
go sendWebhook(meta)
}
// Send finish notifications
sendNotifications(meta, "finish")
}

func saveMeta(meta *JobMeta) {
Expand All @@ -383,11 +391,123 @@ func sendWebhook(meta *JobMeta) {
"id": meta.ID,
"status": meta.Status,
"result_url": "/jobs/" + meta.ID + "/result",
"log_url": "/jobs/" + meta.ID + "/log",
}
data, _ := json.Marshal(payload)
http.Post(meta.Webhook, "application/json", bytes.NewReader(data))
}

func sendSlackNotification(meta *JobMeta, eventType string) {
if meta.SlackWebhook == "" {
return
}

var color, title, text string
var fields []map[string]interface{}

baseURL := os.Getenv("BASE_URL")
logURL := "/jobs/" + meta.ID + "/log"
resultURL := "/jobs/" + meta.ID + "/result"
statusURL := "/jobs/" + meta.ID + "/status"

if baseURL != "" {
logURL = baseURL + logURL
resultURL = baseURL + resultURL
statusURL = baseURL + statusURL
}

switch eventType {
case "start":
color = "warning"
title = "Job Started"
text = fmt.Sprintf("Job `%s` has started execution", meta.ID)
fields = []map[string]interface{}{
{"title": "Command", "value": fmt.Sprintf("`%s`", strings.Join(meta.Args, " ")), "short": false},
{"title": "Started At", "value": meta.StartedAt.Format(time.RFC3339), "short": true},
{"title": "PID", "value": fmt.Sprintf("%d", meta.PID), "short": true},
}
case "finish":
switch meta.Status {
case "COMPLETED":
color = "good"
title = "Job Completed Successfully"
case "FAILED":
color = "danger"
title = "Job Failed"
case "CANCELED":
color = "#439FE0"
title = "Job Canceled"
default:
color = "warning"
title = "Job Finished"
}

duration := ""
if !meta.StartedAt.IsZero() && !meta.CompletedAt.IsZero() {
duration = meta.CompletedAt.Sub(meta.StartedAt).String()
}

text = fmt.Sprintf("Job `%s` finished with status: *%s*", meta.ID, meta.Status)
fields = []map[string]interface{}{
{"title": "Command", "value": fmt.Sprintf("`%s`", strings.Join(meta.Args, " ")), "short": false},
{"title": "Duration", "value": duration, "short": true},
{"title": "Completed At", "value": meta.CompletedAt.Format(time.RFC3339), "short": true},
}
}

// Add links
fields = append(fields,
map[string]interface{}{"title": "Links", "value": fmt.Sprintf("<%s|View Logs> | <%s|View Status>", logURL, statusURL), "short": false},
)

if meta.Status == "COMPLETED" {
lastField := fields[len(fields)-1]
lastField["value"] = fmt.Sprintf("<%s|View Result> | <%s|View Logs> | <%s|View Status>", resultURL, logURL, statusURL)
}

slackPayload := map[string]interface{}{
"attachments": []map[string]interface{}{
{
"color": color,
"title": title,
"text": text,
"fields": fields,
"footer": "Shell Job Queue",
"ts": time.Now().Unix(),
"mrkdwn_in": []string{"text", "fields"},
},
},
}

data, _ := json.Marshal(slackPayload)
http.Post(meta.SlackWebhook, "application/json", bytes.NewReader(data))
}

func sendNotifications(meta *JobMeta, eventType string) {
// Send traditional webhook if configured
if meta.Webhook != "" && eventType == "finish" {
if os.Getenv("DEBUG") == "1" {
fmt.Fprintf(os.Stderr, "[DEBUG] Triggering webhook: url=%s id=%s status=%s\n", meta.Webhook, meta.ID, meta.Status)
}
go sendWebhook(meta)
}

// Send Slack notification if configured
shouldNotify := false
if eventType == "start" && meta.NotifyOnStart {
shouldNotify = true
} else if eventType == "finish" && meta.NotifyOnFinish {
shouldNotify = true
}

if shouldNotify && meta.SlackWebhook != "" {
if os.Getenv("DEBUG") == "1" {
fmt.Fprintf(os.Stderr, "[DEBUG] Triggering Slack notification: url=%s id=%s event=%s\n", meta.SlackWebhook, meta.ID, eventType)
}
go sendSlackNotification(meta, eventType)
}
}

func listJobs(w http.ResponseWriter, r *http.Request) {
// Check for orphaned jobs before listing
checkForOrphanedJobs()
Expand Down
Loading