Skip to content

Commit b4adb1f

Browse files
clear buffers before returning to pool
1 parent 740e5ce commit b4adb1f

File tree

2 files changed

+59
-1
lines changed

2 files changed

+59
-1
lines changed

src/main/java/com/timgroup/statsd/StatsDSender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ void sendLoop() {
8181
try {
8282

8383
if (buffer != null) {
84+
buffer.clear();
8485
pool.put(buffer);
8586
}
8687

@@ -94,7 +95,6 @@ void sendLoop() {
9495
buffer.flip();
9596
final int sentBytes = clientChannel.send(buffer, address);
9697

97-
buffer.clear();
9898
if (sizeOfBuffer != sentBytes) {
9999
throw new IOException(
100100
String.format("Could not send stat %s entirely to %s. Only sent %d out of %d bytes",

src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
import java.util.Random;
1717
import java.util.concurrent.Callable;
1818
import java.util.concurrent.CountDownLatch;
19+
import java.util.concurrent.ArrayBlockingQueue;
20+
import java.util.concurrent.LinkedBlockingQueue;
21+
import java.util.concurrent.BlockingQueue;
1922
import java.util.logging.Logger;
2023

2124
import static org.hamcrest.MatcherAssert.assertThat;
@@ -1337,4 +1340,59 @@ public SlowStatsDNonBlockingStatsDClient build() throws StatsDClientException {
13371340
}
13381341
}
13391342
}
1343+
1344+
@Test(timeout = 5000L)
1345+
public void failed_write_buffer() throws Exception {
1346+
final BlockingQueue sync = new ArrayBlockingQueue(1);
1347+
final IOException marker = new IOException();
1348+
NonBlockingStatsDClientBuilder builder = new NonBlockingStatsDClientBuilder() {
1349+
@Override
1350+
public NonBlockingStatsDClient build() {
1351+
this.originDetectionEnabled(false);
1352+
this.bufferPoolSize(1);
1353+
return new NonBlockingStatsDClient(resolve()) {
1354+
@Override
1355+
ClientChannel createByteChannel(Callable<SocketAddress> addressLookup, int timeout, int bufferSize) throws Exception {
1356+
return new DatagramClientChannel(addressLookup.call()) {
1357+
@Override
1358+
public int write(ByteBuffer data) throws IOException {
1359+
try {
1360+
sync.put(new Object());
1361+
} catch (InterruptedException e) {
1362+
}
1363+
throw marker;
1364+
}
1365+
};
1366+
}
1367+
};
1368+
}
1369+
};
1370+
1371+
final BlockingQueue errors = new LinkedBlockingQueue();
1372+
NonBlockingStatsDClient client = builder
1373+
.hostname("localhost")
1374+
.blocking(true)
1375+
.errorHandler(new StatsDClientErrorHandler() {
1376+
@Override
1377+
public void handle(Exception ex) {
1378+
if (ex == marker) {
1379+
return;
1380+
}
1381+
System.out.println(ex.toString());
1382+
try {
1383+
errors.put(ex);
1384+
} catch (InterruptedException e) {
1385+
}
1386+
}
1387+
1388+
})
1389+
.build();
1390+
1391+
client.gauge("test", 1);
1392+
sync.take();
1393+
client.gauge("test", 1);
1394+
client.stop();
1395+
1396+
assertEquals(0, errors.size());
1397+
}
13401398
}

0 commit comments

Comments
 (0)