Merged
52 changes: 52 additions & 0 deletions framework/docs/background-job/README.md
@@ -160,6 +160,57 @@ Scheduled → Running → Completed
→ Cancelled
```

## Tracing

All background job operations automatically emit OpenTelemetry spans via the `BBT.Aether.Infrastructure` ActivitySource. No additional configuration is needed when using `AddAetherTelemetry`.
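
To check locally that these spans are emitted, without a full exporter pipeline, one option is to attach a plain `System.Diagnostics.ActivityListener` to the source. The following is a minimal sketch, not part of Aether: only the source name and the `job.handler_name` tag come from this document, while `InfrastructureSpanProbe`, `Attach`, and the stand-in `ActivitySource` created in `Main` are hypothetical and exist only so the sample runs on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

public static class InfrastructureSpanProbe
{
    // Source name as documented above; everything else in this class is illustrative.
    public const string SourceName = "BBT.Aether.Infrastructure";

    // Subscribes to the source and appends a one-line summary for every span that stops.
    public static ActivityListener Attach(List<string> sink)
    {
        var listener = new ActivityListener
        {
            ShouldListenTo = source => source.Name == SourceName,
            Sample = (ref ActivityCreationOptions<ActivityContext> options) =>
                ActivitySamplingResult.AllDataAndRecorded,
            ActivityStopped = activity =>
                sink.Add($"{activity.OperationName} handler={activity.GetTagItem("job.handler_name")}")
        };
        ActivitySource.AddActivityListener(listener);
        return listener;
    }

    public static void Main()
    {
        var seen = new List<string>();
        using var listener = Attach(seen);

        // Stand-in for the framework's own source (assumption), so the sketch is self-contained.
        using var source = new ActivitySource(SourceName);
        using (var activity = source.StartActivity("BackgroundJob.Enqueue", ActivityKind.Producer))
        {
            activity?.SetTag("job.handler_name", "SendEmail");
        }

        foreach (var line in seen)
        {
            Console.WriteLine(line);
        }
    }
}
```

In an application that already calls `AddAetherTelemetry`, a listener like this is unnecessary; it is mainly useful in unit tests that assert on span names and tags.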

### Scheduling Spans

| Span | Description |
|------|-------------|
| `BackgroundJob.Enqueue` | Job creation + persistence via `IBackgroundJobService` |
| `BackgroundJob.Update` | Schedule update via `IBackgroundJobService` |
| `BackgroundJob.Delete` | Job cancellation via `IBackgroundJobService` |
| `BackgroundJob.Schedule` | Scheduler-level job registration (e.g. Dapr) |
| `BackgroundJob.Schedule.Update` | Scheduler-level reschedule |
| `BackgroundJob.Schedule.Delete` | Scheduler-level job removal |

### Execution Spans

| Span | Description |
|------|-------------|
| `BackgroundJob.Execute` | Dapr callback entry point (execution bridge) |
| `BackgroundJob.Dispatch` | Handler invocation with idempotency check |

### Tags

| Tag | Description |
|-----|-------------|
| `job.handler_name` | Handler type name (e.g. `"SendEmail"`) |
| `job.name` | Unique job identifier (e.g. `"send-email-order-123"`) |
| `job.schedule` | Schedule expression (e.g. `"@daily"`, `"*/15 * * * *"`) |
| `job.id` | Entity ID from BackgroundJobInfo |
| `job.scheduler` | Scheduler backend (e.g. `"dapr"`) |
| `job.status` | Final dispatch status (`"completed"`, `"failed"`, `"cancelled"`) |

### Example Trace Hierarchy

**Scheduling:**
```
[ASP.NET Core] POST /api/orders
  [BBT.Aether.Infrastructure] BackgroundJob.Enqueue
    [BBT.Aether.Infrastructure] BackgroundJob.Schedule
      [HTTP Client] POST http://localhost:3500/...
```

**Execution (Dapr callback):**
```
[ASP.NET Core] POST /job
  [BBT.Aether.Infrastructure] BackgroundJob.Execute
    [BBT.Aether.Infrastructure] BackgroundJob.Dispatch
      [EF Core] SELECT/UPDATE ...
```

## Best Practices

1. **Make handlers idempotent** - Jobs may be retried on failure
@@ -195,3 +246,4 @@ public class ProcessPaymentJobHandler : IBackgroundJobHandler<ProcessPaymentJobA

- [Unit of Work](../unit-of-work/README.md) - Transaction management for handlers
- [Distributed Lock](../distributed-lock/README.md) - Coordinate job execution
- [OpenTelemetry](../telemetry/README.md) - Tracing configuration
65 changes: 65 additions & 0 deletions framework/docs/distributed-events/README.md
@@ -140,6 +140,71 @@ services.AddAetherEventBus(options =>
});
```

## Tracing

All event bus operations are automatically instrumented with OpenTelemetry spans via the `BBT.Aether.Infrastructure` ActivitySource. No additional configuration is required beyond enabling Aether telemetry.

### Publishing Spans

| Span | Kind | Description |
|------|------|-------------|
| `EventBus.Publish` | Producer | Created when `PublishAsync` is called (both generic and metadata-based overloads) |
| `EventBus.PublishEnvelope` | Producer | Created when a pre-serialized envelope is published (used by outbox processor) |
| `EventBus.PublishToBroker` | Producer | Created when the event is sent to the Dapr PubSub broker |

### Processing Spans

| Span | Kind | Description |
|------|------|-------------|
| `Outbox.Process` | Producer | Created per message in `OutboxProcessor` when republishing from the outbox |
| `Inbox.Process` | Consumer | Created per message in `InboxProcessor` when processing a pending inbox event |
| `Inbox.Invoke` | Internal | Created in `DistributedEventInvoker` when deserializing and calling the event handler |

### Semantic Tags

| Tag | Example | Description |
|-----|---------|-------------|
| `event.name` | `"order.created"` | CloudEvent Type / event name |
| `event.topic` | `"dev.order.created/v1"` | Broker topic |
| `event.pubsub_name` | `"pubsub"` | Dapr PubSub component |
| `event.broker` | `"dapr"` | Broker implementation |
| `event.use_outbox` | `true` / `false` | Whether outbox pattern was used |
| `event.id` | `"abc123"` | CloudEvent ID (inbox) |
| `event.version` | `1` | Event version |
| `event.handler` | `"OrderCreatedEventHandler"` | Handler type name |
| `outbox.message_id` | GUID | Outbox message entity ID |
| `outbox.retry_count` | `0` | Current retry attempt |

### Example Trace Hierarchy

**Direct publish:**
```
[ASP.NET Core] POST /api/orders
  └─ [BBT.Aether.Infrastructure] EventBus.Publish
       └─ [BBT.Aether.Infrastructure] EventBus.PublishToBroker
            └─ [HTTP Client] POST http://localhost:3500/v1.0/publish/pubsub/order.created
```

**Outbox publish:**
```
[ASP.NET Core] POST /api/orders
  └─ [BBT.Aether.Infrastructure] EventBus.Publish (use_outbox=true)
...later (background)...
[BBT.Aether.Infrastructure] Outbox.Process
  └─ [BBT.Aether.Infrastructure] EventBus.PublishEnvelope
       └─ [BBT.Aether.Infrastructure] EventBus.PublishToBroker
            └─ [HTTP Client] POST http://localhost:3500/...
```

**Inbox consumption:**
```
[ASP.NET Core] POST /events/order.created/v1 (Dapr delivers event)
...later (background)...
[BBT.Aether.Infrastructure] Inbox.Process
  └─ [BBT.Aether.Infrastructure] Inbox.Invoke
       └─ (handler code runs here)
```

## Best Practices

1. **Use EventNameAttribute** - Explicit topic naming with versioning
30 changes: 30 additions & 0 deletions framework/docs/distributed-lock/README.md
@@ -159,6 +159,35 @@ public async Task GenerateDailyReportAsync()
}
```

## Tracing

All lock operations automatically emit OpenTelemetry spans via the `BBT.Aether.Infrastructure` ActivitySource. No additional configuration is needed when using `AddAetherTelemetry`.

Span names:
- `DistributedLock.Acquire` - lock acquisition attempts
- `DistributedLock.Release` - explicit or dispose-based lock release
- `DistributedLock.Execute` - `ExecuteWithLockAsync` (covers acquire + execute + release)

Tags added to each span:

| Tag | Description |
|-----|-------------|
| `lock.provider` | `"dapr"` or `"redis"` |
| `lock.resource_id` | The resource identifier being locked |
| `lock.store_name` | Dapr component name (Dapr provider only) |
| `lock.expiry_seconds` | Lock TTL |
| `lock.acquired` | Whether the lock was successfully obtained |
| `lock.released` | Whether the lock was successfully released |

Example trace hierarchy:

```
[ASP.NET Core] POST /api/orders
  [BBT.Aether.Aspects] OrderService.ProcessOrder
    [BBT.Aether.Infrastructure] DistributedLock.Execute
      [HTTP Client] POST http://localhost:3500/v1.0-alpha1/lock/lockstore
```

## Best Practices

1. **Choose appropriate lock duration** - Match to expected operation time + buffer
@@ -171,3 +200,4 @@ public async Task GenerateDailyReportAsync()

- [Distributed Cache](../distributed-cache/README.md) - Cache stampede prevention
- [Background Jobs](../background-job/README.md) - Coordinate job execution
- [OpenTelemetry](../telemetry/README.md) - Tracing configuration
18 changes: 17 additions & 1 deletion framework/docs/telemetry/README.md
@@ -8,7 +8,7 @@ Aether provides comprehensive OpenTelemetry integration for distributed tracing,

- **OpenTelemetry Standard** - Industry-standard observability
- **Three Pillars** - Traces, Metrics, and Logs
- **Automatic Instrumentation** - AspNetCore, HttpClient, EF Core
- **Automatic Instrumentation** - AspNetCore, HttpClient, EF Core, Distributed Lock, Distributed Cache, Event Bus


nitpick (typo): Consider using the canonical ASP.NET Core naming for consistency.

This list currently uses AspNetCore, while other docs (e.g., trace examples) and the official product name use ASP.NET Core. Updating it here would align with the official naming and keep the docs consistent.

Suggested change
- **Automatic Instrumentation** - AspNetCore, HttpClient, EF Core, Distributed Lock, Distributed Cache, Event Bus
- **Automatic Instrumentation** - ASP.NET Core, HttpClient, EF Core, Distributed Lock, Distributed Cache, Event Bus

- **OTLP Exporters** - Export to any OTLP-compatible backend
- **Environment Variable Support** - Standard OTEL_* variables
- **Custom Instrumentation** - Extensible builder pattern
@@ -344,12 +344,28 @@ _logger.LogInformation(

## Context Propagation

### Infrastructure Spans

Distributed lock, distributed cache, background job, and event bus (including outbox and inbox) operations automatically create spans under the `BBT.Aether.Infrastructure` ActivitySource:

- `DistributedLock.Acquire` / `DistributedLock.Release` / `DistributedLock.Execute`
- `DistributedCache.Get` / `DistributedCache.Set` / `DistributedCache.Remove` / `DistributedCache.Refresh`
- `BackgroundJob.Enqueue` / `BackgroundJob.Update` / `BackgroundJob.Delete`
- `BackgroundJob.Schedule` / `BackgroundJob.Schedule.Update` / `BackgroundJob.Schedule.Delete`
- `BackgroundJob.Execute` / `BackgroundJob.Dispatch`
- `EventBus.Publish` / `EventBus.PublishEnvelope` / `EventBus.PublishToBroker`
- `Outbox.Process`
- `Inbox.Process` / `Inbox.Invoke`

Each span includes provider-specific tags (`lock.provider`, `cache.provider`, `job.handler_name`, `job.scheduler`, `event.name`, `event.topic`, `event.broker`, etc.) and records exceptions following OpenTelemetry semantic conventions.
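
The OpenTelemetry exception conventions define an `exception` event carrying `exception.type`, `exception.message`, and `exception.stacktrace` attributes. As a hedged illustration of what such recording can look like on a .NET `Activity`, here is a small sketch. `ExceptionRecordingSketch` and `RecordWithStackTrace` are hypothetical names, not Aether APIs, and attaching the stack trace is an assumption made for this example rather than a statement about what the framework records.

```csharp
using System;
using System.Diagnostics;

public static class ExceptionRecordingSketch
{
    // Hypothetical helper: marks the span as failed and attaches an "exception" event.
    public static void RecordWithStackTrace(Activity? activity, Exception ex)
    {
        if (activity == null) return;

        activity.SetStatus(ActivityStatusCode.Error, ex.Message);
        activity.AddEvent(new ActivityEvent("exception", tags: new ActivityTagsCollection
        {
            { "exception.type", ex.GetType().FullName ?? ex.GetType().Name },
            { "exception.message", ex.Message },
            // Assumption for this sketch: include the full exception text as the stack trace.
            { "exception.stacktrace", ex.ToString() },
        }));
    }

    public static void Main()
    {
        // A manually started Activity stands in for a framework-created span.
        using var activity = new Activity("BackgroundJob.Enqueue").Start();
        try
        {
            throw new InvalidOperationException("scheduler unavailable");
        }
        catch (Exception ex)
        {
            RecordWithStackTrace(activity, ex);
        }

        Console.WriteLine(activity.Status);
        foreach (var activityEvent in activity.Events)
        {
            Console.WriteLine(activityEvent.Name);
        }
    }
}
```

Whether to include the stack trace is a trade-off: it makes failed spans easier to diagnose but increases payload size on every recorded error.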

### Automatic Propagation

Trace context is automatically propagated:
- HTTP requests (W3C Trace Context headers)
- Dapr service invocation
- Event bus messages
- Distributed lock, cache, background job, and event bus operations (via `BBT.Aether.Infrastructure` ActivitySource)
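
For the HTTP case, the W3C Trace Context `traceparent` header has the layout `version-traceid-spanid-flags`. The sketch below shows, under stated assumptions, how a receiver could continue a trace from such a header using only `System.Diagnostics`. `TraceContextSketch`, `Listen`, `ContinueFrom`, and the `Sketch.Propagation` source name are hypothetical; Aether and ASP.NET Core normally do this for you.

```csharp
using System;
using System.Diagnostics;

public static class TraceContextSketch
{
    // Hypothetical source name used only by this sketch.
    private const string SourceName = "Sketch.Propagation";
    private static readonly ActivitySource Source = new ActivitySource(SourceName);

    // Registers a listener so StartActivity returns a real Activity instead of null.
    public static ActivityListener Listen()
    {
        var listener = new ActivityListener
        {
            ShouldListenTo = source => source.Name == SourceName,
            Sample = (ref ActivityCreationOptions<ActivityContext> options) =>
                ActivitySamplingResult.AllDataAndRecorded
        };
        ActivitySource.AddActivityListener(listener);
        return listener;
    }

    // Starts a consumer span whose parent is the remote span described by the header value.
    // Returns null when the header cannot be parsed.
    public static Activity? ContinueFrom(string traceparent, string spanName)
    {
        if (!ActivityContext.TryParse(traceparent, null, out var parent))
        {
            return null;
        }

        return Source.StartActivity(spanName, ActivityKind.Consumer, parent);
    }

    public static void Main()
    {
        using var listener = Listen();

        // Example header value: version 00, 32-hex trace id, 16-hex parent span id, flags 01.
        const string incoming = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";

        using var activity = ContinueFrom(incoming, "Inbox.Process");
        Console.WriteLine($"trace:  {activity?.TraceId}");
        Console.WriteLine($"parent: {activity?.ParentSpanId}");
    }
}
```

The new span keeps the incoming trace id and records the incoming span id as its parent, which is what lets a backend stitch the publisher and consumer sides into one trace.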

### Manual Propagation

@@ -171,6 +171,7 @@ public static IServiceCollection AddAetherTelemetry(
}

tracing.AddSource("BBT.Aether.Aspects");
tracing.AddSource("BBT.Aether.Infrastructure");

// Custom sources
foreach (var src in opts.Tracing.AdditionalSources)
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
@@ -9,6 +10,7 @@
using BBT.Aether.Events;
using BBT.Aether.Guids;
using BBT.Aether.MultiSchema;
using BBT.Aether.Telemetry;
using BBT.Aether.Uow;
using Microsoft.Extensions.Logging;

@@ -55,12 +57,22 @@ public async Task<Guid> EnqueueAsync<TPayload>(
        if (string.IsNullOrWhiteSpace(schedule))
            throw new ArgumentNullException(nameof(schedule));

        using var activity = InfrastructureActivitySource.Source.StartActivity(
            "BackgroundJob.Enqueue",
            ActivityKind.Producer,
            Activity.Current?.Context ?? default);

        activity?.SetTag("job.handler_name", handlerName);
        activity?.SetTag("job.name", jobName);
        activity?.SetTag("job.schedule", schedule);

        logger.LogInformation(
            "Enqueueing job handler '{HandlerName}' with job name '{JobName}' and schedule '{Schedule}'",
            handlerName, jobName, schedule);

        // Create job entity
        var jobId = guidGenerator.Create();
        activity?.SetTag("job.id", jobId.ToString());

        // Convert metadata to nullable dictionary for ExtraPropertyDictionary
        var extraProperties = new ExtraPropertyDictionary();
@@ -118,12 +130,14 @@ public async Task<Guid> EnqueueAsync<TPayload>(
                "Successfully enqueued job handler '{HandlerName}' with job name '{JobName}'. Entity ID: {EntityId}",
                handlerName, jobName, jobId);

            activity?.SetStatus(ActivityStatusCode.Ok);
            return jobId;
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Failed to enqueue job handler '{HandlerName}' with job name '{JobName}'", handlerName,
                jobName);
            RecordException(activity, ex);
            await uow.RollbackAsync(cancellationToken);
            throw;
        }
@@ -138,6 +152,14 @@ public async Task UpdateAsync(Guid id, string newSchedule, CancellationToken can
        if (string.IsNullOrWhiteSpace(newSchedule))
            throw new ArgumentNullException(nameof(newSchedule));

        using var activity = InfrastructureActivitySource.Source.StartActivity(
            "BackgroundJob.Update",
            ActivityKind.Producer,
            Activity.Current?.Context ?? default);

        activity?.SetTag("job.id", id.ToString());
        activity?.SetTag("job.schedule", newSchedule);

        logger.LogInformation("Updating job with entity id '{Id}' to new schedule '{NewSchedule}'", id, newSchedule);

        await using var uow = await uowManager.BeginAsync(cancellationToken: cancellationToken);
@@ -150,6 +172,9 @@ public async Task UpdateAsync(Guid id, string newSchedule, CancellationToken can
                throw new InvalidOperationException($"Job with id '{id}' not found.");
            }

            activity?.SetTag("job.handler_name", jobInfo.HandlerName);
            activity?.SetTag("job.name", jobInfo.JobName);

            // Update schedule in entity
            jobInfo.ExpressionValue = newSchedule;

@@ -179,10 +204,12 @@ await jobScheduler.ScheduleAsync(jobInfo.HandlerName, jobInfo.JobName, newSchedu
            await uow.CommitAsync(cancellationToken);

            logger.LogInformation("Successfully updated job with entity id '{Id}'", id);
            activity?.SetStatus(ActivityStatusCode.Ok);
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Failed to update job with entity id '{Id}'", id);
            RecordException(activity, ex);
            await uow.RollbackAsync(cancellationToken);
            throw;
        }
@@ -194,6 +221,13 @@ public async Task<bool> DeleteAsync(Guid id, CancellationToken cancellationToken
        if (id == Guid.Empty)
            throw new ArgumentException("Id cannot be empty.", nameof(id));

        using var activity = InfrastructureActivitySource.Source.StartActivity(
            "BackgroundJob.Delete",
            ActivityKind.Producer,
            Activity.Current?.Context ?? default);

        activity?.SetTag("job.id", id.ToString());

        logger.LogInformation("Deleting job with entity id '{Id}'", id);

        await using var uow = await uowManager.BeginAsync(cancellationToken: cancellationToken);
@@ -204,9 +238,13 @@ public async Task<bool> DeleteAsync(Guid id, CancellationToken cancellationToken
            if (jobInfo == null)
            {
                logger.LogWarning("Job with entity id '{Id}' not found", id);
                activity?.SetStatus(ActivityStatusCode.Ok);
                return false;
            }

            activity?.SetTag("job.handler_name", jobInfo.HandlerName);
            activity?.SetTag("job.name", jobInfo.JobName);

            // Delete from scheduler
            await jobScheduler.DeleteAsync(jobInfo.HandlerName, jobInfo.JobName, cancellationToken);

@@ -218,13 +256,27 @@ await jobStore.UpdateStatusAsync(id, BackgroundJobStatus.Cancelled, clock.UtcNow
            await uow.CommitAsync(cancellationToken);

            logger.LogInformation("Successfully deleted job with entity id '{Id}'", id);
            activity?.SetStatus(ActivityStatusCode.Ok);
            return true;
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Failed to delete job with entity id '{Id}'", id);
            RecordException(activity, ex);
            await uow.RollbackAsync(cancellationToken);
            throw;
        }
    }
}

    private static void RecordException(Activity? activity, Exception ex)
    {
        if (activity == null) return;

        activity.SetStatus(ActivityStatusCode.Error, ex.Message);
        activity.AddEvent(new ActivityEvent("exception", tags: new ActivityTagsCollection
        {
            { "exception.type", ex.GetType().FullName ?? ex.GetType().Name },
            { "exception.message", ex.Message },
        }));
    }
}