31
31
import java .util .function .Consumer ;
32
32
import java .util .function .Function ;
33
33
import java .util .function .Predicate ;
34
+ import java .util .stream .Collectors ;
35
+ import java .util .stream .Stream ;
34
36
37
+ import static java .util .Map .entry ;
35
38
import static org .hamcrest .Matchers .closeTo ;
36
39
import static org .hamcrest .Matchers .equalTo ;
37
40
@@ -58,27 +61,21 @@ protected String getTestRestCluster() {
58
61
59
62
@ SuppressWarnings ("unchecked" )
60
63
public void testApmIntegration () throws Exception {
61
- Map <String , Predicate <Map <String , Object >>> sampleAssertions = new HashMap <>(
64
+ Map <String , Predicate <Map <String , Object >>> valueAssertions = new HashMap <>(
62
65
Map .ofEntries (
63
66
assertion ("es.test.long_counter.total" , m -> (Double ) m .get ("value" ), closeTo (1.0 , 0.001 )),
64
67
assertion ("es.test.double_counter.total" , m -> (Double ) m .get ("value" ), closeTo (1.0 , 0.001 )),
65
68
assertion ("es.test.async_double_counter.total" , m -> (Double ) m .get ("value" ), closeTo (1.0 , 0.001 )),
66
69
assertion ("es.test.async_long_counter.total" , m -> (Integer ) m .get ("value" ), equalTo (1 )),
67
70
assertion ("es.test.double_gauge.current" , m -> (Double ) m .get ("value" ), closeTo (1.0 , 0.001 )),
68
- assertion ("es.test.long_gauge.current" , m -> (Integer ) m .get ("value" ), equalTo (1 )),
69
- assertion (
70
- "es.test.double_histogram.histogram" ,
71
- m -> ((Collection <Integer >) m .get ("counts" )).stream ().mapToInt (Integer ::intValue ).sum (),
72
- equalTo (2 )
73
- ),
74
- assertion (
75
- "es.test.long_histogram.histogram" ,
76
- m -> ((Collection <Integer >) m .get ("counts" )).stream ().mapToInt (Integer ::intValue ).sum (),
77
- equalTo (2 )
78
- )
71
+ assertion ("es.test.long_gauge.current" , m -> (Integer ) m .get ("value" ), equalTo (1 ))
79
72
)
80
73
);
81
74
75
+ Map <String , Integer > histogramAssertions = new HashMap <>(
76
+ Map .ofEntries (entry ("es.test.double_histogram.histogram" , 2 ), entry ("es.test.long_histogram.histogram" , 2 ))
77
+ );
78
+
82
79
CountDownLatch finished = new CountDownLatch (1 );
83
80
84
81
// a consumer that will remove the assertions from a map once it matched
@@ -91,21 +88,35 @@ public void testApmIntegration() throws Exception {
91
88
var samples = (Map <String , Object >) metricset .get ("samples" );
92
89
93
90
samples .forEach ((key , value ) -> {
94
- var assertion = sampleAssertions .get (key );// sample name
95
- if (assertion != null ) {
96
- logger .info ("Matched {}" , key );
91
+ var valueAssertion = valueAssertions .get (key );// sample name
92
+ if (valueAssertion != null ) {
93
+ logger .info ("Matched {}:{} " , key , value );
97
94
var sampleObject = (Map <String , Object >) value ;
98
- if (assertion .test (sampleObject )) {// sample object
95
+ if (valueAssertion .test (sampleObject )) {// sample object
96
+ logger .info ("{} assertion PASSED" , key );
97
+ valueAssertions .remove (key );
98
+ } else {
99
+ logger .error ("{} assertion FAILED" , key );
100
+ }
101
+ }
102
+ var histogramAssertion = histogramAssertions .get (key );
103
+ if (histogramAssertion != null ) {
104
+ logger .info ("Matched {}:{}" , key , value );
105
+ var samplesObject = (Map <String , Object >) value ;
106
+ var counts = ((Collection <Integer >) samplesObject .get ("counts" )).stream ().mapToInt (Integer ::intValue ).sum ();
107
+ var remaining = histogramAssertion - counts ;
108
+ if (remaining == 0 ) {
99
109
logger .info ("{} assertion PASSED" , key );
100
- sampleAssertions .remove (key );
110
+ histogramAssertions .remove (key );
101
111
} else {
102
- logger .error ("{} assertion FAILED: {}" , key , sampleObject .get ("value" ));
112
+ logger .info ("{} assertion PENDING: {} remaining" , key , remaining );
113
+ histogramAssertions .put (key , remaining );
103
114
}
104
115
}
105
116
});
106
117
}
107
118
108
- if (sampleAssertions .isEmpty ()) {
119
+ if (valueAssertions .isEmpty ()) {
109
120
finished .countDown ();
110
121
}
111
122
};
@@ -115,15 +126,17 @@ public void testApmIntegration() throws Exception {
115
126
client ().performRequest (new Request ("GET" , "/_use_apm_metrics" ));
116
127
117
128
var completed = finished .await (30 , TimeUnit .SECONDS );
118
- assertTrue ("Timeout when waiting for assertions to complete. Remaining assertions to match: " + sampleAssertions , completed );
129
+ var remainingAssertions = Stream .concat (valueAssertions .keySet ().stream (), histogramAssertions .keySet ().stream ())
130
+ .collect (Collectors .joining ());
131
+ assertTrue ("Timeout when waiting for assertions to complete. Remaining assertions to match: " + remainingAssertions , completed );
119
132
}
120
133
121
134
private <T > Map .Entry <String , Predicate <Map <String , Object >>> assertion (
122
135
String sampleKeyName ,
123
136
Function <Map <String , Object >, T > accessor ,
124
137
Matcher <T > expected
125
138
) {
126
- return Map . entry (sampleKeyName , new Predicate <>() {
139
+ return entry (sampleKeyName , new Predicate <>() {
127
140
@ Override
128
141
public boolean test (Map <String , Object > sampleObject ) {
129
142
return expected .matches (accessor .apply (sampleObject ));
0 commit comments