2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -51,6 +51,6 @@ jobs:
curl -u databend:databend --request POST localhost:8000/v1/query --header 'Content-Type:application/json' --data-raw '{"sql":"select 1"}'

- name: Run Maven clean deploy with release profile
run: mvn test -DexcludedGroups=CLUSTER,FLAKY
run: mvn test -DexcludedGroups=MULTI_HOST,FLAKY
env:
MAVEN_GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}
6 changes: 3 additions & 3 deletions .github/workflows/test_cluster.yml
@@ -31,7 +31,7 @@ jobs:
- uses: ./.github/actions/setup_databend_cluster
timeout-minutes: 15
with:
version: '1.2.754-nightly'
version: '1.2.797-nightly'
target: 'x86_64-unknown-linux-gnu'

- name: Test with conn to node 1
@@ -44,7 +44,7 @@

- name: check nginx
run: |
curl -u 'databend:databend' -X POST "http://localhost:8010/v1/query" \
curl -u 'databend:databend' -X POST "http://localhost:8000/v1/query" \
-H 'Content-Type: application/json' \
-d '{"sql": "select 1", "pagination": { "wait_time_secs": 5 }}' || true
env:
@@ -57,4 +57,4 @@
run: mvn test -DexcludedGroups=FLAKY
env:
MAVEN_GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}
DATABEND_TEST_CONN_PORT: 8010
DATABEND_TEST_CONN_PORT: 8000
104 changes: 94 additions & 10 deletions README.md
@@ -50,25 +50,28 @@ import java.sql.ResultSet;

public class Main {
public static void main(String[] args) throws SQLException {
Connection conn = DriverManager.getConnection("jdbc:databend://localhost:8000", "root", "");
Statement statement = conn.createStatement();
statement.execute("SELECT number from numbers(200000) order by number");
ResultSet r = statement.getResultSet();
// ** We must call `rs.next()` otherwise the query may be canceled **
while (rs.next()) {
System.out.println(r.getInt(1));
try ( Connection conn = DriverManager.getConnection("jdbc:databend://localhost:8000", "root", "");
Statement statement = conn.createStatement()
) {
statement.execute("SELECT number from numbers(200000) order by number");
try (ResultSet rs = statement.getResultSet()) {
// ** We must call `rs.next()` otherwise the query may be canceled **
while (rs.next()) {
System.out.println(rs.getInt(1));
}
}
}
conn.close();
}
}
```

### Important Notes

1. Close Connection/Statement/ResultSet to release resources faster.
2. Because `select`, `copy into`, and `merge into` are query-type SQL, they return a `ResultSet` object; you must
call `rs.next()` before accessing the data, otherwise the query may be canceled. If you do not need the result,
you can call `while (rs.next()) {}` to drain the result set.
3. For non-query SQL such as `create/drop table`, you can call `statement.execute()` directly, as shown in the sketch below.
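
A minimal sketch of note 3, assuming a local Databend server at `localhost:8000` with the default `root` user (the table name is illustrative):

```java
// Minimal sketch: executing non-query SQL (DDL) directly.
// No ResultSet is produced, so there is no need to call rs.next().
try (Connection conn = DriverManager.getConnection("jdbc:databend://localhost:8000", "root", "");
     Statement statement = conn.createStatement()) {
    statement.execute("CREATE TABLE IF NOT EXISTS t_example (a INT, b STRING)");
    statement.execute("DROP TABLE IF EXISTS t_example");
}
```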

## JDBC Java type mapping
Databend types are mapped to Java types as follows:
@@ -99,3 +102,84 @@ For detailed references, please take a look at the following Links:

1. [Connection Parameters](./docs/Connection.md): detailed documentation about how to use connection parameters in a JDBC connection


# FileTransfer API

The `FileTransferAPI` interface provides a high-performance, Java-based mechanism for streaming data directly between your application and Databend's internal stage, eliminating the need for intermediate local files. It is designed for efficient bulk data operations.

## Key Features

* **Streaming Upload/Download:** Directly transfer data using `InputStream`, supporting large files without excessive memory consumption
* **Direct Table Loading:** Ingest data from streams or staged files directly into Databend tables using the `COPY INTO` command
* **Compression:** Supports on-the-fly compression and decompression during transfer to optimize network traffic
* **Flexible Data Ingestion:** Offers both stage-based and streaming-based methods for loading data into tables

## Core Methods

### `uploadStream`
Uploads a data stream as a single file to the specified internal stage.

**Parameters:**
- `stageName`: The stage which will receive the uploaded file
- `destPrefix`: The prefix of the file name in the stage
- `inputStream`: The input stream of the file data
- `destFileName`: The destination file name in the stage
- `fileSize`: The size of the file being uploaded
- `compressData`: Whether to compress the data during transfer

### `downloadStream`
Downloads a file from the internal stage and returns it as an `InputStream`.

**Parameters:**
- `stageName`: The stage which contains the file to download
- `sourceFileName`: The name of the file in the stage
- `decompress`: Whether to decompress the data during download

**Returns:** `InputStream` of the downloaded file content
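
A minimal sketch of a download, reusing the `conn.unwrap(DatabendConnection.class)` pattern from the Quick Start below; the stage and file names are illustrative and must already exist:

```java
// Minimal sketch: download a staged file and copy it to a local path.
// "my_stage" and "uploads/sample.csv" are illustrative names, not part of the API.
Connection conn = DriverManager.getConnection("jdbc:databend://localhost:8000");
FileTransferAPI api = conn.unwrap(DatabendConnection.class);

try (InputStream in = api.downloadStream("my_stage", "uploads/sample.csv", false)) {
    Files.copy(in, Paths.get("sample_copy.csv"), StandardCopyOption.REPLACE_EXISTING);
}
conn.close();
```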


### `loadStreamToTable`
A versatile method to load data from a stream directly into a table, using either a staging or streaming approach.

Available with databend-jdbc >= 0.4 and databend-query >= 1.2.791.

**Parameters:**
- `sql`: SQL statement with specific syntax for data loading
- `inputStream`: The input stream of the file data to load
- `fileSize`: The size of the file being loaded
- `loadMethod`: The loading method, either `"stage"` or `"streaming"`. The `stage` method first uploads the file to a special path in the user stage and then loads it into the table, while the `streaming` method loads the data while it is being transferred.

**Returns:** Number of rows successfully loaded
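
A minimal sketch of the `streaming` variant. It assumes that `my_table` already exists and that the SQL takes the same form as the stage-based Quick Start below; both are assumptions for illustration, not documented guarantees:

```java
// Minimal sketch: load a local CSV with the "streaming" method.
// Table and file names are illustrative; the SQL form mirrors the Quick Start below.
Connection conn = DriverManager.getConnection("jdbc:databend://localhost:8000");
FileTransferAPI api = conn.unwrap(DatabendConnection.class);

String sql = "insert into my_table from @_databend_load file_format=(type=csv)";
try (InputStream in = new FileInputStream("data.csv")) {
    System.out.println("rows loaded: " +
            api.loadStreamToTable(sql, in, Files.size(Paths.get("data.csv")), "streaming"));
}
conn.close();
```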

## Quick Start

The following example demonstrates how to upload data and load it into a table:

```java
// 1. Upload a file to the internal stage
Connection conn = DriverManager.getConnection("jdbc:databend://localhost:8000");
FileTransferAPI api = conn.unwrap(DatabendConnection.class);

FileInputStream fileStream = new FileInputStream("data.csv");
api.uploadStream(
"my_stage",
"uploads/",
fileStream,
"data.csv",
Files.size(Paths.get("data.csv")),
true // Compress the data during upload
);
fileStream.close();

// 2. Load data from a stream into a table (staged under the special `_databend_load` stage)
fileStream = new FileInputStream("data.csv");
String sql = "insert into my_table from @_databend_load file_format=(type=csv)"; // uses the special stage `_databend_load`
api.loadStreamToTable(sql, fileStream, Files.size(Paths.get("data.csv")), "stage");
fileStream.close();
conn.close();


```

> **Important:** Callers are responsible for properly closing the provided `InputStream` objects after operations are complete.
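
For example, a minimal sketch that lets try-with-resources close the caller-provided stream around `uploadStream` (reusing the `api` object and the illustrative names from the Quick Start):

```java
// Minimal sketch: try-with-resources closes the stream once the upload finishes.
try (InputStream in = new FileInputStream("data.csv")) {
    api.uploadStream("my_stage", "uploads/", in, "data.csv",
            Files.size(Paths.get("data.csv")), false); // no compression in this sketch
}
```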
17 changes: 1 addition & 16 deletions databend-client/pom.xml
@@ -18,25 +18,21 @@
<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<project.build.targetJdk>8</project.build.targetJdk>
<jackson.version>2.15.2</jackson.version>
</properties>
<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>

<dependency>
@@ -53,14 +49,13 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.0.1-jre</version>
</dependency>

<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.squareup.okio/okio -->

<dependency>
<groupId>com.squareup.okio</groupId>
<artifactId>okio</artifactId>
@@ -76,18 +71,8 @@
<artifactId>okhttp-urlconnection</artifactId>
</dependency>

<dependency>
<groupId>com.github.zafarkhaja</groupId>
<artifactId>java-semver</artifactId>
</dependency>

<!-- for testing -->

<dependency>
<groupId>io.airlift</groupId>
<artifactId>json</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
@@ -31,6 +31,8 @@ public class ClientSettings {
public static final String X_DATABEND_STICKY_NODE = "X-DATABEND-STICKY-NODE";
public static final String DatabendWarehouseHeader = "X-DATABEND-WAREHOUSE";
public static final String DatabendTenantHeader = "X-DATABEND-TENANT";
public static final String DatabendSQLHeader = "X-DATABEND-SQL";
public static final String DatabendQueryContextHeader = "X-DATABEND-QUERY-CONTEXT";
private final String host;
private final DatabendSession session;
private final Integer queryTimeoutSecs;
@@ -40,14 +42,14 @@ public class ClientSettings {
private final PaginationOptions paginationOptions;

private final StageAttachment stageAttachment;
private Map<String, String> additionalHeaders;
private final Map<String, String> additionalHeaders;

private final int retryAttempts;
// TODO(zhihanz) timezone and locale info

//ClientSettings for test case use
public ClientSettings(String host) {
this(host, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), new HashMap<String, String>(), null, DEFAULT_RETRY_ATTEMPTS);
this(host, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), new HashMap<>(), null, DEFAULT_RETRY_ATTEMPTS);
}

public ClientSettings(String host, String database) {
@@ -29,8 +29,6 @@ public interface DatabendClient extends Closeable {

String getNodeID();

String getServerVersion();


Map<String, String> getAdditionalHeaders();


This file was deleted.
