Skip to content

Commit 6cb59e2

Browse files
Final merge updates for AsyncEnumerable library support.
1 parent b9f6ff3 commit 6cb59e2

File tree

2 files changed

+44
-41
lines changed

2 files changed

+44
-41
lines changed

RSocket.Core/IRSocketStream.cs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
1-
using System;
1+
using System;
22
using System.Buffers;
33
using System.Threading.Tasks;
44

55
namespace RSocket
66
{
7-
/// <summary>
8-
/// A stream of items from an RSocket. This is simply an Observer of the protocol's tuples.
9-
/// </summary>
10-
public interface IRSocketStream : IObserver<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)>
11-
{
12-
}
7+
///// <summary>
8+
///// A stream of items from an RSocket. This is simply an Observer of the protocol's tuples.
9+
///// </summary>
10+
//public interface IRSocketStream : IObserver<(ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)>
11+
//{
12+
//}
13+
14+
//Not yet, backpressure!
1315
}

RSocket.Core/RSocket.cs

Lines changed: 35 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
using System;
1+
using System;
22
using System.Buffers;
33
using System.Collections.Concurrent;
44
using System.Collections.Generic;
@@ -85,7 +85,7 @@ public async Task<IRSocketChannel> RequestChannel(IRSocketStream stream, ReadOnl
8585
return channel;
8686
}
8787

88-
protected class Channel : IRSocketChannel //TODO hmmm...
88+
protected class Channel : IRSocketChannel //TODO hmmm...
8989
{
9090
readonly RSocket Socket;
9191
readonly int Stream;
@@ -219,37 +219,38 @@ async Task Stream(int stream)
219219
// IAsyncEnumerable<(ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata)> Outgoing)>
220220
// Channeler { get; set; } = (request, incoming) => throw new NotImplementedException();
221221

222-
//void IRSocketProtocol.RequestChannel(in RSocketProtocol.RequestChannel message, ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)
223-
//{
224-
// Channel(message.Stream).Start();
225-
226-
// //new Receiver<bool>()
227-
228-
// //new Receiver<bool>(stream => RequestFireAndForget(stream, data, metadata), _ => true).ExecuteAsync(result: true);
229-
// //var id = StreamDispatch(stream);
230-
231-
// async Task Channel(int stream)
232-
// {
233-
// var (Incoming, Outoing) = Channeler((data, metadata)); //TODO Handle Errors.
234-
235-
236-
// using (observable.Subscribe())
237-
// {
238-
// var enumerator = source.GetAsyncEnumerator();
239-
// try
240-
// {
241-
// while (await enumerator.MoveNextAsync())
242-
// {
243-
// var (Data, Metadata) = enumerator.Current;
244-
// new RSocketProtocol.Payload(stream, Data, Metadata, next: true).Write(Transport.Output, Data, Metadata);
245-
// await Transport.Output.FlushAsync();
246-
// }
247-
// new RSocketProtocol.Payload(stream, complete: true).Write(Transport.Output);
248-
// await Transport.Output.FlushAsync();
249-
// }
250-
// finally { await enumerator.DisposeAsync(); }
251-
// }
252-
// }
253-
//}
222+
void IRSocketProtocol.RequestChannel(in RSocketProtocol.RequestChannel message, ReadOnlySequence<byte> metadata, ReadOnlySequence<byte> data)
223+
{
224+
throw new NotImplementedException();
225+
// Channel(message.Stream).Start();
226+
227+
// //new Receiver<bool>()
228+
229+
// //new Receiver<bool>(stream => RequestFireAndForget(stream, data, metadata), _ => true).ExecuteAsync(result: true);
230+
// //var id = StreamDispatch(stream);
231+
232+
// async Task Channel(int stream)
233+
// {
234+
// var (Incoming, Outoing) = Channeler((data, metadata)); //TODO Handle Errors.
235+
236+
237+
// using (observable.Subscribe())
238+
// {
239+
// var enumerator = source.GetAsyncEnumerator();
240+
// try
241+
// {
242+
// while (await enumerator.MoveNextAsync())
243+
// {
244+
// var (Data, Metadata) = enumerator.Current;
245+
// new RSocketProtocol.Payload(stream, Data, Metadata, next: true).Write(Transport.Output, Data, Metadata);
246+
// await Transport.Output.FlushAsync();
247+
// }
248+
// new RSocketProtocol.Payload(stream, complete: true).Write(Transport.Output);
249+
// await Transport.Output.FlushAsync();
250+
// }
251+
// finally { await enumerator.DisposeAsync(); }
252+
// }
253+
// }
254+
}
254255
}
255256
}

0 commit comments

Comments
 (0)