From 44e05d95153575fffe2e1fd99000979dc2ef5fa6 Mon Sep 17 00:00:00 2001 From: Andris Rauda Date: Wed, 3 Sep 2025 14:31:41 +0300 Subject: [PATCH 1/8] first mockup, does not really work yet --- .../com/clickhouse/client/api/Client.java | 13 +++ .../client/api/ClientConfigProperties.java | 2 + .../api/internal/HttpAPIClientHelper.java | 93 +++++++++++++++++-- 3 files changed, 100 insertions(+), 8 deletions(-) diff --git a/client-v2/src/main/java/com/clickhouse/client/api/Client.java b/client-v2/src/main/java/com/clickhouse/client/api/Client.java index 089ec4282..d24256bd4 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/Client.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/Client.java @@ -601,6 +601,19 @@ public Builder useHttpCompression(boolean enabled) { return this; } + /** + * Configures the client to use Content-Type: multipart/form-data for passing query parameters via POST body + * as opposed to using the default application/x-www-form-urlencoded approach. This can help avoid problems + * when the url becomes too long. + * + * @param enabled - indicates if multipart form data is enabled + * @return + */ + public Builder useMultipartFormData(boolean enabled) { + this.configuration.put(ClientConfigProperties.USE_MULTIPART_FORM_DATA.getKey(), String.valueOf(enabled)); + return this; + } + /** * Tell client that compression will be handled by application. * @param enabled - indicates that feature is enabled. diff --git a/client-v2/src/main/java/com/clickhouse/client/api/ClientConfigProperties.java b/client-v2/src/main/java/com/clickhouse/client/api/ClientConfigProperties.java index 89e0f5a12..cbf6fe749 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/ClientConfigProperties.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/ClientConfigProperties.java @@ -85,6 +85,8 @@ public enum ClientConfigProperties { USE_HTTP_COMPRESSION("client.use_http_compression", Boolean.class, "false"), + USE_MULTIPART_FORM_DATA("client.use_multipart_form_data", Boolean.class, "false"), + COMPRESSION_LZ4_UNCOMPRESSED_BUF_SIZE("compression.lz4.uncompressed_buffer_size", Integer.class, String.valueOf(ClickHouseLZ4OutputStream.UNCOMPRESSED_BUFF_SIZE)), DISABLE_NATIVE_COMPRESSION("disable_native_compression", Boolean.class, "false"), diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java index 5ce5feca4..dc517f4eb 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java @@ -20,6 +20,7 @@ import org.apache.hc.client5.http.classic.methods.HttpPost; import org.apache.hc.client5.http.config.ConnectionConfig; import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.entity.mime.MultipartEntityBuilder; import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; import org.apache.hc.client5.http.impl.io.BasicHttpClientConnectionManager; @@ -66,9 +67,13 @@ import javax.net.ssl.SSLException; import javax.net.ssl.SSLParameters; import javax.net.ssl.SSLSocket; +import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; import java.io.OutputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; import java.io.UnsupportedEncodingException; import java.lang.reflect.Method; import java.net.ConnectException; @@ -96,6 +101,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.regex.Pattern; +import java.util.stream.Collectors; public class HttpAPIClientHelper { @@ -415,26 +421,54 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map r if (requestConfig == null) { requestConfig = Collections.emptyMap(); } + boolean useMultipartFormData = ClientConfigProperties.USE_MULTIPART_FORM_DATA.getOrDefault(requestConfig); URI uri; try { URIBuilder uriBuilder = new URIBuilder(server.getBaseURL()); - addQueryParams(uriBuilder, requestConfig); - uri = uriBuilder.normalizeSyntax().build(); + if (!useMultipartFormData) { + addQueryParams(uriBuilder, requestConfig); + } + uri = uriBuilder.optimize().build(); } catch (URISyntaxException e) { throw new RuntimeException(e); } HttpPost req = new HttpPost(uri); // req.setVersion(new ProtocolVersion("HTTP", 1, 0)); // to disable chunk transfer encoding - addHeaders(req, requestConfig); boolean clientCompression = ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getOrDefault(requestConfig); boolean useHttpCompression = ClientConfigProperties.USE_HTTP_COMPRESSION.getOrDefault(requestConfig); boolean appCompressedData = ClientConfigProperties.APP_COMPRESSED_DATA.getOrDefault(requestConfig); + HttpEntity httpEntity; + if (useMultipartFormData) { + final PipedOutputStream out = new PipedOutputStream(); + PipedInputStream in = new PipedInputStream(out); + writeCallback.execute(out); + + String query = new BufferedReader( + new InputStreamReader(in, StandardCharsets.UTF_8)) + .lines() + .collect(Collectors.joining("\n")); + + MultipartEntityBuilder builder = MultipartEntityBuilder.create() + .setCharset(StandardCharsets.UTF_8) + .addTextBody("query", query); - // setting entity. wrapping if compression is enabled - req.setEntity(wrapRequestEntity(new EntityTemplate(-1, CONTENT_TYPE, null, writeCallback), - clientCompression, useHttpCompression, appCompressedData, lz4Factory, requestConfig)); + addQueryParams(builder, requestConfig); + + httpEntity = builder.build(); + } else { + httpEntity = new EntityTemplate(-1, CONTENT_TYPE, null, writeCallback); + } + + // wrapping if compression is enabled + httpEntity = wrapRequestEntity(httpEntity, + clientCompression, useHttpCompression, appCompressedData, lz4Factory, requestConfig); + + addHeaders(req, requestConfig, httpEntity); + + // setting entity + req.setEntity(httpEntity); HttpClientContext context = HttpClientContext.create(); Number responseTimeout = ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getOrDefault(requestConfig); @@ -488,8 +522,9 @@ public void closeQuietly(ClassicHttpResponse httpResponse) { private static final ContentType CONTENT_TYPE = ContentType.create(ContentType.TEXT_PLAIN.getMimeType(), "UTF-8"); - private void addHeaders(HttpPost req, Map requestConfig) { - addHeader(req, HttpHeaders.CONTENT_TYPE, CONTENT_TYPE.getMimeType()); + private void addHeaders(HttpPost req, Map requestConfig, HttpEntity httpEntity) { + addHeader(req, HttpHeaders.CONTENT_TYPE, httpEntity.getContentType()); + if (requestConfig.containsKey(ClientConfigProperties.INPUT_OUTPUT_FORMAT.getKey())) { addHeader( req, @@ -580,6 +615,48 @@ private void addHeaders(HttpPost req, Map requestConfig) { correctUserAgentHeader(req, requestConfig); } + private void addQueryParams(MultipartEntityBuilder builder, Map requestConfig) { + if (requestConfig.containsKey(ClientConfigProperties.QUERY_ID.getKey())) { + builder.addTextBody(ClickHouseHttpProto.QPARAM_QUERY_ID, requestConfig.get(ClientConfigProperties.QUERY_ID.getKey()).toString()); + } + if (requestConfig.containsKey(KEY_STATEMENT_PARAMS)) { + Map params = (Map) requestConfig.get(KEY_STATEMENT_PARAMS); + params.forEach((k, v) -> builder.addTextBody("param_" + k, String.valueOf(v))); + } + + boolean clientCompression = ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getOrDefault(requestConfig); + boolean serverCompression = ClientConfigProperties.COMPRESS_SERVER_RESPONSE.getOrDefault(requestConfig); + boolean useHttpCompression = ClientConfigProperties.USE_HTTP_COMPRESSION.getOrDefault(requestConfig); + + if (useHttpCompression) { + // enable_http_compression make server react on http header + // for client side compression Content-Encoding should be set + // for server side compression Accept-Encoding should be set + builder.addTextBody("enable_http_compression", "1"); + } else { + if (serverCompression) { + builder.addTextBody("compress", "1"); + } + if (clientCompression) { + builder.addTextBody("decompress", "1"); + } + } + + Collection sessionRoles = ClientConfigProperties.SESSION_DB_ROLES.getOrDefault(requestConfig); + if (!(sessionRoles == null || sessionRoles.isEmpty())) { + sessionRoles.forEach(r -> builder.addTextBody(ClickHouseHttpProto.QPARAM_ROLE, r)); + } + + for (String key : requestConfig.keySet()) { + if (key.startsWith(ClientConfigProperties.SERVER_SETTING_PREFIX)) { + Object val = requestConfig.get(key); + if (val != null) { + builder.addTextBody(key.substring(ClientConfigProperties.SERVER_SETTING_PREFIX.length()), String.valueOf(requestConfig.get(key))); + } + } + } + } + private void addQueryParams(URIBuilder req, Map requestConfig) { if (requestConfig.containsKey(ClientConfigProperties.QUERY_ID.getKey())) { req.addParameter(ClickHouseHttpProto.QPARAM_QUERY_ID, requestConfig.get(ClientConfigProperties.QUERY_ID.getKey()).toString()); From c109d30294660bfd2da4f322ff82559dfe33e070 Mon Sep 17 00:00:00 2001 From: Andris Rauda Date: Thu, 4 Sep 2025 08:40:31 +0300 Subject: [PATCH 2/8] got queries working --- .../client/api/internal/HttpAPIClientHelper.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java index dc517f4eb..4f26db670 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java @@ -425,9 +425,9 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map r URI uri; try { URIBuilder uriBuilder = new URIBuilder(server.getBaseURL()); - if (!useMultipartFormData) { +// if (!useMultipartFormData) { addQueryParams(uriBuilder, requestConfig); - } +// } uri = uriBuilder.optimize().build(); } catch (URISyntaxException e) { throw new RuntimeException(e); @@ -616,14 +616,14 @@ private void addHeaders(HttpPost req, Map requestConfig, HttpEnt } private void addQueryParams(MultipartEntityBuilder builder, Map requestConfig) { - if (requestConfig.containsKey(ClientConfigProperties.QUERY_ID.getKey())) { - builder.addTextBody(ClickHouseHttpProto.QPARAM_QUERY_ID, requestConfig.get(ClientConfigProperties.QUERY_ID.getKey()).toString()); - } +// if (requestConfig.containsKey(ClientConfigProperties.QUERY_ID.getKey())) { +// builder.addTextBody(ClickHouseHttpProto.QPARAM_QUERY_ID, requestConfig.get(ClientConfigProperties.QUERY_ID.getKey()).toString()); +// } if (requestConfig.containsKey(KEY_STATEMENT_PARAMS)) { Map params = (Map) requestConfig.get(KEY_STATEMENT_PARAMS); params.forEach((k, v) -> builder.addTextBody("param_" + k, String.valueOf(v))); } - +/* boolean clientCompression = ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getOrDefault(requestConfig); boolean serverCompression = ClientConfigProperties.COMPRESS_SERVER_RESPONSE.getOrDefault(requestConfig); boolean useHttpCompression = ClientConfigProperties.USE_HTTP_COMPRESSION.getOrDefault(requestConfig); @@ -655,13 +655,15 @@ private void addQueryParams(MultipartEntityBuilder builder, Map } } } + */ } private void addQueryParams(URIBuilder req, Map requestConfig) { if (requestConfig.containsKey(ClientConfigProperties.QUERY_ID.getKey())) { req.addParameter(ClickHouseHttpProto.QPARAM_QUERY_ID, requestConfig.get(ClientConfigProperties.QUERY_ID.getKey()).toString()); } - if (requestConfig.containsKey(KEY_STATEMENT_PARAMS)) { + boolean useMultipartFormData = ClientConfigProperties.USE_MULTIPART_FORM_DATA.getOrDefault(requestConfig); + if (!useMultipartFormData && requestConfig.containsKey(KEY_STATEMENT_PARAMS)) { Map params = (Map) requestConfig.get(KEY_STATEMENT_PARAMS); params.forEach((k, v) -> req.addParameter("param_" + k, String.valueOf(v))); } From 3a1168329ff93d1e643b088f7bf84fd31ab0245b Mon Sep 17 00:00:00 2001 From: Andris Rauda Date: Thu, 4 Sep 2025 08:48:47 +0300 Subject: [PATCH 3/8] cleanup --- .../api/internal/HttpAPIClientHelper.java | 40 +------------------ 1 file changed, 1 insertion(+), 39 deletions(-) diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java index 4f26db670..0b5d4628b 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java @@ -425,9 +425,7 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map r URI uri; try { URIBuilder uriBuilder = new URIBuilder(server.getBaseURL()); -// if (!useMultipartFormData) { - addQueryParams(uriBuilder, requestConfig); -// } + addQueryParams(uriBuilder, requestConfig); uri = uriBuilder.optimize().build(); } catch (URISyntaxException e) { throw new RuntimeException(e); @@ -616,46 +614,10 @@ private void addHeaders(HttpPost req, Map requestConfig, HttpEnt } private void addQueryParams(MultipartEntityBuilder builder, Map requestConfig) { -// if (requestConfig.containsKey(ClientConfigProperties.QUERY_ID.getKey())) { -// builder.addTextBody(ClickHouseHttpProto.QPARAM_QUERY_ID, requestConfig.get(ClientConfigProperties.QUERY_ID.getKey()).toString()); -// } if (requestConfig.containsKey(KEY_STATEMENT_PARAMS)) { Map params = (Map) requestConfig.get(KEY_STATEMENT_PARAMS); params.forEach((k, v) -> builder.addTextBody("param_" + k, String.valueOf(v))); } -/* - boolean clientCompression = ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getOrDefault(requestConfig); - boolean serverCompression = ClientConfigProperties.COMPRESS_SERVER_RESPONSE.getOrDefault(requestConfig); - boolean useHttpCompression = ClientConfigProperties.USE_HTTP_COMPRESSION.getOrDefault(requestConfig); - - if (useHttpCompression) { - // enable_http_compression make server react on http header - // for client side compression Content-Encoding should be set - // for server side compression Accept-Encoding should be set - builder.addTextBody("enable_http_compression", "1"); - } else { - if (serverCompression) { - builder.addTextBody("compress", "1"); - } - if (clientCompression) { - builder.addTextBody("decompress", "1"); - } - } - - Collection sessionRoles = ClientConfigProperties.SESSION_DB_ROLES.getOrDefault(requestConfig); - if (!(sessionRoles == null || sessionRoles.isEmpty())) { - sessionRoles.forEach(r -> builder.addTextBody(ClickHouseHttpProto.QPARAM_ROLE, r)); - } - - for (String key : requestConfig.keySet()) { - if (key.startsWith(ClientConfigProperties.SERVER_SETTING_PREFIX)) { - Object val = requestConfig.get(key); - if (val != null) { - builder.addTextBody(key.substring(ClientConfigProperties.SERVER_SETTING_PREFIX.length()), String.valueOf(requestConfig.get(key))); - } - } - } - */ } private void addQueryParams(URIBuilder req, Map requestConfig) { From 25bbc52dc3a0e5f34d3616a91482bab10c0cb011 Mon Sep 17 00:00:00 2001 From: Andris Rauda Date: Thu, 4 Sep 2025 08:54:22 +0300 Subject: [PATCH 4/8] addStatementParams --- .../api/internal/HttpAPIClientHelper.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java index 0b5d4628b..6c860ac52 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java @@ -99,6 +99,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; import java.util.function.Function; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -452,7 +453,7 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map r .setCharset(StandardCharsets.UTF_8) .addTextBody("query", query); - addQueryParams(builder, requestConfig); + addStatementParams(builder::addTextBody, requestConfig); httpEntity = builder.build(); } else { @@ -613,21 +614,13 @@ private void addHeaders(HttpPost req, Map requestConfig, HttpEnt correctUserAgentHeader(req, requestConfig); } - private void addQueryParams(MultipartEntityBuilder builder, Map requestConfig) { - if (requestConfig.containsKey(KEY_STATEMENT_PARAMS)) { - Map params = (Map) requestConfig.get(KEY_STATEMENT_PARAMS); - params.forEach((k, v) -> builder.addTextBody("param_" + k, String.valueOf(v))); - } - } - private void addQueryParams(URIBuilder req, Map requestConfig) { if (requestConfig.containsKey(ClientConfigProperties.QUERY_ID.getKey())) { req.addParameter(ClickHouseHttpProto.QPARAM_QUERY_ID, requestConfig.get(ClientConfigProperties.QUERY_ID.getKey()).toString()); } boolean useMultipartFormData = ClientConfigProperties.USE_MULTIPART_FORM_DATA.getOrDefault(requestConfig); - if (!useMultipartFormData && requestConfig.containsKey(KEY_STATEMENT_PARAMS)) { - Map params = (Map) requestConfig.get(KEY_STATEMENT_PARAMS); - params.forEach((k, v) -> req.addParameter("param_" + k, String.valueOf(v))); + if (!useMultipartFormData) { + addStatementParams(req::addParameter, requestConfig); } boolean clientCompression = ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getOrDefault(requestConfig); @@ -663,6 +656,13 @@ private void addQueryParams(URIBuilder req, Map requestConfig) { } } + private void addStatementParams(BiConsumer keyValueConsumer, Map requestConfig) { + if (requestConfig.containsKey(KEY_STATEMENT_PARAMS)) { + Map params = (Map) requestConfig.get(KEY_STATEMENT_PARAMS); + params.forEach((k, v) -> keyValueConsumer.accept("param_" + k, String.valueOf(v))); + } + } + private HttpEntity wrapRequestEntity(HttpEntity httpEntity, boolean clientCompression, boolean useHttpCompression, boolean appControlledCompression, LZ4Factory lz4Factory, Map requestConfig) { LOG.debug("wrapRequestEntity: client compression: {}, http compression: {}", clientCompression, useHttpCompression); From 106e1a16a15275248b01f865acafd0d5bdfdd053 Mon Sep 17 00:00:00 2001 From: Andris Rauda Date: Thu, 4 Sep 2025 09:12:09 +0300 Subject: [PATCH 5/8] eliminated writing query to String --- .../client/api/internal/HttpAPIClientHelper.java | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java index 6c860ac52..e5326be00 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java @@ -440,18 +440,13 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map r HttpEntity httpEntity; if (useMultipartFormData) { - final PipedOutputStream out = new PipedOutputStream(); + PipedOutputStream out = new PipedOutputStream(); PipedInputStream in = new PipedInputStream(out); - writeCallback.execute(out); - - String query = new BufferedReader( - new InputStreamReader(in, StandardCharsets.UTF_8)) - .lines() - .collect(Collectors.joining("\n")); MultipartEntityBuilder builder = MultipartEntityBuilder.create() .setCharset(StandardCharsets.UTF_8) - .addTextBody("query", query); + .addBinaryBody("query", in); + writeCallback.execute(out); addStatementParams(builder::addTextBody, requestConfig); From 8cd81b27654769afd58af66e53d5b98262711c03 Mon Sep 17 00:00:00 2001 From: Andris Rauda Date: Thu, 4 Sep 2025 12:26:44 +0300 Subject: [PATCH 6/8] CallbackContentBody approach (Piped io was getting stuck) --- .../com/clickhouse/client/api/Client.java | 6 +- .../api/internal/HttpAPIClientHelper.java | 75 ++++++++++++++----- 2 files changed, 61 insertions(+), 20 deletions(-) diff --git a/client-v2/src/main/java/com/clickhouse/client/api/Client.java b/client-v2/src/main/java/com/clickhouse/client/api/Client.java index d24256bd4..b46e0fde3 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/Client.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/Client.java @@ -1279,7 +1279,7 @@ public CompletableFuture insert(String tableName, List data, } } out.close(); - })) { + }, false)) { // Check response @@ -1482,7 +1482,7 @@ public CompletableFuture insert(String tableName, out -> { writer.onOutput(out); out.close(); - })) { + }, false)) { // Check response @@ -1606,7 +1606,7 @@ public CompletableFuture query(String sqlQuery, Map { output.write(sqlQuery.getBytes(StandardCharsets.UTF_8)); output.close(); - }); + }, true); // Check response if (httpResponse.getCode() == HttpStatus.SC_SERVICE_UNAVAILABLE) { diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java index e5326be00..7e5385237 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java @@ -20,6 +20,7 @@ import org.apache.hc.client5.http.classic.methods.HttpPost; import org.apache.hc.client5.http.config.ConnectionConfig; import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.entity.mime.AbstractContentBody; import org.apache.hc.client5.http.entity.mime.MultipartEntityBuilder; import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; @@ -67,13 +68,9 @@ import javax.net.ssl.SSLException; import javax.net.ssl.SSLParameters; import javax.net.ssl.SSLSocket; -import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; import java.io.OutputStream; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; import java.io.UnsupportedEncodingException; import java.lang.reflect.Method; import java.net.ConnectException; @@ -100,9 +97,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.Function; import java.util.regex.Pattern; -import java.util.stream.Collectors; public class HttpAPIClientHelper { @@ -413,7 +410,7 @@ public Exception readError(ClassicHttpResponse httpResponse) { private final AtomicLong timeToPoolVent = new AtomicLong(0); public ClassicHttpResponse executeRequest(Endpoint server, Map requestConfig, LZ4Factory lz4Factory, - IOCallback writeCallback) throws Exception { + IOCallback writeCallback, boolean query) throws Exception { if (poolControl != null && timeToPoolVent.get() < System.currentTimeMillis()) { timeToPoolVent.set(System.currentTimeMillis() + POOL_VENT_TIMEOUT); poolControl.closeExpired(); @@ -422,11 +419,11 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map r if (requestConfig == null) { requestConfig = Collections.emptyMap(); } - boolean useMultipartFormData = ClientConfigProperties.USE_MULTIPART_FORM_DATA.getOrDefault(requestConfig); + boolean useMultipartFormData = query && (boolean)ClientConfigProperties.USE_MULTIPART_FORM_DATA.getOrDefault(requestConfig); URI uri; try { URIBuilder uriBuilder = new URIBuilder(server.getBaseURL()); - addQueryParams(uriBuilder, requestConfig); + addQueryParams(uriBuilder, requestConfig, useMultipartFormData); uri = uriBuilder.optimize().build(); } catch (URISyntaxException e) { throw new RuntimeException(e); @@ -440,15 +437,34 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map r HttpEntity httpEntity; if (useMultipartFormData) { - PipedOutputStream out = new PipedOutputStream(); - PipedInputStream in = new PipedInputStream(out); - MultipartEntityBuilder builder = MultipartEntityBuilder.create() - .setCharset(StandardCharsets.UTF_8) - .addBinaryBody("query", in); - writeCallback.execute(out); + .setCharset(StandardCharsets.UTF_8); + + builder.addPart("query", new CallbackContentBody(o -> { + try { + writeCallback.execute(new OutputStream() { + @Override + public void write(int b) throws IOException { + o.write(b); + } + + @Override + public void close() { + // don't close the stream, it will be closed by http client + } + }); + } catch (IOException e) { + throw new ClientException("Failed to write query", e); + } + })); - addStatementParams(builder::addTextBody, requestConfig); + addStatementParams((k, v) -> builder.addPart(k, new CallbackContentBody((o) -> { + try { + o.write(v.getBytes(StandardCharsets.UTF_8)); + } catch (IOException e) { + throw new ClientException("Failed to write statement params", e); + } + })), requestConfig); httpEntity = builder.build(); } else { @@ -609,11 +625,10 @@ private void addHeaders(HttpPost req, Map requestConfig, HttpEnt correctUserAgentHeader(req, requestConfig); } - private void addQueryParams(URIBuilder req, Map requestConfig) { + private void addQueryParams(URIBuilder req, Map requestConfig, boolean useMultipartFormData) { if (requestConfig.containsKey(ClientConfigProperties.QUERY_ID.getKey())) { req.addParameter(ClickHouseHttpProto.QPARAM_QUERY_ID, requestConfig.get(ClientConfigProperties.QUERY_ID.getKey()).toString()); } - boolean useMultipartFormData = ClientConfigProperties.USE_MULTIPART_FORM_DATA.getOrDefault(requestConfig); if (!useMultipartFormData) { addStatementParams(req::addParameter, requestConfig); } @@ -858,6 +873,32 @@ private static void addHeader(HttpRequest req, String headerName, } } + protected static class CallbackContentBody extends AbstractContentBody { + private final Consumer writer; + + public CallbackContentBody(Consumer writer) { + super(ContentType.APPLICATION_OCTET_STREAM); + this.writer = writer; + } + + @Override + public String getFilename() { + return null; + } + + @Override + public void writeTo(OutputStream out) throws IOException { + // Delegate to external callback + writer.accept(out); + out.flush(); + } + + @Override + public long getContentLength() { + return -1; // unknown length → chunked transfer + } + } + /** * This factory is used only when no ssl connections are required (no https endpoints). * Internally http client would create factory and spend time if no supplied. From 09a6a4c7e5766e26ab20089afd61d027f0867916 Mon Sep 17 00:00:00 2001 From: Andris Rauda Date: Thu, 4 Sep 2025 13:09:14 +0300 Subject: [PATCH 7/8] use multipart only when there are statement parameters --- .../src/main/java/com/clickhouse/client/api/Client.java | 6 +++--- .../client/api/internal/HttpAPIClientHelper.java | 8 ++++++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/client-v2/src/main/java/com/clickhouse/client/api/Client.java b/client-v2/src/main/java/com/clickhouse/client/api/Client.java index b46e0fde3..d24256bd4 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/Client.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/Client.java @@ -1279,7 +1279,7 @@ public CompletableFuture insert(String tableName, List data, } } out.close(); - }, false)) { + })) { // Check response @@ -1482,7 +1482,7 @@ public CompletableFuture insert(String tableName, out -> { writer.onOutput(out); out.close(); - }, false)) { + })) { // Check response @@ -1606,7 +1606,7 @@ public CompletableFuture query(String sqlQuery, Map { output.write(sqlQuery.getBytes(StandardCharsets.UTF_8)); output.close(); - }, true); + }); // Check response if (httpResponse.getCode() == HttpStatus.SC_SERVICE_UNAVAILABLE) { diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java index 7e5385237..1483cf080 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java @@ -410,7 +410,7 @@ public Exception readError(ClassicHttpResponse httpResponse) { private final AtomicLong timeToPoolVent = new AtomicLong(0); public ClassicHttpResponse executeRequest(Endpoint server, Map requestConfig, LZ4Factory lz4Factory, - IOCallback writeCallback, boolean query) throws Exception { + IOCallback writeCallback) throws Exception { if (poolControl != null && timeToPoolVent.get() < System.currentTimeMillis()) { timeToPoolVent.set(System.currentTimeMillis() + POOL_VENT_TIMEOUT); poolControl.closeExpired(); @@ -419,7 +419,11 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map r if (requestConfig == null) { requestConfig = Collections.emptyMap(); } - boolean useMultipartFormData = query && (boolean)ClientConfigProperties.USE_MULTIPART_FORM_DATA.getOrDefault(requestConfig); + + // only use multipart when enabled, and there are statement parameters + boolean useMultipartFormData = (boolean)ClientConfigProperties.USE_MULTIPART_FORM_DATA.getOrDefault(requestConfig) + && requestConfig.containsKey(KEY_STATEMENT_PARAMS); + URI uri; try { URIBuilder uriBuilder = new URIBuilder(server.getBaseURL()); From 0d280631f9c3213123ff496af7d6ac70a3d6e312 Mon Sep 17 00:00:00 2001 From: Andris Rauda Date: Thu, 4 Sep 2025 13:09:20 +0300 Subject: [PATCH 8/8] patched tests --- .../src/test/java/com/clickhouse/client/ClientTests.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/client-v2/src/test/java/com/clickhouse/client/ClientTests.java b/client-v2/src/test/java/com/clickhouse/client/ClientTests.java index fe592a160..975eb8721 100644 --- a/client-v2/src/test/java/com/clickhouse/client/ClientTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/ClientTests.java @@ -216,7 +216,7 @@ public void testDefaultSettings() { Assert.assertEquals(config.get(p.getKey()), p.getDefaultValue(), "Default value doesn't match"); } } - Assert.assertEquals(config.size(), 32); // to check everything is set. Increment when new added. + Assert.assertEquals(config.size(), 33); // to check everything is set. Increment when new added. } try (Client client = new Client.Builder() @@ -243,13 +243,14 @@ public void testDefaultSettings() { .compressClientRequest(true) .compressServerResponse(false) .useHttpCompression(true) + .useMultipartFormData(true) .appCompressedData(true) .setSocketTimeout(20, SECONDS) .setSocketRcvbuf(100000) .setSocketSndbuf(100000) .build()) { Map config = client.getConfiguration(); - Assert.assertEquals(config.size(), 33); // to check everything is set. Increment when new added. + Assert.assertEquals(config.size(), 34); // to check everything is set. Increment when new added. Assert.assertEquals(config.get(ClientConfigProperties.DATABASE.getKey()), "mydb"); Assert.assertEquals(config.get(ClientConfigProperties.MAX_EXECUTION_TIME.getKey()), "10"); Assert.assertEquals(config.get(ClientConfigProperties.COMPRESSION_LZ4_UNCOMPRESSED_BUF_SIZE.getKey()), "300000"); @@ -316,7 +317,7 @@ public void testWithOldDefaults() { Assert.assertEquals(config.get(p.getKey()), p.getDefaultValue(), "Default value doesn't match"); } } - Assert.assertEquals(config.size(), 32); // to check everything is set. Increment when new added. + Assert.assertEquals(config.size(), 33); // to check everything is set. Increment when new added. } }