diff --git a/src/dsstats.decode/DecodeService.cs b/src/dsstats.decode/DecodeService.cs index d8c14054..8eb5fab8 100644 --- a/src/dsstats.decode/DecodeService.cs +++ b/src/dsstats.decode/DecodeService.cs @@ -1,343 +1,59 @@ -using dsstats.challenge.Services; using dsstats.shared; using Microsoft.Extensions.Options; -using pax.dsstats.parser; using s2protocol.NET; -using System.Collections.Concurrent; -using System.Security.Cryptography; using System.Text.RegularExpressions; namespace dsstats.decode; -public partial class DecodeService(IOptions decodeSettings, - IHttpClientFactory httpClientFactory, - ILogger logger) +public partial class DecodeService( + IReplayQueue replayQueue, + IOptions decodeSettings, + ILogger logger) { - - private readonly SemaphoreSlim ss = new(1, 1); - private readonly SemaphoreSlim ssRaw = new(1, 1); - private readonly SemaphoreSlim fileSemaphore = new SemaphoreSlim(1, 1); - private ReplayDecoder? replayDecoder; - private int queueCount = 0; - private ConcurrentBag excludeReplays = []; - - public EventHandler? DecodeFinished; - public EventHandler? DecodeRawFinished; - - private async void OnDecodeFinished(DecodeEventArgs e) - { - var httpClient = httpClientFactory.CreateClient("callback"); - try - { - var result = await httpClient.PostAsJsonAsync($"/api8/v1/upload/decoderesult/{e.Guid}", e.IhReplays); - result.EnsureSuccessStatusCode(); - } - catch (Exception ex) - { - logger.LogError("failed reporting decoderesult: {error}", ex.Message); - } - DecodeFinished?.Invoke(this, e); - } - - private async void OnDecodeRawFinished(DecodeRawEventArgs e) - { - var httpClient = httpClientFactory.CreateClient("callback"); - try - { - var result = await httpClient.PostAsJsonAsync($"/api8/v1/upload/decoderawresult/{e.Guid}", e.ChallengeResponses); - result.EnsureSuccessStatusCode(); - } - catch (Exception ex) - { - logger.LogError("failed reporting decoderesult: {error}", ex.Message); - } - DecodeRawFinished?.Invoke(this, e); - } + private readonly IReplayQueue replayQueue = replayQueue; + private readonly DecodeSettings decodeSettings = decodeSettings.Value; + private readonly ILogger logger = logger; public async Task SaveReplays(Guid guid, List files) { - return await SaveReplays(guid, files, decodeSettings.Value.ReplayFolders.ToDo); + return await SaveAndQueueFiles(guid, files, inHouse: true); } public async Task SaveReplaysRaw(Guid guid, List files) { - return await SaveReplays(guid, files, decodeSettings.Value.ReplayFolders.ToDoRaw); + return await SaveAndQueueFiles(guid, files, inHouse: false); } - private async Task SaveReplays(Guid guid, List files, string folder) + private async Task SaveAndQueueFiles(Guid guid, List files, bool inHouse) { - int filesSaved = 0; - - try - { - foreach (var formFile in files) - { - if (formFile.Length > 0) - { - string fileHash; - using (var md5 = MD5.Create()) - { - using var stream = formFile.OpenReadStream(); - fileHash = BitConverter.ToString(md5.ComputeHash(stream)).Replace("-", "").ToLowerInvariant(); - } - - var destinationFile = Path.Combine(decodeSettings.Value.ReplayFolders.Done, $"{fileHash}.SC2Replay"); - var todoFolder = folder; - if (!Directory.Exists(todoFolder)) - { - Directory.CreateDirectory(todoFolder); - } - var todoFile = Path.Combine(todoFolder, $"{guid}_{fileHash}.SC2Replay"); - - if (File.Exists(destinationFile)) - { - logger.LogInformation("File {FileName} already exists. Skipping upload.", formFile.FileName); - continue; - } + if (files.Count == 0) + return 0; - try - { - var tmpFile = todoFile + ".tmp"; - using (var fileStream = File.Create(tmpFile)) - { - await formFile.CopyToAsync(fileStream); - fileStream.Close(); - } - File.Move(tmpFile, todoFile); - filesSaved++; - } - catch (Exception ex) - { - logger.LogError(ex, "Error saving file {FileName}.", formFile.FileName); - } - } - else - { - logger.LogWarning("File {FileName} is empty and will be skipped.", formFile.FileName); - } - } + Directory.CreateDirectory(decodeSettings.ReplayFolders.Temp); - if (folder.EndsWith("raw")) - { - _ = DecodeRaw(); - } - else - { - _ = Decode(); - } - } - catch (Exception ex) + foreach (var formFile in files) { - logger.LogError(ex, "Unexpected error in SaveReplays."); - return -1; - } + var tempFileName = $"{guid}_{Guid.NewGuid()}.SC2Replay"; + var tempPath = Path.Combine(decodeSettings.ReplayFolders.Temp, tempFileName); - logger.LogInformation("{FilesSaved} files saved for GUID {Guid}.", filesSaved, guid); - return filesSaved; - } + // Save the file + using (var stream = new FileStream(tempPath, FileMode.Create)) + await formFile.CopyToAsync(stream); + logger.LogInformation("Saved uploaded replay to temp: {path}", tempPath); - public async Task Decode() - { - Interlocked.Increment(ref queueCount); - await ss.WaitAsync(); - ConcurrentDictionary> replays = []; - string? error = null; - - try - { - var replayPaths = Directory.GetFiles(decodeSettings.Value.ReplayFolders.ToDo, "*SC2Replay") - .Where(f => !File.Exists(f + ".tmp")) - .Where(f => !excludeReplays.Contains(f)) - .ToArray(); - - if (replayPaths.Length == 0) + // Prepare job + var job = new ReplayJob(Guid.NewGuid(), tempPath, "", inHouse); + + // Try enqueue + if (!replayQueue.TryEnqueue(job)) { - error = "No replays found."; - return; + logger.LogWarning("Replay queue full — rejecting uploaded replay: {path}", tempPath); + return -1; // API returns 500 or 429 based on your controller } - - if (replayDecoder is null) - { - replayDecoder = new(); - } - - var options = new ReplayDecoderOptions() - { - Initdata = true, - Details = true, - Metadata = true, - TrackerEvents = true, - }; - - using var md5 = MD5.Create(); - - await foreach (var result in - replayDecoder.DecodeParallelWithErrorReport(replayPaths, decodeSettings.Value.Threads, options)) - { - if (result.Sc2Replay is null) - { - Error(result); - error = "failed decoding replays."; - continue; - } - - var metaData = GetMetaData(result.Sc2Replay); - - var sc2Replay = Parse.GetDsReplay(result.Sc2Replay); - - if (sc2Replay is null) - { - Error(result); - error = "failed decoding replays."; - continue; - } - - var replayDto = Parse.GetReplayDto(sc2Replay, md5); - - if (replayDto is null) - { - Error(result); - error = "failed decoding replays."; - continue; - } - var destination = Path.Combine(decodeSettings.Value.ReplayFolders.Done, - Path.GetFileNameWithoutExtension(result.ReplayPath)[..36] + - "_" + - replayDto.ReplayHash + - Path.GetExtension(result.ReplayPath)); - await fileSemaphore.WaitAsync(); - try - { - if (!File.Exists(destination)) - { - File.Move(result.ReplayPath, destination); - var groupId = GetGroupIdFromFilename(result.ReplayPath); - var ihReplay = new IhReplay() { Replay = replayDto, Metadata = metaData }; - replays.AddOrUpdate(groupId, [ihReplay], (k, v) => { v.Add(ihReplay); return v; }); - } - } - finally - { - fileSemaphore.Release(); - } - } - } - catch (Exception ex) - { - logger.LogError("failed decoding replays: {error}", ex.Message); - error = "failed decoding replays."; - } - finally - { - ss.Release(); - foreach (var ent in replays) - { - OnDecodeFinished(new() - { - Guid = ent.Key, - IhReplays = [.. ent.Value], - Error = error, - }); - } - Interlocked.Decrement(ref queueCount); } - } - - public async Task DecodeRaw() - { - Interlocked.Increment(ref queueCount); - await ssRaw.WaitAsync(); - ConcurrentDictionary> challengeResponses = []; - string? error = null; - - try - { - var replayPaths = Directory.GetFiles(Path.Combine(decodeSettings.Value.ReplayFolders.ToDoRaw), "*SC2Replay") - .Where(f => !File.Exists(f + ".tmp")) - .Where(f => !excludeReplays.Contains(f)) - .ToArray(); - if (replayPaths.Length == 0) - { - error = "No replays found."; - return; - } - - if (replayDecoder is null) - { - replayDecoder = new(); - } - - var options = new ReplayDecoderOptions() - { - Initdata = true, - Details = true, - Metadata = true, - TrackerEvents = true, - }; - - await foreach (var result in - replayDecoder.DecodeParallelWithErrorReport(replayPaths, decodeSettings.Value.Threads, options)) - { - if (result.Sc2Replay is null) - { - Error(result); - error = "failed decoding replays."; - continue; - } - var challengeResponse = ChallengeService.GetChallengeResponse(result.Sc2Replay); - - var destination = Path.Combine(decodeSettings.Value.ReplayFolders.Done, Path.GetFileName(result.ReplayPath)); - await fileSemaphore.WaitAsync(); - try - { - if (!File.Exists(destination)) - { - File.Move(result.ReplayPath, destination); - var groupId = GetGroupIdFromFilename(result.ReplayPath); - challengeResponses.AddOrUpdate(groupId, [challengeResponse], (k, v) => { v.Add(challengeResponse); return v; }); - } - } - finally - { - fileSemaphore.Release(); - } - } - } - catch (Exception ex) - { - logger.LogError("failed decoding replays: {error}", ex.Message); - error = "failed decoding replays."; - } - finally - { - ssRaw.Release(); - foreach (var ent in challengeResponses) - { - OnDecodeRawFinished(new() - { - Guid = ent.Key, - ChallengeResponses = [.. ent.Value], - Error = error, - }); - } - Interlocked.Decrement(ref queueCount); - } - } - - private void Error(DecodeParallelResult result) - { - logger.LogError("failed decoding replay: {path}, {error}", result.ReplayPath, result.Exception); - try - { - File.Move(result.ReplayPath, Path.Combine(decodeSettings.Value.ReplayFolders.Error, Path.GetFileName(result.ReplayPath))); - } - catch (Exception ex) - { - logger.LogWarning("failed moving error replay: {error}", ex.Message); - excludeReplays.Add(result.ReplayPath); - } + return replayQueue.QueueLength; } public static ReplayMetadata GetMetaData(Sc2Replay replay) diff --git a/src/dsstats.decode/DecodeSettings.cs b/src/dsstats.decode/DecodeSettings.cs index 8e269523..68aaabfc 100644 --- a/src/dsstats.decode/DecodeSettings.cs +++ b/src/dsstats.decode/DecodeSettings.cs @@ -15,4 +15,5 @@ public record ReplayFolders public string ToDoRaw { get; set; } = string.Empty; public string Done { get; set; } = string.Empty; public string Error { get; set; } = string.Empty; + public string Temp { get; set; } = string.Empty; } \ No newline at end of file diff --git a/src/dsstats.decode/Program.cs b/src/dsstats.decode/Program.cs index 4161f5cc..969105ca 100644 --- a/src/dsstats.decode/Program.cs +++ b/src/dsstats.decode/Program.cs @@ -8,6 +8,8 @@ builder.Services.AddControllers(); builder.Services.Configure(builder.Configuration.GetSection("DecodeSettings")); +builder.Services.AddSingleton(); +builder.Services.AddHostedService(); builder.Services.AddSingleton(); builder.Services.AddHttpClient("callback") diff --git a/src/dsstats.decode/ReplayDecoderWorker.cs b/src/dsstats.decode/ReplayDecoderWorker.cs new file mode 100644 index 00000000..09cde74f --- /dev/null +++ b/src/dsstats.decode/ReplayDecoderWorker.cs @@ -0,0 +1,202 @@ + +using System.Runtime.CompilerServices; +using System.Security.Cryptography; +using dsstats.challenge.Services; +using dsstats.shared; +using Microsoft.Extensions.Options; +using pax.dsstats.parser; +using s2protocol.NET; + +namespace dsstats.decode; + +public class ReplayDecoderWorker( + IReplayQueue replayQueue, + IOptions decodeSettings, + IHttpClientFactory httpClientFactory, + ILogger logger) : BackgroundService +{ + private readonly IReplayQueue replayQueue = replayQueue; + private readonly ILogger logger = logger; + private readonly DecodeSettings decodeSettings = decodeSettings.Value; + private readonly SemaphoreSlim fileSemaphore = new(1, 1); + private readonly ReplayDecoder replayDecoder = new(); + + public EventHandler? DecodeFinished; + public EventHandler? DecodeInHouseFinished; + + private async Task RaiseDecodeInHouseFinishedAsync(DecodeEventArgs e, CancellationToken token) + { + var httpClient = httpClientFactory.CreateClient("callback"); + try + { + var result = await httpClient.PostAsJsonAsync( + $"/api8/v1/upload/decoderesult/{e.Guid}", e.IhReplays, token); + result.EnsureSuccessStatusCode(); + } + catch (Exception ex) + { + logger.LogError("failed reporting decoderesult: {error}", ex.Message); + } + + DecodeFinished?.Invoke(this, e); + } + + private async Task RaiseDecodeFinishedAsync(DecodeRawEventArgs e, CancellationToken token) + { + var httpClient = httpClientFactory.CreateClient("callback"); + try + { + var result = await httpClient.PostAsJsonAsync( + $"/api8/v1/upload/decoderawresult/{e.Guid}", e.ChallengeResponses, token); + result.EnsureSuccessStatusCode(); + } + catch (Exception ex) + { + logger.LogError("failed reporting decoderesult: {error}", ex.Message); + } + + DecodeInHouseFinished?.Invoke(this, e); + } + + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + logger.LogInformation("ReplayDecoderWorker started"); + + await foreach (var job in replayQueue.Reader.ReadAllAsync(stoppingToken)) + { + try + { + await ProcessReplayJob(job, stoppingToken); + } + catch (Exception ex) + { + logger.LogError(ex, + "Unhandled exception processing replay job for file {file}", + job.TempFilePath); + } + finally + { + (replayQueue as ReplayQueue)?.Decrement(); + } + } + + logger.LogInformation("ReplayDecoderWorker stopped"); + } + + private async Task ProcessReplayJob(ReplayJob job, CancellationToken token) + { + logger.LogInformation("Decoding replay: {file}", job.TempFilePath); + var options = new ReplayDecoderOptions() + { + Initdata = true, + Details = true, + Metadata = true, + TrackerEvents = true, + }; + + var sc2Replay = await replayDecoder.DecodeAsync(job.TempFilePath, options, token); + if (sc2Replay == null) + { + HandleError(job, "Emty decode result"); + return; + } + + var meta = job.InHouse ? DecodeService.GetMetaData(sc2Replay) : null; + var dsReplay = Parse.GetDsReplay(sc2Replay); + if (dsReplay == null) + { + HandleError(job, "Empty parse result"); + return; + } + + using var md5 = MD5.Create(); + var replayDto = Parse.GetReplayDto(dsReplay, md5); + if (replayDto == null) + { + HandleError(job, "Empty dto result"); + return; + } + + // Output destination + string destination = + Path.Combine( + decodeSettings.ReplayFolders.Done, + $"{job.GroupId}_{replayDto.ReplayHash}.SC2Replay"); + + // Move atomically + await fileSemaphore.WaitAsync(token); + try + { + if (!File.Exists(destination)) + File.Move(job.TempFilePath, destination); + } + finally + { + fileSemaphore.Release(); + } + + // Raise event/callback + if (job.InHouse) + { + await RaiseDecodeInHouseFinishedAsync(new() + { + Guid = job.GroupId, + IhReplays = + [ + new IhReplay + { + Replay = replayDto, + Metadata = meta ?? new() + } + ], + Error = null + }, token); + } + else + { + await RaiseDecodeFinishedAsync(new() + { + Guid = job.GroupId, + ChallengeResponses = [ChallengeService.GetChallengeResponse(sc2Replay)], + }, token); + } + } + + private void HandleError(ReplayJob job, string error) + { + logger.LogError("Error decoding replay file {file}, error: {error}", job.TempFilePath, error); + + string errorDest = Path.Combine( + decodeSettings.ReplayFolders.Error, + Path.GetFileName(job.TempFilePath)); + + try + { + File.Move(job.TempFilePath, errorDest); + } + catch (Exception ioEx) + { + logger.LogWarning(ioEx, + "Failed moving error file {file}", job.TempFilePath); + } + } + + public static async Task MoveWithRetry(string src, string dest, int retries = 5) + { + for (int i = 0; i < retries; i++) + { + try + { + File.Move(src, dest); + return; + } + catch when (i < retries - 1) + { + await Task.Delay(100); + } + } + + File.Move(src, dest); // final throw + } +} diff --git a/src/dsstats.decode/ReplayJob.cs b/src/dsstats.decode/ReplayJob.cs new file mode 100644 index 00000000..f7bc4fe7 --- /dev/null +++ b/src/dsstats.decode/ReplayJob.cs @@ -0,0 +1,66 @@ + +using System.Threading.Channels; + +namespace dsstats.decode; + +public sealed record ReplayJob(Guid GroupId, string TempFilePath, string OriginalFilename, bool InHouse); + +public interface IReplayQueue +{ + ChannelWriter Writer { get; } + ChannelReader Reader { get; } + + int QueueLength { get; } + bool TryEnqueue(ReplayJob job); +} + +public class ReplayQueue : IReplayQueue +{ + private readonly Channel channel; + private int queueLength = 0; + + public int QueueLength => queueLength; + + private readonly int maxQueueSize; + + public ReplayQueue() + { + maxQueueSize = 10; + + channel = Channel.CreateUnbounded( + new UnboundedChannelOptions + { + SingleReader = true, + SingleWriter = false + }); + } + + public ChannelWriter Writer => channel.Writer; + public ChannelReader Reader => channel.Reader; + + public bool TryEnqueue(ReplayJob job) + { + // Check threshold + if (QueueLength >= maxQueueSize) + return false; + + // Atomically increment + Interlocked.Increment(ref queueLength); + + // Write without awaiting + if (!channel.Writer.TryWrite(job)) + { + // Roll back increment + Interlocked.Decrement(ref queueLength); + return false; + } + + return true; + } + + // Call this from the consumer + public void Decrement() + { + Interlocked.Decrement(ref queueLength); + } +} diff --git a/src/dsstats.decode/appsettings.Development.json b/src/dsstats.decode/appsettings.Development.json index e6f22564..955f92b8 100644 --- a/src/dsstats.decode/appsettings.Development.json +++ b/src/dsstats.decode/appsettings.Development.json @@ -12,7 +12,8 @@ "ToDo": "/data/ds/decode/todo", "ToDoRaw": "/data/ds/decode/todo_raw", "Done": "/data/ds/decode/done", - "Error": "/data/ds/decode/error" + "Error": "/data/ds/decode/error", + "Temp": "/data/ds/decode/temp" } } } diff --git a/src/tests/dsstats.decode.tests/ParseTests.cs b/src/tests/dsstats.decode.tests/ParseTests.cs index c6a7a1ab..f2a81f3f 100644 --- a/src/tests/dsstats.decode.tests/ParseTests.cs +++ b/src/tests/dsstats.decode.tests/ParseTests.cs @@ -9,7 +9,7 @@ public sealed class ParseTests [TestMethod] public async Task CanDecodeReplay() { - var path = "/data/ds/testreplays/Direct Strike TE (4545).SC2Replay"; + var path = "/data/ds/testreplays/Direct Strike TE (4622).SC2Replay"; var decoder = new ReplayDecoder(); var options = new ReplayDecoderOptions() { @@ -30,7 +30,7 @@ public async Task CanDecodeReplay() [TestMethod] public async Task CanDecodeReplay2() { - List paths = ["/data/ds/testreplays/Direct Strike TE (4545).SC2Replay"]; + List paths = ["/data/ds/testreplays/Direct Strike TE (4622).SC2Replay"]; var decoder = new ReplayDecoder(); var options = new ReplayDecoderOptions() { diff --git a/src/tests/dsstats.decode.tests/dsstats.decode.tests.csproj b/src/tests/dsstats.decode.tests/dsstats.decode.tests.csproj index 84278579..29703f50 100644 --- a/src/tests/dsstats.decode.tests/dsstats.decode.tests.csproj +++ b/src/tests/dsstats.decode.tests/dsstats.decode.tests.csproj @@ -8,6 +8,8 @@ + +