Skip to content

Commit 3e90678

Browse files
committed
Make ClientPointer and MessageContainer internal (including rename to PascalCase), move ClusterScanCommand to GlideClusterClient.
Signed-off-by: currantw <[email protected]>
1 parent daf65ef commit 3e90678

File tree

2 files changed

+103
-96
lines changed

2 files changed

+103
-96
lines changed

sources/Valkey.Glide/BaseClient.cs

Lines changed: 22 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,21 @@ public void Dispose()
2222
GC.SuppressFinalize(this);
2323
lock (_lock)
2424
{
25-
if (_clientPointer == IntPtr.Zero)
25+
if (ClientPointer == IntPtr.Zero)
2626
{
2727
return;
2828
}
29-
_messageContainer.DisposeWithError(null);
30-
CloseClientFfi(_clientPointer);
31-
_clientPointer = IntPtr.Zero;
29+
MessageContainer.DisposeWithError(null);
30+
CloseClientFfi(ClientPointer);
31+
ClientPointer = IntPtr.Zero;
3232
}
3333
}
3434

3535
public async ValueTask DisposeAsync() => await Task.Run(Dispose);
3636

37-
public override string ToString() => $"{GetType().Name} {{ 0x{_clientPointer:X} {_clientInfo} }}";
37+
public override string ToString() => $"{GetType().Name} {{ 0x{ClientPointer:X} {_clientInfo} }}";
3838

39-
public override int GetHashCode() => (int)_clientPointer;
39+
public override int GetHashCode() => (int)ClientPointer;
4040

4141
#endregion public methods
4242

@@ -49,11 +49,11 @@ protected static async Task<T> CreateClient<T>(BaseClientConfiguration config, F
4949
nint failureCallbackPointer = Marshal.GetFunctionPointerForDelegate(client._failureCallbackDelegate);
5050

5151
using FFI.ConnectionConfig request = config.Request.ToFfi();
52-
Message message = client._messageContainer.GetMessageForCall();
52+
Message message = client.MessageContainer.GetMessageForCall();
5353
CreateClientFfi(request.ToPtr(), successCallbackPointer, failureCallbackPointer);
54-
client._clientPointer = await message; // This will throw an error thru failure callback if any
54+
client.ClientPointer = await message; // This will throw an error thru failure callback if any
5555

56-
if (client._clientPointer == IntPtr.Zero)
56+
if (client.ClientPointer == IntPtr.Zero)
5757
{
5858
throw new ConnectionException("Failed creating a client");
5959
}
@@ -65,7 +65,7 @@ protected BaseClient()
6565
{
6666
_successCallbackDelegate = SuccessCallback;
6767
_failureCallbackDelegate = FailureCallback;
68-
_messageContainer = new(this);
68+
MessageContainer = new(this);
6969
}
7070

7171
protected internal delegate T ResponseHandler<T>(IntPtr response);
@@ -83,8 +83,8 @@ internal virtual async Task<T> Command<R, T>(Cmd<R, T> command, Route? route = n
8383
using FFI.Route? ffiRoute = route?.ToFfi();
8484

8585
// 3. Sumbit request to the rust part
86-
Message message = _messageContainer.GetMessageForCall();
87-
CommandFfi(_clientPointer, (ulong)message.Index, cmd.ToPtr(), ffiRoute?.ToPtr() ?? IntPtr.Zero);
86+
Message message = MessageContainer.GetMessageForCall();
87+
CommandFfi(ClientPointer, (ulong)message.Index, cmd.ToPtr(), ffiRoute?.ToPtr() ?? IntPtr.Zero);
8888

8989
// 4. Get a response and Handle it
9090
IntPtr response = await message;
@@ -109,8 +109,8 @@ internal virtual async Task<T> Command<R, T>(Cmd<R, T> command, Route? route = n
109109
using FFI.BatchOptions? ffiOptions = options?.ToFfi();
110110

111111
// 3. Sumbit request to the rust part
112-
Message message = _messageContainer.GetMessageForCall();
113-
BatchFfi(_clientPointer, (ulong)message.Index, ffiBatch.ToPtr(), raiseOnError, ffiOptions?.ToPtr() ?? IntPtr.Zero);
112+
Message message = MessageContainer.GetMessageForCall();
113+
BatchFfi(ClientPointer, (ulong)message.Index, ffiBatch.ToPtr(), raiseOnError, ffiOptions?.ToPtr() ?? IntPtr.Zero);
114114

115115
// 4. Get a response and Handle it
116116
IntPtr response = await message;
@@ -137,16 +137,22 @@ internal virtual async Task<T> Command<R, T>(Cmd<R, T> command, Route? route = n
137137
protected static readonly Version DefaultServerVersion = new(8, 0, 0);
138138
#endregion protected fields
139139

140+
#region internal fields
141+
/// Raw pointer to the underlying native client.
142+
internal IntPtr ClientPointer;
143+
internal readonly MessageContainer MessageContainer;
144+
#endregion internal fields
145+
140146
#region private methods
141147
private void SuccessCallback(ulong index, IntPtr ptr) =>
142148
// Work needs to be offloaded from the calling thread, because otherwise we might starve the client's thread pool.
143-
Task.Run(() => _messageContainer.GetMessage((int)index).SetResult(ptr));
149+
Task.Run(() => MessageContainer.GetMessage((int)index).SetResult(ptr));
144150

145151
private void FailureCallback(ulong index, IntPtr strPtr, RequestErrorType errType)
146152
{
147153
string str = Marshal.PtrToStringAnsi(strPtr)!;
148154
// Work needs to be offloaded from the calling thread, because otherwise we might starve the client's thread pool.
149-
_ = Task.Run(() => _messageContainer.GetMessage((int)index).SetException(Create(errType, str)));
155+
_ = Task.Run(() => MessageContainer.GetMessage((int)index).SetException(Create(errType, str)));
150156
}
151157

152158
~BaseClient() => Dispose();
@@ -155,83 +161,6 @@ private void FailureCallback(ulong index, IntPtr strPtr, RequestErrorType errTyp
155161

156162
protected abstract Task<Version> GetServerVersionAsync();
157163

158-
/// <summary>
159-
/// Executes a cluster scan command with the given cursor and arguments.
160-
/// </summary>
161-
/// <param name="cursor">The cursor for the scan iteration.</param>
162-
/// <param name="args">Additional arguments for the scan command.</param>
163-
/// <returns>A tuple containing the next cursor and the keys found in this iteration.</returns>
164-
protected async Task<(string cursor, ValkeyKey[] keys)> ClusterScanCommand(string cursor, string[] args)
165-
{
166-
var message = _messageContainer.GetMessageForCall();
167-
IntPtr cursorPtr = Marshal.StringToHGlobalAnsi(cursor);
168-
169-
IntPtr[]? argPtrs = null;
170-
IntPtr argsPtr = IntPtr.Zero;
171-
IntPtr argLengthsPtr = IntPtr.Zero;
172-
173-
try
174-
{
175-
if (args.Length > 0)
176-
{
177-
// 1. Get a pointer to the array of argument string pointers.
178-
// Example: if args = ["MATCH", "key*"], then argPtrs[0] points
179-
// to "MATCH", argPtrs[1] points to "key*", and argsPtr points
180-
// to the argsPtrs array.
181-
argPtrs = [.. args.Select(Marshal.StringToHGlobalAnsi)];
182-
argsPtr = Marshal.AllocHGlobal(IntPtr.Size * args.Length);
183-
Marshal.Copy(argPtrs, 0, argsPtr, args.Length);
184-
185-
// 2. Get a pointer to an array of argument string lengths.
186-
// Example: if args = ["MATCH", "key*"], then argLengths[0] = 5
187-
// (length of "MATCH"), argLengths[1] = 4 (length of "key*"),
188-
// and argLengthsPtr points to the argLengths array.
189-
var argLengths = args.Select(arg => (ulong)arg.Length).ToArray();
190-
argLengthsPtr = Marshal.AllocHGlobal(sizeof(ulong) * args.Length);
191-
Marshal.Copy(argLengths.Select(l => (long)l).ToArray(), 0, argLengthsPtr, args.Length);
192-
}
193-
194-
// Submit request to Rust and wait for response.
195-
RequestClusterScanFfi(_clientPointer, (ulong)message.Index, cursorPtr, (ulong)args.Length, argsPtr, argLengthsPtr);
196-
IntPtr response = await message;
197-
198-
try
199-
{
200-
var result = HandleResponse(response);
201-
var array = (object[])result!;
202-
var nextCursor = array[0]!.ToString()!;
203-
var keys = ((object[])array[1]!).Select(k => new ValkeyKey(k!.ToString())).ToArray();
204-
return (nextCursor, keys);
205-
}
206-
finally
207-
{
208-
FreeResponse(response);
209-
}
210-
}
211-
finally
212-
{
213-
// Clean up args memory
214-
if (argLengthsPtr != IntPtr.Zero)
215-
{
216-
Marshal.FreeHGlobal(argLengthsPtr);
217-
}
218-
219-
if (argsPtr != IntPtr.Zero)
220-
{
221-
Marshal.FreeHGlobal(argsPtr);
222-
}
223-
224-
if (argPtrs != null)
225-
{
226-
Array.ForEach(argPtrs, Marshal.FreeHGlobal);
227-
}
228-
229-
// Clean up cursor in Rust
230-
RemoveClusterScanCursorFfi(cursorPtr);
231-
Marshal.FreeHGlobal(cursorPtr);
232-
}
233-
}
234-
235164
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
236165
private delegate void SuccessAction(ulong index, IntPtr ptr);
237166
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
@@ -248,9 +177,6 @@ private void FailureCallback(ulong index, IntPtr strPtr, RequestErrorType errTyp
248177
/// and held in order to prevent the cost of marshalling on each function call.
249178
private readonly SuccessAction _successCallbackDelegate;
250179

251-
/// Raw pointer to the underlying native client.
252-
private IntPtr _clientPointer;
253-
private readonly MessageContainer _messageContainer;
254180
private readonly object _lock = new();
255181
private string _clientInfo = ""; // used to distinguish and identify clients during tests
256182

sources/Valkey.Glide/GlideClusterClient.cs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0
22

3+
using System.Runtime.InteropServices;
4+
35
using Valkey.Glide.Commands;
46
using Valkey.Glide.Commands.Options;
57
using Valkey.Glide.Internals;
68
using Valkey.Glide.Pipeline;
79

810
using static Valkey.Glide.ConnectionConfiguration;
911
using static Valkey.Glide.Errors;
12+
using static Valkey.Glide.Internals.FFI;
13+
using static Valkey.Glide.Internals.ResponseHandler;
1014
using static Valkey.Glide.Pipeline.Options;
1115
using static Valkey.Glide.Route;
1216

@@ -319,6 +323,83 @@ protected override async Task<Version> GetServerVersionAsync()
319323
return _serverVersion;
320324
}
321325

326+
/// <summary>
327+
/// Executes a cluster scan command with the given cursor and arguments.
328+
/// </summary>
329+
/// <param name="cursor">The cursor for the scan iteration.</param>
330+
/// <param name="args">Additional arguments for the scan command.</param>
331+
/// <returns>A tuple containing the next cursor and the keys found in this iteration.</returns>
332+
private async Task<(string cursor, ValkeyKey[] keys)> ClusterScanCommand(string cursor, string[] args)
333+
{
334+
var message = MessageContainer.GetMessageForCall();
335+
IntPtr cursorPtr = Marshal.StringToHGlobalAnsi(cursor);
336+
337+
IntPtr[]? argPtrs = null;
338+
IntPtr argsPtr = IntPtr.Zero;
339+
IntPtr argLengthsPtr = IntPtr.Zero;
340+
341+
try
342+
{
343+
if (args.Length > 0)
344+
{
345+
// 1. Get a pointer to the array of argument string pointers.
346+
// Example: if args = ["MATCH", "key*"], then argPtrs[0] points
347+
// to "MATCH", argPtrs[1] points to "key*", and argsPtr points
348+
// to the argsPtrs array.
349+
argPtrs = [.. args.Select(Marshal.StringToHGlobalAnsi)];
350+
argsPtr = Marshal.AllocHGlobal(IntPtr.Size * args.Length);
351+
Marshal.Copy(argPtrs, 0, argsPtr, args.Length);
352+
353+
// 2. Get a pointer to an array of argument string lengths.
354+
// Example: if args = ["MATCH", "key*"], then argLengths[0] = 5
355+
// (length of "MATCH"), argLengths[1] = 4 (length of "key*"),
356+
// and argLengthsPtr points to the argLengths array.
357+
var argLengths = args.Select(arg => (ulong)arg.Length).ToArray();
358+
argLengthsPtr = Marshal.AllocHGlobal(sizeof(ulong) * args.Length);
359+
Marshal.Copy(argLengths.Select(l => (long)l).ToArray(), 0, argLengthsPtr, args.Length);
360+
}
361+
362+
// Submit request to Rust and wait for response.
363+
RequestClusterScanFfi(ClientPointer, (ulong)message.Index, cursorPtr, (ulong)args.Length, argsPtr, argLengthsPtr);
364+
IntPtr response = await message;
365+
366+
try
367+
{
368+
var result = HandleResponse(response);
369+
var array = (object[])result!;
370+
var nextCursor = array[0]!.ToString()!;
371+
var keys = ((object[])array[1]!).Select(k => new ValkeyKey(k!.ToString())).ToArray();
372+
return (nextCursor, keys);
373+
}
374+
finally
375+
{
376+
FreeResponse(response);
377+
}
378+
}
379+
finally
380+
{
381+
// Clean up args memory
382+
if (argLengthsPtr != IntPtr.Zero)
383+
{
384+
Marshal.FreeHGlobal(argLengthsPtr);
385+
}
386+
387+
if (argsPtr != IntPtr.Zero)
388+
{
389+
Marshal.FreeHGlobal(argsPtr);
390+
}
391+
392+
if (argPtrs != null)
393+
{
394+
Array.ForEach(argPtrs, Marshal.FreeHGlobal);
395+
}
396+
397+
// Clean up cursor in Rust
398+
RemoveClusterScanCursorFfi(cursorPtr);
399+
Marshal.FreeHGlobal(cursorPtr);
400+
}
401+
}
402+
322403
/// <summary>
323404
/// Iterates incrementally over keys in the cluster.
324405
/// </summary>

0 commit comments

Comments
 (0)