Skip to content

Commit c4111e2

Browse files
committed
tcp: add jitter
1 parent 5ef3b1c commit c4111e2

File tree

2 files changed

+135
-26
lines changed

2 files changed

+135
-26
lines changed

netty/src/main/java/io/grpc/netty/TcpMetrics.java

Lines changed: 87 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ final class TcpMetrics {
3434
static final LongCounterMetricInstrument recurringRetransmits;
3535
static final DoubleHistogramMetricInstrument minRtt;
3636

37+
// Note: Metrics like delivery_rate, bytes_sent, packets_sent,
38+
// bytes_retransmitted, etc., are not
39+
// currently exposed by Netty's EpollTcpInfo.java wrapper around
40+
// getSockOpt(TCP_INFO).
3741
static {
3842
MetricInstrumentRegistry registry = MetricInstrumentRegistry.getDefaultRegistry();
3943
ImmutableList<String> requiredLabels = ImmutableList.of("grpc.target");
@@ -104,48 +108,105 @@ static final class Tracker {
104108
this.target = target;
105109
}
106110

111+
/**
 * Base delay between periodic TCP info reports, in milliseconds. Derived from the
 * {@code io.grpc.netty.tcpMetricsRecordIntervalMinutes} system property (a value in minutes);
 * defaults to 5 minutes when the property is unset or is not a valid long.
 */
private static final long RECORD_INTERVAL_MILLIS;

static {
  long minutes = 5;
  String configured = System.getProperty("io.grpc.netty.tcpMetricsRecordIntervalMinutes");
  if (configured != null) {
    try {
      minutes = Long.parseLong(configured);
    } catch (NumberFormatException ignored) {
      // Unparsable value: keep the 5 minute default.
    }
  }
  RECORD_INTERVAL_MILLIS = java.util.concurrent.TimeUnit.MINUTES.toMillis(minutes);
}
125+
126+
private static final java.util.Random RANDOM = new java.util.Random();
127+
private io.netty.util.concurrent.ScheduledFuture<?> reportTimer;
128+
107129
void channelActive(Channel channel) {
108130
if (metricRecorder != null && target != null) {
109131
java.util.List<String> labelValues = getLabelValues(channel);
110132
metricRecorder.addLongCounter(TcpMetrics.connectionsCreated, 1,
111133
Collections.singletonList(target), labelValues);
112134
metricRecorder.addLongUpDownCounter(TcpMetrics.connectionCount, 1,
113135
Collections.singletonList(target), labelValues);
136+
scheduleNextReport(channel);
137+
}
138+
}
139+
140+
private void scheduleNextReport(final Channel channel) {
141+
if (RECORD_INTERVAL_MILLIS <= 0) {
142+
return;
143+
}
144+
if (!channel.isActive()) {
145+
return;
146+
}
147+
148+
double jitter = 0.1 + RANDOM.nextDouble(); // 10% to 110%
149+
long delayMillis = (long) (RECORD_INTERVAL_MILLIS * jitter);
150+
151+
try {
152+
reportTimer = channel.eventLoop().schedule(new Runnable() {
153+
@Override
154+
public void run() {
155+
if (channel.isActive()) {
156+
Tracker.this.recordTcpInfo(channel); // Renamed from channelInactive to recordTcpInfo
157+
scheduleNextReport(channel); // Re-arm
158+
}
159+
}
160+
}, delayMillis, java.util.concurrent.TimeUnit.MILLISECONDS);
161+
} catch (Throwable t) {
162+
// Channel closed, event loop shut down, etc.
114163
}
115164
}
116165

117166
/**
 * Handles a channel going inactive: cancels the pending periodic report if one was scheduled,
 * decrements the open-connection count, and takes one last TCP info sample.
 *
 * <p>The count update and the final sample are skipped when either the metric recorder or the
 * target is missing; the timer is cancelled regardless.
 *
 * <p>In this diff view, the lines whose number carries a trailing {@code -} are the previous
 * inline TCP info collection, which now lives in {@code recordTcpInfo}.
 *
 * @param channel the channel that just became inactive
 */
void channelInactive(Channel channel) {
167+
// cancel(false): do not interrupt a report that is already running.
if (reportTimer != null) {
168+
reportTimer.cancel(false);
169+
}
118170
if (metricRecorder != null && target != null) {
119171
java.util.List<String> labelValues = getLabelValues(channel);
120172
metricRecorder.addLongUpDownCounter(TcpMetrics.connectionCount, -1,
121173
Collections.singletonList(target), labelValues);
122-
123-
try {
124-
if (channel.getClass().getName().equals("io.netty.channel.epoll.EpollSocketChannel")) {
125-
Method tcpInfoMethod = channel.getClass().getMethod("tcpInfo",
126-
Class.forName("io.netty.channel.epoll.EpollTcpInfo"));
127-
Object info = Class.forName("io.netty.channel.epoll.EpollTcpInfo")
128-
.getDeclaredConstructor().newInstance();
129-
tcpInfoMethod.invoke(channel, info);
130-
131-
Method totalRetransMethod = info.getClass().getMethod("totalRetrans");
132-
Method retransmitsMethod = info.getClass().getMethod("retransmits");
133-
Method rttMethod = info.getClass().getMethod("rtt");
134-
135-
long totalRetrans = (Long) totalRetransMethod.invoke(info);
136-
long retransmits = (Long) retransmitsMethod.invoke(info);
137-
long rtt = ((Number) rttMethod.invoke(info)).longValue();
138-
139-
metricRecorder.addLongCounter(TcpMetrics.packetsRetransmitted, totalRetrans,
140-
Collections.singletonList(target), labelValues);
141-
metricRecorder.addLongCounter(TcpMetrics.recurringRetransmits, retransmits,
142-
Collections.singletonList(target), labelValues);
143-
metricRecorder.recordDoubleHistogram(TcpMetrics.minRtt, rtt / 1000000.0,
144-
Collections.singletonList(target), labelValues);
145-
}
146-
} catch (Throwable t) {
147-
// Epoll not available or error getting tcp_info, just ignore.
174+
// Final collection on close
175+
recordTcpInfo(channel);
176+
}
177+
}
178+
179+
private void recordTcpInfo(Channel channel) {
180+
if (metricRecorder == null || target == null) {
181+
return;
182+
}
183+
java.util.List<String> labelValues = getLabelValues(channel);
184+
try {
185+
if (channel.getClass().getName().equals("io.netty.channel.epoll.EpollSocketChannel")) {
186+
Method tcpInfoMethod = channel.getClass().getMethod("tcpInfo",
187+
Class.forName("io.netty.channel.epoll.EpollTcpInfo"));
188+
Object info = Class.forName("io.netty.channel.epoll.EpollTcpInfo")
189+
.getDeclaredConstructor().newInstance();
190+
tcpInfoMethod.invoke(channel, info);
191+
192+
Method totalRetransMethod = info.getClass().getMethod("totalRetrans");
193+
Method retransmitsMethod = info.getClass().getMethod("retransmits");
194+
Method rttMethod = info.getClass().getMethod("rtt");
195+
196+
long totalRetrans = (Long) totalRetransMethod.invoke(info);
197+
int retransmits = (Integer) retransmitsMethod.invoke(info);
198+
long rtt = (Long) rttMethod.invoke(info);
199+
200+
metricRecorder.addLongCounter(TcpMetrics.packetsRetransmitted, totalRetrans,
201+
Collections.singletonList(target), labelValues);
202+
metricRecorder.addLongCounter(TcpMetrics.recurringRetransmits, retransmits,
203+
Collections.singletonList(target), labelValues);
204+
metricRecorder.recordDoubleHistogram(TcpMetrics.minRtt,
205+
rtt / 1000000.0, // Convert microseconds to seconds
206+
Collections.singletonList(target), labelValues);
148207
}
208+
} catch (Throwable t) {
209+
// Epoll not available or error getting tcp_info, just ignore.
149210
}
150211
}
151212
}

netty/src/test/java/io/grpc/netty/TcpMetricsTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,29 @@
1616

1717
package io.grpc.netty;
1818

19+
import static org.mockito.ArgumentMatchers.any;
20+
import static org.mockito.ArgumentMatchers.anyLong;
1921
import static org.mockito.ArgumentMatchers.eq;
2022
import static org.mockito.Mockito.verify;
2123
import static org.mockito.Mockito.verifyNoMoreInteractions;
2224
import static org.mockito.Mockito.when;
2325

2426
import io.grpc.MetricRecorder;
2527
import io.netty.channel.Channel;
28+
import io.netty.channel.EventLoop;
29+
import io.netty.util.concurrent.ScheduledFuture;
2630
import java.net.InetAddress;
2731
import java.net.InetSocketAddress;
2832
import java.net.SocketAddress;
2933
import java.util.Arrays;
3034
import java.util.Collections;
35+
import java.util.concurrent.TimeUnit;
3136
import org.junit.Before;
3237
import org.junit.Rule;
3338
import org.junit.Test;
3439
import org.junit.runner.RunWith;
3540
import org.junit.runners.JUnit4;
41+
import org.mockito.ArgumentCaptor;
3642
import org.mockito.Mock;
3743
import org.mockito.junit.MockitoJUnit;
3844
import org.mockito.junit.MockitoRule;
@@ -44,11 +50,18 @@ public class TcpMetricsTest {
4450

4551
@Mock private MetricRecorder metricRecorder;
4652
@Mock private Channel channel;
53+
@Mock
54+
private EventLoop eventLoop;
55+
@Mock
56+
private ScheduledFuture<?> scheduledFuture;
4757

4858
private TcpMetrics.Tracker metrics;
4959

5060
@Before
5161
public void setUp() {
62+
when(channel.eventLoop()).thenReturn(eventLoop);
63+
when(eventLoop.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)))
64+
.thenAnswer(invocation -> scheduledFuture);
5265
metrics = new TcpMetrics.Tracker(metricRecorder, "target1");
5366
}
5467

@@ -151,4 +164,39 @@ public void channelInactive_decrementsCount_noEpoll_noError() {
151164
eq(Arrays.asList("", "", "", "")));
152165
verifyNoMoreInteractions(metricRecorder);
153166
}
167+
168+
@Test
169+
public void channelActive_schedulesReportTimer() {
170+
when(channel.isActive()).thenReturn(true);
171+
metrics.channelActive(channel);
172+
173+
ArgumentCaptor<Runnable> runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
174+
ArgumentCaptor<Long> delayCaptor = ArgumentCaptor.forClass(Long.class);
175+
verify(eventLoop).schedule(
176+
runnableCaptor.capture(), delayCaptor.capture(), eq(TimeUnit.MILLISECONDS));
177+
178+
Runnable task = runnableCaptor.getValue();
179+
long delay = delayCaptor.getValue();
180+
181+
// Default RECORD_INTERVAL_MILLIS is 5 minutes (300,000 ms)
182+
// Jitter is 10% to 110%, so 30,000 ms to 330,000 ms
183+
org.junit.Assert.assertTrue("Delay should be >= 30000 but was " + delay, delay >= 30_000);
184+
org.junit.Assert.assertTrue("Delay should be <= 330000 but was " + delay, delay <= 330_000);
185+
186+
// Run the task to verify rescheduling
187+
task.run();
188+
189+
verify(eventLoop, org.mockito.Mockito.times(2))
190+
.schedule(any(Runnable.class), anyLong(), eq(TimeUnit.MILLISECONDS));
191+
}
192+
193+
@Test
194+
public void channelInactive_cancelsReportTimer() {
195+
when(channel.isActive()).thenReturn(true);
196+
metrics.channelActive(channel);
197+
198+
metrics.channelInactive(channel);
199+
200+
verify(scheduledFuture).cancel(false);
201+
}
154202
}

0 commit comments

Comments
 (0)