Skip to content

Commit cdee941

Browse files
Matteo Seclì and agilelab-tmnd1991
authored and committed
[!463] - Feature/580 upgrade akka in order to be compatible with java 17
# New features and improvements Change the Akka version to be compatible with Java 17. List of dependency updates: - akka = "2.6.21" - akkaHttp = "10.2.10" - typesafeConfig = "1.4.2" - akkaKryo = "1.1.5" New dependency: akka-cluster-metrics Removed dependency: akka-contrib Change akkaKryo from: "**com.github.romix.akka**" %% "akka-kryo-serialization" to: "**io.altoo**" %% "akka-kryo-serialization" Akka Remoting has a new transport called Artery, but we keep the **classic** behavior in this feature. # Breaking changes - Scala 2.11 is no longer supported - The serializer prefix "com.romix.akka.serialization.*" in configs no longer works - Akka Remoting now defaults to the new Artery transport, so the existing akka.remote.* configuration no longer works as before (Artery is enabled by default) # Migration Applications using this WASP version should pay attention to the configuration changes. - New serializer prefix: io.altoo.akka.serialization.* - akka.remote configuration changes needed: - set akka.remote.artery.enabled = false - other akka.remote.* configs must be changed to akka.remote.classic.* For further clarification, see the Akka migration guides: - https://doc.akka.io/libraries/akka-core/2.5/project/migration-guide-2.4.x-2.5.x.html - https://doc.akka.io/libraries/akka-core/2.6/project/migration-guide-2.5.x-2.6.x.html # Bug fixes none # How this feature was tested Unit test # Related issue Closes #580
1 parent 02e1d9c commit cdee941

File tree

35 files changed

+376
-429
lines changed

35 files changed

+376
-429
lines changed

consumers-spark/src/main/resources/reference.conf

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ wasp {
1414
actor {
1515

1616
serializers {
17-
kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
17+
kryo = "io.altoo.akka.serialization.kryo.KryoSerializer"
1818
}
1919

2020
kryo {
@@ -66,7 +66,8 @@ wasp {
6666

6767
}
6868
remote {
69-
netty.tcp {
69+
artery.enabled = false
70+
classic.netty.tcp {
7071
port = 0
7172
hostname = "localhost"
7273
}

consumers-spark/src/main/scala/it/agilelab/bigdata/wasp/consumers/spark/streaming/actor/master/SparkConsumersStreamingMasterGuardian.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ class SparkConsumersStreamingMasterGuardian(
146146
unstashAll()
147147
log.debug("Unstashing")
148148
log.info(s"Setting ${Timers.unschedulableCheck} to recover unschedulable pipegraphs")
149-
setTimer(Timers.unschedulableCheck, RecoverUnschedulable, unschedulableCheckInterval, repeat = true)
149+
startTimerAtFixedRate(Timers.unschedulableCheck, RecoverUnschedulable, unschedulableCheckInterval)
150150
}
151151

152152
private def handleUnschedulable: StateFunction = {
@@ -351,7 +351,7 @@ class SparkConsumersStreamingMasterGuardian(
351351

352352
case Event(_: ChildProtocol.WorkNotCancelled, _: Schedule)
353353
=>
354-
setTimer(
354+
startTimerAtFixedRate(
355355
Timers.workNotCancelledRetryTimer,
356356
RetryEnvelope(ChildProtocol.WorkNotCancelled, sender()),
357357
retryInterval

consumers-spark/src/main/scala/it/agilelab/bigdata/wasp/consumers/spark/streaming/actor/pipegraph/PipegraphGuardian.scala

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ class PipegraphGuardian(private val master: ActorRef,
305305
case (Activating, Activating) => nextStateData match {
306306
case ActivatingData.ShouldRetry() =>
307307
log.info("[Activating->Activating] Scheduling Retry")
308-
setTimer("retry", MyProtocol.PerformRetry, retryDuration)
308+
startTimerAtFixedRate("retry", MyProtocol.PerformRetry, retryDuration)
309309
case ActivatingData.ToBeActivated(etl) =>
310310
log.info("[Activating->Activating] Activating [{}]", etl.name)
311311
self ! MyProtocol.ActivateETL(etl)
@@ -386,7 +386,7 @@ class PipegraphGuardian(private val master: ActorRef,
386386
case (Materializing, Materializing) => nextStateData match {
387387
case MaterializingData.ShouldRetry() =>
388388
log.info("[Materializing->Materializing] Scheduling Retry")
389-
setTimer("retry", MyProtocol.PerformRetry, retryDuration)
389+
startTimerAtFixedRate("retry", MyProtocol.PerformRetry, retryDuration)
390390
case MaterializingData.ToBeMaterialized(WorkerToEtlAssociation(worker, data)) =>
391391
log.info("[Materializing->Materializing] materialize [{}->{}]", worker, data.name)
392392
self ! MyProtocol.MaterializeETL(worker, data)
@@ -425,13 +425,13 @@ class PipegraphGuardian(private val master: ActorRef,
425425
case (Monitoring, Monitored) => nextStateData match {
426426
case MonitoredData.ShouldRetry() =>
427427
log.info("[Monitoring->Monitored] Scheduling Retry")
428-
setTimer("retry", MyProtocol.PerformRetry, retryDuration)
428+
startTimerAtFixedRate("retry", MyProtocol.PerformRetry, retryDuration)
429429
case MonitoredData.ShouldStopAll() =>
430430
log.info("[Monitoring->Monitored] StopAll")
431431
self ! MyProtocol.CancelWork
432432
case MonitoredData.ShouldMonitorAgain() =>
433433
log.info("[Monitoring->Monitored] Scheduling Monitoring")
434-
setTimer("monitoring", MyProtocol.MonitorPipegraph, monitoringInterval)
434+
startTimerAtFixedRate("monitoring", MyProtocol.MonitorPipegraph, monitoringInterval)
435435

436436
case MonitoredData.NothingToMonitor() =>
437437
log.info("[Monitoring->Monitored] Nothing to monitor")
@@ -453,8 +453,9 @@ class PipegraphGuardian(private val master: ActorRef,
453453
case ActivatingData.ToBeActivated(etl) =>
454454
log.info("[Monitored->Activating] Scheduling activation")
455455
self ! MyProtocol.ActivateETL(etl)
456-
457-
456+
case ActivatingData.AllActive() =>
457+
log.info("[Monitored->Activating] AllActive")
458+
self ! MyProtocol.ActivationFinished
458459
}
459460

460461
case (Monitored, Monitoring) => nextStateData match {

consumers-spark/src/main/scala/it/agilelab/bigdata/wasp/consumers/spark/streaming/actor/telemetry/TelemetryActor.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ class TelemetryActor private() extends Actor with CompatibilityTelemetryActor{
168168

169169
implicit val ec: ExecutionContextExecutor = context.system.dispatcher
170170

171-
val _ = context.system.scheduler.schedule(
171+
val _ = context.system.scheduler.scheduleAtFixedRate(
172172
FiniteDuration(5, TimeUnit.SECONDS),
173173
FiniteDuration(5, TimeUnit.SECONDS),
174174
mediator,

consumers-spark/src/main/scala/it/agilelab/bigdata/wasp/consumers/spark/streaming/actor/watchdog/SparkContextWatchDog.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ class SparkContextWatchDog private(sc: SparkContext, failureAction: () => Unit)
1414
import scala.concurrent.duration._
1515

1616
implicit val ec = context.system.dispatcher
17-
context.system.scheduler.schedule(0.seconds, 1.second, self, MonitorSparkContext)
18-
context.system.scheduler.schedule(0.seconds, 1.second, self, MonitorHdfTokens)
17+
context.system.scheduler.scheduleAtFixedRate(0.seconds, 1.second, self, MonitorSparkContext)
18+
context.system.scheduler.scheduleAtFixedRate(0.seconds, 1.second, self, MonitorHdfTokens)
1919

2020

2121
def waitForSparkContextToBeAvailable : Receive = {

consumers-spark/src/test/resources/application.conf

Lines changed: 26 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,14 @@ akka {
88

99

1010
remote {
11-
log-remote-lifecycle-events = off
12-
enabled-transports = ["akka.remote.netty.tcp"]
13-
netty.tcp {
14-
port = 2892
15-
hostname = "localhost"
11+
artery.enabled = false
12+
classic {
13+
log-remote-lifecycle-events = off
14+
enabled-transports = ["akka.remote.classic.netty.tcp"]
15+
netty.tcp {
16+
port = 2892
17+
hostname = "localhost"
18+
}
1619
}
1720
}
1821

@@ -32,7 +35,7 @@ akka {
3235
}
3336
}
3437

35-
system-0 {
38+
default_multimaster {
3639
akka {
3740
loglevel = "DEBUG"
3841
loggers = ["akka.event.slf4j.Slf4jLogger"]
@@ -43,11 +46,13 @@ system-0 {
4346

4447

4548
remote {
46-
log-remote-lifecycle-events = off
47-
enabled-transports = ["akka.remote.netty.tcp"]
48-
netty.tcp {
49-
port = 2893
50-
hostname = "localhost"
49+
artery.enabled = false
50+
classic {
51+
log-remote-lifecycle-events = off
52+
enabled-transports = ["akka.remote.classic.netty.tcp"]
53+
netty.tcp {
54+
hostname = "localhost"
55+
}
5156
}
5257
}
5358

@@ -63,116 +68,21 @@ system-0 {
6368
publish-stats-interval = 10s
6469
metrics.gossip-interval = 10s
6570
metrics.collect-interval = 10s
66-
roles = ["consumers-spark-streaming"]
6771
}
6872
}
6973
}
7074

71-
72-
system-1 {
73-
akka {
74-
loglevel = "DEBUG"
75-
loggers = ["akka.event.slf4j.Slf4jLogger"]
76-
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
77-
logger-startup-timeout = 60s
78-
log-dead-letters = off
79-
log-dead-letters-during-shutdown = off
80-
81-
82-
remote {
83-
log-remote-lifecycle-events = off
84-
enabled-transports = ["akka.remote.netty.tcp"]
85-
netty.tcp {
86-
port = 2894
87-
hostname = "localhost"
88-
}
89-
}
90-
91-
actor {
92-
provider = "akka.cluster.ClusterActorRefProvider"
93-
debug.fsm = true
94-
}
95-
96-
cluster {
97-
log-info = on
98-
seed-nodes = ["akka.tcp://WASP@localhost:2893"]
99-
gossip-interval = 5s
100-
publish-stats-interval = 10s
101-
metrics.gossip-interval = 10s
102-
metrics.collect-interval = 10s
103-
roles = ["consumers-spark-streaming"]
104-
}
105-
}
75+
system-0.akka {
76+
remote.classic.netty.tcp.port = 2893
77+
cluster.roles=["consumers-spark-streaming"]
10678
}
79+
system-1.akka {
80+
remote.classic.netty.tcp.port = 2894
81+
cluster.roles=["consumers-spark-streaming"]
10782

108-
109-
system-2 {
110-
akka {
111-
loglevel = "DEBUG"
112-
loggers = ["akka.event.slf4j.Slf4jLogger"]
113-
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
114-
logger-startup-timeout = 60s
115-
log-dead-letters = off
116-
log-dead-letters-during-shutdown = off
117-
118-
119-
remote {
120-
log-remote-lifecycle-events = off
121-
enabled-transports = ["akka.remote.netty.tcp"]
122-
netty.tcp {
123-
port = 2895
124-
hostname = "localhost"
125-
}
126-
}
127-
128-
actor {
129-
provider = "akka.cluster.ClusterActorRefProvider"
130-
debug.fsm = true
131-
}
132-
133-
cluster {
134-
log-info = on
135-
seed-nodes = ["akka.tcp://WASP@localhost:2893"]
136-
gossip-interval = 5s
137-
publish-stats-interval = 10s
138-
metrics.gossip-interval = 10s
139-
metrics.collect-interval = 10s
140-
roles = ["consumers-spark-streaming"]
141-
}
142-
}
14383
}
144-
145-
coordinator {
146-
akka {
147-
loglevel = "DEBUG"
148-
loggers = ["akka.event.slf4j.Slf4jLogger"]
149-
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
150-
logger-startup-timeout = 60s
151-
log-dead-letters = off
152-
log-dead-letters-during-shutdown = off
153-
154-
155-
remote {
156-
log-remote-lifecycle-events = off
157-
enabled-transports = ["akka.remote.netty.tcp"]
158-
netty.tcp {
159-
port = 2896
160-
hostname = "localhost"
161-
}
162-
}
163-
164-
actor {
165-
provider = "akka.cluster.ClusterActorRefProvider"
166-
debug.fsm = true
167-
}
168-
169-
cluster {
170-
log-info = on
171-
seed-nodes = ["akka.tcp://WASP@localhost:2893"]
172-
gossip-interval = 5s
173-
publish-stats-interval = 10s
174-
metrics.gossip-interval = 10s
175-
metrics.collect-interval = 10s
176-
}
177-
}
84+
system-2.akka {
85+
remote.classic.netty.tcp.port = 2895
86+
cluster.roles=["consumers-spark-streaming"]
17887
}
88+
coordinator.akka.remote.classic.netty.tcp.port = 2896

consumers-spark/src/test/resources/log4j.xml

Lines changed: 0 additions & 15 deletions
This file was deleted.

consumers-spark/src/test/resources/log4j2.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22
<Configuration status="WARN">
33
<Appenders>
44
<Console name="Console" target="SYSTEM_OUT">
5-
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
5+
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %c{1}.%M - %msg%n"/>
66
</Console>
77
</Appenders>
88
<Loggers>
9-
<Root level="ERROR">
9+
<Root level="WARN">
1010
<AppenderRef ref="Console"/>
1111
</Root>
1212
</Loggers>

0 commit comments

Comments (0)