Skip to content

Commit e15303b

Browse files
authored
ENGTAI-64543: expose withGrpcConn to allow exporters to share connection (#252)
* ENGTAI-64543: expose withGrpcConn to allow exporters to share connection * Allow service options to be wired through any Init method
1 parent 2848f3a commit e15303b

File tree

2 files changed

+100
-10
lines changed

2 files changed

+100
-10
lines changed

instrumentation/opentelemetry/init.go

Lines changed: 62 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"crypto/tls"
66
"crypto/x509"
77
"fmt"
8-
"go.uber.org/zap"
98
"log"
109
"maps"
1110
"net/http"
@@ -41,7 +40,10 @@ import (
4140
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
4241
"go.opentelemetry.io/otel/trace"
4342
"go.opentelemetry.io/otel/trace/noop"
43+
"go.uber.org/zap"
44+
"google.golang.org/grpc"
4445
"google.golang.org/grpc/credentials"
46+
"google.golang.org/grpc/credentials/insecure"
4547
"google.golang.org/grpc/resolver"
4648
)
4749

@@ -63,7 +65,8 @@ var (
6365
type ServiceOption func(*ServiceOptions)
6466

6567
type ServiceOptions struct {
66-
headers map[string]string
68+
headers map[string]string
69+
grpcConn *grpc.ClientConn
6770
}
6871

6972
func WithHeaders(headers map[string]string) ServiceOption {
@@ -75,6 +78,49 @@ func WithHeaders(headers map[string]string) ServiceOption {
7578
}
7679
}
7780

81+
// Please ref https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc#WithGRPCConn
82+
// To create the grpc connection with same logic as goagent please use CreateGrpcConn
83+
func WithGrpcConn(conn *grpc.ClientConn) ServiceOption {
84+
return func(opts *ServiceOptions) {
85+
opts.grpcConn = conn
86+
}
87+
}
88+
89+
// Can be used for external clients to reference the underlying connection for otlp grpc exporter
90+
func CreateGrpcConn(cfg *config.AgentConfig) (*grpc.ClientConn, error) {
91+
endpoint := removeProtocolPrefixForOTLP(cfg.GetReporting().GetEndpoint().GetValue())
92+
93+
dialOpts := []grpc.DialOption{}
94+
95+
if !cfg.GetReporting().GetSecure().GetValue() {
96+
dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
97+
} else {
98+
certFile := cfg.GetReporting().GetCertFile().GetValue()
99+
if len(certFile) > 0 {
100+
tlsCredentials, err := credentials.NewClientTLSFromFile(certFile, "")
101+
if err != nil {
102+
return nil, fmt.Errorf("error creating TLS credentials from cert path %s: %v", certFile, err)
103+
}
104+
dialOpts = append(dialOpts, grpc.WithTransportCredentials(tlsCredentials))
105+
} else {
106+
// Default to system certs
107+
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})))
108+
}
109+
}
110+
111+
if cfg.Reporting.GetEnableGrpcLoadbalancing().GetValue() {
112+
resolver.SetDefaultScheme("dns")
113+
dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [ { "round_robin": {} } ]}`))
114+
}
115+
116+
conn, err := grpc.NewClient(endpoint, dialOpts...)
117+
if err != nil {
118+
return nil, fmt.Errorf("failed to create gRPC connection to %s: %v", endpoint, err)
119+
}
120+
121+
return conn, nil
122+
}
123+
78124
func makePropagator(formats []config.PropagationFormat) propagation.TextMapPropagator {
79125
var propagators []propagation.TextMapPropagator
80126
for _, format := range formats {
@@ -177,6 +223,7 @@ func makeExporterFactory(cfg *config.AgentConfig) func(serviceOpts ...ServiceOpt
177223
zipkin.WithHeaders(serviceOpts.headers),
178224
)
179225
}
226+
180227
case config.TraceReporterType_LOGGING:
181228
return func(opts ...ServiceOption) (sdktrace.SpanExporter, error) {
182229
// TODO: Define if endpoint could be a filepath to write into a file.
@@ -211,7 +258,7 @@ func makeExporterFactory(cfg *config.AgentConfig) func(serviceOpts ...ServiceOpt
211258
return otlphttp.New(context.Background(), finalOpts...)
212259
}
213260

214-
default:
261+
default: // OTLP GRPC
215262
standardOpts := []otlpgrpc.Option{
216263
otlpgrpc.WithEndpoint(removeProtocolPrefixForOTLP(cfg.GetReporting().GetEndpoint().GetValue())),
217264
}
@@ -246,6 +293,11 @@ func makeExporterFactory(cfg *config.AgentConfig) func(serviceOpts ...ServiceOpt
246293
finalOpts := append([]otlpgrpc.Option{}, standardOpts...)
247294
finalOpts = append(finalOpts, otlpgrpc.WithHeaders(serviceOpts.headers))
248295

296+
// Important: gRPC connection takes precedence over other connection based options
297+
if serviceOpts.grpcConn != nil {
298+
finalOpts = append(finalOpts, otlpgrpc.WithGRPCConn(serviceOpts.grpcConn))
299+
}
300+
249301
return otlptrace.New(
250302
context.Background(),
251303
otlpgrpc.NewClient(finalOpts...),
@@ -293,20 +345,20 @@ func createCaCertPoolFromFile(certFile string) *x509.CertPool {
293345

294346
// Init initializes opentelemetry tracing and returns a shutdown function to flush data immediately
295347
// on a termination signal.
296-
func Init(cfg *config.AgentConfig) func() {
297-
return InitWithSpanProcessorWrapper(cfg, nil, versionInfoAttributes)
348+
func Init(cfg *config.AgentConfig, opts ...ServiceOption) func() {
349+
return InitWithSpanProcessorWrapper(cfg, nil, versionInfoAttributes, opts...)
298350
}
299351

300352
// InitWithSpanProcessorWrapper initializes opentelemetry tracing with a wrapper over span processor
301353
// and returns a shutdown function to flush data immediately on a termination signal.
302354
func InitWithSpanProcessorWrapper(cfg *config.AgentConfig, wrapper SpanProcessorWrapper,
303-
versionInfoAttrs []attribute.KeyValue) func() {
355+
versionInfoAttrs []attribute.KeyValue, opts ...ServiceOption) func() {
304356
logger, err := zap.NewProduction()
305357
if err != nil {
306358
logger = nil
307359
log.Printf("error while creating default zap logger %v", err)
308360
}
309-
return InitWithSpanProcessorWrapperAndZap(cfg, wrapper, versionInfoAttrs, logger)
361+
return InitWithSpanProcessorWrapperAndZap(cfg, wrapper, versionInfoAttrs, logger, opts...)
310362
}
311363

312364
// InitWithSpanProcessorWrapperAndZap initializes opentelemetry tracing with a wrapper over span processor
@@ -349,11 +401,11 @@ func InitWithSpanProcessorWrapperAndZap(cfg *config.AgentConfig, wrapper SpanPro
349401

350402
// Initialize metrics
351403
metricsShutdownFn := initializeMetrics(cfg, versionInfoAttrs, opts...)
352-
353404
exporterFactory = makeExporterFactory(cfg)
354405
configFactory = makeConfigFactory(cfg)
355406

356-
exporter, err := exporterFactory()
407+
exporter, err := exporterFactory(opts...)
408+
357409
if err != nil {
358410
log.Fatal(err)
359411
}
@@ -374,8 +426,8 @@ func InitWithSpanProcessorWrapperAndZap(cfg *config.AgentConfig, wrapper SpanPro
374426
if err != nil {
375427
log.Fatal(err)
376428
}
377-
378429
sampler := sdktrace.AlwaysSample()
430+
379431
tp := sdktrace.NewTracerProvider(
380432
sdktrace.WithSampler(sampler),
381433
sdktrace.WithSpanProcessor(sp),

instrumentation/opentelemetry/init_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"google.golang.org/grpc"
2222
"google.golang.org/grpc/metadata"
2323
"google.golang.org/grpc/resolver"
24+
"google.golang.org/protobuf/types/known/wrapperspb"
2425

2526
coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
2627
)
@@ -589,3 +590,40 @@ func TestMakeExporterFactory_Headers_ZipkinAndOTLPHTTP(t *testing.T) {
589590
})
590591
}
591592
}
593+
594+
func TestCreateGrpcConn(t *testing.T) {
595+
lis, err := net.Listen("tcp", "localhost:0")
596+
require.NoError(t, err)
597+
defer lis.Close()
598+
599+
server := grpc.NewServer()
600+
defer server.Stop()
601+
602+
go func() {
603+
if err := server.Serve(lis); err != nil {
604+
t.Logf("Server exited with error: %v", err)
605+
}
606+
}()
607+
608+
cfg := &v1.AgentConfig{
609+
Reporting: &v1.Reporting{
610+
Endpoint: &wrapperspb.StringValue{
611+
Value: lis.Addr().String(),
612+
},
613+
Secure: &wrapperspb.BoolValue{
614+
Value: false,
615+
},
616+
},
617+
}
618+
619+
conn, err := CreateGrpcConn(cfg)
620+
require.NoError(t, err)
621+
require.NotNil(t, conn)
622+
conn.Close()
623+
624+
cfg.Reporting.EnableGrpcLoadbalancing = &wrapperspb.BoolValue{Value: true}
625+
conn, err = CreateGrpcConn(cfg)
626+
require.NoError(t, err)
627+
require.NotNil(t, conn)
628+
conn.Close()
629+
}

0 commit comments

Comments
 (0)