diff --git a/README.md b/README.md index b91f054..306e4ca 100644 --- a/README.md +++ b/README.md @@ -199,6 +199,8 @@ The [default configurations](src/main/resources/application.conf) can be overwri - `SOURCE_HTTP_AUTH_GOOGLEIAM_SERVICE_ACCOUNT` Google Service Account name. - `SOURCE_HTTP_AUTH_GOOGLEIAM_SERVICE_ACCOUNT_KEY` Google Service Account Key in JSON string encoded. If not the key isn't configured, it'll try to get the token from environment. - `SOURCE_HTTP_AUTH_GOOGLEIAM_TARGET_AUDIENCE` Google Target Audience for token authentication. + - `SOURCE_HTTP_CONTENT_LENGTH_HEADER_REQUIRED` Whether the `Content-Length` header is required in the HTTP response. Default is `false`. + - `SOURCE_HTTP_HEADER_OVERRIDES` Extra HTTP headers to include in the HTTP requests; they override any headers added by the authentication configuration. Default is `""`. The expected format is a comma-separated list of `name:value` pairs, such as `"Accept-Encoding:text/plain,Content-Type:text/plain"`. - `NOTIFICATION_CLASS`: Class for notification in case of ACL changes in Kafka. - `io.conduktor.ksm.notification.ConsoleNotification` (default): Print changes to the console.
Useful for logging diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 46947cb..36fd00e 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -158,6 +158,14 @@ source { target-audience = ${?SOURCE_HTTP_AUTH_GOOGLEIAM_TARGET_AUDIENCE} } } + + contentlength { + required = false + required = ${?SOURCE_HTTP_CONTENT_LENGTH_HEADER_REQUIRED} + } + + headers = "" + headers = ${?SOURCE_HTTP_HEADER_OVERRIDES} } } diff --git a/src/main/scala/io/conduktor/ksm/source/HttpSourceAcl.scala b/src/main/scala/io/conduktor/ksm/source/HttpSourceAcl.scala index 366a251..4f883bf 100644 --- a/src/main/scala/io/conduktor/ksm/source/HttpSourceAcl.scala +++ b/src/main/scala/io/conduktor/ksm/source/HttpSourceAcl.scala @@ -1,10 +1,9 @@ package io.conduktor.ksm.source -import com.fasterxml.jackson.databind.ObjectMapper -import com.typesafe.config.Config +import com.typesafe.config.{Config, ConfigException} import io.conduktor.ksm.parser.AclParserRegistry import io.conduktor.ksm.source.security.{GoogleIAM, HttpAuthentication} -import org.apache.http.HttpHeaders.CONTENT_LENGTH +import org.apache.http.HttpHeaders.{CONTENT_LENGTH, CONTENT_TYPE, IF_MODIFIED_SINCE, LAST_MODIFIED} import org.slf4j.LoggerFactory import skinny.http._ @@ -24,46 +23,93 @@ class HttpSourceAcl(parserRegistry: AclParserRegistry) final val PARSER = "parser" final val METHOD = "method" final val AUTHENTICATION_TYPE = "auth.type" + final val REQUIRE_CONTENT_LENGTH_HEADER = "contentlength.required" + final val HEADERS = "headers" var lastModified: Option[String] = None - val objectMapper = new ObjectMapper() var uri: String = _ var parser: String = "csv" var httpMethod: Method = _ var authentication: Option[HttpAuthentication] = _ + var requireContentLengthHeader: Boolean = _ + var headers: Map[String, String] = _ + + def configure(url: String, parser: String, method: String): Unit = { + configure(url, parser, method, None) + } def configure(url: 
String, parser: String, method: String, authentication: Option[HttpAuthentication]): Unit = { - this.uri = url - this.parser = parser - this.httpMethod = new Method(method) - this.authentication = authentication + configure(url, parser, method, authentication, requireContentLengthHeader = false) } - def configure(url: String, parser: String, method: String): Unit = { + + def configure(url: String, parser: String, method: String, authentication: Option[HttpAuthentication], requireContentLengthHeader: Boolean): Unit = { + configure(url, parser, method, authentication, requireContentLengthHeader, Map.empty) + } + + def configure(url: String, parser: String, method: String, + authentication: Option[HttpAuthentication], + requireContentLengthHeader: Boolean, headers: Map[String, String]): Unit = { this.uri = url this.parser = parser this.httpMethod = new Method(method) - this.authentication = None + this.authentication = authentication + this.requireContentLengthHeader = requireContentLengthHeader + this.headers = headers } /** * internal config definition for the module */ override def configure(config: Config): Unit = { - this.uri = config.getString(URL) - log.info("URL: {}", this.uri) + val uri = config.getString(URL) + log.info("URL: {}", uri) - this.parser = config.getString(PARSER) - log.info("PARSER: {}", this.parser) + val parser = config.getString(PARSER) + log.info("PARSER: {}", parser) - this.httpMethod = new Method(config.getString(METHOD)) - log.info("HTTP Method: {}", this.httpMethod) + val httpMethod = config.getString(METHOD) + log.info("HTTP Method: {}", httpMethod) - this.authentication = config.getString(AUTHENTICATION_TYPE) match { + val authentication = config.getString(AUTHENTICATION_TYPE) match { case "googleiam" => Some(new GoogleIAM(config.getConfig("auth.googleiam"))) case _ => None } - log.info("HTTP Authentication: {}", this.authentication) + log.info("HTTP Authentication: {}", authentication) + + val requireContentLengthHeader: Boolean = 
getContentLengthHeaderRequiredConfiguration(config) + log.info("HTTP Content-Length Header required: {}", requireContentLengthHeader) + + val headers: Map[String, String] = getHeaderConfiguration(config) + log.info("Configured HTTP Headers: {}", headers) + + configure(uri, parser, httpMethod, authentication, requireContentLengthHeader, headers) + } + + def getContentLengthHeaderRequiredConfiguration(config: Config): Boolean = { + var requireContentLengthHeader = false + if (config.hasPath(REQUIRE_CONTENT_LENGTH_HEADER)) { + requireContentLengthHeader = config.getBoolean(REQUIRE_CONTENT_LENGTH_HEADER) + } + requireContentLengthHeader + } + + def getHeaderConfiguration(config: Config): Map[String, String] = { + var headers = Map.empty[String, String] + if (!config.hasPath(HEADERS) || config.getString(HEADERS).trim.isEmpty) { + return headers // the bundled default is headers = "", which must mean "no extra headers" instead of failing as a bad value + } + val headerConfig = config.getString(HEADERS) + headerConfig.split(",").foreach { header => + val headerKeyValue = header.split(":", 2) // limit 2: only the first ':' separates name from value, so the value itself may contain ':' + if (headerKeyValue.length != 2) { + throw new ConfigException.BadValue(CONFIG_PREFIX + "." + HEADERS, "Invalid header configuration.
Expected format: 'name1:value1,name2:value2'") + } + val name = headerKeyValue(0).trim + val value = headerKeyValue(1).trim + headers += (name -> value) + } + headers } /** @@ -81,28 +127,32 @@ class HttpSourceAcl(parserRegistry: AclParserRegistry) val request: Request = new Request(uri) // super important in order to properly fail in case a timeout happens for example - request.enableThrowingIOException(true) - request.followRedirects(false) - request.connectTimeoutMillis(Int.MaxValue) - request.readTimeoutMillis(Int.MaxValue) + request + .enableThrowingIOException(true) + .followRedirects(false) + .connectTimeoutMillis(Int.MaxValue) + .readTimeoutMillis(Int.MaxValue) authentication.map(authentication => request.header(authentication.authHeaderKey, authentication.authHeaderValue)) // we use this header for the 304 - lastModified.foreach(header => request.header("If-Modified-Since", header)) - request.header("Content-Type", "text/plain") // only type supported for now + lastModified.foreach(header => request.header(IF_MODIFIED_SINCE, header)) + request.header(CONTENT_TYPE, "text/plain") // only type supported for now + + headers.foreach {case (name, value) => request.header(name, value)} + val response: Response = HTTP.request(httpMethod, request) response.status match { case 200 => - lastModified = response.header("Last-Modified") + lastModified = response.header(LAST_MODIFIED) // as skinny HTTP doesn't validate HTTP Header Content-Length validateBodyLength(response) Some( ParsingContext( - parserRegistry.getParser(this.parser), + parserRegistry.getParser(parser), new BufferedReader(new StringReader(response.textBody)) ) ) @@ -126,6 +176,11 @@ class HttpSourceAcl(parserRegistry: AclParserRegistry) .map(h => h._2) .map(l => l.toInt) if (optContentLengthHeader.isEmpty) { + if (requireContentLengthHeader) { + val errorMessage = s"Response doesn't contain required $CONTENT_LENGTH header, only contained the following headers: ${response.headers.keySet}. 
Discarding response..." + log.error(errorMessage) + throw HTTPException(Some(errorMessage), response) + } log.warn(s"Response doesn't contain $CONTENT_LENGTH header, only contained the following headers: ${response.headers.keySet}. Skipping validation...") return } diff --git a/src/test/scala/io/conduktor/ksm/source/HttpSourceAclTest.scala b/src/test/scala/io/conduktor/ksm/source/HttpSourceAclTest.scala index b178f25..cf3e99f 100644 --- a/src/test/scala/io/conduktor/ksm/source/HttpSourceAclTest.scala +++ b/src/test/scala/io/conduktor/ksm/source/HttpSourceAclTest.scala @@ -5,14 +5,16 @@ import com.github.tomakehurst.wiremock.client.WireMock import com.github.tomakehurst.wiremock.client.WireMock.{aResponse, getRequestedFor, urlPathEqualTo} import com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig import com.github.tomakehurst.wiremock.matching.EqualToPattern +import com.typesafe.config.ConfigFactory import io.conduktor.ksm.parser.AclParserRegistry import io.conduktor.ksm.parser.csv.CsvAclParser import io.conduktor.ksm.source.security.GoogleIAM -import org.apache.http.HttpHeaders.{CONTENT_LENGTH, CONTENT_TYPE} +import org.apache.http.HttpHeaders.{ACCEPT_ENCODING, CONTENT_LENGTH, CONTENT_TYPE} import org.scalamock.scalatest.MockFactory import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} import java.io.BufferedReader +import scala.jdk.CollectionConverters class HttpSourceAclTest extends FlatSpec with Matchers with MockFactory with BeforeAndAfterEach { @@ -38,6 +40,47 @@ class HttpSourceAclTest extends FlatSpec with Matchers with MockFactory with Bef wireMockServer.stop() } + it should "not require Content-Length header from empty configuration" in { + val httpSourceAcl = new HttpSourceAcl(aclParserRegistryMock) + + val isContentLengthHeaderRequired = httpSourceAcl.getContentLengthHeaderRequiredConfiguration(ConfigFactory.empty()) + isContentLengthHeaderRequired shouldBe false + } + + it should "require Content-Length header from 
configuration" in { + val httpSourceAcl = new HttpSourceAcl(aclParserRegistryMock) + + val config = ConfigFactory.parseMap(CollectionConverters.mapAsJavaMap(Map( + "contentlength.required" -> "true" + )), "testConfig") + + val isContentLengthHeaderRequired = httpSourceAcl.getContentLengthHeaderRequiredConfiguration(config) + isContentLengthHeaderRequired shouldBe true + } + + it should "extract no headers from empty configuration" in { + val httpSourceAcl = new HttpSourceAcl(aclParserRegistryMock) + + val headersFromConfig = httpSourceAcl.getHeaderConfiguration(ConfigFactory.empty()) + headersFromConfig shouldBe empty + } + + it should "extract headers from configuration" in { + val config = ConfigFactory.parseMap(CollectionConverters.mapAsJavaMap(Map( + "headers" -> "Accept-Encoding:text/plain,Content-Type:text/plain" + )), "testConfig") + + val httpSourceAcl = new HttpSourceAcl(aclParserRegistryMock) + + val headersFromConfig = httpSourceAcl.getHeaderConfiguration(config) + + val headers = Map[String, String]( + CONTENT_TYPE -> "text/plain", + ACCEPT_ENCODING -> "text/plain" + ) + headersFromConfig shouldBe headers + } + it should "be able to read contents of a HTTP Endpoint" in { wireMockServer.stubFor( WireMock.get(urlPathEqualTo(path)) @@ -55,7 +98,91 @@ class HttpSourceAclTest extends FlatSpec with Matchers with MockFactory with Bef wireMockServer.verify( getRequestedFor(urlPathEqualTo(path)) - .withHeader("Content-Type", new EqualToPattern("text/plain")) + .withHeader(CONTENT_TYPE, new EqualToPattern("text/plain")) + ) + + reader match { + case Some(ParsingContext(_, x: BufferedReader)) => + val read = Stream.continually(x.readLine()).takeWhile(Option(_).nonEmpty).map(_.concat("\n")).mkString + + content shouldBe read + case _ => fail() // didn't read + } + } + + it should "add configured HTTP headers to request" in { + wireMockServer.stubFor( + WireMock.get(urlPathEqualTo(path)) + .willReturn(aResponse() + .withHeader(CONTENT_TYPE, "text/plain") + 
.withHeader(CONTENT_LENGTH, content.length.toString ) + .withBody(content) + .withStatus(200))) + + val httpSourceAcl = new HttpSourceAcl(aclParserRegistryMock) + + val url = wireMockServer.baseUrl() + path + val config = ConfigFactory.parseMap(CollectionConverters.mapAsJavaMap(Map( + "url" -> url, + "parser" -> csvAclParser.name, + "method" -> "GET", + "auth.type" -> "none", + "headers" -> "Accept-Encoding:text/plain" + )), "testConfig") + + httpSourceAcl.configure(config) + + val reader = httpSourceAcl.refresh() + + wireMockServer.verify( + getRequestedFor(urlPathEqualTo(path)) + .withHeader(CONTENT_TYPE, new EqualToPattern("text/plain")) + .withHeader(ACCEPT_ENCODING, new EqualToPattern("text/plain")) + ) + + reader match { + case Some(ParsingContext(_, x: BufferedReader)) => + val read = Stream.continually(x.readLine()).takeWhile(Option(_).nonEmpty).map(_.concat("\n")).mkString + + content shouldBe read + case _ => fail() // didn't read + } + } + + it should "throw Exception when missing required Content-Length header" in { + wireMockServer.stubFor( + WireMock.get(urlPathEqualTo(path)) + .willReturn(aResponse() + .withHeader(CONTENT_TYPE, "text/plain") + .withBody(content) + .withStatus(200))) + + val httpSourceAcl = new HttpSourceAcl(aclParserRegistryMock) + + httpSourceAcl.configure(wireMockServer.baseUrl() + path, csvAclParser.name, "GET", None, requireContentLengthHeader = true) + + assertThrows[Exception] { + httpSourceAcl.refresh() + } + } + + it should "skip body length validation when missing optional Content-Length header" in { + wireMockServer.stubFor( + WireMock.get(urlPathEqualTo(path)) + .willReturn(aResponse() + .withHeader(CONTENT_TYPE, "text/plain") + .withBody(content) + .withStatus(200))) + + val httpSourceAcl = new HttpSourceAcl(aclParserRegistryMock) + + httpSourceAcl.configure(wireMockServer.baseUrl() + path, csvAclParser.name, "GET") + + val reader = httpSourceAcl.refresh() + + wireMockServer.verify( + getRequestedFor(urlPathEqualTo(path)) 
+ .withoutHeader(CONTENT_LENGTH) ) reader match { @@ -67,7 +194,7 @@ class HttpSourceAclTest extends FlatSpec with Matchers with MockFactory with Bef } } - it should "throw Exception in mismatch Content-Length " in { + it should "throw Exception in mismatch Content-Length" in { wireMockServer.stubFor( WireMock.get(urlPathEqualTo(path))