diff --git a/plugins/outputs/influxdb_v2/http.go b/plugins/outputs/influxdb_v2/http.go index ccb650a665a3b..558ea17509886 100644 --- a/plugins/outputs/influxdb_v2/http.go +++ b/plugins/outputs/influxdb_v2/http.go @@ -28,6 +28,7 @@ import ( "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/common/ratelimiter" + "github.com/influxdata/telegraf/selfstat" ) const ( @@ -60,6 +61,16 @@ type httpClient struct { retryCount atomic.Int64 concurrent uint64 log telegraf.Logger + statistics *selfstat.Collector + + statBytesWritten selfstat.Stat + statSuccessfulWrites selfstat.Stat + statFailedWrites selfstat.Stat + statTimeout selfstat.Stat + statRetryableErrorCounters selfstat.Stat + statNonRetryableErrorCounters selfstat.Stat + statSuccessfulWriteDuration selfstat.Stat + statFailedWriteDuration selfstat.Stat // Mutex to protect the retry-time field sync.Mutex @@ -135,6 +146,20 @@ func (c *httpClient) Init() error { c.concurrent = 1 } c.pool = pond.NewPool(int(c.concurrent)) + + // setup observability + tags := map[string]string{ + "url": c.url.String(), + } + + c.statBytesWritten = c.statistics.Register("write", "bytes_total", make(map[string]string)) + c.statSuccessfulWrites = c.statistics.Register("write", "writes", tags) + c.statFailedWrites = c.statistics.Register("write", "errors", tags) + c.statRetryableErrorCounters = c.statistics.Register("write", "errors_retryable", tags) + c.statNonRetryableErrorCounters = c.statistics.Register("write", "errors_non_retryable", tags) + c.statTimeout = c.statistics.Register("write", "request_timeouts", tags) + c.statSuccessfulWriteDuration = c.statistics.RegisterTiming("write", "request_success_time_ns", tags) + c.statFailedWriteDuration = c.statistics.RegisterTiming("write", "request_fail_time_ns", tags) return nil } @@ -315,13 +340,24 @@ func (c *httpClient) writeBatch(ctx context.Context, b *batch) error { } // Execute the request + start := time.Now() resp, err := c.client.Do(req.WithContext(ctx)) + durationNs := time.Since(start).Nanoseconds() if err != nil { + c.statFailedWriteDuration.Set(durationNs) + c.statFailedWrites.Incr(1) + var urlErr *url.Error + if errors.As(err, &urlErr) && urlErr.Timeout() { + c.statTimeout.Incr(1) + } + internal.OnClientError(c.client, err) return err } defer resp.Body.Close() + c.statBytesWritten.Incr(int64(len(b.payload))) + // Check for success switch resp.StatusCode { case @@ -334,10 +370,15 @@ func (c *httpClient) writeBatch(ctx context.Context, b *batch) error { http.StatusPartialContent, http.StatusMultiStatus, http.StatusAlreadyReported: + c.statSuccessfulWriteDuration.Set(durationNs) + c.statSuccessfulWrites.Incr(1) c.retryCount.Store(0) return nil } + c.statFailedWriteDuration.Set(durationNs) + c.statFailedWrites.Incr(1) + // We got an error and now try to decode further var desc string writeResp := &genericRespError{} @@ -348,6 +389,7 @@ func (c *httpClient) writeBatch(ctx context.Context, b *batch) error { switch resp.StatusCode { // request was too large, send back to try again case http.StatusRequestEntityTooLarge: + c.statRetryableErrorCounters.Incr(1) c.log.Errorf("Failed to write metrics with size %d bytes to %s, request was too large (413)", len(b.payload), b.bucket) return &ThrottleError{ Err: fmt.Errorf("%s: %s", resp.Status, desc), @@ -360,19 +402,21 @@ func (c *httpClient) writeBatch(ctx context.Context, b *batch) error { // for example, submitting metrics outside the retention period. http.StatusUnprocessableEntity, http.StatusNotAcceptable: - + c.statNonRetryableErrorCounters.Incr(1) // Clients should *not* repeat the request and the metrics should be rejected. return &APIError{ Err: fmt.Errorf("failed to write metrics to %s (will be dropped: %s)%s", b.bucket, resp.Status, desc), StatusCode: resp.StatusCode, } case http.StatusUnauthorized, http.StatusForbidden: + c.statRetryableErrorCounters.Incr(1) return fmt.Errorf("failed to write metrics to %s (%s)%s", b.bucket, resp.Status, desc) case http.StatusTooManyRequests, http.StatusServiceUnavailable, http.StatusBadGateway, http.StatusGatewayTimeout: // ^ these handle the cases where the server is likely overloaded, and may not be able to say so. + c.statRetryableErrorCounters.Incr(1) retryDuration := getRetryDuration(resp.Header, c.retryCount.Add(1)) c.log.Warnf("Failed to write to %s; will retry in %s. (%s)\n", b.bucket, retryDuration, resp.Status) return &ThrottleError{ @@ -385,6 +429,7 @@ func (c *httpClient) writeBatch(ctx context.Context, b *batch) error { // if it's any other 4xx code, the client should not retry as it's the client's mistake. // retrying will not make the request magically work. if len(resp.Status) > 0 && resp.Status[0] == '4' { + c.statNonRetryableErrorCounters.Incr(1) return &APIError{ Err: fmt.Errorf("failed to write metrics to %s (will be dropped: %s)%s", b.bucket, resp.Status, desc), StatusCode: resp.StatusCode, @@ -397,6 +442,7 @@ func (c *httpClient) writeBatch(ctx context.Context, b *batch) error { desc = fmt.Sprintf(": %s; %s", desc, xErr) } + c.statRetryableErrorCounters.Incr(1) return &APIError{ Err: fmt.Errorf("failed to write metrics to bucket %q: %s%s", b.bucket, resp.Status, desc), StatusCode: resp.StatusCode, diff --git a/plugins/outputs/influxdb_v2/http_test.go b/plugins/outputs/influxdb_v2/http_test.go index e287b5cd4b082..15cb63bf53163 100644 --- a/plugins/outputs/influxdb_v2/http_test.go +++ b/plugins/outputs/influxdb_v2/http_test.go @@ -18,6 +18,7 @@ import ( "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/common/ratelimiter" "github.com/influxdata/telegraf/plugins/serializers/influx" + "github.com/influxdata/telegraf/selfstat" "github.com/influxdata/telegraf/testutil" ) @@ -46,6 +47,10 @@ func TestHTTPClientInit(t *testing.T) { t.Run(tt.name, func(t *testing.T) { u, err := url.Parse(tt.addr) require.NoError(t, err) + + collector := selfstat.NewCollector(make(map[string]string)) + defer collector.UnregisterAll() + tt.client.statistics = collector tt.client.url = u require.NoError(t, tt.client.Init()) @@ -215,6 +220,8 @@ func TestRetryLaterEarlyExit(t *testing.T) { u, err := url.Parse("http://" + ts.Listener.Addr().String()) require.NoError(t, err) + collector := selfstat.NewCollector(make(map[string]string)) + defer collector.UnregisterAll() c := &httpClient{ url: u, bucketTag: "bucket", @@ -222,6 +229,7 @@ func TestRetryLaterEarlyExit(t *testing.T) { serializer: ratelimiter.NewIndividualSerializer(serializer), rateLimiter: limiter, log: &testutil.Logger{}, + statistics: selfstat.NewCollector(make(map[string]string)), } require.NoError(t, c.Init()) @@ -286,13 +294,16 @@ func TestHeadersDoNotOverrideConfig(t *testing.T) { require.NoError(t, err) authHeader := config.NewSecret([]byte("Bearer foo")) userAgentHeader := config.NewSecret([]byte("foo")) + collector := selfstat.NewCollector(make(map[string]string)) + defer collector.UnregisterAll() c := &httpClient{ headers: map[string]*config.Secret{ "Authorization": &authHeader, "User-Agent": &userAgentHeader, }, // URL to make Init() happy - url: testURL, + url: testURL, + statistics: collector, } require.NoError(t, c.Init()) require.Equal(t, &authHeader, c.headers["Authorization"]) diff --git a/plugins/outputs/influxdb_v2/influxdb_v2.go b/plugins/outputs/influxdb_v2/influxdb_v2.go index 1bc558168dfc0..2f747545c98bb 100644 --- a/plugins/outputs/influxdb_v2/influxdb_v2.go +++ b/plugins/outputs/influxdb_v2/influxdb_v2.go @@ -21,6 +21,7 @@ import ( commontls "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/serializers/influx" + "github.com/influxdata/telegraf/selfstat" ) //go:embed sample.conf @@ -45,6 +46,8 @@ type InfluxDB struct { ReadIdleTimeout config.Duration `toml:"read_idle_timeout"` ConcurrentWrites uint64 `toml:"concurrent_writes"` Log telegraf.Logger `toml:"-"` + Statistics *selfstat.Collector `toml:"-"` + commontls.ClientConfig ratelimiter.RateLimitConfig @@ -171,6 +174,7 @@ func (i *InfluxDB) Connect() error { serializer: i.serializer, concurrent: i.ConcurrentWrites, log: i.Log, + statistics: i.Statistics, } if err := c.Init(); err != nil { diff --git a/plugins/outputs/influxdb_v2/influxdb_v2_test.go b/plugins/outputs/influxdb_v2/influxdb_v2_test.go index 1a6c25396322c..191bd90b87518 100644 --- a/plugins/outputs/influxdb_v2/influxdb_v2_test.go +++ b/plugins/outputs/influxdb_v2/influxdb_v2_test.go @@ -23,6 +23,7 @@ import ( "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/outputs" influxdb "github.com/influxdata/telegraf/plugins/outputs/influxdb_v2" + "github.com/influxdata/telegraf/selfstat" "github.com/influxdata/telegraf/testutil" ) @@ -124,10 +125,13 @@ func TestConnect(t *testing.T) { } for _, plugin := range tests { + collector := selfstat.NewCollector(make(map[string]string)) + plugin.Statistics = collector t.Run(plugin.URLs[0], func(t *testing.T) { require.NoError(t, plugin.Init()) require.NoError(t, plugin.Connect()) }) + collector.UnregisterAll() } } @@ -181,9 +185,12 @@ func TestWrite(t *testing.T) { ) defer ts.Close() + url := "http://" + ts.Listener.Addr().String() + collector := selfstat.NewCollector(make(map[string]string)) + defer collector.UnregisterAll() // Setup plugin and connect plugin := &influxdb.InfluxDB{ - URLs: []string{"http://" + ts.Listener.Addr().String()}, + URLs: []string{url}, Bucket: "telegraf", BucketTag: "bucket", ExcludeBucketTag: true, @@ -191,6 +198,7 @@ func TestWrite(t *testing.T) { PingTimeout: config.Duration(15 * time.Second), ReadIdleTimeout: config.Duration(30 * time.Second), Log: &testutil.Logger{}, + Statistics: collector, } require.NoError(t, plugin.Init()) require.NoError(t, plugin.Connect()) @@ -211,6 +219,16 @@ func TestWrite(t *testing.T) { } require.NoError(t, plugin.Write(metrics)) require.NoError(t, plugin.Write(metrics)) + + stat := collector.Get("outputs.influxdb_v2", "successful_writes_total", map[string]string{ + "url": url + "/api/v2/write", + }) + require.NotNil(t, stat) + require.Equal(t, int64(2), stat.Get()) + + stat = collector.Get("write", "bytes_total", map[string]string{}) + require.NotNil(t, stat) + require.Equal(t, int64(38), stat.Get()) } func TestWriteBucketTagWorksOnRetry(t *testing.T) { @@ -253,13 +271,17 @@ func TestWriteBucketTagWorksOnRetry(t *testing.T) { defer ts.Close() // Setup plugin and connect + url := "http://" + ts.Listener.Addr().String() + collector := selfstat.NewCollector(make(map[string]string)) + defer collector.UnregisterAll() plugin := &influxdb.InfluxDB{ - URLs: []string{"http://" + ts.Listener.Addr().String()}, + URLs: []string{url}, Bucket: "telegraf", BucketTag: "bucket", ExcludeBucketTag: true, ContentEncoding: "identity", Log: &testutil.Logger{}, + Statistics: collector, } require.NoError(t, plugin.Init()) require.NoError(t, plugin.Connect()) @@ -280,6 +302,12 @@ func TestWriteBucketTagWorksOnRetry(t *testing.T) { } require.NoError(t, plugin.Write(metrics)) require.NoError(t, plugin.Write(metrics)) + + stat := collector.Get("outputs.influxdb_v2", "successful_writes_total", map[string]string{ + "url": url + "/api/v2/write", + }) + require.NotNil(t, stat) + require.Equal(t, int64(2), stat.Get()) } func TestTooLargeWriteRetry(t *testing.T) { @@ -318,6 +346,8 @@ func TestTooLargeWriteRetry(t *testing.T) { defer ts.Close() // Setup plugin and connect + collector := selfstat.NewCollector(make(map[string]string)) + defer collector.UnregisterAll() plugin := &influxdb.InfluxDB{ URLs: []string{"http://" + ts.Listener.Addr().String()}, Bucket: "telegraf", @@ -325,6 +355,7 @@ func TestTooLargeWriteRetry(t *testing.T) { ExcludeBucketTag: true, ContentEncoding: "identity", Log: &testutil.Logger{}, + Statistics: collector, } require.NoError(t, plugin.Init()) require.NoError(t, plugin.Connect()) @@ -412,6 +443,8 @@ func TestRateLimit(t *testing.T) { defer ts.Close() // Setup plugin and connect + collector := selfstat.NewCollector(make(map[string]string)) + defer collector.UnregisterAll() plugin := &influxdb.InfluxDB{ URLs: []string{"http://" + ts.Listener.Addr().String()}, Bucket: "telegraf", @@ -420,7 +453,8 @@ func TestRateLimit(t *testing.T) { Limit: 50, Period: config.Duration(time.Second), }, - Log: &testutil.Logger{}, + Log: &testutil.Logger{}, + Statistics: collector, } require.NoError(t, plugin.Init()) require.NoError(t, plugin.Connect()) @@ -520,11 +554,15 @@ func TestStatusCodeNonRetryable4xx(t *testing.T) { defer ts.Close() // Setup plugin and connect + url := "http://" + ts.Listener.Addr().String() + collector := selfstat.NewCollector(make(map[string]string)) + defer collector.UnregisterAll() plugin := &influxdb.InfluxDB{ - URLs: []string{"http://" + ts.Listener.Addr().String()}, + URLs: []string{url}, BucketTag: "bucket", ContentEncoding: "identity", Log: &testutil.Logger{}, + Statistics: collector, } require.NoError(t, plugin.Init()) require.NoError(t, plugin.Connect()) @@ -585,6 +623,17 @@ func TestStatusCodeNonRetryable4xx(t *testing.T) { var writeErr *internal.PartialWriteError require.ErrorAs(t, err, &writeErr) require.Len(t, writeErr.MetricsReject, 2, "rejected metrics") + + stat := collector.Get("outputs.influxdb_v2", "failed_writes_total", map[string]string{ + "url": url + "/api/v2/write", + }) + require.NotNil(t, stat) + require.Equal(t, int64(1), stat.Get()) + stat = collector.Get("outputs.influxdb_v2", "non_retryable_errors_total", map[string]string{ + "url": url + "/api/v2/write", + }) + require.NotNil(t, stat) + require.Equal(t, int64(1), stat.Get()) }) } } @@ -613,10 +662,13 @@ func TestStatusCodeInvalidAuthentication(t *testing.T) { defer ts.Close() // Setup plugin and connect + collector := selfstat.NewCollector(make(map[string]string)) + defer collector.UnregisterAll() plugin := &influxdb.InfluxDB{ - URLs: []string{"http://" + ts.Listener.Addr().String()}, - BucketTag: "bucket", - Log: &testutil.Logger{}, + URLs: []string{"http://" + ts.Listener.Addr().String()}, + BucketTag: "bucket", + Log: &testutil.Logger{}, + Statistics: collector, } require.NoError(t, plugin.Init()) require.NoError(t, plugin.Connect()) @@ -708,11 +760,15 @@ func TestStatusCodeServiceUnavailable(t *testing.T) { defer ts.Close() // Setup plugin and connect + url := "http://" + ts.Listener.Addr().String() + collector := selfstat.NewCollector(make(map[string]string)) + defer collector.UnregisterAll() plugin := &influxdb.InfluxDB{ - URLs: []string{"http://" + ts.Listener.Addr().String()}, + URLs: []string{url}, BucketTag: "bucket", ContentEncoding: "identity", Log: &testutil.Logger{}, + Statistics: collector, } require.NoError(t, plugin.Init()) require.NoError(t, plugin.Connect()) @@ -770,6 +826,17 @@ func TestStatusCodeServiceUnavailable(t *testing.T) { require.ErrorAs(t, err, &writeErr) require.Empty(t, writeErr.MetricsReject, "rejected metrics") require.LessOrEqual(t, len(writeErr.MetricsAccept), 2, "accepted metrics") + + stat := collector.Get("outputs.influxdb_v2", "failed_writes_total", map[string]string{ + "url": url + "/api/v2/write", + }) + require.NotNil(t, stat) + require.Equal(t, int64(1), stat.Get()) + stat = collector.Get("outputs.influxdb_v2", "retryable_errors_total", map[string]string{ + "url": url + "/api/v2/write", + }) + require.NotNil(t, stat) + require.Equal(t, int64(1), stat.Get()) }) } } @@ -798,11 +865,15 @@ func TestStatusCodeUnexpected(t *testing.T) { defer ts.Close() // Setup plugin and connect + url := "http://" + ts.Listener.Addr().String() + collector := selfstat.NewCollector(make(map[string]string)) + defer collector.UnregisterAll() plugin := &influxdb.InfluxDB{ - URLs: []string{"http://" + ts.Listener.Addr().String()}, + URLs: []string{url}, BucketTag: "bucket", ContentEncoding: "identity", Log: &testutil.Logger{}, + Statistics: collector, } require.NoError(t, plugin.Init()) require.NoError(t, plugin.Connect()) @@ -861,6 +932,17 @@ func TestStatusCodeUnexpected(t *testing.T) { require.ErrorAs(t, err, &writeErr) require.Empty(t, writeErr.MetricsReject, "rejected metrics") require.LessOrEqual(t, len(writeErr.MetricsAccept), 2, "accepted metrics") + + stat := collector.Get("outputs.influxdb_v2", "failed_writes_total", map[string]string{ + "url": url + "/api/v2/write", + }) + require.NotNil(t, stat) + require.Equal(t, int64(1), stat.Get()) + stat = collector.Get("outputs.influxdb_v2", "retryable_errors_total", map[string]string{ + "url": url + "/api/v2/write", + }) + require.NotNil(t, stat) + require.Equal(t, int64(1), stat.Get()) }) } } @@ -880,12 +962,16 @@ func TestUseDynamicSecret(t *testing.T) { defer ts.Close() secretToken := config.NewSecret([]byte("wrongtk")) + collector := selfstat.NewCollector(make(map[string]string)) + defer collector.UnregisterAll() + // Setup plugin and connect plugin := &influxdb.InfluxDB{ - URLs: []string{"http://" + ts.Listener.Addr().String()}, - Log: &testutil.Logger{}, - Bucket: "my_bucket", - Token: secretToken, + URLs: []string{"http://" + ts.Listener.Addr().String()}, + Log: &testutil.Logger{}, + Statistics: collector, + Bucket: "my_bucket", + Token: secretToken, } require.NoError(t, plugin.Init()) require.NoError(t, plugin.Connect()) @@ -924,11 +1010,15 @@ func BenchmarkWrite1k(b *testing.B) { defer ts.Close() // Setup plugin and connect + collector := selfstat.NewCollector(make(map[string]string)) + defer collector.UnregisterAll() + plugin := &influxdb.InfluxDB{ - URLs: []string{"http://" + ts.Listener.Addr().String()}, - Token: config.NewSecret([]byte("sometoken")), - Bucket: "my_bucket", - Log: &testutil.Logger{}, + URLs: []string{"http://" + ts.Listener.Addr().String()}, + Token: config.NewSecret([]byte("sometoken")), + Bucket: "my_bucket", + Log: &testutil.Logger{}, + Statistics: collector, } require.NoError(b, plugin.Init()) require.NoError(b, plugin.Connect()) @@ -968,11 +1058,15 @@ func BenchmarkWrite5k(b *testing.B) { defer ts.Close() // Setup plugin and connect + collector := selfstat.NewCollector(make(map[string]string)) + defer collector.UnregisterAll() + plugin := &influxdb.InfluxDB{ - URLs: []string{"http://" + ts.Listener.Addr().String()}, - Token: config.NewSecret([]byte("sometoken")), - Bucket: "my_bucket", - Log: &testutil.Logger{}, + URLs: []string{"http://" + ts.Listener.Addr().String()}, + Token: config.NewSecret([]byte("sometoken")), + Bucket: "my_bucket", + Log: &testutil.Logger{}, + Statistics: collector, } require.NoError(b, plugin.Init()) require.NoError(b, plugin.Connect()) @@ -1012,11 +1106,15 @@ func BenchmarkWrite10k(b *testing.B) { defer ts.Close() // Setup plugin and connect + collector := selfstat.NewCollector(make(map[string]string)) + defer collector.UnregisterAll() + plugin := &influxdb.InfluxDB{ - URLs: []string{"http://" + ts.Listener.Addr().String()}, - Token: config.NewSecret([]byte("sometoken")), - Bucket: "my_bucket", - Log: &testutil.Logger{}, + URLs: []string{"http://" + ts.Listener.Addr().String()}, + Token: config.NewSecret([]byte("sometoken")), + Bucket: "my_bucket", + Log: &testutil.Logger{}, + Statistics: collector, } require.NoError(b, plugin.Init()) require.NoError(b, plugin.Connect()) @@ -1056,11 +1154,15 @@ func BenchmarkWrite25k(b *testing.B) { defer ts.Close() // Setup plugin and connect + collector := selfstat.NewCollector(make(map[string]string)) + defer collector.UnregisterAll() + plugin := &influxdb.InfluxDB{ - URLs: []string{"http://" + ts.Listener.Addr().String()}, - Token: config.NewSecret([]byte("sometoken")), - Bucket: "my_bucket", - Log: &testutil.Logger{}, + URLs: []string{"http://" + ts.Listener.Addr().String()}, + Token: config.NewSecret([]byte("sometoken")), + Bucket: "my_bucket", + Log: &testutil.Logger{}, + Statistics: collector, } require.NoError(b, plugin.Init()) require.NoError(b, plugin.Connect()) @@ -1100,11 +1202,15 @@ func BenchmarkWrite50k(b *testing.B) { defer ts.Close() // Setup plugin and connect + collector := selfstat.NewCollector(make(map[string]string)) + defer collector.UnregisterAll() + plugin := &influxdb.InfluxDB{ - URLs: []string{"http://" + ts.Listener.Addr().String()}, - Token: config.NewSecret([]byte("sometoken")), - Bucket: "my_bucket", - Log: &testutil.Logger{}, + URLs: []string{"http://" + ts.Listener.Addr().String()}, + Token: config.NewSecret([]byte("sometoken")), + Bucket: "my_bucket", + Log: &testutil.Logger{}, + Statistics: collector, } require.NoError(b, plugin.Init()) require.NoError(b, plugin.Connect()) @@ -1144,11 +1250,15 @@ func BenchmarkWrite100k(b *testing.B) { defer ts.Close() // Setup plugin and connect + collector := selfstat.NewCollector(make(map[string]string)) + defer collector.UnregisterAll() + plugin := &influxdb.InfluxDB{ - URLs: []string{"http://" + ts.Listener.Addr().String()}, - Token: config.NewSecret([]byte("sometoken")), - Bucket: "my_bucket", - Log: &testutil.Logger{}, + URLs: []string{"http://" + ts.Listener.Addr().String()}, + Token: config.NewSecret([]byte("sometoken")), + Bucket: "my_bucket", + Log: &testutil.Logger{}, + Statistics: collector, } require.NoError(b, plugin.Init()) require.NoError(b, plugin.Connect()) @@ -1188,12 +1298,16 @@ func BenchmarkWriteConcurrent100k_4(b *testing.B) { defer ts.Close() // Setup plugin and connect + collector := selfstat.NewCollector(make(map[string]string)) + defer collector.UnregisterAll() + plugin := &influxdb.InfluxDB{ URLs: []string{"http://" + ts.Listener.Addr().String()}, Token: config.NewSecret([]byte("sometoken")), Bucket: "my_bucket", ConcurrentWrites: 4, Log: &testutil.Logger{}, + Statistics: collector, } require.NoError(b, plugin.Init()) require.NoError(b, plugin.Connect()) @@ -1233,12 +1347,16 @@ func BenchmarkWriteConcurrent100k_8(b *testing.B) { defer ts.Close() // Setup plugin and connect + collector := selfstat.NewCollector(make(map[string]string)) + defer collector.UnregisterAll() + plugin := &influxdb.InfluxDB{ URLs: []string{"http://" + ts.Listener.Addr().String()}, Token: config.NewSecret([]byte("sometoken")), Bucket: "my_bucket", ConcurrentWrites: 8, Log: &testutil.Logger{}, + Statistics: collector, } require.NoError(b, plugin.Init()) require.NoError(b, plugin.Connect()) @@ -1278,12 +1396,16 @@ func BenchmarkWriteConcurrent100k_16(b *testing.B) { defer ts.Close() // Setup plugin and connect + collector := selfstat.NewCollector(make(map[string]string)) + defer collector.UnregisterAll() + plugin := &influxdb.InfluxDB{ URLs: []string{"http://" + ts.Listener.Addr().String()}, Token: config.NewSecret([]byte("sometoken")), Bucket: "my_bucket", ConcurrentWrites: 16, Log: &testutil.Logger{}, + Statistics: collector, } require.NoError(b, plugin.Init()) require.NoError(b, plugin.Connect())