Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
340 changes: 28 additions & 312 deletions src/dsstats.decode/DecodeService.cs
Original file line number Diff line number Diff line change
@@ -1,343 +1,59 @@
using dsstats.challenge.Services;
using dsstats.shared;
using Microsoft.Extensions.Options;
using pax.dsstats.parser;
using s2protocol.NET;
using System.Collections.Concurrent;
using System.Security.Cryptography;
using System.Text.RegularExpressions;

namespace dsstats.decode;

public partial class DecodeService(IOptions<DecodeSettings> decodeSettings,
IHttpClientFactory httpClientFactory,
ILogger<DecodeService> logger)
public partial class DecodeService(
IReplayQueue replayQueue,
IOptions<DecodeSettings> decodeSettings,
ILogger<DecodeService> logger)
{

private readonly SemaphoreSlim ss = new(1, 1);
private readonly SemaphoreSlim ssRaw = new(1, 1);
private readonly SemaphoreSlim fileSemaphore = new SemaphoreSlim(1, 1);
private ReplayDecoder? replayDecoder;
private int queueCount = 0;
private ConcurrentBag<string> excludeReplays = [];

public EventHandler<DecodeEventArgs>? DecodeFinished;
public EventHandler<DecodeRawEventArgs>? DecodeRawFinished;

private async void OnDecodeFinished(DecodeEventArgs e)
{
var httpClient = httpClientFactory.CreateClient("callback");
try
{
var result = await httpClient.PostAsJsonAsync($"/api8/v1/upload/decoderesult/{e.Guid}", e.IhReplays);
result.EnsureSuccessStatusCode();
}
catch (Exception ex)
{
logger.LogError("failed reporting decoderesult: {error}", ex.Message);
}
DecodeFinished?.Invoke(this, e);
}

private async void OnDecodeRawFinished(DecodeRawEventArgs e)
{
var httpClient = httpClientFactory.CreateClient("callback");
try
{
var result = await httpClient.PostAsJsonAsync($"/api8/v1/upload/decoderawresult/{e.Guid}", e.ChallengeResponses);
result.EnsureSuccessStatusCode();
}
catch (Exception ex)
{
logger.LogError("failed reporting decoderesult: {error}", ex.Message);
}
DecodeRawFinished?.Invoke(this, e);
}
private readonly IReplayQueue replayQueue = replayQueue;
private readonly DecodeSettings decodeSettings = decodeSettings.Value;
private readonly ILogger<DecodeService> logger = logger;

public async Task<int> SaveReplays(Guid guid, List<IFormFile> files)
{
return await SaveReplays(guid, files, decodeSettings.Value.ReplayFolders.ToDo);
return await SaveAndQueueFiles(guid, files, inHouse: true);
}

public async Task<int> SaveReplaysRaw(Guid guid, List<IFormFile> files)
{
return await SaveReplays(guid, files, decodeSettings.Value.ReplayFolders.ToDoRaw);
return await SaveAndQueueFiles(guid, files, inHouse: false);
}

private async Task<int> SaveReplays(Guid guid, List<IFormFile> files, string folder)
private async Task<int> SaveAndQueueFiles(Guid guid, List<IFormFile> files, bool inHouse)
{
int filesSaved = 0;

try
{
foreach (var formFile in files)
{
if (formFile.Length > 0)
{
string fileHash;
using (var md5 = MD5.Create())
{
using var stream = formFile.OpenReadStream();
fileHash = BitConverter.ToString(md5.ComputeHash(stream)).Replace("-", "").ToLowerInvariant();
}

var destinationFile = Path.Combine(decodeSettings.Value.ReplayFolders.Done, $"{fileHash}.SC2Replay");
var todoFolder = folder;
if (!Directory.Exists(todoFolder))
{
Directory.CreateDirectory(todoFolder);
}
var todoFile = Path.Combine(todoFolder, $"{guid}_{fileHash}.SC2Replay");

if (File.Exists(destinationFile))
{
logger.LogInformation("File {FileName} already exists. Skipping upload.", formFile.FileName);
continue;
}
if (files.Count == 0)
return 0;

try
{
var tmpFile = todoFile + ".tmp";
using (var fileStream = File.Create(tmpFile))
{
await formFile.CopyToAsync(fileStream);
fileStream.Close();
}
File.Move(tmpFile, todoFile);
filesSaved++;
}
catch (Exception ex)
{
logger.LogError(ex, "Error saving file {FileName}.", formFile.FileName);
}
}
else
{
logger.LogWarning("File {FileName} is empty and will be skipped.", formFile.FileName);
}
}
Directory.CreateDirectory(decodeSettings.ReplayFolders.Temp);

if (folder.EndsWith("raw"))
{
_ = DecodeRaw();
}
else
{
_ = Decode();
}
}
catch (Exception ex)
foreach (var formFile in files)
{
logger.LogError(ex, "Unexpected error in SaveReplays.");
return -1;
}
var tempFileName = $"{guid}_{Guid.NewGuid()}.SC2Replay";
var tempPath = Path.Combine(decodeSettings.ReplayFolders.Temp, tempFileName);

logger.LogInformation("{FilesSaved} files saved for GUID {Guid}.", filesSaved, guid);
return filesSaved;
}
// Save the file
using (var stream = new FileStream(tempPath, FileMode.Create))
await formFile.CopyToAsync(stream);

logger.LogInformation("Saved uploaded replay to temp: {path}", tempPath);

public async Task Decode()
{
Interlocked.Increment(ref queueCount);
await ss.WaitAsync();
ConcurrentDictionary<Guid, ConcurrentBag<IhReplay>> replays = [];
string? error = null;

try
{
var replayPaths = Directory.GetFiles(decodeSettings.Value.ReplayFolders.ToDo, "*SC2Replay")
.Where(f => !File.Exists(f + ".tmp"))
.Where(f => !excludeReplays.Contains(f))
.ToArray();

if (replayPaths.Length == 0)
// Prepare job
var job = new ReplayJob(Guid.NewGuid(), tempPath, "", inHouse);

// Try enqueue
if (!replayQueue.TryEnqueue(job))
{
error = "No replays found.";
return;
logger.LogWarning("Replay queue full — rejecting uploaded replay: {path}", tempPath);
return -1; // API returns 500 or 429 based on your controller
}

if (replayDecoder is null)
{
replayDecoder = new();
}

var options = new ReplayDecoderOptions()
{
Initdata = true,
Details = true,
Metadata = true,
TrackerEvents = true,
};

using var md5 = MD5.Create();

await foreach (var result in
replayDecoder.DecodeParallelWithErrorReport(replayPaths, decodeSettings.Value.Threads, options))
{
if (result.Sc2Replay is null)
{
Error(result);
error = "failed decoding replays.";
continue;
}

var metaData = GetMetaData(result.Sc2Replay);

var sc2Replay = Parse.GetDsReplay(result.Sc2Replay);

if (sc2Replay is null)
{
Error(result);
error = "failed decoding replays.";
continue;
}

var replayDto = Parse.GetReplayDto(sc2Replay, md5);

if (replayDto is null)
{
Error(result);
error = "failed decoding replays.";
continue;
}
var destination = Path.Combine(decodeSettings.Value.ReplayFolders.Done,
Path.GetFileNameWithoutExtension(result.ReplayPath)[..36] +
"_" +
replayDto.ReplayHash +
Path.GetExtension(result.ReplayPath));
await fileSemaphore.WaitAsync();
try
{
if (!File.Exists(destination))
{
File.Move(result.ReplayPath, destination);
var groupId = GetGroupIdFromFilename(result.ReplayPath);
var ihReplay = new IhReplay() { Replay = replayDto, Metadata = metaData };
replays.AddOrUpdate(groupId, [ihReplay], (k, v) => { v.Add(ihReplay); return v; });
}
}
finally
{
fileSemaphore.Release();
}
}
}
catch (Exception ex)
{
logger.LogError("failed decoding replays: {error}", ex.Message);
error = "failed decoding replays.";
}
finally
{
ss.Release();
foreach (var ent in replays)
{
OnDecodeFinished(new()
{
Guid = ent.Key,
IhReplays = [.. ent.Value],
Error = error,
});
}
Interlocked.Decrement(ref queueCount);
}
}

public async Task DecodeRaw()
{
Interlocked.Increment(ref queueCount);
await ssRaw.WaitAsync();
ConcurrentDictionary<Guid, ConcurrentBag<ChallengeResponse>> challengeResponses = [];
string? error = null;

try
{
var replayPaths = Directory.GetFiles(Path.Combine(decodeSettings.Value.ReplayFolders.ToDoRaw), "*SC2Replay")
.Where(f => !File.Exists(f + ".tmp"))
.Where(f => !excludeReplays.Contains(f))
.ToArray();

if (replayPaths.Length == 0)
{
error = "No replays found.";
return;
}

if (replayDecoder is null)
{
replayDecoder = new();
}

var options = new ReplayDecoderOptions()
{
Initdata = true,
Details = true,
Metadata = true,
TrackerEvents = true,
};

await foreach (var result in
replayDecoder.DecodeParallelWithErrorReport(replayPaths, decodeSettings.Value.Threads, options))
{
if (result.Sc2Replay is null)
{
Error(result);
error = "failed decoding replays.";
continue;
}
var challengeResponse = ChallengeService.GetChallengeResponse(result.Sc2Replay);

var destination = Path.Combine(decodeSettings.Value.ReplayFolders.Done, Path.GetFileName(result.ReplayPath));
await fileSemaphore.WaitAsync();
try
{
if (!File.Exists(destination))
{
File.Move(result.ReplayPath, destination);
var groupId = GetGroupIdFromFilename(result.ReplayPath);
challengeResponses.AddOrUpdate(groupId, [challengeResponse], (k, v) => { v.Add(challengeResponse); return v; });
}
}
finally
{
fileSemaphore.Release();
}
}
}
catch (Exception ex)
{
logger.LogError("failed decoding replays: {error}", ex.Message);
error = "failed decoding replays.";
}
finally
{
ssRaw.Release();
foreach (var ent in challengeResponses)
{
OnDecodeRawFinished(new()
{
Guid = ent.Key,
ChallengeResponses = [.. ent.Value],
Error = error,
});
}
Interlocked.Decrement(ref queueCount);
}
}

private void Error(DecodeParallelResult result)
{
logger.LogError("failed decoding replay: {path}, {error}", result.ReplayPath, result.Exception);
try
{
File.Move(result.ReplayPath, Path.Combine(decodeSettings.Value.ReplayFolders.Error, Path.GetFileName(result.ReplayPath)));
}
catch (Exception ex)
{
logger.LogWarning("failed moving error replay: {error}", ex.Message);
excludeReplays.Add(result.ReplayPath);
}
return replayQueue.QueueLength;
}

public static ReplayMetadata GetMetaData(Sc2Replay replay)
Expand Down
1 change: 1 addition & 0 deletions src/dsstats.decode/DecodeSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ public record ReplayFolders
public string ToDoRaw { get; set; } = string.Empty;
public string Done { get; set; } = string.Empty;
public string Error { get; set; } = string.Empty;
public string Temp { get; set; } = string.Empty;
}
2 changes: 2 additions & 0 deletions src/dsstats.decode/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
builder.Services.AddControllers();

builder.Services.Configure<DecodeSettings>(builder.Configuration.GetSection("DecodeSettings"));
builder.Services.AddSingleton<IReplayQueue, ReplayQueue>();
builder.Services.AddHostedService<ReplayDecoderWorker>();
builder.Services.AddSingleton<DecodeService>();

builder.Services.AddHttpClient("callback")
Expand Down
Loading