@@ -3,7 +3,6 @@ package it.agilelab.bigdata.wasp.consumers.spark.streaming.actor.master
33import java .net .URLEncoder
44import java .nio .charset .StandardCharsets
55import java .util .UUID
6- import java .util .concurrent .TimeUnit
76import akka .actor .FSM .{CurrentState , SubscribeTransitionCallBack , Transition }
87import akka .actor .{Actor , ActorRef , ActorSystem , PoisonPill , Props , Terminated }
98import akka .cluster .ClusterEvent .{InitialStateAsEvents , MemberUp }
@@ -28,6 +27,7 @@ import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
2827
2928import scala .concurrent .duration .Duration
3029import scala .concurrent .{Await , Future }
30+ import scala .concurrent .duration ._
3131
3232class MultiMasterSpec
3333 extends WordSpecLike
@@ -40,9 +40,8 @@ class MultiMasterSpec
4040 import Protocol ._
4141 import SparkConsumersStreamingMasterGuardian ._
4242
43- import scala .concurrent .duration ._
4443
45- val slowTimeout : FiniteDuration = FiniteDuration ( 5 , TimeUnit . MINUTES )
44+ val slowTimeout : FiniteDuration = 5 .minutes
4645
4746 def childCreatorFactory (probe : TestProbe ): ChildCreator = { (_, name, factory) =>
4847 {
@@ -73,7 +72,7 @@ class MultiMasterSpec
7372 watchdogCreator,
7473 " collaborator" ,
7574 1 .millisecond,
76- FiniteDuration ( 5 , TimeUnit . SECONDS )
75+ 5 .seconds
7776 )
7877
7978 val childCreator = childCreatorFactory(probe)
@@ -138,7 +137,7 @@ class MultiMasterSpec
138137 watchdogCreator,
139138 " collaborator" ,
140139 1 .millisecond,
141- FiniteDuration ( 5 , TimeUnit . SECONDS ) ,
140+ 5 .seconds ,
142141 schedulingStrategy = new TestNodeLabelSchedulingStrategyFactory
143142 )
144143
@@ -212,7 +211,7 @@ class MultiMasterSpec
212211 watchdogCreator,
213212 " collaborator" ,
214213 1 .millisecond,
215- FiniteDuration ( 5 , TimeUnit . SECONDS )
214+ 5 .seconds
216215 )
217216
218217 cluster(" system-0" , props, childCreator) { (cluster0, _, _, shutdown0) =>
@@ -302,7 +301,7 @@ class MultiMasterSpec
302301 watchdogCreator,
303302 " collaborator" ,
304303 1 .millisecond,
305- FiniteDuration ( 5 , TimeUnit . SECONDS ) ,
304+ 5 .seconds ,
306305 Some (whoIsRunningTheSingletonProbe.ref)
307306 )
308307
@@ -347,7 +346,7 @@ class MultiMasterSpec
347346 watchdogCreator,
348347 " collaborator" ,
349348 1 .second,
350- FiniteDuration ( 5 , TimeUnit . SECONDS )
349+ 5 .seconds
351350 )
352351
353352 cluster(" system-0" , props, childCreator) { (cluster0, _, _, shutdown0) =>
@@ -366,6 +365,7 @@ class MultiMasterSpec
366365
367366 Seq (cluster0, cluster1, cluster2).find(_.selfUniqueAddress == address).foreach { cluster =>
368367 cluster.leave(cluster.selfAddress)
368+ cluster.down(cluster.selfAddress)
369369 }
370370
371371 probe.expectMsgPF(slowTimeout) {
@@ -418,7 +418,7 @@ class MultiMasterSpec
418418 watchdogCreator,
419419 " collaborator" ,
420420 1 .millisecond,
421- FiniteDuration ( 5 , TimeUnit . SECONDS ) ,
421+ 5 .seconds ,
422422 debugActor = Some (whoIsRunningTheSingletonProbe.ref)
423423 )
424424
@@ -520,7 +520,7 @@ class MultiMasterSpec
520520 watchdogCreator,
521521 " collaborator" ,
522522 1 .millisecond,
523- FiniteDuration ( 5 , TimeUnit . SECONDS ) ,
523+ 5 .seconds ,
524524 Some (whoIsRunningTheSingletonProbe.ref)
525525 )
526526
@@ -566,6 +566,7 @@ class MultiMasterSpec
566566 Seq (cluster0, cluster1, cluster2).find(_.selfUniqueAddress == firstOneRunningTheSingleton).map {
567567 cluster =>
568568 cluster.leave(cluster.selfAddress)
569+ cluster.down(cluster.selfAddress)
569570 cluster.selfUniqueAddress
570571 }
571572
@@ -616,6 +617,8 @@ trait SystemUtils {
616617 val proxy = system.actorOf(ClusterSingletonProxy .props(" singleton-manager" , proxySettings))
617618
618619 val shutdown : () => Future [Terminated ] = () => {
620+ cluster.leave(cluster.selfAddress)
621+ cluster.down(cluster.selfAddress)
619622 system.terminate()
620623 }
621624
@@ -677,6 +680,8 @@ trait SystemUtils {
677680 system.actorOf(CollaboratorActor .props(proxy, childCreator), " collaborator" )
678681
679682 val shutdown : () => Future [Terminated ] = () => {
683+ cluster.leave(cluster.selfAddress)
684+ cluster.down(cluster.selfAddress)
680685 system.terminate()
681686 }
682687
0 commit comments