Skip to content

Commit ed5f23b

Browse files
committed
Change polling flow to not require initial separate call
1 parent 7a45186 commit ed5f23b

File tree

2 files changed

+58
-52
lines changed

2 files changed

+58
-52
lines changed

osu.Server.QueueProcessor/BeatmapStatusWatcher.cs

Lines changed: 47 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -19,40 +19,40 @@ public static class BeatmapStatusWatcher
1919
/// Start a background task which will poll for beatmap sets with updates.
2020
/// </summary>
2121
/// <remarks>
22-
/// Prior to polling, a blocking call to <see cref="GetUpdatedBeatmapSets"/> is required to ensure no initial updates are missed.
2322
/// The general flow of usage should be:
2423
///
25-
/// // before doing anything else
26-
/// var updates = await GetUpdatedBeatmapSets();
27-
/// // can now query and cache beatmaps.
28-
/// StartPolling(updates, callback);
24+
/// // before doing anything else, start polling.
25+
/// // it's important to await the completion of this operation to ensure no updates are missed.
26+
/// using var pollingOperation = await StartPollingAsync(updates, callback);
2927
///
3028
/// void callback(BeatmapUpdates u)
3129
/// {
32-
/// foreach (int id in u.BeatmapSetIDs)
30+
/// foreach (int beatmapSetId in u.BeatmapSetIDs)
3331
/// {
34-
/// // invalidate `id`
32+
/// // invalidate anything related to `beatmapSetId`
3533
/// }
3634
/// }
3735
/// </remarks>
38-
/// <param name="initialUpdates">The response from an initial call to <see cref="GetUpdatedBeatmapSets"/>.</param>
3936
/// <param name="callback">A callback to receive information about any updated beatmap sets.</param>
4037
/// <param name="pollMilliseconds">The number of milliseconds to wait between polls. Starts counting from response of previous poll.</param>
4138
/// <param name="limit">The maximum number of beatmap sets to return in a single response.</param>
4239
/// <returns>An <see cref="IDisposable"/> that should be disposed of to stop polling.</returns>
43-
public static IDisposable StartPolling(BeatmapUpdates initialUpdates, Action<BeatmapUpdates> callback, int pollMilliseconds = 10000, int limit = 50) =>
44-
new PollingBeatmapStatusWatcher(callback, pollMilliseconds, limit);
40+
public static async Task<IDisposable> StartPollingAsync(Action<BeatmapUpdates> callback, int pollMilliseconds = 10000, int limit = 50)
41+
{
42+
var initialUpdates = await GetUpdatedBeatmapSetsAsync(limit: limit);
43+
return new PollingBeatmapStatusWatcher(initialUpdates.LastProcessedQueueID, callback, pollMilliseconds, limit);
44+
}
4545

4646
/// <summary>
4747
/// Check for any beatmap sets with updates since the provided queue ID.
48-
/// Should be called on a regular basis. See <see cref="StartPolling"/> for automatic polling after the first call.
48+
/// Should be called on a regular basis. See <see cref="StartPollingAsync"/> for automatic polling after the first call.
4949
/// </summary>
5050
/// <param name="lastQueueId">The last checked queue ID, ie <see cref="BeatmapUpdates.LastProcessedQueueID"/>.</param>
5151
/// <param name="limit">The maximum number of beatmap sets to return in a single response.</param>
5252
/// <returns>A response containing information about any updated beatmap sets.</returns>
53-
public static async Task<BeatmapUpdates> GetUpdatedBeatmapSets(int? lastQueueId, int limit = 50)
53+
public static async Task<BeatmapUpdates> GetUpdatedBeatmapSetsAsync(int? lastQueueId = null, int limit = 50)
5454
{
55-
MySqlConnection connection = await DatabaseAccess.GetConnectionAsync();
55+
using MySqlConnection connection = await DatabaseAccess.GetConnectionAsync();
5656

5757
if (lastQueueId.HasValue)
5858
{
@@ -86,57 +86,52 @@ public class bss_process_queue_item
8686
public int beatmapset_id;
8787
}
8888

89-
public record BeatmapUpdates
89+
private class PollingBeatmapStatusWatcher : IDisposable
9090
{
91-
public required int[] BeatmapSetIDs { get; init; }
92-
public required int LastProcessedQueueID { get; init; }
93-
}
94-
}
95-
96-
public class PollingBeatmapStatusWatcher : IDisposable
97-
{
98-
private readonly Action<BeatmapStatusWatcher.BeatmapUpdates> callback;
91+
private readonly Action<BeatmapUpdates> callback;
9992

100-
private readonly int pollMilliseconds;
101-
private readonly int limit;
93+
private readonly int pollMilliseconds;
94+
private readonly int limit;
10295

103-
private int? lastQueueId;
104-
private readonly CancellationTokenSource cts;
96+
private int lastQueueId;
97+
private readonly CancellationTokenSource cts;
10598

106-
public PollingBeatmapStatusWatcher(Action<BeatmapStatusWatcher.BeatmapUpdates> callback, int pollMilliseconds, int limit = 50)
107-
{
108-
this.pollMilliseconds = pollMilliseconds;
109-
this.limit = limit;
110-
this.callback = callback;
99+
public PollingBeatmapStatusWatcher(int initialQueueId, Action<BeatmapUpdates> callback, int pollMilliseconds, int limit = 50)
100+
{
101+
this.lastQueueId = initialQueueId;
102+
this.pollMilliseconds = pollMilliseconds;
103+
this.limit = limit;
104+
this.callback = callback;
111105

112-
cts = new CancellationTokenSource();
106+
cts = new CancellationTokenSource();
113107

114-
_ = poll();
115-
}
108+
_ = poll();
109+
}
116110

117-
private async Task poll()
118-
{
119-
try
111+
private async Task poll()
120112
{
121-
var result = await BeatmapStatusWatcher.GetUpdatedBeatmapSets(lastQueueId, limit);
113+
try
114+
{
115+
var result = await BeatmapStatusWatcher.GetUpdatedBeatmapSetsAsync(lastQueueId, limit);
122116

123-
lastQueueId = result.LastProcessedQueueID;
124-
if (result.BeatmapSetIDs.Length > 0)
125-
callback(result);
117+
lastQueueId = result.LastProcessedQueueID;
118+
if (result.BeatmapSetIDs.Length > 0)
119+
callback(result);
120+
}
121+
catch (Exception e)
122+
{
123+
Console.WriteLine($"Poll failed with {e}.");
124+
await Task.Delay(1000);
125+
}
126+
127+
_ = Task.Delay(pollMilliseconds, cts.Token).ContinueWith(_ => poll(), cts.Token);
126128
}
127-
catch (Exception e)
129+
130+
public void Dispose()
128131
{
129-
Console.WriteLine($"Poll failed with {e}.");
130-
await Task.Delay(1000);
132+
cts.Cancel();
133+
cts.Dispose();
131134
}
132-
133-
_ = Task.Delay(pollMilliseconds, cts.Token).ContinueWith(_ => poll(), cts.Token);
134-
}
135-
136-
public void Dispose()
137-
{
138-
cts.Cancel();
139-
cts.Dispose();
140135
}
141136
}
142137
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
// Copyright (c) ppy Pty Ltd <[email protected]>. Licensed under the MIT Licence.
2+
// See the LICENCE file in the repository root for full licence text.
3+
4+
namespace osu.Server.QueueProcessor
5+
{
6+
public record BeatmapUpdates
7+
{
8+
public required int[] BeatmapSetIDs { get; init; }
9+
public required int LastProcessedQueueID { get; init; }
10+
}
11+
}

0 commit comments

Comments
 (0)