Skip to content

Commit da7420c

Browse files
authored
Kafka 2.0.0 (#30)
* KAFKA-2.0.0 WIP * final touches * relaxed constraint and made column change backwards compatible * removed KsmService.yml
1 parent 65a3cd2 commit da7420c

File tree

16 files changed

+172
-196
lines changed

16 files changed

+172
-196
lines changed

CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
55
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
66

77
## [0.4-SNAPSHOT]
8-
8+
- Added S3 Acl Source (#27)
9+
- Upgraded to Kafka 2.0
10+
- New format to ACLs that allows Patterns (like prefixes)
11+
- Upgrades to Docker Compose file
912

1013
## [0.3] - 13/06/2018
1114
- Added gRPC endpoint to perform API calls on KSM (the goal is to build a UI on top of KSM)

README.md

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@ Your role is to ensure that Kafka Security Manager is never down, as it is now a
1616

1717
A sample CSV to manage ACL is:
1818
```
19-
KafkaPrincipal,ResourceType,ResourceName,Operation,PermissionType,Host
20-
User:alice,Topic,foo,Read,Allow,*
21-
User:bob,Group,bar,Write,Deny,12.34.56.78
22-
User:peter,Cluster,kafka-cluster,Create,Allow,*
19+
KafkaPrincipal,ResourceType,PatternType,ResourceName,Operation,PermissionType,Host
20+
User:alice,Topic,LITERAL,foo,Read,Allow,*
21+
User:bob,Group,bar,PREFIXED,Write,Deny,12.34.56.78
22+
User:peter,Cluster,LITERAL,kafka-cluster,Create,Allow,*
2323
```
2424

25+
**Important Note**: As of KSM 0.4, a new column `PatternType` has been added to match the changes that happened in Kafka 2.0. This enables KSM to manage `LITERAL` and `PREFIXED` ACLs. See #28
26+
2527
# Building
2628

2729
```
@@ -136,11 +138,6 @@ docker-compose down
136138

137139
For full usage of the docker-compose file see [kafka-stack-docker-compose](https://github.com/simplesteph/kafka-stack-docker-compose)
138140

139-
Add the entry to your `/etc/hosts` file
140-
```
141-
127.0.0.1 kafka1
142-
```
143-
144141
## Extracting ACLs
145142

146143
You can initially extract all your existing ACL in Kafka by running the program with the config `extract=true` or `export EXTRACT=true`
@@ -151,10 +148,10 @@ Output should look like:
151148
[2018-03-06 21:49:44,704] INFO Getting ACLs from Kafka (ExtractAcl)
152149
[2018-03-06 21:49:44,704] INFO Closing Authorizer (ExtractAcl)
153150
154-
KafkaPrincipal,ResourceType,ResourceName,Operation,PermissionType,Host
155-
User:bob,Group,bar,Write,Deny,12.34.56.78
156-
User:alice,Topic,foo,Read,Allow,*
157-
User:peter,Cluster,kafka-cluster,Create,Allow,*
151+
KafkaPrincipal,ResourceType,PatternType,ResourceName,Operation,PermissionType,Host
152+
User:bob,Group,PREFIXED,bar,Write,Deny,12.34.56.78
153+
User:alice,Topic,LITERAL,foo,Read,Allow,*
154+
User:peter,Cluster,LITERAL,kafka-cluster,Create,Allow,*
158155
```
159156

160157
You can then use place this CSV anywhere and use it as your source of truth.
@@ -176,14 +173,17 @@ This provides a REST API to consume data from KSM. Swagger definition is provide
176173

177174
The API is defined according to the proto file in [src/main/protobuf/](src/main/protobuf/)
178175

176+
# Upgrade Notes
177+
TODO: Mention to look for inter broker protocol version before doing this
178+
179179
# Compatibility
180180

181-
KSM Version | Kafka Version
182-
--- | ---
183-
0.4-SNAPSHOT | 1.1.x
184-
0.3 | 1.1.x
185-
0.2 | 1.1.x (upgrade to 0.3 recommended)
186-
0.1 | 1.0.x (might work for earlier versions)
181+
KSM Version | Kafka Version | Notes
182+
--- | --- | ---
183+
0.4-SNAPSHOT | 2.0.0 | important change: added column 'PatternType' in CSV
184+
0.3 | 1.1.x |
185+
0.2 | 1.1.x | upgrade to 0.3 recommended
186+
0.1 | 1.0.x | might work for earlier versions
187187

188188
# Contributing
189189

build.sbt

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@ resolvers ++= Seq(
1818

1919
libraryDependencies ++= Seq(
2020
// kafka
21-
"org.apache.kafka" %% "kafka" % "1.1.0",
22-
"net.manub" %% "scalatest-embedded-kafka" % "1.1.0-kafka1.1-nosr" % "test",
21+
"org.apache.kafka" %% "kafka" % "2.0.0",
22+
"net.manub" %% "scalatest-embedded-kafka" % "2.0.0" % "test",
23+
24+
// test
25+
"org.scalatest" %% "scalatest" % "3.0.5" % Test,
2326

2427
// logging
2528
"org.slf4j" % "slf4j-api" % "1.7.25",

docker-compose.yml

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,14 @@ services:
1212
ZOO_SERVERS: server.1=zoo1:2888:3888
1313

1414
kafka1:
15-
image: confluentinc/cp-kafka:4.0.0
15+
image: confluentinc/cp-kafka:5.0.0
1616
hostname: kafka1
1717
ports:
1818
- "9092:9092"
1919
environment:
20-
# add the entry "127.0.0.1 kafka1" to your /etc/hosts file
21-
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka1:9092"
20+
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
21+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
22+
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
2223
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
2324
KAFKA_BROKER_ID: 1
2425
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
@@ -31,10 +32,14 @@ services:
3132
environment:
3233
KSM_READONLY: false
3334
AUTHORIZER_ZOOKEEPER_CONNECT: "zoo1:2181"
34-
SOURCE_CLASS: "com.github.simplesteph.ksm.source.GitHubSourceAcl"
35-
SOURCE_GITHUB_USER: "simplesteph"
36-
SOURCE_GITHUB_REPO: "kafka-security-manager-example"
37-
SOURCE_GITHUB_FILEPATH: "acls.csv"
35+
# FILE:
36+
SOURCE_CLASS: "com.github.simplesteph.ksm.source.FileSourceAcl"
37+
SOURCE_FILE_FILENAME: "example/acls.csv"
38+
# GITHUB:
39+
# SOURCE_CLASS: "com.github.simplesteph.ksm.source.GitHubSourceAcl"
40+
# SOURCE_GITHUB_USER: "simplesteph"
41+
# SOURCE_GITHUB_REPO: "kafka-security-manager-example"
42+
# SOURCE_GITHUB_FILEPATH: "acls.csv"
3843
FEATURE_GRPC: true
3944
volumes:
4045
- "./example:/example"

example/acls.csv

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
KafkaPrincipal,ResourceType,ResourceName,Operation,PermissionType,Host
2-
User:alice,Topic,foo,Read,Allow,*
3-
User:bob,Group,bar,Write,Deny,12.34.56.78
4-
User:peter,Cluster,kafka-cluster,Create,Allow,*
1+
KafkaPrincipal,ResourceType,PatternType,ResourceName,Operation,PermissionType,Host
2+
User:alice,Topic,LITERAL,foo,Read,Allow,*
3+
User:alice,Topic,PREFIXED,baz,Read,Allow,*
4+
User:bob,Group,LITERAL,bar,Write,Deny,12.34.56.78
5+
User:alice,Topic,PREFIXED,my-kafka-streams-app,Create,Allow,*
6+
User:peter,Cluster,LITERAL,kafka-cluster,Create,Allow,*

src/main/protobuf/kafka/ksm.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ message ResourceAndAclPb {
1818
message ResourcePb {
1919
string name = 1;
2020
ResourceTypePb kafka_resource_type = 2;
21+
PatternTypePb pattern_type = 3;
2122
}
2223

2324
enum ResourceTypePb {
@@ -30,6 +31,13 @@ enum ResourceTypePb {
3031
RESOURCE_TYPE_DELEGATIONTOKEN = 6;
3132
}
3233

34+
enum PatternTypePb {
35+
PATTERN_TYPE_INVALID = 0;
36+
PATTERN_TYPE_UNSET = 1;
37+
PATTERN_TYPE_LITERAL = 2;
38+
PATTERN_TYPE_PREFIXED = 3;
39+
}
40+
3341
message AclPb {
3442
KafkaPrincipalPb principal = 1;
3543
PermissionTypePb permission_type = 2;

src/main/resources/specs/KsmService.yml

Lines changed: 0 additions & 97 deletions
This file was deleted.

src/main/scala/com/github/simplesteph/ksm/AclSynchronizer.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,14 @@ class AclSynchronizer(authorizer: Authorizer,
5858

5959
private var sourceAclsCache: SourceAclResult = _
6060

61-
if (readOnly) log.warn("READ-ONLY mode is activated")
61+
if (readOnly) {
62+
log.warn("""
63+
|=======================================================
64+
|========== READ-ONLY mode is activated =========
65+
|========== To disable: KSM_READONLY=false =========
66+
|=======================================================
67+
""".stripMargin)
68+
}
6269

6370
def run(): Unit = if (!readOnly) {
6471

src/main/scala/com/github/simplesteph/ksm/parser/CsvAclParser.scala

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,14 @@ import java.io.Reader
55
import com.github.simplesteph.ksm.source.SourceAclResult
66
import com.github.tototoshi.csv.{CSVFormat, CSVReader, QUOTE_MINIMAL, Quoting}
77
import kafka.security.auth._
8+
import org.apache.kafka.common.resource.PatternType
89
import org.apache.kafka.common.utils.SecurityUtils
10+
import org.slf4j.LoggerFactory
911

1012
import scala.collection.immutable
11-
import scala.util.Try
13+
import scala.util.{Failure, Success, Try}
14+
15+
class CsvAclParser
1216

1317
/**
1418
* Parser that assumes that all ACLs are flattened
@@ -18,19 +22,24 @@ import scala.util.Try
1822
*/
1923
object CsvAclParser extends AclParser {
2024

25+
private val log = LoggerFactory.getLogger(classOf[CsvAclParser])
26+
2127
final val KAFKA_PRINCIPAL_COL = "KafkaPrincipal"
2228
final val RESOURCE_TYPE_COL = "ResourceType"
2329
final val RESOURCE_NAME_COL = "ResourceName"
2430
final val OPERATION_COL = "Operation"
2531
final val PERMISSION_TYPE_COL = "PermissionType"
2632
final val HOST_COL = "Host"
33+
final val PATTERN_TYPE_COL = "PatternType"
2734

2835
final val EXPECTED_COLS = List(KAFKA_PRINCIPAL_COL,
2936
RESOURCE_TYPE_COL,
37+
PATTERN_TYPE_COL,
3038
RESOURCE_NAME_COL,
3139
OPERATION_COL,
3240
PERMISSION_TYPE_COL,
33-
HOST_COL)
41+
HOST_COL,
42+
)
3443

3544
// we treat empty lines as Nil hence the format override
3645
implicit val csvFormat: CSVFormat = new CSVFormat {
@@ -44,6 +53,7 @@ object CsvAclParser extends AclParser {
4453

4554
/**
4655
* parse a row to return an ACL
56+
*
4757
* @param row a map of column name to row value
4858
* @return an ACL
4959
*/
@@ -55,15 +65,30 @@ object CsvAclParser extends AclParser {
5565
val operation = Operation.fromString(row(OPERATION_COL))
5666
val permissionType = PermissionType.fromString(row(PERMISSION_TYPE_COL))
5767
val host = row(HOST_COL)
68+
val patternType = Try(
69+
PatternType.fromString(row(PATTERN_TYPE_COL).toUpperCase)) match {
70+
case Success(pt) => pt
71+
case Failure(e: NoSuchElementException) =>
72+
// column is missing
73+
log.warn(s"""Since you upgraded to Kafka 2.0, your CSV needs to include an extra column '$PATTERN_TYPE_COL', after $RESOURCE_TYPE_COL and before $RESOURCE_NAME_COL.
74+
|The CSV header should be: KafkaPrincipal,ResourceType,PatternType,ResourceName,Operation,PermissionType,Host
75+
|For a quick fix, you can run the application with KSM_EXTRACT=true and replace your current CSV with the output of the command
76+
|For backwards compatibility, the default value $PATTERN_TYPE_COL=LITERAL has been chosen""".stripMargin)
77+
// Default
78+
PatternType.LITERAL
79+
case Failure(e) =>
80+
throw e
81+
}
5882

59-
val resource = Resource(resourceType, resourceName)
83+
val resource = Resource(resourceType, resourceName, patternType)
6084
val acl = Acl(kafkaPrincipal, permissionType, host, operation)
6185

6286
(resource, acl)
6387
}
6488

6589
/**
6690
* Parses all the ACL as provided by the reader that wraps the CSV content
91+
*
6792
* @param reader we use the reader interface to use string and files interchangeably in the parser
6893
* @return sourceAclResult
6994
*/
@@ -86,6 +111,7 @@ object CsvAclParser extends AclParser {
86111
def asCsv(r: Resource, a: Acl): String = {
87112
List(a.principal.toString,
88113
r.resourceType.toString,
114+
r.patternType,
89115
r.name,
90116
a.operation.toString,
91117
a.permissionType.toString,

0 commit comments

Comments
 (0)