Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## [Unreleased]

- Amend to not log HTTP request response and header values by default.
- Added http 2 support.

## [0.22.0] - 2025-10-03
Expand Down
7 changes: 3 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -428,10 +428,9 @@ that implements interface `HttpPostRequestCallbackFactory<HttpRequest>` to creat
of class `CustomHttpSinkPostRequestCallbackFactory` in `resources/META-INF/services/org.apache.flink.table.factories.Factory` file
and then reference identifier `rest-sink-logger` in the HttpSink DDL property field `gid.connector.http.sink.request-callback`.

A default implementation that logs those pairs as *INFO* level logs using Slf4j
([Slf4jHttpPostRequestCallback](src/main/java/com/getindata/connectors/http/internal/table/sink/Slf4jHttpPostRequestCallback.java))
is provided.

A default implementation that logs those pairs as *INFO* level logs using Slf4j ([Slf4jHttpPostRequestCallback](src/main/java/com/getindata/connectors/http/internal/table/sink/Slf4jHttpPostRequestCallback.java)) is provided.
If you would like to log more http content (that maybe contain sensitive information), then you can provide a customized version
of this callback; for inspiration on how to customize in this way, look back in the git history of this file.

- Http Lookup Source processes responses that it gets from the HTTP endpoint along their respective requests. One can customize the
behaviour of the additional stage of processing done by Table Function API by implementing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,9 @@ private HttpRowDataWrapper processHttpResponse(
boolean isError) throws IOException {

this.httpPostRequestCallback.call(response, request, "endpoint", Collections.emptyMap());

var responseBody = response.body();

log.debug("Received status code [{}] for RestTableSource request with Server response body [{}] ",
response.statusCode(), responseBody);
log.debug("Received status code [{}] for RestTableSource request", response.statusCode());
if (!isError && (StringUtils.isNullOrWhitespaceOnly(responseBody) || ignoreResponse(response))) {
return HttpRowDataWrapper.builder()
.data(Collections.emptyList())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import java.net.http.HttpRequest.Builder;
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.annotation.VisibleForTesting;
Expand Down Expand Up @@ -60,11 +59,6 @@ public RequestFactoryBase(
);

this.headersAndValues = HttpHeaderUtils.toHeaderAndValueArray(headerMap);

log.debug("RequestFactoryBase headersAndValues: " +
Arrays.stream(headersAndValues)
.map(Object::toString)
.collect(Collectors.joining(",")));
this.httpRequestTimeOutSeconds = Integer.parseInt(
options.getProperties().getProperty(
HttpConnectorConfigConstants.LOOKUP_HTTP_TIMEOUT_SECONDS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,11 @@

import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.StringJoiner;

import lombok.extern.slf4j.Slf4j;

import com.getindata.connectors.http.HttpPostRequestCallback;
import com.getindata.connectors.http.internal.utils.ConfigUtils;

/**
* A {@link HttpPostRequestCallback} that logs pairs of request and response as <i>INFO</i> level
Expand All @@ -31,40 +27,26 @@ public void call(
Map<String, String> headerMap) {

HttpRequest httpRequest = requestEntry.getHttpRequest();
StringJoiner headers = new StringJoiner(";");

for (Entry<String, List<String>> reqHeaders : httpRequest.headers().map().entrySet()) {
StringJoiner values = new StringJoiner(";");
for (String value : reqHeaders.getValue()) {
values.add(value);
}
String header = reqHeaders.getKey() + ": [" + values + "]";
headers.add(header);
}

if (response == null) {
log.warn("Null Http response for request " + httpRequest.uri().toString());

log.info(
"Got response for a request.\n Request:\n URL: {}\n " +
"Method: {}\n Headers: {}\n Params/Body: {}\nResponse: null",
"Method: {}\n Params/Body: {}\nResponse: null",
httpRequest.uri().toString(),
httpRequest.method(),
headers,
requestEntry.getLookupQueryInfo()
);
} else {
log.info(
"Got response for a request.\n Request:\n URL: {}\n " +
"Method: {}\n Headers: {}\n Params/Body: {}\nResponse: {}\n Body: {}",
"Method: {}\n Params/Body: {}\nResponse status code: {}\n",
httpRequest.uri().toString(),
httpRequest.method(),
headers,
requestEntry.getLookupQueryInfo(),
response,
response.body().replaceAll(ConfigUtils.UNIVERSAL_NEW_LINE_REGEXP, "")
response.statusCode()
);
}

}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
package com.getindata.connectors.http.internal.table.sink;

import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;

import com.getindata.connectors.http.HttpPostRequestCallback;
import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;
import com.getindata.connectors.http.internal.utils.ConfigUtils;

/**
* A {@link HttpPostRequestCallback} that logs pairs of request and response as <i>INFO</i> level
* logs using <i>Slf4j</i>.
* logs using <i>Slf4j</i>. As the request/response body or header might contain sensitive information,
* we do not log those values.
*
* <p>Serving as a default implementation of {@link HttpPostRequestCallback} for
* the {@link HttpDynamicSink}.
Expand All @@ -28,25 +26,23 @@ public void call(
String endpointUrl,
Map<String, String> headerMap) {

String requestBody = requestEntry.getElements().stream()
.map(element -> new String(element, StandardCharsets.UTF_8))
.collect(Collectors.joining());
// Uncomment if you want to see the requestBody in the log
//String requestBody = requestEntry.getElements().stream()
// .map(element -> new String(element, StandardCharsets.UTF_8))
// .collect(Collectors.joining());

if (response == null) {
log.info(
"Got response for a request.\n Request:\n " +
"Method: {}\n Body: {}\n Response: null",
requestEntry.getMethod(),
requestBody
"Method: {}\n Response: null",
requestEntry.getMethod()
);
} else {
log.info(
"Got response for a request.\n Request:\n " +
"Method: {}\n Body: {}\n Response: {}\n Body: {}",
"Method: {}\n Response status code: {}\n ",
requestEntry.method,
requestBody,
response,
response.body().replaceAll(ConfigUtils.UNIVERSAL_NEW_LINE_REGEXP, "")
response.statusCode()
);
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/test/resources/simpleLogger.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
org.slf4j.simpleLogger.defaultLogLevel=INFO
org.slf4j.simpleLogger.log.com.getindata.connectors.http.internal.table.lookup.RequestAndResponseLogger=DEBUG
Loading