Skip to content

Commit 14b4487

Browse files
committed
Revert merge cancel change. Causes dispose problems.
1 parent 8a9ee97 commit 14b4487

File tree

2 files changed

+6
-15
lines changed

2 files changed

+6
-15
lines changed

src/kafka-net/Common/Extensions.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
using System;
22
using System.Collections.Generic;
3-
using System.Diagnostics;
43
using System.Diagnostics.Contracts;
54
using System.Linq;
6-
using System.Runtime.CompilerServices;
75
using System.Text;
86
using System.Threading.Tasks;
97
using System.Threading;

src/kafka-net/KafkaTcpSocket.cs

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public KafkaTcpSocket(IKafkaLog log, KafkaEndpoint endpoint, TimeSpan? maximumRe
6060
/// <returns>Returns a byte[] array with the size of readSize.</returns>
6161
public Task<byte[]> ReadAsync(int readSize)
6262
{
63-
return EnsureReadAsync(readSize, CancellationToken.None);
63+
return EnsureReadAsync(readSize, _disposeToken.Token);
6464
}
6565

6666
/// <summary>
@@ -71,11 +71,7 @@ public Task<byte[]> ReadAsync(int readSize)
7171
/// <returns>Returns a byte[] array with the size of readSize.</returns>
7272
public Task<byte[]> ReadAsync(int readSize, CancellationToken cancellationToken)
7373
{
74-
using (var mergedCancel = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken,
75-
_disposeToken.Token))
76-
{
77-
return EnsureReadAsync(readSize, mergedCancel.Token);
78-
}
74+
return EnsureReadAsync(readSize, cancellationToken);
7975
}
8076

8177
/// <summary>
@@ -85,7 +81,7 @@ public Task<byte[]> ReadAsync(int readSize, CancellationToken cancellationToken)
8581
/// <returns>Returns Task handle to the write operation.</returns>
8682
public Task WriteAsync(byte[] buffer)
8783
{
88-
return WriteAsync(buffer, CancellationToken.None);
84+
return WriteAsync(buffer, _disposeToken.Token);
8985
}
9086

9187

@@ -97,11 +93,7 @@ public Task WriteAsync(byte[] buffer)
9793
/// <returns>Returns Task handle to the write operation.</returns>
9894
public Task WriteAsync(byte[] buffer, CancellationToken cancellationToken)
9995
{
100-
using (var mergedCancel = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken,
101-
_disposeToken.Token))
102-
{
103-
return EnsureWriteAsync(buffer, 0, buffer.Length, mergedCancel.Token);
104-
}
96+
return EnsureWriteAsync(buffer, 0, buffer.Length, cancellationToken);
10597
}
10698
#endregion
10799

@@ -142,7 +134,8 @@ private async Task<byte[]> EnsureReadAsync(int readSize, CancellationToken cance
142134
{
143135
//reading from network stream is not thread safe
144136
//https://msdn.microsoft.com/en-us/library/z2xae4f4.aspx
145-
bytesReceived = await netStream.ReadAsync(buffer, 0, readSize, cancellationToken).ConfigureAwait(false);
137+
bytesReceived = await netStream.ReadAsync(buffer, 0, readSize, cancellationToken)
138+
.WithCancellation(cancellationToken).ConfigureAwait(false);
146139

147140
if (bytesReceived <= 0)
148141
{

0 commit comments

Comments
 (0)