Skip to content

Commit e1e0b0f

Browse files
committed
Fix for FT.CURSOR in cluster; requires single server
- update SE.Redis ref to allow new GetServer(RedisKey) usage - add utility API to capture an IServer and database if using cluster - create internal AggregationResult subclass that includes the IServer - capture server in Aggregate[Async] - create new overloads for CusorDel[Async] and CursorRead[Async] that take AggregationResult, and push consumers towards that overload - use captured server/database when appropriate - use the new API from tests - add new I[Async]Enumerable API for simplicity: AggregateEnumerable[Async] - add tests for new API - use cluster env from cursor tests
1 parent ed92c34 commit e1e0b0f

File tree

9 files changed

+312
-40
lines changed

9 files changed

+312
-40
lines changed

Directory.Packages.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
<!-- primary library -->
1313
<PackageVersion Include="NetTopologySuite" Version="2.6.0" />
1414
<PackageVersion Include="System.Text.Json" Version="9.0.7" />
15-
<PackageVersion Include="StackExchange.Redis" Version="2.8.58" />
15+
<PackageVersion Include="StackExchange.Redis" Version="2.9.11" />
1616
<!-- tests, etc -->
1717
<PackageVersion Include="BouncyCastle.Cryptography" Version="2.6.1" />
1818
<PackageVersion Include="coverlet.collector" Version="6.0.4" />

src/NRedisStack/Auxiliary.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,22 @@ public static RedisResult Execute(this IDatabase db, SerializedCommand command)
6767
return db.Execute(command.Command, command.Args);
6868
}
6969

70+
internal static RedisResult Execute(this IServer server, int? db, SerializedCommand command)
71+
{
72+
return server.Execute(db, command.Command, command.Args);
73+
}
74+
7075
public static async Task<RedisResult> ExecuteAsync(this IDatabaseAsync db, SerializedCommand command)
7176
{
7277
((IDatabase)db).SetInfoInPipeline();
7378
return await db.ExecuteAsync(command.Command, command.Args);
7479
}
7580

81+
internal static async Task<RedisResult> ExecuteAsync(this IServer server, int? db, SerializedCommand command)
82+
{
83+
return await server.ExecuteAsync(db, command.Command, command.Args);
84+
}
85+
7686
public static List<RedisResult> ExecuteBroadcast(this IDatabase db, string command)
7787
=> db.ExecuteBroadcast(new SerializedCommand(command));
7888

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,7 @@
11
#nullable enable
2+
NRedisStack.SearchCommands.AggregateEnumerable(string! index, NRedisStack.Search.AggregationRequest! query) -> System.Collections.Generic.IEnumerable<NRedisStack.Search.Aggregation.Row>!
3+
NRedisStack.SearchCommands.CursorDel(NRedisStack.Search.AggregationResult! result) -> bool
4+
NRedisStack.SearchCommands.CursorRead(NRedisStack.Search.AggregationResult! result, int? count = null) -> NRedisStack.Search.AggregationResult!
5+
NRedisStack.SearchCommandsAsync.AggregateEnumerableAsync(string! index, NRedisStack.Search.AggregationRequest! query) -> System.Collections.Generic.IAsyncEnumerable<NRedisStack.Search.Aggregation.Row>!
6+
NRedisStack.SearchCommandsAsync.CursorDelAsync(NRedisStack.Search.AggregationResult! result) -> System.Threading.Tasks.Task<bool>!
7+
NRedisStack.SearchCommandsAsync.CursorReadAsync(NRedisStack.Search.AggregationResult! result, int? count = null) -> System.Threading.Tasks.Task<NRedisStack.Search.AggregationResult!>!

src/NRedisStack/ResponseParser.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -737,6 +737,20 @@ public static AggregationResult ToAggregationResult(this RedisResult result, Agg
737737
}
738738
}
739739

740+
internal static AggregationResult ToAggregationResult(this RedisResult result, string indexName, AggregationRequest query, IServer? server, int? database)
741+
{
742+
if (query.IsWithCursor())
743+
{
744+
var results = (RedisResult[])result!;
745+
746+
return new AggregationResult.WithCursorAggregationResult(indexName, results[0], (long)results[1], server, database);
747+
}
748+
else
749+
{
750+
return new(result);
751+
}
752+
}
753+
740754
public static Dictionary<string, RedisResult>[] ToDictionarys(this RedisResult result)
741755
{
742756
var resArr = (RedisResult[])result!;

src/NRedisStack/Search/AggregationRequest.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ public AggregationRequest Cursor(int? count = null, long? maxIdle = null)
128128

129129
if (count != null)
130130
{
131+
Count = count;
131132
args.Add(SearchArgs.COUNT);
132133
args.Add(count);
133134
}
@@ -139,6 +140,7 @@ public AggregationRequest Cursor(int? count = null, long? maxIdle = null)
139140
}
140141
return this;
141142
}
143+
internal int? Count { get; set; }
142144

143145
public AggregationRequest Params(Dictionary<string, object> nameValue)
144146
{

src/NRedisStack/Search/AggregationResult.cs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,29 @@
33

44
namespace NRedisStack.Search;
55

6-
public sealed class AggregationResult
6+
public class AggregationResult
77
{
8+
// internal subclass for WITHCURSOR calls, which need to be issued to the same connection
9+
internal sealed class WithCursorAggregationResult : AggregationResult
10+
{
11+
internal WithCursorAggregationResult(string indexName, RedisResult result, long cursorId, IServer? server,
12+
int? database) : base(result, cursorId)
13+
{
14+
IndexName = indexName;
15+
Server = server;
16+
Database = database;
17+
}
18+
public string IndexName { get; }
19+
public IServer? Server { get; }
20+
public int? Database { get; }
21+
}
22+
823
public long TotalResults { get; }
924
private readonly Dictionary<string, object>[] _results;
1025
private Dictionary<string, RedisValue>[]? _resultsAsRedisValues;
1126

1227
public long CursorId { get; }
1328

14-
1529
internal AggregationResult(RedisResult result, long cursorId = -1)
1630
{
1731
var arr = (RedisResult[])result!;
@@ -45,7 +59,6 @@ internal AggregationResult(RedisResult result, long cursorId = -1)
4559
CursorId = cursorId;
4660
}
4761

48-
4962
/// <summary>
5063
/// takes a Redis multi-bulk array represented by a RedisResult[] and recursively processes its elements.
5164
/// For each element in the array, it checks if it's another multi-bulk array, and if so, it recursively calls itself.

src/NRedisStack/Search/SearchCommands.cs

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
using System.ComponentModel;
12
using NRedisStack.Search;
3+
using NRedisStack.Search.Aggregation;
24
using NRedisStack.Search.DataTypes;
35
using StackExchange.Redis;
46
namespace NRedisStack;
@@ -16,8 +18,54 @@ public RedisResult[] _List()
1618
public AggregationResult Aggregate(string index, AggregationRequest query)
1719
{
1820
SetDefaultDialectIfUnset(query);
19-
var result = db.Execute(SearchCommandBuilder.Aggregate(index, query));
20-
return result.ToAggregationResult(query);
21+
IServer? server = null;
22+
int? database = null;
23+
24+
var command = SearchCommandBuilder.Aggregate(index, query);
25+
if (query.IsWithCursor())
26+
{
27+
// we can issue this anywhere, but follow-up calls need to be on the same server
28+
server = GetRandomServerForCluster(db, out database);
29+
}
30+
31+
RedisResult result;
32+
if (server is not null)
33+
{
34+
result = server.Execute(database, command);
35+
}
36+
else
37+
{
38+
result = db.Execute(command);
39+
}
40+
41+
return result.ToAggregationResult(index, query, server, database);
42+
}
43+
44+
public IEnumerable<Row> AggregateEnumerable(string index, AggregationRequest query)
45+
{
46+
if (!query.IsWithCursor()) query.Cursor();
47+
48+
var result = Aggregate(index, query);
49+
try
50+
{
51+
while (true)
52+
{
53+
var count = checked((int)result.TotalResults);
54+
for (int i = 0; i < count; i++)
55+
{
56+
yield return result.GetRow(i);
57+
}
58+
if (result.CursorId == 0) break;
59+
result = CursorRead(result, query.Count);
60+
}
61+
}
62+
finally
63+
{
64+
if (result.CursorId != 0)
65+
{
66+
CursorDel(result);
67+
}
68+
}
2169
}
2270

2371
/// <inheritdoc/>
@@ -72,17 +120,51 @@ public bool Create(string indexName, Schema schema)
72120
}
73121

74122
/// <inheritdoc/>
123+
[Obsolete("When possible, use CursorDelAsync(AggregationResult, int?) instead.")]
124+
[Browsable(false), EditorBrowsable(EditorBrowsableState.Never)]
75125
public bool CursorDel(string indexName, long cursorId)
76126
{
77127
return db.Execute(SearchCommandBuilder.CursorDel(indexName, cursorId)).OKtoBoolean();
78128
}
79129

130+
public bool CursorDel(AggregationResult result)
131+
{
132+
if (result is not AggregationResult.WithCursorAggregationResult withCursor)
133+
{
134+
throw new ArgumentException(
135+
message: $"{nameof(CursorDelAsync)} must be called with a value returned from a previous call to {nameof(AggregateAsync)} with a cursor.",
136+
paramName: nameof(result));
137+
}
138+
139+
var command = SearchCommandBuilder.CursorDel(withCursor.IndexName, withCursor.CursorId);
140+
var resp = withCursor.Server is { } server
141+
? server.Execute(withCursor.Database, command)
142+
: db.Execute(command);
143+
return resp.OKtoBoolean();
144+
}
145+
80146
/// <inheritdoc/>
147+
[Obsolete("When possible, use CursorReadAsync(AggregationResult, int?) instead.")]
148+
[Browsable(false), EditorBrowsable(EditorBrowsableState.Never)]
81149
public AggregationResult CursorRead(string indexName, long cursorId, int? count = null)
82150
{
83151
var resp = db.Execute(SearchCommandBuilder.CursorRead(indexName, cursorId, count)).ToArray();
84152
return new(resp[0], (long)resp[1]);
85153
}
154+
155+
public AggregationResult CursorRead(AggregationResult result, int? count = null)
156+
{
157+
if (result is not AggregationResult.WithCursorAggregationResult withCursor)
158+
{
159+
throw new ArgumentException(message: $"{nameof(CursorReadAsync)} must be called with a value returned from a previous call to {nameof(AggregateAsync)} with a cursor.", paramName: nameof(result));
160+
}
161+
var command = SearchCommandBuilder.CursorRead(withCursor.IndexName, withCursor.CursorId, count);
162+
var rawResult = withCursor.Server is { } server
163+
? server.Execute(withCursor.Database, command)
164+
: db.Execute(command);
165+
var resp = rawResult.ToArray();
166+
return new AggregationResult.WithCursorAggregationResult(withCursor.IndexName, resp[0], (long)resp[1], withCursor.Server, withCursor.Database);
167+
}
86168

87169
/// <inheritdoc/>
88170
public long DictAdd(string dict, params string[] terms)

src/NRedisStack/Search/SearchCommandsAsync.cs

Lines changed: 91 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
using System.ComponentModel;
12
using NRedisStack.Search;
3+
using NRedisStack.Search.Aggregation;
24
using NRedisStack.Search.DataTypes;
35
using StackExchange.Redis;
46
namespace NRedisStack;
@@ -40,20 +42,71 @@ public async Task<RedisResult[]> _ListAsync()
4042
return (await _db.ExecuteAsync(SearchCommandBuilder._List())).ToArray();
4143
}
4244

45+
internal static IServer? GetRandomServerForCluster(IDatabaseAsync db, out int? database)
46+
{
47+
var server = db.Multiplexer.GetServer(key: default(RedisKey));
48+
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
49+
if (server is null || server.ServerType != ServerType.Cluster)
50+
{
51+
database = null;
52+
return null;
53+
}
54+
// This is vexingly misplaced, but: it doesn't actually matter for cluster
55+
database = db is IDatabase nonAsync ? nonAsync.Database : null;
56+
return server;
57+
}
58+
4359
/// <inheritdoc/>
4460
public async Task<AggregationResult> AggregateAsync(string index, AggregationRequest query)
4561
{
4662
SetDefaultDialectIfUnset(query);
47-
var result = await _db.ExecuteAsync(SearchCommandBuilder.Aggregate(index, query));
63+
IServer? server = null;
64+
int? database = null;
65+
66+
var command = SearchCommandBuilder.Aggregate(index, query);
4867
if (query.IsWithCursor())
4968
{
50-
var results = (RedisResult[])result!;
69+
// we can issue this anywhere, but follow-up calls need to be on the same server
70+
server = GetRandomServerForCluster(_db, out database);
71+
}
5172

52-
return new(results[0], (long)results[1]);
73+
RedisResult result;
74+
if (server is not null)
75+
{
76+
result = await server.ExecuteAsync(database, command);
5377
}
5478
else
5579
{
56-
return new(result);
80+
result = await _db.ExecuteAsync(command);
81+
}
82+
83+
return result.ToAggregationResult(index, query, server, database);
84+
}
85+
86+
public async IAsyncEnumerable<Row> AggregateEnumerableAsync(string index, AggregationRequest query)
87+
{
88+
if (!query.IsWithCursor()) query.Cursor();
89+
90+
var result = await AggregateAsync(index, query);
91+
try
92+
{
93+
while (true)
94+
{
95+
var count = checked((int)result.TotalResults);
96+
for (int i = 0; i < count; i++)
97+
{
98+
yield return result.GetRow(i);
99+
}
100+
if (result.CursorId == 0) break;
101+
result = await CursorReadAsync(result, query.Count);
102+
}
103+
}
104+
finally
105+
{
106+
if (result.CursorId != 0)
107+
{
108+
await CursorDelAsync(result);
109+
}
57110
}
58111
}
59112

@@ -108,18 +161,52 @@ public async Task<bool> CreateAsync(string indexName, Schema schema)
108161
}
109162

110163
/// <inheritdoc/>
164+
[Obsolete("When possible, use CursorDelAsync(AggregationResult, int?) instead.")]
165+
[Browsable(false), EditorBrowsable(EditorBrowsableState.Never)]
111166
public async Task<bool> CursorDelAsync(string indexName, long cursorId)
112167
{
113168
return (await _db.ExecuteAsync(SearchCommandBuilder.CursorDel(indexName, cursorId))).OKtoBoolean();
114169
}
115170

171+
public async Task<bool> CursorDelAsync(AggregationResult result)
172+
{
173+
if (result is not AggregationResult.WithCursorAggregationResult withCursor)
174+
{
175+
throw new ArgumentException(
176+
message: $"{nameof(CursorDelAsync)} must be called with a value returned from a previous call to {nameof(AggregateAsync)} with a cursor.",
177+
paramName: nameof(result));
178+
}
179+
180+
var command = SearchCommandBuilder.CursorDel(withCursor.IndexName, withCursor.CursorId);
181+
var pending = withCursor.Server is { } server
182+
? server.ExecuteAsync(withCursor.Database, command)
183+
: _db.ExecuteAsync(command);
184+
return (await pending).OKtoBoolean();
185+
}
186+
116187
/// <inheritdoc/>
188+
[Obsolete("When possible, use CursorReadAsync(AggregationResult, int?) instead.")]
189+
[Browsable(false), EditorBrowsable(EditorBrowsableState.Never)]
117190
public async Task<AggregationResult> CursorReadAsync(string indexName, long cursorId, int? count = null)
118191
{
119192
var resp = (await _db.ExecuteAsync(SearchCommandBuilder.CursorRead(indexName, cursorId, count))).ToArray();
120193
return new(resp[0], (long)resp[1]);
121194
}
122195

196+
public async Task<AggregationResult> CursorReadAsync(AggregationResult result, int? count = null)
197+
{
198+
if (result is not AggregationResult.WithCursorAggregationResult withCursor)
199+
{
200+
throw new ArgumentException(message: $"{nameof(CursorReadAsync)} must be called with a value returned from a previous call to {nameof(AggregateAsync)} with a cursor.", paramName: nameof(result));
201+
}
202+
var command = SearchCommandBuilder.CursorRead(withCursor.IndexName, withCursor.CursorId, count);
203+
var pending = withCursor.Server is { } server
204+
? server.ExecuteAsync(withCursor.Database, command)
205+
: _db.ExecuteAsync(command);
206+
var resp = (await pending).ToArray();
207+
return new AggregationResult.WithCursorAggregationResult(withCursor.IndexName, resp[0], (long)resp[1], withCursor.Server, withCursor.Database);
208+
}
209+
123210
/// <inheritdoc/>
124211
public async Task<long> DictAddAsync(string dict, params string[] terms)
125212
{

0 commit comments

Comments
 (0)