Skip to content

Commit cacab7f

Browse files
committed
SlackNotification . This fixes #8
1 parent 4d8d560 commit cacab7f

File tree

6 files changed

+104
-14
lines changed

6 files changed

+104
-14
lines changed

CHANGELOG.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
55
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
66

77
## [Unreleased]
8+
9+
10+
## [0.1] - 08/03/2017
11+
- Kafka 1.0.1
812
- Initial Release
913
- Travis CI automation
1014
- Docker Hub release
@@ -14,4 +18,5 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
1418
- Extract ACLs from Kafka.
1519
- Tests including with Kafka running
1620
- GitHub Enterprise Support
17-
- GitHub authentication Support
21+
- GitHub authentication Support
22+
- Slack Notification Support

src/main/resources/application.conf

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,13 @@ notification {
5858

5959
}
6060
slack {
61-
61+
// https://hooks.slack.com/services/etc/etc/etc
62+
webhook = ${?NOTIFICATION_SLACK_WEBHOOK}
63+
username = "Kafka Security Manager"
64+
username = ${?NOTIFICATION_SLACK_USERNAME}
65+
icon = "https://cdn.iconscout.com/public/images/icon/free/png-512/kafka-logo-brand-3f2f9aed5b9ae1e2-512x512.png"
66+
icon = ${?NOTIFICATION_SLACK_ICON}
67+
channel = "general"
68+
channel = ${?NOTIFICATION_SLACK_CHANNEL}
6269
}
6370
}

src/main/scala/com/github/simplesteph/ksm/notification/ConsoleNotification.scala

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,7 @@ case class ConsoleNotification() extends Notification {
2121
override def configure(config: Config): Unit = ()
2222

2323
override def notifyErrors(errs: List[Try[Throwable]]): Unit = {
24-
if (errs.nonEmpty) {
25-
errs.foreach(e =>
26-
e.get match {
27-
case cPE: CsvParserException => log.error(s"${cPE.getLocalizedMessage} | Row: ${cPE.printRow()}")
28-
case _ => log.error("error while parsing ACL source", e.get)
29-
})
30-
}
24+
NotificationUtils.errorsToString(errs).foreach(println)
3125
}
3226

3327
override protected def notifyOne(action: String, acls: Set[(Resource, Acl)]): Unit = {

src/main/scala/com/github/simplesteph/ksm/notification/Notification.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,6 @@ object Notification {
5252

5353
// helper to pretty print a long ACL, as the toString provided by Kafka is not suitable here
5454
def printAcl(acl: Acl, resource: Resource): String = {
55-
s"${acl.principal}, $resource, ${acl.operation}, ${acl.permissionType}, ${acl.host}"
55+
s"${acl.principal}, $resource, Operation: ${acl.operation}, Permission: ${acl.permissionType}, Host: ${acl.host}"
5656
}
5757
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package com.github.simplesteph.ksm.notification
2+
3+
import com.github.simplesteph.ksm.parser.CsvParserException
4+
5+
import scala.util.Try
6+
7+
object NotificationUtils {
8+
9+
def errorsToString(errs: List[Try[Throwable]]): List[String] = {
10+
errs.map(e =>
11+
e.get match {
12+
case cPE: CsvParserException => s"${cPE.getLocalizedMessage} | Row: ${cPE.printRow()}"
13+
case _ => s"error while parsing ACL source: ${e.get}"
14+
})
15+
}
16+
17+
}
Lines changed: 71 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package com.github.simplesteph.ksm.notification
2+
import com.fasterxml.jackson.databind.ObjectMapper
3+
import com.github.simplesteph.ksm.parser.CsvParserException
24
import com.typesafe.config.Config
35
import kafka.security.auth.{ Acl, Resource }
6+
import org.slf4j.LoggerFactory
7+
import skinny.http.HTTP
48

59
import scala.util.Try
610

@@ -10,14 +14,77 @@ class SlackNotification extends Notification {
1014
*/
1115
override val CONFIG_PREFIX: String = "slack"
1216

17+
private val log = LoggerFactory.getLogger(classOf[SlackNotification])
18+
19+
final val WEBHOOK_CONFIG = "webhook"
20+
final val USERNAME_CONFIG = "username"
21+
final val ICON_CONFIG = "icon"
22+
final val CHANNEL_CONFIG = "channel"
23+
24+
val objectMapper: ObjectMapper = new ObjectMapper()
25+
var webhook: String = _
26+
var username: String = _
27+
var icon: String = _
28+
var channel: String = _
29+
1330
/**
1431
* internal config definition for the module
1532
*/
16-
override def configure(config: Config): Unit = ???
33+
override def configure(config: Config): Unit = {
34+
webhook = config.getString(WEBHOOK_CONFIG)
35+
username = config.getString(USERNAME_CONFIG)
36+
icon = config.getString(ICON_CONFIG)
37+
channel = config.getString(CHANNEL_CONFIG)
38+
}
39+
40+
override def notifyOne(action: String, acls: Set[(Resource, Acl)]): Unit = {
41+
if (acls.nonEmpty) {
42+
val messages = acls.map {
43+
case (resource, acl) =>
44+
val message = Notification.printAcl(acl, resource)
45+
s"$action $message"
46+
}.toList
47+
48+
sendToSlack(messages)
49+
}
50+
}
51+
52+
def sendToSlack(messages: List[String], retries: Int = 5): Unit = {
53+
if (retries > 0) {
54+
messages.grouped(50).foreach(msgChunks => {
55+
val text =
56+
s"""```
57+
|${msgChunks.mkString("\n")}
58+
|```
59+
""".stripMargin
60+
61+
val payload = objectMapper.createObjectNode()
62+
.put("text", text)
63+
.put("username", username)
64+
.put("icon_url", icon)
65+
.put("channel", channel)
66+
67+
val response = HTTP.post(webhook, payload.toString)
68+
69+
response.status match {
70+
case 200 => ()
71+
case _ =>
72+
log.warn(response.asString)
73+
if (retries > 1) log.warn("Retrying...")
74+
Thread.sleep(300)
75+
sendToSlack(msgChunks, retries - 1)
76+
}
77+
})
78+
} else {
79+
log.error("Can't send notification to Slack after retries")
80+
}
81+
}
1782

18-
override def notifyOne(action: String, acls: Set[(Resource, Acl)]): Unit = ???
83+
override def notifyErrors(errs: List[Try[Throwable]]): Unit = {
84+
sendToSlack(NotificationUtils.errorsToString(errs))
85+
}
1986

20-
override def notifyErrors(errs: List[Try[Throwable]]): Unit = ???
87+
override def close(): Unit = {
2188

22-
override def close(): Unit = ???
89+
}
2390
}

0 commit comments

Comments
 (0)