Skip to content

Commit 8d47dbe

Browse files
authored
Merge pull request #42 from DataDog/xvello/uds
Add UDS support
2 parents 0e03131 + aba76f1 commit 8d47dbe

File tree

7 files changed

+234
-38
lines changed

7 files changed

+234
-38
lines changed

README.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,3 +60,17 @@ public class Foo {
6060
}
6161
}
6262
```
63+
64+
Unix Domain Socket support
65+
---------------------------
66+
67+
As an alternative to UDP, Agent6 can receive metrics via a UNIX Socket (on Linux only). This library supports
68+
transmission via this protocol. To use it, simply pass the socket path as a hostname, and 0 as port.
69+
70+
By default, all exceptions are ignored, mimicking UDP behaviour. When using Unix Sockets, transmission errors will
71+
trigger exceptions you can choose to handle by passing a `StatsDClientErrorHandler`:
72+
73+
- Connection error because of an invalid/missing socket will trigger a `java.io.IOException: No such file or directory`
74+
- If dogstatsd's reception buffer were to fill up, the send will timeout after 100ms and throw either a
75+
`java.io.IOException: No buffer space available` or a `java.io.IOException: Resource temporarily unavailable`
76+

pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@
6666
<scope>test</scope>
6767
<version>4.12</version>
6868
</dependency>
69+
<dependency>
70+
<groupId>com.github.jnr</groupId>
71+
<artifactId>jnr-unixsocket</artifactId>
72+
<version>0.18</version>
73+
</dependency>
6974
</dependencies>
7075

7176
<distributionManagement>

src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.io.IOException;
44
import java.net.InetAddress;
5+
import java.net.SocketAddress;
56
import java.net.InetSocketAddress;
67
import java.net.UnknownHostException;
78
import java.nio.ByteBuffer;
@@ -19,7 +20,9 @@
1920
import java.util.concurrent.ThreadFactory;
2021
import java.util.concurrent.ThreadLocalRandom;
2122
import java.util.concurrent.TimeUnit;
22-
23+
import jnr.unixsocket.UnixSocketAddress;
24+
import jnr.unixsocket.UnixDatagramChannel;
25+
import jnr.unixsocket.UnixSocketOptions;
2326

2427

2528
/**
@@ -305,7 +308,7 @@ public NonBlockingStatsDClient(final String prefix, final String hostname, final
305308
* if the client could not be started
306309
*/
307310
public NonBlockingStatsDClient(final String prefix, final int queueSize, String[] constantTags, final StatsDClientErrorHandler errorHandler,
308-
final Callable<InetSocketAddress> addressLookup) throws StatsDClientException {
311+
final Callable<SocketAddress> addressLookup) throws StatsDClientException {
309312
if((prefix != null) && (!prefix.isEmpty())) {
310313
this.prefix = new StringBuilder(prefix).append(".").toString();
311314
} else {
@@ -330,7 +333,15 @@ public NonBlockingStatsDClient(final String prefix, final int queueSize, String
330333
}
331334

332335
try {
333-
clientChannel = DatagramChannel.open();
336+
final SocketAddress address = addressLookup.call();
337+
if (address instanceof UnixSocketAddress) {
338+
clientChannel = UnixDatagramChannel.open();
339+
// Set send timeout to 100ms, to handle the case where the transmission buffer is full
340+
// If no timeout is set, the send becomes blocking
341+
clientChannel.setOption(UnixSocketOptions.SO_SNDTIMEO, Integer.valueOf(100));
342+
} else{
343+
clientChannel = DatagramChannel.open();
344+
}
334345
} catch (final Exception e) {
335346
throw new StatsDClientException("Failed to start StatsD client", e);
336347
}
@@ -995,10 +1006,11 @@ private boolean isInvalidSample(double sampleRate) {
9951006

9961007
private class QueueConsumer implements Runnable {
9971008
private final ByteBuffer sendBuffer = ByteBuffer.allocate(PACKET_SIZE_BYTES);
1009+
private final Callable<SocketAddress> addressLookup;
9981010

999-
private final Callable<InetSocketAddress> addressLookup;
10001011

1001-
QueueConsumer(final Callable<InetSocketAddress> addressLookup) {
1012+
1013+
QueueConsumer(final Callable<SocketAddress> addressLookup) {
10021014
this.addressLookup = addressLookup;
10031015
}
10041016

@@ -1007,7 +1019,7 @@ private class QueueConsumer implements Runnable {
10071019
try {
10081020
final String message = queue.poll(1, TimeUnit.SECONDS);
10091021
if(null != message) {
1010-
final InetSocketAddress address = addressLookup.call();
1022+
final SocketAddress address = addressLookup.call();
10111023
final byte[] data = message.getBytes(MESSAGE_CHARSET);
10121024
if(sendBuffer.remaining() < (data.length + 1)) {
10131025
blockingSend(address);
@@ -1026,7 +1038,7 @@ private class QueueConsumer implements Runnable {
10261038
}
10271039
}
10281040

1029-
private void blockingSend(final InetSocketAddress address) throws IOException {
1041+
private void blockingSend(final SocketAddress address) throws IOException {
10301042
final int sizeOfBuffer = sendBuffer.position();
10311043
sendBuffer.flip();
10321044

@@ -1038,10 +1050,9 @@ private void blockingSend(final InetSocketAddress address) throws IOException {
10381050
handler.handle(
10391051
new IOException(
10401052
String.format(
1041-
"Could not send entirely stat %s to host %s:%d. Only sent %d bytes out of %d bytes",
1053+
"Could not send entirely stat %s to %s. Only sent %d bytes out of %d bytes",
10421054
sendBuffer.toString(),
1043-
address.getHostName(),
1044-
address.getPort(),
1055+
address.toString(),
10451056
sentBytes,
10461057
sizeOfBuffer)));
10471058
}
@@ -1055,10 +1066,14 @@ private void blockingSend(final InetSocketAddress address) throws IOException {
10551066
* @param port the port of the targeted StatsD server
10561067
* @return a function to perform the lookup
10571068
*/
1058-
public static Callable<InetSocketAddress> volatileAddressResolution(final String hostname, final int port) {
1059-
return new Callable<InetSocketAddress>() {
1060-
@Override public InetSocketAddress call() throws UnknownHostException {
1061-
return new InetSocketAddress(InetAddress.getByName(hostname), port);
1069+
public static Callable<SocketAddress> volatileAddressResolution(final String hostname, final int port) {
1070+
return new Callable<SocketAddress>() {
1071+
@Override public SocketAddress call() throws UnknownHostException {
1072+
if (port == 0) { // Hostname is a file path to the socket
1073+
return new UnixSocketAddress(hostname);
1074+
} else {
1075+
return new InetSocketAddress(InetAddress.getByName(hostname), port);
1076+
}
10621077
}
10631078
};
10641079
}
@@ -1071,16 +1086,16 @@ public static Callable<InetSocketAddress> volatileAddressResolution(final String
10711086
* @return a function that cached the result of the lookup
10721087
* @throws Exception if the lookup fails, i.e. {@link UnknownHostException}
10731088
*/
1074-
public static Callable<InetSocketAddress> staticAddressResolution(final String hostname, final int port) throws Exception {
1075-
final InetSocketAddress address = volatileAddressResolution(hostname, port).call();
1076-
return new Callable<InetSocketAddress>() {
1077-
@Override public InetSocketAddress call() {
1089+
public static Callable<SocketAddress> staticAddressResolution(final String hostname, final int port) throws Exception {
1090+
final SocketAddress address = volatileAddressResolution(hostname, port).call();
1091+
return new Callable<SocketAddress>() {
1092+
@Override public SocketAddress call() {
10781093
return address;
10791094
}
10801095
};
10811096
}
10821097

1083-
private static Callable<InetSocketAddress> staticStatsDAddressResolution(final String hostname, final int port) throws StatsDClientException {
1098+
private static Callable<SocketAddress> staticStatsDAddressResolution(final String hostname, final int port) throws StatsDClientException {
10841099
try {
10851100
return staticAddressResolution(hostname, port);
10861101
} catch (final Exception e) {

src/test/java/com/timgroup/statsd/DummyStatsDServer.java

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,30 +2,54 @@
22
package com.timgroup.statsd;
33

44
import java.io.IOException;
5-
import java.net.DatagramPacket;
6-
import java.net.DatagramSocket;
7-
import java.net.SocketException;
5+
import java.net.InetSocketAddress;
6+
import java.nio.ByteBuffer;
7+
import java.nio.channels.DatagramChannel;
88
import java.util.ArrayList;
99
import java.util.List;
10+
import jnr.unixsocket.UnixDatagramChannel;
11+
import jnr.unixsocket.UnixSocketAddress;
12+
import java.nio.charset.StandardCharsets;
1013

1114

1215
final class DummyStatsDServer {
1316
private final List<String> messagesReceived = new ArrayList<String>();
14-
private final DatagramSocket server;
17+
private final DatagramChannel server;
18+
private volatile Boolean freeze = false;
1519

16-
public DummyStatsDServer(int port) throws SocketException {
17-
server = new DatagramSocket(port);
20+
public DummyStatsDServer(int port) throws IOException {
21+
server = DatagramChannel.open();
22+
server.bind(new InetSocketAddress(port));
23+
this.listen();
24+
}
25+
26+
public DummyStatsDServer(String socketPath) throws IOException {
27+
server = UnixDatagramChannel.open();
28+
server.bind(new UnixSocketAddress(socketPath));
29+
this.listen();
30+
}
31+
32+
private void listen() {
1833
Thread thread = new Thread(new Runnable() {
1934
@Override
2035
public void run() {
21-
while(!server.isClosed()) {
22-
try {
23-
final DatagramPacket packet = new DatagramPacket(new byte[1500], 1500);
24-
server.receive(packet);
25-
for(String msg : new String(packet.getData(), NonBlockingStatsDClient.MESSAGE_CHARSET).split("\n")) {
26-
messagesReceived.add(msg.trim());
36+
final ByteBuffer packet = ByteBuffer.allocate(1500);
37+
while(server.isOpen()) {
38+
if (freeze) {
39+
try {
40+
Thread.sleep(10);
41+
} catch (InterruptedException e) {
42+
}
43+
} else {
44+
try {
45+
packet.clear();
46+
server.receive(packet);
47+
packet.flip();
48+
for (String msg : StandardCharsets.UTF_8.decode(packet).toString().split("\n")) {
49+
messagesReceived.add(msg.trim());
50+
}
51+
} catch (IOException e) {
2752
}
28-
} catch (IOException e) {
2953
}
3054
}
3155
}
@@ -47,7 +71,15 @@ public List<String> messagesReceived() {
4771
return new ArrayList<String>(messagesReceived);
4872
}
4973

50-
public void close() {
74+
public void freeze() {
75+
freeze = true;
76+
}
77+
78+
public void unfreeze() {
79+
freeze = false;
80+
}
81+
82+
public void close() throws IOException {
5183
server.close();
5284
}
5385

src/test/java/com/timgroup/statsd/NonBlockingStatsDClientPerfTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.timgroup.statsd;
22

33

4+
import java.io.IOException;
45
import java.net.SocketException;
56
import java.util.Random;
67
import java.util.concurrent.ExecutorService;
@@ -18,11 +19,11 @@ public final class NonBlockingStatsDClientPerfTest {
1819
private static final int STATSD_SERVER_PORT = 17255;
1920
private static final Random RAND = new Random();
2021
private static final NonBlockingStatsDClient client = new NonBlockingStatsDClient("my.prefix", "localhost", STATSD_SERVER_PORT);
21-
private final ExecutorService executor = Executors.newFixedThreadPool(20);
22+
private final ExecutorService executor = Executors.newFixedThreadPool(10);
2223
private static DummyStatsDServer server;
2324

2425
@BeforeClass
25-
public static void start() throws SocketException {
26+
public static void start() throws IOException {
2627
server = new DummyStatsDServer(STATSD_SERVER_PORT);
2728
}
2829

@@ -39,7 +40,7 @@ public void perf_test() throws Exception {
3940
for(int i = 0; i < testSize; ++i) {
4041
executor.submit(new Runnable() {
4142
public void run() {
42-
client.count("mycount", RAND.nextInt());
43+
client.count("mycount", 1);
4344
}
4445
});
4546

@@ -48,7 +49,7 @@ public void run() {
4849
executor.shutdown();
4950
executor.awaitTermination(20, TimeUnit.SECONDS);
5051

51-
for(int i = 0; i < 20000 && server.messagesReceived().size() < testSize; i += 50) {
52+
while(server.messagesReceived().size() < testSize) {
5253
try {
5354
Thread.sleep(50);
5455
} catch (InterruptedException ex) {}

src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import org.junit.After;
66
import org.junit.Test;
77

8+
import java.io.IOException;
89
import java.net.SocketException;
910
import java.util.Locale;
1011

@@ -19,7 +20,7 @@ public class NonBlockingStatsDClientTest {
1920
private static DummyStatsDServer server;
2021

2122
@BeforeClass
22-
public static void start() throws SocketException {
23+
public static void start() throws IOException {
2324
server = new DummyStatsDServer(STATSD_SERVER_PORT);
2425
}
2526

0 commit comments

Comments
 (0)