diff --git a/framework/docs/background-job/README.md b/framework/docs/background-job/README.md index e611f8e..0d3963f 100644 --- a/framework/docs/background-job/README.md +++ b/framework/docs/background-job/README.md @@ -160,6 +160,57 @@ Scheduled → Running → Completed → Cancelled ``` +## Tracing + +All background job operations automatically emit OpenTelemetry spans via the `BBT.Aether.Infrastructure` ActivitySource. No additional configuration is needed when using `AddAetherTelemetry`. + +### Scheduling Spans + +| Span | Description | +|------|-------------| +| `BackgroundJob.Enqueue` | Job creation + persistence via `IBackgroundJobService` | +| `BackgroundJob.Update` | Schedule update via `IBackgroundJobService` | +| `BackgroundJob.Delete` | Job cancellation via `IBackgroundJobService` | +| `BackgroundJob.Schedule` | Scheduler-level job registration (e.g. Dapr) | +| `BackgroundJob.Schedule.Update` | Scheduler-level reschedule | +| `BackgroundJob.Schedule.Delete` | Scheduler-level job removal | + +### Execution Spans + +| Span | Description | +|------|-------------| +| `BackgroundJob.Execute` | Dapr callback entry point (execution bridge) | +| `BackgroundJob.Dispatch` | Handler invocation with idempotency check | + +### Tags + +| Tag | Description | +|-----|-------------| +| `job.handler_name` | Handler type name (e.g. `"SendEmail"`) | +| `job.name` | Unique job identifier (e.g. `"send-email-order-123"`) | +| `job.schedule` | Schedule expression (e.g. `"@daily"`, `"*/15 * * * *"`) | +| `job.id` | Entity ID from BackgroundJobInfo | +| `job.scheduler` | Scheduler backend (e.g. `"dapr"`) | +| `job.status` | Final dispatch status (`"completed"`, `"failed"`, `"cancelled"`) | + +### Example Trace Hierarchy + +**Scheduling:** +``` +[ASP.NET Core] POST /api/orders + [BBT.Aether.Infrastructure] BackgroundJob.Enqueue + [BBT.Aether.Infrastructure] BackgroundJob.Schedule + [HTTP Client] POST http://localhost:3500/... +``` + +**Execution (Dapr callback):** +``` +[ASP.NET Core] POST /job + [BBT.Aether.Infrastructure] BackgroundJob.Execute + [BBT.Aether.Infrastructure] BackgroundJob.Dispatch + [EF Core] SELECT/UPDATE ... +``` + ## Best Practices 1. **Make handlers idempotent** - Jobs may be retried on failure @@ -195,3 +246,4 @@ public class ProcessPaymentJobHandler : IBackgroundJobHandler }); ``` +## Tracing + +All event bus operations are automatically instrumented with OpenTelemetry spans via the `BBT.Aether.Infrastructure` ActivitySource. No additional configuration is required beyond enabling Aether telemetry. + +### Publishing Spans + +| Span | Kind | Description | +|------|------|-------------| +| `EventBus.Publish` | Producer | Created when `PublishAsync` is called (both generic and metadata-based overloads) | +| `EventBus.PublishEnvelope` | Producer | Created when a pre-serialized envelope is published (used by outbox processor) | +| `EventBus.PublishToBroker` | Producer | Created when the event is sent to the Dapr PubSub broker | + +### Processing Spans + +| Span | Kind | Description | +|------|------|-------------| +| `Outbox.Process` | Producer | Created per message in `OutboxProcessor` when republishing from the outbox | +| `Inbox.Process` | Consumer | Created per message in `InboxProcessor` when processing a pending inbox event | +| `Inbox.Invoke` | Internal | Created in `DistributedEventInvoker` when deserializing and calling the event handler | + +### Semantic Tags + +| Tag | Example | Description | +|-----|---------|-------------| +| `event.name` | `"order.created"` | CloudEvent Type / event name | +| `event.topic` | `"dev.order.created/v1"` | Broker topic | +| `event.pubsub_name` | `"pubsub"` | Dapr PubSub component | +| `event.broker` | `"dapr"` | Broker implementation | +| `event.use_outbox` | `true` / `false` | Whether outbox pattern was used | +| `event.id` | `"abc123"` | CloudEvent ID (inbox) | +| `event.version` | `1` | Event version | +| `event.handler` | `"OrderCreatedEventHandler"` | Handler type name | +| `outbox.message_id` | GUID | Outbox message entity ID | +| `outbox.retry_count` | `0` | Current retry attempt | + +### Example Trace Hierarchy + +**Direct publish:** +``` +[ASP.NET Core] POST /api/orders + └─ [BBT.Aether.Infrastructure] EventBus.Publish + └─ [BBT.Aether.Infrastructure] EventBus.PublishToBroker + └─ [HTTP Client] POST http://localhost:3500/v1.0/publish/pubsub/order.created +``` + +**Outbox publish:** +``` +[ASP.NET Core] POST /api/orders + └─ [BBT.Aether.Infrastructure] EventBus.Publish (use_outbox=true) +...later (background)... +[BBT.Aether.Infrastructure] Outbox.Process + └─ [BBT.Aether.Infrastructure] EventBus.PublishEnvelope + └─ [BBT.Aether.Infrastructure] EventBus.PublishToBroker + └─ [HTTP Client] POST http://localhost:3500/... +``` + +**Inbox consumption:** +``` +[ASP.NET Core] POST /events/order.created/v1 (Dapr delivers event) +...later (background)... +[BBT.Aether.Infrastructure] Inbox.Process + └─ [BBT.Aether.Infrastructure] Inbox.Invoke + └─ (handler code runs here) +``` + ## Best Practices 1. **Use EventNameAttribute** - Explicit topic naming with versioning diff --git a/framework/docs/distributed-lock/README.md b/framework/docs/distributed-lock/README.md index 9795f12..7c3b366 100644 --- a/framework/docs/distributed-lock/README.md +++ b/framework/docs/distributed-lock/README.md @@ -159,6 +159,35 @@ public async Task GenerateDailyReportAsync() } ``` +## Tracing + +All lock operations automatically emit OpenTelemetry spans via the `BBT.Aether.Infrastructure` ActivitySource. No additional configuration is needed when using `AddAetherTelemetry`. + +Span names: +- `DistributedLock.Acquire` - lock acquisition attempts +- `DistributedLock.Release` - explicit or dispose-based lock release +- `DistributedLock.Execute` - `ExecuteWithLockAsync` (covers acquire + execute + release) + +Tags added to each span: + +| Tag | Description | +|-----|-------------| +| `lock.provider` | `"dapr"` or `"redis"` | +| `lock.resource_id` | The resource identifier being locked | +| `lock.store_name` | Dapr component name (Dapr provider only) | +| `lock.expiry_seconds` | Lock TTL | +| `lock.acquired` | Whether the lock was successfully obtained | +| `lock.released` | Whether the lock was successfully released | + +Example trace hierarchy: + +``` +[ASP.NET Core] POST /api/orders + [BBT.Aether.Aspects] OrderService.ProcessOrder + [BBT.Aether.Infrastructure] DistributedLock.Execute + [HTTP Client] POST http://localhost:3500/v1.0-alpha1/lock/lockstore +``` + ## Best Practices 1. **Choose appropriate lock duration** - Match to expected operation time + buffer @@ -171,3 +200,4 @@ public async Task GenerateDailyReportAsync() - [Distributed Cache](../distributed-cache/README.md) - Cache stampede prevention - [Background Jobs](../background-job/README.md) - Coordinate job execution +- [OpenTelemetry](../telemetry/README.md) - Tracing configuration diff --git a/framework/docs/telemetry/README.md b/framework/docs/telemetry/README.md index c092674..bf9df48 100644 --- a/framework/docs/telemetry/README.md +++ b/framework/docs/telemetry/README.md @@ -8,7 +8,7 @@ Aether provides comprehensive OpenTelemetry integration for distributed tracing, - **OpenTelemetry Standard** - Industry-standard observability - **Three Pillars** - Traces, Metrics, and Logs -- **Automatic Instrumentation** - AspNetCore, HttpClient, EF Core +- **Automatic Instrumentation** - AspNetCore, HttpClient, EF Core, Distributed Lock, Distributed Cache, Event Bus - **OTLP Exporters** - Export to any OTLP-compatible backend - **Environment Variable Support** - Standard OTEL_* variables - **Custom Instrumentation** - Extensible builder pattern @@ -344,12 +344,28 @@ _logger.LogInformation( ## Context Propagation +### Infrastructure Spans + +Distributed lock, cache, and background job operations automatically create spans under the `BBT.Aether.Infrastructure` ActivitySource: + +- `DistributedLock.Acquire` / `DistributedLock.Release` / `DistributedLock.Execute` +- `DistributedCache.Get` / `DistributedCache.Set` / `DistributedCache.Remove` / `DistributedCache.Refresh` +- `BackgroundJob.Enqueue` / `BackgroundJob.Update` / `BackgroundJob.Delete` +- `BackgroundJob.Schedule` / `BackgroundJob.Schedule.Update` / `BackgroundJob.Schedule.Delete` +- `BackgroundJob.Execute` / `BackgroundJob.Dispatch` +- `EventBus.Publish` / `EventBus.PublishEnvelope` / `EventBus.PublishToBroker` +- `Outbox.Process` +- `Inbox.Process` / `Inbox.Invoke` + +Each span includes provider-specific tags (`lock.provider`, `cache.provider`, `job.handler_name`, `job.scheduler`, `event.name`, `event.topic`, `event.broker`, etc.) and records exceptions following OpenTelemetry semantic conventions. + ### Automatic Propagation Trace context is automatically propagated: - HTTP requests (W3C Trace Context headers) - Dapr service invocation - Event bus messages +- Distributed lock, cache, background job, and event bus operations (via `BBT.Aether.Infrastructure` ActivitySource) ### Manual Propagation diff --git a/framework/src/BBT.Aether.AspNetCore/Microsoft/Extensions/DependencyInjection/AetherTelemetryServiceCollectionExtensions.cs b/framework/src/BBT.Aether.AspNetCore/Microsoft/Extensions/DependencyInjection/AetherTelemetryServiceCollectionExtensions.cs index b461b17..bc0600d 100644 --- a/framework/src/BBT.Aether.AspNetCore/Microsoft/Extensions/DependencyInjection/AetherTelemetryServiceCollectionExtensions.cs +++ b/framework/src/BBT.Aether.AspNetCore/Microsoft/Extensions/DependencyInjection/AetherTelemetryServiceCollectionExtensions.cs @@ -171,6 +171,7 @@ public static IServiceCollection AddAetherTelemetry( } tracing.AddSource("BBT.Aether.Aspects"); + tracing.AddSource("BBT.Aether.Infrastructure"); // Custom sources foreach (var src in opts.Tracing.AdditionalSources) diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs index 27354e4..14911e4 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Text.Json; using System.Threading; using System.Threading.Tasks; @@ -9,6 +10,7 @@ using BBT.Aether.Events; using BBT.Aether.Guids; using BBT.Aether.MultiSchema; +using BBT.Aether.Telemetry; using BBT.Aether.Uow; using Microsoft.Extensions.Logging; @@ -55,12 +57,22 @@ public async Task EnqueueAsync( if (string.IsNullOrWhiteSpace(schedule)) throw new ArgumentNullException(nameof(schedule)); + using var activity = InfrastructureActivitySource.Source.StartActivity( + "BackgroundJob.Enqueue", + ActivityKind.Producer, + Activity.Current?.Context ?? default); + + activity?.SetTag("job.handler_name", handlerName); + activity?.SetTag("job.name", jobName); + activity?.SetTag("job.schedule", schedule); + logger.LogInformation( "Enqueueing job handler '{HandlerName}' with job name '{JobName}' and schedule '{Schedule}'", handlerName, jobName, schedule); // Create job entity var jobId = guidGenerator.Create(); + activity?.SetTag("job.id", jobId.ToString()); // Convert metadata to nullable dictionary for ExtraPropertyDictionary var extraProperties = new ExtraPropertyDictionary(); @@ -118,12 +130,14 @@ public async Task EnqueueAsync( "Successfully enqueued job handler '{HandlerName}' with job name '{JobName}'. Entity ID: {EntityId}", handlerName, jobName, jobId); + activity?.SetStatus(ActivityStatusCode.Ok); return jobId; } catch (Exception ex) { logger.LogError(ex, "Failed to enqueue job handler '{HandlerName}' with job name '{JobName}'", handlerName, jobName); + RecordException(activity, ex); await uow.RollbackAsync(cancellationToken); throw; } @@ -138,6 +152,14 @@ public async Task UpdateAsync(Guid id, string newSchedule, CancellationToken can if (string.IsNullOrWhiteSpace(newSchedule)) throw new ArgumentNullException(nameof(newSchedule)); + using var activity = InfrastructureActivitySource.Source.StartActivity( + "BackgroundJob.Update", + ActivityKind.Producer, + Activity.Current?.Context ?? default); + + activity?.SetTag("job.id", id.ToString()); + activity?.SetTag("job.schedule", newSchedule); + logger.LogInformation("Updating job with entity id '{Id}' to new schedule '{NewSchedule}'", id, newSchedule); await using var uow = await uowManager.BeginAsync(cancellationToken: cancellationToken); @@ -150,6 +172,9 @@ public async Task UpdateAsync(Guid id, string newSchedule, CancellationToken can throw new InvalidOperationException($"Job with id '{id}' not found."); } + activity?.SetTag("job.handler_name", jobInfo.HandlerName); + activity?.SetTag("job.name", jobInfo.JobName); + // Update schedule in entity jobInfo.ExpressionValue = newSchedule; @@ -179,10 +204,12 @@ await jobScheduler.ScheduleAsync(jobInfo.HandlerName, jobInfo.JobName, newSchedu await uow.CommitAsync(cancellationToken); logger.LogInformation("Successfully updated job with entity id '{Id}'", id); + activity?.SetStatus(ActivityStatusCode.Ok); } catch (Exception ex) { logger.LogError(ex, "Failed to update job with entity id '{Id}'", id); + RecordException(activity, ex); await uow.RollbackAsync(cancellationToken); throw; } @@ -194,6 +221,13 @@ public async Task DeleteAsync(Guid id, CancellationToken cancellationToken if (id == Guid.Empty) throw new ArgumentException("Id cannot be empty.", nameof(id)); + using var activity = InfrastructureActivitySource.Source.StartActivity( + "BackgroundJob.Delete", + ActivityKind.Producer, + Activity.Current?.Context ?? default); + + activity?.SetTag("job.id", id.ToString()); + logger.LogInformation("Deleting job with entity id '{Id}'", id); await using var uow = await uowManager.BeginAsync(cancellationToken: cancellationToken); @@ -204,9 +238,13 @@ public async Task DeleteAsync(Guid id, CancellationToken cancellationToken if (jobInfo == null) { logger.LogWarning("Job with entity id '{Id}' not found", id); + activity?.SetStatus(ActivityStatusCode.Ok); return false; } + activity?.SetTag("job.handler_name", jobInfo.HandlerName); + activity?.SetTag("job.name", jobInfo.JobName); + // Delete from scheduler await jobScheduler.DeleteAsync(jobInfo.HandlerName, jobInfo.JobName, cancellationToken); @@ -218,13 +256,27 @@ await jobStore.UpdateStatusAsync(id, BackgroundJobStatus.Cancelled, clock.UtcNow await uow.CommitAsync(cancellationToken); logger.LogInformation("Successfully deleted job with entity id '{Id}'", id); + activity?.SetStatus(ActivityStatusCode.Ok); return true; } catch (Exception ex) { logger.LogError(ex, "Failed to delete job with entity id '{Id}'", id); + RecordException(activity, ex); await uow.RollbackAsync(cancellationToken); throw; } } -} \ No newline at end of file + + private static void RecordException(Activity? activity, Exception ex) + { + if (activity == null) return; + + activity.SetStatus(ActivityStatusCode.Error, ex.Message); + activity.AddEvent(new ActivityEvent("exception", tags: new ActivityTagsCollection + { + { "exception.type", ex.GetType().FullName ?? ex.GetType().Name }, + { "exception.message", ex.Message }, + })); + } +} diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs index e8d9786..02ba609 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs @@ -1,10 +1,12 @@ using System; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using BBT.Aether.Domain.Entities; using BBT.Aether.Domain.Repositories; using BBT.Aether.Events; using BBT.Aether.MultiSchema; +using BBT.Aether.Telemetry; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -28,6 +30,14 @@ public async Task ExecuteAsync(string jobName, ReadOnlyMemory payload, Can if (string.IsNullOrWhiteSpace(jobName)) throw new ArgumentNullException(nameof(jobName)); + using var activity = InfrastructureActivitySource.Source.StartActivity( + "BackgroundJob.Execute", + ActivityKind.Consumer, + Activity.Current?.Context ?? default); + + activity?.SetTag("job.scheduler", "dapr"); + activity?.SetTag("job.name", jobName); + try { await using var scope = scopeFactory.CreateAsyncScope(); @@ -49,16 +59,33 @@ public async Task ExecuteAsync(string jobName, ReadOnlyMemory payload, Can logger.LogWarning( "Job '{JobName}' not found in Scheduled state — it may have already been completed, failed, or cancelled", jobName); + activity?.SetStatus(ActivityStatusCode.Ok); return; } + activity?.SetTag("job.handler_name", jobInfo.HandlerName); + activity?.SetTag("job.id", jobInfo.Id.ToString()); + // Dispatch to handler with extracted data payload await jobDispatcher.DispatchAsync(jobInfo.Id, jobInfo.HandlerName, dataPayload, cancellationToken); + + activity?.SetStatus(ActivityStatusCode.Ok); } catch (Exception ex) { logger.LogError(ex, "Failed to execute Dapr job '{JobName}' through execution bridge", jobName); + + if (activity != null) + { + activity.SetStatus(ActivityStatusCode.Error, ex.Message); + activity.AddEvent(new ActivityEvent("exception", tags: new ActivityTagsCollection + { + { "exception.type", ex.GetType().FullName ?? ex.GetType().Name }, + { "exception.message", ex.Message }, + })); + } + throw; } } -} \ No newline at end of file +} diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobScheduler.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobScheduler.cs index 8e5744a..cc1409f 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobScheduler.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobScheduler.cs @@ -1,7 +1,9 @@ using System; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using BBT.Aether.Events; +using BBT.Aether.Telemetry; using Dapr.Jobs; using Dapr.Jobs.Models; using Microsoft.Extensions.Logging; @@ -37,6 +39,9 @@ public async Task ScheduleAsync( if (string.IsNullOrWhiteSpace(schedule)) throw new ArgumentNullException(nameof(schedule)); + using var activity = StartSchedulerActivity("BackgroundJob.Schedule", handlerName, jobName); + activity?.SetTag("job.schedule", schedule); + try { var daprSchedule = ParseSchedule(schedule); @@ -48,15 +53,18 @@ public async Task ScheduleAsync( var payloadBytes = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(envelopeObject); await daprJobsClient.ScheduleJobAsync( - jobName: jobName, // Use jobName as the unique identifier in Dapr + jobName: jobName, schedule: daprSchedule, payload: new ReadOnlyMemory(payloadBytes), overwrite: true, cancellationToken: cancellationToken); + + activity?.SetStatus(ActivityStatusCode.Ok); } catch (Exception ex) { logger.LogError(ex, "Failed to schedule Dapr job handler '{HandlerName}' with job name '{JobName}'", handlerName, jobName); + RecordException(activity, ex); throw new InvalidOperationException($"Failed to schedule job handler '{handlerName}' with job name '{jobName}'.", ex); } } @@ -77,15 +85,21 @@ public async Task UpdateScheduleAsync( if (string.IsNullOrWhiteSpace(newSchedule)) throw new ArgumentNullException(nameof(newSchedule)); + using var activity = StartSchedulerActivity("BackgroundJob.Schedule.Update", handlerName, jobName); + activity?.SetTag("job.schedule", newSchedule); + try { var jobInfo = await daprJobsClient.GetJobAsync(jobName, cancellationToken); await daprJobsClient.DeleteJobAsync(jobName, cancellationToken); await ScheduleAsync(handlerName, jobName, newSchedule, jobInfo.Payload, cancellationToken); + + activity?.SetStatus(ActivityStatusCode.Ok); } catch (Exception ex) { logger.LogError(ex, "Failed to update Dapr job handler '{HandlerName}' with job name '{JobName}'", handlerName, jobName); + RecordException(activity, ex); throw new InvalidOperationException($"Failed to update job handler '{handlerName}' with job name '{jobName}'.", ex); } } @@ -99,19 +113,50 @@ public async Task DeleteAsync(string handlerName, string jobName, CancellationTo if (string.IsNullOrWhiteSpace(jobName)) throw new ArgumentNullException(nameof(jobName)); + using var activity = StartSchedulerActivity("BackgroundJob.Schedule.Delete", handlerName, jobName); + try { await daprJobsClient.DeleteJobAsync( - jobName: jobName, // Use jobName as the unique identifier in Dapr + jobName: jobName, cancellationToken: cancellationToken); + + activity?.SetStatus(ActivityStatusCode.Ok); } catch (Exception ex) { logger.LogError(ex, "Failed to delete Dapr job handler '{HandlerName}' with job name '{JobName}'", handlerName, jobName); + RecordException(activity, ex); throw new InvalidOperationException($"Failed to delete job handler '{handlerName}' with job name '{jobName}'.", ex); } } + private static Activity? StartSchedulerActivity(string operationName, string handlerName, string jobName) + { + var activity = InfrastructureActivitySource.Source.StartActivity( + operationName, + ActivityKind.Client, + Activity.Current?.Context ?? default); + + activity?.SetTag("job.scheduler", "dapr"); + activity?.SetTag("job.handler_name", handlerName); + activity?.SetTag("job.name", jobName); + + return activity; + } + + private static void RecordException(Activity? activity, Exception ex) + { + if (activity == null) return; + + activity.SetStatus(ActivityStatusCode.Error, ex.Message); + activity.AddEvent(new ActivityEvent("exception", tags: new ActivityTagsCollection + { + { "exception.type", ex.GetType().FullName ?? ex.GetType().Name }, + { "exception.message", ex.Message }, + })); + } + /// /// Parses a schedule string into a DaprJobSchedule. /// Supports cron expressions and simple delay formats. @@ -123,4 +168,4 @@ private DaprJobSchedule ParseSchedule(string schedule) // Default to treating as cron expression return DaprJobSchedule.FromExpression(schedule); } -} \ No newline at end of file +} diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs index 840c70d..b0d276d 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs @@ -1,4 +1,5 @@ using System; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using BBT.Aether.Clock; @@ -6,6 +7,7 @@ using BBT.Aether.Domain.Repositories; using BBT.Aether.Events; using BBT.Aether.MultiSchema; +using BBT.Aether.Telemetry; using BBT.Aether.Uow; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -34,6 +36,14 @@ public virtual async Task DispatchAsync( if (string.IsNullOrWhiteSpace(handlerName)) throw new ArgumentNullException(nameof(handlerName)); + using var activity = InfrastructureActivitySource.Source.StartActivity( + "BackgroundJob.Dispatch", + ActivityKind.Internal, + Activity.Current?.Context ?? default); + + activity?.SetTag("job.id", jobId.ToString()); + activity?.SetTag("job.handler_name", handlerName); + await using var scope = scopeFactory.CreateAsyncScope(); var argsPayload = CloudEventEnvelopeHelper.ExtractDataPayload(eventSerializer, jobPayload, out var envelope); @@ -56,13 +66,17 @@ public virtual async Task DispatchAsync( var jobInfo = await jobStore.GetAsync(jobId, cancellationToken); if (jobInfo == null) { + activity?.SetStatus(ActivityStatusCode.Ok); return; } jobName = jobInfo.JobName; + activity?.SetTag("job.name", jobName); if (IsJobAlreadyProcessed(jobInfo, jobId, handlerName)) { + activity?.SetTag("job.status", jobInfo.Status.ToString().ToLowerInvariant()); + activity?.SetStatus(ActivityStatusCode.Ok); await uow.CommitAsync(cancellationToken); await TryDeleteFromSchedulerAsync(jobScheduler, handlerName, jobName, cancellationToken); return; @@ -74,6 +88,8 @@ public virtual async Task DispatchAsync( jobId); await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Failed, clock.UtcNow, "No handler found for handler type", cancellationToken); + activity?.SetTag("job.status", "failed"); + activity?.SetStatus(ActivityStatusCode.Error, "No handler found for handler type"); await uow.CommitAsync(cancellationToken); await TryDeleteFromSchedulerAsync(jobScheduler, handlerName, jobName, cancellationToken); return; @@ -98,12 +114,17 @@ await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Completed, logger.LogInformation("Successfully completed handler '{HandlerName}' for job id '{JobId}'", handlerName, jobId); + activity?.SetTag("job.status", "completed"); + activity?.SetStatus(ActivityStatusCode.Ok); + await handlerUow.CommitAsync(cancellationToken); await TryDeleteFromSchedulerAsync(jobScheduler, handlerName, jobName, cancellationToken); } catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { logger.LogWarning("Handler '{HandlerName}' for job id '{JobId}' was cancelled", handlerName, jobId); + activity?.SetTag("job.status", "cancelled"); + activity?.SetStatus(ActivityStatusCode.Ok); await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Cancelled, "Job was cancelled", cancellationToken); await TryDeleteFromSchedulerAsync(jobScheduler, handlerName, jobName, cancellationToken); @@ -112,6 +133,18 @@ await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Cancel { logger.LogError(ex, "Handler '{HandlerName}' for job id '{JobId}' failed", handlerName, jobId); var errorMessage = $"{ex.GetType().Name}: {ex.Message}".Truncate(4000); + + activity?.SetTag("job.status", "failed"); + if (activity != null) + { + activity.SetStatus(ActivityStatusCode.Error, ex.Message); + activity.AddEvent(new ActivityEvent("exception", tags: new ActivityTagsCollection + { + { "exception.type", ex.GetType().FullName ?? ex.GetType().Name }, + { "exception.message", ex.Message }, + })); + } + await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Failed, errorMessage, cancellationToken); await TryDeleteFromSchedulerAsync(jobScheduler, handlerName, jobName, cancellationToken); @@ -217,4 +250,4 @@ private async Task InvokeHandlerAsync(IServiceProvider scopedProvider, string ha // Invoke handler - completely type-safe, no reflection at runtime await invoker.InvokeAsync(scopedProvider, eventSerializer, jobPayload, cancellationToken); } -} \ No newline at end of file +} diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedCache/Dapr/DaprDistributedCacheService.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedCache/Dapr/DaprDistributedCacheService.cs index f4606a3..3026955 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedCache/Dapr/DaprDistributedCacheService.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedCache/Dapr/DaprDistributedCacheService.cs @@ -1,7 +1,9 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; +using BBT.Aether.Telemetry; using Dapr.Client; namespace BBT.Aether.DistributedCache.Dapr; @@ -19,12 +21,18 @@ public class DaprDistributedCacheService( public async override Task GetAsync(string key, CancellationToken cancellationToken = default) where T : class { + using var activity = StartCacheActivity("DistributedCache.Get", key); + try { - return await _daprClient.GetStateAsync(storeName, key, cancellationToken: cancellationToken); + var result = await _daprClient.GetStateAsync(storeName, key, cancellationToken: cancellationToken); + activity?.SetTag("cache.hit", result != null); + activity?.SetStatus(ActivityStatusCode.Ok); + return result; } - catch + catch (Exception ex) { + RecordException(activity, ex); return null; } } @@ -35,20 +43,24 @@ public async override Task SetAsync( DistributedCacheEntryOptions? options = null, CancellationToken cancellationToken = default) where T : class { + using var activity = StartCacheActivity("DistributedCache.Set", key); + var metadata = new Dictionary(); - // Add TTL if specified if (options?.AbsoluteExpiration.HasValue == true) { var ttl = (int)(options.AbsoluteExpiration.Value - DateTimeOffset.UtcNow).TotalSeconds; if (ttl > 0) { metadata["ttlInSeconds"] = ttl.ToString(); + activity?.SetTag("cache.ttl_seconds", ttl); } } else if (options?.SlidingExpiration.HasValue == true) { - metadata["ttlInSeconds"] = ((int)options.SlidingExpiration.Value.TotalSeconds).ToString(); + var ttl = (int)options.SlidingExpiration.Value.TotalSeconds; + metadata["ttlInSeconds"] = ttl.ToString(); + activity?.SetTag("cache.ttl_seconds", ttl); } await _daprClient.SaveStateAsync( @@ -58,16 +70,46 @@ await _daprClient.SaveStateAsync( metadata: metadata, cancellationToken: cancellationToken ); + + activity?.SetStatus(ActivityStatusCode.Ok); } - public override Task RemoveAsync(string key, CancellationToken cancellationToken = default) + public override async Task RemoveAsync(string key, CancellationToken cancellationToken = default) { - return _daprClient.DeleteStateAsync(storeName, key, cancellationToken: cancellationToken); + using var activity = StartCacheActivity("DistributedCache.Remove", key); + + await _daprClient.DeleteStateAsync(storeName, key, cancellationToken: cancellationToken); + activity?.SetStatus(ActivityStatusCode.Ok); } public override Task RefreshAsync(string key, CancellationToken cancellationToken = default) { - // Dapr doesn't have a direct refresh mechanism, so we'll do nothing return Task.CompletedTask; } -} \ No newline at end of file + + private Activity? StartCacheActivity(string operationName, string key) + { + var activity = InfrastructureActivitySource.Source.StartActivity( + operationName, + ActivityKind.Client, + Activity.Current?.Context ?? default); + + activity?.SetTag("cache.provider", "dapr"); + activity?.SetTag("cache.key", key); + activity?.SetTag("cache.store_name", storeName); + + return activity; + } + + private static void RecordException(Activity? activity, Exception ex) + { + if (activity == null) return; + + activity.SetStatus(ActivityStatusCode.Error, ex.Message); + activity.AddEvent(new ActivityEvent("exception", tags: new ActivityTagsCollection + { + { "exception.type", ex.GetType().FullName ?? ex.GetType().Name }, + { "exception.message", ex.Message }, + })); + } +} diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedCache/Redis/RedisDistributedCacheService.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedCache/Redis/RedisDistributedCacheService.cs index 01fd50c..dc7960f 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedCache/Redis/RedisDistributedCacheService.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedCache/Redis/RedisDistributedCacheService.cs @@ -1,7 +1,9 @@ using System; +using System.Diagnostics; using System.Text.Json; using System.Threading; using System.Threading.Tasks; +using BBT.Aether.Telemetry; using Microsoft.Extensions.Logging; using StackExchange.Redis; @@ -22,6 +24,8 @@ public class RedisDistributedCacheService( public async override Task GetAsync(string key, CancellationToken cancellationToken = default) where T : class { + using var activity = StartCacheActivity("DistributedCache.Get", key); + try { var database = _redisConnection.GetDatabase(); @@ -30,16 +34,21 @@ public class RedisDistributedCacheService( if (!cachedValue.HasValue) { _logger.LogDebug("Cache miss for key: {Key}", key); + activity?.SetTag("cache.hit", false); + activity?.SetStatus(ActivityStatusCode.Ok); return null; } var result = JsonSerializer.Deserialize((string)cachedValue!, _jsonOptions); _logger.LogDebug("Cache hit for key: {Key}", key); + activity?.SetTag("cache.hit", true); + activity?.SetStatus(ActivityStatusCode.Ok); return result; } catch (Exception ex) { _logger.LogError(ex, "Error getting value from Redis cache for key: {Key}", key); + RecordException(activity, ex); return null; } } @@ -50,6 +59,8 @@ public async override Task SetAsync( DistributedCacheEntryOptions? options = null, CancellationToken cancellationToken = default) where T : class { + using var activity = StartCacheActivity("DistributedCache.Set", key); + try { var database = _redisConnection.GetDatabase(); @@ -66,18 +77,27 @@ public async override Task SetAsync( expiry = options.SlidingExpiration.Value; } + if (expiry.HasValue) + { + activity?.SetTag("cache.ttl_seconds", (int)expiry.Value.TotalSeconds); + } + await database.StringSetAsync(key, serializedValue, expiry, keepTtl: false); _logger.LogDebug("Successfully cached value for key: {Key} with expiry: {Expiry}", key, expiry); + activity?.SetStatus(ActivityStatusCode.Ok); } catch (Exception ex) { _logger.LogError(ex, "Error setting value in Redis cache for key: {Key}", key); + RecordException(activity, ex); throw; } } public async override Task RemoveAsync(string key, CancellationToken cancellationToken = default) { + using var activity = StartCacheActivity("DistributedCache.Remove", key); + try { var database = _redisConnection.GetDatabase(); @@ -91,27 +111,28 @@ public async override Task RemoveAsync(string key, CancellationToken cancellatio { _logger.LogDebug("Key not found in cache: {Key}", key); } + + activity?.SetStatus(ActivityStatusCode.Ok); } catch (Exception ex) { _logger.LogError(ex, "Error removing key from Redis cache: {Key}", key); + RecordException(activity, ex); throw; } } public async override Task RefreshAsync(string key, CancellationToken cancellationToken = default) { + using var activity = StartCacheActivity("DistributedCache.Refresh", key); + try { var database = _redisConnection.GetDatabase(); - - // Redis doesn't have a direct refresh mechanism like SQL Server - // We'll implement it by checking if the key exists and touching it var exists = await database.KeyExistsAsync(key); if (exists) { - // Touch the key to refresh its expiration await database.KeyTouchAsync(key); _logger.LogDebug("Successfully refreshed key: {Key}", key); } @@ -119,11 +140,39 @@ public async override Task RefreshAsync(string key, CancellationToken cancellati { _logger.LogDebug("Key does not exist for refresh: {Key}", key); } + + activity?.SetStatus(ActivityStatusCode.Ok); } catch (Exception ex) { _logger.LogError(ex, "Error refreshing key in Redis cache: {Key}", key); + RecordException(activity, ex); throw; } } -} \ No newline at end of file + + private static Activity? StartCacheActivity(string operationName, string key) + { + var activity = InfrastructureActivitySource.Source.StartActivity( + operationName, + ActivityKind.Client, + Activity.Current?.Context ?? default); + + activity?.SetTag("cache.provider", "redis"); + activity?.SetTag("cache.key", key); + + return activity; + } + + private static void RecordException(Activity? activity, Exception ex) + { + if (activity == null) return; + + activity.SetStatus(ActivityStatusCode.Error, ex.Message); + activity.AddEvent(new ActivityEvent("exception", tags: new ActivityTagsCollection + { + { "exception.type", ex.GetType().FullName ?? ex.GetType().Name }, + { "exception.message", ex.Message }, + })); + } +} diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedLock/Dapr/DaprDistributedLockService.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedLock/Dapr/DaprDistributedLockService.cs index 42ddc46..b6b87d8 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedLock/Dapr/DaprDistributedLockService.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedLock/Dapr/DaprDistributedLockService.cs @@ -1,6 +1,8 @@ using System; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; +using BBT.Aether.Telemetry; using Dapr.Client; using Microsoft.Extensions.Logging; @@ -21,6 +23,8 @@ public class DaprDistributedLockService( public async Task TryAcquireLockAsync(string resourceId, int expiryInSeconds = 60, CancellationToken cancellationToken = default) { + using var activity = StartLockActivity("DistributedLock.Acquire", resourceId, expiryInSeconds); + try { var lockOwner = $"{GetClientIdentifier()}"; @@ -29,30 +33,47 @@ public class DaprDistributedLockService( if (resourceLock != null && resourceLock.Success) { logger.LogDebug("Successfully acquired lock for resource {ResourceId}", resourceId); + activity?.SetTag("lock.acquired", true); + activity?.SetStatus(ActivityStatusCode.Ok); return resourceLock; } logger.LogWarning("Failed to acquire lock for resource {ResourceId}", resourceId); + activity?.SetTag("lock.acquired", false); + activity?.SetStatus(ActivityStatusCode.Ok); return null; } catch (Exception ex) { logger.LogError(ex, "Error acquiring lock for resource {ResourceId}", resourceId); + RecordException(activity, ex); return null; } } public async Task ReleaseLockAsync(string resourceId, CancellationToken cancellationToken = default) { + using var activity = InfrastructureActivitySource.Source.StartActivity( + "DistributedLock.Release", + ActivityKind.Client, + Activity.Current?.Context ?? default); + + activity?.SetTag("lock.provider", "dapr"); + activity?.SetTag("lock.resource_id", resourceId); + activity?.SetTag("lock.store_name", storeName); + try { var lockOwner = $"{GetClientIdentifier()}"; await daprClient.Unlock(storeName, resourceId, lockOwner, cancellationToken); + activity?.SetTag("lock.released", true); + activity?.SetStatus(ActivityStatusCode.Ok); return true; } catch (Exception ex) { logger.LogError(ex, "Error releasing lock for resource {ResourceId}", resourceId); + RecordException(activity, ex); return false; } } @@ -60,6 +81,8 @@ public async Task ReleaseLockAsync(string resourceId, CancellationToken ca public async Task<(bool Acquired, T? Result)> ExecuteWithLockAsync(string resourceId, Func> function, int expiryInSeconds = 60, CancellationToken cancellationToken = default) { + using var activity = StartLockActivity("DistributedLock.Execute", resourceId, expiryInSeconds); + try { var lockOwner = $"{GetClientIdentifier()}"; @@ -68,16 +91,21 @@ public async Task ReleaseLockAsync(string resourceId, CancellationToken ca if (resourceLock == null || !resourceLock.Success) { logger.LogWarning("Failed to acquire lock for resource {ResourceId}", resourceId); + activity?.SetTag("lock.acquired", false); + activity?.SetStatus(ActivityStatusCode.Ok); return (false, default); } logger.LogDebug("Successfully acquired lock for resource {ResourceId}", resourceId); + activity?.SetTag("lock.acquired", true); var result = await function(); + activity?.SetStatus(ActivityStatusCode.Ok); return (true, result); } catch (Exception ex) { logger.LogError(ex, "Error executing function with Dapr lock for resource {ResourceId}", resourceId); + RecordException(activity, ex); throw; } } @@ -85,6 +113,8 @@ public async Task ReleaseLockAsync(string resourceId, CancellationToken ca public async Task ExecuteWithLockAsync(string resourceId, Func action, int expiryInSeconds = 60, CancellationToken cancellationToken = default) { + using var activity = StartLockActivity("DistributedLock.Execute", resourceId, expiryInSeconds); + try { var lockOwner = $"{GetClientIdentifier()}"; @@ -93,24 +123,56 @@ public async Task ExecuteWithLockAsync(string resourceId, Func actio if (resourceLock == null || !resourceLock.Success) { logger.LogWarning("Failed to acquire lock for resource {ResourceId}", resourceId); + activity?.SetTag("lock.acquired", false); + activity?.SetStatus(ActivityStatusCode.Ok); return false; } logger.LogDebug("Successfully acquired lock for resource {ResourceId}", resourceId); + activity?.SetTag("lock.acquired", true); await action(); + activity?.SetStatus(ActivityStatusCode.Ok); return true; } catch (Exception ex) { logger.LogError(ex, "Error executing action with Dapr lock for resource {ResourceId}", resourceId); + RecordException(activity, ex); throw; } } + private Activity? StartLockActivity(string operationName, string resourceId, int expiryInSeconds) + { + var activity = InfrastructureActivitySource.Source.StartActivity( + operationName, + ActivityKind.Client, + Activity.Current?.Context ?? default); + + activity?.SetTag("lock.provider", "dapr"); + activity?.SetTag("lock.resource_id", resourceId); + activity?.SetTag("lock.store_name", storeName); + activity?.SetTag("lock.expiry_seconds", expiryInSeconds); + + return activity; + } + + private static void RecordException(Activity? activity, Exception ex) + { + if (activity == null) return; + + activity.SetStatus(ActivityStatusCode.Error, ex.Message); + activity.AddEvent(new ActivityEvent("exception", tags: new ActivityTagsCollection + { + { "exception.type", ex.GetType().FullName ?? ex.GetType().Name }, + { "exception.message", ex.Message }, + })); + } + private string GetClientIdentifier() { return ($"{Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT")}.{applicationInfoAccessor.ApplicationName}") .ToLowerInvariant(); } -} \ No newline at end of file +} diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedLock/Redis/RedisDistributedLockService.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedLock/Redis/RedisDistributedLockService.cs index 4a662e3..005d0d0 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedLock/Redis/RedisDistributedLockService.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedLock/Redis/RedisDistributedLockService.cs @@ -1,6 +1,8 @@ using System; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; +using BBT.Aether.Telemetry; using Microsoft.Extensions.Logging; using StackExchange.Redis; @@ -18,14 +20,14 @@ public class RedisDistributedLockService( public async Task TryAcquireLockAsync(string resourceId, int expiryInSeconds = 60, CancellationToken cancellationToken = default) { + using var activity = StartLockActivity("DistributedLock.Acquire", resourceId, expiryInSeconds); + try { var database = redisConnection.GetDatabase(); var lockOwner = GetClientIdentifier(); var expiry = TimeSpan.FromSeconds(expiryInSeconds); - // Use atomic SETNX (SET if Not eXists) with expiry - // Key: resourceId, Value: lockOwner var acquired = await database.StringSetAsync( resourceId, lockOwner, @@ -37,27 +39,39 @@ public class RedisDistributedLockService( { logger.LogDebug("Successfully acquired Redis lock for resource {ResourceId} with owner {LockOwner}", resourceId, lockOwner); + activity?.SetTag("lock.acquired", true); + activity?.SetStatus(ActivityStatusCode.Ok); return new RedisLockHandle(database, resourceId, lockOwner, logger); } logger.LogWarning("Failed to acquire Redis lock for resource {ResourceId}", resourceId); + activity?.SetTag("lock.acquired", false); + activity?.SetStatus(ActivityStatusCode.Ok); return null; } catch (Exception ex) { logger.LogError(ex, "Error acquiring Redis lock for resource {ResourceId}", resourceId); + RecordException(activity, ex); return null; } } public async Task ReleaseLockAsync(string resourceId, CancellationToken cancellationToken = default) { + using var activity = InfrastructureActivitySource.Source.StartActivity( + "DistributedLock.Release", + ActivityKind.Client, + Activity.Current?.Context ?? default); + + activity?.SetTag("lock.provider", "redis"); + activity?.SetTag("lock.resource_id", resourceId); + try { var database = redisConnection.GetDatabase(); var lockOwner = GetClientIdentifier(); - // Use Lua script to ensure atomic delete only if the lock exists and is owned by this client var script = @" if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) @@ -84,11 +98,14 @@ public async Task ReleaseLockAsync(string resourceId, CancellationToken ca resourceId, lockOwner); } + activity?.SetTag("lock.released", released); + activity?.SetStatus(ActivityStatusCode.Ok); return released; } catch (Exception ex) { logger.LogError(ex, "Error releasing Redis lock for resource {ResourceId}", resourceId); + RecordException(activity, ex); return false; } } @@ -101,22 +118,30 @@ public async Task ReleaseLockAsync(string resourceId, CancellationToken ca throw new ArgumentNullException(nameof(function)); } + using var activity = StartLockActivity("DistributedLock.Execute", resourceId, expiryInSeconds); + await using var lockAcquired = await TryAcquireLockAsync(resourceId, expiryInSeconds, cancellationToken); if (lockAcquired == null) { logger.LogWarning("Could not acquire lock for resource {ResourceId}", resourceId); + activity?.SetTag("lock.acquired", false); + activity?.SetStatus(ActivityStatusCode.Ok); return (false, default); } + activity?.SetTag("lock.acquired", true); + try { var result = await function(); + activity?.SetStatus(ActivityStatusCode.Ok); return (true, result); } catch (Exception ex) { logger.LogError(ex, "Error executing function with Redis lock for resource {ResourceId}", resourceId); + RecordException(activity, ex); throw; } } @@ -129,30 +154,64 @@ public async Task ExecuteWithLockAsync(string resourceId, Func actio throw new ArgumentNullException(nameof(action)); } + using var activity = StartLockActivity("DistributedLock.Execute", resourceId, expiryInSeconds); + await using var lockAcquired = await TryAcquireLockAsync(resourceId, expiryInSeconds, cancellationToken); if (lockAcquired == null) { logger.LogWarning("Could not acquire lock for resource {ResourceId}", resourceId); + activity?.SetTag("lock.acquired", false); + activity?.SetStatus(ActivityStatusCode.Ok); return false; } + activity?.SetTag("lock.acquired", true); + try { await action(); + activity?.SetStatus(ActivityStatusCode.Ok); return true; } catch (Exception ex) { logger.LogError(ex, "Error executing action with Redis lock for resource {ResourceId}", resourceId); + RecordException(activity, ex); throw; } } + private static Activity? StartLockActivity(string operationName, string resourceId, int expiryInSeconds) + { + var activity = InfrastructureActivitySource.Source.StartActivity( + operationName, + ActivityKind.Client, + Activity.Current?.Context ?? default); + + activity?.SetTag("lock.provider", "redis"); + activity?.SetTag("lock.resource_id", resourceId); + activity?.SetTag("lock.expiry_seconds", expiryInSeconds); + + return activity; + } + + private static void RecordException(Activity? activity, Exception ex) + { + if (activity == null) return; + + activity.SetStatus(ActivityStatusCode.Error, ex.Message); + activity.AddEvent(new ActivityEvent("exception", tags: new ActivityTagsCollection + { + { "exception.type", ex.GetType().FullName ?? ex.GetType().Name }, + { "exception.message", ex.Message }, + })); + } + private string GetClientIdentifier() { return ($"{Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT")}.{applicationInfoAccessor.ApplicationName}.{applicationInfoAccessor.InstanceId}") .ToLowerInvariant(); } -} \ No newline at end of file +} diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedLock/Redis/RedisLockHandle.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedLock/Redis/RedisLockHandle.cs index daa65f1..429f15f 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedLock/Redis/RedisLockHandle.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedLock/Redis/RedisLockHandle.cs @@ -1,5 +1,7 @@ using System; +using System.Diagnostics; using System.Threading.Tasks; +using BBT.Aether.Telemetry; using Microsoft.Extensions.Logging; using StackExchange.Redis; @@ -31,6 +33,14 @@ private async ValueTask ReleaseAsync() _disposed = true; + using var activity = InfrastructureActivitySource.Source.StartActivity( + "DistributedLock.Release", + ActivityKind.Client, + Activity.Current?.Context ?? default); + + activity?.SetTag("lock.provider", "redis"); + activity?.SetTag("lock.resource_id", resourceId); + try { var script = @" @@ -50,19 +60,33 @@ private async ValueTask ReleaseAsync() logger.LogDebug( "Released Redis lock for resource {ResourceId} with owner {LockOwner}", resourceId, lockOwner); + activity?.SetTag("lock.released", true); } else { logger.LogDebug( "No Redis lock to release or owner mismatch for resource {ResourceId} with owner {LockOwner}", resourceId, lockOwner); + activity?.SetTag("lock.released", false); } + + activity?.SetStatus(ActivityStatusCode.Ok); } catch (Exception ex) { logger.LogError(ex, "Error releasing Redis lock for resource {ResourceId} with owner {LockOwner}", resourceId, lockOwner); + + if (activity != null) + { + activity.SetStatus(ActivityStatusCode.Error, ex.Message); + activity.AddEvent(new ActivityEvent("exception", tags: new ActivityTagsCollection + { + { "exception.type", ex.GetType().FullName ?? ex.GetType().Name }, + { "exception.message", ex.Message }, + })); + } } } -} \ No newline at end of file +} diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Distributed/DaprEventBus.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Distributed/DaprEventBus.cs index 0cd6aaf..af31a6b 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Distributed/DaprEventBus.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Distributed/DaprEventBus.cs @@ -1,6 +1,9 @@ +using System; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using BBT.Aether.MultiSchema; +using BBT.Aether.Telemetry; using Dapr.Client; namespace BBT.Aether.Events; @@ -26,26 +29,65 @@ private string ResolvePubSubName() where TEvent : class return eventInfo.PubSubName ?? _options.PubSubName; } - protected override Task PublishToBrokerAsync(string topic, byte[] serializedEnvelope, CancellationToken cancellationToken = default) + protected override async Task PublishToBrokerAsync(string topic, byte[] serializedEnvelope, CancellationToken cancellationToken = default) { - // Resolve PubSub component name from event type attribute or default var pubSubName = ResolvePubSubName(); - // Deserialize the envelope to object so Dapr can serialize it properly - // This prevents double-serialization (string wrapping) by Dapr - var envelope = EventSerializer.Deserialize(serializedEnvelope); - - // Publish as object - Dapr will serialize it as JSON object (not string-wrapped) - return daprClient.PublishEventAsync(pubSubName, topic, envelope, cancellationToken); + using var activity = InfrastructureActivitySource.Source.StartActivity( + "EventBus.PublishToBroker", + ActivityKind.Producer, + Activity.Current?.Context ?? default); + + activity?.SetTag("event.topic", topic); + activity?.SetTag("event.pubsub_name", pubSubName); + activity?.SetTag("event.broker", "dapr"); + + try + { + var envelope = EventSerializer.Deserialize(serializedEnvelope); + await daprClient.PublishEventAsync(pubSubName, topic, envelope, cancellationToken); + activity?.SetStatus(ActivityStatusCode.Ok); + } + catch (Exception ex) + { + RecordException(activity, ex); + throw; + } + } + + protected override async Task PublishToBrokerAsync(string topic, string pubSubName, byte[] serializedEnvelope, CancellationToken cancellationToken = default) + { + using var activity = InfrastructureActivitySource.Source.StartActivity( + "EventBus.PublishToBroker", + ActivityKind.Producer, + Activity.Current?.Context ?? default); + + activity?.SetTag("event.topic", topic); + activity?.SetTag("event.pubsub_name", pubSubName); + activity?.SetTag("event.broker", "dapr"); + + try + { + var envelope = EventSerializer.Deserialize(serializedEnvelope); + await daprClient.PublishEventAsync(pubSubName, topic, envelope, cancellationToken); + activity?.SetStatus(ActivityStatusCode.Ok); + } + catch (Exception ex) + { + RecordException(activity, ex); + throw; + } } - protected override Task PublishToBrokerAsync(string topic, string pubSubName, byte[] serializedEnvelope, CancellationToken cancellationToken = default) + private static void RecordException(Activity? activity, Exception ex) { - // Deserialize the envelope to object so Dapr can serialize it properly - // This prevents double-serialization (string wrapping) by Dapr - var envelope = EventSerializer.Deserialize(serializedEnvelope); - - // Publish as object - Dapr will serialize it as JSON object (not string-wrapped) - return daprClient.PublishEventAsync(pubSubName, topic, envelope, cancellationToken); + if (activity == null) return; + + activity.SetStatus(ActivityStatusCode.Error, ex.Message); + activity.AddEvent(new ActivityEvent("exception", tags: new ActivityTagsCollection + { + { "exception.type", ex.GetType().FullName ?? ex.GetType().Name }, + { "exception.message", ex.Message }, + })); } } diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Distributed/DistributedEventBusBase.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Distributed/DistributedEventBusBase.cs index 6df2585..5a05876 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Distributed/DistributedEventBusBase.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Distributed/DistributedEventBusBase.cs @@ -1,7 +1,9 @@ using System; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using BBT.Aether.MultiSchema; +using BBT.Aether.Telemetry; namespace BBT.Aether.Events; @@ -71,15 +73,33 @@ public async Task PublishAsync( // Use Topic for routing to the message broker (includes environment prefix if enabled) var topicName = envelope.Topic ?? envelope.Type; - if (useOutbox) + using var activity = InfrastructureActivitySource.Source.StartActivity( + "EventBus.Publish", + ActivityKind.Producer, + Activity.Current?.Context ?? default); + + activity?.SetTag("event.name", envelope.Type); + activity?.SetTag("event.topic", topicName); + activity?.SetTag("event.use_outbox", useOutbox); + + try { - await StoreInOutboxAsync(envelope, cancellationToken); + if (useOutbox) + { + await StoreInOutboxAsync(envelope, cancellationToken); + } + else + { + var serialized = EventSerializer.Serialize(envelope); + await PublishToBrokerAsync(topicName, serialized, cancellationToken); + } + + activity?.SetStatus(ActivityStatusCode.Ok); } - else + catch (Exception ex) { - // Serialize envelope using IEventSerializer for consistent format - var serialized = EventSerializer.Serialize(envelope); - await PublishToBrokerAsync(topicName, serialized, cancellationToken); + RecordException(activity, ex); + throw; } } @@ -93,19 +113,37 @@ public async Task PublishAsync( { // Create envelope using metadata - no reflection needed var envelope = CreateEnvelopeFromMetadata(@event, metadata, subject); - - if (useOutbox) + var topicName = envelope.Topic ?? envelope.Type; + + using var activity = InfrastructureActivitySource.Source.StartActivity( + "EventBus.Publish", + ActivityKind.Producer, + Activity.Current?.Context ?? default); + + activity?.SetTag("event.name", envelope.Type); + activity?.SetTag("event.topic", topicName); + activity?.SetTag("event.use_outbox", useOutbox); + + try { - await StoreInOutboxAsync(envelope, cancellationToken); + if (useOutbox) + { + await StoreInOutboxAsync(envelope, cancellationToken); + } + else + { + var serialized = EventSerializer.Serialize(envelope); + var pubSubName = metadata.PubSubName ?? AetherEventBusOptions.PubSubName; + activity?.SetTag("event.pubsub_name", pubSubName); + await PublishToBrokerAsync(topicName, pubSubName, serialized, cancellationToken); + } + + activity?.SetStatus(ActivityStatusCode.Ok); } - else + catch (Exception ex) { - var serialized = EventSerializer.Serialize(envelope); - // Use PubSubName from metadata or fall back to default from options - var pubSubName = metadata.PubSubName ?? AetherEventBusOptions.PubSubName; - // Use Topic for routing to the message broker - var topicName = envelope.Topic ?? envelope.Type; - await PublishToBrokerAsync(topicName, pubSubName, serialized, cancellationToken); + RecordException(activity, ex); + throw; } } @@ -155,13 +193,30 @@ private async Task StoreInOutboxAsync(CloudEventEnvelope envelope, CancellationT /// Publishes a pre-serialized CloudEventEnvelope directly to the broker. /// Used internally by the outbox processor to republish stored events. /// - public Task PublishEnvelopeAsync( + public async Task PublishEnvelopeAsync( byte[] serializedEnvelope, string topicName, string pubSubName, CancellationToken cancellationToken = default) { - return PublishToBrokerAsync(topicName, pubSubName, serializedEnvelope, cancellationToken); + using var activity = InfrastructureActivitySource.Source.StartActivity( + "EventBus.PublishEnvelope", + ActivityKind.Producer, + Activity.Current?.Context ?? default); + + activity?.SetTag("event.topic", topicName); + activity?.SetTag("event.pubsub_name", pubSubName); + + try + { + await PublishToBrokerAsync(topicName, pubSubName, serializedEnvelope, cancellationToken); + activity?.SetStatus(ActivityStatusCode.Ok); + } + catch (Exception ex) + { + RecordException(activity, ex); + throw; + } } /// @@ -183,4 +238,16 @@ public Task PublishEnvelopeAsync( /// The serialized CloudEventEnvelope /// Cancellation token protected abstract Task PublishToBrokerAsync(string topic, string pubSubName, byte[] serializedEnvelope, CancellationToken cancellationToken = default); + + private static void RecordException(Activity? activity, Exception ex) + { + if (activity == null) return; + + activity.SetStatus(ActivityStatusCode.Error, ex.Message); + activity.AddEvent(new ActivityEvent("exception", tags: new ActivityTagsCollection + { + { "exception.type", ex.GetType().FullName ?? ex.GetType().Name }, + { "exception.message", ex.Message }, + })); + } } diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Distributed/DistributedEventInvoker.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Distributed/DistributedEventInvoker.cs index 83d4662..85d827d 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Distributed/DistributedEventInvoker.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Distributed/DistributedEventInvoker.cs @@ -1,6 +1,8 @@ using System; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; +using BBT.Aether.Telemetry; using Microsoft.Extensions.DependencyInjection; namespace BBT.Aether.Events; @@ -38,20 +40,47 @@ public DistributedEventInvoker(string topic, string pubSubName) /// public async Task InvokeAsync(IServiceProvider serviceProvider, ReadOnlyMemory body, CancellationToken cancellationToken) { - // Resolve dependencies from service provider - var serializer = serviceProvider.GetRequiredService(); - var handler = serviceProvider.GetRequiredService>(); - - // Deserialize to strongly-typed CloudEventEnvelope - var envelope = serializer.Deserialize>(body.Span); - - if (envelope == null) + using var activity = InfrastructureActivitySource.Source.StartActivity( + "Inbox.Invoke", + ActivityKind.Internal, + Activity.Current?.Context ?? default); + + activity?.SetTag("event.name", Name); + activity?.SetTag("event.version", Version); + + try + { + var serializer = serviceProvider.GetRequiredService(); + var handler = serviceProvider.GetRequiredService>(); + + activity?.SetTag("event.handler", handler.GetType().Name); + + var envelope = serializer.Deserialize>(body.Span); + + if (envelope == null) + { + throw new InvalidOperationException($"Failed to deserialize CloudEventEnvelope<{typeof(T).Name}>"); + } + + await handler.HandleAsync(envelope, cancellationToken); + activity?.SetStatus(ActivityStatusCode.Ok); + } + catch (Exception ex) { - throw new InvalidOperationException($"Failed to deserialize CloudEventEnvelope<{typeof(T).Name}>"); + RecordException(activity, ex); + throw; } - - // Invoke handler with strongly-typed envelope - await handler.HandleAsync(envelope, cancellationToken); } -} + private static void RecordException(Activity? activity, Exception ex) + { + if (activity == null) return; + + activity.SetStatus(ActivityStatusCode.Error, ex.Message); + activity.AddEvent(new ActivityEvent("exception", tags: new ActivityTagsCollection + { + { "exception.type", ex.GetType().FullName ?? ex.GetType().Name }, + { "exception.message", ex.Message }, + })); + } +} diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/InboxProcessor.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/InboxProcessor.cs index 22d917d..8b32c32 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/InboxProcessor.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/InboxProcessor.cs @@ -1,9 +1,11 @@ using System; +using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; using BBT.Aether.DistributedLock; using BBT.Aether.Persistence; +using BBT.Aether.Telemetry; using BBT.Aether.Uow; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.DependencyInjection; @@ -86,11 +88,17 @@ private async Task ProcessSingleEventAsync( IServiceProvider scopedServiceProvider, CancellationToken cancellationToken) { + using var activity = InfrastructureActivitySource.Source.StartActivity( + "Inbox.Process", + ActivityKind.Consumer, + Activity.Current?.Context ?? default); + + activity?.SetTag("event.id", inboxMessage.Id); + logger.LogInformation("Start processing incoming event with id = {EventId}", inboxMessage.Id); try { - // Begin a new UoW for this event processing var unitOfWorkManager = scopedServiceProvider.GetRequiredService(); await using var uow = await unitOfWorkManager.BeginRequiresNew(cancellationToken); @@ -108,18 +116,23 @@ private async Task ProcessSingleEventAsync( { logger.LogWarning("Failed to deserialize event {EventId}, marking as failed", inboxMessage.Id); await MarkEventAsFailedAsync(inboxMessage.Id, scopedServiceProvider, cancellationToken); + activity?.SetStatus(ActivityStatusCode.Error, "Failed to deserialize event envelope"); return; } var eventName = envelope.Type; var version = envelope.Version ?? 1; + activity?.SetTag("event.name", eventName); + activity?.SetTag("event.version", version); + // Lookup invoker from registry if (!invokerRegistry.TryGet(eventName, version, out var invoker)) { logger.LogWarning("No handler registered for event {EventName} v{Version}, marking as failed", eventName, version); await MarkEventAsFailedAsync(inboxMessage.Id, scopedServiceProvider, cancellationToken); + activity?.SetStatus(ActivityStatusCode.Error, $"No handler registered for {eventName} v{version}"); return; } @@ -134,12 +147,14 @@ private async Task ProcessSingleEventAsync( // Commit handler changes + processed status await handlerUow.CommitAsync(cancellationToken); + activity?.SetStatus(ActivityStatusCode.Ok); logger.LogInformation("Successfully processed event {EventId} ({EventName} v{Version})", inboxMessage.Id, eventName, version); } catch (Exception ex) { logger.LogError(ex, "Failed to process event {EventId}", inboxMessage.Id); + RecordException(activity, ex); await MarkEventAsFailedAsync(inboxMessage.Id, scopedServiceProvider, cancellationToken); } } @@ -206,4 +221,16 @@ protected virtual async Task CleanupOldMessagesAsync(CancellationToken cancellat logger.LogError(ex, "Error cleaning up old inbox messages"); } } -} \ No newline at end of file + + private static void RecordException(Activity? activity, Exception ex) + { + if (activity == null) return; + + activity.SetStatus(ActivityStatusCode.Error, ex.Message); + activity.AddEvent(new ActivityEvent("exception", tags: new ActivityTagsCollection + { + { "exception.type", ex.GetType().FullName ?? ex.GetType().Name }, + { "exception.message", ex.Message }, + })); + } +} diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/OutboxProcessor.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/OutboxProcessor.cs index d660913..dd4444d 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/OutboxProcessor.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/OutboxProcessor.cs @@ -1,11 +1,13 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; using BBT.Aether.Clock; using BBT.Aether.MultiSchema; using BBT.Aether.Persistence; +using BBT.Aether.Telemetry; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -69,24 +71,31 @@ protected virtual async Task ProcessOutboxMessagesAsync(CancellationToken cancel { break; } - + + using var activity = InfrastructureActivitySource.Source.StartActivity( + "Outbox.Process", + ActivityKind.Producer, + Activity.Current?.Context ?? default); + + // Get topicName and pubSubName from ExtraProperties + var topicName = message.ExtraProperties.TryGetValue("TopicName", out var topicObj) + ? topicObj?.ToString() ?? message.EventName + : message.EventName; + var pubSubName = message.ExtraProperties.TryGetValue("PubSubName", out var pubSubObj) + ? pubSubObj?.ToString() ?? eventBusOptions.PubSubName + : eventBusOptions.PubSubName; + + activity?.SetTag("event.name", message.EventName); + activity?.SetTag("event.topic", topicName); + activity?.SetTag("event.pubsub_name", pubSubName); + activity?.SetTag("outbox.message_id", message.Id.ToString()); + activity?.SetTag("outbox.retry_count", message.RetryCount); + try { - // Get topicName and pubSubName from ExtraProperties - var topicName = message.ExtraProperties.TryGetValue("TopicName", out var topicObj) - ? topicObj?.ToString() ?? message.EventName - : message.EventName; - var pubSubName = message.ExtraProperties.TryGetValue("PubSubName", out var pubSubObj) - ? pubSubObj?.ToString() ?? eventBusOptions.PubSubName - : eventBusOptions.PubSubName; - - // EventData is already serialized CloudEventEnvelope bytes var serializedEnvelope = message.EventData; - - // Publish using IDistributedEventBus abstraction await eventBus.PublishEnvelopeAsync(serializedEnvelope, topicName, pubSubName, cancellationToken); - // Find and update the domain entity var domainMessage = await dbContext.OutboxMessages.FindAsync(new object[] { message.Id }, cancellationToken); if (domainMessage != null) { @@ -97,13 +106,14 @@ protected virtual async Task ProcessOutboxMessagesAsync(CancellationToken cancel domainMessage.LockedUntil = null; } + activity?.SetStatus(ActivityStatusCode.Ok); logger.LogInformation("Successfully published outbox message {MessageId}", message.Id); } catch (Exception ex) { logger.LogError(ex, "Failed to publish outbox message {MessageId}", message.Id); + RecordException(activity, ex); - // Find and update the domain entity for retry var domainMessage = await dbContext.OutboxMessages.FindAsync(new object[] { message.Id }, cancellationToken); if (domainMessage != null) { @@ -148,4 +158,16 @@ private DateTime CalculateNextRetryTime(int retryCount) var delay = options.RetryBaseDelay * Math.Pow(2, retryCount - 1); return clock.UtcNow.Add(TimeSpan.FromMilliseconds(delay.TotalMilliseconds)); } -} \ No newline at end of file + + private static void RecordException(Activity? activity, Exception ex) + { + if (activity == null) return; + + activity.SetStatus(ActivityStatusCode.Error, ex.Message); + activity.AddEvent(new ActivityEvent("exception", tags: new ActivityTagsCollection + { + { "exception.type", ex.GetType().FullName ?? ex.GetType().Name }, + { "exception.message", ex.Message }, + })); + } +} diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Telemetry/InfrastructureActivitySource.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Telemetry/InfrastructureActivitySource.cs new file mode 100644 index 0000000..951af29 --- /dev/null +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Telemetry/InfrastructureActivitySource.cs @@ -0,0 +1,25 @@ +using System.Diagnostics; + +namespace BBT.Aether.Telemetry; + +/// +/// Provides a static ActivitySource for Aether infrastructure tracing (distributed lock, cache, etc.). +/// Automatically registered by AddAetherTelemetry; no manual configuration required. +/// +public static class InfrastructureActivitySource +{ + /// + /// The name of the ActivitySource used by Aether infrastructure. + /// + public const string SourceName = "BBT.Aether.Infrastructure"; + + /// + /// The version of the Aether infrastructure library. + /// + public const string Version = "1.0.0"; + + /// + /// The shared ActivitySource instance for creating activities (spans) in Aether infrastructure. + /// + public static readonly ActivitySource Source = new(SourceName, Version); +}