Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ The [default configurations](src/main/resources/application.conf) can be overwri
- `SOURCE_HTTP_AUTH_GOOGLEIAM_SERVICE_ACCOUNT` Google Service Account name.
- `SOURCE_HTTP_AUTH_GOOGLEIAM_SERVICE_ACCOUNT_KEY` Google Service Account Key in JSON string encoded. If not the key isn't configured, it'll try to get the token from environment.
- `SOURCE_HTTP_AUTH_GOOGLEIAM_TARGET_AUDIENCE` Google Target Audience for token authentication.
- `SOURCE_HTTP_CONTENT_LENGTH_HEADER_REQUIRED` Whether the `Content-Length` header is required in the HTTP response. Default is `false`.
- `SOURCE_HTTP_HEADER_OVERRIDES` Extra HTTP headers to include in the HTTP requests, overrides potential headers added from the authentication configuration. Default is "". The expected format is CSV such as `"Accept-Encoding:text/plain,Content-Type:text/plain"`.

- `NOTIFICATION_CLASS`: Class for notification in case of ACL changes in Kafka.
- `io.conduktor.ksm.notification.ConsoleNotification` (default): Print changes to the console. Useful for logging
Expand Down
8 changes: 8 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,14 @@ source {
target-audience = ${?SOURCE_HTTP_AUTH_GOOGLEIAM_TARGET_AUDIENCE}
}
}

contentlength {
required = false
required = ${?SOURCE_HTTP_CONTENT_LENGTH_HEADER_REQUIRED}
}

headers = ""
headers = ${?SOURCE_HTTP_HEADER_OVERRIDES}
}
}

Expand Down
107 changes: 81 additions & 26 deletions src/main/scala/io/conduktor/ksm/source/HttpSourceAcl.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package io.conduktor.ksm.source

import com.fasterxml.jackson.databind.ObjectMapper
import com.typesafe.config.Config
import com.typesafe.config.{Config, ConfigException}
import io.conduktor.ksm.parser.AclParserRegistry
import io.conduktor.ksm.source.security.{GoogleIAM, HttpAuthentication}
import org.apache.http.HttpHeaders.CONTENT_LENGTH
import org.apache.http.HttpHeaders.{CONTENT_LENGTH, CONTENT_TYPE, IF_MODIFIED_SINCE, LAST_MODIFIED}
import org.slf4j.LoggerFactory
import skinny.http._

Expand All @@ -24,46 +23,93 @@ class HttpSourceAcl(parserRegistry: AclParserRegistry)
final val PARSER = "parser"
final val METHOD = "method"
final val AUTHENTICATION_TYPE = "auth.type"
final val REQUIRE_CONTENT_LENGTH_HEADER = "contentlength.required"
final val HEADERS = "headers"

var lastModified: Option[String] = None
val objectMapper = new ObjectMapper()

var uri: String = _
var parser: String = "csv"
var httpMethod: Method = _
var authentication: Option[HttpAuthentication] = _
var requireContentLengthHeader: Boolean = _
var headers: Map[String, String] = _

def configure(url: String, parser: String, method: String): Unit = {
configure(url, parser, method, None)
}

def configure(url: String, parser: String, method: String, authentication: Option[HttpAuthentication]): Unit = {
this.uri = url
this.parser = parser
this.httpMethod = new Method(method)
this.authentication = authentication
configure(url, parser, method, authentication, requireContentLengthHeader = false)
}
def configure(url: String, parser: String, method: String): Unit = {

def configure(url: String, parser: String, method: String, authentication: Option[HttpAuthentication], requireContentLengthHeader: Boolean): Unit = {
configure(url, parser, method, authentication, requireContentLengthHeader, Map.empty)
}

def configure(url: String, parser: String, method: String,
authentication: Option[HttpAuthentication],
requireContentLengthHeader: Boolean, headers: Map[String, String]): Unit = {
this.uri = url
this.parser = parser
this.httpMethod = new Method(method)
this.authentication = None
this.authentication = authentication
this.requireContentLengthHeader = requireContentLengthHeader
this.headers = headers
}

/**
* internal config definition for the module
*/
override def configure(config: Config): Unit = {
this.uri = config.getString(URL)
log.info("URL: {}", this.uri)
val uri = config.getString(URL)
log.info("URL: {}", uri)

this.parser = config.getString(PARSER)
log.info("PARSER: {}", this.parser)
val parser = config.getString(PARSER)
log.info("PARSER: {}", parser)

this.httpMethod = new Method(config.getString(METHOD))
log.info("HTTP Method: {}", this.httpMethod)
val httpMethod = config.getString(METHOD)
log.info("HTTP Method: {}", httpMethod)

this.authentication = config.getString(AUTHENTICATION_TYPE) match {
val authentication = config.getString(AUTHENTICATION_TYPE) match {
case "googleiam" => Some(new GoogleIAM(config.getConfig("auth.googleiam")))
case _ => None
}
log.info("HTTP Authentication: {}", this.authentication)
log.info("HTTP Authentication: {}", authentication)

val requireContentLengthHeader: Boolean = getContentLengthHeaderRequiredConfiguration(config)
log.info("HTTP Content-Length Header required: {}", requireContentLengthHeader)

val headers: Map[String, String] = getHeaderConfiguration(config)
log.info("Configured HTTP Headers: {}", headers)

configure(uri, parser, httpMethod, authentication, requireContentLengthHeader, headers)
}

def getContentLengthHeaderRequiredConfiguration(config: Config): Boolean = {
var requireContentLengthHeader = false
if (config.hasPath(REQUIRE_CONTENT_LENGTH_HEADER)) {
requireContentLengthHeader = config.getBoolean(REQUIRE_CONTENT_LENGTH_HEADER)
}
requireContentLengthHeader
}

def getHeaderConfiguration(config: Config): Map[String, String] = {
var headers = Map.empty[String, String]
if (!config.hasPath(HEADERS)) {
return headers
}
val headerConfig = config.getString(HEADERS)
headerConfig.split(",").foreach { header =>
val headerKeyValue = header.split(":")
if (headerKeyValue.length != 2) {
throw new ConfigException.BadValue(CONFIG_PREFIX + "." + HEADERS, "Invalid header configuration. Expected format: 'name1:value1,name2:value2'")
}
val name = headerKeyValue(0).trim
val value = headerKeyValue(1).trim
headers += (name -> value)
}
headers
}

/**
Expand All @@ -81,28 +127,32 @@ class HttpSourceAcl(parserRegistry: AclParserRegistry)

val request: Request = new Request(uri)
// super important in order to properly fail in case a timeout happens for example
request.enableThrowingIOException(true)
request.followRedirects(false)
request.connectTimeoutMillis(Int.MaxValue)
request.readTimeoutMillis(Int.MaxValue)
request
.enableThrowingIOException(true)
.followRedirects(false)
.connectTimeoutMillis(Int.MaxValue)
.readTimeoutMillis(Int.MaxValue)

authentication.map(authentication => request.header(authentication.authHeaderKey, authentication.authHeaderValue))

// we use this header for the 304
lastModified.foreach(header => request.header("If-Modified-Since", header))
request.header("Content-Type", "text/plain") // only type supported for now
lastModified.foreach(header => request.header(IF_MODIFIED_SINCE, header))
request.header(CONTENT_TYPE, "text/plain") // only type supported for now

headers.foreach {case (name, value) => request.header(name, value)}

val response: Response = HTTP.request(httpMethod, request)

response.status match {
case 200 =>
lastModified = response.header("Last-Modified")
lastModified = response.header(LAST_MODIFIED)

// as skinny HTTP doesn't validate HTTP Header Content-Length
validateBodyLength(response)

Some(
ParsingContext(
parserRegistry.getParser(this.parser),
parserRegistry.getParser(parser),
new BufferedReader(new StringReader(response.textBody))
)
)
Expand All @@ -126,6 +176,11 @@ class HttpSourceAcl(parserRegistry: AclParserRegistry)
.map(h => h._2)
.map(l => l.toInt)
if (optContentLengthHeader.isEmpty) {
if (requireContentLengthHeader) {
val errorMessage = s"Response doesn't contain required $CONTENT_LENGTH header, only contained the following headers: ${response.headers.keySet}. Discarding response..."
log.error(errorMessage)
throw HTTPException(Some(errorMessage), response)
}
log.warn(s"Response doesn't contain $CONTENT_LENGTH header, only contained the following headers: ${response.headers.keySet}. Skipping validation...")
return
}
Expand Down
133 changes: 130 additions & 3 deletions src/test/scala/io/conduktor/ksm/source/HttpSourceAclTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ import com.github.tomakehurst.wiremock.client.WireMock
import com.github.tomakehurst.wiremock.client.WireMock.{aResponse, getRequestedFor, urlPathEqualTo}
import com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig
import com.github.tomakehurst.wiremock.matching.EqualToPattern
import com.typesafe.config.ConfigFactory
import io.conduktor.ksm.parser.AclParserRegistry
import io.conduktor.ksm.parser.csv.CsvAclParser
import io.conduktor.ksm.source.security.GoogleIAM
import org.apache.http.HttpHeaders.{CONTENT_LENGTH, CONTENT_TYPE}
import org.apache.http.HttpHeaders.{ACCEPT_ENCODING, CONTENT_LENGTH, CONTENT_TYPE}
import org.scalamock.scalatest.MockFactory
import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}

import java.io.BufferedReader
import scala.jdk.CollectionConverters

class HttpSourceAclTest extends FlatSpec with Matchers with MockFactory with BeforeAndAfterEach {

Expand All @@ -38,6 +40,47 @@ class HttpSourceAclTest extends FlatSpec with Matchers with MockFactory with Bef
wireMockServer.stop()
}

it should "not require Content-Length header from empty configuration" in {
val httpSourceAcl = new HttpSourceAcl(aclParserRegistryMock)

val isContentLengthHeaderRequired = httpSourceAcl.getContentLengthHeaderRequiredConfiguration(ConfigFactory.empty())
isContentLengthHeaderRequired shouldBe false
}

it should "require Content-Length header from configuration" in {
val httpSourceAcl = new HttpSourceAcl(aclParserRegistryMock)

val config = ConfigFactory.parseMap(CollectionConverters.mapAsJavaMap(Map(
"contentlength.required" -> "true"
)), "testConfig")

val isContentLengthHeaderRequired = httpSourceAcl.getContentLengthHeaderRequiredConfiguration(config)
isContentLengthHeaderRequired shouldBe true
}

it should "extract no headers from empty configuration" in {
val httpSourceAcl = new HttpSourceAcl(aclParserRegistryMock)

val headersFromConfig = httpSourceAcl.getHeaderConfiguration(ConfigFactory.empty())
headersFromConfig shouldBe empty
}

it should "extract headers from configuration" in {
val config = ConfigFactory.parseMap(CollectionConverters.mapAsJavaMap(Map(
"headers" -> "Accept-Encoding:text/plain,Content-Type:text/plain"
)), "testConfig")

val httpSourceAcl = new HttpSourceAcl(aclParserRegistryMock)

val headersFromConfig = httpSourceAcl.getHeaderConfiguration(config)

val headers = Map[String, String](
CONTENT_TYPE -> "text/plain",
ACCEPT_ENCODING -> "text/plain"
)
headersFromConfig shouldBe headers
}

it should "be able to read contents of a HTTP Endpoint" in {
wireMockServer.stubFor(
WireMock.get(urlPathEqualTo(path))
Expand All @@ -55,7 +98,91 @@ class HttpSourceAclTest extends FlatSpec with Matchers with MockFactory with Bef

wireMockServer.verify(
getRequestedFor(urlPathEqualTo(path))
.withHeader("Content-Type", new EqualToPattern("text/plain"))
.withHeader(CONTENT_TYPE, new EqualToPattern("text/plain"))
)

reader match {
case Some(ParsingContext(_, x: BufferedReader)) =>
val read = Stream.continually(x.readLine()).takeWhile(Option(_).nonEmpty).map(_.concat("\n")).mkString

content shouldBe read
case _ => fail() // didn't read
}
}

it should "add configured HTTP headers to request" in {
wireMockServer.stubFor(
WireMock.get(urlPathEqualTo(path))
.willReturn(aResponse()
.withHeader(CONTENT_TYPE, "text/plain")
.withHeader(CONTENT_LENGTH, content.length.toString )
.withBody(content)
.withStatus(200)))

val httpSourceAcl = new HttpSourceAcl(aclParserRegistryMock)

val url = wireMockServer.baseUrl() + path
val config = ConfigFactory.parseMap(CollectionConverters.mapAsJavaMap(Map(
"url" -> url,
"parser" -> csvAclParser.name,
"method" -> "GET",
"auth.type" -> "none",
"headers" -> "Accept-Encoding:text/plain"
)), "testConfig")

httpSourceAcl.configure(config)

val reader = httpSourceAcl.refresh()

wireMockServer.verify(
getRequestedFor(urlPathEqualTo(path))
.withHeader(CONTENT_TYPE, new EqualToPattern("text/plain"))
.withHeader(ACCEPT_ENCODING, new EqualToPattern("text/plain"))
)

reader match {
case Some(ParsingContext(_, x: BufferedReader)) =>
val read = Stream.continually(x.readLine()).takeWhile(Option(_).nonEmpty).map(_.concat("\n")).mkString

content shouldBe read
case _ => fail() // didn't read
}
}

it should "throw Exception when missing required Content-Length header" in {
wireMockServer.stubFor(
WireMock.get(urlPathEqualTo(path))
.willReturn(aResponse()
.withHeader(CONTENT_TYPE, "text/plain")
.withBody(content)
.withStatus(200)))

val httpSourceAcl = new HttpSourceAcl(aclParserRegistryMock)

httpSourceAcl.configure(wireMockServer.baseUrl() + path, csvAclParser.name, "GET", None, requireContentLengthHeader = true)

assertThrows[Exception] {
httpSourceAcl.refresh()
}
}

it should "skip body length validation when missing optional Content-Length header" in {
wireMockServer.stubFor(
WireMock.get(urlPathEqualTo(path))
.willReturn(aResponse()
.withHeader(CONTENT_TYPE, "text/plain")
.withBody(content)
.withStatus(200)))

val httpSourceAcl = new HttpSourceAcl(aclParserRegistryMock)

httpSourceAcl.configure(wireMockServer.baseUrl() + path, csvAclParser.name, "GET")

val reader = httpSourceAcl.refresh()

wireMockServer.verify(
getRequestedFor(urlPathEqualTo(path))
.withoutHeader(CONTENT_LENGTH)
)

reader match {
Expand All @@ -67,7 +194,7 @@ class HttpSourceAclTest extends FlatSpec with Matchers with MockFactory with Bef
}
}

it should "throw Exception in mismatch Content-Length " in {
it should "throw Exception in mismatch Content-Length" in {

wireMockServer.stubFor(
WireMock.get(urlPathEqualTo(path))
Expand Down