Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 106 additions & 2 deletions prometheus/promhttp/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"sync"
"time"

dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"

"github.com/prometheus/client_golang/internal/github.com/golang/gddo/httputil"
Expand Down Expand Up @@ -79,6 +80,88 @@ var gzipPool = sync.Pool{
},
}

// coalescingGatherer wraps a TransactionalGatherer to deduplicate concurrent
// Gather calls. When a Gather is already in flight, new callers join the
// existing cycle and receive the same result once it completes. The underlying
// done function is called exactly once, when the last joined caller releases.
//
// This prevents goroutine pile-up when the scrape rate is faster than the
// time collectors need to produce metrics.
type coalescingGatherer struct {
g prometheus.TransactionalGatherer
mu sync.Mutex
cycle *gatherCycle
}

// gatherCycle tracks a single in-flight Gather and all HTTP handlers sharing it.
type gatherCycle struct {
ready chan struct{} // closed when Gather completes; happens-before reads of mfs/err/done
mfs []*dto.MetricFamily // set before ready is closed; shared across handlers, must not be mutated
err error // set before ready is closed
done func() // underlying done callback; set before ready is closed
refs int // number of handlers using this cycle; protected by coalescingGatherer.mu
}

var _ prometheus.TransactionalGatherer = (*coalescingGatherer)(nil) // compile-time interface check

func (c *coalescingGatherer) Gather() ([]*dto.MetricFamily, func(), error) {
c.mu.Lock()
if cy := c.cycle; cy != nil {
// c.cycle is non-nil while Gather runs or handlers are still consuming its results.
cy.refs++
c.mu.Unlock()
<-cy.ready
return cy.mfs, c.releaseFunc(cy), cy.err
}
cy := &gatherCycle{
ready: make(chan struct{}),
done: func() {},
refs: 1,
}
c.cycle = cy
c.mu.Unlock()

// Guard against a panic in c.g.Gather: close cy.ready and clear c.cycle
// so that any joiners waiting on <-cy.ready are unblocked rather than
// deadlocked, and future Gather calls start a fresh cycle.
panicked := true
defer func() {
if panicked {
c.mu.Lock()
if c.cycle == cy {
c.cycle = nil
}
c.mu.Unlock()
close(cy.ready)
}
}()
cy.mfs, cy.done, cy.err = c.g.Gather()
panicked = false
close(cy.ready) // happens-before joiners' reads of cy.mfs/err/done

return cy.mfs, c.releaseFunc(cy), cy.err
}

// releaseFunc returns the done callback for one caller sharing cy.
// When the last caller releases, the underlying done is invoked and the
// cycle is cleared so the next Gather starts fresh.
func (c *coalescingGatherer) releaseFunc(cy *gatherCycle) func() {
return func() {
c.mu.Lock()
cy.refs--
if cy.refs > 0 {
c.mu.Unlock()
return
}
// Last caller.
if c.cycle == cy {
c.cycle = nil
}
c.mu.Unlock()
cy.done() // called outside the lock to avoid holding it during done
}
}

// Handler returns an http.Handler for the prometheus.DefaultGatherer, using
// default HandlerOpts, i.e. it reports the first error as an HTTP error, it has
// no error logging, and it applies compression if requested by the client.
Expand Down Expand Up @@ -125,6 +208,10 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
// Multiple metric names can be specified by providing the parameter multiple times.
// When no name[] parameters are provided, all metrics are returned.
func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerOpts) http.Handler {
if opts.CoalesceGather {
reg = &coalescingGatherer{g: reg}
}

var (
inFlightSem chan struct{}
errCnt = prometheus.NewCounterVec(
Expand Down Expand Up @@ -428,15 +515,32 @@ type HandlerOpts struct {
// Service Unavailable and a suitable message in the body. If
// MaxRequestsInFlight is 0 or negative, no limit is applied.
MaxRequestsInFlight int
// CoalesceGather, if true, deduplicates concurrent Gather calls so that
// only one collection runs at a time. Additional requests that arrive
// while a Gather is in flight will receive the same result once it
// completes. This prevents goroutine pile-up when the scrape rate is
// faster than the time collectors need to produce metrics.
//
// When enabled, concurrent scrapers share a single metric snapshot per
// collection cycle. The returned MetricFamily values are shared and must
// not be mutated in place. The built-in handler only reads them, so this
// is safe in practice, but custom TransactionalGatherer implementations
// that modify the returned families after Gather returns must not use
// this option.
//
// Consider using CoalesceGather together with Timeout to bound both the
// scrape response time and the number of concurrent background Gathers.
CoalesceGather bool
// If handling a request takes longer than Timeout, it is responded to
// with 503 ServiceUnavailable and a suitable Message. No timeout is
// applied if Timeout is 0 or negative. Note that with the current
// implementation, reaching the timeout simply ends the HTTP requests as
// described above (and even that only if sending of the body hasn't
// started yet), while the bulk work of gathering all the metrics keeps
// running in the background (with the eventual result to be thrown
// away). Until the implementation is improved, it is recommended to
// implement a separate timeout in potentially slow Collectors.
// away). When CoalesceGather is enabled, only one such background Gather
// can be in flight at a time. It is also recommended to implement a
// separate timeout in potentially slow Collectors.
Timeout time.Duration
// If true, the experimental OpenMetrics encoding is added to the
// possible options during content negotiation. Note that Prometheus
Expand Down
156 changes: 156 additions & 0 deletions prometheus/promhttp/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@ import (
"log"
"net/http"
"net/http/httptest"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/klauspost/compress/zstd"
dto "github.com/prometheus/client_model/go"
"go.uber.org/goleak"

"github.com/prometheus/client_golang/prometheus"
_ "github.com/prometheus/client_golang/prometheus/promhttp/zstd"
Expand Down Expand Up @@ -875,3 +879,155 @@ func TestHandlerWithMetricFilter(t *testing.T) {
})
}
}

// syncGatherCounter is a thread-safe TransactionalGatherer wrapper that counts
// Gather and done invocations. Safe for concurrent use from multiple goroutines,
// unlike mockTransactionGatherer whose counters are not race-safe.
type syncGatherCounter struct {
g prometheus.Gatherer
gatherCalled atomic.Int64
doneCalled atomic.Int64
}

func (m *syncGatherCounter) Gather() ([]*dto.MetricFamily, func(), error) {
m.gatherCalled.Add(1)
mfs, err := m.g.Gather()
return mfs, func() { m.doneCalled.Add(1) }, err
}

// TestCoalesceGatherSequentialInvariant verifies that sequential requests each
// trigger exactly one Gather call and one done call.
func TestCoalesceGatherSequentialInvariant(t *testing.T) {
reg := prometheus.NewRegistry()
counter := &syncGatherCounter{g: reg}
handler := HandlerForTransactional(counter, HandlerOpts{CoalesceGather: true})
req, _ := http.NewRequest(http.MethodGet, "/", nil)
req.Header.Add(acceptHeader, acceptTextPlain)

const n = 3
for i := range n {
w := httptest.NewRecorder()
handler.ServeHTTP(w, req)
if got, want := w.Code, http.StatusOK; got != want {
t.Fatalf("request %d: HTTP status %d, want %d", i+1, got, want)
}
}
if got, want := counter.gatherCalled.Load(), int64(n); got != want {
t.Errorf("Gather called %d times, want %d", got, want)
}
if got, want := counter.doneCalled.Load(), int64(n); got != want {
t.Errorf("done called %d times, want %d", got, want)
}
}

// TestCoalesceGatherDoneCalledExactlyOnce verifies that when concurrent requests
// share a single Gather cycle, the underlying done callback is called exactly once.
func TestCoalesceGatherDoneCalledExactlyOnce(t *testing.T) {
defer goleak.VerifyNone(t)

reg := prometheus.NewRegistry()
block := make(chan struct{})
started := make(chan struct{}, 1)
reg.MustRegister(blockingCollector{CollectStarted: started, Block: block})

counter := &syncGatherCounter{g: reg}
handler := HandlerForTransactional(counter, HandlerOpts{CoalesceGather: true})
req, _ := http.NewRequest(http.MethodGet, "/", nil)
req.Header.Add(acceptHeader, acceptTextPlain)

// Start request 1 in background; it blocks in Collect.
w1 := httptest.NewRecorder()
req1Done := make(chan struct{})
go func() {
handler.ServeHTTP(w1, req)
close(req1Done)
}()
<-started // Gather 1 is now in-flight and blocked in Collect.

// Start request 2; it will join the in-flight Gather cycle.
w2 := httptest.NewRecorder()
req2Done := make(chan struct{})
go func() {
handler.ServeHTTP(w2, req)
close(req2Done)
}()

// Yield to allow request 2 to enter the coalescing wait before releasing.
runtime.Gosched()
close(block)
<-req1Done
<-req2Done

// Key invariant: done() must be called exactly once per Gather cycle.
gathers := counter.gatherCalled.Load()
dones := counter.doneCalled.Load()
if gathers != dones {
t.Errorf("Gather called %d times but done called %d times; invariant violated", gathers, dones)
}
// Coalescing should keep gather count below the number of requests.
if gathers > 2 {
t.Errorf("Gather called %d times for 2 requests; expected ≤ 2 with coalescing", gathers)
}
for i, w := range []*httptest.ResponseRecorder{w1, w2} {
if got, want := w.Code, http.StatusOK; got != want {
t.Errorf("request %d: HTTP status %d, want %d", i+1, got, want)
}
}
}

// TestCoalesceGatherGoroutineLeakFree verifies that concurrent requests with a
// slow collector do not leak goroutines when CoalesceGather is enabled.
func TestCoalesceGatherGoroutineLeakFree(t *testing.T) {
defer goleak.VerifyNone(t)

reg := prometheus.NewRegistry()
block := make(chan struct{})
started := make(chan struct{}, 1)
reg.MustRegister(blockingCollector{CollectStarted: started, Block: block})

handler := HandlerForTransactional(
&syncGatherCounter{g: reg},
HandlerOpts{CoalesceGather: true},
)
req, _ := http.NewRequest(http.MethodGet, "/", nil)
req.Header.Add(acceptHeader, acceptTextPlain)

var wg sync.WaitGroup
for range 5 {
wg.Go(func() {
handler.ServeHTTP(httptest.NewRecorder(), req)
})
}
<-started
close(block)
wg.Wait()
// goleak.VerifyNone (deferred above) asserts no goroutines leaked.
}

// TestCoalesceGatherNewCycleAfterCompletion verifies that once all handlers of a
// cycle have released, the next request starts a fresh Gather.
func TestCoalesceGatherNewCycleAfterCompletion(t *testing.T) {
reg := prometheus.NewRegistry()
counter := &syncGatherCounter{g: reg}
handler := HandlerForTransactional(counter, HandlerOpts{CoalesceGather: true})
req, _ := http.NewRequest(http.MethodGet, "/", nil)
req.Header.Add(acceptHeader, acceptTextPlain)

// Cycle 1.
handler.ServeHTTP(httptest.NewRecorder(), req)
if got, want := counter.gatherCalled.Load(), int64(1); got != want {
t.Fatalf("after cycle 1: Gather called %d times, want %d", got, want)
}
if got, want := counter.doneCalled.Load(), int64(1); got != want {
t.Fatalf("after cycle 1: done called %d times, want %d", got, want)
}

// Cycle 2: previous cycle is complete so a fresh Gather must run.
handler.ServeHTTP(httptest.NewRecorder(), req)
if got, want := counter.gatherCalled.Load(), int64(2); got != want {
t.Errorf("after cycle 2: Gather called %d times, want %d", got, want)
}
if got, want := counter.doneCalled.Load(), int64(2); got != want {
t.Errorf("after cycle 2: done called %d times, want %d", got, want)
}
}
Loading