Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/KurrentDB.Core/Bus/InMemoryBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ public partial class InMemoryBus : ISubscriber, IAsyncHandle<Message> {
public static InMemoryBus CreateTest(bool watchSlowMsg = true) =>
new("Test", watchSlowMsg);

public static readonly TimeSpan DefaultSlowMessageThreshold = TimeSpan.FromMilliseconds(48);
private static readonly ILogger Log = Serilog.Log.ForContext<InMemoryBus>();

private readonly FrozenDictionary<Type, MessageTypeHandler> _handlers;
Expand All @@ -34,7 +33,7 @@ public InMemoryBus(string name, bool watchSlowMsg = true, TimeSpan? slowMsgThres
Name = name;

if (watchSlowMsg)
_slowMsgThresholdMs = slowMsgThreshold.GetValueOrDefault(DefaultSlowMessageThreshold).TotalMilliseconds;
_slowMsgThresholdMs = slowMsgThreshold.GetValueOrDefault(TimeSpan.FromMilliseconds(ClusterVNodeOptions.InMemoryBusOptions.DefaultSlowMessageThreshold)).TotalMilliseconds;
}

public string Name { get; }
Expand Down
2 changes: 1 addition & 1 deletion src/KurrentDB.Core/Bus/QueuedHandlerThreadPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public QueuedHandlerThreadPool(IAsyncHandle<Message> consumer,
_lifetimeToken = _lifetimeSource.Token;

_watchSlowMsg = watchSlowMsg;
_slowMsgThreshold = slowMsgThreshold ?? InMemoryBus.DefaultSlowMessageThreshold;
_slowMsgThreshold = slowMsgThreshold ?? TimeSpan.FromMilliseconds(ClusterVNodeOptions.InMemoryBusOptions.DefaultSlowMessageThreshold);
_threadStopWaitTimeout = threadStopWaitTimeout ?? DefaultStopWaitTimeout;

_queueMonitor = QueueMonitor.Default;
Expand Down
2 changes: 1 addition & 1 deletion src/KurrentDB.Core/ClusterVNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ public ClusterVNode(ClusterVNodeOptions options,

var trackers = new Trackers();
var metricsConfiguration = MetricsConfiguration.Get(configuration);
MetricsBootstrapper.Bootstrap(metricsConfiguration, dbConfig, trackers);
MetricsBootstrapper.Bootstrap(metricsConfiguration, dbConfig, trackers, options.InMemoryBus.SlowMessageThresholdMs);

var namingStrategy = new VersionedPatternFileNamingStrategy(dbConfig.Path, "chunk-");
IChunkFileSystem fileSystem = new ChunkLocalFileSystem(namingStrategy);
Expand Down
10 changes: 10 additions & 0 deletions src/KurrentDB.Core/Configuration/ClusterVNodeOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public partial record ClusterVNodeOptions {
[OptionGroup] public GrpcOptions Grpc { get; init; } = new();
[OptionGroup] public InterfaceOptions Interface { get; init; } = new();
[OptionGroup] public ProjectionOptions Projection { get; init; } = new();
[OptionGroup] public InMemoryBusOptions InMemoryBus { get; set; } = new();
public UnknownOptions Unknown { get; init; } = new([]);

public byte IndexBitnessVersion { get; init; } = PTableVersions.IndexV4;
Expand Down Expand Up @@ -85,6 +86,7 @@ public static ClusterVNodeOptions FromConfiguration(IConfigurationRoot configura
Grpc = configuration.BindOptions<GrpcOptions>(),
Interface = configuration.BindOptions<InterfaceOptions>(),
Projection = configuration.BindOptions<ProjectionOptions>(),
InMemoryBus = configuration.BindOptions<InMemoryBusOptions>(),

Unknown = UnknownOptions.FromConfiguration(configuration),
ConfigurationRoot = configurationRoot,
Expand Down Expand Up @@ -591,6 +593,14 @@ public record ProjectionOptions {
public int MaxProjectionStateSize { get; set; } = Opts.MaxProjectionStateSizeDefault;
}

[Description("InMemoryBus Options")]
public record InMemoryBusOptions {
public const int DefaultSlowMessageThreshold = 48;
[Description("The time limit in milliseconds beyond which a SLOW MSG would be logged"),
Unit("ms")]
public int SlowMessageThresholdMs { get; set; } = DefaultSlowMessageThreshold;
}

public record UnknownOptions(IReadOnlyList<(string, string)> Options) {
/// <summary>
/// Identifies unknown options in the configuration and provides suggestions for known options.
Expand Down
1 change: 1 addition & 0 deletions src/KurrentDB.Core/KurrentDB.Core.csproj
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<GenerateAssemblyInfo>true</GenerateAssemblyInfo>
<LangVersion>preview</LangVersion>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
</PropertyGroup>
<ItemGroup>
Expand Down
8 changes: 6 additions & 2 deletions src/KurrentDB.Core/Metrics/GCSuspensionMetric.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ public class GcSuspensionMetric(DurationMaxTracker? tracker) : EventListener {
private static readonly ILogger Log = Serilog.Log.ForContext<GcSuspensionMetric>();

// Match DefaultSlowMessageThreshold so slow messages can be attributed to GC.
private static readonly TimeSpan LongSuspensionThreshold = InMemoryBus.DefaultSlowMessageThreshold;
private static TimeSpan LongSuspensionThreshold;
public static void ConfigureLongSuspensionThreshold(int ? _slowMessageThresholdMs) {
LongSuspensionThreshold = TimeSpan.FromMilliseconds(_slowMessageThresholdMs.GetValueOrDefault(ClusterVNodeOptions.InMemoryBusOptions
.DefaultSlowMessageThreshold));
}
private static readonly TimeSpan VeryLongSuspensionThreshold = TimeSpan.FromMilliseconds(600);
private static readonly TimeSpan LongSuspensionLogPeriod = TimeSpan.FromMilliseconds(10_000);

Expand Down Expand Up @@ -183,7 +187,7 @@ is GCStartType.BlockingOutsideBackgroundGC
_periodLongSuspensionCount = 0;
_periodLongSuspensionsElapsedTotal = TimeSpan.Zero;
} else {
// have logged recently
// have logged recently
_periodLongSuspensionCount++;
_periodLongSuspensionsElapsedTotal += elapsed;
}
Expand Down
3 changes: 2 additions & 1 deletion src/KurrentDB.Core/Metrics/ProcessMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class ProcessMetrics(Meter meter, TimeSpan timeout, int scrapingPeriodInS
private readonly Func<DiskIoData> _getDiskIo = Functions.Debounce(ProcessStats.GetDiskIo, timeout);
private readonly Func<Process> _getCurrentProc = Functions.Debounce(Process.GetCurrentProcess, timeout);

public void CreateObservableMetrics(Dictionary<ProcessTracker, string> metricNames) {
public void CreateObservableMetrics(Dictionary<ProcessTracker, string> metricNames, int ? _slowThresholdMs = null) {
var enabledNames = metricNames
.Where(kvp => config.TryGetValue(kvp.Key, out var enabled) && enabled)
.ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
Expand All @@ -34,6 +34,7 @@ public void CreateObservableMetrics(Dictionary<ProcessTracker, string> metricNam
if (enabledNames.TryGetValue(ProcessTracker.GcPauseDuration, out var gcMaxPauseName)) {
var maxGcPauseDurationMetric = new DurationMaxMetric(meter, gcMaxPauseName, legacyNames);
var maxGcPauseDurationTracker = new DurationMaxTracker(maxGcPauseDurationMetric, null, scrapingPeriodInSeconds);
GcSuspensionMetric.ConfigureLongSuspensionThreshold(_slowThresholdMs);
_ = new GcSuspensionMetric(maxGcPauseDurationTracker);
}

Expand Down
4 changes: 2 additions & 2 deletions src/KurrentDB.Core/MetricsBootstrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public static class MetricsBootstrapper {
public static void Bootstrap(
Conf conf,
TFChunkDbConfig dbConfig,
Trackers trackers) {
Trackers trackers, int _slowMessageThresholdMs) {

OptionsFormatter.LogConfig("Metrics", conf);

Expand Down Expand Up @@ -307,7 +307,7 @@ public static void Bootstrap(
? $"{serviceName}-gc-total-allocated"
: $"{serviceName}-gc-allocated" },
{ Conf.ProcessTracker.GcPauseDuration, $"{serviceName}-gc-pause-duration-max" },
});
}, _slowMessageThresholdMs);

processMetrics.CreateMemoryMetric($"{serviceName}-proc-mem", new() {
{ Conf.ProcessTracker.MemWorkingSet, "working-set" },
Expand Down
4 changes: 2 additions & 2 deletions src/KurrentDB.Core/Services/VNode/ClusterVNodeController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ public ClusterVNodeController(
_forwardingTimeout = TimeSpan.FromMilliseconds(options.Database.PrepareTimeoutMs +
options.Database.CommitTimeoutMs + 300);

_outputBus = new InMemoryBus("MainBus");
_outputBus = new InMemoryBus("MainBus", true, TimeSpan.FromMilliseconds(options.InMemoryBus.SlowMessageThresholdMs));
_fsm = CreateFSM();
_mainQueue = new QueuedHandlerThreadPool(_fsm, "MainQueue", statsManager, trackers.QueueTrackers);
_mainQueue = new QueuedHandlerThreadPool(_fsm, "MainQueue", statsManager, trackers.QueueTrackers, true, TimeSpan.FromMilliseconds(options.InMemoryBus.SlowMessageThresholdMs));
_publishEnvelope = _mainQueue;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<GenerateAssemblyInfo>true</GenerateAssemblyInfo>
<LangVersion>preview</LangVersion>
</PropertyGroup>
<ItemGroup>
<None Remove="Prelude\1Prelude.js" />
Expand Down
12 changes: 7 additions & 5 deletions src/KurrentDB.Projections.Core/ProjectionCoreWorkersNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,22 @@ namespace KurrentDB.Projections.Core;
public static class ProjectionCoreWorkersNode {
public static Dictionary<Guid, CoreWorker> CreateCoreWorkers(
StandardComponents standardComponents,
ProjectionsStandardComponents projectionsStandardComponents) {
ProjectionsStandardComponents projectionsStandardComponents , TimeSpan _slowMessageThresholdMs) {
var coreWorkers = new Dictionary<Guid, CoreWorker>();
while (coreWorkers.Count < projectionsStandardComponents.ProjectionWorkerThreadCount) {
var coreInputBus = new InMemoryBus("bus");
var coreInputBus = new InMemoryBus("bus" , true , _slowMessageThresholdMs);
var coreInputQueue = new QueuedHandlerThreadPool(coreInputBus,
"Projection Core #" + coreWorkers.Count,
standardComponents.QueueStatsManager,
standardComponents.QueueTrackers,
standardComponents.QueueTrackers, true,
_slowMessageThresholdMs,
groupName: "Projection Core");
var coreOutputBus = new InMemoryBus("output bus");
var coreOutputBus = new InMemoryBus("output bus" , true , _slowMessageThresholdMs);
var coreOutputQueue = new QueuedHandlerThreadPool(coreOutputBus,
"Projection Core #" + coreWorkers.Count + " output",
standardComponents.QueueStatsManager,
standardComponents.QueueTrackers,
standardComponents.QueueTrackers, true,
_slowMessageThresholdMs,
groupName: "Projection Core");
var workerId = Guid.NewGuid();
var projectionNode = new ProjectionWorkerNode(
Expand Down
19 changes: 13 additions & 6 deletions src/KurrentDB.Projections.Core/ProjectionsSubsystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ public sealed class ProjectionsSubsystem : ISubsystem,
"$by_correlation_id"
};

public ProjectionsSubsystem(ProjectionSubsystemOptions projectionSubsystemOptions) {
private readonly TimeSpan slowMessageThresholdMs;

public ProjectionsSubsystem(ProjectionSubsystemOptions projectionSubsystemOptions , int? _slowMessageThresholdMs = null) {
if (projectionSubsystemOptions.RunProjections <= ProjectionType.System)
_projectionWorkerThreadCount = 1;
else
Expand All @@ -114,8 +116,9 @@ public ProjectionsSubsystem(ProjectionSubsystemOptions projectionSubsystemOption
_projectionsQueryExpiry = projectionSubsystemOptions.ProjectionQueryExpiry;
_faultOutOfOrderProjections = projectionSubsystemOptions.FaultOutOfOrderProjections;

_leaderInputBus = new InMemoryBus("manager input bus");
_leaderOutputBus = new InMemoryBus("ProjectionManagerAndCoreCoordinatorOutput");
slowMessageThresholdMs = TimeSpan.FromMilliseconds(_slowMessageThresholdMs ?? ClusterVNodeOptions.InMemoryBusOptions.DefaultSlowMessageThreshold);
_leaderInputBus = new InMemoryBus("manager input bus" , true , slowMessageThresholdMs);
_leaderOutputBus = new InMemoryBus("ProjectionManagerAndCoreCoordinatorOutput" , true , slowMessageThresholdMs);

_subsystemInitialized = new();
_executionTimeout = projectionSubsystemOptions.ExecutionTimeout;
Expand All @@ -142,13 +145,17 @@ public void ConfigureApplication(IApplicationBuilder builder, IConfiguration con
_leaderInputBus,
"Projections Leader",
standardComponents.QueueStatsManager,
standardComponents.QueueTrackers
standardComponents.QueueTrackers,
true,
this.slowMessageThresholdMs
);
_leaderOutputQueue = new QueuedHandlerThreadPool(
_leaderOutputBus,
"Projections Leader",
standardComponents.QueueStatsManager,
standardComponents.QueueTrackers
standardComponents.QueueTrackers,
true,
this.slowMessageThresholdMs
);

LeaderInputBus.Subscribe<ProjectionSubsystemMessage.RestartSubsystem>(this);
Expand Down Expand Up @@ -177,7 +184,7 @@ public void ConfigureApplication(IApplicationBuilder builder, IConfiguration con
projectionTrackers);

CreateAwakerService(standardComponents);
_coreWorkers = ProjectionCoreWorkersNode.CreateCoreWorkers(standardComponents, projectionsStandardComponents);
_coreWorkers = ProjectionCoreWorkersNode.CreateCoreWorkers(standardComponents, projectionsStandardComponents , this.slowMessageThresholdMs);
_queueMap = _coreWorkers.ToDictionary(v => v.Key, v => v.Value.CoreInputQueue.As<IPublisher>());

ProjectionManagerNode.CreateManagerService(standardComponents, projectionsStandardComponents, _queueMap,
Expand Down
2 changes: 1 addition & 1 deletion src/KurrentDB/ClusterVNodeHostedService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public ClusterVNodeHostedService(
options.Projection.FaultOutOfOrderProjections,
options.Projection.ProjectionCompilationTimeout,
options.Projection.ProjectionExecutionTimeout,
options.Projection.MaxProjectionStateSize)))
options.Projection.MaxProjectionStateSize) , options.InMemoryBus.SlowMessageThresholdMs))
: options;

if (!_options.Database.MemDb) {
Expand Down
Loading