Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 47 additions & 1 deletion plugins/outputs/influxdb_v2/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/common/ratelimiter"
"github.com/influxdata/telegraf/selfstat"
)

const (
Expand Down Expand Up @@ -60,6 +61,16 @@ type httpClient struct {
retryCount atomic.Int64
concurrent uint64
log telegraf.Logger
statistics *selfstat.Collector

statBytesWritten selfstat.Stat
statSuccessfulWrites selfstat.Stat
statFailedWrites selfstat.Stat
statTimeout selfstat.Stat
statRetryableErrorCounters selfstat.Stat
statNonRetryableErrorCounters selfstat.Stat
statSuccessfulWriteDuration selfstat.Stat
statFailedWriteDuration selfstat.Stat

// Mutex to protect the retry-time field
sync.Mutex
Expand Down Expand Up @@ -135,6 +146,20 @@ func (c *httpClient) Init() error {
c.concurrent = 1
}
c.pool = pond.NewPool(int(c.concurrent))

// setup observability
tags := map[string]string{
"url": c.url.String(),
}

c.statBytesWritten = c.statistics.Register("write", "bytes_total", make(map[string]string))
c.statSuccessfulWrites = c.statistics.Register("write", "writes", tags)
c.statFailedWrites = c.statistics.Register("write", "errors", tags)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this be conflicting with the already existing errors stat?

writeErrorsRegister := selfstat.Register("write", "errors", tags)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No it won't because the normal stats do not have the url tag.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, but that will be confusing as the meaning of the value is different solely based on the presence of that tag.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be a different metric as well, to be honest. Otherwise you will get the same measurement with different tags.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, the inputs.influxdb_v2_listener also has internal metrics, uses influxdb_v2_listener as name which then becomes internal_influxdb_v2_listener as metric name.

c.statRetryableErrorCounters = c.statistics.Register("write", "errors_retryable", tags)
c.statNonRetryableErrorCounters = c.statistics.Register("write", "errors_non_retryable", tags)
c.statTimeout = c.statistics.Register("write", "request_timeouts", tags)
c.statSuccessfulWriteDuration = c.statistics.RegisterTiming("write", "request_success_time_ns", tags)
c.statFailedWriteDuration = c.statistics.RegisterTiming("write", "request_fail_time_ns", tags)
return nil
}

Expand Down Expand Up @@ -315,13 +340,24 @@ func (c *httpClient) writeBatch(ctx context.Context, b *batch) error {
}

// Execute the request
start := time.Now()
resp, err := c.client.Do(req.WithContext(ctx))
durationNs := time.Since(start).Nanoseconds()
if err != nil {
c.statFailedWriteDuration.Set(durationNs)
c.statFailedWrites.Incr(1)
var urlErr *url.Error
if errors.As(err, &urlErr) && urlErr.Timeout() {
c.statTimeout.Incr(1)
}

internal.OnClientError(c.client, err)
return err
}
defer resp.Body.Close()

c.statBytesWritten.Incr(int64(len(b.payload)))

// Check for success
switch resp.StatusCode {
case
Expand All @@ -334,10 +370,15 @@ func (c *httpClient) writeBatch(ctx context.Context, b *batch) error {
http.StatusPartialContent,
http.StatusMultiStatus,
http.StatusAlreadyReported:
c.statSuccessfulWriteDuration.Set(durationNs)
c.statSuccessfulWrites.Incr(1)
c.retryCount.Store(0)
return nil
}

c.statFailedWriteDuration.Set(durationNs)
c.statFailedWrites.Incr(1)

// We got an error and now try to decode further
var desc string
writeResp := &genericRespError{}
Expand All @@ -348,6 +389,7 @@ func (c *httpClient) writeBatch(ctx context.Context, b *batch) error {
switch resp.StatusCode {
// request was too large, send back to try again
case http.StatusRequestEntityTooLarge:
c.statRetryableErrorCounters.Incr(1)
c.log.Errorf("Failed to write metrics with size %d bytes to %s, request was too large (413)", len(b.payload), b.bucket)
return &ThrottleError{
Err: fmt.Errorf("%s: %s", resp.Status, desc),
Expand All @@ -360,19 +402,21 @@ func (c *httpClient) writeBatch(ctx context.Context, b *batch) error {
// for example, submitting metrics outside the retention period.
http.StatusUnprocessableEntity,
http.StatusNotAcceptable:

c.statNonRetryableErrorCounters.Incr(1)
// Clients should *not* repeat the request and the metrics should be rejected.
return &APIError{
Err: fmt.Errorf("failed to write metrics to %s (will be dropped: %s)%s", b.bucket, resp.Status, desc),
StatusCode: resp.StatusCode,
}
case http.StatusUnauthorized, http.StatusForbidden:
c.statRetryableErrorCounters.Incr(1)
return fmt.Errorf("failed to write metrics to %s (%s)%s", b.bucket, resp.Status, desc)
case http.StatusTooManyRequests,
http.StatusServiceUnavailable,
http.StatusBadGateway,
http.StatusGatewayTimeout:
// ^ these handle the cases where the server is likely overloaded, and may not be able to say so.
c.statRetryableErrorCounters.Incr(1)
retryDuration := getRetryDuration(resp.Header, c.retryCount.Add(1))
c.log.Warnf("Failed to write to %s; will retry in %s. (%s)\n", b.bucket, retryDuration, resp.Status)
return &ThrottleError{
Expand All @@ -385,6 +429,7 @@ func (c *httpClient) writeBatch(ctx context.Context, b *batch) error {
// if it's any other 4xx code, the client should not retry as it's the client's mistake.
// retrying will not make the request magically work.
if len(resp.Status) > 0 && resp.Status[0] == '4' {
c.statNonRetryableErrorCounters.Incr(1)
return &APIError{
Err: fmt.Errorf("failed to write metrics to %s (will be dropped: %s)%s", b.bucket, resp.Status, desc),
StatusCode: resp.StatusCode,
Expand All @@ -397,6 +442,7 @@ func (c *httpClient) writeBatch(ctx context.Context, b *batch) error {
desc = fmt.Sprintf(": %s; %s", desc, xErr)
}

c.statRetryableErrorCounters.Incr(1)
return &APIError{
Err: fmt.Errorf("failed to write metrics to bucket %q: %s%s", b.bucket, resp.Status, desc),
StatusCode: resp.StatusCode,
Expand Down
13 changes: 12 additions & 1 deletion plugins/outputs/influxdb_v2/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/common/ratelimiter"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/influxdata/telegraf/selfstat"
"github.com/influxdata/telegraf/testutil"
)

Expand Down Expand Up @@ -46,6 +47,10 @@ func TestHTTPClientInit(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
u, err := url.Parse(tt.addr)
require.NoError(t, err)

collector := selfstat.NewCollector(make(map[string]string))
defer collector.UnregisterAll()
tt.client.statistics = collector
tt.client.url = u

require.NoError(t, tt.client.Init())
Expand Down Expand Up @@ -215,13 +220,16 @@ func TestRetryLaterEarlyExit(t *testing.T) {
u, err := url.Parse("http://" + ts.Listener.Addr().String())
require.NoError(t, err)

collector := selfstat.NewCollector(make(map[string]string))
defer collector.UnregisterAll()
c := &httpClient{
url: u,
bucketTag: "bucket",
contentEncoding: "identity",
serializer: ratelimiter.NewIndividualSerializer(serializer),
rateLimiter: limiter,
log: &testutil.Logger{},
statistics: selfstat.NewCollector(make(map[string]string)),
}
require.NoError(t, c.Init())

Expand Down Expand Up @@ -286,13 +294,16 @@ func TestHeadersDoNotOverrideConfig(t *testing.T) {
require.NoError(t, err)
authHeader := config.NewSecret([]byte("Bearer foo"))
userAgentHeader := config.NewSecret([]byte("foo"))
collector := selfstat.NewCollector(make(map[string]string))
defer collector.UnregisterAll()
c := &httpClient{
headers: map[string]*config.Secret{
"Authorization": &authHeader,
"User-Agent": &userAgentHeader,
},
// URL to make Init() happy
url: testURL,
url: testURL,
statistics: collector,
}
require.NoError(t, c.Init())
require.Equal(t, &authHeader, c.headers["Authorization"])
Expand Down
4 changes: 4 additions & 0 deletions plugins/outputs/influxdb_v2/influxdb_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
commontls "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/influxdata/telegraf/selfstat"
)

//go:embed sample.conf
Expand All @@ -45,6 +46,8 @@ type InfluxDB struct {
ReadIdleTimeout config.Duration `toml:"read_idle_timeout"`
ConcurrentWrites uint64 `toml:"concurrent_writes"`
Log telegraf.Logger `toml:"-"`
Statistics *selfstat.Collector `toml:"-"`

commontls.ClientConfig
ratelimiter.RateLimitConfig

Expand Down Expand Up @@ -171,6 +174,7 @@ func (i *InfluxDB) Connect() error {
serializer: i.serializer,
concurrent: i.ConcurrentWrites,
log: i.Log,
statistics: i.Statistics,
}

if err := c.Init(); err != nil {
Expand Down
Loading
Loading