Skip to content

Commit 5f8447b

Browse files
committed
Directory support first draft
1 parent 621186c commit 5f8447b

File tree

9 files changed

+98
-71
lines changed

9 files changed

+98
-71
lines changed

src/main/scala/io/conduktor/ksm/AclSynchronizer.scala

Lines changed: 35 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import io.conduktor.ksm.source.{ParsingContext, SourceAcl}
66
import kafka.security.auth.{Acl, Authorizer, Resource}
77
import org.slf4j.{Logger, LoggerFactory}
88

9+
import scala.collection.mutable
910
import scala.util.{Failure, Success, Try}
1011

1112
object AclSynchronizer {
@@ -66,7 +67,8 @@ class AclSynchronizer(
6667

6768
import AclSynchronizer._
6869

69-
private var sourceAclsCache: Set[(Resource, Acl)] = _
70+
private val sourceAclsCache: mutable.Map[String, Set[(Resource, Acl)]] =
71+
mutable.Map()
7072
private var failedRefreshes: Int = 0
7173

7274
if (readOnly) {
@@ -84,50 +86,49 @@ class AclSynchronizer(
8486
Try(sourceAcl.refresh()) match {
8587
case Success(result) =>
8688
failedRefreshes = 0
87-
result match {
88-
// the source has not changed
89-
case None =>
90-
if (sourceAclsCache != null) {
91-
// the Kafka Acls may have changed so we check against the last known correct SourceAcl that we cached
89+
result.foreach { context: ParsingContext =>
90+
context match {
91+
case ParsingContext(resourceKey, aclParser, reader, true) =>
92+
val sourceAclResult = aclParser.aclsFromReader(reader)
93+
reader.close()
94+
sourceAclResult.result match {
95+
// the source has changed
96+
case Right(ksmAcls) =>
97+
// we have a new result, so we cache it
98+
sourceAclsCache + (resourceKey -> ksmAcls)
99+
applySourceAcls(
100+
sourceAclsCache(resourceKey),
101+
getKafkaAcls,
102+
notification,
103+
authorizer
104+
)
105+
case Left(parsingExceptions: List[Exception]) =>
106+
// parsing exceptions we want to notify
107+
log.error(
108+
"Exceptions while refreshing ACL source:",
109+
parsingExceptions.map(e => e.toString).mkString("\n")
110+
)
111+
// ugly but for now this will do
112+
notification.notifyErrors(
113+
parsingExceptions.map(e => Try(throw e))
114+
)
115+
}
116+
case ParsingContext(resourceKey, _, _, false) =>
117+
// the source does not need updating, reapply the permission
92118
applySourceAcls(
93-
sourceAclsCache,
119+
sourceAclsCache.getOrElse(resourceKey, Set()),
94120
getKafkaAcls,
95121
notification,
96122
authorizer
97123
)
98-
}
99-
case Some(ParsingContext(parser, reader)) =>
100-
val sourceAclResult = parser.aclsFromReader(reader)
101-
reader.close()
102-
sourceAclResult.result match {
103-
// the source has changed
104-
case Right(ksmAcls) =>
105-
// we have a new result, so we cache it
106-
sourceAclsCache = ksmAcls
107-
applySourceAcls(
108-
sourceAclsCache,
109-
getKafkaAcls,
110-
notification,
111-
authorizer
112-
)
113-
case Left(parsingExceptions: List[Exception]) =>
114-
// parsing exceptions we want to notify
115-
log.error(
116-
"Exceptions while refreshing ACL source:",
117-
parsingExceptions.map(e => e.toString).mkString("\n")
118-
)
119-
// ugly but for now this will do
120-
notification.notifyErrors(
121-
parsingExceptions.map(e => Try(throw e))
122-
)
123-
}
124+
}
124125
}
125126
case Failure(e) =>
126127
// errors such as HTTP exceptions when refreshing
127128
failedRefreshes += 1
128129
try {
129130
log.error("Exceptions while refreshing ACL source:", e)
130-
if(failedRefreshes >= numFailedRefreshesBeforeNotification){
131+
if (failedRefreshes >= numFailedRefreshesBeforeNotification) {
131132
notification.notifyErrors(List(Try(e)))
132133
failedRefreshes = 0
133134
}

src/main/scala/io/conduktor/ksm/source/BitbucketCloudSourceAcl.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ class BitbucketCloudSourceAcl(parserRegistry: AclParserRegistry)
4545
password = config.getString(AUTH_PASSWORD_CONFIG)
4646
}
4747

48-
override def refresh(): Option[ParsingContext] = {
48+
override def refresh(): List[ParsingContext] = {
4949
// get the latest file
5050
val url = s"$apiurl/repositories/$organization/$repo/src/master/$filePath"
5151
val request: Request = new Request(url)
@@ -63,7 +63,13 @@ class BitbucketCloudSourceAcl(parserRegistry: AclParserRegistry)
6363
case 200 =>
6464
// we receive a valid response
6565
val reader = new BufferedReader(new StringReader(response.textBody))
66-
Some(source.ParsingContext(parserRegistry.getParserByFilename(filePath), reader))
66+
List(
67+
ParsingContext(
68+
filePath,
69+
parserRegistry.getParserByFilename(filePath),
70+
reader
71+
)
72+
)
6773
case _ =>
6874
// uncaught error
6975
log.warn(response.asString)

src/main/scala/io/conduktor/ksm/source/BitbucketServerSourceAcl.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ class BitbucketServerSourceAcl(parserRegistry: AclParserRegistry)
6363
})
6464
}
6565

66-
override def refresh(): Option[ParsingContext] = {
66+
override def refresh(): List[ParsingContext] = {
6767
// get changes since last commit
6868
val url =
6969
s"$protocol://$hostname:$port/rest/api/1.0/projects/$project/repos/$repo/commits"
@@ -103,8 +103,9 @@ class BitbucketServerSourceAcl(parserRegistry: AclParserRegistry)
103103
// update the last commit id
104104
lastCommit = Some(values.get(0).get("id").asText())
105105
val data = fileResponse.textBody
106-
Some(
107-
source.ParsingContext(
106+
List(
107+
ParsingContext(
108+
filePath,
108109
parserRegistry.getParserByFilename(filePath),
109110
new StringReader(data)
110111
)
@@ -115,7 +116,7 @@ class BitbucketServerSourceAcl(parserRegistry: AclParserRegistry)
115116
throw HTTPException(Some(response.asString), response)
116117
}
117118
} else {
118-
None
119+
List()
119120
}
120121
case _ =>
121122
// uncaught error

src/main/scala/io/conduktor/ksm/source/FileSourceAcl.scala

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ package io.conduktor.ksm.source
22

33
import com.typesafe.config.Config
44
import io.conduktor.ksm.parser.AclParserRegistry
5-
import io.conduktor.ksm.source
65

76
import java.io.{File, FileReader}
7+
import scala.util.Try
88

99
class FileSourceAcl(parserRegistry: AclParserRegistry)
1010
extends SourceAcl(parserRegistry) {
@@ -14,6 +14,7 @@ class FileSourceAcl(parserRegistry: AclParserRegistry)
1414

1515
var lastModified: Long = -1
1616
var filename: String = _
17+
val modifiedMap: Map[String, Long] = Map[String, Long]()
1718

1819
/**
1920
* internal config definition for the module
@@ -29,14 +30,31 @@ class FileSourceAcl(parserRegistry: AclParserRegistry)
2930
* Uses a CSV parser on the file afterwards
3031
* @return
3132
*/
32-
override def refresh(): Option[ParsingContext] = {
33-
val file = new File(filename)
34-
if (file.lastModified() > lastModified) {
35-
val reader = new FileReader(file)
36-
lastModified = file.lastModified()
37-
Some(source.ParsingContext(parserRegistry.getParserByFilename(filename), reader))
33+
override def refresh(): List[ParsingContext] = {
34+
35+
val path = new File(filename)
36+
if (path.exists()) {
37+
val files =
38+
if (path.isFile)
39+
List(path)
40+
else
41+
path.listFiles.filter(_.isFile).toList
42+
43+
files.map(file =>
44+
ParsingContext(
45+
file.getName,
46+
parserRegistry.getParserByFilename(file.getName),
47+
new FileReader(file),
48+
file.lastModified >= (modifiedMap.get(file.getName) match {
49+
case None =>
50+
modifiedMap + (file.getName -> file.lastModified)
51+
0L
52+
case Some(value) => value
53+
})
54+
)
55+
)
3856
} else {
39-
None
57+
List()
4058
}
4159
}
4260

src/main/scala/io/conduktor/ksm/source/GitHubSourceAcl.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package io.conduktor.ksm.source
33
import com.fasterxml.jackson.databind.ObjectMapper
44
import com.typesafe.config.Config
55
import io.conduktor.ksm.parser.AclParserRegistry
6-
import io.conduktor.ksm.source
76
import org.slf4j.LoggerFactory
87
import skinny.http.{HTTP, HTTPException, Request, Response}
98

@@ -49,7 +48,7 @@ class GitHubSourceAcl(parserRegistry: AclParserRegistry)
4948
tokenOpt = Try(config.getString(AUTH_TOKEN_CONFIG)).toOption
5049
}
5150

52-
override def refresh(): Option[ParsingContext] = {
51+
override def refresh(): List[ParsingContext] = {
5352
val url =
5453
s"https://$hostname/repos/$user/$repo/contents/$filepath?ref=$branch"
5554
val request: Request = new Request(url)
@@ -80,11 +79,15 @@ class GitHubSourceAcl(parserRegistry: AclParserRegistry)
8079
Charset.forName("UTF-8")
8180
)
8281
// use the CSV Parser
83-
Some(
84-
source.ParsingContext(parserRegistry.getParserByFilename(filepath), new StringReader(data))
82+
List(
83+
ParsingContext(
84+
filepath,
85+
parserRegistry.getParserByFilename(filepath),
86+
new StringReader(data)
87+
)
8588
)
8689
case 304 =>
87-
None
90+
List()
8891
case _ =>
8992
// we got an http error so we propagate it
9093
log.warn(response.asString)

src/main/scala/io/conduktor/ksm/source/GitLabSourceAcl.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package io.conduktor.ksm.source
33
import com.fasterxml.jackson.databind.ObjectMapper
44
import com.typesafe.config.Config
55
import io.conduktor.ksm.parser.AclParserRegistry
6-
import io.conduktor.ksm.source
76
import org.slf4j.LoggerFactory
87
import skinny.http.{HTTP, HTTPException, Request, Response}
98

@@ -42,7 +41,7 @@ class GitLabSourceAcl(parserRegistry: AclParserRegistry)
4241
accessToken = config.getString(ACCESSTOKEN_CONFIG)
4342
}
4443

45-
override def refresh(): Option[ParsingContext] = {
44+
override def refresh(): List[ParsingContext] = {
4645
val url =
4746
s"https://$hostname/api/v4/projects/$repoid/repository/files/$filepath?ref=$branch"
4847
val request: Request = new Request(url)
@@ -62,7 +61,7 @@ class GitLabSourceAcl(parserRegistry: AclParserRegistry)
6261
log.info(
6362
s"No changes were detected in the ACL file ${filepath}. Skipping .... "
6463
)
65-
None
64+
List()
6665
case _ =>
6766
val response: Response = HTTP.get(request)
6867
response.status match {
@@ -76,8 +75,9 @@ class GitLabSourceAcl(parserRegistry: AclParserRegistry)
7675
Charset.forName("UTF-8")
7776
)
7877
// use the CSV Parser
79-
Some(
80-
source.ParsingContext(
78+
List(
79+
ParsingContext(
80+
filepath,
8181
parserRegistry.getParserByFilename(filepath),
8282
new StringReader(data)
8383
)

src/main/scala/io/conduktor/ksm/source/NoSourceAcl.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class NoSourceAcl(parserRegistry: AclParserRegistry) extends SourceAcl(parserReg
2626
*
2727
* @return
2828
*/
29-
override def refresh(): Option[ParsingContext] = None
29+
override def refresh(): List[ParsingContext] = List()
3030

3131
/**
3232
* Close all the necessary underlying objects or connections belonging to this instance

src/main/scala/io/conduktor/ksm/source/S3SourceAcl.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,15 @@
11
package io.conduktor.ksm.source
22

3-
import java.io._
4-
import java.util.Date
53
import com.amazonaws.regions.Regions
64
import com.amazonaws.services.s3._
75
import com.amazonaws.services.s3.model._
8-
import io.conduktor.ksm.parser.AclParserRegistry
96
import com.typesafe.config.Config
107
import io.conduktor.ksm.parser.AclParserRegistry
11-
import io.conduktor.ksm.source
128
import org.slf4j.LoggerFactory
139

10+
import java.io._
11+
import java.util.Date
12+
1413
class S3SourceAcl(parserRegistry: AclParserRegistry)
1514
extends SourceAcl(parserRegistry) {
1615

@@ -59,7 +58,7 @@ class S3SourceAcl(parserRegistry: AclParserRegistry)
5958
*
6059
* @return
6160
*/
62-
override def refresh(): Option[ParsingContext] = {
61+
override def refresh(): List[ParsingContext] = {
6362
val s3 = s3Client()
6463
val s3object = Option(
6564
s3.getObject(
@@ -84,13 +83,14 @@ class S3SourceAcl(parserRegistry: AclParserRegistry)
8483

8584
reader.close()
8685
bucket.close()
87-
Some(
88-
source.ParsingContext(
86+
List(
87+
ParsingContext(
88+
key,
8989
parserRegistry.getParserByFilename(key),
9090
new BufferedReader(new StringReader(content))
9191
)
9292
)
93-
case None => None
93+
case None => List()
9494
}
9595
}
9696

src/main/scala/io/conduktor/ksm/source/SourceAcl.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
package io.conduktor.ksm.source
22

3-
import io.conduktor.ksm.parser.AclParserRegistry
4-
53
import java.io.Reader
64
import com.typesafe.config.Config
75
import io.conduktor.ksm.parser.{AclParser, AclParserRegistry}
86

9-
case class ParsingContext(aclParser: AclParser, reader: Reader)
7+
case class ParsingContext(resourceKey: String, aclParser: AclParser, reader: Reader, shouldUpdate: Boolean = true)
108

119
abstract class SourceAcl(val parserRegistry: AclParserRegistry) {
1210

@@ -30,7 +28,7 @@ abstract class SourceAcl(val parserRegistry: AclParserRegistry) {
3028
* Kafka Security Manager will not update Acls in Kafka until there are no errors in the result
3129
* @return
3230
*/
33-
def refresh(): Option[ParsingContext]
31+
def refresh(): List[ParsingContext]
3432

3533
/**
3634
* Close all the necessary underlying objects or connections belonging to this instance

0 commit comments

Comments
 (0)