diff --git a/.gitignore b/.gitignore index 74c384e..736a41c 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ test.bash /bin +/ctrlc .idea .vscode *.test.yaml \ No newline at end of file diff --git a/cmd/ctrlc/ctrlc.go b/cmd/ctrlc/ctrlc.go index a58e993..b899712 100644 --- a/cmd/ctrlc/ctrlc.go +++ b/cmd/ctrlc/ctrlc.go @@ -1,10 +1,14 @@ package main import ( + "context" "os" + "strings" + "time" "github.com/charmbracelet/log" "github.com/ctrlplanedev/cli/cmd/ctrlc/root" + "github.com/ctrlplanedev/cli/internal/telemetry" "github.com/mitchellh/go-homedir" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -36,9 +40,51 @@ func init() { } func main() { - if err := cmd.Execute(); err != nil { + ctx := context.Background() + + // Initialize telemetry + shutdown, err := telemetry.InitTelemetry(ctx) + if err != nil { + log.Warn("Failed to initialize telemetry", "error", err) + // Continue execution even if telemetry fails + } + + // Ensure telemetry is properly shut down + if shutdown != nil { + defer func() { + // Give a brief moment for any pending spans to be exported + shutdownCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + if shutdownErr := shutdown(shutdownCtx); shutdownErr != nil { + log.Debug("Error during telemetry shutdown", "error", shutdownErr) + } + }() + } + + // Determine command name for the root span + commandName := "help" + if len(os.Args) > 1 { + commandName = strings.Join(os.Args[1:], " ") + } + + // Start root span + ctx, rootSpan := telemetry.StartRootSpan(ctx, commandName, os.Args[1:]) + defer rootSpan.End() + + // Execute command with telemetry context + if err := executeWithTelemetry(ctx, cmd); err != nil { + telemetry.SetSpanError(rootSpan, err) os.Exit(1) } + + telemetry.SetSpanSuccess(rootSpan) +} + +// executeWithTelemetry wraps the command execution with telemetry context +func executeWithTelemetry(ctx context.Context, cmd *cobra.Command) error { + // Set the context in the command so it can be used by subcommands + cmd.SetContext(ctx) + return cmd.Execute() } func initConfig() { diff --git a/cmd/ctrlc/root/sync/aws/ec2/ec2.go b/cmd/ctrlc/root/sync/aws/ec2/ec2.go index 62a2ce8..3dfcd57 100644 --- a/cmd/ctrlc/root/sync/aws/ec2/ec2.go +++ b/cmd/ctrlc/root/sync/aws/ec2/ec2.go @@ -16,8 +16,11 @@ import ( "github.com/charmbracelet/log" "github.com/ctrlplanedev/cli/internal/api" "github.com/ctrlplanedev/cli/internal/cliutil" + "github.com/ctrlplanedev/cli/internal/telemetry" "github.com/spf13/cobra" "github.com/spf13/viper" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) type ConnectionMethod struct { @@ -92,11 +95,22 @@ func NewSyncEC2Cmd() *cobra.Command { workspaceId := viper.GetString("workspace") // Get EC2 instances + ctx, span := telemetry.StartSpan(ctx, "aws.ec2.describe_instances", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("aws.region", region), + ), + ) + defer span.End() + result, err := ec2Client.DescribeInstances(ctx, &ec2.DescribeInstancesInput{}) if err != nil { + telemetry.SetSpanError(span, err) return fmt.Errorf("failed to describe instances: %w", err) } + telemetry.AddSpanAttribute(span, "aws.ec2.reservations_count", len(result.Reservations)) + resources := []api.CreateResource{} for _, reservation := range result.Reservations { accountId := *reservation.OwnerId @@ -242,6 +256,9 @@ func NewSyncEC2Cmd() *cobra.Command { } } + telemetry.AddSpanAttribute(span, "aws.ec2.instances_processed", len(resources)) + telemetry.SetSpanSuccess(span) + // Create or update resource provider if name == "" { name = fmt.Sprintf("aws-ec2-region-%s", region) diff --git a/cmd/ctrlc/root/sync/aws/eks/eks.go b/cmd/ctrlc/root/sync/aws/eks/eks.go index 5474365..e987270 100644 --- a/cmd/ctrlc/root/sync/aws/eks/eks.go +++ b/cmd/ctrlc/root/sync/aws/eks/eks.go @@ -17,8 +17,11 @@ import ( "github.com/ctrlplanedev/cli/cmd/ctrlc/root/sync/aws/common" "github.com/ctrlplanedev/cli/internal/api" "github.com/ctrlplanedev/cli/internal/kinds" + "github.com/ctrlplanedev/cli/internal/telemetry" "github.com/spf13/cobra" "github.com/spf13/viper" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) // NewSyncEKSCmd creates a new cobra command for syncing EKS clusters @@ -131,31 +134,70 @@ func initEKSClient(ctx context.Context, region string) (*eks.Client, aws.Config, } func processClusters(ctx context.Context, eksClient *eks.Client, region string, cfg aws.Config) ([]api.CreateResource, error) { + ctx, span := telemetry.StartSpan(ctx, "aws.eks.process_clusters", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("aws.region", region), + ), + ) + defer span.End() + var resources []api.CreateResource var nextToken *string accountID, err := common.GetAccountID(ctx, cfg) if err != nil { + telemetry.SetSpanError(span, err) return nil, fmt.Errorf("failed to get AWS account ID: %w", err) } for { - resp, err := eksClient.ListClusters(ctx, &eks.ListClustersInput{ + // Create span for ListClusters call + listCtx, listSpan := telemetry.StartSpan(ctx, "aws.eks.list_clusters", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("aws.region", region), + ), + ) + + resp, err := eksClient.ListClusters(listCtx, &eks.ListClustersInput{ NextToken: nextToken, }) + if err != nil { + telemetry.SetSpanError(listSpan, err) + listSpan.End() return nil, fmt.Errorf("failed to list EKS clusters: %w", err) } + telemetry.AddSpanAttribute(listSpan, "aws.eks.clusters_found", len(resp.Clusters)) + telemetry.SetSpanSuccess(listSpan) + listSpan.End() + for _, clusterName := range resp.Clusters { - cluster, err := eksClient.DescribeCluster(ctx, &eks.DescribeClusterInput{ + // Create span for DescribeCluster call + descCtx, descSpan := telemetry.StartSpan(ctx, "aws.eks.describe_cluster", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("aws.region", region), + attribute.String("aws.eks.cluster_name", clusterName), + ), + ) + + cluster, err := eksClient.DescribeCluster(descCtx, &eks.DescribeClusterInput{ Name: &clusterName, }) + if err != nil { log.Error("Failed to describe cluster", "name", clusterName, "error", err) + telemetry.SetSpanError(descSpan, err) + descSpan.End() continue } + telemetry.SetSpanSuccess(descSpan) + descSpan.End() + resource, err := processCluster(ctx, cluster.Cluster, region, accountID) if err != nil { log.Error("Failed to process EKS cluster", "name", clusterName, "error", err) @@ -170,6 +212,9 @@ func processClusters(ctx context.Context, eksClient *eks.Client, region string, nextToken = resp.NextToken } + telemetry.AddSpanAttribute(span, "aws.eks.total_clusters", len(resources)) + telemetry.SetSpanSuccess(span) + log.Info("Found EKS clusters", "region", region, "count", len(resources)) return resources, nil } diff --git a/cmd/ctrlc/root/sync/aws/networks/networks.go b/cmd/ctrlc/root/sync/aws/networks/networks.go index 218c706..04d8b3f 100644 --- a/cmd/ctrlc/root/sync/aws/networks/networks.go +++ b/cmd/ctrlc/root/sync/aws/networks/networks.go @@ -11,8 +11,11 @@ import ( "github.com/charmbracelet/log" "github.com/ctrlplanedev/cli/cmd/ctrlc/root/sync/aws/common" "github.com/ctrlplanedev/cli/internal/api" + "github.com/ctrlplanedev/cli/internal/telemetry" "github.com/spf13/cobra" "github.com/spf13/viper" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "os" "strconv" "sync" @@ -174,6 +177,14 @@ func initComputeClient(ctx context.Context, region string) (*ec2.Client, aws.Con func processNetworks( ctx context.Context, ec2Client *ec2.Client, awsSubnets []types.Subnet, region string, accountId string, ) ([]api.CreateResource, error) { + ctx, span := telemetry.StartSpan(ctx, "aws.networks.process_networks", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("aws.region", region), + ), + ) + defer span.End() + var nextToken *string vpcs := make([]types.Vpc, 0) subnetsByVpc := make(map[string][]types.Subnet) @@ -189,13 +200,27 @@ func processNetworks( } for { - output, err := ec2Client.DescribeVpcs(ctx, &ec2.DescribeVpcsInput{ + listCtx, listSpan := telemetry.StartSpan(ctx, "aws.networks.describe_vpcs", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("aws.region", region), + ), + ) + + output, err := ec2Client.DescribeVpcs(listCtx, &ec2.DescribeVpcsInput{ NextToken: nextToken, }) if err != nil { + telemetry.SetSpanError(listSpan, err) + listSpan.End() + telemetry.SetSpanError(span, err) return nil, fmt.Errorf("failed to list VPCs: %w", err) } + telemetry.AddSpanAttribute(listSpan, "aws.networks.vpcs_found", len(output.Vpcs)) + telemetry.SetSpanSuccess(listSpan) + listSpan.End() + vpcs = append(vpcs, output.Vpcs...) if output.NextToken == nil { break @@ -218,6 +243,8 @@ func processNetworks( resources = append(resources, resource) } + telemetry.AddSpanAttribute(span, "aws.networks.vpcs_processed", len(resources)) + telemetry.SetSpanSuccess(span) return resources, nil } @@ -287,6 +314,14 @@ func initNetworkMetadata(vpc types.Vpc, region string, subnetCount int) map[stri // getSubnetsForVpc retrieves subnets as AWS SDK objects // these objects are processed differently for VPC and subnet resources func getAwsSubnets(ctx context.Context, ec2Client *ec2.Client, region string) ([]types.Subnet, error) { + ctx, span := telemetry.StartSpan(ctx, "aws.networks.get_subnets", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("aws.region", region), + ), + ) + defer span.End() + var subnets []types.Subnet var nextToken *string @@ -298,6 +333,7 @@ func getAwsSubnets(ctx context.Context, ec2Client *ec2.Client, region string) ([ subnetsOutput, err := ec2Client.DescribeSubnets(ctx, subnetInput) if err != nil { + telemetry.SetSpanError(span, err) return nil, fmt.Errorf("failed to list subnets at region %s: %w", region, err) } @@ -308,11 +344,22 @@ func getAwsSubnets(ctx context.Context, ec2Client *ec2.Client, region string) ([ nextToken = subnetsOutput.NextToken } + telemetry.AddSpanAttribute(span, "aws.networks.subnets_found", len(subnets)) + telemetry.SetSpanSuccess(span) return subnets, nil } // processSubnets lists and processes all subnetworks -func processSubnets(_ context.Context, subnets []types.Subnet, region string) ([]api.CreateResource, error) { +func processSubnets(ctx context.Context, subnets []types.Subnet, region string) ([]api.CreateResource, error) { + ctx, span := telemetry.StartSpan(ctx, "aws.networks.process_subnets", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("aws.region", region), + attribute.Int("aws.networks.subnets_total", len(subnets)), + ), + ) + defer span.End() + resources := make([]api.CreateResource, 0) subnetCount := 0 @@ -327,6 +374,8 @@ func processSubnets(_ context.Context, subnets []types.Subnet, region string) ([ subnetCount++ } + telemetry.AddSpanAttribute(span, "aws.networks.subnets_processed", subnetCount) + telemetry.SetSpanSuccess(span) log.Info("Processed subnets", "count", subnetCount) return resources, nil } diff --git a/cmd/ctrlc/root/sync/aws/rds/rds.go b/cmd/ctrlc/root/sync/aws/rds/rds.go index 3b381ab..e615ec2 100644 --- a/cmd/ctrlc/root/sync/aws/rds/rds.go +++ b/cmd/ctrlc/root/sync/aws/rds/rds.go @@ -15,8 +15,11 @@ import ( "github.com/ctrlplanedev/cli/cmd/ctrlc/root/sync/aws/common" "github.com/ctrlplanedev/cli/internal/api" "github.com/ctrlplanedev/cli/internal/kinds" + "github.com/ctrlplanedev/cli/internal/telemetry" "github.com/spf13/cobra" "github.com/spf13/viper" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) // NewSyncRDSCmd creates a new cobra command for syncing AWS RDS instances @@ -152,17 +155,39 @@ func initRDSClient(ctx context.Context, region string) (*rds.Client, error) { } func processInstances(ctx context.Context, rdsClient *rds.Client, region string) ([]api.CreateResource, error) { + ctx, span := telemetry.StartSpan(ctx, "aws.rds.process_instances", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("aws.region", region), + ), + ) + defer span.End() + var resources []api.CreateResource var marker *string for { - resp, err := rdsClient.DescribeDBInstances(ctx, &rds.DescribeDBInstancesInput{ + describeCtx, describeSpan := telemetry.StartSpan(ctx, "aws.rds.describe_db_instances", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("aws.region", region), + ), + ) + + resp, err := rdsClient.DescribeDBInstances(describeCtx, &rds.DescribeDBInstancesInput{ Marker: marker, }) if err != nil { + telemetry.SetSpanError(describeSpan, err) + describeSpan.End() + telemetry.SetSpanError(span, err) return nil, fmt.Errorf("failed to list RDS instances: %w", err) } + telemetry.AddSpanAttribute(describeSpan, "aws.rds.instances_found", len(resp.DBInstances)) + telemetry.SetSpanSuccess(describeSpan) + describeSpan.End() + for _, instance := range resp.DBInstances { resource, err := processInstance(ctx, &instance, region, rdsClient) if err != nil { @@ -178,6 +203,8 @@ func processInstances(ctx context.Context, rdsClient *rds.Client, region string) marker = resp.Marker } + telemetry.AddSpanAttribute(span, "aws.rds.instances_processed", len(resources)) + telemetry.SetSpanSuccess(span) log.Info("Found RDS instances", "region", region, "count", len(resources)) return resources, nil } @@ -477,6 +504,14 @@ var relationshipRules = []api.CreateResourceRelationshipRule{ // fetchParameterGroupDetails retrieves parameters from a parameter group and adds them to metadata func fetchParameterGroupDetails(ctx context.Context, rdsClient *rds.Client, parameterGroupName string, metadata map[string]string) { + ctx, span := telemetry.StartSpan(ctx, "aws.rds.fetch_parameter_group", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("aws.rds.parameter_group", parameterGroupName), + ), + ) + defer span.End() + metadata["database/parameter-group"] = parameterGroupName // Get the parameters for this parameter group @@ -490,6 +525,7 @@ func fetchParameterGroupDetails(ctx context.Context, rdsClient *rds.Client, para }) if err != nil { log.Error("Failed to get parameter group details", "parameter_group", parameterGroupName, "error", err) + telemetry.SetSpanError(span, err) return } @@ -511,6 +547,9 @@ func fetchParameterGroupDetails(ctx context.Context, rdsClient *rds.Client, para if paramCount > 0 { metadata["database/parameter-count"] = strconv.Itoa(paramCount) } + + telemetry.AddSpanAttribute(span, "aws.rds.parameters_fetched", paramCount) + telemetry.SetSpanSuccess(span) } // upsertToCtrlplane handles upserting resources to Ctrlplane diff --git a/cmd/ctrlc/root/sync/azure/aks/aks.go b/cmd/ctrlc/root/sync/azure/aks/aks.go index 51e4e85..2b8b082 100644 --- a/cmd/ctrlc/root/sync/azure/aks/aks.go +++ b/cmd/ctrlc/root/sync/azure/aks/aks.go @@ -19,8 +19,11 @@ import ( "github.com/charmbracelet/log" "github.com/ctrlplanedev/cli/internal/api" "github.com/ctrlplanedev/cli/internal/kinds" + "github.com/ctrlplanedev/cli/internal/telemetry" "github.com/spf13/cobra" "github.com/spf13/viper" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) // NewSyncAKSCmd creates a new cobra command for syncing AKS clusters @@ -160,6 +163,14 @@ func getDefaultSubscriptionID(ctx context.Context, cred azcore.TokenCredential) } func processClusters(ctx context.Context, cred azcore.TokenCredential, subscriptionID string, tenantID string) ([]api.CreateResource, error) { + ctx, span := telemetry.StartSpan(ctx, "azure.aks.process_clusters", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("azure.subscription_id", subscriptionID), + ), + ) + defer span.End() + var resources []api.CreateResource var mu sync.Mutex var wg sync.WaitGroup @@ -168,17 +179,33 @@ func processClusters(ctx context.Context, cred azcore.TokenCredential, subscript // Create AKS client aksClient, err := armcontainerservice.NewManagedClustersClient(subscriptionID, cred, nil) if err != nil { + telemetry.SetSpanError(span, err) return nil, fmt.Errorf("failed to create AKS client: %w", err) } + var clustersFound int + // List all clusters in the subscription + // Create span for ListClusters call + listCtx, listSpan := telemetry.StartSpan(ctx, "azure.aks.list_clusters", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("azure.subscription_id", subscriptionID), + ), + ) + pager := aksClient.NewListPager(nil) for pager.More() { - page, err := pager.NextPage(ctx) + page, err := pager.NextPage(listCtx) if err != nil { + telemetry.SetSpanError(listSpan, err) + listSpan.End() + telemetry.SetSpanError(span, err) return nil, fmt.Errorf("failed to list AKS clusters: %w", err) } + clustersFound += len(page.Value) + for _, cluster := range page.Value { wg.Add(1) go func(mc *armcontainerservice.ManagedCluster) { @@ -200,6 +227,10 @@ func processClusters(ctx context.Context, cred azcore.TokenCredential, subscript } } + telemetry.AddSpanAttribute(listSpan, "azure.aks.clusters_found", clustersFound) + telemetry.SetSpanSuccess(listSpan) + listSpan.End() + wg.Wait() if len(syncErrors) > 0 { @@ -207,6 +238,9 @@ func processClusters(ctx context.Context, cred azcore.TokenCredential, subscript // Continue with the clusters that succeeded } + telemetry.AddSpanAttribute(span, "azure.aks.clusters_processed", len(resources)) + telemetry.SetSpanSuccess(span) + log.Info("Found AKS clusters", "count", len(resources)) return resources, nil } diff --git a/cmd/ctrlc/root/sync/azure/networks/networks.go b/cmd/ctrlc/root/sync/azure/networks/networks.go index 90fb581..22c8707 100644 --- a/cmd/ctrlc/root/sync/azure/networks/networks.go +++ b/cmd/ctrlc/root/sync/azure/networks/networks.go @@ -3,6 +3,10 @@ package networks import ( "context" "fmt" + "os" + "strconv" + "sync" + "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork" @@ -13,11 +17,11 @@ import ( "github.com/ctrlplanedev/cli/cmd/ctrlc/root/sync/azure/common" "github.com/ctrlplanedev/cli/internal/api" "github.com/ctrlplanedev/cli/internal/kinds" + "github.com/ctrlplanedev/cli/internal/telemetry" "github.com/spf13/cobra" "github.com/spf13/viper" - "os" - "strconv" - "sync" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) func NewSyncNetworksCmd() *cobra.Command { @@ -157,20 +161,32 @@ func getDefaultSubscriptionID(ctx context.Context, cred azcore.TokenCredential) func processNetworks( ctx context.Context, cred azcore.TokenCredential, subscriptionID string, tenantID string, ) ([]api.CreateResource, error) { + ctx, span := telemetry.StartSpan(ctx, "azure.networks.process_networks", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("azure.subscription_id", subscriptionID), + ), + ) + defer span.End() + var allResources []api.CreateResource var resourceGroups []common.ResourceGroupInfo var mu sync.Mutex var wg sync.WaitGroup var err error var syncErrors []error + var networksFound int + var subnetsFound int if resourceGroups, err = common.GetResourceGroupInfo(ctx, cred, subscriptionID); err != nil { + telemetry.SetSpanError(span, err) return nil, err } // Create virtual network client client, err := armnetwork.NewVirtualNetworksClient(subscriptionID, cred, nil) if err != nil { + telemetry.SetSpanError(span, err) return nil, fmt.Errorf("failed to create Virtual Network client: %w", err) } @@ -180,15 +196,32 @@ func processNetworks( go func(resourceGroup string) { defer wg.Done() + // Create span for list virtual networks call + listCtx, listSpan := telemetry.StartSpan(ctx, "azure.networks.list_virtual_networks", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("azure.subscription_id", subscriptionID), + attribute.String("azure.resource_group", resourceGroup), + ), + ) + pager := client.NewListPager(resourceGroup, nil) + var localNetworksFound int + var localSubnetsFound int + for pager.More() { - page, err := pager.NextPage(ctx) + page, err := pager.NextPage(listCtx) if err != nil { + telemetry.SetSpanError(listSpan, err) + listSpan.End() mu.Lock() syncErrors = append(syncErrors, fmt.Errorf("failed to list networks: %w", err)) mu.Unlock() return } + + localNetworksFound += len(page.Value) + for _, network := range page.Value { resources, err := processNetwork(ctx, network, resourceGroup, subscriptionID, tenantID) if err != nil { @@ -196,23 +229,56 @@ func processNetworks( mu.Lock() syncErrors = append(syncErrors, fmt.Errorf("network %s: %w", *network.Name, err)) mu.Unlock() - return + continue + } + + // Count subnets (each network returns 1 network resource + N subnet resources) + if len(resources) > 0 { + localSubnetsFound += len(resources) - 1 } + mu.Lock() allResources = append(allResources, resources...) mu.Unlock() } } + + telemetry.AddSpanAttribute(listSpan, "azure.networks.networks_found", localNetworksFound) + telemetry.AddSpanAttribute(listSpan, "azure.networks.subnets_found", localSubnetsFound) + telemetry.SetSpanSuccess(listSpan) + listSpan.End() + + mu.Lock() + networksFound += localNetworksFound + subnetsFound += localSubnetsFound + mu.Unlock() }(rgName) } wg.Wait() if len(syncErrors) > 0 { - log.Warn("Some clusters failed to sync", "errors", len(syncErrors)) - // Continue with the clusters that succeeded + log.Warn("Some networks failed to sync", "errors", len(syncErrors)) + // Continue with the networks that succeeded } + // Calculate processed counts (networks + subnets) + var networksProcessed int + var subnetsProcessed int + for _, resource := range allResources { + if resource.Kind == "AzureNetwork" { + networksProcessed++ + } else if resource.Kind == "AzureSubnet" { + subnetsProcessed++ + } + } + + telemetry.AddSpanAttribute(span, "azure.networks.networks_found", networksFound) + telemetry.AddSpanAttribute(span, "azure.networks.networks_processed", networksProcessed) + telemetry.AddSpanAttribute(span, "azure.networks.subnets_found", subnetsFound) + telemetry.AddSpanAttribute(span, "azure.networks.subnets_processed", subnetsProcessed) + telemetry.SetSpanSuccess(span) + log.Info("Found network resources", "count", len(allResources)) return allResources, nil } diff --git a/cmd/ctrlc/root/sync/github/pullrequests.go b/cmd/ctrlc/root/sync/github/pullrequests.go index c3e9bbb..2cfeedc 100644 --- a/cmd/ctrlc/root/sync/github/pullrequests.go +++ b/cmd/ctrlc/root/sync/github/pullrequests.go @@ -12,9 +12,12 @@ import ( "github.com/charmbracelet/log" "github.com/ctrlplanedev/cli/internal/api" "github.com/ctrlplanedev/cli/internal/kinds" + "github.com/ctrlplanedev/cli/internal/telemetry" "github.com/google/go-github/v57/github" "github.com/spf13/cobra" "github.com/spf13/viper" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "golang.org/x/oauth2" ) @@ -288,6 +291,16 @@ func processPullRequests(ctx context.Context, client *github.Client, owner, repo // fetchPRs fetches pull requests with the given state from GitHub func fetchPRs(ctx context.Context, client *github.Client, owner, repo, state string) ([]*github.PullRequest, error) { + ctx, span := telemetry.StartSpan(ctx, "github.fetch_pull_requests", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("github.owner", owner), + attribute.String("github.repo", repo), + attribute.String("github.state", state), + ), + ) + defer span.End() + log.Debug("Fetching pull requests", "owner", owner, "repo", repo, "state", state) opts := &github.PullRequestListOptions{ State: state, @@ -298,14 +311,38 @@ func fetchPRs(ctx context.Context, client *github.Client, owner, repo, state str var prs []*github.PullRequest page := 1 + totalApiCalls := 0 + for { log.Debug("Fetching page of pull requests", "page", page, "state", state) - batch, resp, err := client.PullRequests.List(ctx, owner, repo, opts) + + // Create a child span for each API call + pageCtx, pageSpan := telemetry.StartSpan(ctx, "github.list_pull_requests_page", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("github.owner", owner), + attribute.String("github.repo", repo), + attribute.String("github.state", state), + attribute.Int("github.page", page), + ), + ) + + batch, resp, err := client.PullRequests.List(pageCtx, owner, repo, opts) + totalApiCalls++ + if err != nil { log.Error("Failed to list pull requests", "state", state, "page", page, "error", err) + telemetry.SetSpanError(pageSpan, err) + pageSpan.End() + telemetry.SetSpanError(span, err) return nil, fmt.Errorf("failed to list %s pull requests: %w", state, err) } + log.Debug("Fetched pull requests", "state", state, "page", page, "count", len(batch)) + telemetry.AddSpanAttribute(pageSpan, "github.prs_fetched", len(batch)) + telemetry.SetSpanSuccess(pageSpan) + pageSpan.End() + prs = append(prs, batch...) if resp.NextPage == 0 { log.Debug("No more pages to fetch", "state", state) @@ -315,29 +352,63 @@ func fetchPRs(ctx context.Context, client *github.Client, owner, repo, state str page = resp.NextPage } + telemetry.AddSpanAttribute(span, "github.total_prs", len(prs)) + telemetry.AddSpanAttribute(span, "github.total_api_calls", totalApiCalls) + telemetry.SetSpanSuccess(span) + log.Debug("Completed fetching pull requests", "state", state, "total", len(prs)) return prs, nil } // fetchAllCommits fetches all commits for a pull request with pagination support func fetchAllCommits(ctx context.Context, client *github.Client, owner, repo string, prNumber int) ([]*github.RepositoryCommit, error) { + ctx, span := telemetry.StartSpan(ctx, "github.fetch_pr_commits", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("github.owner", owner), + attribute.String("github.repo", repo), + attribute.Int("github.pr_number", prNumber), + ), + ) + defer span.End() + var allCommits []*github.RepositoryCommit page := 1 + totalApiCalls := 0 for { log.Debug("Fetching PR commits", "pr", prNumber, "page", page) - commits, resp, err := client.PullRequests.ListCommits(ctx, owner, repo, prNumber, &github.ListOptions{ + // Create child span for each page request + pageCtx, pageSpan := telemetry.StartSpan(ctx, "github.list_pr_commits_page", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("github.owner", owner), + attribute.String("github.repo", repo), + attribute.Int("github.pr_number", prNumber), + attribute.Int("github.page", page), + ), + ) + + commits, resp, err := client.PullRequests.ListCommits(pageCtx, owner, repo, prNumber, &github.ListOptions{ Page: page, PerPage: 100, }) + totalApiCalls++ if err != nil { log.Error("Failed to list commits", "pr", prNumber, "page", page, "error", err) + telemetry.SetSpanError(pageSpan, err) + pageSpan.End() + telemetry.SetSpanError(span, err) return nil, fmt.Errorf("failed to list commits for PR #%d (page %d): %w", prNumber, page, err) } log.Debug("Fetched commits", "pr", prNumber, "page", page, "count", len(commits)) + telemetry.AddSpanAttribute(pageSpan, "github.commits_fetched", len(commits)) + telemetry.SetSpanSuccess(pageSpan) + pageSpan.End() + allCommits = append(allCommits, commits...) if resp.NextPage == 0 { @@ -348,6 +419,10 @@ func fetchAllCommits(ctx context.Context, client *github.Client, owner, repo str page = resp.NextPage } + telemetry.AddSpanAttribute(span, "github.total_commits", len(allCommits)) + telemetry.AddSpanAttribute(span, "github.total_api_calls", totalApiCalls) + telemetry.SetSpanSuccess(span) + log.Debug("Retrieved all commits for PR", "pr", prNumber, "count", len(allCommits)) return allCommits, nil } diff --git a/cmd/ctrlc/root/sync/google/bigtable/bigtable.go b/cmd/ctrlc/root/sync/google/bigtable/bigtable.go index c381c94..041a7ee 100644 --- a/cmd/ctrlc/root/sync/google/bigtable/bigtable.go +++ b/cmd/ctrlc/root/sync/google/bigtable/bigtable.go @@ -12,8 +12,11 @@ import ( "github.com/charmbracelet/log" "github.com/ctrlplanedev/cli/internal/api" "github.com/ctrlplanedev/cli/internal/kinds" + "github.com/ctrlplanedev/cli/internal/telemetry" "github.com/spf13/cobra" "github.com/spf13/viper" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/api/bigtableadmin/v2" ) @@ -102,12 +105,36 @@ func initBigtableClient(ctx context.Context) (*bigtableadmin.Service, error) { // processInstances lists and processes all Bigtable instances func processInstances(ctx context.Context, adminClient *bigtableadmin.Service, project string) ([]api.CreateResource, error) { + ctx, span := telemetry.StartSpan(ctx, "google.bigtable.process_instances", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("google.project_id", project), + ), + ) + defer span.End() + + // Create span for ListInstances call + _, listSpan := telemetry.StartSpan(ctx, "google.bigtable.list_instances", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("google.project_id", project), + ), + ) + projectParent := fmt.Sprintf("projects/%s", project) instances, err := adminClient.Projects.Instances.List(projectParent).Do() + if err != nil { + telemetry.SetSpanError(listSpan, err) + listSpan.End() + telemetry.SetSpanError(span, err) return nil, fmt.Errorf("failed to list instances: %w", err) } + telemetry.AddSpanAttribute(listSpan, "google.bigtable.instances_found", len(instances.Instances)) + telemetry.SetSpanSuccess(listSpan) + listSpan.End() + log.Info("Found instances", "count", len(instances.Instances)) resources := []api.CreateResource{} @@ -120,6 +147,9 @@ func processInstances(ctx context.Context, adminClient *bigtableadmin.Service, p resources = append(resources, resource) } + telemetry.AddSpanAttribute(span, "google.bigtable.total_instances", len(resources)) + telemetry.SetSpanSuccess(span) + return resources, nil } diff --git a/cmd/ctrlc/root/sync/google/buckets/buckets.go b/cmd/ctrlc/root/sync/google/buckets/buckets.go index 76a1cf7..660c72c 100644 --- a/cmd/ctrlc/root/sync/google/buckets/buckets.go +++ b/cmd/ctrlc/root/sync/google/buckets/buckets.go @@ -11,8 +11,11 @@ import ( "github.com/MakeNowJust/heredoc/v2" "github.com/charmbracelet/log" "github.com/ctrlplanedev/cli/internal/api" + "github.com/ctrlplanedev/cli/internal/telemetry" "github.com/spf13/cobra" "github.com/spf13/viper" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/api/storage/v1" ) @@ -93,12 +96,36 @@ func initStorageClient(ctx context.Context) (*storage.Service, error) { // processBuckets lists and processes all Storage buckets in the project func processBuckets(ctx context.Context, storageClient *storage.Service, project string) ([]api.CreateResource, error) { + ctx, span := telemetry.StartSpan(ctx, "google.storage.process_buckets", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("google.project_id", project), + ), + ) + defer span.End() + + // Create span for ListBuckets call + _, listSpan := telemetry.StartSpan(ctx, "google.storage.list_buckets", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("google.project_id", project), + ), + ) + // List all buckets in the project buckets, err := storageClient.Buckets.List(project).Do() + if err != nil { + telemetry.SetSpanError(listSpan, err) + listSpan.End() + telemetry.SetSpanError(span, err) return nil, fmt.Errorf("failed to list buckets: %w", err) } + telemetry.AddSpanAttribute(listSpan, "google.storage.buckets_found", len(buckets.Items)) + telemetry.SetSpanSuccess(listSpan) + listSpan.End() + log.Info("Found buckets", "count", len(buckets.Items)) resources := []api.CreateResource{} @@ -111,6 +138,9 @@ func processBuckets(ctx context.Context, storageClient *storage.Service, project resources = append(resources, resource) } + telemetry.AddSpanAttribute(span, "google.storage.total_buckets", len(resources)) + telemetry.SetSpanSuccess(span) + return resources, nil } diff --git a/cmd/ctrlc/root/sync/google/cloudrun/cloudrun.go b/cmd/ctrlc/root/sync/google/cloudrun/cloudrun.go index 31b92d2..fb8a8d5 100644 --- a/cmd/ctrlc/root/sync/google/cloudrun/cloudrun.go +++ b/cmd/ctrlc/root/sync/google/cloudrun/cloudrun.go @@ -10,8 +10,11 @@ import ( "github.com/charmbracelet/log" "github.com/ctrlplanedev/cli/internal/api" "github.com/ctrlplanedev/cli/internal/cliutil" + "github.com/ctrlplanedev/cli/internal/telemetry" "github.com/spf13/cobra" "github.com/spf13/viper" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/api/run/v1" ) @@ -141,17 +144,34 @@ func runSync(project, providerName *string, regions *[]string) func(cmd *cobra.C return fmt.Errorf("failed to initialize Cloud Run client: %w", err) } + // Create span for listing Cloud Run services + _, listSpan := telemetry.StartSpan(ctx, "google.cloudrun.list_services", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("google.project_id", *project), + ), + ) + services, err := cloudRunService.Projects.Locations.Services.List(fmt.Sprintf("projects/%s/locations/-", *project)).Do() + if err != nil { + telemetry.SetSpanError(listSpan, err) + listSpan.End() return fmt.Errorf("failed to list Cloud Run services: %w", err) } + telemetry.AddSpanAttribute(listSpan, "google.cloudrun.services_found", len(services.Items)) + telemetry.SetSpanSuccess(listSpan) + listSpan.End() + allResources := make([]api.CreateResource, 0) for _, service := range services.Items { resource := processService(service) allResources = append(allResources, resource) } + log.Info("Found Cloud Run services", "count", len(allResources)) + upsertResp, err := upsertToCtrlplane(ctx, allResources, project, providerName) if err != nil { return fmt.Errorf("failed to upsert Cloud Run services: %w", err) diff --git a/cmd/ctrlc/root/sync/google/cloudsql/cloudsql.go b/cmd/ctrlc/root/sync/google/cloudsql/cloudsql.go index 132c37a..bc07bef 100644 --- a/cmd/ctrlc/root/sync/google/cloudsql/cloudsql.go +++ b/cmd/ctrlc/root/sync/google/cloudsql/cloudsql.go @@ -11,8 +11,11 @@ import ( "github.com/charmbracelet/log" "github.com/ctrlplanedev/cli/internal/api" "github.com/ctrlplanedev/cli/internal/kinds" + "github.com/ctrlplanedev/cli/internal/telemetry" "github.com/spf13/cobra" "github.com/spf13/viper" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/api/sqladmin/v1" ) @@ -234,12 +237,36 @@ func initSQLAdminClient(ctx context.Context) (*sqladmin.Service, error) { } // processInstances lists and processes all Cloud SQL instances -func processInstances(_ context.Context, sqlService *sqladmin.Service, project string) ([]api.CreateResource, error) { +func processInstances(ctx context.Context, sqlService *sqladmin.Service, project string) ([]api.CreateResource, error) { + ctx, span := telemetry.StartSpan(ctx, "google.cloudsql.process_instances", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("google.project_id", project), + ), + ) + defer span.End() + + // Create span for ListInstances call + _, listSpan := telemetry.StartSpan(ctx, "google.cloudsql.list_instances", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("google.project_id", project), + ), + ) + instances, err := sqlService.Instances.List(project).Do() + if err != nil { + telemetry.SetSpanError(listSpan, err) + listSpan.End() + telemetry.SetSpanError(span, err) return nil, fmt.Errorf("failed to list instances: %w", err) } + telemetry.AddSpanAttribute(listSpan, "google.cloudsql.instances_found", len(instances.Items)) + telemetry.SetSpanSuccess(listSpan) + listSpan.End() + log.Info("Found instances", "count", len(instances.Items)) resources := []api.CreateResource{} @@ -248,6 +275,9 @@ func processInstances(_ context.Context, sqlService *sqladmin.Service, project s resources = append(resources, resource) } + telemetry.AddSpanAttribute(span, "google.cloudsql.total_instances", len(resources)) + telemetry.SetSpanSuccess(span) + return resources, nil } diff --git a/cmd/ctrlc/root/sync/google/gke/gke.go b/cmd/ctrlc/root/sync/google/gke/gke.go index 407996b..3916550 100644 --- a/cmd/ctrlc/root/sync/google/gke/gke.go +++ b/cmd/ctrlc/root/sync/google/gke/gke.go @@ -12,8 +12,11 @@ import ( "github.com/charmbracelet/log" "github.com/ctrlplanedev/cli/internal/api" "github.com/ctrlplanedev/cli/internal/kinds" + "github.com/ctrlplanedev/cli/internal/telemetry" "github.com/spf13/cobra" "github.com/spf13/viper" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/api/container/v1" ) @@ -88,12 +91,36 @@ func initGKEClient(ctx context.Context) (*container.Service, error) { // processClusters lists and processes all GKE clusters func processClusters(ctx context.Context, gkeClient *container.Service, project string) ([]api.CreateResource, error) { + ctx, span := telemetry.StartSpan(ctx, "google.gke.process_clusters", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("google.project_id", project), + ), + ) + defer span.End() + + // Create span for ListClusters call + _, listSpan := telemetry.StartSpan(ctx, "google.gke.list_clusters", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("google.project_id", project), + ), + ) + parent := fmt.Sprintf("projects/%s/locations/-", project) resp, err := gkeClient.Projects.Locations.Clusters.List(parent).Do() + if err != nil { + telemetry.SetSpanError(listSpan, err) + listSpan.End() + telemetry.SetSpanError(span, err) return nil, fmt.Errorf("failed to list GKE clusters: %w", err) } + telemetry.AddSpanAttribute(listSpan, "google.gke.clusters_found", len(resp.Clusters)) + telemetry.SetSpanSuccess(listSpan) + listSpan.End() + log.Info("Found GKE clusters", "count", len(resp.Clusters)) resources := []api.CreateResource{} @@ -106,6 +133,9 @@ func processClusters(ctx context.Context, gkeClient *container.Service, project resources = append(resources, resource) } + telemetry.AddSpanAttribute(span, "google.gke.total_clusters", len(resources)) + telemetry.SetSpanSuccess(span) + return resources, nil } diff --git a/cmd/ctrlc/root/sync/google/networks/networks.go b/cmd/ctrlc/root/sync/google/networks/networks.go index c26eee1..6f4fc68 100644 --- a/cmd/ctrlc/root/sync/google/networks/networks.go +++ b/cmd/ctrlc/root/sync/google/networks/networks.go @@ -10,8 +10,11 @@ import ( "github.com/MakeNowJust/heredoc/v2" "github.com/charmbracelet/log" "github.com/ctrlplanedev/cli/internal/api" + "github.com/ctrlplanedev/cli/internal/telemetry" "github.com/spf13/cobra" "github.com/spf13/viper" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/api/compute/v1" ) @@ -108,12 +111,36 @@ func initComputeClient(ctx context.Context) (*compute.Service, error) { } // processNetworks lists and processes all VPC networks -func processNetworks(_ context.Context, computeClient *compute.Service, project string) ([]api.CreateResource, error) { +func processNetworks(ctx context.Context, computeClient *compute.Service, project string) ([]api.CreateResource, error) { + ctx, span := telemetry.StartSpan(ctx, "google.compute.process_networks", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("google.project_id", project), + ), + ) + defer span.End() + + // Create span for ListNetworks call + _, listSpan := telemetry.StartSpan(ctx, "google.compute.list_networks", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("google.project_id", project), + ), + ) + networks, err := computeClient.Networks.List(project).Do() + if err != nil { + telemetry.SetSpanError(listSpan, err) + listSpan.End() + telemetry.SetSpanError(span, err) return nil, fmt.Errorf("failed to list networks: %w", err) } + telemetry.AddSpanAttribute(listSpan, "google.compute.networks_found", len(networks.Items)) + telemetry.SetSpanSuccess(listSpan) + listSpan.End() + log.Info("Found networks", "count", len(networks.Items)) resources := []api.CreateResource{} @@ -137,6 +164,9 @@ func processNetworks(_ context.Context, computeClient *compute.Service, project resources = append(resources, resource) } + telemetry.AddSpanAttribute(span, "google.compute.total_networks", len(resources)) + telemetry.SetSpanSuccess(span) + return resources, nil } @@ -237,13 +267,36 @@ func initNetworkMetadata(network *compute.Network, project string, subnetCount i } // processSubnets lists and processes all subnetworks -func processSubnets(_ context.Context, computeClient *compute.Service, project string) ([]api.CreateResource, error) { +func processSubnets(ctx context.Context, computeClient *compute.Service, project string) ([]api.CreateResource, error) { + ctx, span := telemetry.StartSpan(ctx, "google.compute.process_subnets", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("google.project_id", project), + ), + ) + defer span.End() + + // Create span for AggregatedList call + _, listSpan := telemetry.StartSpan(ctx, "google.compute.list_subnetworks", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("google.project_id", project), + ), + ) + // Use AggregatedList to get subnets from all regions resp, err := computeClient.Subnetworks.AggregatedList(project).Do() + if err != nil { + telemetry.SetSpanError(listSpan, err) + listSpan.End() + telemetry.SetSpanError(span, err) return nil, fmt.Errorf("failed to list subnetworks: %w", err) } + telemetry.SetSpanSuccess(listSpan) + listSpan.End() + resources := []api.CreateResource{} subnetCount := 0 @@ -265,6 +318,9 @@ func processSubnets(_ context.Context, computeClient *compute.Service, project s } } + telemetry.AddSpanAttribute(span, "google.compute.total_subnets", subnetCount) + telemetry.SetSpanSuccess(span) + log.Info("Found subnets", "count", subnetCount) return resources, nil } @@ -390,12 +446,36 @@ func initSubnetMetadata(subnet *compute.Subnetwork, project string, region strin } // processFirewalls lists and processes all firewall rules -func processFirewalls(_ context.Context, computeClient *compute.Service, project string) ([]api.CreateResource, error) { +func processFirewalls(ctx context.Context, computeClient *compute.Service, project string) ([]api.CreateResource, error) { + ctx, span := telemetry.StartSpan(ctx, "google.compute.process_firewalls", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("google.project_id", project), + ), + ) + defer span.End() + + // Create span for ListFirewalls call + _, listSpan := telemetry.StartSpan(ctx, "google.compute.list_firewalls", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("google.project_id", project), + ), + ) + firewalls, err := computeClient.Firewalls.List(project).Do() + if err != nil { + telemetry.SetSpanError(listSpan, err) + listSpan.End() + telemetry.SetSpanError(span, err) return nil, fmt.Errorf("failed to list firewalls: %w", err) } + telemetry.AddSpanAttribute(listSpan, "google.compute.firewalls_found", len(firewalls.Items)) + telemetry.SetSpanSuccess(listSpan) + listSpan.End() + log.Info("Found firewall rules", "count", len(firewalls.Items)) resources := []api.CreateResource{} @@ -408,6 +488,9 @@ func processFirewalls(_ context.Context, computeClient *compute.Service, project resources = append(resources, resource) } + telemetry.AddSpanAttribute(span, "google.compute.total_firewalls", len(resources)) + telemetry.SetSpanSuccess(span) + return resources, nil } @@ -558,13 +641,36 @@ func initFirewallMetadata(firewall *compute.Firewall, project string) map[string } // processForwardingRules lists and processes all forwarding rules (load balancers) -func processForwardingRules(_ context.Context, computeClient *compute.Service, project string) ([]api.CreateResource, error) { +func processForwardingRules(ctx context.Context, computeClient *compute.Service, project string) ([]api.CreateResource, error) { + ctx, span := telemetry.StartSpan(ctx, "google.compute.process_forwarding_rules", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("google.project_id", project), + ), + ) + defer span.End() + + // Create span for AggregatedList call + _, listSpan := telemetry.StartSpan(ctx, "google.compute.list_forwarding_rules", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("google.project_id", project), + ), + ) + // Use AggregatedList to get forwarding rules from all regions resp, err := computeClient.ForwardingRules.AggregatedList(project).Do() + if err != nil { + telemetry.SetSpanError(listSpan, err) + listSpan.End() + telemetry.SetSpanError(span, err) return nil, fmt.Errorf("failed to list forwarding rules: %w", err) } + telemetry.SetSpanSuccess(listSpan) + listSpan.End() + resources := []api.CreateResource{} ruleCount := 0 @@ -586,6 +692,9 @@ func processForwardingRules(_ context.Context, computeClient *compute.Service, p } } + telemetry.AddSpanAttribute(span, "google.compute.total_forwarding_rules", ruleCount) + telemetry.SetSpanSuccess(span) + log.Info("Found forwarding rules", "count", ruleCount) return resources, nil } diff --git a/cmd/ctrlc/root/sync/google/projects/projects.go b/cmd/ctrlc/root/sync/google/projects/projects.go index 4725d96..1543c46 100644 --- a/cmd/ctrlc/root/sync/google/projects/projects.go +++ b/cmd/ctrlc/root/sync/google/projects/projects.go @@ -7,8 +7,10 @@ import ( "github.com/MakeNowJust/heredoc/v2" "github.com/charmbracelet/log" "github.com/ctrlplanedev/cli/internal/api" + "github.com/ctrlplanedev/cli/internal/telemetry" "github.com/spf13/cobra" "github.com/spf13/viper" + "go.opentelemetry.io/otel/trace" "google.golang.org/api/cloudresourcemanager/v1" ) @@ -17,7 +19,7 @@ func NewSyncProjectsCmd() *cobra.Command { var name string cmd := &cobra.Command{ - Use: "projects", + Use: "projects", Short: "Sync Google Cloud projects into Ctrlplane", Example: heredoc.Doc(` # Make sure Google Cloud credentials are configured via environment variables or application default credentials @@ -36,29 +38,43 @@ func NewSyncProjectsCmd() *cobra.Command { return fmt.Errorf("failed to create Cloud Resource Manager client: %w", err) } + // Create span for listing projects + _, listSpan := telemetry.StartSpan(ctx, "google.cloudresourcemanager.list_projects", + trace.WithSpanKind(trace.SpanKindClient), + ) + // List all projects resp, err := crm.Projects.List().Do() + if err != nil { + telemetry.SetSpanError(listSpan, err) + listSpan.End() return fmt.Errorf("failed to list projects: %w", err) } + telemetry.AddSpanAttribute(listSpan, "google.projects.projects_found", len(resp.Projects)) + telemetry.SetSpanSuccess(listSpan) + listSpan.End() + + log.Info("Found projects", "count", len(resp.Projects)) + resources := []api.CreateResource{} // Process each project for _, project := range resp.Projects { metadata := map[string]string{ - "account/id": project.ProjectId, - "account/name": project.Name, - "account/number": fmt.Sprintf("%d", project.ProjectNumber), - "account/state": project.LifecycleState, - "account/parent-id": project.Parent.Id, + "account/id": project.ProjectId, + "account/name": project.Name, + "account/number": fmt.Sprintf("%d", project.ProjectNumber), + "account/state": project.LifecycleState, + "account/parent-id": project.Parent.Id, "account/parent-type": project.Parent.Type, - "google/project": project.ProjectId, - "google/number": fmt.Sprintf("%d", project.ProjectNumber), - "google/state": project.LifecycleState, - "google/parent-id": project.Parent.Id, - "google/parent-type": project.Parent.Type, + "google/project": project.ProjectId, + "google/number": fmt.Sprintf("%d", project.ProjectNumber), + "google/state": project.LifecycleState, + "google/parent-id": project.Parent.Id, + "google/parent-type": project.Parent.Type, } // Add labels as metadata @@ -68,7 +84,7 @@ func NewSyncProjectsCmd() *cobra.Command { resources = append(resources, api.CreateResource{ Version: "ctrlplane.dev/cloud/account/v1", - Kind: "GoogleProject", + Kind: "GoogleProject", Name: project.Name, Identifier: project.ProjectId, Config: map[string]any{ diff --git a/cmd/ctrlc/root/sync/google/redis/redis.go b/cmd/ctrlc/root/sync/google/redis/redis.go index faae290..96ed632 100644 --- a/cmd/ctrlc/root/sync/google/redis/redis.go +++ b/cmd/ctrlc/root/sync/google/redis/redis.go @@ -11,8 +11,11 @@ import ( "github.com/charmbracelet/log" "github.com/ctrlplanedev/cli/internal/api" "github.com/ctrlplanedev/cli/internal/kinds" + "github.com/ctrlplanedev/cli/internal/telemetry" "github.com/spf13/cobra" "github.com/spf13/viper" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/api/redis/v1" ) @@ -87,12 +90,36 @@ func initRedisClient(ctx context.Context) (*redis.Service, error) { // processInstances lists and processes all Redis instances func processInstances(ctx context.Context, redisClient *redis.Service, project string) ([]api.CreateResource, error) { + ctx, span := telemetry.StartSpan(ctx, "google.redis.process_instances", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("google.project_id", project), + ), + ) + defer span.End() + + // Create span for ListInstances call + _, listSpan := telemetry.StartSpan(ctx, "google.redis.list_instances", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("google.project_id", project), + ), + ) + parent := fmt.Sprintf("projects/%s/locations/-", project) instances, err := redisClient.Projects.Locations.Instances.List(parent).Do() + if err != nil { + telemetry.SetSpanError(listSpan, err) + listSpan.End() + telemetry.SetSpanError(span, err) return nil, fmt.Errorf("failed to list Redis instances: %w", err) } + telemetry.AddSpanAttribute(listSpan, "google.redis.instances_found", len(instances.Instances)) + telemetry.SetSpanSuccess(listSpan) + listSpan.End() + log.Info("Found Redis instances", "count", len(instances.Instances)) resources := []api.CreateResource{} @@ -105,6 +132,9 @@ func processInstances(ctx context.Context, redisClient *redis.Service, project s resources = append(resources, resource) } + telemetry.AddSpanAttribute(span, "google.redis.total_instances", len(resources)) + telemetry.SetSpanSuccess(span) + return resources, nil } diff --git a/cmd/ctrlc/root/sync/google/secrets/secrets.go b/cmd/ctrlc/root/sync/google/secrets/secrets.go index 6358e04..eff6484 100644 --- a/cmd/ctrlc/root/sync/google/secrets/secrets.go +++ b/cmd/ctrlc/root/sync/google/secrets/secrets.go @@ -10,8 +10,11 @@ import ( "github.com/MakeNowJust/heredoc/v2" "github.com/charmbracelet/log" "github.com/ctrlplanedev/cli/internal/api" + "github.com/ctrlplanedev/cli/internal/telemetry" "github.com/spf13/cobra" "github.com/spf13/viper" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/api/secretmanager/v1" ) @@ -86,6 +89,14 @@ func initSecretManagerClient(ctx context.Context) (*secretmanager.Service, error // processSecrets lists and processes all secrets func processSecrets(ctx context.Context, secretClient *secretmanager.Service, project string) ([]api.CreateResource, error) { + ctx, span := telemetry.StartSpan(ctx, "google.secretmanager.process_secrets", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("google.project_id", project), + ), + ) + defer span.End() + // Build the parent name for listing secrets parent := fmt.Sprintf("projects/%s", project) @@ -94,6 +105,14 @@ func processSecrets(ctx context.Context, secretClient *secretmanager.Service, pr pageToken := "" for { + // Create span for ListSecrets call + _, listSpan := telemetry.StartSpan(ctx, "google.secretmanager.list_secrets", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("google.project_id", project), + ), + ) + // List secrets with pagination call := secretClient.Projects.Secrets.List(parent) if pageToken != "" { @@ -103,10 +122,18 @@ func processSecrets(ctx context.Context, secretClient *secretmanager.Service, pr log.Info("Listing secrets", "parent", parent, "pageToken", pageToken, "secretCount", secretCount) response, err := call.Do() + if err != nil { + telemetry.SetSpanError(listSpan, err) + listSpan.End() + telemetry.SetSpanError(span, err) return nil, fmt.Errorf("failed to list secrets: %w", err) } + telemetry.AddSpanAttribute(listSpan, "google.secretmanager.secrets_in_page", len(response.Secrets)) + telemetry.SetSpanSuccess(listSpan) + listSpan.End() + // Process secrets from current page for _, secret := range response.Secrets { resource, err := processSecret(ctx, secretClient, secret, project) @@ -125,6 +152,9 @@ func processSecrets(ctx context.Context, secretClient *secretmanager.Service, pr pageToken = response.NextPageToken } + telemetry.AddSpanAttribute(span, "google.secretmanager.total_secrets", secretCount) + telemetry.SetSpanSuccess(span) + log.Info("Found secrets", "count", secretCount) return resources, nil } diff --git a/cmd/ctrlc/root/sync/google/vms/vms.go b/cmd/ctrlc/root/sync/google/vms/vms.go index 515fc23..f6373dd 100644 --- a/cmd/ctrlc/root/sync/google/vms/vms.go +++ b/cmd/ctrlc/root/sync/google/vms/vms.go @@ -10,8 +10,11 @@ import ( "github.com/MakeNowJust/heredoc/v2" "github.com/charmbracelet/log" "github.com/ctrlplanedev/cli/internal/api" + "github.com/ctrlplanedev/cli/internal/telemetry" "github.com/spf13/cobra" "github.com/spf13/viper" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/api/compute/v1" ) @@ -86,12 +89,35 @@ func initComputeClient(ctx context.Context) (*compute.Service, error) { // processVMs lists and processes all VM instances func processVMs(ctx context.Context, computeClient *compute.Service, project string) ([]api.CreateResource, error) { + ctx, span := telemetry.StartSpan(ctx, "google.compute.process_vms", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("google.project_id", project), + ), + ) + defer span.End() + + // Create span for AggregatedList call + _, listSpan := telemetry.StartSpan(ctx, "google.compute.list_instances", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("google.project_id", project), + ), + ) + // Use AggregatedList to get VMs from all zones resp, err := computeClient.Instances.AggregatedList(project).Do() + if err != nil { + telemetry.SetSpanError(listSpan, err) + listSpan.End() + telemetry.SetSpanError(span, err) return nil, fmt.Errorf("failed to list VM instances: %w", err) } + telemetry.SetSpanSuccess(listSpan) + listSpan.End() + resources := []api.CreateResource{} vmCount := 0 @@ -113,6 +139,9 @@ func processVMs(ctx context.Context, computeClient *compute.Service, project str } } + telemetry.AddSpanAttribute(span, "google.compute.total_vms", vmCount) + telemetry.SetSpanSuccess(span) + log.Info("Found VM instances", "count", vmCount) return resources, nil } diff --git a/cmd/ctrlc/root/sync/kubernetes/kubernetes.go b/cmd/ctrlc/root/sync/kubernetes/kubernetes.go index 10af5cd..34f86d5 100644 --- a/cmd/ctrlc/root/sync/kubernetes/kubernetes.go +++ b/cmd/ctrlc/root/sync/kubernetes/kubernetes.go @@ -8,8 +8,11 @@ import ( "github.com/MakeNowJust/heredoc/v2" "github.com/charmbracelet/log" "github.com/ctrlplanedev/cli/internal/api" + "github.com/ctrlplanedev/cli/internal/telemetry" "github.com/spf13/cobra" "github.com/spf13/viper" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -28,13 +31,13 @@ func processNamespace(_ context.Context, clusterName string, namespace corev1.Na metadata["namespace/status"] = string(namespace.Status.Phase) return api.CreateResource{ - Version: "ctrlplane.dev/kubernetes/namespace/v1", - Kind: "KubernetesNamespace", - Name: fmt.Sprintf("%s/%s", clusterName, namespace.Name), + Version: "ctrlplane.dev/kubernetes/namespace/v1", + Kind: "KubernetesNamespace", + Name: fmt.Sprintf("%s/%s", clusterName, namespace.Name), Identifier: string(namespace.UID), Config: map[string]any{ - "id": string(namespace.UID), - "name": namespace.Name, + "id": string(namespace.UID), + "name": namespace.Name, "status": namespace.Status.Phase, }, Metadata: metadata, @@ -52,13 +55,13 @@ func processDeployment(_ context.Context, clusterName string, deployment appsv1. metadata["deployment/namespace"] = deployment.Namespace return api.CreateResource{ - Version: "ctrlplane.dev/kubernetes/deployment/v1", - Kind: "KubernetesDeployment", - Name: fmt.Sprintf("%s/%s/%s", clusterName, deployment.Namespace, deployment.Name), + Version: "ctrlplane.dev/kubernetes/deployment/v1", + Kind: "KubernetesDeployment", + Name: fmt.Sprintf("%s/%s/%s", clusterName, deployment.Namespace, deployment.Name), Identifier: string(deployment.UID), Config: map[string]any{ - "id": string(deployment.UID), - "name": deployment.Name, + "id": string(deployment.UID), + "name": deployment.Name, "namespace": deployment.Namespace, }, Metadata: metadata, @@ -92,12 +95,12 @@ func NewSyncKubernetesCmd() *cobra.Command { apiURL := viper.GetString("url") apiKey := viper.GetString("api-key") workspaceId := viper.GetString("workspace") - + ctrlplaneClient, err := api.NewAPIKeyClientWithResponses(apiURL, apiKey) if err != nil { return fmt.Errorf("failed to create API client: %w", err) } - + ctx := context.Background() clusterResource, _ := ctrlplaneClient.GetResourceByIdentifierWithResponse(ctx, workspaceId, clusterIdentifier) if clusterResource.JSON200 != nil { @@ -113,22 +116,50 @@ func NewSyncKubernetesCmd() *cobra.Command { return err } - namespaces, err := clientset.CoreV1().Namespaces().List(ctx, metav1.ListOptions{}) + // Create span for listing namespaces + listNsCtx, listNsSpan := telemetry.StartSpan(ctx, "kubernetes.namespaces.list", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("kubernetes.cluster", clusterName), + ), + ) + + namespaces, err := clientset.CoreV1().Namespaces().List(listNsCtx, metav1.ListOptions{}) if err != nil { + telemetry.SetSpanError(listNsSpan, err) + listNsSpan.End() return err } + telemetry.AddSpanAttribute(listNsSpan, "kubernetes.namespaces.resources_found", len(namespaces.Items)) + telemetry.SetSpanSuccess(listNsSpan) + listNsSpan.End() + resources := []api.CreateResource{} for _, namespace := range namespaces.Items { resource := processNamespace(context.Background(), clusterName, namespace) resources = append(resources, resource) } - deployments, err := clientset.AppsV1().Deployments(metav1.NamespaceAll).List(ctx, metav1.ListOptions{}) + // Create span for listing deployments + listDeployCtx, listDeploySpan := telemetry.StartSpan(ctx, "kubernetes.deployments.list", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("kubernetes.cluster", clusterName), + ), + ) + + deployments, err := clientset.AppsV1().Deployments(metav1.NamespaceAll).List(listDeployCtx, metav1.ListOptions{}) if err != nil { + telemetry.SetSpanError(listDeploySpan, err) + listDeploySpan.End() return err } + telemetry.AddSpanAttribute(listDeploySpan, "kubernetes.deployments.resources_found", len(deployments.Items)) + telemetry.SetSpanSuccess(listDeploySpan) + listDeploySpan.End() + for _, deployment := range deployments.Items { resource := processDeployment(context.Background(), clusterName, deployment) resources = append(resources, resource) @@ -148,6 +179,8 @@ func NewSyncKubernetesCmd() *cobra.Command { } } + log.Info("Found Kubernetes resources", "cluster", clusterName, "namespaces", len(namespaces.Items), "deployments", len(deployments.Items), "total", len(resources)) + return upsertToCtrlplane(ctrlplaneClient, resources, clusterIdentifier, clusterName, providerName) }, } diff --git a/cmd/ctrlc/root/sync/salesforce/accounts/accounts.go b/cmd/ctrlc/root/sync/salesforce/accounts/accounts.go index 2fec776..554277f 100644 --- a/cmd/ctrlc/root/sync/salesforce/accounts/accounts.go +++ b/cmd/ctrlc/root/sync/salesforce/accounts/accounts.go @@ -8,9 +8,12 @@ import ( "github.com/charmbracelet/log" "github.com/ctrlplanedev/cli/cmd/ctrlc/root/sync/salesforce/common" "github.com/ctrlplanedev/cli/internal/api" + "github.com/ctrlplanedev/cli/internal/telemetry" "github.com/k-capehart/go-salesforce/v2" "github.com/spf13/cobra" "github.com/spf13/viper" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) func NewSalesforceAccountsCmd() *cobra.Command { @@ -97,6 +100,20 @@ func NewSalesforceAccountsCmd() *cobra.Command { } func processAccounts(ctx context.Context, sf *salesforce.Salesforce, metadataMappings map[string]string, limit int, listAllFields bool, whereClause string) ([]api.CreateResource, error) { + ctx, span := telemetry.StartSpan(ctx, "salesforce.accounts.process", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("salesforce.object", "Account"), + attribute.Int("salesforce.limit", limit), + attribute.Bool("salesforce.list_all_fields", listAllFields), + ), + ) + defer span.End() + + if whereClause != "" { + telemetry.AddSpanAttribute(span, "salesforce.where_clause", whereClause) + } + additionalFields := make([]string, 0, len(metadataMappings)) for _, fieldName := range metadataMappings { additionalFields = append(additionalFields, fieldName) @@ -105,10 +122,13 @@ func processAccounts(ctx context.Context, sf *salesforce.Salesforce, metadataMap var accounts []map[string]any err := common.QuerySalesforceObject(ctx, sf, "Account", limit, listAllFields, &accounts, additionalFields, whereClause) if err != nil { + log.Error("Failed to query Salesforce accounts", "error", err) + telemetry.SetSpanError(span, err) return nil, err } log.Info("Found Salesforce accounts", "count", len(accounts)) + telemetry.AddSpanAttribute(span, "salesforce.records_found", len(accounts)) resources := []api.CreateResource{} for _, account := range accounts { @@ -116,6 +136,9 @@ func processAccounts(ctx context.Context, sf *salesforce.Salesforce, metadataMap resources = append(resources, resource) } + telemetry.AddSpanAttribute(span, "salesforce.records_processed", len(resources)) + telemetry.SetSpanSuccess(span) + return resources, nil } diff --git a/cmd/ctrlc/root/sync/salesforce/common/util.go b/cmd/ctrlc/root/sync/salesforce/common/util.go index 83a87ef..7a4396c 100644 --- a/cmd/ctrlc/root/sync/salesforce/common/util.go +++ b/cmd/ctrlc/root/sync/salesforce/common/util.go @@ -11,8 +11,11 @@ import ( "github.com/charmbracelet/log" "github.com/ctrlplanedev/cli/internal/api" + "github.com/ctrlplanedev/cli/internal/telemetry" "github.com/k-capehart/go-salesforce/v2" "github.com/spf13/viper" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) func GetSalesforceSubdomain(domain string) string { @@ -31,9 +34,25 @@ func GetSalesforceSubdomain(domain string) string { // QuerySalesforceObject performs a generic query on any Salesforce object with pagination support func QuerySalesforceObject(ctx context.Context, sf *salesforce.Salesforce, objectName string, limit int, listAllFields bool, target interface{}, additionalFields []string, whereClause string) error { + ctx, span := telemetry.StartSpan(ctx, "salesforce.query_object", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("salesforce.object", objectName), + attribute.Int("salesforce.limit", limit), + attribute.Bool("salesforce.list_all_fields", listAllFields), + ), + ) + defer span.End() + + if whereClause != "" { + telemetry.AddSpanAttribute(span, "salesforce.where_clause", whereClause) + } + targetValue := reflect.ValueOf(target).Elem() if targetValue.Kind() != reflect.Slice { - return fmt.Errorf("target must be a pointer to a slice") + err := fmt.Errorf("target must be a pointer to a slice") + telemetry.SetSpanError(span, err) + return err } fieldMap := make(map[string]bool) @@ -74,30 +93,53 @@ func QuerySalesforceObject(ctx context.Context, sf *salesforce.Salesforce, objec fieldNames = append(fieldNames, field) } + telemetry.AddSpanAttribute(span, "salesforce.field_count", len(fieldNames)) + if listAllFields { - if err := logAvailableFields(sf, objectName); err != nil { + if err := logAvailableFields(ctx, sf, objectName); err != nil { + telemetry.SetSpanError(span, err) return err } } - return paginateQuery(ctx, sf, objectName, fieldNames, whereClause, limit, targetValue) + err := paginateQuery(ctx, sf, objectName, fieldNames, whereClause, limit, targetValue) + if err != nil { + telemetry.SetSpanError(span, err) + return err + } + + telemetry.AddSpanAttribute(span, "salesforce.records_retrieved", targetValue.Len()) + telemetry.SetSpanSuccess(span) + return nil } -func logAvailableFields(sf *salesforce.Salesforce, objectName string) error { +func logAvailableFields(ctx context.Context, sf *salesforce.Salesforce, objectName string) error { + ctx, span := telemetry.StartSpan(ctx, "salesforce.describe_object", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("salesforce.object", objectName), + ), + ) + defer span.End() + resp, err := sf.DoRequest("GET", fmt.Sprintf("/sobjects/%s/describe", objectName), nil) if err != nil { + telemetry.SetSpanError(span, err) return fmt.Errorf("failed to describe %s object: %w", objectName, err) } defer resp.Body.Close() var result map[string]interface{} if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + telemetry.SetSpanError(span, err) return fmt.Errorf("failed to decode describe response: %w", err) } fields, ok := result["fields"].([]interface{}) if !ok { - return fmt.Errorf("unexpected describe response format") + err := fmt.Errorf("unexpected describe response format") + telemetry.SetSpanError(span, err) + return err } fieldNames := make([]string, 0, len(fields)) @@ -110,6 +152,8 @@ func logAvailableFields(sf *salesforce.Salesforce, objectName string) error { } log.Info("Available fields", "object", objectName, "count", len(fieldNames), "fields", fieldNames) + telemetry.AddSpanAttribute(span, "salesforce.available_fields_count", len(fieldNames)) + telemetry.SetSpanSuccess(span) return nil } @@ -152,9 +196,19 @@ func getRecordId(record reflect.Value) string { } func paginateQuery(ctx context.Context, sf *salesforce.Salesforce, objectName string, fields []string, whereClause string, limit int, targetValue reflect.Value) error { + ctx, span := telemetry.StartSpan(ctx, "salesforce.paginate_query", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("salesforce.object", objectName), + attribute.Int("salesforce.limit", limit), + ), + ) + defer span.End() + const batchSize = 200 totalRetrieved := 0 lastId := "" + totalApiCalls := 0 for { batchLimit := batchSize @@ -163,12 +217,31 @@ func paginateQuery(ctx context.Context, sf *salesforce.Salesforce, objectName st } query := buildSOQL(objectName, fields, whereClause, lastId, batchLimit) - batch, err := executeQuery(sf, query, targetValue.Type()) + + // Create child span for each API call/batch + batchCtx, batchSpan := telemetry.StartSpan(ctx, "salesforce.execute_query_batch", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("salesforce.object", objectName), + attribute.Int("salesforce.batch_limit", batchLimit), + attribute.Int("salesforce.batch_number", totalApiCalls+1), + ), + ) + + batch, err := executeQuery(batchCtx, sf, query, targetValue.Type()) + totalApiCalls++ + if err != nil { + telemetry.SetSpanError(batchSpan, err) + batchSpan.End() + telemetry.SetSpanError(span, err) return fmt.Errorf("failed to query %s: %w", objectName, err) } if batch.Len() == 0 { + telemetry.AddSpanAttribute(batchSpan, "salesforce.records_fetched", 0) + telemetry.SetSpanSuccess(batchSpan) + batchSpan.End() break } @@ -184,6 +257,9 @@ func paginateQuery(ctx context.Context, sf *salesforce.Salesforce, objectName st } log.Debug("Retrieved batch", "object", objectName, "batch_size", recordCount, "total", totalRetrieved) + telemetry.AddSpanAttribute(batchSpan, "salesforce.records_fetched", recordCount) + telemetry.SetSpanSuccess(batchSpan) + batchSpan.End() if (limit > 0 && totalRetrieved >= limit) || recordCount < batchLimit { break @@ -194,20 +270,30 @@ func paginateQuery(ctx context.Context, sf *salesforce.Salesforce, objectName st targetValue.Set(targetValue.Slice(0, limit)) } + telemetry.AddSpanAttribute(span, "salesforce.total_records", totalRetrieved) + telemetry.AddSpanAttribute(span, "salesforce.total_api_calls", totalApiCalls) + telemetry.SetSpanSuccess(span) return nil } // executeQuery executes a SOQL query and returns the unmarshaled records -func executeQuery(sf *salesforce.Salesforce, query string, targetType reflect.Type) (reflect.Value, error) { +func executeQuery(ctx context.Context, sf *salesforce.Salesforce, query string, targetType reflect.Type) (reflect.Value, error) { + ctx, span := telemetry.StartSpan(ctx, "salesforce.execute_query", + trace.WithSpanKind(trace.SpanKindClient), + ) + defer span.End() + encodedQuery := url.QueryEscape(query) resp, err := sf.DoRequest("GET", fmt.Sprintf("/query?q=%s", encodedQuery), nil) if err != nil { + telemetry.SetSpanError(span, err) return reflect.Value{}, err } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { + telemetry.SetSpanError(span, err) return reflect.Value{}, fmt.Errorf("failed to read response: %w", err) } @@ -215,14 +301,17 @@ func executeQuery(sf *salesforce.Salesforce, query string, targetType reflect.Ty Records json.RawMessage `json:"records"` } if err := json.Unmarshal(body, &result); err != nil { + telemetry.SetSpanError(span, err) return reflect.Value{}, fmt.Errorf("failed to unmarshal response: %w", err) } batch := reflect.New(targetType).Elem() if err := json.Unmarshal(result.Records, batch.Addr().Interface()); err != nil { + telemetry.SetSpanError(span, err) return reflect.Value{}, fmt.Errorf("failed to unmarshal records: %w", err) } + telemetry.SetSpanSuccess(span) return batch, nil } diff --git a/cmd/ctrlc/root/sync/salesforce/opportunities/opportunities.go b/cmd/ctrlc/root/sync/salesforce/opportunities/opportunities.go index 3dcf419..9644fe8 100644 --- a/cmd/ctrlc/root/sync/salesforce/opportunities/opportunities.go +++ b/cmd/ctrlc/root/sync/salesforce/opportunities/opportunities.go @@ -9,9 +9,12 @@ import ( "github.com/charmbracelet/log" "github.com/ctrlplanedev/cli/cmd/ctrlc/root/sync/salesforce/common" "github.com/ctrlplanedev/cli/internal/api" + "github.com/ctrlplanedev/cli/internal/telemetry" "github.com/k-capehart/go-salesforce/v2" "github.com/spf13/cobra" "github.com/spf13/viper" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) func NewSalesforceOpportunitiesCmd() *cobra.Command { @@ -97,6 +100,20 @@ func NewSalesforceOpportunitiesCmd() *cobra.Command { // processOpportunities queries and transforms opportunities func processOpportunities(ctx context.Context, sf *salesforce.Salesforce, metadataMappings map[string]string, limit int, listAllFields bool, whereClause string) ([]api.CreateResource, error) { + ctx, span := telemetry.StartSpan(ctx, "salesforce.opportunities.process", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("salesforce.object", "Opportunity"), + attribute.Int("salesforce.limit", limit), + attribute.Bool("salesforce.list_all_fields", listAllFields), + ), + ) + defer span.End() + + if whereClause != "" { + telemetry.AddSpanAttribute(span, "salesforce.where_clause", whereClause) + } + additionalFields := make([]string, 0, len(metadataMappings)) for _, fieldName := range metadataMappings { additionalFields = append(additionalFields, fieldName) @@ -105,10 +122,13 @@ func processOpportunities(ctx context.Context, sf *salesforce.Salesforce, metada var opportunities []map[string]any err := common.QuerySalesforceObject(ctx, sf, "Opportunity", limit, listAllFields, &opportunities, additionalFields, whereClause) if err != nil { + log.Error("Failed to query Salesforce opportunities", "error", err) + telemetry.SetSpanError(span, err) return nil, err } log.Info("Found Salesforce opportunities", "count", len(opportunities)) + telemetry.AddSpanAttribute(span, "salesforce.records_found", len(opportunities)) resources := []api.CreateResource{} for _, opp := range opportunities { @@ -116,6 +136,9 @@ func processOpportunities(ctx context.Context, sf *salesforce.Salesforce, metada resources = append(resources, resource) } + telemetry.AddSpanAttribute(span, "salesforce.records_processed", len(resources)) + telemetry.SetSpanSuccess(span) + return resources, nil } diff --git a/cmd/ctrlc/root/sync/tailscale/tailscale.go b/cmd/ctrlc/root/sync/tailscale/tailscale.go index 799c7ec..bf0bb35 100644 --- a/cmd/ctrlc/root/sync/tailscale/tailscale.go +++ b/cmd/ctrlc/root/sync/tailscale/tailscale.go @@ -14,9 +14,12 @@ import ( "github.com/charmbracelet/log" "github.com/ctrlplanedev/cli/internal/api" "github.com/ctrlplanedev/cli/internal/cliutil" + "github.com/ctrlplanedev/cli/internal/telemetry" "github.com/spf13/cobra" "github.com/spf13/viper" tsclient "github.com/tailscale/tailscale-client-go/v2" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) type TailscaleConfig struct { @@ -89,11 +92,34 @@ func NewSyncTailscaleCmd() *cobra.Command { } ctx := context.Background() + + ctx, span := telemetry.StartSpan(ctx, "tailscale.list_devices", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("tailscale.tailnet", tailnet), + ), + ) + devices, err := tsc.Devices().List(ctx) if err != nil { + telemetry.SetSpanError(span, err) + span.End() return fmt.Errorf("failed to list devices: %w", err) } + telemetry.AddSpanAttribute(span, "tailscale.devices_found", len(devices)) + telemetry.SetSpanSuccess(span) + span.End() + + log.Info("Found Tailscale devices", "count", len(devices)) + + processCtx, processSpan := telemetry.StartSpan(ctx, "tailscale.process_devices", + trace.WithSpanKind(trace.SpanKindInternal), + trace.WithAttributes( + attribute.Int("tailscale.devices_to_process", len(devices)), + ), + ) + resources := []api.CreateResource{} for _, device := range devices { metadata := map[string]string{} @@ -141,20 +167,36 @@ func NewSyncTailscaleCmd() *cobra.Command { }) } + telemetry.AddSpanAttribute(processSpan, "tailscale.devices_processed", len(resources)) + telemetry.SetSpanSuccess(processSpan) + processSpan.End() + log.Info("Upserting resources", "count", len(resources)) + upsertCtx, upsertSpan := telemetry.StartSpan(processCtx, "tailscale.upsert_resources", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.Int("tailscale.resources_to_upsert", len(resources)), + ), + ) + defer upsertSpan.End() + providerName := fmt.Sprintf("tailscale-%s", tailnet) rp, err := api.NewResourceProvider(ctrlplaneClient, workspaceId, providerName) if err != nil { + telemetry.SetSpanError(upsertSpan, err) return fmt.Errorf("failed to create resource provider: %w", err) } - upsertResp, err := rp.UpsertResource(ctx, resources) - log.Info("Response from upserting resources", "status", upsertResp.Status) + upsertResp, err := rp.UpsertResource(upsertCtx, resources) if err != nil { + telemetry.SetSpanError(upsertSpan, err) return fmt.Errorf("failed to upsert resources: %w", err) } + log.Info("Response from upserting resources", "status", upsertResp.Status) + telemetry.SetSpanSuccess(upsertSpan) + return cliutil.HandleResponseOutput(cmd, upsertResp) }, } diff --git a/cmd/ctrlc/root/sync/terraform/terraform_workspaces.go b/cmd/ctrlc/root/sync/terraform/terraform_workspaces.go index f059c2b..400e219 100644 --- a/cmd/ctrlc/root/sync/terraform/terraform_workspaces.go +++ b/cmd/ctrlc/root/sync/terraform/terraform_workspaces.go @@ -11,7 +11,10 @@ import ( "github.com/avast/retry-go" "github.com/charmbracelet/log" + "github.com/ctrlplanedev/cli/internal/telemetry" "github.com/hashicorp/go-tfe" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) const ( @@ -45,25 +48,57 @@ func getLinksMetadata(workspace *tfe.Workspace, baseURL url.URL) *string { } func getWorkspaceVariables(ctx context.Context, workspace *tfe.Workspace, client *tfe.Client) map[string]string { + ctx, span := telemetry.StartSpan(ctx, "terraform.get_workspace_variables", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("terraform.workspace_id", workspace.ID), + attribute.Int("terraform.variables_total", len(workspace.Variables)), + ), + ) + defer span.End() + variables := make(map[string]string) + processedCount := 0 + for _, variable := range workspace.Variables { if variable == nil || variable.Sensitive { continue } - fetchedVariable, err := client.Variables.Read(ctx, workspace.ID, variable.ID) + // Create a child span for each variable read + varCtx, varSpan := telemetry.StartSpan(ctx, "terraform.read_variable", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("terraform.variable_key", variable.Key), + attribute.String("terraform.variable_id", variable.ID), + ), + ) + + fetchedVariable, err := client.Variables.Read(varCtx, workspace.ID, variable.ID) if err != nil { log.Error("Failed to read variable", "error", err, "variable", variable.Key) + telemetry.SetSpanError(varSpan, err) + varSpan.End() continue } if fetchedVariable.Category != tfe.CategoryTerraform || fetchedVariable.Sensitive { + telemetry.AddSpanAttribute(varSpan, "terraform.variable_skipped", true) + telemetry.AddSpanAttribute(varSpan, "terraform.variable_category", string(fetchedVariable.Category)) + varSpan.End() continue } variables[fetchedVariable.Key] = fetchedVariable.Value + processedCount++ + telemetry.SetSpanSuccess(varSpan) + varSpan.End() + time.Sleep(50 * time.Millisecond) } + + telemetry.AddSpanAttribute(span, "terraform.variables_processed", processedCount) + telemetry.SetSpanSuccess(span) return variables } @@ -138,6 +173,16 @@ func convertWorkspaceToResource(ctx context.Context, workspace *tfe.Workspace, c } func listWorkspacesWithRetry(ctx context.Context, client *tfe.Client, organization string, pageNum, pageSize int) (*tfe.WorkspaceList, error) { + ctx, span := telemetry.StartSpan(ctx, "terraform.list_workspaces", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("terraform.organization", organization), + attribute.Int("terraform.page_number", pageNum), + attribute.Int("terraform.page_size", pageSize), + ), + ) + defer span.End() + var workspaces *tfe.WorkspaceList err := retry.Do( func() error { @@ -154,10 +199,28 @@ func listWorkspacesWithRetry(ctx context.Context, client *tfe.Client, organizati retry.Delay(time.Second), retry.MaxDelay(5*time.Second), ) + + if err != nil { + telemetry.SetSpanError(span, err) + } else { + telemetry.SetSpanSuccess(span) + if workspaces != nil { + telemetry.AddSpanAttribute(span, "terraform.workspaces_count", len(workspaces.Items)) + } + } + return workspaces, err } func listAllWorkspaces(ctx context.Context, client *tfe.Client, organization string) ([]*tfe.Workspace, error) { + ctx, span := telemetry.StartSpan(ctx, "terraform.list_all_workspaces", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("terraform.organization", organization), + ), + ) + defer span.End() + var allWorkspaces []*tfe.Workspace pageNum := 1 pageSize := 100 @@ -165,6 +228,7 @@ func listAllWorkspaces(ctx context.Context, client *tfe.Client, organization str for { workspaces, err := listWorkspacesWithRetry(ctx, client, organization, pageNum, pageSize) if err != nil { + telemetry.SetSpanError(span, err) return nil, fmt.Errorf("failed to list workspaces: %w", err) } @@ -175,12 +239,23 @@ func listAllWorkspaces(ctx context.Context, client *tfe.Client, organization str pageNum++ } + telemetry.AddSpanAttribute(span, "terraform.total_workspaces", len(allWorkspaces)) + telemetry.SetSpanSuccess(span) return allWorkspaces, nil } func getWorkspacesInOrg(ctx context.Context, client *tfe.Client, organization string) ([]WorkspaceResource, error) { + ctx, span := telemetry.StartSpan(ctx, "terraform.get_workspaces_in_org", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("terraform.organization", organization), + ), + ) + defer span.End() + workspaces, err := listAllWorkspaces(ctx, client, organization) if err != nil { + telemetry.SetSpanError(span, err) return nil, err } @@ -194,5 +269,8 @@ func getWorkspacesInOrg(ctx context.Context, client *tfe.Client, organization st workspaceResources = append(workspaceResources, workspaceResource) time.Sleep(50 * time.Millisecond) } + + telemetry.AddSpanAttribute(span, "terraform.workspaces_processed", len(workspaceResources)) + telemetry.SetSpanSuccess(span) return workspaceResources, nil } diff --git a/docs/TELEMETRY.md b/docs/TELEMETRY.md new file mode 100644 index 0000000..c6e8005 --- /dev/null +++ b/docs/TELEMETRY.md @@ -0,0 +1,277 @@ +# OpenTelemetry Integration + +The Ctrlplane CLI now includes OpenTelemetry support for observability and distributed tracing. + +## Features + +- **Root span creation**: Every CLI invocation creates a root span with command details +- **Automatic span management**: Spans are properly started and ended with success/error status +- **Configurable telemetry**: Can be enabled/disabled via environment variables +- **OTLP export**: Traces are exported using the OpenTelemetry Protocol (OTLP) + +## Configuration + +### Environment Variables + +#### Core Configuration + +- `TELEMETRY_DISABLED`: Set to `"true"` to disable telemetry completely +- `OTEL_EXPORTER_OTLP_ENDPOINT`: OTLP endpoint URL (default: `http://localhost:4317`) +- `OTEL_EXPORTER_OTLP_INSECURE`: Set to `"true"` to use insecure (non-TLS) connection +- `OTEL_SERVICE_NAME`: Override service name (default: `ctrlplane-cli`) +- `OTEL_RESOURCE_ATTRIBUTES`: Additional resource attributes + +#### Datadog Configuration + +To send traces to Datadog (via Agent or directly): + +- `DATADOG_ENABLED`: Set to `"true"` to enable Datadog-specific configuration +- `DD_API_KEY`: Datadog API key (required for direct intake, optional for Agent) +- `DD_OTLP_GRPC_ENDPOINT`: Datadog endpoint (default: `localhost:4317`) + - For Agent: `localhost:4317` + - For direct intake: `api.datadoghq.com:4317` (US) or `api.datadoghq.eu:4317` (EU) +- `DD_SERVICE`: Service name for Datadog (overrides `OTEL_SERVICE_NAME`) +- `DD_ENV`: Environment name (e.g., `production`, `staging`, `development`) +- `DD_VERSION`: Service version for Datadog +- `DD_TAGS`: Additional tags in format `key1:value1,key2:value2` + +### Standard OpenTelemetry Variables + +All standard OpenTelemetry environment variables are supported: + +- `OTEL_EXPORTER_OTLP_HEADERS`: Custom headers for OTLP exporter +- `OTEL_EXPORTER_OTLP_TIMEOUT`: Timeout for OTLP export +- `OTEL_TRACES_SAMPLER`: Sampling strategy +- `OTEL_TRACES_SAMPLER_ARG`: Sampler arguments + +## Usage Examples + +### Enable Telemetry with Jaeger + +```bash +# Start Jaeger (example using Docker) +docker run -d --name jaeger \ + -p 16686:16686 \ + -p 14268:14268 \ + -p 14250:14250 \ + -p 6831:6831/udp \ + -p 6832:6832/udp \ + -p 5778:5778 \ + -p 4317:4317 \ + -p 4318:4318 \ + jaegertracing/all-in-one:latest + +# Run CLI with telemetry +OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 ctrlc version +``` + +### Enable Telemetry with Custom OTLP Collector + +```bash +# Configure OTLP endpoint +export OTEL_EXPORTER_OTLP_ENDPOINT=https://your-collector-endpoint:4317 +export OTEL_EXPORTER_OTLP_HEADERS="api-key=your-api-key" + +# Run CLI commands +ctrlc sync github pull-requests --repo owner/repo +``` + +### Enable Telemetry with Datadog + +#### Direct to Datadog Intake (Agentless) + +```bash +# Send traces directly to Datadog without local Agent +export DATADOG_ENABLED=true +export DD_API_KEY=your_datadog_api_key_here +export OTLP_EXPORTER_GRPC_ENDPOINT=api.datadoghq.com:4317 # US region +export OTEL_SERVICE_NAME=ctrlplane-cli +export DD_ENV=production + +# Run CLI commands +ctrlc sync aws eks --region us-west-2 +``` + +**Note**: Direct intake requires a Datadog API key and uses TLS automatically. + +### Disable Telemetry + +```bash +# Disable telemetry completely +TELEMETRY_DISABLED=true ctrlc version +``` + +## Span Attributes + +Each root span includes the following attributes: + +- `cli.command`: The command being executed (e.g., "sync github pull-requests") +- `cli.args`: Command arguments as a string array +- `cli.version`: CLI version +- `success`: Boolean indicating if the command succeeded +- `error`: Boolean indicating if an error occurred + +Additional resource attributes are automatically added: + +- `service.name`: Service name (ctrlplane-cli) +- `service.version`: Service version +- `process.pid`: Process ID +- `host.*`: Host information +- `os.*`: Operating system information + +## External API Tracing + +The CLI automatically traces all external API calls to: + +- **Terraform Cloud API** ✅ - Workspace listing, variable fetching +- **GitHub API** ✅ - Pull request sync, commit fetching +- **AWS SDK** ✅ - EKS, EC2, RDS, VPC operations +- **Azure SDK** ✅ - AKS and networking operations +- **Google Cloud APIs** ✅ - GKE, Cloud SQL, Cloud Run, and more +- **Salesforce API** ✅ - SOQL queries for opportunities and accounts +- **Tailscale API** ✅ - Device listing and management +- **Kubernetes Client** ✅ - Namespace and deployment operations +- **Ctrlplane API** ✅ - Resource provider operations with trace context propagation + +Each API call creates detailed spans with: +- Service-specific attributes (region, project ID, organization, etc.) +- Operation details (list, describe, query, etc.) +- Success/error states +- Result counts and pagination info + +### Distributed Tracing + +All API calls to the Ctrlplane backend automatically include trace context via the `traceparent` HTTP header (W3C Trace Context standard). This enables end-to-end distributed tracing from the CLI through to the backend services. + +## Integration in Subcommands + +Subcommands can access the telemetry context and create child spans: + +```go +import ( + "github.com/ctrlplanedev/cli/internal/telemetry" +) + +func myCommandHandler(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() + + // Create a child span + ctx, span := telemetry.StartSpan(ctx, "my-operation") + defer span.End() + + // Add custom attributes + telemetry.AddSpanAttribute(span, "custom.attribute", "value") + + // Your command logic here + if err := doSomething(ctx); err != nil { + telemetry.SetSpanError(span, err) + return err + } + + telemetry.SetSpanSuccess(span) + return nil +} +``` + +### API Call Tracing Helpers + +For external API integrations, use these helper functions: + +```go +// Simple API call with automatic error handling +result, err := telemetry.WithTelemetry(ctx, "service.operation", + func(ctx context.Context) (Result, error) { + return apiClient.CallAPI(ctx, params) + }, + attribute.String("service.param", value), +) + +// Manual span creation for complex operations +ctx, span := telemetry.StartAPISpan(ctx, "service", "operation", + attribute.String("service.resource_id", resourceID), +) +defer span.End() +// ... API call logic +``` + +## Troubleshooting + +### Connection Issues + +If you see connection refused errors like: + +``` +grpc: addrConn.createTransport failed to connect to {...}: connection refused +``` + +This means the OTLP collector/agent is not running. Options: + +1. **For Datadog**: Ensure Datadog Agent is running and OTLP is enabled +2. **For Jaeger**: Start a Jaeger instance (see example above) +3. **For other collectors**: Configure endpoint with `OTEL_EXPORTER_OTLP_ENDPOINT` +4. **To disable**: Set `TELEMETRY_DISABLED=true` + +### Datadog-Specific Troubleshooting + +**Traces not appearing in Datadog:** + +1. Verify Datadog Agent is running: + ```bash + datadog-agent status + ``` + +2. Check OTLP receiver is enabled in `datadog.yaml`: + ```yaml + otlp_config: + receiver: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + ``` + +3. Verify connectivity: + ```bash + telnet localhost 4317 + ``` + +4. Check Agent logs for OTLP errors: + ```bash + tail -f /var/log/datadog/agent.log | grep -i otlp + ``` + +**Service not showing correct name in Datadog:** + +Ensure you've set `DD_SERVICE` or it will default to `ctrlplane-cli`: +```bash +export DD_SERVICE=my-custom-service-name +``` + +**Authentication errors when sending directly to Datadog intake:** + +Ensure `DD_API_KEY` is set when using direct intake (not needed for Agent): +```bash +export DD_API_KEY=your_datadog_api_key +export DD_OTLP_GRPC_ENDPOINT=api.datadoghq.com:4317 +``` + +**Connection timeouts or TLS errors with direct intake:** + +Verify your API key is valid and you're using the correct regional endpoint: +- US: `api.datadoghq.com:4317` +- EU: `api.datadoghq.eu:4317` +- US1-FED: `api.ddog-gov.com:4317` + +### Debugging + +Enable debug logging to see telemetry-related messages: + +```bash +ctrlc --log-level debug version +``` + +## Performance + +- Telemetry initialization happens early in the CLI lifecycle +- Spans are batched and exported asynchronously +- Failed telemetry initialization does not prevent CLI operation +- Graceful shutdown ensures pending spans are exported \ No newline at end of file diff --git a/go.mod b/go.mod index d166415..f2b6721 100644 --- a/go.mod +++ b/go.mod @@ -35,6 +35,10 @@ require ( github.com/spf13/cobra v1.8.1 github.com/spf13/viper v1.19.0 github.com/tailscale/tailscale-client-go/v2 v2.0.0-20241217012816-8143c7dc1766 + go.opentelemetry.io/otel v1.35.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.30.0 + go.opentelemetry.io/otel/sdk v1.35.0 + go.opentelemetry.io/otel/trace v1.35.0 golang.org/x/oauth2 v0.29.0 google.golang.org/api v0.230.0 gopkg.in/yaml.v2 v2.4.0 @@ -194,12 +198,8 @@ require ( go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 // indirect - go.opentelemetry.io/otel v1.35.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.30.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.30.0 // indirect go.opentelemetry.io/otel/metric v1.35.0 // indirect - go.opentelemetry.io/otel/sdk v1.35.0 // indirect - go.opentelemetry.io/otel/trace v1.35.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect diff --git a/internal/api/client.go b/internal/api/client.go index 71c542f..4d1fdd1 100644 --- a/internal/api/client.go +++ b/internal/api/client.go @@ -4,6 +4,9 @@ import ( "context" "net/http" "strings" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" ) func NewAPIKeyClientWithResponses(server string, apiKey string) (*ClientWithResponses, error) { @@ -11,7 +14,14 @@ func NewAPIKeyClientWithResponses(server string, apiKey string) (*ClientWithResp server = strings.TrimSuffix(server, "/api") return NewClientWithResponses(server+"/api", WithRequestEditorFn(func(ctx context.Context, req *http.Request) error { + // Set API key req.Header.Set("X-API-Key", apiKey) + + // Inject trace context into HTTP headers (traceparent, tracestate) + // This propagates the current span context to the downstream service + propagator := otel.GetTextMapPropagator() + propagator.Inject(ctx, propagation.HeaderCarrier(req.Header)) + return nil }), ) diff --git a/internal/telemetry/telemetry.go b/internal/telemetry/telemetry.go new file mode 100644 index 0000000..3a7571c --- /dev/null +++ b/internal/telemetry/telemetry.go @@ -0,0 +1,268 @@ +package telemetry + +import ( + "context" + "fmt" + "os" + "strings" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.20.0" + "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc/credentials" +) + +const ( + serviceName = "ctrlplane-cli" + serviceVersion = "1.0.0" // This could be made configurable +) + +var tracer trace.Tracer + +// InitTelemetry initializes OpenTelemetry with OTLP exporter +func InitTelemetry(ctx context.Context) (func(context.Context) error, error) { + // Check if telemetry is disabled + if os.Getenv("TELEMETRY_DISABLED") == "true" { + // Return no-op shutdown function if telemetry is disabled + return func(context.Context) error { return nil }, nil + } + + // Check if any telemetry endpoint is configured + if os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT") == "" { + // Return no-op shutdown function if no endpoint is configured + return func(context.Context) error { return nil }, nil + } + + // Create resource with service information + res, err := createResource(ctx) + if err != nil { + return nil, fmt.Errorf("failed to create resource: %w", err) + } + + // Create OTLP trace exporter with appropriate configuration + exporter, err := createOTLPExporter(ctx) + if err != nil { + return nil, fmt.Errorf("failed to create OTLP exporter: %w", err) + } + + // Create trace provider + tp := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(exporter), + sdktrace.WithResource(res), + sdktrace.WithSampler(sdktrace.AlwaysSample()), // Configure sampling as needed + ) + + // Set global trace provider + otel.SetTracerProvider(tp) + + // Set global propagator (includes Datadog propagation if enabled) + otel.SetTextMapPropagator(createPropagator()) + + // Get tracer for this package + tracer = otel.Tracer(serviceName) + + // Return shutdown function + return tp.Shutdown, nil +} + +// createResource creates an OpenTelemetry resource with service information +func createResource(ctx context.Context) (*resource.Resource, error) { + attrs := []attribute.KeyValue{ + semconv.ServiceNameKey.String(serviceName), + semconv.ServiceVersionKey.String(serviceVersion), + } + + // Add Datadog-specific attributes if enabled + if os.Getenv("DATADOG_ENABLED") == "true" { + if env := os.Getenv("DD_ENV"); env != "" { + attrs = append(attrs, attribute.String("deployment.environment", env)) + } + if version := os.Getenv("DD_VERSION"); version != "" { + attrs[1] = semconv.ServiceVersionKey.String(version) + } + if tags := os.Getenv("DD_TAGS"); tags != "" { + // Parse DD_TAGS format: key1:value1,key2:value2 + for _, tag := range strings.Split(tags, ",") { + parts := strings.SplitN(strings.TrimSpace(tag), ":", 2) + if len(parts) == 2 { + attrs = append(attrs, attribute.String(parts[0], parts[1])) + } + } + } + } + + return resource.New(ctx, + resource.WithAttributes(attrs...), + resource.WithFromEnv(), + resource.WithProcessPID(), + resource.WithProcessExecutableName(), + resource.WithProcessExecutablePath(), + resource.WithProcessOwner(), + resource.WithProcessRuntimeName(), + resource.WithProcessRuntimeVersion(), + resource.WithProcessRuntimeDescription(), + resource.WithHost(), + resource.WithOS(), + ) +} + +// createOTLPExporter creates an OTLP exporter with Datadog or generic configuration +func createOTLPExporter(ctx context.Context) (sdktrace.SpanExporter, error) { + var opts []otlptracegrpc.Option + + // Check if Datadog is explicitly enabled + if os.Getenv("DATADOG_ENABLED") == "true" { + endpoint := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT") + if endpoint == "" { + endpoint = "localhost:4317" + } + opts = append(opts, otlptracegrpc.WithEndpoint(endpoint)) + + // Add Datadog API key as header if provided + // This is required when sending directly to Datadog intake (not via Agent) + if apiKey := os.Getenv("DD_API_KEY"); apiKey != "" { + opts = append(opts, otlptracegrpc.WithHeaders(map[string]string{ + "dd-api-key": apiKey, + })) + } + + // Datadog Agent typically doesn't require TLS for localhost + // But Datadog intake endpoints always require TLS + if strings.HasPrefix(endpoint, "localhost") || strings.HasPrefix(endpoint, "127.0.0.1") { + opts = append(opts, otlptracegrpc.WithInsecure()) + } else { + // Use TLS for remote endpoints (Datadog intake or remote Agent) + opts = append(opts, otlptracegrpc.WithTLSCredentials(credentials.NewClientTLSFromCert(nil, ""))) + } + } else { + // Standard OTLP configuration + // Check if insecure connection is requested + if os.Getenv("OTEL_EXPORTER_OTLP_INSECURE") == "true" { + opts = append(opts, otlptracegrpc.WithInsecure()) + } + } + + return otlptracegrpc.New(ctx, opts...) +} + +// createPropagator creates a composite propagator +func createPropagator() propagation.TextMapPropagator { + propagators := []propagation.TextMapPropagator{ + propagation.TraceContext{}, + propagation.Baggage{}, + } + + // Datadog uses its own propagation format in addition to W3C + // The Datadog Agent will handle conversion from W3C TraceContext to Datadog format + // So we don't need a special propagator here - W3C TraceContext is sufficient + + return propagation.NewCompositeTextMapPropagator(propagators...) +} + +// StartRootSpan creates a root span for the CLI invocation +func StartRootSpan(ctx context.Context, commandName string, args []string) (context.Context, trace.Span) { + if tracer == nil { + tracer = otel.Tracer(serviceName) + } + + spanName := fmt.Sprintf("ctrlc %s", commandName) + + ctx, span := tracer.Start(ctx, spanName, + trace.WithSpanKind(trace.SpanKindInternal), + trace.WithAttributes( + attribute.String("cli.command", commandName), + attribute.StringSlice("cli.args", args), + attribute.String("cli.version", serviceVersion), + ), + ) + + return ctx, span +} + +// SetSpanError sets an error on the current span +func SetSpanError(span trace.Span, err error) { + if span != nil && err != nil { + span.RecordError(err) + span.SetAttributes(attribute.Bool("error", true)) + } +} + +// SetSpanSuccess marks the span as successful +func SetSpanSuccess(span trace.Span) { + if span != nil { + span.SetAttributes(attribute.Bool("success", true)) + } +} + +// AddSpanAttribute adds an attribute to the current span +func AddSpanAttribute(span trace.Span, key string, value interface{}) { + if span == nil { + return + } + + switch v := value.(type) { + case string: + span.SetAttributes(attribute.String(key, v)) + case int: + span.SetAttributes(attribute.Int(key, v)) + case int64: + span.SetAttributes(attribute.Int64(key, v)) + case bool: + span.SetAttributes(attribute.Bool(key, v)) + case float64: + span.SetAttributes(attribute.Float64(key, v)) + case []string: + span.SetAttributes(attribute.StringSlice(key, v)) + default: + span.SetAttributes(attribute.String(key, fmt.Sprintf("%v", v))) + } +} + +// GetTracer returns the global tracer instance +func GetTracer() trace.Tracer { + if tracer == nil { + tracer = otel.Tracer(serviceName) + } + return tracer +} + +// StartSpan is a convenience function to start a new span +func StartSpan(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) { + if tracer == nil { + tracer = otel.Tracer(serviceName) + } + return tracer.Start(ctx, name, opts...) +} + +// StartAPISpan creates a span for an external API call with common attributes +func StartAPISpan(ctx context.Context, service, operation string, attributes ...attribute.KeyValue) (context.Context, trace.Span) { + spanName := fmt.Sprintf("%s.%s", service, operation) + opts := []trace.SpanStartOption{ + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes(attributes...), + } + return StartSpan(ctx, spanName, opts...) +} + +// WithTelemetry wraps a function with telemetry, automatically handling success/error states +func WithTelemetry[T any](ctx context.Context, spanName string, fn func(context.Context) (T, error), attributes ...attribute.KeyValue) (T, error) { + ctx, span := StartSpan(ctx, spanName, + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes(attributes...), + ) + defer span.End() + + result, err := fn(ctx) + if err != nil { + SetSpanError(span, err) + } else { + SetSpanSuccess(span) + } + + return result, err +}