Skip to content

Commit f69b567

Browse files
authored
docs: add execute_update examples for rust, go, python (#2)
* feat: add execute_update example for rust * feat: add execute_update example for python * feat: add execute_update example for go * chore: add warn for execute_update impl of go and python * remove warn on execute update
1 parent 4651298 commit f69b567

File tree

6 files changed

+146
-21
lines changed

6 files changed

+146
-21
lines changed

go/client.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,16 @@ func (client *Client) Execute(sql string) ([]arrow.Record, error) {
102102
return client.doGet(flightInfo.GetEndpoint()[0].GetTicket())
103103
}
104104

105+
// Executes the sql on Datalayers and returns the affected rows.
106+
// The supported sqls are Insert and Delete. Note, the development for supporting Delete is in progress.
107+
func (client *Client) ExecuteUpdate(sql string) (int64, error) {
108+
affectedRows, err := client.inner.ExecuteUpdate(client.ctx, sql)
109+
if err != nil {
110+
return 0, fmt.Errorf("failed to execute a sql: %v", err)
111+
}
112+
return affectedRows, nil
113+
}
114+
105115
// Creates a prepared statement.
106116
func (client *Client) Prepare(sql string) (*flightsql.PreparedStatement, error) {
107117
return client.inner.Prepare(client.ctx, sql)
@@ -119,6 +129,11 @@ func (client *Client) ExecutePrepared(preparedStmt *flightsql.PreparedStatement,
119129
return client.doGet(flightInfo.GetEndpoint()[0].GetTicket())
120130
}
121131

132+
// Closes the prepared statement.
133+
func (client *Client) ClosePrepared(preparedStmt *flightsql.PreparedStatement) error {
134+
return preparedStmt.Close(client.ctx)
135+
}
136+
122137
// Calls the `DoGet` method of the FlightSQL client.
123138
func (client *Client) doGet(ticket *flight.Ticket) ([]arrow.Record, error) {
124139
reader, err := client.inner.DoGet(client.ctx, ticket)

go/main.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,4 +147,40 @@ func main() {
147147
// 2024-09-01 10:05:00 +0800 CST 2 15.30 1
148148
// 2024-09-02 10:05:00 +0800 CST 2 15.30 1
149149
PrintRecords(result)
150+
151+
// Closes the prepared statement to notify releasing resources on server side.
152+
if err = client.ClosePrepared(preparedStmt); err != nil {
153+
fmt.Println("Failed to close a prepared statement: ", err)
154+
return
155+
}
156+
157+
// There provides a dedicated interface `execute_update` for executing DMLs, including Insert, Delete.
158+
// This interface directly returns the affected rows which might be convenient for some use cases.
159+
//
160+
// Note, Datalayers does not support Update and the development for Delete is in progress.
161+
sql = `
162+
INSERT INTO go.demo (ts, sid, value, flag) VALUES
163+
('2024-09-03T10:00:00+08:00', 1, 4.5, 0),
164+
('2024-09-03T10:05:00+08:00', 2, 11.6, 1);`
165+
affectedRows, err := client.ExecuteUpdate(sql)
166+
if err != nil {
167+
fmt.Println("Failed to insert data: ", err)
168+
return
169+
}
170+
// The output should be:
171+
// Affected rows: 2
172+
fmt.Println("Affected rows: ", affectedRows)
173+
174+
// Checks that the data are inserted successfully.
175+
sql = "SELECT * FROM go.demo where ts >= '2024-09-03T10:00:00+08:00'"
176+
result, err = client.Execute(sql)
177+
if err != nil {
178+
fmt.Println("Failed to scan data: ", err)
179+
return
180+
}
181+
// The result should be:
182+
// ts sid value flag
183+
// 2024-09-03 10:00:00 +0800 CST 1 4.50 0
184+
// 2024-09-03 10:05:00 +0800 CST 2 11.60 1
185+
PrintRecords(result)
150186
}

python/client.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,15 @@ def execute(self, sql: str) -> pandas.DataFrame:
102102
df = reader.read_pandas()
103103
return df
104104

105+
def execute_update(self, sql: str) -> int:
106+
"""
107+
Executes the sql on Datalayers and returns the affected rows.
108+
This method is meant to be used for executing DMLs, including Insert and Delete.
109+
Note, Datalayers does not support Update and the development of Delete is in progress.
110+
"""
111+
112+
return self.inner.execute_update(sql, None)
113+
105114
def prepare(self, sql: str) -> PreparedStatement:
106115
"""
107116
Creates a prepared statement.
@@ -122,6 +131,15 @@ def execute_prepared(
122131
df = reader.read_pandas()
123132
return df
124133

134+
def close_prepared(self, prepared_stmt: PreparedStatement):
135+
"""
136+
Closes the prepared statement.
137+
Note, generally you should not call this method explicitly.
138+
Use with clause to manage the life cycle of a prepared statement instead.
139+
"""
140+
141+
prepared_stmt.close()
142+
125143
def close(self):
126144
"""
127145
Closes the inner Arrow FlightSQL client.

python/main.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,29 @@ def main():
105105
# 1 2024-09-02 10:05:00+08:00 2 15.3 1
106106
print(result)
107107

108+
# There provides a dedicated interface `execute_update` for executing DMLs, including Insert, Delete.
109+
# This interface directly returns the affected rows which might be convenient for some use cases.
110+
#
111+
# Note, Datalayers does not support Update and the development for Delete is in progress.
112+
sql = """
113+
INSERT INTO python.demo (ts, sid, value, flag) VALUES
114+
('2024-09-03T10:00:00+08:00', 1, 4.5, 0),
115+
('2024-09-03T10:05:00+08:00', 2, 11.6, 1);
116+
"""
117+
affected_rows = client.execute_update(sql)
118+
# The output should be:
119+
# Affected rows: 2
120+
print("Affected rows: {}".format(affected_rows))
121+
122+
# Checks that the data are inserted successfully.
123+
sql = "SELECT * FROM python.demo where ts >= '2024-09-03T10:00:00+08:00'"
124+
result = client.execute(sql)
125+
# The result should be:
126+
# ts sid value flag
127+
# 0 2024-09-03 10:00:00+08:00 1 4.5 0
128+
# 1 2024-09-03 10:05:00+08:00 2 11.6 1
129+
print(result)
130+
108131

109132
if __name__ == "__main__":
110133
main()

rust/bin/main.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,5 +117,34 @@ async fn main() -> Result<()> {
117117
// +---------------------------+-----+-------+------+
118118
print_batches(&result);
119119

120+
// Closes the prepared statement to notify releasing resources on server side.
121+
client.close_prepared(prepared_stmt).await?;
122+
123+
// There provides a dedicated interface `execute_update` for executing DMLs, including Insert, Delete.
124+
// This interface directly returns the affected rows which might be convenient for some use cases.
125+
//
126+
// Note, Datalayers does not support Update and the development for Delete is in progress.
127+
sql = r#"
128+
INSERT INTO rust.demo (ts, sid, value, flag) VALUES
129+
('2024-09-03T10:00:00+08:00', 1, 4.5, 0),
130+
('2024-09-03T10:05:00+08:00', 2, 11.6, 1);
131+
"#;
132+
let affected_rows = client.execute_update(sql).await?;
133+
// The output should be:
134+
// Affected rows: 2
135+
println!("Affected rows: {}", affected_rows);
136+
137+
// Checks that the data are inserted successfully.
138+
sql = "SELECT * FROM rust.demo where ts >= '2024-09-03T10:00:00+08:00'";
139+
result = client.execute(sql).await?;
140+
// The result should be:
141+
// +---------------------------+-----+-------+------+
142+
// | ts | sid | value | flag |
143+
// +---------------------------+-----+-------+------+
144+
// | 2024-09-03T10:00:00+08:00 | 1 | 4.5 | 0 |
145+
// | 2024-09-03T10:05:00+08:00 | 2 | 11.6 | 1 |
146+
// +---------------------------+-----+-------+------+
147+
print_batches(&result);
148+
120149
Ok(())
121150
}

rust/src/client.rs

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ impl Client {
6464
.handshake(&config.username, &config.password)
6565
.await
6666
.inspect_err(|e| {
67-
println!("Failed to do handshake: {}", filter_message(&e.to_string()));
67+
println!("{}", filter_message(&e.to_string()));
6868
exit(1)
6969
});
7070

@@ -83,10 +83,7 @@ impl Client {
8383
.execute(sql.to_string(), None)
8484
.await
8585
.inspect_err(|e| {
86-
println!(
87-
"Failed to execute a sql: {}",
88-
filter_message(&e.to_string())
89-
);
86+
println!("{}", filter_message(&e.to_string()));
9087
exit(1)
9188
})?;
9289
let ticket = flight_info
@@ -100,16 +97,25 @@ impl Client {
10097
Ok(batches)
10198
}
10299

100+
pub async fn execute_update(&mut self, sql: &str) -> Result<i64> {
101+
let affected_rows = self
102+
.inner
103+
.execute_update(sql.to_string(), None)
104+
.await
105+
.inspect_err(|e| {
106+
println!("{}", filter_message(&e.to_string()));
107+
exit(1)
108+
})?;
109+
Ok(affected_rows)
110+
}
111+
103112
pub async fn prepare(&mut self, sql: &str) -> Result<PreparedStatement<Channel>> {
104113
let prepared_stmt = self
105114
.inner
106115
.prepare(sql.to_string(), None)
107116
.await
108117
.inspect_err(|e| {
109-
println!(
110-
"Failed to execute a sql: {}",
111-
filter_message(&e.to_string())
112-
);
118+
println!("{}", filter_message(&e.to_string()));
113119
exit(1)
114120
})?;
115121
Ok(prepared_stmt)
@@ -124,10 +130,7 @@ impl Client {
124130
.set_parameters(binding)
125131
.context("Failed to bind a record batch to the prepared statement")?;
126132
let flight_info = prepared_stmt.execute().await.inspect_err(|e| {
127-
println!(
128-
"Failed to execute the prepared statement: {}",
129-
filter_message(&e.to_string())
130-
);
133+
println!("{}", filter_message(&e.to_string()));
131134
exit(1)
132135
})?;
133136
let ticket = flight_info
@@ -141,19 +144,20 @@ impl Client {
141144
Ok(batches)
142145
}
143146

147+
pub async fn close_prepared(&self, prepared_stmt: PreparedStatement<Channel>) -> Result<()> {
148+
prepared_stmt
149+
.close()
150+
.await
151+
.context("Failed to close a prepared statement")
152+
}
153+
144154
async fn do_get(&mut self, ticket: Ticket) -> Result<Vec<RecordBatch>> {
145155
let stream = self.inner.do_get(ticket).await.inspect_err(|e| {
146-
println!(
147-
"Failed to perform do_get: {}",
148-
filter_message(&e.to_string())
149-
);
156+
println!("{}", filter_message(&e.to_string()));
150157
exit(1)
151158
})?;
152159
let batches = stream.try_collect::<Vec<_>>().await.inspect_err(|e| {
153-
println!(
154-
"Failed to consume flight record batch stream: {}",
155-
filter_message(&e.to_string())
156-
);
160+
println!("{}", filter_message(&e.to_string()));
157161
exit(1)
158162
})?;
159163
if batches.is_empty() {

0 commit comments

Comments
 (0)