Skip to content

Commit cc5cc8d

Browse files
committed
removed background thread + scalariform
1 parent de5fe0c commit cc5cc8d

File tree

3 files changed

+15
-15
lines changed

3 files changed

+15
-15
lines changed

src/main/scala/com/github/simplesteph/ksm/AclSynchronizer.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,13 @@ object AclSynchronizer {
4343
class AclSynchronizer(
4444
authorizer: Authorizer,
4545
sourceAcl: SourceAcl,
46-
notification: Notification) extends Runnable {
46+
notification: Notification) {
4747

4848
import AclSynchronizer._
4949

5050
private var sourceAclsCache: SourceAclResult = _
5151

52-
override def run(): Unit = {
52+
def run(): Unit = {
5353

5454
// flatten the Kafka ACL
5555
val kafkaAcls: Set[(Resource, Acl)] = flattenKafkaAcls(authorizer.getAcls())
Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.github.simplesteph.ksm
22

3-
import java.util.concurrent.{ Executors, ScheduledFuture, TimeUnit }
3+
import java.util.concurrent.atomic.AtomicBoolean
44

55
import com.github.simplesteph.ksm.parser.CsvAclParser
66
import com.typesafe.config.ConfigFactory
@@ -13,6 +13,8 @@ object KafkaSecurityManager extends App {
1313
val config = ConfigFactory.load()
1414
val appConfig: AppConfig = new AppConfig(config)
1515

16+
var isCancelled: AtomicBoolean = new AtomicBoolean(false)
17+
1618
if (appConfig.KSM.extract) {
1719
new ExtractAcl(appConfig.Authorizer.authorizer, CsvAclParser).extract()
1820
} else {
@@ -21,20 +23,18 @@ object KafkaSecurityManager extends App {
2123
appConfig.Source.sourceAcl,
2224
appConfig.Notification.notification)
2325

24-
val executor = Executors.newScheduledThreadPool(1)
25-
val f: ScheduledFuture[_] = executor.scheduleAtFixedRate(aclSynchronizer, 1000,
26-
appConfig.KSM.refreshFrequencyMs, TimeUnit.MILLISECONDS)
27-
2826
Runtime.getRuntime.addShutdownHook(new Thread() {
2927
override def run(): Unit = {
28+
log.info("Received stop signal, Kafka Security Manager is shutting down...")
29+
isCancelled = new AtomicBoolean(true)
3030
aclSynchronizer.close()
31-
log.info("Kafka Security Manager is shutting down...")
32-
f.cancel(false)
33-
log.info("Waiting for thread to cleanly shutdown (10 seconds maximum)")
34-
executor.shutdown()
35-
executor.awaitTermination(10, TimeUnit.SECONDS)
36-
log.info("Kafka Security Manager has shut down...")
3731
}
3832
})
33+
34+
while (!isCancelled.get()) {
35+
aclSynchronizer.run()
36+
Thread.sleep(appConfig.KSM.refreshFrequencyMs)
37+
}
38+
3939
}
4040
}

src/main/scala/com/github/simplesteph/ksm/source/GitHubSourceAcl.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,14 @@ class GitHubSourceAcl extends SourceAcl {
6262
})
6363

6464
// we use this header for the 304
65-
lastModified.foreach( header => request.header("If-Modified-Since", header))
65+
lastModified.foreach(header => request.header("If-Modified-Since", header))
6666
val response: Response = HTTP.get(request)
6767

6868
response.status match {
6969
case 200 =>
7070
lastModified = response.header("Last-Modified")
7171
val b64encodedContent = objectMapper.readTree(response.textBody).get("content").asText()
72-
val data = new String(Base64.getDecoder.decode(b64encodedContent.replace("\n","").replace("\r","")), Charset.forName("UTF-8"))
72+
val data = new String(Base64.getDecoder.decode(b64encodedContent.replace("\n", "").replace("\r", "")), Charset.forName("UTF-8"))
7373
// use the CSV Parser
7474
Some(CsvAclParser.aclsFromReader(new StringReader(data)))
7575
case 304 =>

0 commit comments

Comments
 (0)