diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index ff7484f..c419d96 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -61,7 +61,7 @@ jobs: - name: Upload coverage to Codecov if: always() - uses: codecov/codecov-action@v4 + uses: codecov/codecov-action@v5 with: files: ./cover.out token: ${{ secrets.CODECOV_TOKEN }} diff --git a/go.mod b/go.mod index 166d873..80f42f2 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,13 @@ module github.com/mickamy/dbtop go 1.26.3 + +require github.com/jackc/pgx/v5 v5.9.2 + +require ( + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect + golang.org/x/sync v0.17.0 // indirect + golang.org/x/text v0.29.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..f5b2410 --- /dev/null +++ b/go.sum @@ -0,0 +1,26 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.9.2 h1:3ZhOzMWnR4yJ+RW1XImIPsD1aNSz4T4fyP7zlQb56hw= +github.com/jackc/pgx/v5 v5.9.2/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= +golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/driver/backend.go b/internal/driver/backend.go new file mode 100644 index 0000000..06be905 --- /dev/null +++ b/internal/driver/backend.go @@ -0,0 +1,81 @@ +package driver + +import "time" + +// State is a backend's connection state, normalized across drivers. +type State uint8 + +const ( + StateUnknown State = iota + StateActive + StateIdle + StateIdleInTx + StateIdleInTxAborted +) + +func (s State) String() string { + switch s { + case StateUnknown: + return "unknown" + case StateActive: + return "active" + case StateIdle: + return "idle" + case StateIdleInTx: + return "idle-tx" + case StateIdleInTxAborted: + return "idle-tx-aborted" + } + + return "unknown" +} + +// Backend is one server-side connection in an Activity snapshot. +type Backend struct { + PID int64 + User string + DB string + State State + + // Empty when the backend is not waiting. + WaitType string + WaitEvent string + + Query string + + // PIDs blocking this backend; the ▲/⊘ markers are derived across rows. + BlockedBy []int64 + + // now() - query_start and now() - xact_start; nil when the timestamp is NULL. + QueryAge *time.Duration + XactAge *time.Duration + + // System-internal (non-client) backend, hidden by default. + SystemBackend bool +} + +// Duration returns the value shown in the DURATION column. Idle and unknown +// backends have none: their query_start is stale from the last query. +func (b Backend) Duration() (time.Duration, bool) { + switch b.State { + case StateActive: + if b.QueryAge != nil { + return *b.QueryAge, true + } + case StateIdleInTx, StateIdleInTxAborted: + if b.XactAge != nil { + return *b.XactAge, true + } + case StateUnknown, StateIdle: + } + + return 0, false +} + +func (b Backend) Wait() string { + if b.WaitType == "" { + return "" + } + + return b.WaitType + ":" + b.WaitEvent +} diff --git a/internal/driver/backend_test.go b/internal/driver/backend_test.go new file mode 100644 index 0000000..cd5bddd --- /dev/null +++ b/internal/driver/backend_test.go @@ -0,0 +1,103 @@ +package driver_test + +import ( + "testing" + "time" + + "github.com/mickamy/dbtop/internal/driver" +) + +func TestStateString(t *testing.T) { + t.Parallel() + + tests := []struct { + state driver.State + want string + }{ + {driver.StateActive, "active"}, + {driver.StateIdle, "idle"}, + {driver.StateIdleInTx, "idle-tx"}, + {driver.StateIdleInTxAborted, "idle-tx-aborted"}, + {driver.StateUnknown, "unknown"}, + } + + for _, tt := range tests { + if got := tt.state.String(); got != tt.want { + t.Errorf("State(%d).String() = %q, want %q", tt.state, got, tt.want) + } + } +} + +func TestBackendDuration(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + backend driver.Backend + want time.Duration + wantOK bool + }{ + { + name: "active uses query age", + backend: driver.Backend{State: driver.StateActive, QueryAge: new(3 * time.Second), XactAge: new(time.Minute)}, + want: 3 * time.Second, + wantOK: true, + }, + { + name: "idle-tx uses xact age", + backend: driver.Backend{State: driver.StateIdleInTx, QueryAge: new(time.Second), XactAge: new(90 * time.Second)}, + want: 90 * time.Second, + wantOK: true, + }, + { + name: "idle-tx-aborted uses xact age", + backend: driver.Backend{State: driver.StateIdleInTxAborted, XactAge: new(30 * time.Second)}, + want: 30 * time.Second, + wantOK: true, + }, + { + name: "active without query age is undefined", + backend: driver.Backend{State: driver.StateActive}, + wantOK: false, + }, + { + name: "idle ignores stale query age", + backend: driver.Backend{State: driver.StateIdle, QueryAge: new(2 * time.Hour)}, + wantOK: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + got, ok := tt.backend.Duration() + if ok != tt.wantOK || got != tt.want { + t.Errorf("Duration() = (%v, %v), want (%v, %v)", got, ok, tt.want, tt.wantOK) + } + }) + } +} + +func TestBackendWait(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + backend driver.Backend + want string + }{ + {"lock wait", driver.Backend{WaitType: "Lock", WaitEvent: "tuple"}, "Lock:tuple"}, + {"not waiting", driver.Backend{}, ""}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + if got := tt.backend.Wait(); got != tt.want { + t.Errorf("Wait() = %q, want %q", got, tt.want) + } + }) + } +} diff --git a/internal/driver/driver.go b/internal/driver/driver.go new file mode 100644 index 0000000..41f5e52 --- /dev/null +++ b/internal/driver/driver.go @@ -0,0 +1,37 @@ +package driver + +import "context" + +// Driver abstracts a database backend behind the screens the TUI renders. The +// TUI is driver-independent: it only consumes []Backend, MetricSample, and +// []Statement. +type Driver interface { + Activity(ctx context.Context) ([]Backend, error) + Metrics(ctx context.Context) (MetricSample, error) + Statements(ctx context.Context) ([]Statement, error) + ResetStatements(ctx context.Context) error + + // Cancel gracefully stops the running query (pg_cancel_backend / KILL QUERY). + Cancel(ctx context.Context, pid int64) error + + // Terminate forcibly closes the connection (pg_terminate_backend / KILL CONNECTION). + Terminate(ctx context.Context, pid int64) error + + Capabilities(ctx context.Context) (Capabilities, error) + Close() error +} + +// Capabilities reports the privileges and features available to the connected +// user, used to degrade gracefully and to render the startup banner. +type Capabilities struct { + Superuser bool + + // Can read other backends' full query text (pg_monitor / PROCESS). + Monitor bool + + // Can cancel/terminate backends (pg_signal_backend / CONNECTION_ADMIN). + Kill bool + + // Digest source available (pg_stat_statements / performance_schema digest). + Statements bool +} diff --git a/internal/driver/metric.go b/internal/driver/metric.go new file mode 100644 index 0000000..2332182 --- /dev/null +++ b/internal/driver/metric.go @@ -0,0 +1,43 @@ +package driver + +import "time" + +// MetricSample is one polling sample of server-wide health counters. Drivers +// return raw cumulative counters and gauges; per-second rates are derived +// downstream from the delta between successive samples. +type MetricSample struct { + At time.Time + + MaxConnections int + Conns ConnCounts + + // Monotonic counters; rate = delta / dt. + Commits int64 + Rollbacks int64 + BlocksHit int64 + BlocksRead int64 + TuplesInserted int64 + TuplesUpdated int64 + TuplesDeleted int64 + TuplesReturned int64 + TuplesFetched int64 + TempFiles int64 + TempBytes int64 + + // Point-in-time value; use as-is, not as a rate. + WaitingLocks int + + Replicas []ReplicaLag +} + +type ConnCounts struct { + Total int + Active int + Idle int + IdleInTx int +} + +type ReplicaLag struct { + Client string + Lag time.Duration +} diff --git a/internal/driver/postgres/activity.go b/internal/driver/postgres/activity.go new file mode 100644 index 0000000..6018ca8 --- /dev/null +++ b/internal/driver/postgres/activity.go @@ -0,0 +1,90 @@ +package postgres + +import ( + "context" + "fmt" + + "github.com/mickamy/dbtop/internal/driver" + "github.com/mickamy/dbtop/internal/durations" + "github.com/mickamy/dbtop/internal/ptr" +) + +const activityQuery = ` +SELECT a.pid, a.usename, a.datname, a.state, + a.wait_event_type, a.wait_event, a.query, a.backend_type, + pg_blocking_pids(a.pid)::bigint[] AS blocked_by, + EXTRACT(EPOCH FROM now() - a.query_start)::float8 AS query_age, + EXTRACT(EPOCH FROM now() - a.xact_start)::float8 AS xact_age +FROM pg_stat_activity a +WHERE a.pid <> pg_backend_pid() + AND coalesce(a.application_name, '') <> $1 +ORDER BY CASE + WHEN a.state = 'active' + THEN EXTRACT(EPOCH FROM now() - a.query_start) + WHEN a.state IN ('idle in transaction', 'idle in transaction (aborted)') + THEN EXTRACT(EPOCH FROM now() - a.xact_start) + ELSE NULL + END DESC NULLS LAST +` + +func (d *Driver) Activity(ctx context.Context) ([]driver.Backend, error) { + rows, err := d.pool.Query(ctx, activityQuery, appName) + if err != nil { + return nil, fmt.Errorf("query activity: %w", err) + } + defer rows.Close() + + var backends []driver.Backend + + for rows.Next() { + var ( + pid int64 + user, db, state *string + waitType, waitEvent *string + query, backendType *string + blockedBy []int64 + queryAge, xactAge *float64 + ) + + if err := rows.Scan( + &pid, &user, &db, &state, &waitType, &waitEvent, &query, &backendType, &blockedBy, &queryAge, &xactAge, + ); err != nil { + return nil, fmt.Errorf("scan activity: %w", err) + } + + backends = append(backends, driver.Backend{ + PID: pid, + User: ptr.OrZero(user), + DB: ptr.OrZero(db), + State: parseState(ptr.OrZero(state)), + WaitType: ptr.OrZero(waitType), + WaitEvent: ptr.OrZero(waitEvent), + Query: ptr.OrZero(query), + BlockedBy: blockedBy, + QueryAge: ptr.Map(queryAge, durations.FromSeconds), + XactAge: ptr.Map(xactAge, durations.FromSeconds), + SystemBackend: ptr.OrZero(backendType) != "client backend", + }) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterate activity: %w", err) + } + + return backends, nil +} + +func parseState(s string) driver.State { + switch s { + case "active": + return driver.StateActive + case "idle": + return driver.StateIdle + case "idle in transaction": + return driver.StateIdleInTx + case "idle in transaction (aborted)": + return driver.StateIdleInTxAborted + default: + return driver.StateUnknown + } +} diff --git a/internal/driver/postgres/activity_test.go b/internal/driver/postgres/activity_test.go new file mode 100644 index 0000000..599fed2 --- /dev/null +++ b/internal/driver/postgres/activity_test.go @@ -0,0 +1,34 @@ +package postgres_test + +import ( + "testing" + + "github.com/mickamy/dbtop/internal/driver" + "github.com/mickamy/dbtop/internal/driver/postgres" +) + +func TestParseState(t *testing.T) { + t.Parallel() + + tests := []struct { + in string + want driver.State + }{ + {"active", driver.StateActive}, + {"idle", driver.StateIdle}, + {"idle in transaction", driver.StateIdleInTx}, + {"idle in transaction (aborted)", driver.StateIdleInTxAborted}, + {"", driver.StateUnknown}, + {"fastpath function call", driver.StateUnknown}, + } + + for _, tt := range tests { + t.Run(tt.in, func(t *testing.T) { + t.Parallel() + + if got := postgres.ParseState(tt.in); got != tt.want { + t.Errorf("ParseState(%q) = %v, want %v", tt.in, got, tt.want) + } + }) + } +} diff --git a/internal/driver/postgres/capabilities.go b/internal/driver/postgres/capabilities.go new file mode 100644 index 0000000..e97637d --- /dev/null +++ b/internal/driver/postgres/capabilities.go @@ -0,0 +1,38 @@ +package postgres + +import ( + "context" + "fmt" + + "github.com/mickamy/dbtop/internal/driver" +) + +const capabilitiesQuery = ` +SELECT + current_setting('is_superuser')::bool, + pg_has_role(current_user, 'pg_monitor', 'MEMBER'), + pg_has_role(current_user, 'pg_signal_backend', 'MEMBER'), + EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pg_stat_statements') +` + +func (d *Driver) Capabilities(ctx context.Context) (driver.Capabilities, error) { + var capabilities driver.Capabilities + + err := d.pool.QueryRow(ctx, capabilitiesQuery).Scan( + &capabilities.Superuser, + &capabilities.Monitor, + &capabilities.Kill, + &capabilities.Statements, + ) + if err != nil { + return driver.Capabilities{}, fmt.Errorf("query capabilities: %w", err) + } + + // A superuser can always monitor and kill, regardless of role membership. + if capabilities.Superuser { + capabilities.Monitor = true + capabilities.Kill = true + } + + return capabilities, nil +} diff --git a/internal/driver/postgres/control.go b/internal/driver/postgres/control.go new file mode 100644 index 0000000..3df97b6 --- /dev/null +++ b/internal/driver/postgres/control.go @@ -0,0 +1,33 @@ +package postgres + +import ( + "context" + "fmt" +) + +const ( + cancelQuery = `SELECT pg_cancel_backend($1)` + terminateQuery = `SELECT pg_terminate_backend($1)` +) + +func (d *Driver) Cancel(ctx context.Context, pid int64) error { + return d.signalBackend(ctx, cancelQuery, pid) +} + +func (d *Driver) Terminate(ctx context.Context, pid int64) error { + return d.signalBackend(ctx, terminateQuery, pid) +} + +func (d *Driver) signalBackend(ctx context.Context, query string, pid int64) error { + var sent bool + if err := d.pool.QueryRow(ctx, query, pid).Scan(&sent); err != nil { + return fmt.Errorf("signal backend %d: %w", pid, err) + } + + // false means there was no such backend to signal. + if !sent { + return fmt.Errorf("backend %d not found", pid) + } + + return nil +} diff --git a/internal/driver/postgres/export_test.go b/internal/driver/postgres/export_test.go new file mode 100644 index 0000000..f4bc785 --- /dev/null +++ b/internal/driver/postgres/export_test.go @@ -0,0 +1,6 @@ +package postgres + +var ( + GuardVersion = guardVersion + ParseState = parseState +) diff --git a/internal/driver/postgres/integration_test.go b/internal/driver/postgres/integration_test.go new file mode 100644 index 0000000..61888f0 --- /dev/null +++ b/internal/driver/postgres/integration_test.go @@ -0,0 +1,96 @@ +//go:build integration + +package postgres_test + +import ( + "os" + "testing" + + "github.com/mickamy/dbtop/internal/driver/postgres" +) + +// Run with: DBTOP_TEST_DSN=postgres://... go test -tags integration ./internal/driver/postgres/ +func openTestDriver(t *testing.T) *postgres.Driver { + t.Helper() + + dsn := os.Getenv("DBTOP_TEST_DSN") + if dsn == "" { + t.Skip("set DBTOP_TEST_DSN to run integration tests") + } + + d, err := postgres.Open(t.Context(), dsn) + if err != nil { + t.Fatalf("Open: %v", err) + } + + t.Cleanup(func() { + _ = d.Close() + }) + + return d +} + +func TestIntegrationActivity(t *testing.T) { + t.Parallel() + + d := openTestDriver(t) + + backends, err := d.Activity(t.Context()) + if err != nil { + t.Fatalf("Activity: %v", err) + } + + for _, b := range backends { + if b.PID == 0 { + t.Errorf("backend has zero PID: %+v", b) + } + } +} + +func TestIntegrationMetrics(t *testing.T) { + t.Parallel() + + d := openTestDriver(t) + + m, err := d.Metrics(t.Context()) + if err != nil { + t.Fatalf("Metrics: %v", err) + } + + if m.MaxConnections <= 0 { + t.Errorf("MaxConnections = %d, want > 0", m.MaxConnections) + } + if m.At.IsZero() { + t.Error("At is zero") + } +} + +func TestIntegrationCapabilities(t *testing.T) { + t.Parallel() + + d := openTestDriver(t) + + // Smoke test: the probe must succeed; values depend on the connected role. + if _, err := d.Capabilities(t.Context()); err != nil { + t.Fatalf("Capabilities: %v", err) + } +} + +func TestIntegrationStatements(t *testing.T) { + t.Parallel() + + d := openTestDriver(t) + ctx := t.Context() + + caps, err := d.Capabilities(ctx) + if err != nil { + t.Fatalf("Capabilities: %v", err) + } + if !caps.Statements { + t.Skip("pg_stat_statements not available") + } + + if _, err := d.Statements(ctx); err != nil { + t.Fatalf("Statements: %v", err) + } +} diff --git a/internal/driver/postgres/metrics.go b/internal/driver/postgres/metrics.go new file mode 100644 index 0000000..f920514 --- /dev/null +++ b/internal/driver/postgres/metrics.go @@ -0,0 +1,119 @@ +package postgres + +import ( + "context" + "fmt" + "time" + + "github.com/mickamy/dbtop/internal/driver" + "github.com/mickamy/dbtop/internal/durations" + "github.com/mickamy/dbtop/internal/ptr" +) + +const metricsQuery = ` +SELECT + current_setting('max_connections')::int, + c.total, + c.active, + c.idle, + c.idle_tx, + (SELECT count(*) FROM pg_locks WHERE NOT granted), + d.xact_commit, + d.xact_rollback, + d.blks_hit, + d.blks_read, + d.tup_inserted, + d.tup_updated, + d.tup_deleted, + d.tup_returned, + d.tup_fetched, + d.temp_files, + d.temp_bytes +FROM pg_stat_database d +CROSS JOIN ( + SELECT + count(*) AS total, + count(*) FILTER (WHERE state = 'active') AS active, + count(*) FILTER (WHERE state = 'idle') AS idle, + count(*) FILTER (WHERE state IN ('idle in transaction', 'idle in transaction (aborted)')) AS idle_tx + FROM pg_stat_activity + WHERE backend_type = 'client backend' + AND coalesce(application_name, '') <> $1 +) c +WHERE d.datname = current_database() +` + +const replicaLagQuery = ` +SELECT + coalesce(nullif(client_addr::text, ''), application_name), + EXTRACT(EPOCH FROM replay_lag)::float8 +FROM pg_stat_replication +` + +func (d *Driver) Metrics(ctx context.Context) (driver.MetricSample, error) { + sample := driver.MetricSample{At: time.Now()} + + err := d.pool.QueryRow(ctx, metricsQuery, appName).Scan( + &sample.MaxConnections, + &sample.Conns.Total, + &sample.Conns.Active, + &sample.Conns.Idle, + &sample.Conns.IdleInTx, + &sample.WaitingLocks, + &sample.Commits, + &sample.Rollbacks, + &sample.BlocksHit, + &sample.BlocksRead, + &sample.TuplesInserted, + &sample.TuplesUpdated, + &sample.TuplesDeleted, + &sample.TuplesReturned, + &sample.TuplesFetched, + &sample.TempFiles, + &sample.TempBytes, + ) + if err != nil { + return driver.MetricSample{}, fmt.Errorf("query metrics: %w", err) + } + + replicas, err := d.replicaLag(ctx) + if err != nil { + return driver.MetricSample{}, err + } + + sample.Replicas = replicas + + return sample, nil +} + +func (d *Driver) replicaLag(ctx context.Context) ([]driver.ReplicaLag, error) { + rows, err := d.pool.Query(ctx, replicaLagQuery) + if err != nil { + return nil, fmt.Errorf("query replica lag: %w", err) + } + defer rows.Close() + + var replicas []driver.ReplicaLag + + for rows.Next() { + var ( + client *string + lagSec *float64 + ) + + if err := rows.Scan(&client, &lagSec); err != nil { + return nil, fmt.Errorf("scan replica lag: %w", err) + } + + replicas = append(replicas, driver.ReplicaLag{ + Client: ptr.OrZero(client), + Lag: durations.FromSeconds(ptr.OrZero(lagSec)), + }) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterate replica lag: %w", err) + } + + return replicas, nil +} diff --git a/internal/driver/postgres/postgres.go b/internal/driver/postgres/postgres.go new file mode 100644 index 0000000..73cde6b --- /dev/null +++ b/internal/driver/postgres/postgres.go @@ -0,0 +1,71 @@ +package postgres + +import ( + "context" + "fmt" + + "github.com/jackc/pgx/v5/pgxpool" + + "github.com/mickamy/dbtop/internal/driver" +) + +var _ driver.Driver = (*Driver)(nil) + +// PostgreSQL 14.0 (server_version_num); 14+ gives both total_exec_time and pg_blocking_pids. +const minServerVersionNum = 140000 + +// Small pool: a poll and a kill may run at once, nothing more. +const maxConns = 3 + +// appName tags dbtop's own connections so they can be excluded from metrics. +const appName = "dbtop" + +type Driver struct { + pool *pgxpool.Pool +} + +// Open connects to dsn and verifies the server is PostgreSQL 14+. The caller +// must Close the returned Driver. +func Open(ctx context.Context, dsn string) (*Driver, error) { + cfg, err := pgxpool.ParseConfig(dsn) + if err != nil { + return nil, fmt.Errorf("parse dsn: %w", err) + } + + cfg.MaxConns = maxConns + cfg.ConnConfig.RuntimeParams["application_name"] = appName + + pool, err := pgxpool.NewWithConfig(ctx, cfg) + if err != nil { + return nil, fmt.Errorf("connect: %w", err) + } + + var version int + if err := pool.QueryRow(ctx, "SELECT current_setting('server_version_num')::int").Scan(&version); err != nil { + pool.Close() + + return nil, fmt.Errorf("read server version: %w", err) + } + + if err := guardVersion(version); err != nil { + pool.Close() + + return nil, err + } + + return &Driver{pool: pool}, nil +} + +func (d *Driver) Close() error { + d.pool.Close() + + return nil +} + +func guardVersion(num int) error { + if num < minServerVersionNum { + return fmt.Errorf("dbtop requires PostgreSQL 14+ (server_version_num=%d)", num) + } + + return nil +} diff --git a/internal/driver/postgres/postgres_test.go b/internal/driver/postgres/postgres_test.go new file mode 100644 index 0000000..f6bcaee --- /dev/null +++ b/internal/driver/postgres/postgres_test.go @@ -0,0 +1,32 @@ +package postgres_test + +import ( + "testing" + + "github.com/mickamy/dbtop/internal/driver/postgres" +) + +func TestGuardVersion(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + num int + wantErr bool + }{ + {"pg 13 rejected", 130010, true}, + {"pg 14.0 accepted", 140000, false}, + {"pg 16.2 accepted", 160002, false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + err := postgres.GuardVersion(tt.num) + if (err != nil) != tt.wantErr { + t.Errorf("GuardVersion(%d) error = %v, wantErr %v", tt.num, err, tt.wantErr) + } + }) + } +} diff --git a/internal/driver/postgres/statements.go b/internal/driver/postgres/statements.go new file mode 100644 index 0000000..16573d7 --- /dev/null +++ b/internal/driver/postgres/statements.go @@ -0,0 +1,106 @@ +package postgres + +import ( + "context" + "fmt" + "strconv" + + "github.com/mickamy/dbtop/internal/driver" + "github.com/mickamy/dbtop/internal/durations" + "github.com/mickamy/dbtop/internal/ptr" +) + +const statementsQuery = ` +SELECT + queryid, + query, + calls, + total_exec_time, + mean_exec_time, + min_exec_time, + max_exec_time, + stddev_exec_time, + rows, + shared_blks_hit, + shared_blks_read +FROM pg_stat_statements +WHERE dbid = (SELECT oid FROM pg_database WHERE datname = current_database()) +ORDER BY total_exec_time DESC +LIMIT 100 +` + +const resetStatementsQuery = `SELECT pg_stat_statements_reset()` + +func (d *Driver) Statements(ctx context.Context) ([]driver.Statement, error) { + rows, err := d.pool.Query(ctx, statementsQuery) + if err != nil { + return nil, fmt.Errorf("query statements: %w", err) + } + defer rows.Close() + + var statements []driver.Statement + + for rows.Next() { + var ( + queryID *int64 + query *string + calls int64 + total, mean float64 + minTime, maxTime, stddevTime float64 + rowCount int64 + blocksHit, blocksRead int64 + ) + + if err := rows.Scan( + &queryID, + &query, + &calls, + &total, + &mean, + &minTime, + &maxTime, + &stddevTime, + &rowCount, + &blocksHit, + &blocksRead, + ); err != nil { + return nil, fmt.Errorf("scan statement: %w", err) + } + + statements = append(statements, driver.Statement{ + ID: queryIDString(queryID), + Query: ptr.OrZero(query), + Calls: calls, + Total: durations.FromMillis(total), + Mean: durations.FromMillis(mean), + Rows: rowCount, + Min: durations.FromMillis(minTime), + Max: durations.FromMillis(maxTime), + Stddev: durations.FromMillis(stddevTime), + SharedBlocksHit: blocksHit, + SharedBlocksRead: blocksRead, + }) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterate statements: %w", err) + } + + return statements, nil +} + +func (d *Driver) ResetStatements(ctx context.Context) error { + if _, err := d.pool.Exec(ctx, resetStatementsQuery); err != nil { + return fmt.Errorf("reset statements: %w", err) + } + + return nil +} + +func queryIDString(id *int64) string { + if id == nil { + return "" + } + + return strconv.FormatInt(*id, 10) +} diff --git a/internal/driver/statement.go b/internal/driver/statement.go new file mode 100644 index 0000000..e6ac9e6 --- /dev/null +++ b/internal/driver/statement.go @@ -0,0 +1,30 @@ +package driver + +import "time" + +// Statement is one normalized query in the cumulative digest ranking +// (pg_stat_statements / events_statements_summary_by_digest). +type Statement struct { + // queryid (PostgreSQL) or digest (MySQL). + ID string + Query string + + Calls int64 + Total time.Duration // total execution time; default sort key + Mean time.Duration // mean execution time per call + Rows int64 // total rows; per-call is Rows / Calls + + // Full scan without an index (MySQL SUM_NO_INDEX_USED); always false on PostgreSQL. + NoIndex bool + + Min time.Duration + Max time.Duration + Stddev time.Duration + + // PostgreSQL-only. + SharedBlocksHit int64 + SharedBlocksRead int64 + + // MySQL-only. + RowsExamined int64 +} diff --git a/internal/durations/durations.go b/internal/durations/durations.go new file mode 100644 index 0000000..641569f --- /dev/null +++ b/internal/durations/durations.go @@ -0,0 +1,11 @@ +package durations + +import "time" + +func FromSeconds(s float64) time.Duration { + return time.Duration(s * float64(time.Second)) +} + +func FromMillis(ms float64) time.Duration { + return time.Duration(ms * float64(time.Millisecond)) +} diff --git a/internal/durations/durations_test.go b/internal/durations/durations_test.go new file mode 100644 index 0000000..90240c6 --- /dev/null +++ b/internal/durations/durations_test.go @@ -0,0 +1,54 @@ +package durations_test + +import ( + "testing" + "time" + + "github.com/mickamy/dbtop/internal/durations" +) + +func TestFromSeconds(t *testing.T) { + t.Parallel() + + tests := []struct { + in float64 + want time.Duration + }{ + {1.5, 1500 * time.Millisecond}, + {0, 0}, + {90, 90 * time.Second}, + } + + for _, tt := range tests { + t.Run(tt.want.String(), func(t *testing.T) { + t.Parallel() + + if got := durations.FromSeconds(tt.in); got != tt.want { + t.Errorf("FromSeconds(%v) = %v, want %v", tt.in, got, tt.want) + } + }) + } +} + +func TestFromMillis(t *testing.T) { + t.Parallel() + + tests := []struct { + in float64 + want time.Duration + }{ + {0.26, 260 * time.Microsecond}, + {0, 0}, + {1500, 1500 * time.Millisecond}, + } + + for _, tt := range tests { + t.Run(tt.want.String(), func(t *testing.T) { + t.Parallel() + + if got := durations.FromMillis(tt.in); got != tt.want { + t.Errorf("FromMillis(%v) = %v, want %v", tt.in, got, tt.want) + } + }) + } +} diff --git a/internal/ptr/ptr.go b/internal/ptr/ptr.go new file mode 100644 index 0000000..096078b --- /dev/null +++ b/internal/ptr/ptr.go @@ -0,0 +1,18 @@ +package ptr + +func OrZero[T any](p *T) T { + if p == nil { + var zero T + return zero + } + + return *p +} + +func Map[T, U any](p *T, f func(T) U) *U { + if p == nil { + return nil + } + + return new(f(*p)) +} diff --git a/internal/ptr/ptr_test.go b/internal/ptr/ptr_test.go new file mode 100644 index 0000000..a957117 --- /dev/null +++ b/internal/ptr/ptr_test.go @@ -0,0 +1,34 @@ +package ptr_test + +import ( + "testing" + + "github.com/mickamy/dbtop/internal/ptr" +) + +func TestOrZero(t *testing.T) { + t.Parallel() + + if got := ptr.OrZero[string](nil); got != "" { + t.Errorf("OrZero(nil) = %q, want empty", got) + } + + if got := ptr.OrZero(new("hello")); got != "hello" { + t.Errorf("OrZero(*\"hello\") = %q, want \"hello\"", got) + } +} + +func TestMap(t *testing.T) { + t.Parallel() + + double := func(n int) int { return n * 2 } + + if got := ptr.Map[int, int](nil, double); got != nil { + t.Errorf("Map(nil) = %v, want nil", got) + } + + got := ptr.Map(new(21), double) + if got == nil || *got != 42 { + t.Errorf("Map(*21) = %v, want 42", got) + } +}