diff --git a/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestDequeuedPendingRequestQueueFactory.cs b/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestDequeuedPendingRequestQueueFactory.cs index 1cb9371d9..4043a27ec 100644 --- a/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestDequeuedPendingRequestQueueFactory.cs +++ b/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestDequeuedPendingRequestQueueFactory.cs @@ -29,6 +29,11 @@ public IPendingRequestQueue CreateQueue(Uri endpoint) return new Decorator(inner.CreateQueue(endpoint), cancellationTokenSource, shouldCancelOnDequeue, onResponseApplied); } + public Task CreateQueueAsync(Uri endpoint, CancellationToken cancellationToken) + { + return Task.FromResult(CreateQueue(endpoint)); + } + class Decorator : IPendingRequestQueue { readonly CancellationTokenSource cancellationTokenSource; diff --git a/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestQueuedPendingRequestQueueFactory.cs b/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestQueuedPendingRequestQueueFactory.cs index 63e0ca75b..4b03d05d2 100644 --- a/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestQueuedPendingRequestQueueFactory.cs +++ b/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestQueuedPendingRequestQueueFactory.cs @@ -28,6 +28,11 @@ public IPendingRequestQueue CreateQueue(Uri endpoint) return new Decorator(inner.CreateQueue(endpoint), cancellationTokenSources); } + public Task CreateQueueAsync(Uri endpoint, CancellationToken cancellationToken) + { + return Task.FromResult(CreateQueue(endpoint)); + } + class Decorator : IPendingRequestQueue { readonly CancellationTokenSource[] cancellationTokenSources; diff --git a/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs b/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs index 70b11fdd7..a03e0ef59 100644 --- a/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs +++ b/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs @@ -27,7 +27,7 @@ public void SetUp() stream.SetRemoteIdentity(new RemoteIdentity(RemoteIdentityType.Server)); var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build(); var activeConnectionsLimiter = new ActiveTcpConnectionsLimiter(limits); - protocol = new MessageExchangeProtocol(stream, new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), activeConnectionsLimiter, Substitute.For()); + protocol = new MessageExchangeProtocol(stream, new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), activeConnectionsLimiter, NoSubscribersObserver.Instance, Substitute.For()); } // TODO - ASYNC ME UP! ExchangeAsClientAsync cancellation diff --git a/source/Halibut.Tests/Transport/SecureClientFixture.cs b/source/Halibut.Tests/Transport/SecureClientFixture.cs index 3a8a83673..3ef1af270 100644 --- a/source/Halibut.Tests/Transport/SecureClientFixture.cs +++ b/source/Halibut.Tests/Transport/SecureClientFixture.cs @@ -67,7 +67,7 @@ public async Task SecureClientClearsPoolWhenAllConnectionsCorrupt() var connection = Substitute.For(); var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build(); var activeConnectionLimiter = new ActiveTcpConnectionsLimiter(limits); - connection.Protocol.Returns(new MessageExchangeProtocol(stream, limits, activeConnectionLimiter, log)); + connection.Protocol.Returns(new MessageExchangeProtocol(stream, limits, activeConnectionLimiter, NoSubscribersObserver.Instance, log)); await connectionManager.ReleaseConnectionAsync(endpoint, connection, CancellationToken.None); } @@ -96,7 +96,7 @@ static MessageExchangeProtocol GetProtocol(Stream stream, ILog logger) { var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build(); var activeConnectionLimiter = new ActiveTcpConnectionsLimiter(limits); - return new MessageExchangeProtocol(new MessageExchangeStream(stream, new MessageSerializerBuilder(new LogFactory()).Build(), new NoOpControlMessageObserver(), limits, logger), limits, activeConnectionLimiter, logger); + return new MessageExchangeProtocol(new MessageExchangeStream(stream, new MessageSerializerBuilder(new LogFactory()).Build(), new NoOpControlMessageObserver(), limits, logger), limits, activeConnectionLimiter, NoSubscribersObserver.Instance, logger); } } } \ No newline at end of file diff --git a/source/Halibut.Tests/Transport/SecureListenerFixture.cs b/source/Halibut.Tests/Transport/SecureListenerFixture.cs index 0914aad3b..5ff09696b 100644 --- a/source/Halibut.Tests/Transport/SecureListenerFixture.cs +++ b/source/Halibut.Tests/Transport/SecureListenerFixture.cs @@ -73,7 +73,8 @@ public async Task SecureListenerDoesNotCreateHundredsOfIoEventsPerSecondOnWindow (_, _) => UnauthorizedClientConnectResponse.BlockConnection, timeoutsAndLimits, new StreamFactory(), - NoOpConnectionsObserver.Instance + NoOpConnectionsObserver.Instance, + NoSubscribersObserver.Instance ); var idleAverage = CollectCounterValues(opsPerSec) diff --git a/source/Halibut.Tests/Util/FuncPendingRequestQueueFactory.cs b/source/Halibut.Tests/Util/FuncPendingRequestQueueFactory.cs index c14066816..7fb8b44c7 100644 --- a/source/Halibut.Tests/Util/FuncPendingRequestQueueFactory.cs +++ b/source/Halibut.Tests/Util/FuncPendingRequestQueueFactory.cs @@ -1,4 +1,6 @@ using System; +using System.Threading; +using System.Threading.Tasks; using Halibut.ServiceModel; namespace Halibut.Tests.Util @@ -16,5 +18,10 @@ public IPendingRequestQueue CreateQueue(Uri endpoint) { return createQueue(endpoint); } + + public Task CreateQueueAsync(Uri endpoint, CancellationToken cancellationToken) + { + return Task.FromResult(createQueue(endpoint)); + } } } \ No newline at end of file diff --git a/source/Halibut/DataStream.cs b/source/Halibut/DataStream.cs index 414457fc0..9abf2ee4d 100644 --- a/source/Halibut/DataStream.cs +++ b/source/Halibut/DataStream.cs @@ -10,7 +10,7 @@ namespace Halibut { public class DataStream : IEquatable, IDataStreamInternal { - readonly Func writerAsync; + protected readonly Func writerAsync; IDataStreamReceiver? receiver; [JsonConstructor] diff --git a/source/Halibut/HalibutRuntime.cs b/source/Halibut/HalibutRuntime.cs index effb8abc9..1e7d5b41d 100644 --- a/source/Halibut/HalibutRuntime.cs +++ b/source/Halibut/HalibutRuntime.cs @@ -45,6 +45,7 @@ public class HalibutRuntime : IHalibutRuntime readonly IConnectionsObserver connectionsObserver; readonly IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter; readonly IControlMessageObserver controlMessageObserver; + readonly ISubscribersObserver subscribersObserver; internal HalibutRuntime( IServiceFactory serviceFactory, @@ -59,7 +60,8 @@ internal HalibutRuntime( IStreamFactory streamFactory, IRpcObserver rpcObserver, IConnectionsObserver connectionsObserver, - IControlMessageObserver controlMessageObserver) + IControlMessageObserver controlMessageObserver, + ISubscribersObserver subscribersObserver) { this.serverCertificate = serverCertificate; this.trustProvider = trustProvider; @@ -74,6 +76,7 @@ internal HalibutRuntime( TimeoutsAndLimits = halibutTimeoutsAndLimits; this.connectionsObserver = connectionsObserver; this.controlMessageObserver = controlMessageObserver; + this.subscribersObserver = subscribersObserver; connectionManager = new ConnectionManagerAsync(); this.tcpConnectionFactory = new TcpConnectionFactory(serverCertificate, TimeoutsAndLimits, streamFactory); @@ -106,7 +109,7 @@ public int Listen(int port) ExchangeProtocolBuilder ExchangeProtocolBuilder() { - return (stream, log) => new MessageExchangeProtocol(new MessageExchangeStream(stream, messageSerializer, controlMessageObserver, TimeoutsAndLimits, log), TimeoutsAndLimits, activeTcpConnectionsLimiter, log); + return (stream, log) => new MessageExchangeProtocol(new MessageExchangeStream(stream, messageSerializer, controlMessageObserver, TimeoutsAndLimits, log), TimeoutsAndLimits, activeTcpConnectionsLimiter, subscribersObserver, log); } public int Listen(IPEndPoint endpoint) @@ -122,7 +125,8 @@ public int Listen(IPEndPoint endpoint) HandleUnauthorizedClientConnect, TimeoutsAndLimits, streamFactory, - connectionsObserver); + connectionsObserver, + subscribersObserver); listeners.DoWithExclusiveAccess(l => { diff --git a/source/Halibut/HalibutRuntimeBuilder.cs b/source/Halibut/HalibutRuntimeBuilder.cs index 377ec3a2e..c742054ec 100644 --- a/source/Halibut/HalibutRuntimeBuilder.cs +++ b/source/Halibut/HalibutRuntimeBuilder.cs @@ -27,6 +27,7 @@ public class HalibutRuntimeBuilder IRpcObserver? rpcObserver; IConnectionsObserver? connectionsObserver; IControlMessageObserver? controlMessageObserver; + ISubscribersObserver? identityObserver; public HalibutRuntimeBuilder WithConnectionsObserver(IConnectionsObserver connectionsObserver) { @@ -125,6 +126,12 @@ public HalibutRuntimeBuilder WithRpcObserver(IRpcObserver rpcObserver) return this; } + public HalibutRuntimeBuilder WithSubscribersObserver(ISubscribersObserver subscribersObserver) + { + this.identityObserver = subscribersObserver; + return this; + } + public HalibutRuntime Build() { var halibutTimeoutsAndLimits = this.halibutTimeoutsAndLimits; @@ -157,6 +164,7 @@ public HalibutRuntime Build() var connectionsObserver = this.connectionsObserver ?? NoOpConnectionsObserver.Instance; var rpcObserver = this.rpcObserver ?? new NoRpcObserver(); var controlMessageObserver = this.controlMessageObserver ?? new NoOpControlMessageObserver(); + var identityObserver = this.identityObserver ?? NoSubscribersObserver.Instance; var halibutRuntime = new HalibutRuntime( serviceFactory, @@ -171,7 +179,8 @@ public HalibutRuntime Build() streamFactory, rpcObserver, connectionsObserver, - controlMessageObserver); + controlMessageObserver, + identityObserver); if (onUnauthorizedClientConnect is not null) { diff --git a/source/Halibut/ServiceModel/IPendingRequestQueueFactory.cs b/source/Halibut/ServiceModel/IPendingRequestQueueFactory.cs index 8d79150da..8874c3355 100644 --- a/source/Halibut/ServiceModel/IPendingRequestQueueFactory.cs +++ b/source/Halibut/ServiceModel/IPendingRequestQueueFactory.cs @@ -1,9 +1,12 @@ using System; +using System.Threading; +using System.Threading.Tasks; namespace Halibut.ServiceModel { public interface IPendingRequestQueueFactory { IPendingRequestQueue CreateQueue(Uri endpoint); + Task CreateQueueAsync(Uri endpoint, CancellationToken cancellationToken); } } \ No newline at end of file diff --git a/source/Halibut/ServiceModel/PendingRequestQueueFactoryAsync.cs b/source/Halibut/ServiceModel/PendingRequestQueueFactoryAsync.cs index 3143ef372..c215543a9 100644 --- a/source/Halibut/ServiceModel/PendingRequestQueueFactoryAsync.cs +++ b/source/Halibut/ServiceModel/PendingRequestQueueFactoryAsync.cs @@ -1,4 +1,6 @@ using System; +using System.Threading; +using System.Threading.Tasks; using Halibut.Diagnostics; namespace Halibut.ServiceModel @@ -18,5 +20,10 @@ public IPendingRequestQueue CreateQueue(Uri endpoint) { return new PendingRequestQueueAsync(halibutTimeoutsAndLimits, logFactory.ForEndpoint(endpoint)); } + + public Task CreateQueueAsync(Uri endpoint, CancellationToken cancellationToken) + { + return Task.FromResult(CreateQueue(endpoint)); + } } } \ No newline at end of file diff --git a/source/Halibut/Transport/Observability/ISubscribersObserver.cs b/source/Halibut/Transport/Observability/ISubscribersObserver.cs new file mode 100644 index 000000000..ffbf8c4d2 --- /dev/null +++ b/source/Halibut/Transport/Observability/ISubscribersObserver.cs @@ -0,0 +1,25 @@ +// Copyright 2012-2013 Octopus Deploy Pty. Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using Halibut.Transport.Protocol; + +namespace Halibut.Transport.Observability +{ + public interface ISubscribersObserver + { + void SubscriberJoined(Uri subscriptionId); + void SubscriberLeft(Uri subscriptionId); + } +} \ No newline at end of file diff --git a/source/Halibut/Transport/Observability/NoSubscribersObserver.cs b/source/Halibut/Transport/Observability/NoSubscribersObserver.cs new file mode 100644 index 000000000..e2aaa2265 --- /dev/null +++ b/source/Halibut/Transport/Observability/NoSubscribersObserver.cs @@ -0,0 +1,32 @@ +// Copyright 2012-2013 Octopus Deploy Pty. Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using Halibut.Transport.Protocol; + +namespace Halibut.Transport.Observability +{ + public class NoSubscribersObserver : ISubscribersObserver + { + static NoSubscribersObserver? singleInstance; + public static NoSubscribersObserver Instance => singleInstance ??= new NoSubscribersObserver(); + public void SubscriberJoined(Uri subscriptionId) + { + } + + public void SubscriberLeft(Uri subscriptionId) + { + } + } +} \ No newline at end of file diff --git a/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs b/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs index 3f156705d..8379d05f5 100644 --- a/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs +++ b/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs @@ -5,6 +5,7 @@ using Halibut.Diagnostics; using Halibut.Exceptions; using Halibut.ServiceModel; +using Halibut.Transport.Observability; namespace Halibut.Transport.Protocol { @@ -20,15 +21,21 @@ public class MessageExchangeProtocol readonly IMessageExchangeStream stream; readonly HalibutTimeoutsAndLimits halibutTimeoutsAndLimits; readonly IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter; + readonly ISubscribersObserver subscribersObserver; readonly ILog log; bool identified; volatile bool acceptClientRequests = true; - public MessageExchangeProtocol(IMessageExchangeStream stream, HalibutTimeoutsAndLimits halibutTimeoutsAndLimits, IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter, ILog log) + public MessageExchangeProtocol(IMessageExchangeStream stream, + HalibutTimeoutsAndLimits halibutTimeoutsAndLimits, + IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter, + ISubscribersObserver subscribersObserver, + ILog log) { this.stream = stream; this.halibutTimeoutsAndLimits = halibutTimeoutsAndLimits; this.activeTcpConnectionsLimiter = activeTcpConnectionsLimiter; + this.subscribersObserver = subscribersObserver; this.log = log; } @@ -125,9 +132,17 @@ public async Task ExchangeAsServerAsync(Func unauthorizedClientConnect, HalibutTimeoutsAndLimits halibutTimeoutsAndLimits, IStreamFactory streamFactory, - IConnectionsObserver connectionsObserver) + IConnectionsObserver connectionsObserver, + ISubscribersObserver subscribersObserver) { this.endPoint = endPoint; this.serverCertificate = serverCertificate; @@ -81,6 +83,7 @@ public SecureListener( this.halibutTimeoutsAndLimits = halibutTimeoutsAndLimits; this.streamFactory = streamFactory; this.connectionsObserver = connectionsObserver; + this.subscribersObserver = subscribersObserver; this.cts = new CancellationTokenSource(); this.cancellationToken = cts.Token;