Skip to content

Commit 8ea4e28

Browse files
committed
feat: support streaming load
1 parent a79b7b5 commit 8ea4e28

39 files changed

+612
-372
lines changed

.github/workflows/test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,6 @@ jobs:
5151
curl -u databend:databend --request POST localhost:8000/v1/query --header 'Content-Type:application/json' --data-raw '{"sql":"select 1"}'
5252
5353
- name: Run Maven clean deploy with release profile
54-
run: mvn test -DexcludedGroups=CLUSTER,FLAKY
54+
run: mvn test -DexcludedGroups=MULTI_HOST,FLAKY
5555
env:
5656
MAVEN_GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}

.github/workflows/test_cluster.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ jobs:
3131
- uses: ./.github/actions/setup_databend_cluster
3232
timeout-minutes: 15
3333
with:
34-
version: '1.2.754-nightly'
34+
version: '1.2.797-nightly'
3535
target: 'x86_64-unknown-linux-gnu'
3636

3737
- name: Test with conn to node 1

databend-client/pom.xml

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,21 @@
1818
<properties>
1919
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
2020
<project.build.targetJdk>8</project.build.targetJdk>
21-
<jackson.version>2.15.2</jackson.version>
2221
</properties>
2322
<dependencies>
2423
<dependency>
2524
<groupId>com.fasterxml.jackson.core</groupId>
2625
<artifactId>jackson-annotations</artifactId>
27-
<version>${jackson.version}</version>
2826
</dependency>
2927

3028
<dependency>
3129
<groupId>com.fasterxml.jackson.core</groupId>
3230
<artifactId>jackson-core</artifactId>
33-
<version>${jackson.version}</version>
3431
</dependency>
3532

3633
<dependency>
3734
<groupId>com.fasterxml.jackson.core</groupId>
3835
<artifactId>jackson-databind</artifactId>
39-
<version>${jackson.version}</version>
4036
</dependency>
4137

4238
<dependency>
@@ -53,14 +49,13 @@
5349
<dependency>
5450
<groupId>com.google.guava</groupId>
5551
<artifactId>guava</artifactId>
56-
<version>32.0.1-jre</version>
5752
</dependency>
5853

5954
<dependency>
6055
<groupId>com.squareup.okhttp3</groupId>
6156
<artifactId>okhttp</artifactId>
6257
</dependency>
63-
<!-- https://mvnrepository.com/artifact/com.squareup.okio/okio -->
58+
6459
<dependency>
6560
<groupId>com.squareup.okio</groupId>
6661
<artifactId>okio</artifactId>
@@ -76,18 +71,8 @@
7671
<artifactId>okhttp-urlconnection</artifactId>
7772
</dependency>
7873

79-
<dependency>
80-
<groupId>com.github.zafarkhaja</groupId>
81-
<artifactId>java-semver</artifactId>
82-
</dependency>
83-
8474
<!-- for testing -->
8575

86-
<dependency>
87-
<groupId>io.airlift</groupId>
88-
<artifactId>json</artifactId>
89-
<scope>test</scope>
90-
</dependency>
9176
<dependency>
9277
<groupId>org.testng</groupId>
9378
<artifactId>testng</artifactId>

databend-client/src/main/java/com/databend/client/ClientSettings.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ public class ClientSettings {
3131
public static final String X_DATABEND_STICKY_NODE = "X-DATABEND-STICKY-NODE";
3232
public static final String DatabendWarehouseHeader = "X-DATABEND-WAREHOUSE";
3333
public static final String DatabendTenantHeader = "X-DATABEND-TENANT";
34+
public static final String DatabendSQLHeader = "X-DATABEND-SQL";
35+
public static final String DatabendQueryContextHeader = "X-DATABEND-QUERY-CONTEXT";
3436
private final String host;
3537
private final DatabendSession session;
3638
private final Integer queryTimeoutSecs;
@@ -40,14 +42,14 @@ public class ClientSettings {
4042
private final PaginationOptions paginationOptions;
4143

4244
private final StageAttachment stageAttachment;
43-
private Map<String, String> additionalHeaders;
45+
private final Map<String, String> additionalHeaders;
4446

4547
private final int retryAttempts;
4648
// TODO(zhihanz) timezone and locale info
4749

4850
//ClientSettings for test case use
4951
public ClientSettings(String host) {
50-
this(host, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), new HashMap<String, String>(), null, DEFAULT_RETRY_ATTEMPTS);
52+
this(host, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), new HashMap<>(), null, DEFAULT_RETRY_ATTEMPTS);
5153
}
5254

5355
public ClientSettings(String host, String database) {

databend-client/src/main/java/com/databend/client/DatabendClient.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@ public interface DatabendClient extends Closeable {
2929

3030
String getNodeID();
3131

32-
String getServerVersion();
33-
3432

3533
Map<String, String> getAdditionalHeaders();
3634

databend-client/src/main/java/com/databend/client/DatabendClientFactory.java

Lines changed: 0 additions & 21 deletions
This file was deleted.

databend-client/src/main/java/com/databend/client/DatabendClientV1.java

Lines changed: 11 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
import com.databend.client.errors.CloudErrors;
1818
import okhttp3.*;
19-
import okio.Buffer;
2019

2120
import javax.annotation.concurrent.ThreadSafe;
2221
import java.io.IOException;
@@ -50,10 +49,6 @@ public class DatabendClientV1
5049
public static final MediaType MEDIA_TYPE_JSON = MediaType.parse("application/json; charset=utf-8");
5150
public static final JsonCodec<QueryResults> QUERY_RESULTS_CODEC = jsonCodec(QueryResults.class);
5251
public static final JsonCodec<DiscoveryResponseCodec.DiscoveryResponse> DISCOVERY_RESULT_CODEC = jsonCodec(DiscoveryResponseCodec.DiscoveryResponse.class);
53-
public static final String succeededState = "succeeded";
54-
public static final String failedState = "failed";
55-
public static final String runningState = "running";
56-
5752

5853
public static final String QUERY_PATH = "/v1/query";
5954
public static final String DISCOVERY_PATH = "/v1/discovery_nodes";
@@ -66,15 +61,14 @@ public class DatabendClientV1
6661
private final PaginationOptions paginationOptions;
6762
// request with retry timeout
6863
private final Integer requestTimeoutSecs;
69-
private final Map<String, String> additonalHeaders;
70-
private String serverVersion;
64+
private final Map<String, String> additionalHeaders;
7165
// client session
7266
private final AtomicReference<DatabendSession> databendSession;
7367
private String nodeID;
7468
private final AtomicReference<QueryResults> currentResults = new AtomicReference<>(null);
7569
private static final Logger logger = Logger.getLogger(DatabendClientV1.class.getPackage().getName());
7670

77-
private Consumer<DatabendSession> on_session_state_update;
71+
private final Consumer<DatabendSession> on_session_state_update;
7872

7973
public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings settings,
8074
Consumer<DatabendSession> on_session_state_update,
@@ -89,7 +83,7 @@ public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings sett
8983
this.host = settings.getHost();
9084
this.paginationOptions = settings.getPaginationOptions();
9185
this.requestTimeoutSecs = settings.getQueryTimeoutSecs();
92-
this.additonalHeaders = settings.getAdditionalHeaders();
86+
this.additionalHeaders = settings.getAdditionalHeaders();
9387
this.maxRetryAttempts = settings.getRetryAttempts();
9488
this.databendSession = new AtomicReference<>(settings.getSession());
9589
this.nodeID = last_node_id.get();
@@ -126,7 +120,7 @@ public static Request.Builder prepareRequest(HttpUrl url, Map<String, String> ad
126120
}
127121

128122
private Request buildQueryRequest(String query, ClientSettings settings) {
129-
HttpUrl url = HttpUrl.get(settings.getHost());
123+
HttpUrl url = HttpUrl.parse(settings.getHost());
130124
if (url == null) {
131125
// TODO(zhihanz) use custom exception
132126
throw new IllegalArgumentException("Invalid host: " + settings.getHost());
@@ -139,7 +133,7 @@ private Request buildQueryRequest(String query, ClientSettings settings) {
139133
}
140134

141135
url = url.newBuilder().encodedPath(QUERY_PATH).build();
142-
Request.Builder builder = prepareRequest(url, this.additonalHeaders);
136+
Request.Builder builder = prepareRequest(url, this.additionalHeaders);
143137
DatabendSession session = databendSession.get();
144138
if (session != null && session.getNeedSticky()) {
145139
builder.addHeader(ClientSettings.X_DATABEND_STICKY_NODE, nodeID);
@@ -149,10 +143,6 @@ private Request buildQueryRequest(String query, ClientSettings settings) {
149143

150144
private static Request buildDiscoveryRequest(ClientSettings settings) {
151145
HttpUrl url = HttpUrl.get(settings.getHost());
152-
if (url == null) {
153-
// TODO(zhihanz) use custom exception
154-
throw new IllegalArgumentException("Invalid host: " + settings.getHost());
155-
}
156146
String discoveryPath = DISCOVERY_PATH;
157147
// intentionally use unsupported discovery path for testing
158148
if (settings.getAdditionalHeaders().get("~mock.unsupported.discovery") != null && BOOLEAN_TRUE_STR.equals(settings.getAdditionalHeaders().get("~mock.unsupported.discovery"))) {
@@ -313,19 +303,6 @@ private boolean executeInternal(Request request, OptionalLong materializedJsonSi
313303
}
314304
}
315305

316-
private String requestBodyToString(Request request) {
317-
try {
318-
final Request copy = request.newBuilder().build();
319-
final Buffer buffer = new Buffer();
320-
if (copy.body() != null) {
321-
copy.body().writeTo(buffer);
322-
}
323-
return buffer.readUtf8();
324-
} catch (final IOException e) {
325-
return "did not work";
326-
}
327-
}
328-
329306
@Override
330307
public boolean execute(Request request) {
331308
return executeInternal(request, OptionalLong.empty());
@@ -340,20 +317,13 @@ private void processResponse(Headers headers, QueryResults results) {
340317
this.on_session_state_update.accept(session);
341318
}
342319
}
343-
if (results.getQueryId() != null && this.additonalHeaders.get(ClientSettings.X_Databend_Query_ID) == null) {
344-
this.additonalHeaders.put(ClientSettings.X_Databend_Query_ID, results.getQueryId());
320+
if (results.getQueryId() != null && this.additionalHeaders.get(ClientSettings.X_Databend_Query_ID) == null) {
321+
this.additionalHeaders.put(ClientSettings.X_Databend_Query_ID, results.getQueryId());
345322
}
346323
if (headers != null) {
347-
String serverVersionString = headers.get(ClientSettings.X_DATABEND_VERSION);
348-
if (serverVersionString != null) {
349-
try {
350-
serverVersion = serverVersionString;
351-
} catch (Exception ignored) {
352-
}
353-
}
354324
String route_hint = headers.get(ClientSettings.X_DATABEND_ROUTE_HINT);
355325
if (route_hint != null) {
356-
this.additonalHeaders.put(ClientSettings.X_DATABEND_ROUTE_HINT, route_hint);
326+
this.additionalHeaders.put(ClientSettings.X_DATABEND_ROUTE_HINT, route_hint);
357327
}
358328
}
359329
currentResults.set(results);
@@ -375,7 +345,7 @@ public boolean advance() {
375345
String nextUriPath = this.currentResults.get().getNextUri().toString();
376346
HttpUrl url = HttpUrl.get(this.host);
377347
url = url.newBuilder().encodedPath(nextUriPath).build();
378-
Request.Builder builder = prepareRequest(url, this.additonalHeaders);
348+
Request.Builder builder = prepareRequest(url, this.additionalHeaders);
379349
builder.addHeader(ClientSettings.X_DATABEND_STICKY_NODE, this.nodeID);
380350
Request request = builder.get().build();
381351
return executeInternal(request, OptionalLong.of(MAX_MATERIALIZED_JSON_RESPONSE_SIZE));
@@ -388,7 +358,7 @@ public boolean hasNext() {
388358

389359
@Override
390360
public Map<String, String> getAdditionalHeaders() {
391-
return additonalHeaders;
361+
return additionalHeaders;
392362
}
393363

394364
@Override
@@ -406,12 +376,6 @@ public String getNodeID() {
406376
return this.nodeID;
407377
}
408378

409-
@Override
410-
public String getServerVersion() {
411-
return this.serverVersion;
412-
}
413-
414-
415379
@Override
416380
public void close() {
417381
closeQuery();
@@ -432,7 +396,7 @@ private void closeQuery() {
432396
String path = uri.toString();
433397
HttpUrl url = HttpUrl.get(this.host);
434398
url = url.newBuilder().encodedPath(path).build();
435-
Request r = prepareRequest(url, this.additonalHeaders).get().build();
399+
Request r = prepareRequest(url, this.additionalHeaders).get().build();
436400
try {
437401
httpClient.newCall(r).execute().close();
438402
} catch (IOException ignored) {

databend-client/src/main/java/com/databend/client/QuerySchema.java

Lines changed: 0 additions & 41 deletions
This file was deleted.

databend-client/src/main/java/com/databend/client/ServerInfo.java

Lines changed: 0 additions & 36 deletions
This file was deleted.

databend-client/src/main/java/com/databend/client/ServerVersions.java

Lines changed: 0 additions & 11 deletions
This file was deleted.

0 commit comments

Comments
 (0)