Skip to content

Commit 97ddf05

Browse files
committed
Add dodgy hook when a subscriber is established
1 parent 83b6a56 commit 97ddf05

File tree

7 files changed

+80
-8
lines changed

7 files changed

+80
-8
lines changed

source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public void SetUp()
2727
stream.SetRemoteIdentity(new RemoteIdentity(RemoteIdentityType.Server));
2828
var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build();
2929
var activeConnectionsLimiter = new ActiveTcpConnectionsLimiter(limits);
30-
protocol = new MessageExchangeProtocol(stream, new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), activeConnectionsLimiter, Substitute.For<ILog>());
30+
protocol = new MessageExchangeProtocol(stream, new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), activeConnectionsLimiter, Substitute.For<ILog>(), new NullSubscriberObserver());
3131
}
3232

3333
// TODO - ASYNC ME UP! ExchangeAsClientAsync cancellation

source/Halibut.Tests/Transport/SecureClientFixture.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public async Task SecureClientClearsPoolWhenAllConnectionsCorrupt()
6767
var connection = Substitute.For<IConnection>();
6868
var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build();
6969
var activeConnectionLimiter = new ActiveTcpConnectionsLimiter(limits);
70-
connection.Protocol.Returns(new MessageExchangeProtocol(stream, limits, activeConnectionLimiter, log));
70+
connection.Protocol.Returns(new MessageExchangeProtocol(stream, limits, activeConnectionLimiter, log, new NullSubscriberObserver()));
7171

7272
await connectionManager.ReleaseConnectionAsync(endpoint, connection, CancellationToken.None);
7373
}
@@ -96,7 +96,7 @@ static MessageExchangeProtocol GetProtocol(Stream stream, ILog logger)
9696
{
9797
var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build();
9898
var activeConnectionLimiter = new ActiveTcpConnectionsLimiter(limits);
99-
return new MessageExchangeProtocol(new MessageExchangeStream(stream, new MessageSerializerBuilder(new LogFactory()).Build(), new NoOpControlMessageObserver(), limits, logger), limits, activeConnectionLimiter, logger);
99+
return new MessageExchangeProtocol(new MessageExchangeStream(stream, new MessageSerializerBuilder(new LogFactory()).Build(), new NoOpControlMessageObserver(), limits, logger), limits, activeConnectionLimiter, logger, new NullSubscriberObserver());
100100
}
101101
}
102102
}

source/Halibut/HalibutRuntime.cs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public class HalibutRuntime : IHalibutRuntime
4646
readonly ISecureConnectionObserver secureConnectionObserver;
4747
readonly IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter;
4848
readonly IControlMessageObserver controlMessageObserver;
49+
readonly ISubscriberObserver subscriberObserver;
4950

5051
internal HalibutRuntime(
5152
IServiceFactory serviceFactory,
@@ -61,8 +62,8 @@ internal HalibutRuntime(
6162
IRpcObserver rpcObserver,
6263
IConnectionsObserver connectionsObserver,
6364
IControlMessageObserver controlMessageObserver,
64-
ISecureConnectionObserver secureConnectionObserver
65-
)
65+
ISecureConnectionObserver secureConnectionObserver,
66+
ISubscriberObserver subscriberObserver)
6667
{
6768
this.serverCertificate = serverCertificate;
6869
this.trustProvider = trustProvider;
@@ -77,6 +78,7 @@ ISecureConnectionObserver secureConnectionObserver
7778
TimeoutsAndLimits = halibutTimeoutsAndLimits;
7879
this.connectionsObserver = connectionsObserver;
7980
this.secureConnectionObserver = secureConnectionObserver;
81+
this.subscriberObserver = subscriberObserver;
8082
this.controlMessageObserver = controlMessageObserver;
8183

8284
connectionManager = new ConnectionManagerAsync();
@@ -118,7 +120,12 @@ public int Listen(int port)
118120

119121
ExchangeProtocolBuilder ExchangeProtocolBuilder()
120122
{
121-
return (stream, log) => new MessageExchangeProtocol(new MessageExchangeStream(stream, messageSerializer, controlMessageObserver, TimeoutsAndLimits, log), TimeoutsAndLimits, activeTcpConnectionsLimiter, log);
123+
return (stream, log) => new MessageExchangeProtocol(
124+
new MessageExchangeStream(stream, messageSerializer, controlMessageObserver, TimeoutsAndLimits, log),
125+
TimeoutsAndLimits,
126+
activeTcpConnectionsLimiter,
127+
log,
128+
subscriberObserver);
122129
}
123130

124131
public int Listen(IPEndPoint endpoint)

source/Halibut/HalibutRuntimeBuilder.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public class HalibutRuntimeBuilder
3131
ISecureConnectionObserver? secureConnectionObserver;
3232
IControlMessageObserver? controlMessageObserver;
3333
MessageStreamWrappers queueMessageStreamWrappers = new();
34+
ISubscriberObserver? subscriberObserver;
3435

3536
public HalibutRuntimeBuilder WithQueueMessageStreamWrappers(MessageStreamWrappers queueMessageStreamWrappers)
3637
{
@@ -50,6 +51,12 @@ public HalibutRuntimeBuilder WithSecureConnectionObserver(ISecureConnectionObser
5051
return this;
5152
}
5253

54+
public HalibutRuntimeBuilder WithSubscriptionObserver(ISubscriberObserver subscriberObserver)
55+
{
56+
this.subscriberObserver = subscriberObserver;
57+
return this;
58+
}
59+
5360
internal HalibutRuntimeBuilder WithStreamFactory(IStreamFactory streamFactory)
5461
{
5562
this.streamFactory = streamFactory;
@@ -185,6 +192,7 @@ public HalibutRuntime Build()
185192
var secureConnectionObserver = this.secureConnectionObserver ?? NoOpSecureConnectionObserver.Instance;
186193
var rpcObserver = this.rpcObserver ?? new NoRpcObserver();
187194
var controlMessageObserver = this.controlMessageObserver ?? new NoOpControlMessageObserver();
195+
var subscriberObserver = this.subscriberObserver ?? new NullSubscriberObserver();
188196

189197
var halibutRuntime = new HalibutRuntime(
190198
serviceFactory,
@@ -200,7 +208,8 @@ public HalibutRuntime Build()
200208
rpcObserver,
201209
connectionsObserver,
202210
controlMessageObserver,
203-
secureConnectionObserver
211+
secureConnectionObserver,
212+
subscriberObserver
204213
);
205214

206215
if (onUnauthorizedClientConnect is not null)
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Copyright 2012-2013 Octopus Deploy Pty. Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
using System.Threading;
16+
using System.Threading.Tasks;
17+
18+
namespace Halibut.Transport.Observability
19+
{
20+
public interface ISubscriberObserver
21+
{
22+
Task SubscriberConnected(string uri, CancellationToken cancellationToken);
23+
}
24+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// Copyright 2012-2013 Octopus Deploy Pty. Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
using System.Threading;
16+
using System.Threading.Tasks;
17+
18+
namespace Halibut.Transport.Observability
19+
{
20+
public class NullSubscriberObserver : ISubscriberObserver
21+
{
22+
public Task SubscriberConnected(string uri, CancellationToken cancellationToken)
23+
{
24+
return Task.CompletedTask;
25+
}
26+
}
27+
}

source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using Halibut.Diagnostics;
66
using Halibut.Exceptions;
77
using Halibut.ServiceModel;
8+
using Halibut.Transport.Observability;
89

910
namespace Halibut.Transport.Protocol
1011
{
@@ -21,15 +22,18 @@ public class MessageExchangeProtocol
2122
readonly HalibutTimeoutsAndLimits halibutTimeoutsAndLimits;
2223
readonly IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter;
2324
readonly ILog log;
25+
readonly ISubscriberObserver subscriberObserver;
2426
bool identified;
2527
volatile bool acceptClientRequests = true;
2628

27-
public MessageExchangeProtocol(IMessageExchangeStream stream, HalibutTimeoutsAndLimits halibutTimeoutsAndLimits, IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter, ILog log)
29+
public MessageExchangeProtocol(IMessageExchangeStream stream, HalibutTimeoutsAndLimits halibutTimeoutsAndLimits, IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter, ILog log,
30+
ISubscriberObserver subscriberObserver)
2831
{
2932
this.stream = stream;
3033
this.halibutTimeoutsAndLimits = halibutTimeoutsAndLimits;
3134
this.activeTcpConnectionsLimiter = activeTcpConnectionsLimiter;
3235
this.log = log;
36+
this.subscriberObserver = subscriberObserver;
3337
}
3438

3539
public async Task<ResponseMessage> ExchangeAsClientAsync(RequestMessage request, CancellationToken cancellationToken)
@@ -112,6 +116,7 @@ public async Task ExchangeAsServerAsync(Func<RequestMessage, Task<ResponseMessag
112116
//if the remote identity is a subscriber, we might need to limit their active TCP connections
113117
if (identity.IdentityType == RemoteIdentityType.Subscriber)
114118
{
119+
await subscriberObserver.SubscriberConnected(identity.SubscriptionId.ToString(), cancellationToken);
115120
limitedConnectionLease = activeTcpConnectionsLimiter.LeaseActiveTcpConnection(identity.SubscriptionId);
116121
}
117122

0 commit comments

Comments
 (0)