Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions client-v2/src/main/java/com/clickhouse/client/api/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,19 @@ public Builder useHttpCompression(boolean enabled) {
return this;
}

/**
* Configures the client to use Content-Type: multipart/form-data for passing query parameters via POST body
* as opposed to using the default application/x-www-form-urlencoded approach. This can help avoid problems
* when the url becomes too long.
*
* @param enabled - indicates if multipart form data is enabled
* @return
*/
public Builder useMultipartFormData(boolean enabled) {
this.configuration.put(ClientConfigProperties.USE_MULTIPART_FORM_DATA.getKey(), String.valueOf(enabled));
return this;
}

/**
* Tell client that compression will be handled by application.
* @param enabled - indicates that feature is enabled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ public enum ClientConfigProperties {

USE_HTTP_COMPRESSION("client.use_http_compression", Boolean.class, "false"),

USE_MULTIPART_FORM_DATA("client.use_multipart_form_data", Boolean.class, "false"),

COMPRESSION_LZ4_UNCOMPRESSED_BUF_SIZE("compression.lz4.uncompressed_buffer_size", Integer.class, String.valueOf(ClickHouseLZ4OutputStream.UNCOMPRESSED_BUFF_SIZE)),

DISABLE_NATIVE_COMPRESSION("disable_native_compression", Boolean.class, "false"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.entity.mime.AbstractContentBody;
import org.apache.hc.client5.http.entity.mime.MultipartEntityBuilder;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
import org.apache.hc.client5.http.impl.io.BasicHttpClientConnectionManager;
Expand Down Expand Up @@ -94,6 +96,8 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -415,26 +419,70 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map<String, Object> r
if (requestConfig == null) {
requestConfig = Collections.emptyMap();
}

// only use multipart when enabled, and there are statement parameters
boolean useMultipartFormData = (boolean)ClientConfigProperties.USE_MULTIPART_FORM_DATA.getOrDefault(requestConfig)
&& requestConfig.containsKey(KEY_STATEMENT_PARAMS);

URI uri;
try {
URIBuilder uriBuilder = new URIBuilder(server.getBaseURL());
addQueryParams(uriBuilder, requestConfig);
uri = uriBuilder.normalizeSyntax().build();
addQueryParams(uriBuilder, requestConfig, useMultipartFormData);
uri = uriBuilder.optimize().build();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
HttpPost req = new HttpPost(uri);
// req.setVersion(new ProtocolVersion("HTTP", 1, 0)); // to disable chunk transfer encoding
addHeaders(req, requestConfig);

boolean clientCompression = ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getOrDefault(requestConfig);
boolean useHttpCompression = ClientConfigProperties.USE_HTTP_COMPRESSION.getOrDefault(requestConfig);
boolean appCompressedData = ClientConfigProperties.APP_COMPRESSED_DATA.getOrDefault(requestConfig);

HttpEntity httpEntity;
if (useMultipartFormData) {
MultipartEntityBuilder builder = MultipartEntityBuilder.create()
.setCharset(StandardCharsets.UTF_8);

builder.addPart("query", new CallbackContentBody(o -> {
try {
writeCallback.execute(new OutputStream() {
@Override
public void write(int b) throws IOException {
o.write(b);
}

@Override
public void close() {
// don't close the stream, it will be closed by http client
}
});
} catch (IOException e) {
throw new ClientException("Failed to write query", e);
}
}));

// setting entity. wrapping if compression is enabled
req.setEntity(wrapRequestEntity(new EntityTemplate(-1, CONTENT_TYPE, null, writeCallback),
clientCompression, useHttpCompression, appCompressedData, lz4Factory, requestConfig));
addStatementParams((k, v) -> builder.addPart(k, new CallbackContentBody((o) -> {
try {
o.write(v.getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
throw new ClientException("Failed to write statement params", e);
}
})), requestConfig);

httpEntity = builder.build();
} else {
httpEntity = new EntityTemplate(-1, CONTENT_TYPE, null, writeCallback);
}

// wrapping if compression is enabled
httpEntity = wrapRequestEntity(httpEntity,
clientCompression, useHttpCompression, appCompressedData, lz4Factory, requestConfig);

addHeaders(req, requestConfig, httpEntity);

// setting entity
req.setEntity(httpEntity);

HttpClientContext context = HttpClientContext.create();
Number responseTimeout = ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getOrDefault(requestConfig);
Expand Down Expand Up @@ -488,8 +536,9 @@ public void closeQuietly(ClassicHttpResponse httpResponse) {

private static final ContentType CONTENT_TYPE = ContentType.create(ContentType.TEXT_PLAIN.getMimeType(), "UTF-8");

private void addHeaders(HttpPost req, Map<String, Object> requestConfig) {
addHeader(req, HttpHeaders.CONTENT_TYPE, CONTENT_TYPE.getMimeType());
private void addHeaders(HttpPost req, Map<String, Object> requestConfig, HttpEntity httpEntity) {
addHeader(req, HttpHeaders.CONTENT_TYPE, httpEntity.getContentType());

if (requestConfig.containsKey(ClientConfigProperties.INPUT_OUTPUT_FORMAT.getKey())) {
addHeader(
req,
Expand Down Expand Up @@ -580,13 +629,12 @@ private void addHeaders(HttpPost req, Map<String, Object> requestConfig) {
correctUserAgentHeader(req, requestConfig);
}

private void addQueryParams(URIBuilder req, Map<String, Object> requestConfig) {
private void addQueryParams(URIBuilder req, Map<String, Object> requestConfig, boolean useMultipartFormData) {
if (requestConfig.containsKey(ClientConfigProperties.QUERY_ID.getKey())) {
req.addParameter(ClickHouseHttpProto.QPARAM_QUERY_ID, requestConfig.get(ClientConfigProperties.QUERY_ID.getKey()).toString());
}
if (requestConfig.containsKey(KEY_STATEMENT_PARAMS)) {
Map<?, ?> params = (Map<?, ?>) requestConfig.get(KEY_STATEMENT_PARAMS);
params.forEach((k, v) -> req.addParameter("param_" + k, String.valueOf(v)));
if (!useMultipartFormData) {
addStatementParams(req::addParameter, requestConfig);
}

boolean clientCompression = ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getOrDefault(requestConfig);
Expand Down Expand Up @@ -622,6 +670,13 @@ private void addQueryParams(URIBuilder req, Map<String, Object> requestConfig) {
}
}

private void addStatementParams(BiConsumer<String, String> keyValueConsumer, Map<String, Object> requestConfig) {
if (requestConfig.containsKey(KEY_STATEMENT_PARAMS)) {
Map<?, ?> params = (Map<?, ?>) requestConfig.get(KEY_STATEMENT_PARAMS);
params.forEach((k, v) -> keyValueConsumer.accept("param_" + k, String.valueOf(v)));
}
}

private HttpEntity wrapRequestEntity(HttpEntity httpEntity, boolean clientCompression, boolean useHttpCompression,
boolean appControlledCompression, LZ4Factory lz4Factory, Map<String, Object> requestConfig) {
LOG.debug("wrapRequestEntity: client compression: {}, http compression: {}", clientCompression, useHttpCompression);
Expand Down Expand Up @@ -822,6 +877,32 @@ private static <T> void addHeader(HttpRequest req, String headerName,
}
}

protected static class CallbackContentBody extends AbstractContentBody {
private final Consumer<OutputStream> writer;

public CallbackContentBody(Consumer<OutputStream> writer) {
super(ContentType.APPLICATION_OCTET_STREAM);
this.writer = writer;
}

@Override
public String getFilename() {
return null;
}

@Override
public void writeTo(OutputStream out) throws IOException {
// Delegate to external callback
writer.accept(out);
out.flush();
}

@Override
public long getContentLength() {
return -1; // unknown length → chunked transfer
}
}

/**
* This factory is used only when no ssl connections are required (no https endpoints).
* Internally http client would create factory and spend time if no supplied.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public void testDefaultSettings() {
Assert.assertEquals(config.get(p.getKey()), p.getDefaultValue(), "Default value doesn't match");
}
}
Assert.assertEquals(config.size(), 32); // to check everything is set. Increment when new added.
Assert.assertEquals(config.size(), 33); // to check everything is set. Increment when new added.
}

try (Client client = new Client.Builder()
Expand All @@ -243,13 +243,14 @@ public void testDefaultSettings() {
.compressClientRequest(true)
.compressServerResponse(false)
.useHttpCompression(true)
.useMultipartFormData(true)
.appCompressedData(true)
.setSocketTimeout(20, SECONDS)
.setSocketRcvbuf(100000)
.setSocketSndbuf(100000)
.build()) {
Map<String, String> config = client.getConfiguration();
Assert.assertEquals(config.size(), 33); // to check everything is set. Increment when new added.
Assert.assertEquals(config.size(), 34); // to check everything is set. Increment when new added.
Assert.assertEquals(config.get(ClientConfigProperties.DATABASE.getKey()), "mydb");
Assert.assertEquals(config.get(ClientConfigProperties.MAX_EXECUTION_TIME.getKey()), "10");
Assert.assertEquals(config.get(ClientConfigProperties.COMPRESSION_LZ4_UNCOMPRESSED_BUF_SIZE.getKey()), "300000");
Expand Down Expand Up @@ -316,7 +317,7 @@ public void testWithOldDefaults() {
Assert.assertEquals(config.get(p.getKey()), p.getDefaultValue(), "Default value doesn't match");
}
}
Assert.assertEquals(config.size(), 32); // to check everything is set. Increment when new added.
Assert.assertEquals(config.size(), 33); // to check everything is set. Increment when new added.
}
}

Expand Down