Skip to content

Commit f51383a

Browse files
authored
Merge pull request #256 from DataDog/vickenty/fps
AMLII-2058 Use correct buffer size for unix sockets
2 parents cdf99d3 + 00cbb2a commit f51383a

File tree

8 files changed

+54
-10
lines changed

8 files changed

+54
-10
lines changed

src/main/java/com/timgroup/statsd/ClientChannel.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,6 @@
44

55
interface ClientChannel extends WritableByteChannel {
66
String getTransportType();
7+
8+
int getMaxPacketSizeBytes();
79
}

src/main/java/com/timgroup/statsd/DatagramClientChannel.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,9 @@ public String getTransportType() {
5252
public String toString() {
5353
return "[" + getTransportType() + "] " + address;
5454
}
55+
56+
@Override
57+
public int getMaxPacketSizeBytes() {
58+
return NonBlockingStatsDClient.DEFAULT_UDP_MAX_PACKET_SIZE_BYTES;
59+
}
5560
}

src/main/java/com/timgroup/statsd/NamedPipeClientChannel.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,9 @@ public String getTransportType() {
4848
public String toString() {
4949
return pipe;
5050
}
51+
52+
@Override
53+
public int getMaxPacketSizeBytes() {
54+
return NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES;
55+
}
5156
}

src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ protected static String format(ThreadLocal<NumberFormat> formatter, Number value
174174
protected final StatsDSender statsDSender;
175175
protected StatsDSender telemetryStatsDSender;
176176
protected final Telemetry telemetry;
177-
177+
private final int maxPacketSizeBytes;
178178
private final boolean blocking;
179179

180180
/**
@@ -268,6 +268,8 @@ protected static String format(ThreadLocal<NumberFormat> formatter, Number value
268268
}
269269

270270
this.blocking = blocking;
271+
this.maxPacketSizeBytes = maxPacketSizeBytes;
272+
271273
{
272274
List<String> costantPreTags = new ArrayList<>();
273275
if (constantTags != null) {
@@ -300,7 +302,7 @@ protected static String format(ThreadLocal<NumberFormat> formatter, Number value
300302

301303
ThreadFactory threadFactory = customThreadFactory != null ? customThreadFactory : new StatsDThreadFactory();
302304

303-
statsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes, poolSize,
305+
statsDProcessor = createProcessor(queueSize, handler, getPacketSize(clientChannel), poolSize,
304306
processorWorkers, blocking, aggregationFlushInterval, aggregationShards, threadFactory, containerID);
305307

306308
Properties properties = new Properties();
@@ -318,7 +320,7 @@ protected static String format(ThreadLocal<NumberFormat> formatter, Number value
318320
telemetryClientChannel = createByteChannel(telemetryAddressLookup, timeout, connectionTimeout, bufferSize);
319321

320322
// similar settings, but a single worker and non-blocking.
321-
telemetryStatsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes,
323+
telemetryStatsDProcessor = createProcessor(queueSize, handler, getPacketSize(telemetryClientChannel),
322324
poolSize, 1, false, 0, aggregationShards, threadFactory, containerID);
323325
}
324326

@@ -1340,4 +1342,8 @@ private String getContainerID(String containerID, boolean originDetectionEnabled
13401342

13411343
return null;
13421344
}
1345+
1346+
private int getPacketSize(ClientChannel chan) {
1347+
return maxPacketSizeBytes > 0 ? maxPacketSizeBytes : chan.getMaxPacketSizeBytes();
1348+
}
13431349
}

src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -236,14 +236,8 @@ protected NonBlockingStatsDClientBuilder resolve() {
236236
throw new UnsupportedOperationException("clone");
237237
}
238238

239-
int packetSize = maxPacketSizeBytes;
240239
Callable<SocketAddress> lookup = getAddressLookup();
241240

242-
if (packetSize == 0) {
243-
packetSize = (port == 0) ? NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES :
244-
NonBlockingStatsDClient.DEFAULT_UDP_MAX_PACKET_SIZE_BYTES;
245-
}
246-
247241
Callable<SocketAddress> telemetryLookup = telemetryAddressLookup;
248242
if (telemetryLookup == null) {
249243
if (telemetryHostname == null) {
@@ -253,7 +247,6 @@ protected NonBlockingStatsDClientBuilder resolve() {
253247
}
254248
}
255249

256-
resolved.maxPacketSizeBytes = packetSize;
257250
resolved.addressLookup = lookup;
258251
resolved.telemetryAddressLookup = telemetryLookup;
259252

src/main/java/com/timgroup/statsd/UnixDatagramClientChannel.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,9 @@ class UnixDatagramClientChannel extends DatagramClientChannel {
3131
public String getTransportType() {
3232
return "uds";
3333
}
34+
35+
@Override
36+
public int getMaxPacketSizeBytes() {
37+
return NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES;
38+
}
3439
}

src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,4 +175,9 @@ public String getTransportType() {
175175
public String toString() {
176176
return "[" + getTransportType() + "] " + address;
177177
}
178+
179+
@Override
180+
public int getMaxPacketSizeBytes() {
181+
return NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES;
182+
}
178183
}

src/test/java/com/timgroup/statsd/UnixStreamSocketTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,4 +171,27 @@ public void resist_dsd_timeout() throws Exception {
171171
assertThat(server.messagesReceived(), hasItem("my.prefix.mycount:30|g"));
172172
server.clear();
173173
}
174+
175+
@Test(timeout = 5000L)
176+
public void stream_uds_has_uds_buffer_size() throws Exception {
177+
final NonBlockingStatsDClient client = new NonBlockingStatsDClientBuilder()
178+
.prefix("my.prefix")
179+
.address("unixstream:///foo")
180+
.containerID("fake-container-id")
181+
.build();
182+
183+
assertEquals(client.statsDProcessor.bufferPool.getBufferSize(), NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES);
184+
}
185+
186+
@Test(timeout = 5000L)
187+
public void max_packet_size_override() throws Exception {
188+
final NonBlockingStatsDClient client = new NonBlockingStatsDClientBuilder()
189+
.prefix("my.prefix")
190+
.address("unixstream:///foo")
191+
.containerID("fake-container-id")
192+
.maxPacketSizeBytes(576)
193+
.build();
194+
195+
assertEquals(client.statsDProcessor.bufferPool.getBufferSize(), 576);
196+
}
174197
}

0 commit comments

Comments
 (0)