From c270d0c7f1fb12a57bb295464a83a8dfad97f9bd Mon Sep 17 00:00:00 2001 From: "J. Ritchie Carroll" Date: Mon, 23 Feb 2026 23:59:02 -0600 Subject: [PATCH] CsvAdapters: Sync'd improvements to CsvInputAdapter --- .../Adapters/CsvAdapters/CsvInputAdapter.cs | 535 +++++++++++++++--- 1 file changed, 441 insertions(+), 94 deletions(-) diff --git a/Source/Libraries/Adapters/CsvAdapters/CsvInputAdapter.cs b/Source/Libraries/Adapters/CsvAdapters/CsvInputAdapter.cs index d37df84fd0..8791a95d6f 100755 --- a/Source/Libraries/Adapters/CsvAdapters/CsvInputAdapter.cs +++ b/Source/Libraries/Adapters/CsvAdapters/CsvInputAdapter.cs @@ -29,10 +29,12 @@ using System; using System.Collections.Generic; +using System.Collections.ObjectModel; using System.ComponentModel; using System.Diagnostics; using System.IO; using System.Linq; +using System.Runtime.CompilerServices; using System.Text; using System.Threading; using System.Timers; @@ -54,19 +56,150 @@ public class CsvInputAdapter : InputAdapterBase { #region [ Members ] + // Nested Types + + // Timer class used to precisely pace CSV play-back speed. Uses absolute deadline + // advancement to maintain consistent inter-frame intervals. For longer intervals, + // e.g., 30 FPS / ~33ms, a single bulk Thread.Sleep is used for most of the wait, + // followed by yield/spin for the final approach — minimizing OS scheduler jitter + // compared to many individual Thread.Sleep(1) calls. + private sealed class PacingTimer + { + // Guard-band in milliseconds: we sleep until this much + // time remains, then yield/spin to the exact deadline + private const double SleepGuardBand = 2.0D; + + private readonly long m_periodTicks; // Query Performance Counter (QPC) ticks per frame + private readonly long m_guardBandTicks; // QPC ticks for the guard-band + private long m_nextTick; // Next scheduled QPC tick (not equal to DateTime ticks) + + /// + /// Creates a new instance. + /// + /// The defined interval in milliseconds. + public PacingTimer(double interval) + { + if (interval <= 0) + throw new ArgumentOutOfRangeException(nameof(interval)); + + m_periodTicks = (long)Math.Round(Stopwatch.Frequency * interval / 1000.0D); + m_guardBandTicks = (long)Math.Round(Stopwatch.Frequency * SleepGuardBand / 1000.0D); + + if (m_periodTicks <= 0L) + throw new ArgumentOutOfRangeException(nameof(interval), $"{nameof(InputInterval)} is too small for the timer resolution."); + } + + /// + /// Set the next waiting period based on current time. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void SetNextPeriod() + { + m_nextTick = Stopwatch.GetTimestamp() + m_periodTicks; + } + + /// + /// Blocks until the next scheduled waiting period. + /// + /// + /// Flag that determines if next waiting period should be set; when true, the + /// deadline advances by exactly one period from the current deadline (absolute + /// cadence) to keep inter-frame intervals consistent. If the deadline has already + /// fallen behind by more than one full period, it resets to "now + period" to + /// prevent a burst of catch-up frames. When false, caller must manually + /// call to schedule the next waiting period. + /// + public void WaitNext(bool setNextPeriod = true) + { + long nextTick = m_nextTick; + + if (nextTick == 0L) + throw new InvalidOperationException("Next period has not been set. Call SetNextPeriod() before WaitNext()."); + + if (setNextPeriod) + { + // Advance deadline absolutely from previous deadline to maintain + // a steady cadence regardless of per-frame processing jitter + long advancedTick = nextTick + m_periodTicks; + + // If the advanced deadline is already in the past, we've fallen + // behind by more than a full period — reset to relative mode to + // avoid a burst of catch-up frames + m_nextTick = Stopwatch.GetTimestamp() >= advancedTick + ? Stopwatch.GetTimestamp() + m_periodTicks + : advancedTick; + } + else + { + m_nextTick = 0L; + } + + // Fast exit if already past deadline (e.g., processing took longer than interval) + if (Stopwatch.GetTimestamp() >= nextTick) + return; + + long ticksRemaining = nextTick - Stopwatch.GetTimestamp(); + + // For longer waits, sleep in one bulk call leaving only a small + // guard-band for spin/yield — this dramatically reduces the number + // of OS scheduler interactions compared to repeated Sleep(1) calls + if (ticksRemaining > m_guardBandTicks) + { + long sleepTicks = ticksRemaining - m_guardBandTicks; + int sleepMs = (int)(sleepTicks * 1000L / Stopwatch.Frequency); + + if (sleepMs > 0) + Thread.Sleep(sleepMs); + } + + // Yield / spin for the remaining guard-band to hit precise deadline + while (true) + { + long now = Stopwatch.GetTimestamp(); + + if (now >= nextTick) + return; + + double remaining = (nextTick - now) * 1000.0 / Stopwatch.Frequency; + + if (remaining >= 0.3D) + { + // Yield to reduce CPU but avoid oversleeping + Thread.Yield(); + } + else + { + // Spin to hit sub-millisecond cadence + SpinWait sw = new(); + + while (Stopwatch.GetTimestamp() < nextTick) + sw.SpinOnce(); + + return; + } + } + } + } + + // Constants + private const double DefaultInputInterval = 1000.0D / 30.0D; + private const int DefaultMeasurementsPerInterval = 5; + // Fields private StreamReader? m_inStream; private string? m_header; private readonly Dictionary m_columns; private readonly Dictionary m_columnMappings; + private ReadOnlyDictionary? m_signalIDPublishOrder; - private Timer? m_looseTimer; + private PacingTimer? m_pacingTimer; private LongSynchronizedOperation? m_readRow; private PrecisionInputTimer? m_precisionTimer; private long[]? m_subsecondDistribution; private long m_previousSecond; private int m_previousFrameIndex; + private bool m_subSecondCorrection = true; private bool m_disposed; @@ -82,8 +215,11 @@ public CsvInputAdapter() FileName = "measurements.csv"; m_columns = new Dictionary(StringComparer.CurrentCultureIgnoreCase); m_columnMappings = new Dictionary(); - InputInterval = 33.333333; - MeasurementsPerInterval = 5; + + // ReSharper disable VirtualMemberCallInConstructor + InputInterval = DefaultInputInterval; + MeasurementsPerInterval = DefaultMeasurementsPerInterval; + AutoAssignMappingsToOutputs = true; // Set minimum timer resolution to one millisecond to improve timer accuracy PrecisionTimer.SetMinimumTimerResolution(1); @@ -102,12 +238,12 @@ public CsvInputAdapter() public string FileName { get; set; } /// - /// Gets or sets the interval of time between sending frames into the concentrator. + /// Gets or sets the interval of time between reading rows from the CSV file. /// [ConnectionStringParameter] - [Description("Define the interval of time, in milliseconds, between sending frames into the concentrator.")] - [DefaultValue(33.333333)] - public double InputInterval { get; set; } + [Description("Define the interval of time, in milliseconds, between reading rows from the CSV file.")] + [DefaultValue(DefaultInputInterval)] + public virtual double InputInterval { get; set; } /// /// Gets or sets value that determines if the CSV input file data should be replayed repeatedly. @@ -126,15 +262,14 @@ public CsvInputAdapter() public int SkipRows { get; set; } /// - /// Gets or sets flag that determines if a high-resolution precision timer should be used for CSV file based input. + /// Gets or sets flag that determines if a high-resolution timer should be used for CSV file based input. /// /// - /// Useful when input frames need be accurately time-aligned to the local clock to better simulate - /// an input device and calculate downstream latencies.
- /// This is only applicable when connection is made to a file for replay purposes. + /// Useful when input needs be accurately time-aligned to a specified frame rate. Maximum interval is 1ms (1000 FPS). + /// Set to false if faster inputs are needed, e.g., 0.3333333ms (3000 FPS) or greater. ///
[ConnectionStringParameter] - [Description("Determines if a high-resolution precision timer should be used for CSV file based input.")] + [Description("Determines if a high-resolution timer should be used for CSV file based input.")] [DefaultValue(false)] public bool UseHighResolutionInputTimer { @@ -145,7 +280,7 @@ public bool UseHighResolutionInputTimer { // Note that a 1-ms timer and debug mode don't mix, so the high-resolution timer is disabled while debugging case true when m_precisionTimer is null && !Debugger.IsAttached: - m_precisionTimer = PrecisionInputTimer.Attach((int)(1000.0D / InputInterval), ex => OnProcessException(MessageLevel.Warning, ex)); + m_precisionTimer = PrecisionInputTimer.Attach(FrameRate, ex => OnProcessException(MessageLevel.Warning, ex)); break; case false when m_precisionTimer is not null: PrecisionInputTimer.Detach(ref m_precisionTimer); @@ -159,7 +294,7 @@ public bool UseHighResolutionInputTimer ///
[ConnectionStringParameter] [Description("Define the number of measurements that are read from the CSV file in each frame.")] - [DefaultValue(5)] + [DefaultValue(DefaultMeasurementsPerInterval)] public int MeasurementsPerInterval { get; set; } /// @@ -180,12 +315,60 @@ public bool UseHighResolutionInputTimer public bool SimulateTimestamp { get; set; } /// - /// Defines the column mappings, must be defined: e.g., 0=Timestamp; 1=PPA:12; 2=PPA13. + /// Manually defines the column mappings, e.g.: "0=Timestamp; 1=PPA:12; 2=PPA13". Use + /// instead to automatically assign column mappings to output measurements when transverse mode is enabled. /// [ConnectionStringParameter] - [Description("Defines the column mappings must defined: e.g., \"0=Timestamp; 1=PPA:12; 2=PPA13\".")] + [Description($""" + Manually defines the column mappings, e.g.: "0=Timestamp; 1=PPA:12; 2=PPA13". Use '{nameof(AutoMapToOutputMeasurements)}' + instead to automatically assign column mappings to output measurements when transverse mode is enabled. + """)] [DefaultValue("")] - public int ColumnMappings { get; set; } + public string ColumnMappings { get; set; } = null!; + + /// + /// Gets or sets flags that determines whether to automatically assign column mappings to output measurements when transverse mode is enabled. Output measurement definition order preserved. + /// + [ConnectionStringParameter] + [Description("Determines whether to automatically assign column mappings to output measurements when transverse mode is enabled. Output measurement definition order preserved.")] + [DefaultValue(false)] + public bool AutoMapToOutputMeasurements { get; set; } + + /// + /// Gets or sets the column index for timestamp values when is enabled. + /// + /// + /// Value is required when is enabled to identify which column contains timestamp values. + /// + [ConnectionStringParameter] + [Description($"Defines the column index for timestamp values when '{nameof(AutoMapToOutputMeasurements)}' is enabled.")] + [DefaultValue(0)] + public int TimestampColumnIndex { get; set; } + + /// + /// Gets or sets a flag that determines whether to automatically assign column mappings to output measurements when transverse mode is enabled. + /// + /// + /// Value will be set to false if is enabled. No need to reassign column mappings to output + /// measurements if they are already being used as source for column mappings. + /// + [ConnectionStringParameter] + [Description("Defines flag that determines whether to automatically assign column mappings to output measurements when transverse mode is enabled.")] + [DefaultValue(true)] + public bool AutoAssignMappingsToOutputs { get; set; } + + /// + /// Gets or sets flag that determines whether to round timestamps to the inferred . + /// + [ConnectionStringParameter] + [Description("Defines flag that determines whether to round timestamps to the inferred 'FrameRate'.")] + [DefaultValue(true)] + public bool RoundTimestampsToFrameRate { get; set; } + + /// + /// Gets inferred frame rate based on the defined . + /// + public int FrameRate { get; private set; } /// /// Gets a flag that determines if this @@ -193,6 +376,40 @@ public bool UseHighResolutionInputTimer /// protected override bool UseAsyncConnect => false; + /// + /// Gets a read-only mapping of measurement signal ID to zero-based publish order index. + /// + /// + /// This excludes the timestamp column mapping and all indexes are normalized to + /// zero-based index sequential to match publication order. + /// + protected ReadOnlyDictionary SignalIDPublishOrder + { + get + { + if (m_signalIDPublishOrder is not null) + return m_signalIDPublishOrder; + + Dictionary signalIDPublishOrder = new(); + int publishOrder = 0; + + foreach (KeyValuePair kvp in m_columnMappings.OrderBy(kvp => kvp.Key)) + { + int index = kvp.Key; + IMeasurement measurement = kvp.Value; + + if (index == TimestampColumnIndex) + continue; + + signalIDPublishOrder[measurement.ID] = publishOrder++; + } + + m_signalIDPublishOrder = new ReadOnlyDictionary(signalIDPublishOrder); + + return m_signalIDPublishOrder; + } + } + /// /// Returns the detailed status of this . /// @@ -205,16 +422,32 @@ public override string Status status.Append(base.Status); status.AppendLine(); status.AppendLine($" File name: {FilePath.TrimFileName(FileName, 51)}"); - status.AppendLine($" File header: {m_header}"); - status.AppendLine($" Input interval: {InputInterval:N3}"); + status.AppendLine($" File header: {m_header.TruncateRight(51)}"); + status.AppendLine($" Input interval: {InputInterval:N3} ({FrameRate} fps)"); status.AppendLine($" Measurements per interval: {MeasurementsPerInterval:N0}"); + status.AppendLine($" Simulate timestamp: {SimulateTimestamp}"); + status.AppendLine($" Round time to frame rate: {RoundTimestampsToFrameRate}"); status.AppendLine($" Using transverse mode: {TransverseMode}"); + + if (TransverseMode) + { + status.AppendLine($"Auto map to config outputs: {AutoMapToOutputMeasurements}"); + + if (AutoMapToOutputMeasurements) + status.AppendLine($" Timestamp column index: {TimestampColumnIndex:N0}"); + + status.AppendLine($"Auto assign map to outputs: {AutoAssignMappingsToOutputs}"); + } + status.AppendLine($" Auto-repeat: {AutoRepeat}"); status.AppendLine($" Precision input timer: {(UseHighResolutionInputTimer ? "Enabled" : "Offline")}"); status.AppendLine($" Lines to skip: {SkipRows:N0}"); if (m_precisionTimer is not null) - status.AppendLine($" Timer resynchronizations: {m_precisionTimer.Resynchronizations}"); + { + status.Append($" Timer resynchronizations: {m_precisionTimer.Resynchronizations}"); + status.AppendLine(); + } return status.ToString(); } @@ -245,10 +478,7 @@ protected override void Dispose(bool disposing) if (UseHighResolutionInputTimer) PrecisionInputTimer.Detach(ref m_precisionTimer); - else - m_looseTimer?.Dispose(); - m_looseTimer = null; m_readRow = null; m_inStream?.Dispose(); @@ -280,12 +510,19 @@ public override void Initialize() if (settings.TryGetValue(nameof(InputInterval), out setting)) InputInterval = double.Parse(setting); + FrameRate = (int)Math.Round(1000.0D / InputInterval); + if (settings.TryGetValue(nameof(MeasurementsPerInterval), out setting)) MeasurementsPerInterval = int.Parse(setting); if (settings.TryGetValue(nameof(SimulateTimestamp), out setting)) SimulateTimestamp = setting.ParseBoolean(); + if (settings.TryGetValue(nameof(RoundTimestampsToFrameRate), out setting)) + RoundTimestampsToFrameRate = setting.ParseBoolean(); + else + RoundTimestampsToFrameRate = true; + if (settings.TryGetValue(nameof(TransverseMode), out setting) || settings.TryGetValue("transverse", out setting)) TransverseMode = setting.ParseBoolean(); @@ -306,12 +543,55 @@ public override void Initialize() if (!UseHighResolutionInputTimer) { - m_looseTimer = new Timer(); + m_pacingTimer = new PacingTimer(InputInterval); m_readRow = new LongSynchronizedOperation(ReadRow, ex => OnProcessException(MessageLevel.Warning, ex)); } if (TransverseMode) { + if (settings.TryGetValue(nameof(AutoMapToOutputMeasurements), out setting)) + AutoMapToOutputMeasurements = setting.ParseBoolean(); + + if (AutoMapToOutputMeasurements) + { + if (settings.TryGetValue(nameof(ColumnMappings), out setting) && !string.IsNullOrWhiteSpace(setting)) + OnStatusMessage(MessageLevel.Warning, $"Manually assigned '{nameof(ColumnMappings)}' will be ignored since '{nameof(AutoMapToOutputMeasurements)}' is enabled in transverse mode."); + + if (OutputMeasurements?.Length == 0) + throw new InvalidOperationException($"Output measurements must be defined when using '{nameof(AutoMapToOutputMeasurements)}' in transverse mode."); + + if (settings.TryGetValue(nameof(TimestampColumnIndex), out setting) && int.TryParse(setting, out int index)) + TimestampColumnIndex = index; + + // No need to assign mappings to outputs if they are already being used as source + AutoAssignMappingsToOutputs = false; + + StringBuilder columnMappings = new(); + + // Auto-assign column mappings based on output measurements + bool timestampAssigned = false; + + for (int i = 0; i < OutputMeasurements!.Length; i++) + { + if (i > 0) + columnMappings.Append(';'); + + if (i == TimestampColumnIndex) + { + columnMappings.Append($"{i}=Timestamp;"); + timestampAssigned = true; + } + + int columnIndex = timestampAssigned ? i + 1 : i; + columnMappings.Append($"{columnIndex}={OutputMeasurements[i].ID}"); + } + + if (TimestampColumnIndex >= OutputMeasurements.Length) + columnMappings.Append($";{OutputMeasurements.Length}=Timestamp"); + + settings[nameof(ColumnMappings)] = columnMappings.ToString(); + } + // Load column mappings: if (settings.TryGetValue(nameof(ColumnMappings), out setting)) { @@ -323,25 +603,31 @@ public override void Initialize() columnMappings[index] = mapping.Value; } - if (!SimulateTimestamp && !columnMappings.Values.Contains("Timestamp", StringComparer.OrdinalIgnoreCase)) - throw new InvalidOperationException("One of the column mappings must be defined as a \"Timestamp\": e.g., columnMappings={0=Timestamp; 1=PPA:12; 2=PPA13}."); + KeyValuePair timestampColumn = columnMappings.FirstOrDefault(kvp => string.Compare(kvp.Value, "Timestamp", StringComparison.OrdinalIgnoreCase) == 0); + bool timestampColumnDefined = timestampColumn.Value is not null; - // In transverse mode, maximum measurements per interval is set to maximum columns in input file - MeasurementsPerInterval = columnMappings.Keys.Max() + 1; + // Lookup timestamp index + if (!SimulateTimestamp) + { + if (!timestampColumnDefined) + throw new InvalidOperationException("No \"Timestamp\" column mapping found: when not simulating timestamp in transverse mode, one of the column mappings must be defined as \"Timestamp\": e.g., columnMappings={0=Timestamp; 1=PPA:12; 2=PPA13}."); + + // Assign configured timestamp column index when auto-mapping is disabled (value is used by SignalIDPublishOrder map) + if (!AutoMapToOutputMeasurements) + TimestampColumnIndex = timestampColumn.Key; + } // Auto-assign output measurements based on column mappings - OutputMeasurements = columnMappings.Where(kvp => string.Compare(kvp.Value, "Timestamp", StringComparison.OrdinalIgnoreCase) != 0).Select(IMeasurement (kvp) => + IMeasurement[] outputMeasurements = columnMappings.Where(kvp => string.Compare(kvp.Value, "Timestamp", StringComparison.OrdinalIgnoreCase) != 0).Select(IMeasurement (kvp) => { string measurementID = kvp.Value; - Measurement measurement = new(); - MeasurementKey key; + MeasurementKey key = ParseInputMeasurementKeys(DataSource, false, measurementID).FirstOrDefault() ?? MeasurementKey.Undefined; - if (Guid.TryParse(measurementID, out Guid id)) - key = MeasurementKey.LookUpBySignalID(id); - else - MeasurementKey.TryParse(measurementID, out key); + if (key == MeasurementKey.Undefined) + throw new InvalidOperationException($"Unable to parse measurement identifier of expression \"{kvp.Key}={measurementID}\" defined in column mappings as a point tag, measurement key, Guid-based signal ID or filter expression."); - measurement.Metadata = key.Metadata; + Measurement measurement = new() { Metadata = key.Metadata }; + Debug.Assert(measurement.Metadata is not null); // Associate measurement with column index m_columnMappings[kvp.Key] = measurement; @@ -349,17 +635,29 @@ public override void Initialize() return measurement; }).ToArray(); + // In transverse mode, maximum measurements per interval is set to maximum column index in input file + MeasurementsPerInterval = m_columnMappings.Keys.Max(); + + // If timestamp column is included within column mappings and its index is not last in the set, + // increment measurements per interval so that publish count will include all mapped columns + if (SimulateTimestamp || (timestampColumnDefined && TimestampColumnIndex < MeasurementsPerInterval)) + MeasurementsPerInterval++; + + if (!AutoMapToOutputMeasurements && settings.TryGetValue(nameof(AutoAssignMappingsToOutputs), out setting)) + AutoAssignMappingsToOutputs = setting.ParseBoolean(); + + if (AutoAssignMappingsToOutputs) + OutputMeasurements = outputMeasurements; + if (!SimulateTimestamp) { - int timestampColumn = columnMappings.First(kvp => string.Compare(kvp.Value, "Timestamp", StringComparison.OrdinalIgnoreCase) == 0).Key; - // Reserve a column mapping for timestamp value IMeasurement timestampMeasurement = new Measurement { Metadata = new MeasurementMetadata(null!, "Timestamp", 0, 1, null) }; - m_columnMappings[timestampColumn] = timestampMeasurement; + m_columnMappings[TimestampColumnIndex] = timestampMeasurement; } } else @@ -371,13 +669,6 @@ public override void Initialize() // Override input interval based on temporal processing interval if it's not set to default if (ProcessingInterval > -1) InputInterval = ProcessingInterval == 0 ? 1 : ProcessingInterval; - - if (m_looseTimer is null) - return; - - m_looseTimer.Interval = InputInterval; - m_looseTimer.AutoReset = true; - m_looseTimer.Elapsed += m_looseTimer_Elapsed; } /// @@ -387,7 +678,7 @@ protected override void AttemptConnection() { m_inStream = new StreamReader(File.Open(FileName, FileMode.Open, FileAccess.Read, FileShare.ReadWrite)); - // Skip specified number of header lines that exist before column heading definitions + // Skip configured number of header lines that exist before column heading definitions for (int i = 0; i < SkipRows; i++) m_inStream.ReadLine(); @@ -409,8 +700,9 @@ protected override void AttemptConnection() } else { - // Start common timer - m_looseTimer!.Start(); + // Start replay timer + m_pacingTimer!.SetNextPeriod(); + m_readRow!.RunOnceAsync(); } } @@ -426,9 +718,6 @@ protected override void AttemptDisconnection() } m_inStream = null; - - if (!UseHighResolutionInputTimer) - m_looseTimer!.Stop(); } /// @@ -441,48 +730,67 @@ public override string GetShortStatus(int maxLength) return $"{ProcessedMeasurements} measurements read from CSV file.".CenterText(maxLength); } - // Handler for loose timer measurements processing - private void m_looseTimer_Elapsed(object? sender, ElapsedEventArgs e) - { - m_readRow!.Run(); - } - private void ReadRow() { if (!Enabled) return; if (ReadNextRecord(DateTime.UtcNow.Ticks)) + { + m_pacingTimer!.WaitNext(); + m_readRow!.RunOnceAsync(); return; + } ReadComplete(); + + if (!AutoRepeat) + return; + + m_pacingTimer!.WaitNext(); + m_readRow!.RunOnceAsync(); } // Handler for precision timer measurements processing private void ProcessMeasurements() { - if (m_precisionTimer is null) - return; + do + { + if (m_precisionTimer is null) + return; - // When high resolution input timing is requested, we only need to wait for the next signal... - while (Enabled && ReadToFrame(m_precisionTimer.LastFrameTime)) - m_precisionTimer.FrameWaitHandle?.Wait(); + // When high resolution input timing is requested, we only need to wait for the next signal... + while (Enabled && ReadToFrame(m_precisionTimer.LastFrameTime)) + m_precisionTimer.FrameWaitHandle?.Wait(); - if (!Enabled) - return; + if (!Enabled) + return; - ReadComplete(); + ReadComplete(); + } + while (AutoRepeat); } private void ReadComplete() { - AttemptDisconnection(); - if (!AutoRepeat) + { + AttemptDisconnection(); return; + } OnStatusMessage(MessageLevel.Info, "Restarting CSV read for auto-repeat."); - AttemptConnection(); + Debug.Assert(m_inStream is not null); + + m_inStream!.BaseStream.Position = 0L; + m_inStream.DiscardBufferedData(); + + // Skip configured number of header lines that exist before column heading definitions + for (int i = 0; i < SkipRows; i++) + m_inStream.ReadLine(); + + // Skip header line + m_inStream.ReadLine(); } // Attempt to read as many records as necessary to reach the target frame index @@ -495,29 +803,47 @@ private bool ReadToFrame(long targetFrameTime) { if (m_subsecondDistribution is null) { - m_subsecondDistribution = Ticks.SubsecondDistribution(m_precisionTimer.FramesPerSecond) + m_subsecondDistribution = Ticks.SubsecondDistribution(FrameRate) .Select(subsecond => subsecond.Value) .ToArray(); return jumpToFrameTime(); } + if (m_subSecondCorrection) + return jumpToFrameTime(); + if (targetFrameTime - m_previousSecond > TimeSpan.FromSeconds(5.0D).Ticks) + { + m_subSecondCorrection = true; return jumpToFrameTime(); + } long currentFrameTime = calculateFrameTime(); while (currentFrameTime < targetFrameTime) { - m_previousFrameIndex = (m_previousFrameIndex + 1) % m_precisionTimer.FramesPerSecond; + m_previousFrameIndex = (m_previousFrameIndex + 1) % FrameRate; if (m_previousFrameIndex == 0) m_previousSecond += Ticks.PerSecond; currentFrameTime = calculateFrameTime(); - if (!ReadNextRecord(currentFrameTime)) - return false; + if (ReadNextRecord(currentFrameTime)) + continue; + + if (m_previousFrameIndex == 0) + { + m_previousSecond -= Ticks.PerSecond; + m_previousFrameIndex = FrameRate - 1; + } + else + { + m_previousFrameIndex--; + } + + return false; } } catch (Exception ex) @@ -540,15 +866,17 @@ bool jumpToFrameTime() TimeSpan targetFrameSubsecond = targetFrameSpan - targetFrameSecond; m_previousSecond = targetFrameSecond.Ticks; - m_previousFrameIndex = (int)Math.Round(targetFrameSubsecond.TotalSeconds * m_precisionTimer.FramesPerSecond); + m_previousFrameIndex = (int)Math.Round(targetFrameSubsecond.TotalSeconds * FrameRate); + if (m_subSecondCorrection && m_previousFrameIndex != 0) + return true; long frameTime = calculateFrameTime(); - return ReadNextRecord(frameTime); + return ReadNextRecord(frameTime, m_previousFrameIndex); } } // Attempt to read the next record - private bool ReadNextRecord(long currentTime) + private bool ReadNextRecord(long currentTime, int skip = 0) { if (m_inStream is null) return false; @@ -559,16 +887,27 @@ private bool ReadNextRecord(long currentTime) long fileTime = 0; int timestampColumn = 0; - string? line = m_inStream.ReadLine(); + string? line; + int index = 0; + + do + { + line = m_inStream.ReadLine(); + index++; + } while (index <= skip); // Null line indicates end of file, return false if (line is null) + { + m_subSecondCorrection = true; return false; + } // Parse line of CSV file accounting for quoted fields - string[] fields = ParseCsvLine(line.Trim()); + string[] fields = ParseCSVLine(line.Trim()); + m_subSecondCorrection = false; - if (m_inStream.EndOfStream || fields.Length < m_columns.Count) + if (fields.Length < m_columns.Count) return false; // Read time from Timestamp column in transverse mode @@ -592,27 +931,33 @@ private bool ReadNextRecord(long currentTime) if (TransverseMode) { // No measurement will be defined for timestamp column - if (!SimulateTimestamp && i == timestampColumn) + if (i == timestampColumn) continue; - if (!m_columnMappings.TryGetValue(i, out measurement)) - continue; - - measurement = Measurement.Clone(measurement); - - try + if (m_columnMappings.TryGetValue(i, out measurement)) { - measurement.Value = double.Parse(fields[i]); + measurement = Measurement.Clone(measurement); + + try + { + measurement.Value = double.Parse(fields[i]); + } + catch + { + measurement.Value = double.NaN; + } } - catch + else { + measurement = new Measurement(); + measurement.Metadata = MeasurementKey.Undefined.Metadata; measurement.Value = double.NaN; } if (SimulateTimestamp) - measurement.Timestamp = currentTime; + measurement.Timestamp = RoundTimestampsToFrameRate ? Ticks.RoundToSubsecondDistribution(currentTime, FrameRate) : currentTime; else if (m_columns.ContainsKey("Timestamp")) - measurement.Timestamp = fileTime; + measurement.Timestamp = RoundTimestampsToFrameRate ? Ticks.RoundToSubsecondDistribution(fileTime, FrameRate) : fileTime; } else { @@ -632,9 +977,11 @@ private bool ReadNextRecord(long currentTime) } if (SimulateTimestamp) - measurement.Timestamp = currentTime; - else if (m_columns.TryGetValue("Timestamp", out int timeColumn)) - measurement.Timestamp = long.Parse(fields[timeColumn]); + measurement.Timestamp = RoundTimestampsToFrameRate ? Ticks.RoundToSubsecondDistribution(currentTime, FrameRate) : currentTime; + else if (m_columns.TryGetValue("Timestamp", out int timeColumn) && long.TryParse(fields[timeColumn], out fileTime)) + measurement.Timestamp = RoundTimestampsToFrameRate ? Ticks.RoundToSubsecondDistribution(fileTime, FrameRate) : fileTime; + else + throw new FormatException($"Failed to parse timestamp value '{fields[timeColumn]}' as a long integer."); if (m_columns.TryGetValue("Value", out int valueColumn)) measurement.Value = double.Parse(fields[valueColumn]); @@ -654,7 +1001,7 @@ private bool ReadNextRecord(long currentTime) } // Parse a CSV line into fields, accounting for quoted fields - private static string[] ParseCsvLine(string line) + private static string[] ParseCSVLine(string line) { List fields = []; bool inQuotes = false;