diff --git a/prometheus/promhttp/http.go b/prometheus/promhttp/http.go index aab8ad609..e359a4fb2 100644 --- a/prometheus/promhttp/http.go +++ b/prometheus/promhttp/http.go @@ -41,6 +41,7 @@ import ( "sync" "time" + dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" "github.com/prometheus/client_golang/internal/github.com/golang/gddo/httputil" @@ -79,6 +80,88 @@ var gzipPool = sync.Pool{ }, } +// coalescingGatherer wraps a TransactionalGatherer to deduplicate concurrent +// Gather calls. When a Gather is already in flight, new callers join the +// existing cycle and receive the same result once it completes. The underlying +// done function is called exactly once, when the last joined caller releases. +// +// This prevents goroutine pile-up when the scrape rate is faster than the +// time collectors need to produce metrics. +type coalescingGatherer struct { + g prometheus.TransactionalGatherer + mu sync.Mutex + cycle *gatherCycle +} + +// gatherCycle tracks a single in-flight Gather and all HTTP handlers sharing it. +type gatherCycle struct { + ready chan struct{} // closed when Gather completes; happens-before reads of mfs/err/done + mfs []*dto.MetricFamily // set before ready is closed; shared across handlers, must not be mutated + err error // set before ready is closed + done func() // underlying done callback; set before ready is closed + refs int // number of handlers using this cycle; protected by coalescingGatherer.mu +} + +var _ prometheus.TransactionalGatherer = (*coalescingGatherer)(nil) // compile-time interface check + +func (c *coalescingGatherer) Gather() ([]*dto.MetricFamily, func(), error) { + c.mu.Lock() + if cy := c.cycle; cy != nil { + // c.cycle is non-nil while Gather runs or handlers are still consuming its results. 
+ cy.refs++ + c.mu.Unlock() + <-cy.ready + return cy.mfs, c.releaseFunc(cy), cy.err + } + cy := &gatherCycle{ + ready: make(chan struct{}), + done: func() {}, + refs: 1, + } + c.cycle = cy + c.mu.Unlock() + + // Guard against a panic in c.g.Gather: close cy.ready and clear c.cycle + // so that any joiners waiting on <-cy.ready are unblocked rather than + // deadlocked, and future Gather calls start a fresh cycle. + panicked := true + defer func() { + if panicked { + c.mu.Lock() + if c.cycle == cy { + c.cycle = nil + } + c.mu.Unlock() + close(cy.ready) + } + }() + cy.mfs, cy.done, cy.err = c.g.Gather() + panicked = false + close(cy.ready) // happens-before joiners' reads of cy.mfs/err/done + + return cy.mfs, c.releaseFunc(cy), cy.err +} + +// releaseFunc returns the done callback for one caller sharing cy. +// When the last caller releases, the underlying done is invoked and the +// cycle is cleared so the next Gather starts fresh. +func (c *coalescingGatherer) releaseFunc(cy *gatherCycle) func() { + return func() { + c.mu.Lock() + cy.refs-- + if cy.refs > 0 { + c.mu.Unlock() + return + } + // Last caller. + if c.cycle == cy { + c.cycle = nil + } + c.mu.Unlock() + cy.done() // called outside the lock to avoid holding it during done + } +} + // Handler returns an http.Handler for the prometheus.DefaultGatherer, using // default HandlerOpts, i.e. it reports the first error as an HTTP error, it has // no error logging, and it applies compression if requested by the client. @@ -125,6 +208,10 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { // Multiple metric names can be specified by providing the parameter multiple times. // When no name[] parameters are provided, all metrics are returned. 
func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerOpts) http.Handler { + if opts.CoalesceGather { + reg = &coalescingGatherer{g: reg} + } + var ( inFlightSem chan struct{} errCnt = prometheus.NewCounterVec( @@ -428,6 +515,22 @@ type HandlerOpts struct { // Service Unavailable and a suitable message in the body. If // MaxRequestsInFlight is 0 or negative, no limit is applied. MaxRequestsInFlight int + // CoalesceGather, if true, deduplicates concurrent Gather calls so that + // only one collection runs at a time. Additional requests that arrive + // while a Gather is in flight will receive the same result once it + // completes. This prevents goroutine pile-up when the scrape rate is + // faster than the time collectors need to produce metrics. + // + // When enabled, concurrent scrapers share a single metric snapshot per + // collection cycle. The returned MetricFamily values are shared and must + // not be mutated in place. The built-in handler only reads them, so this + // is safe in practice, but custom TransactionalGatherer implementations + // that modify the returned families after Gather returns must not use + // this option. + // + // Consider using CoalesceGather together with Timeout to bound both the + // scrape response time and the number of concurrent background Gathers. + CoalesceGather bool // If handling a request takes longer than Timeout, it is responded to // with 503 ServiceUnavailable and a suitable Message. No timeout is // applied if Timeout is 0 or negative. Note that with the current @@ -435,8 +538,9 @@ type HandlerOpts struct { // described above (and even that only if sending of the body hasn't // started yet), while the bulk work of gathering all the metrics keeps // running in the background (with the eventual result to be thrown - // away). Until the implementation is improved, it is recommended to - // implement a separate timeout in potentially slow Collectors. + // away). 
When CoalesceGather is enabled, only one such background Gather + // can be in flight at a time. It is also recommended to implement a + // separate timeout in potentially slow Collectors. Timeout time.Duration // If true, the experimental OpenMetrics encoding is added to the // possible options during content negotiation. Note that Prometheus diff --git a/prometheus/promhttp/http_test.go b/prometheus/promhttp/http_test.go index ccb0b8bbd..3a6e97a9b 100644 --- a/prometheus/promhttp/http_test.go +++ b/prometheus/promhttp/http_test.go @@ -22,12 +22,16 @@ import ( "log" "net/http" "net/http/httptest" + "runtime" "strings" + "sync" + "sync/atomic" "testing" "time" "github.com/klauspost/compress/zstd" dto "github.com/prometheus/client_model/go" + "go.uber.org/goleak" "github.com/prometheus/client_golang/prometheus" _ "github.com/prometheus/client_golang/prometheus/promhttp/zstd" @@ -875,3 +879,155 @@ func TestHandlerWithMetricFilter(t *testing.T) { }) } } + +// syncGatherCounter is a thread-safe TransactionalGatherer wrapper that counts +// Gather and done invocations. Safe for concurrent use from multiple goroutines, +// unlike mockTransactionGatherer whose counters are not race-safe. +type syncGatherCounter struct { + g prometheus.Gatherer + gatherCalled atomic.Int64 + doneCalled atomic.Int64 +} + +func (m *syncGatherCounter) Gather() ([]*dto.MetricFamily, func(), error) { + m.gatherCalled.Add(1) + mfs, err := m.g.Gather() + return mfs, func() { m.doneCalled.Add(1) }, err +} + +// TestCoalesceGatherSequentialInvariant verifies that sequential requests each +// trigger exactly one Gather call and one done call. 
func TestCoalesceGatherSequentialInvariant(t *testing.T) {
	reg := prometheus.NewRegistry()
	counter := &syncGatherCounter{g: reg}
	handler := HandlerForTransactional(counter, HandlerOpts{CoalesceGather: true})
	req, _ := http.NewRequest(http.MethodGet, "/", nil)
	req.Header.Add(acceptHeader, acceptTextPlain)

	// With no concurrency there is nothing to coalesce: each of the n
	// requests must start (and release) its own Gather cycle.
	const n = 3
	for i := range n {
		w := httptest.NewRecorder()
		handler.ServeHTTP(w, req)
		if got, want := w.Code, http.StatusOK; got != want {
			t.Fatalf("request %d: HTTP status %d, want %d", i+1, got, want)
		}
	}
	if got, want := counter.gatherCalled.Load(), int64(n); got != want {
		t.Errorf("Gather called %d times, want %d", got, want)
	}
	if got, want := counter.doneCalled.Load(), int64(n); got != want {
		t.Errorf("done called %d times, want %d", got, want)
	}
}

// TestCoalesceGatherDoneCalledExactlyOnce verifies that when concurrent requests
// share a single Gather cycle, the underlying done callback is called exactly once.
func TestCoalesceGatherDoneCalledExactlyOnce(t *testing.T) {
	defer goleak.VerifyNone(t)

	reg := prometheus.NewRegistry()
	block := make(chan struct{})
	started := make(chan struct{}, 1)
	reg.MustRegister(blockingCollector{CollectStarted: started, Block: block})

	counter := &syncGatherCounter{g: reg}
	handler := HandlerForTransactional(counter, HandlerOpts{CoalesceGather: true})
	req, _ := http.NewRequest(http.MethodGet, "/", nil)
	req.Header.Add(acceptHeader, acceptTextPlain)

	// Start request 1 in background; it blocks in Collect.
	w1 := httptest.NewRecorder()
	req1Done := make(chan struct{})
	go func() {
		handler.ServeHTTP(w1, req)
		close(req1Done)
	}()
	<-started // Gather 1 is now in-flight and blocked in Collect.

	// Start request 2; it will join the in-flight Gather cycle.
	w2 := httptest.NewRecorder()
	req2Done := make(chan struct{})
	go func() {
		handler.ServeHTTP(w2, req)
		close(req2Done)
	}()

	// Yield to allow request 2 to enter the coalescing wait before releasing.
	// NOTE(review): a single Gosched does not guarantee request 2 has joined;
	// the assertions below are written to tolerate either interleaving.
	runtime.Gosched()
	close(block)
	<-req1Done
	<-req2Done

	// Key invariant: done() must be called exactly once per Gather cycle.
	gathers := counter.gatherCalled.Load()
	dones := counter.doneCalled.Load()
	if gathers != dones {
		t.Errorf("Gather called %d times but done called %d times; invariant violated", gathers, dones)
	}
	// Coalescing should keep gather count below the number of requests.
	if gathers > 2 {
		t.Errorf("Gather called %d times for 2 requests; expected ≤ 2 with coalescing", gathers)
	}
	// Both requests must still succeed regardless of whether they coalesced.
	for i, w := range []*httptest.ResponseRecorder{w1, w2} {
		if got, want := w.Code, http.StatusOK; got != want {
			t.Errorf("request %d: HTTP status %d, want %d", i+1, got, want)
		}
	}
}

// TestCoalesceGatherGoroutineLeakFree verifies that concurrent requests with a
// slow collector do not leak goroutines when CoalesceGather is enabled.
func TestCoalesceGatherGoroutineLeakFree(t *testing.T) {
	defer goleak.VerifyNone(t)

	reg := prometheus.NewRegistry()
	block := make(chan struct{})
	started := make(chan struct{}, 1)
	reg.MustRegister(blockingCollector{CollectStarted: started, Block: block})

	handler := HandlerForTransactional(
		&syncGatherCounter{g: reg},
		HandlerOpts{CoalesceGather: true},
	)
	req, _ := http.NewRequest(http.MethodGet, "/", nil)
	req.Header.Add(acceptHeader, acceptTextPlain)

	// Fire several concurrent scrapes against the blocked collector, then
	// release it and wait for all handlers to finish.
	var wg sync.WaitGroup
	for range 5 {
		wg.Go(func() {
			handler.ServeHTTP(httptest.NewRecorder(), req)
		})
	}
	<-started
	close(block)
	wg.Wait()
	// goleak.VerifyNone (deferred above) asserts no goroutines leaked.
}

// TestCoalesceGatherNewCycleAfterCompletion verifies that once all handlers of a
// cycle have released, the next request starts a fresh Gather.
+func TestCoalesceGatherNewCycleAfterCompletion(t *testing.T) { + reg := prometheus.NewRegistry() + counter := &syncGatherCounter{g: reg} + handler := HandlerForTransactional(counter, HandlerOpts{CoalesceGather: true}) + req, _ := http.NewRequest(http.MethodGet, "/", nil) + req.Header.Add(acceptHeader, acceptTextPlain) + + // Cycle 1. + handler.ServeHTTP(httptest.NewRecorder(), req) + if got, want := counter.gatherCalled.Load(), int64(1); got != want { + t.Fatalf("after cycle 1: Gather called %d times, want %d", got, want) + } + if got, want := counter.doneCalled.Load(), int64(1); got != want { + t.Fatalf("after cycle 1: done called %d times, want %d", got, want) + } + + // Cycle 2: previous cycle is complete so a fresh Gather must run. + handler.ServeHTTP(httptest.NewRecorder(), req) + if got, want := counter.gatherCalled.Load(), int64(2); got != want { + t.Errorf("after cycle 2: Gather called %d times, want %d", got, want) + } + if got, want := counter.doneCalled.Load(), int64(2); got != want { + t.Errorf("after cycle 2: done called %d times, want %d", got, want) + } +}