Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 7 additions & 10 deletions deploy/k8s/api-deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,13 @@ spec:
value: "http://dep:8082"
- name: DATABASE_URL
value: "postgres://workshop:workshop@postgres:5432/workshop?sslmode=disable"
# LAB: STEP4 TODO — Add resource requests and limits here.
# Without cpu requests, the HPA cannot compute utilization.
# Example fix:
# resources:
# requests:
# cpu: "100m"
# memory: "64Mi"
# limits:
# cpu: "500m"
# memory: "128Mi"
resources:
requests:
cpu: "100m"
memory: "64Mi"
limits:
cpu: "500m"
memory: "128Mi"
readinessProbe:
httpGet:
path: /healthz
Expand Down
6 changes: 2 additions & 4 deletions deploy/k8s/api-hpa.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,11 @@ spec:
# 2. Configure CPU target utilization percentage
# 3. Ensure the api deployment has CPU resource requests set
minReplicas: 1
maxReplicas: 1
maxReplicas: 5
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
# LAB: STEP4 TODO - This target is too high to ever trigger scaling.
# Change to a reasonable value like 50 to see scaling in action.
averageUtilization: 95
averageUtilization: 50
13 changes: 5 additions & 8 deletions pkg/cases/timeout_case.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,11 @@ type TimeoutCase struct {
func (tc *TimeoutCase) Handle(w http.ResponseWriter, r *http.Request) {
start := time.Now()

// LAB: STEP1 TODO - This context has no timeout/deadline.
// Participants should add context.WithTimeout here, e.g.:
// ctx, cancel := context.WithTimeout(r.Context(), 2*time.Second)
// defer cancel()
ctx := context.Background()

// Call dep service with a slow sleep parameter
result, err := depclient.Call(ctx, tc.DepClient, "3s", "0.0")
ctx, cancel := context.WithTimeout(r.Context(), 2*time.Second)
defer cancel()

// Call dep service — sleep is under our 2s timeout so requests succeed
result, err := depclient.Call(ctx, tc.DepClient, "500ms", "0.0")
elapsed := time.Since(start)

if err != nil {
Expand Down
23 changes: 6 additions & 17 deletions pkg/cases/tx_case.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,13 @@ func (tc *TxCase) Handle(w http.ResponseWriter, r *http.Request) {

start := time.Now()

// LAB: STEP2 TODO - This is the anti-pattern: BEGIN TX, then make a
// slow network call while holding the transaction open.
// The fix is to:
// 1. Move the dep call OUTSIDE the transaction
// 2. Only use the TX for the actual DB operation
// 3. Keep TX duration as short as possible
// Make the dep call OUTSIDE the transaction first.
_, depErr := depclient.Call(r.Context(), tc.DepClient, "2s", "0.0")
if depErr != nil {
log.Printf("tx: dep call error: %v", depErr)
}

// Begin transaction
// Now open the transaction — only for the fast DB operations
tx, err := tc.DB.Begin()
if err != nil {
log.Printf("tx: begin error: %v", err)
Expand All @@ -44,8 +43,6 @@ func (tc *TxCase) Handle(w http.ResponseWriter, r *http.Request) {
}
defer tx.Rollback()

// LAB: STEP2 TODO - Lock a row inside the transaction.
// This SELECT FOR UPDATE holds a row lock for the entire TX duration.
var balance int
err = tx.QueryRow("SELECT balance FROM accounts WHERE name = $1 FOR UPDATE", "alice").Scan(&balance)
if err != nil {
Expand All @@ -54,14 +51,6 @@ func (tc *TxCase) Handle(w http.ResponseWriter, r *http.Request) {
return
}

// LAB: STEP2 TODO - Making a network call INSIDE the transaction.
// This is the anti-pattern! The dep call takes ~2s, and during that
// time we hold a DB connection AND a row lock.
_, depErr := depclient.Call(r.Context(), tc.DepClient, "2s", "0.0")
if depErr != nil {
log.Printf("tx: dep call error: %v", depErr)
}

// Update the row
_, err = tx.Exec("UPDATE accounts SET balance = balance - 1, updated_at = NOW() WHERE name = $1", "alice")
if err != nil {
Expand Down
14 changes: 9 additions & 5 deletions pkg/depclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"net/http"
"time"
)

// Client calls the dependency simulator service.
Expand All @@ -20,9 +21,9 @@ type Client struct {
// NewClient constructs a Client for the dependency simulator rooted at
// baseURL. The embedded http.Client carries an overall 2-second timeout,
// so a hung or slow dependency cannot stall callers indefinitely.
func NewClient(baseURL string) *Client {
	httpClient := &http.Client{Timeout: 2 * time.Second}
	return &Client{
		BaseURL:    baseURL,
		HTTPClient: httpClient,
	}
}

Expand All @@ -32,8 +33,11 @@ func NewClient(baseURL string) *Client {
// 2. Use http.NewRequestWithContext so the HTTP call respects cancellation
func Call(ctx context.Context, c *Client, sleep string, failRate string) (string, error) {
url := fmt.Sprintf("%s/work?sleep=%s&fail=%s", c.BaseURL, sleep, failRate)
// LAB: STEP1 TODO - replace http.Get with http.NewRequestWithContext(ctx, ...)
resp, err := c.HTTPClient.Get(url)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return "", fmt.Errorf("creating request: %w", err)
}
resp, err := c.HTTPClient.Do(req)
if err != nil {
return "", fmt.Errorf("dep call failed: %w", err)
}
Expand Down
11 changes: 10 additions & 1 deletion pkg/driver/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package driver
import (
"context"
"fmt"
"io"
"math"
"math/rand"
"net/http"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -181,10 +183,17 @@ loop:

func (r *Runner) doRequest(client *http.Client) RequestResult {
start := time.Now()
req, err := http.NewRequest(r.Config.Method, r.Config.TargetURL, nil)
var body io.Reader
if r.Config.Body != "" {
body = strings.NewReader(r.Config.Body)
}
req, err := http.NewRequest(r.Config.Method, r.Config.TargetURL, body)
if err != nil {
return RequestResult{Error: err, Latency: time.Since(start), Timestamp: start}
}
if r.Config.Body != "" {
req.Header.Set("Content-Type", "application/json")
}
resp, err := client.Do(req)
latency := time.Since(start)
if err != nil {
Expand Down
16 changes: 7 additions & 9 deletions pkg/worker/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,16 @@ func handleSubmitBatch(w http.ResponseWriter, r *http.Request) {
}

func processBatch(b *Batch) {
// LAB: STEP3 TODO - This is a single shared pool with limited concurrency.
// Both fast and slow jobs compete for the same workers.
// When slow jobs occupy all workers, fast jobs are starved.
poolSize := 10
sem := make(chan struct{}, poolSize)
// Separate pools (bulkheads) so slow jobs can't starve fast jobs
fastPool := make(chan struct{}, 20) // large pool for fast jobs
slowPool := make(chan struct{}, 5) // capped pool for slow jobs
var wg sync.WaitGroup
for i := 0; i < b.Fast; i++ {
wg.Add(1)
go func() {
defer wg.Done()
sem <- struct{}{}
defer func() { <-sem }()
fastPool <- struct{}{}
defer func() { <-fastPool }()
start := time.Now()
time.Sleep(10 * time.Millisecond)
b.recordResult("fast", time.Since(start))
Expand All @@ -113,8 +111,8 @@ func processBatch(b *Batch) {
wg.Add(1)
go func() {
defer wg.Done()
sem <- struct{}{}
defer func() { <-sem }()
slowPool <- struct{}{}
defer func() { <-slowPool }()
start := time.Now()
time.Sleep(1 * time.Second)
b.recordResult("slow", time.Since(start))
Expand Down