@@ -6,7 +6,7 @@ package otlploggrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/o
66import (
77 "context"
88 "errors"
9- "fmt "
9+ "sync/atomic "
1010 "time"
1111
1212 collogpb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
@@ -21,6 +21,8 @@ import (
2121 "google.golang.org/grpc/metadata"
2222 "google.golang.org/grpc/status"
2323
24+ "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal"
25+ "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/observ"
2426 "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/retry"
2527)
2628
@@ -37,6 +39,8 @@ type client struct {
3739 ourConn bool
3840 conn * grpc.ClientConn
3941 lsc collogpb.LogsServiceClient
42+
43+ instrumentation * observ.Instrumentation
4044}
4145
4246// Used for testing.
@@ -71,7 +75,18 @@ func newClient(cfg config) (*client, error) {
7175
7276 c .lsc = collogpb .NewLogsServiceClient (c .conn )
7377
74- return c , nil
78+ var err error
79+ id := nextExporterID ()
80+ c .instrumentation , err = observ .NewInstrumentation (id , c .conn .CanonicalTarget ())
81+ return c , err
82+ }
83+
84+ var exporterN atomic.Int64
85+
86+ // nextExporterID returns the next unique ID for an exporter.
87+ func nextExporterID () int64 {
88+ const inc = 1
89+ return exporterN .Add (inc ) - inc
7590}
7691
7792func newGRPCDialOptions (cfg config ) []grpc.DialOption {
@@ -131,6 +146,14 @@ func (c *client) UploadLogs(ctx context.Context, rl []*logpb.ResourceLogs) (uplo
131146 ctx , cancel := c .exportContext (ctx )
132147 defer cancel ()
133148
149+ count := int64 (len (rl ))
150+ if c .instrumentation != nil {
151+ eo := c .instrumentation .ExportLogs (ctx , count )
152+ defer func () {
153+ eo .End (uploadErr )
154+ }()
155+ }
156+
134157 return errors .Join (uploadErr , c .requestFunc (ctx , func (ctx context.Context ) error {
135158 resp , err := c .lsc .Export (ctx , & collogpb.ExportLogsServiceRequest {
136159 ResourceLogs : rl ,
@@ -139,7 +162,7 @@ func (c *client) UploadLogs(ctx context.Context, rl []*logpb.ResourceLogs) (uplo
139162 msg := resp .PartialSuccess .GetErrorMessage ()
140163 n := resp .PartialSuccess .GetRejectedLogRecords ()
141164 if n != 0 || msg != "" {
142- err := errPartial { msg : msg , n : n }
165+ err := internal . LogPartialSuccessError ( n , msg )
143166 uploadErr = errors .Join (uploadErr , err )
144167 }
145168 }
@@ -152,23 +175,6 @@ func (c *client) UploadLogs(ctx context.Context, rl []*logpb.ResourceLogs) (uplo
152175 }))
153176}
154177
155- type errPartial struct {
156- msg string
157- n int64
158- }
159-
160- var _ error = errPartial {}
161-
162- func (e errPartial ) Error () string {
163- const form = "OTLP partial success: %s (%d log records rejected)"
164- return fmt .Sprintf (form , e .msg , e .n )
165- }
166-
167- func (errPartial ) Is (target error ) bool {
168- _ , ok := target .(errPartial )
169- return ok
170- }
171-
172178// Shutdown shuts down the client, freeing all resources.
173179//
174180// Any active connections to a remote endpoint are closed if they were created
0 commit comments