Skip to content

Commit 39ad8a4

Browse files
committed
consumererror: Add partial error type
This PR adds functionality for consumers to create a partial error type. This will allow consumers to properly report partial success/failure with failed item counts, which can subsequently be used when reporting sent/failed metrics.
1 parent 14a7832 commit 39ad8a4

File tree

8 files changed

+247
-58
lines changed

8 files changed

+247
-58
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: consumererror
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add new partial error type to consumererror to allow consumers to report partial successes.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [13423]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: []

consumer/consumererror/partial.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package consumererror // import "go.opentelemetry.io/collector/consumer/consumererror"
5+
6+
import "errors"
7+
8+
type partialError struct {
9+
inner error
10+
failed int
11+
}
12+
13+
var _ error = partialError{}
14+
15+
func (pe partialError) Error() string {
16+
return pe.inner.Error()
17+
}
18+
19+
func (pe partialError) Unwrap() error {
20+
return pe.inner
21+
}
22+
23+
func (pe partialError) Failed() int {
24+
return pe.failed
25+
}
26+
27+
// NewPartial creates a new partial error. This is used by consumers
28+
// to report errors where only a subset of the total items failed
29+
// to be written, but it is not possible to tell which particular items
30+
// failed.
31+
func NewPartial(err error, failed int) error {
32+
return NewPermanent(partialError{
33+
inner: err,
34+
failed: failed,
35+
})
36+
}
37+
38+
// AsPartial checks if an error was wrapped with the NewPartial function,
39+
// or if it contains one such error in its Unwrap() tree.
40+
func AsPartial(err error) (partialError, bool) {
41+
var pe partialError
42+
ok := errors.As(err, &pe)
43+
return pe, ok
44+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package consumererror
2+
3+
import (
4+
"errors"
5+
"testing"
6+
7+
"github.com/stretchr/testify/assert"
8+
)
9+
10+
func TestPartial(t *testing.T) {
11+
internalErr := errors.New("some points failed")
12+
err := NewPartial(internalErr, 5)
13+
assert.True(t, IsPermanent(err))
14+
partialErr, ok := AsPartial(err)
15+
assert.True(t, ok)
16+
assert.Equal(t, 5, partialErr.Failed())
17+
assert.Equal(t, internalErr, partialErr.Unwrap())
18+
assert.Equal(t, internalErr.Error(), partialErr.Error())
19+
}

exporter/exporterhelper/internal/obs_report_sender.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"go.opentelemetry.io/otel/trace"
1313

1414
"go.opentelemetry.io/collector/component"
15+
"go.opentelemetry.io/collector/consumer/consumererror"
1516
"go.opentelemetry.io/collector/exporter"
1617
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
1718
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
@@ -132,8 +133,12 @@ func (ors *obsReportSender[K]) endOp(ctx context.Context, numLogRecords int, err
132133
}
133134

134135
func toNumItems(numExportedItems int, err error) (int64, int64) {
135-
if err != nil {
136-
return 0, int64(numExportedItems)
136+
if err == nil {
137+
return int64(numExportedItems), 0
138+
}
139+
if partialErr, ok := consumererror.AsPartial(err); ok {
140+
numFailedItems := int64(partialErr.Failed())
141+
return int64(numExportedItems) - numFailedItems, numFailedItems
137142
}
138-
return int64(numExportedItems), 0
143+
return 0, int64(numExportedItems)
139144
}

exporter/exporterhelper/internal/obs_report_sender_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717

1818
"go.opentelemetry.io/collector/component"
1919
"go.opentelemetry.io/collector/component/componenttest"
20+
"go.opentelemetry.io/collector/consumer/consumererror"
2021
"go.opentelemetry.io/collector/exporter"
2122
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadatatest"
2223
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
@@ -46,9 +47,13 @@ func TestExportTraceDataOp(t *testing.T) {
4647
)
4748
require.NoError(t, err)
4849

50+
partialFailCount := 10
51+
errPartial := consumererror.NewPartial(errors.New("errPartial"), partialFailCount)
52+
4953
params := []testParams{
5054
{items: 22, err: nil},
5155
{items: 14, err: errFake},
56+
{items: 14, err: errPartial},
5257
}
5358
for i := range params {
5459
exporterErr = params[i].err
@@ -73,6 +78,14 @@ func TestExportTraceDataOp(t *testing.T) {
7378
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: ItemsFailed, Value: attribute.Int64Value(int64(params[i].items))})
7479
assert.Equal(t, codes.Error, span.Status().Code)
7580
assert.Equal(t, params[i].err.Error(), span.Status().Description)
81+
case errors.Is(params[i].err, errPartial):
82+
partialSuccessCount := params[i].items - partialFailCount
83+
sentSpans += partialSuccessCount
84+
failedToSendSpans += partialFailCount
85+
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: ItemsSent, Value: attribute.Int64Value(int64(partialSuccessCount))})
86+
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: ItemsFailed, Value: attribute.Int64Value(int64(partialFailCount))})
87+
assert.Equal(t, codes.Error, span.Status().Code)
88+
assert.Equal(t, params[i].err.Error(), span.Status().Description)
7689
default:
7790
t.Fatalf("unexpected error: %v", params[i].err)
7891
}
@@ -114,9 +127,13 @@ func TestExportMetricsOp(t *testing.T) {
114127
)
115128
require.NoError(t, err)
116129

130+
partialFailCount := 10
131+
errPartial := consumererror.NewPartial(errors.New("errPartial"), partialFailCount)
132+
117133
params := []testParams{
118134
{items: 17, err: nil},
119135
{items: 23, err: errFake},
136+
{items: 20, err: errPartial},
120137
}
121138
for i := range params {
122139
exporterErr = params[i].err
@@ -141,6 +158,14 @@ func TestExportMetricsOp(t *testing.T) {
141158
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: ItemsFailed, Value: attribute.Int64Value(int64(params[i].items))})
142159
assert.Equal(t, codes.Error, span.Status().Code)
143160
assert.Equal(t, params[i].err.Error(), span.Status().Description)
161+
case errors.Is(params[i].err, errPartial):
162+
partialSuccessCount := params[i].items - partialFailCount
163+
sentMetricPoints += partialSuccessCount
164+
failedToSendMetricPoints += partialFailCount
165+
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: ItemsSent, Value: attribute.Int64Value(int64(partialSuccessCount))})
166+
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: ItemsFailed, Value: attribute.Int64Value(int64(partialFailCount))})
167+
assert.Equal(t, codes.Error, span.Status().Code)
168+
assert.Equal(t, params[i].err.Error(), span.Status().Description)
144169
default:
145170
t.Fatalf("unexpected error: %v", params[i].err)
146171
}
@@ -182,9 +207,13 @@ func TestExportLogsOp(t *testing.T) {
182207
)
183208
require.NoError(t, err)
184209

210+
partialFailCount := 10
211+
errPartial := consumererror.NewPartial(errors.New("errPartial"), partialFailCount)
212+
185213
params := []testParams{
186214
{items: 17, err: nil},
187215
{items: 23, err: errFake},
216+
{items: 21, err: errPartial},
188217
}
189218
for i := range params {
190219
exporterErr = params[i].err
@@ -209,6 +238,14 @@ func TestExportLogsOp(t *testing.T) {
209238
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: ItemsFailed, Value: attribute.Int64Value(int64(params[i].items))})
210239
assert.Equal(t, codes.Error, span.Status().Code)
211240
assert.Equal(t, params[i].err.Error(), span.Status().Description)
241+
case errors.Is(params[i].err, errPartial):
242+
partialSuccessCount := params[i].items - partialFailCount
243+
sentLogRecords += partialSuccessCount
244+
failedToSendLogRecords += partialFailCount
245+
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: ItemsSent, Value: attribute.Int64Value(int64(partialSuccessCount))})
246+
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: ItemsFailed, Value: attribute.Int64Value(int64(partialFailCount))})
247+
assert.Equal(t, codes.Error, span.Status().Code)
248+
assert.Equal(t, params[i].err.Error(), span.Status().Description)
212249
default:
213250
t.Fatalf("unexpected error: %v", params[i].err)
214251
}

exporter/exporterhelper/logs_test.go

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"go.opentelemetry.io/collector/component"
2424
"go.opentelemetry.io/collector/component/componenttest"
2525
"go.opentelemetry.io/collector/consumer"
26+
"go.opentelemetry.io/collector/consumer/consumererror"
2627
"go.opentelemetry.io/collector/consumer/consumertest"
2728
"go.opentelemetry.io/collector/exporter"
2829
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
@@ -181,6 +182,19 @@ func TestLogs_WithRecordMetrics(t *testing.T) {
181182
checkRecordedMetricsForLogs(t, tt, fakeLogsName, le, nil)
182183
}
183184

185+
func TestLogs_WithRecordMetrics_PartialError(t *testing.T) {
186+
tt := componenttest.NewTelemetry()
187+
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
188+
189+
partialErr := consumererror.NewPartial(errors.New("partial error"), 1)
190+
191+
le, err := NewLogs(context.Background(), exporter.Settings{ID: fakeLogsName, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, &fakeLogsConfig, newPushLogsData(partialErr))
192+
require.NoError(t, err)
193+
require.NotNil(t, le)
194+
195+
checkRecordedMetricsForLogs(t, tt, fakeLogsName, le, partialErr)
196+
}
197+
184198
func TestLogs_pLogModifiedDownStream_WithRecordMetrics(t *testing.T) {
185199
tt := componenttest.NewTelemetry()
186200
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
@@ -338,26 +352,24 @@ func checkRecordedMetricsForLogs(t *testing.T, tt *componenttest.Telemetry, id c
338352
require.Equal(t, wantError, le.ConsumeLogs(context.Background(), ld))
339353
}
340354

341-
// TODO: When the new metrics correctly count partial dropped fix this.
342-
if wantError != nil {
343-
metadatatest.AssertEqualExporterSendFailedLogRecords(t, tt,
344-
[]metricdata.DataPoint[int64]{
345-
{
346-
Attributes: attribute.NewSet(
347-
attribute.String("exporter", id.String())),
348-
Value: int64(numBatches * ld.LogRecordCount()),
349-
},
350-
}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
355+
allRecords := ld.LogRecordCount() * numBatches
356+
var expectedRecords int
357+
if wantError == nil {
358+
expectedRecords = allRecords
359+
} else if partialError, ok := consumererror.AsPartial(wantError); ok {
360+
successPerBatch := ld.LogRecordCount() - partialError.Failed()
361+
expectedRecords = numBatches * successPerBatch
351362
} else {
352-
metadatatest.AssertEqualExporterSentLogRecords(t, tt,
353-
[]metricdata.DataPoint[int64]{
354-
{
355-
Attributes: attribute.NewSet(
356-
attribute.String("exporter", id.String())),
357-
Value: int64(numBatches * ld.LogRecordCount()),
358-
},
359-
}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
363+
expectedRecords = 0
360364
}
365+
metadatatest.AssertEqualExporterSentLogRecords(t, tt,
366+
[]metricdata.DataPoint[int64]{
367+
{
368+
Attributes: attribute.NewSet(
369+
attribute.String("exporter", id.String())),
370+
Value: int64(expectedRecords),
371+
},
372+
}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
361373
}
362374

363375
func generateLogsTraffic(t *testing.T, tracer trace.Tracer, le exporter.Logs, numRequests int, wantError error) {

exporter/exporterhelper/metrics_test.go

Lines changed: 44 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"go.opentelemetry.io/collector/component"
2424
"go.opentelemetry.io/collector/component/componenttest"
2525
"go.opentelemetry.io/collector/consumer"
26+
"go.opentelemetry.io/collector/consumer/consumererror"
2627
"go.opentelemetry.io/collector/consumer/consumertest"
2728
"go.opentelemetry.io/collector/exporter"
2829
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
@@ -181,6 +182,19 @@ func TestMetrics_WithRecordMetrics(t *testing.T) {
181182
checkRecordedMetricsForMetrics(t, tt, fakeMetricsName, me, nil)
182183
}
183184

185+
func TestMetrics_WithRecordMetrics_PartialError(t *testing.T) {
186+
tt := componenttest.NewTelemetry()
187+
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
188+
189+
partialErr := consumererror.NewPartial(errors.New("partial error"), 1)
190+
191+
me, err := NewMetrics(context.Background(), exporter.Settings{ID: fakeMetricsName, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, &fakeMetricsConfig, newPushMetricsData(partialErr))
192+
require.NoError(t, err)
193+
require.NotNil(t, me)
194+
195+
checkRecordedMetricsForMetrics(t, tt, fakeMetricsName, me, partialErr)
196+
}
197+
184198
func TestMetrics_pMetricModifiedDownStream_WithRecordMetrics(t *testing.T) {
185199
tt := componenttest.NewTelemetry()
186200
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
@@ -341,27 +355,38 @@ func checkRecordedMetricsForMetrics(t *testing.T, tt *componenttest.Telemetry, i
341355
require.Equal(t, wantError, me.ConsumeMetrics(context.Background(), md))
342356
}
343357

344-
// TODO: When the new metrics correctly count partial dropped fix this.
345-
numPoints := int64(numBatches * md.MetricCount() * 2) /* 2 points per metric*/
346-
if wantError != nil {
347-
metadatatest.AssertEqualExporterSendFailedMetricPoints(t, tt,
348-
[]metricdata.DataPoint[int64]{
349-
{
350-
Attributes: attribute.NewSet(
351-
attribute.String(internal.ExporterKey, id.String())),
352-
Value: numPoints,
353-
},
354-
}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
358+
allPoints := numBatches * md.MetricCount() * 2
359+
var successPoints, failedPoints int
360+
if wantError == nil {
361+
successPoints = allPoints
362+
failedPoints = 0
363+
} else if partialError, ok := consumererror.AsPartial(wantError); ok {
364+
// In this scenario the partial error failed count is failed datapoints,
365+
// not failed entire metrics.
366+
failedPerBatch := partialError.Failed()
367+
failedPoints = numBatches * failedPerBatch
368+
successPoints = allPoints - failedPoints
355369
} else {
356-
metadatatest.AssertEqualExporterSentMetricPoints(t, tt,
357-
[]metricdata.DataPoint[int64]{
358-
{
359-
Attributes: attribute.NewSet(
360-
attribute.String(internal.ExporterKey, id.String())),
361-
Value: numPoints,
362-
},
363-
}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
370+
successPoints = 0
371+
failedPoints = allPoints
364372
}
373+
374+
metadatatest.AssertEqualExporterSendFailedMetricPoints(t, tt,
375+
[]metricdata.DataPoint[int64]{
376+
{
377+
Attributes: attribute.NewSet(
378+
attribute.String(internal.ExporterKey, id.String())),
379+
Value: int64(failedPoints),
380+
},
381+
}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
382+
metadatatest.AssertEqualExporterSentMetricPoints(t, tt,
383+
[]metricdata.DataPoint[int64]{
384+
{
385+
Attributes: attribute.NewSet(
386+
attribute.String(internal.ExporterKey, id.String())),
387+
Value: int64(successPoints),
388+
},
389+
}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
365390
}
366391

367392
func generateMetricsTraffic(t *testing.T, tracer trace.Tracer, me exporter.Metrics, numRequests int, wantError error) {

0 commit comments

Comments
 (0)