Skip to content

Commit d11b15c

Browse files
committed
add archiver lag collector
Track WAL archiving lag by comparing the current WAL position with the position of the last archived segment, providing visibility into archival delays. This pairs well with planetscale/hzdb-operator#1052: we can, and probably should, expose this metric on single-node branches so that users can reason about these cases themselves and see, for example, historical spikes.
1 parent 8d826f8 commit d11b15c

File tree

2 files changed

+481
-0
lines changed

2 files changed

+481
-0
lines changed

collector/pg_stat_archiver_lag.go

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
// Copyright 2025 PlanetScale Inc.
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package collector
15+
16+
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"strconv"
	"strings"

	"github.com/prometheus/client_golang/prometheus"
)
25+
26+
// archiverLagSubsystem is the subsystem name used both when registering the
// collector and when building the fully-qualified metric name.
const archiverLagSubsystem = "stat_archiver"
27+
28+
func init() {
29+
registerCollector(archiverLagSubsystem, defaultEnabled, NewPGStatArchiverLagCollector)
30+
}
31+
32+
// PGStatArchiverLagCollector reports how far WAL archiving trails the current
// WAL position. It holds no state of its own.
type PGStatArchiverLagCollector struct{}
33+
34+
func NewPGStatArchiverLagCollector(collectorConfig) (Collector, error) {
35+
return &PGStatArchiverLagCollector{}, nil
36+
}
37+
38+
var (
39+
statArchiverLagBytesDesc = prometheus.NewDesc(
40+
prometheus.BuildFQName(namespace, archiverLagSubsystem, "lag_bytes"),
41+
"Archiver lag in bytes (difference between current WAL position and last archived WAL)",
42+
[]string{},
43+
prometheus.Labels{},
44+
)
45+
46+
statArchiverLagQuery = `
47+
SELECT
48+
last_archived_wal,
49+
pg_current_wal_lsn() AS current_lsn
50+
FROM pg_stat_archiver
51+
WHERE last_archived_wal IS NOT NULL
52+
AND last_archived_wal != ''
53+
`
54+
)
55+
56+
// LSN represents a PostgreSQL Log Sequence Number, a 64-bit unsigned integer
57+
// representing a byte position in the WAL.
58+
// LSN values are absolute byte positions, so they can be compared and
// subtracted directly.
type LSN uint64
59+
60+
const (
	// walSegmentSizeBytes is the byte size assumed for one WAL segment
	// (16 MiB).
	// NOTE(review): this is a fixed value; a cluster initialised with a
	// different WAL segment size would need different constants — confirm
	// that all monitored clusters use 16 MiB segments.
	walSegmentSizeBytes = 16 << 20 // 16777216
	// segmentsPerLogID is the number of segments assumed to make up one log
	// ID (256).
	segmentsPerLogID = 256
)
66+
67+
// parseLSNFromWalFile parses a WAL file name (e.g., "000000010000000000000001") and returns
68+
// the LSN position in bytes. The WAL file format is:
69+
// - Positions 1-8: timeline ID (8 hex chars)
70+
// - Positions 9-16: log ID (8 hex chars)
71+
// - Positions 17-24: segment ID (8 hex chars)
72+
// Returns LSN = logID * 256 segments * 16MB + segmentID * 16MB
73+
//
74+
// Handles WAL files with suffixes like .backup, .history, .partial by stripping them first.
75+
func parseLSNFromWalFile(walFile string) (LSN, error) {
76+
// Strip suffix if present (e.g., .backup, .history, .partial)
77+
if idx := strings.Index(walFile, "."); idx != -1 {
78+
walFile = walFile[:idx]
79+
}
80+
81+
if len(walFile) != 24 {
82+
return 0, fmt.Errorf("WAL file name must be exactly 24 hex chars, got %d: %q", len(walFile), walFile)
83+
}
84+
85+
// Validate all characters are hex
86+
for i, r := range walFile {
87+
if (r < '0' || r > '9') && (r < 'A' || r > 'F') && (r < 'a' || r > 'f') {
88+
return 0, fmt.Errorf("WAL file name contains invalid hex character at position %d: %q", i+1, string(r))
89+
}
90+
}
91+
92+
// Extract log ID (positions 9-16, 0-indexed: 8-15)
93+
logIDHex := walFile[8:16]
94+
logID, err := parseHexUint32(logIDHex)
95+
if err != nil {
96+
return 0, fmt.Errorf("parse log ID from %q: %w", logIDHex, err)
97+
}
98+
99+
// Extract segment ID (positions 17-24, 0-indexed: 16-23)
100+
segIDHex := walFile[16:24]
101+
segID, err := parseHexUint32(segIDHex)
102+
if err != nil {
103+
return 0, fmt.Errorf("parse segment ID from %q: %w", segIDHex, err)
104+
}
105+
106+
// Calculate LSN: logID * 256 segments * 16MB + segmentID * 16MB
107+
lsnBytes := LSN(logID)*segmentsPerLogID*walSegmentSizeBytes + LSN(segID)*walSegmentSizeBytes
108+
return lsnBytes, nil
109+
}
110+
111+
// parseLSNFromLSNString parses a PostgreSQL LSN string (e.g., "0/12345678") and returns
112+
// the LSN position in bytes. PostgreSQL LSNs represent byte positions in the WAL.
113+
// The format is "high/low" where both are hex numbers representing a 64-bit byte offset:
114+
// LSN = (high << 32) | low
115+
func parseLSNFromLSNString(lsnStr string) (LSN, error) {
116+
parts := strings.Split(lsnStr, "/")
117+
if len(parts) != 2 {
118+
return 0, fmt.Errorf("LSN string must be in format 'high/low', got: %q", lsnStr)
119+
}
120+
121+
highStr, lowStr := parts[0], parts[1]
122+
if highStr == "" || lowStr == "" {
123+
return 0, fmt.Errorf("LSN string parts cannot be empty: %q", lsnStr)
124+
}
125+
126+
high, err := strconv.ParseUint(highStr, 16, 64)
127+
if err != nil {
128+
return 0, fmt.Errorf("parse high part %q of LSN string %q: %w", highStr, lsnStr, err)
129+
}
130+
131+
low, err := strconv.ParseUint(lowStr, 16, 64)
132+
if err != nil {
133+
return 0, fmt.Errorf("parse low part %q of LSN string %q: %w", lowStr, lsnStr, err)
134+
}
135+
136+
// LSN = (high << 32) | low
137+
return LSN(high<<32 | low), nil
138+
}
139+
140+
// parseHexUint32 decodes exactly eight hexadecimal characters into a uint32.
// It returns an error when the input is not eight bytes long or when it is
// not a valid hexadecimal number.
func parseHexUint32(hexStr string) (uint32, error) {
	if n := len(hexStr); n != 8 {
		return 0, fmt.Errorf("hex string must be exactly 8 chars, got %d: %q", n, hexStr)
	}

	parsed, parseErr := strconv.ParseUint(hexStr, 16, 32)
	if parseErr != nil {
		return 0, fmt.Errorf("parse hex %q: %w", hexStr, parseErr)
	}

	return uint32(parsed), nil
}
152+
153+
// bytesBetweenLSN calculates the difference in bytes between two LSN positions.
154+
// Returns the difference, clamped to 0 if currentLSN is less than archivedLSN
155+
// (which can happen during wraparound or timeline switches).
156+
func bytesBetweenLSN(currentLSN, archivedLSN LSN) LSN {
157+
if currentLSN < archivedLSN {
158+
return 0
159+
}
160+
return currentLSN - archivedLSN
161+
}
162+
163+
func (PGStatArchiverLagCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error {
164+
db := instance.getDB()
165+
row := db.QueryRowContext(ctx, statArchiverLagQuery)
166+
167+
var lastArchivedWal sql.NullString
168+
var currentLSN sql.NullString
169+
170+
err := row.Scan(&lastArchivedWal, &currentLSN)
171+
if err != nil {
172+
// If no rows found (no WAL segments archived yet), return 0 lag
173+
if err == sql.ErrNoRows {
174+
ch <- prometheus.MustNewConstMetric(
175+
statArchiverLagBytesDesc,
176+
prometheus.GaugeValue,
177+
0,
178+
)
179+
return nil
180+
}
181+
return err
182+
}
183+
184+
// If either value is null, return 0 lag
185+
if !lastArchivedWal.Valid || !currentLSN.Valid {
186+
ch <- prometheus.MustNewConstMetric(
187+
statArchiverLagBytesDesc,
188+
prometheus.GaugeValue,
189+
0,
190+
)
191+
return nil
192+
}
193+
194+
// Parse LSN from WAL file name
195+
archivedLSN, err := parseLSNFromWalFile(lastArchivedWal.String)
196+
if err != nil {
197+
return fmt.Errorf("parse archived WAL file %q: %w", lastArchivedWal.String, err)
198+
}
199+
200+
// Parse current LSN from PostgreSQL LSN string format
201+
currentLSNBytes, err := parseLSNFromLSNString(currentLSN.String)
202+
if err != nil {
203+
return fmt.Errorf("parse current LSN %q: %w", currentLSN.String, err)
204+
}
205+
206+
// Calculate lag
207+
lagBytes := bytesBetweenLSN(currentLSNBytes, archivedLSN)
208+
209+
ch <- prometheus.MustNewConstMetric(
210+
statArchiverLagBytesDesc,
211+
prometheus.GaugeValue,
212+
float64(lagBytes),
213+
)
214+
215+
return nil
216+
}

0 commit comments

Comments
 (0)