package it.agilelab.bigdata.wasp.consumers.spark.eventengine

import com.typesafe.config.ConfigFactory
import it.agilelab.bigdata.wasp.consumers.spark.utils.SparkSuite
import it.agilelab.bigdata.wasp.core.eventengine.Event
import it.agilelab.bigdata.wasp.core.logging.Logging
import org.scalatest.{BeforeAndAfter, Matchers, WordSpec}

import java.io.InputStreamReader
import java.time.{Clock, Instant, ZoneId}
import scala.util.Random
1212
13+ class InnerEventStrategySpec extends WordSpec with Matchers with BeforeAndAfter with SparkSuite with Logging {
1314
14- class InnerEventStrategySpec extends WordSpec
15- with Matchers with BeforeAndAfter with SparkSetup with Logging {
16-
17- private val randomSeed = 0
18- private val rand = new Random (randomSeed)
15+ private val randomSeed = 0
16+ private val rand = new Random (randomSeed)
1917 private val fixedClock : Clock = Clock .fixed(Instant .EPOCH , ZoneId .of(" UTC" ))
2018
2119 // This seq is made of random messages which may or may not trigger events
2220 private val randomSeq =
2321 Range (0 , 1000 )
24- .map(i => FakeData (s " sensor_ $i" , rand.nextFloat() * 200 , System .currentTimeMillis(), if (i % 2 == 0 ) " pari" else " dispari" , rand.nextInt(101 )))
22+ .map(i =>
23+ FakeData (
24+ s " sensor_ $i" ,
25+ rand.nextFloat() * 200 ,
26+ System .currentTimeMillis(),
27+ if (i % 2 == 0 ) " pari" else " dispari" ,
28+ rand.nextInt(101 )
29+ )
30+ )
2531
2632 // This seq is made of pre-defined messages which have to trigger a specific list of events
2733 private val mixedSeq = Seq (
28- FakeData ( // Triggers both events
34+ FakeData ( // Triggers both events
2935 name = " sensor_0" ,
3036 temperature = 200 ,
31- someLong= 0 ,
32- someStuff= " dispari" ,
33- someNumber = 99 ),
34- FakeData ( // Trigger no events
37+ someLong = 0 ,
38+ someStuff = " dispari" ,
39+ someNumber = 99
40+ ),
41+ FakeData ( // Trigger no events
3542 name = " sensor_1" ,
3643 temperature = 10 ,
37- someLong= 0 ,
38- someStuff= " pari" ,
39- someNumber = 100 ),
40- FakeData ( // Triggers HighTemperature event in WARN severity
44+ someLong = 0 ,
45+ someStuff = " pari" ,
46+ someNumber = 100
47+ ),
48+ FakeData ( // Triggers HighTemperature event in WARN severity
4149 name = " sensor_2" ,
4250 temperature = 120 ,
43- someLong= 0 ,
44- someStuff= " dispari" ,
45- someNumber = 50 ),
46- FakeData ( // Triggers HighTemperature event with prova2=false as payload
51+ someLong = 0 ,
52+ someStuff = " dispari" ,
53+ someNumber = 50
54+ ),
55+ FakeData ( // Triggers HighTemperature event with prova2=false as payload
4756 name = " sensor_3" ,
4857 temperature = 620 ,
49- someLong= 0 ,
50- someStuff= " dispari" ,
51- someNumber = 5 ),
52- FakeData ( // Trigger OddHighNumbers in LOW_TEMP severity
58+ someLong = 0 ,
59+ someStuff = " dispari" ,
60+ someNumber = 5
61+ ),
62+ FakeData ( // Trigger OddHighNumbers in LOW_TEMP severity
5363 name = " sensor_4" ,
5464 temperature = 45 ,
55- someLong= 0 ,
56- someStuff= " dispari" ,
57- someNumber = 101 )
65+ someLong = 0 ,
66+ someStuff = " dispari" ,
67+ someNumber = 101
68+ )
5869 )
5970 // Control Seq of events generated from mixedSeq (not taking into account timestamp and id)
6071 private val mixedSeqEvents = Seq (
@@ -66,7 +77,8 @@ class InnerEventStrategySpec extends WordSpec
6677 severity = " CRITICAL" ,
6778 sourceId = Some (" sensor_0" ),
6879 eventId = " 0" ,
69- timestamp = 0 ),
80+ timestamp = 0
81+ ),
7082 Event (
7183 eventRuleName = " HighTemperature" ,
7284 source = " streamingSource1" ,
@@ -85,7 +97,8 @@ class InnerEventStrategySpec extends WordSpec
8597 severity = " CRITICAL" ,
8698 sourceId = Some (" sensor_3" ),
8799 eventId = " 0" ,
88- timestamp = 0 ),
100+ timestamp = 0
101+ ),
89102 Event (
90103 eventRuleName = " OddHighNumbers" ,
91104 source = " streamingSource2" ,
@@ -108,97 +121,97 @@ class InnerEventStrategySpec extends WordSpec
108121 )
109122 )
110123
111- private val highTempEventsQuantity = 3
124+ private val highTempEventsQuantity = 3
112125 private val oddHighNumbersEventsQuantity = 2
113- private val totalEventsQuantity = highTempEventsQuantity + oddHighNumbersEventsQuantity
126+ private val totalEventsQuantity = highTempEventsQuantity + oddHighNumbersEventsQuantity
114127
115128 // Fake configuration objects to fetch event rules
116- private val reader = new InputStreamReader (getClass.getResourceAsStream(" inner_event_strategy.conf" ))
117- private val fakeConfig = try {
118- ConfigFactory .parseReader(reader)
119- } finally {
120- reader.close()
121- }
129+ private val reader = new InputStreamReader (getClass.getResourceAsStream(" inner_event_strategy.conf" ))
130+ private val fakeConfig =
131+ try {
132+ ConfigFactory .parseReader(reader)
133+ } finally {
134+ reader.close()
135+ }
122136
123137 // This seq doesn't trigger any event
124138 private val fruitlessSeq = Range (0 , 1000 )
125- .map(i => FakeData (s " sensor_ $i" , rand.nextFloat() * 50 , System .currentTimeMillis(), if (i % 2 == 0 ) " pari" else " dispari" , rand.nextInt(50 )))
139+ .map(i =>
140+ FakeData (
141+ s " sensor_ $i" ,
142+ rand.nextFloat() * 50 ,
143+ System .currentTimeMillis(),
144+ if (i % 2 == 0 ) " pari" else " dispari" ,
145+ rand.nextInt(50 )
146+ )
147+ )
126148
127149 // This seq doesn't contain any message
128150 private val emptySeq : Seq [FakeData ] = Seq .empty
129151
130-
131152 " When dealing with multiple event rules, InnerEventStrategy" should {
132153
133154 val target = new InnerEventStrategy (fakeConfig.getConfig(" multipleRules" ), fixedClock, FixedIdGenerator )
134155
135156 // TODO: this should not be a unit test but actually a bench
136- s " Process a big random sequence " in withSparkSession { ss => {
137- import ss .implicits ._
138- val events : Array [Event ] = target.transform(ss .sparkContext.parallelize(randomSeq).toDF).as[Event ].collect()
157+ s " Process a big random sequence " in {
158+ import spark .implicits ._
159+ val events : Array [Event ] = target.transform(spark .sparkContext.parallelize(randomSeq).toDF).as[Event ].collect()
139160 println(events.length)
140- }}
161+ }
141162
142- s " Find $totalEventsQuantity test events in mixedSeq which match the control event seq " in withSparkSession { ss => {
143- import ss .implicits ._
144- val events : Array [Event ] = target.transform(ss .sparkContext.parallelize(mixedSeq).toDF).as[Event ].collect()
163+ s " Find $totalEventsQuantity test events in mixedSeq which match the control event seq " in {
164+ import spark .implicits ._
165+ val events : Array [Event ] = target.transform(spark .sparkContext.parallelize(mixedSeq).toDF).as[Event ].collect()
145166
146- events.length should be (totalEventsQuantity)
167+ events.length should be(totalEventsQuantity)
147168
148169 val (highTempEvents, oddNumberEvents) = events.partition(e => e.eventRuleName.equals(" HighTemperature" ))
149- highTempEvents.length should be (highTempEventsQuantity)
150- oddNumberEvents.length should be (oddHighNumbersEventsQuantity)
151-
152- mixedSeqEvents should contain theSameElementsAs events
153-
154- }}
155-
156- " Find no events in fruitlessSeq" in withSparkSession { ss => {
157- import ss .implicits ._
158- val events : Array [Event ] = target.transform(ss.sparkContext.parallelize(fruitlessSeq).toDF).as[Event ].collect()
159- events.length should be (0 )
160- }}
161-
162- " Find no events in emptySeq" in withSparkSession { ss => {
163- import ss .implicits ._
164- val events : Array [Event ] = target.transform(ss.sparkContext.parallelize(emptySeq).toDF).as[Event ].collect()
165- events.length should be (0 )
166- }}
167-
170+ highTempEvents.length should be(highTempEventsQuantity)
171+ oddNumberEvents.length should be(oddHighNumbersEventsQuantity)
172+
173+ mixedSeqEvents should contain theSameElementsAs events
174+ }
175+ " Find no events in fruitlessSeq" in {
176+ import spark .implicits ._
177+ val events : Array [Event ] = target.transform(spark.sparkContext.parallelize(fruitlessSeq).toDF).as[Event ].collect()
178+ events.length should be(0 )
179+ }
180+ " Find no events in emptySeq" in {
181+ import spark .implicits ._
182+ val events : Array [Event ] = target.transform(spark.sparkContext.parallelize(emptySeq).toDF).as[Event ].collect()
183+ events.length should be(0 )
184+ }
168185 }
169186
170187 " When dealing with a single event rule, InnerEventStrategy" should {
171188
172189 val target = new InnerEventStrategy (fakeConfig.getConfig(" singleRule" ), fixedClock, FixedIdGenerator )
173190
174- s " Find exactly $highTempEventsQuantity in mixedSeq " in withSparkSession { ss => {
175- import ss .implicits ._
176- val events : Array [Event ] = target.transform(ss .sparkContext.parallelize(mixedSeq).toDF).as[Event ].collect()
191+ s " Find exactly $highTempEventsQuantity in mixedSeq " in {
192+ import spark .implicits ._
193+ val events : Array [Event ] = target.transform(spark .sparkContext.parallelize(mixedSeq).toDF).as[Event ].collect()
177194
178- events.length should be (highTempEventsQuantity)
195+ events.length should be(highTempEventsQuantity)
179196
180197 val (highTempEvents, oddNumberEvents) = events.partition(e => e.eventRuleName.equals(" HighTemperature" ))
181- highTempEvents.length should be (highTempEventsQuantity)
182- oddNumberEvents.length should be (0 )
183- }}
184-
185- " Find no events in noEventSeq" in withSparkSession { ss => {
186- import ss .implicits ._
187- val events : Array [Event ] = target.transform(ss.sparkContext.parallelize(fruitlessSeq).toDF).as[Event ].collect()
188- events.length should be (0 )
189- }}
190-
191- " Find no events in emptySeq" in withSparkSession { ss => {
192- import ss .implicits ._
193- val events : Array [Event ] = target.transform(ss.sparkContext.parallelize(emptySeq).toDF).as[Event ].collect()
194- events.length should be (0 )
195- }}
196-
198+ highTempEvents.length should be(highTempEventsQuantity)
199+ oddNumberEvents.length should be(0 )
200+ }
201+ " Find no events in noEventSeq" in {
202+ import spark .implicits ._
203+ val events : Array [Event ] = target.transform(spark.sparkContext.parallelize(fruitlessSeq).toDF).as[Event ].collect()
204+ events.length should be(0 )
205+ }
206+
207+ " Find no events in emptySeq" in {
208+ import spark .implicits ._
209+ val events : Array [Event ] = target.transform(spark.sparkContext.parallelize(emptySeq).toDF).as[Event ].collect()
210+ events.length should be(0 )
211+ }
197212 }
198-
199-
200213}
201214
202215case object FixedIdGenerator extends IDGenerator {
203216 override def generate (): String = " 0"
204- }
217+ }
0 commit comments