Skip to content

Commit 855f909

Browse files
authored
Merge pull request #200 from DataDog/vickenty/buffer-clear
Clear buffers before returning them to the pool
2 parents e4fc80f + 0abcbca commit 855f909

File tree

2 files changed

+60
-1
lines changed

2 files changed

+60
-1
lines changed

src/main/java/com/timgroup/statsd/StatsDSender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ void sendLoop() {
7878
try {
7979

8080
if (buffer != null) {
81+
buffer.clear();
8182
pool.put(buffer);
8283
}
8384

@@ -91,7 +92,6 @@ void sendLoop() {
9192
buffer.flip();
9293
final int sentBytes = clientChannel.write(buffer);
9394

94-
buffer.clear();
9595
if (sizeOfBuffer != sentBytes) {
9696
throw new IOException(
9797
String.format("Could not send stat %s entirely to %s. Only sent %d out of %d bytes",

src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
import java.util.Random;
1919
import java.util.concurrent.Callable;
2020
import java.util.concurrent.CountDownLatch;
21+
import java.util.concurrent.ArrayBlockingQueue;
22+
import java.util.concurrent.LinkedBlockingQueue;
23+
import java.util.concurrent.BlockingQueue;
2124
import java.util.logging.Logger;
2225
import java.text.NumberFormat;
2326

@@ -1735,4 +1738,60 @@ public int write(ByteBuffer data) throws IOException {
17351738
client.close();
17361739
assertEquals(true, blocking_close_sent);
17371740
}
1741+
1742+
1743+
@Test(timeout = 5000L)
1744+
public void failed_write_buffer() throws Exception {
1745+
final BlockingQueue sync = new ArrayBlockingQueue(1);
1746+
final IOException marker = new IOException();
1747+
NonBlockingStatsDClientBuilder builder = new NonBlockingStatsDClientBuilder() {
1748+
@Override
1749+
public NonBlockingStatsDClient build() {
1750+
this.originDetectionEnabled(false);
1751+
this.bufferPoolSize(1);
1752+
return new NonBlockingStatsDClient(resolve()) {
1753+
@Override
1754+
ClientChannel createByteChannel(Callable<SocketAddress> addressLookup, int timeout, int bufferSize) throws Exception {
1755+
return new DatagramClientChannel(addressLookup.call()) {
1756+
@Override
1757+
public int write(ByteBuffer data) throws IOException {
1758+
try {
1759+
sync.put(new Object());
1760+
} catch (InterruptedException e) {
1761+
}
1762+
throw marker;
1763+
}
1764+
};
1765+
}
1766+
};
1767+
}
1768+
};
1769+
1770+
final BlockingQueue errors = new LinkedBlockingQueue();
1771+
NonBlockingStatsDClient client = builder
1772+
.hostname("localhost")
1773+
.blocking(true)
1774+
.errorHandler(new StatsDClientErrorHandler() {
1775+
@Override
1776+
public void handle(Exception ex) {
1777+
if (ex == marker) {
1778+
return;
1779+
}
1780+
System.out.println(ex.toString());
1781+
try {
1782+
errors.put(ex);
1783+
} catch (InterruptedException e) {
1784+
}
1785+
}
1786+
1787+
})
1788+
.build();
1789+
1790+
client.gauge("test", 1);
1791+
sync.take();
1792+
client.gauge("test", 1);
1793+
client.stop();
1794+
1795+
assertEquals(0, errors.size());
1796+
}
17381797
}

0 commit comments

Comments
 (0)