Commit cb7e94c

feat: support streaming load

1 parent a79b7b5 commit cb7e94c

40 files changed: +706 -382 lines

.github/workflows/test.yml

Lines changed: 1 addition & 1 deletion

```diff
@@ -51,6 +51,6 @@ jobs:
       curl -u databend:databend --request POST localhost:8000/v1/query --header 'Content-Type:application/json' --data-raw '{"sql":"select 1"}'

     - name: Run Maven clean deploy with release profile
-      run: mvn test -DexcludedGroups=CLUSTER,FLAKY
+      run: mvn test -DexcludedGroups=MULTI_HOST,FLAKY
       env:
         MAVEN_GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}
```

.github/workflows/test_cluster.yml

Lines changed: 1 addition & 1 deletion

```diff
@@ -31,7 +31,7 @@ jobs:
     - uses: ./.github/actions/setup_databend_cluster
       timeout-minutes: 15
       with:
-        version: '1.2.754-nightly'
+        version: '1.2.797-nightly'
         target: 'x86_64-unknown-linux-gnu'

     - name: Test with conn to node 1
```

README.md

Lines changed: 94 additions & 10 deletions
````diff
@@ -50,25 +50,28 @@ import java.sql.ResultSet;

 public class Main {
     public static void main(String[] args) throws SQLException {
-        Connection conn = DriverManager.getConnection("jdbc:databend://localhost:8000", "root", "");
-        Statement statement = conn.createStatement();
-        statement.execute("SELECT number from numbers(200000) order by number");
-        ResultSet r = statement.getResultSet();
-        // ** We must call `rs.next()` otherwise the query may be canceled **
-        while (rs.next()) {
-            System.out.println(r.getInt(1));
+        try (Connection conn = DriverManager.getConnection("jdbc:databend://localhost:8000", "root", "");
+             Statement statement = conn.createStatement()
+        ) {
+            statement.execute("SELECT number from numbers(200000) order by number");
+            try (ResultSet r = statement.getResultSet()) {
+                // ** We must call `r.next()` otherwise the query may be canceled **
+                while (r.next()) {
+                    System.out.println(r.getInt(1));
+                }
+            }
         }
-        conn.close();
     }
 }
 ```

 ### Important Notes

-1. Because the `select`, `copy into`, `merge into` are query type SQL, they will return a `ResultSet` object, you must
+1. Close the Connection, Statement, and ResultSet to release resources faster.
+2. Because `select`, `copy into`, and `merge into` are query-type SQL, they return a `ResultSet` object; you must
    call `rs.next()` before accessing the data. Otherwise, the query may be canceled. If you do not want the result,
    you can call `while (r.next()) {}` to drain the result set.
-2. For other SQL such as `create/drop table` non-query type SQL, you can call `statement.execute()` directly.
+3. For non-query SQL such as `create/drop table`, you can call `statement.execute()` directly.

 ## JDBC Java type mapping
 The Databend type is mapped to Java type as follows:
````
@@ -99,3 +102,84 @@ For detailed references, please take a look at the following Links:

1. [Connection Parameters](./docs/Connection.md) : detailed documentation about how to use connection parameters in a
   jdbc connection

The following section is added:

# FileTransfer API

The `FileTransferAPI` interface provides a high-performance, Java-based mechanism for streaming data directly between your application and Databend's internal stage, eliminating the need for intermediate local files. It is designed for efficient bulk data operations.

## Key Features

* **Streaming Upload/Download:** Transfer data directly via `InputStream`, supporting large files without excessive memory consumption
* **Direct Table Loading:** Ingest data from streams or staged files directly into Databend tables using the `COPY INTO` command
* **Compression:** Supports on-the-fly compression and decompression during transfer to reduce network traffic
* **Flexible Data Ingestion:** Offers both stage-based and streaming-based methods for loading data into tables

## Core Methods

### `uploadStream`

Uploads a data stream as a single file to the specified internal stage.

**Parameters:**
- `stageName`: The stage that will receive the uploaded file
- `destPrefix`: The prefix of the file name in the stage
- `inputStream`: The input stream of the file data
- `destFileName`: The destination file name in the stage
- `fileSize`: The size of the file being uploaded, in bytes
- `compressData`: Whether to compress the data during transfer
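As a minimal sketch of an upload from in-memory bytes (the method signature is inferred from the parameter list above; the connection URL, credentials, stage name, and prefix are placeholders to adjust for your deployment):

```java
import com.databend.jdbc.DatabendConnection;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;

public class UploadStreamSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder URL and credentials.
        try (Connection conn = DriverManager.getConnection("jdbc:databend://localhost:8000", "root", "")) {
            DatabendConnection api = conn.unwrap(DatabendConnection.class);
            byte[] csv = "1,hello\n2,world\n".getBytes(StandardCharsets.UTF_8);
            try (ByteArrayInputStream in = new ByteArrayInputStream(csv)) {
                // Uploads the bytes as @my_stage/uploads/rows.csv, uncompressed.
                api.uploadStream("my_stage", "uploads/", in, "rows.csv", csv.length, false);
            }
        }
    }
}
```

Wrapping the stream in try-with-resources keeps the caller's responsibility to close it explicit.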
### `downloadStream`

Downloads a file from the internal stage and returns it as an `InputStream`.

**Parameters:**
- `stageName`: The stage that contains the file to download
- `sourceFileName`: The name of the file in the stage
- `decompress`: Whether to decompress the data during download

**Returns:** `InputStream` of the downloaded file content
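A sketch of reading a staged file back as a stream, assuming the three-parameter signature above (stage and file names are placeholders):

```java
import com.databend.jdbc.DatabendConnection;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;

public class DownloadStreamSketch {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:databend://localhost:8000", "root", "")) {
            DatabendConnection api = conn.unwrap(DatabendConnection.class);
            // Stream @my_stage/uploads/rows.csv line by line, no local temp file.
            try (InputStream in = api.downloadStream("my_stage", "uploads/rows.csv", false);
                 BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);
                }
            }
        }
    }
}
```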
### `loadStreamToTable`

A versatile method to load data from a stream directly into a table, using either a staging or a streaming approach.

Available with databend-jdbc >= 0.4 and databend-query >= 1.2.791.

**Parameters:**
- `sql`: SQL statement with specific syntax for data loading
- `inputStream`: The input stream of the file data to load
- `fileSize`: The size of the file being loaded, in bytes
- `loadMethod`: The loading method, either `"stage"` or `"streaming"`. With `stage`, the file is first uploaded to a special path in the user stage and then copied into the table; with `streaming`, the data is loaded while it is being transferred.

**Returns:** Number of rows successfully loaded
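The Quick Start below uses the `stage` method; a sketch of the `streaming` alternative looks like this (signature and row-count return inferred from the description above; the table name and URL are placeholders):

```java
import com.databend.jdbc.DatabendConnection;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;

public class StreamingLoadSketch {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:databend://localhost:8000", "root", "")) {
            DatabendConnection api = conn.unwrap(DatabendConnection.class);
            byte[] csv = "1,hello\n2,world\n".getBytes(StandardCharsets.UTF_8);
            String sql = "insert into my_table from @_databend_load file_format=(type=csv)";
            try (ByteArrayInputStream in = new ByteArrayInputStream(csv)) {
                // "streaming" loads the data in one pass while it is transferred,
                // without first materializing a file in the user stage.
                long rows = api.loadStreamToTable(sql, in, csv.length, "streaming");
                System.out.println("rows loaded: " + rows);
            }
        }
    }
}
```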
## Quick Start

The following example demonstrates how to upload data and load it into a table:

```java
// 1. Upload a file to the internal stage
Connection conn = DriverManager.getConnection("jdbc:databend://localhost:8000");
FileTransferAPI api = conn.unwrap(DatabendConnection.class);

FileInputStream fileStream = new FileInputStream("data.csv");
api.uploadStream(
        "my_stage",
        "uploads/",
        fileStream,
        "data.csv",
        Files.size(Paths.get("data.csv")),
        true // Compress the data during upload
);
fileStream.close();

// 2. Load the staged file into a table
fileStream = new FileInputStream("data.csv");
String sql = "insert into my_table from @_databend_load file_format=(type=csv)"; // uses the special stage `_databend_load`
api.loadStreamToTable(sql, fileStream, Files.size(Paths.get("data.csv")), "stage");
fileStream.close();
conn.close();
```

> **Important:** Callers are responsible for properly closing the provided `InputStream` objects after operations are complete.

databend-client/pom.xml

Lines changed: 1 addition & 16 deletions

```diff
@@ -18,25 +18,21 @@
     <properties>
         <air.main.basedir>${project.parent.basedir}</air.main.basedir>
         <project.build.targetJdk>8</project.build.targetJdk>
-        <jackson.version>2.15.2</jackson.version>
     </properties>
     <dependencies>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-annotations</artifactId>
-            <version>${jackson.version}</version>
         </dependency>

         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>
-            <version>${jackson.version}</version>
         </dependency>

         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
-            <version>${jackson.version}</version>
         </dependency>

         <dependency>
@@ -53,14 +49,13 @@
         <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
-            <version>32.0.1-jre</version>
         </dependency>

         <dependency>
             <groupId>com.squareup.okhttp3</groupId>
             <artifactId>okhttp</artifactId>
         </dependency>
-        <!-- https://mvnrepository.com/artifact/com.squareup.okio/okio -->
+
         <dependency>
             <groupId>com.squareup.okio</groupId>
             <artifactId>okio</artifactId>
@@ -76,18 +71,8 @@
             <artifactId>okhttp-urlconnection</artifactId>
         </dependency>

-        <dependency>
-            <groupId>com.github.zafarkhaja</groupId>
-            <artifactId>java-semver</artifactId>
-        </dependency>
-
         <!-- for testing -->

-        <dependency>
-            <groupId>io.airlift</groupId>
-            <artifactId>json</artifactId>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>org.testng</groupId>
             <artifactId>testng</artifactId>
```

databend-client/src/main/java/com/databend/client/ClientSettings.java

Lines changed: 4 additions & 2 deletions

```diff
@@ -31,6 +31,8 @@ public class ClientSettings {
     public static final String X_DATABEND_STICKY_NODE = "X-DATABEND-STICKY-NODE";
     public static final String DatabendWarehouseHeader = "X-DATABEND-WAREHOUSE";
     public static final String DatabendTenantHeader = "X-DATABEND-TENANT";
+    public static final String DatabendSQLHeader = "X-DATABEND-SQL";
+    public static final String DatabendQueryContextHeader = "X-DATABEND-QUERY-CONTEXT";
     private final String host;
     private final DatabendSession session;
     private final Integer queryTimeoutSecs;
@@ -40,14 +42,14 @@ public class ClientSettings {
     private final PaginationOptions paginationOptions;

     private final StageAttachment stageAttachment;
-    private Map<String, String> additionalHeaders;
+    private final Map<String, String> additionalHeaders;

     private final int retryAttempts;
     // TODO(zhihanz) timezone and locale info

     //ClientSettings for test case use
     public ClientSettings(String host) {
-        this(host, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), new HashMap<String, String>(), null, DEFAULT_RETRY_ATTEMPTS);
+        this(host, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), new HashMap<>(), null, DEFAULT_RETRY_ATTEMPTS);
     }

     public ClientSettings(String host, String database) {
```

databend-client/src/main/java/com/databend/client/DatabendClient.java

Lines changed: 0 additions & 2 deletions

```diff
@@ -29,8 +29,6 @@ public interface DatabendClient extends Closeable {

     String getNodeID();

-    String getServerVersion();
-

     Map<String, String> getAdditionalHeaders();
```
databend-client/src/main/java/com/databend/client/DatabendClientFactory.java

Lines changed: 0 additions & 21 deletions
This file was deleted.
