Skip to content

Commit 8866d81

Browse files
committed
Add initial BeatmapStatusWatcher component
1 parent e081234 commit 8866d81

File tree

2 files changed

+125
-0
lines changed

2 files changed

+125
-0
lines changed
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
// Copyright (c) ppy Pty Ltd <[email protected]>. Licensed under the MIT Licence.
2+
// See the LICENCE file in the repository root for full licence text.
3+
4+
using System;
5+
using System.Linq;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
using Dapper;
9+
using MySqlConnector;
10+
11+
namespace osu.Server.QueueProcessor
12+
{
13+
/// <summary>
14+
/// Provides insight into whenever a beatmap has changed status based on a user or system update.
15+
/// </summary>
16+
public static class BeatmapStatusWatcher
17+
{
18+
/// <summary>
19+
/// Start a background task which will poll for beatmap sets with updates.
20+
/// </summary>
21+
/// <param name="callback">A callback to receive information about any updated beatmap sets.</param>
22+
/// <param name="pollMilliseconds">The number of milliseconds to wait between polls. Starts counting from response of previous poll.</param>
23+
/// <param name="limit">The maximum number of beatmap sets to return in a single response.</param>
24+
/// <returns>An <see cref="IDisposable"/> that should be disposed of to stop polling.</returns>
25+
public static IDisposable StartPolling(Action<BeatmapUpdates> callback, int pollMilliseconds = 10000, int limit = 50) =>
26+
new PollingBeatmapStatusWatcher(callback, pollMilliseconds, limit);
27+
28+
/// <summary>
29+
/// Check for any beatmap sets with updates since the provided queue ID.
30+
/// Should be called on a regular basis. See <see cref="StartPolling"/> for automatic polling.
31+
/// </summary>
32+
/// <param name="lastQueueId">The last checked queue ID, ie <see cref="BeatmapUpdates.LastProcessedQueueID"/>.</param>
33+
/// <param name="limit">The maximum number of beatmap sets to return in a single response.</param>
34+
/// <returns>A response containing information about any updated beatmap sets.</returns>
35+
public static async Task<BeatmapUpdates> GetUpdatedBeatmapSets(int? lastQueueId, int limit = 50)
36+
{
37+
MySqlConnection connection = await DatabaseAccess.GetConnectionAsync();
38+
39+
if (lastQueueId.HasValue)
40+
{
41+
var items = (await connection.QueryAsync<bss_process_queue_item>("SELECT * FROM bss_process_queue WHERE queue_id > @lastQueueId LIMIT @limit", new
42+
{
43+
lastQueueId,
44+
limit
45+
})).ToArray();
46+
47+
return new BeatmapUpdates
48+
{
49+
BeatmapSetIDs = items.Select(i => i.beatmapset_id).ToArray(),
50+
LastProcessedQueueID = items.LastOrDefault()?.queue_id ?? lastQueueId.Value
51+
};
52+
}
53+
54+
var lastEntry = await connection.QueryFirstOrDefaultAsync<bss_process_queue_item>("SELECT * FROM bss_process_queue ORDER BY queue_id DESC LIMIT 1");
55+
56+
return new BeatmapUpdates
57+
{
58+
BeatmapSetIDs = [],
59+
LastProcessedQueueID = lastEntry?.queue_id ?? 0
60+
};
61+
}
62+
63+
// ReSharper disable InconsistentNaming (matches database table)
64+
[Serializable]
65+
public class bss_process_queue_item
66+
{
67+
public int queue_id;
68+
public int beatmapset_id;
69+
}
70+
71+
public record BeatmapUpdates
72+
{
73+
public required int[] BeatmapSetIDs { get; init; }
74+
public required int LastProcessedQueueID { get; init; }
75+
}
76+
}
77+
78+
public class PollingBeatmapStatusWatcher : IDisposable
79+
{
80+
private readonly Action<BeatmapStatusWatcher.BeatmapUpdates> callback;
81+
82+
private readonly int pollMilliseconds;
83+
private readonly int limit;
84+
85+
private int? lastQueueId;
86+
private readonly CancellationTokenSource cts;
87+
88+
public PollingBeatmapStatusWatcher(Action<BeatmapStatusWatcher.BeatmapUpdates> callback, int pollMilliseconds, int limit = 50)
89+
{
90+
this.pollMilliseconds = pollMilliseconds;
91+
this.limit = limit;
92+
this.callback = callback;
93+
94+
cts = new CancellationTokenSource();
95+
96+
_ = poll();
97+
}
98+
99+
private async Task poll()
100+
{
101+
try
102+
{
103+
var result = await BeatmapStatusWatcher.GetUpdatedBeatmapSets(lastQueueId, limit);
104+
105+
lastQueueId = result.LastProcessedQueueID;
106+
if (result.BeatmapSetIDs.Length > 0)
107+
callback(result);
108+
}
109+
catch (Exception e)
110+
{
111+
Console.WriteLine($"Poll failed with {e}.");
112+
await Task.Delay(1000);
113+
}
114+
115+
_ = Task.Delay(pollMilliseconds, cts.Token).ContinueWith(_ => poll(), cts.Token);
116+
}
117+
118+
public void Dispose()
119+
{
120+
cts.Cancel();
121+
cts.Dispose();
122+
}
123+
}
124+
}

osu.Server.QueueProcessor/osu.Server.QueueProcessor.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
</PropertyGroup>
1212

1313
<ItemGroup>
14+
<PackageReference Include="Dapper" Version="2.1.66" />
1415
<PackageReference Include="DogStatsD-CSharp-Client" Version="8.0.0" />
1516
<PackageReference Include="MySqlConnector" Version="2.3.7" />
1617
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />

0 commit comments

Comments
 (0)