Skip to content

Commit f818610

Browse files
authored
Merge pull request #19 from peppy/push-multiple
Add ability to push multiple items to a queue in a single redis round-trip
2 parents d8dfeb4 + eef78bd commit f818610

File tree

2 files changed

+42
-2
lines changed

2 files changed

+42
-2
lines changed

osu.Server.QueueProcessor.Tests/InputOnlyQueueTests.cs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,35 @@ public void SendThenReceive_Multiple()
7373
Assert.Equal(objects, receivedObjects);
7474
}
7575

76+
[Fact]
77+
public void SendThenReceive_MultipleUsingSingleCall()
78+
{
79+
const int send_count = 10000;
80+
81+
var cts = new CancellationTokenSource(10000);
82+
83+
var objects = new HashSet<FakeData>();
84+
for (int i = 0; i < send_count; i++)
85+
objects.Add(FakeData.New());
86+
87+
var receivedObjects = new HashSet<FakeData>();
88+
89+
processor.PushToQueue(objects);
90+
91+
processor.Received += o =>
92+
{
93+
lock (receivedObjects)
94+
receivedObjects.Add(o);
95+
96+
if (receivedObjects.Count == send_count)
97+
cts.Cancel();
98+
};
99+
100+
processor.Run(cts.Token);
101+
102+
Assert.Equal(objects, receivedObjects);
103+
}
104+
76105
/// <summary>
77106
/// If the processor is cancelled mid-operation, every item should either be processed or still in the queue.
78107
/// </summary>

osu.Server.QueueProcessor/QueueProcessor.cs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,8 +208,19 @@ private void outputStats()
208208
}
209209
}
210210

211-
public void PushToQueue(T obj) =>
212-
redis.GetDatabase().ListLeftPush(QueueName, JsonConvert.SerializeObject(obj));
211+
/// <summary>
212+
/// Push a single item to the queue.
213+
/// </summary>
214+
/// <param name="item"></param>
215+
public void PushToQueue(T item) =>
216+
redis.GetDatabase().ListLeftPush(QueueName, JsonConvert.SerializeObject(item));
217+
218+
/// <summary>
219+
/// Push multiple items to the queue.
220+
/// </summary>
221+
/// <param name="items"></param>
222+
public void PushToQueue(IEnumerable<T> items) =>
223+
redis.GetDatabase().ListLeftPush(QueueName, items.Select(obj => new RedisValue(JsonConvert.SerializeObject(obj))).ToArray());
213224

214225
public long GetQueueSize() =>
215226
redis.GetDatabase().ListLength(QueueName);

0 commit comments

Comments
 (0)