Skip to content

Commit 2dcfbd2

Browse files
authored
Merge pull request #156 from pdet/arrow_ipc_2
Have the callback back to throw errors
2 parents 561ad92 + 8b0d64a commit 2dcfbd2

File tree

1 file changed

+57
-35
lines changed

1 file changed

+57
-35
lines changed

src/connection.cpp

Lines changed: 57 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@ Napi::FunctionReference Connection::Init(Napi::Env env, Napi::Object exports) {
1717
Napi::HandleScope scope(env);
1818

1919
Napi::Function t = DefineClass(
20-
env, "Connection",
21-
{InstanceMethod("prepare", &Connection::Prepare), InstanceMethod("exec", &Connection::Exec),
22-
InstanceMethod("register_udf_bulk", &Connection::RegisterUdf),
23-
InstanceMethod("register_buffer", &Connection::RegisterBuffer),
24-
InstanceMethod("unregister_udf", &Connection::UnregisterUdf), InstanceMethod("close", &Connection::Close),
25-
InstanceMethod("unregister_buffer", &Connection::UnRegisterBuffer)});
20+
env, "Connection",
21+
{InstanceMethod("prepare", &Connection::Prepare), InstanceMethod("exec", &Connection::Exec),
22+
InstanceMethod("register_udf_bulk", &Connection::RegisterUdf),
23+
InstanceMethod("register_buffer", &Connection::RegisterBuffer),
24+
InstanceMethod("unregister_udf", &Connection::UnregisterUdf), InstanceMethod("close", &Connection::Close),
25+
InstanceMethod("unregister_buffer", &Connection::UnRegisterBuffer)});
2626

2727
exports.Set("Connection", t);
2828

@@ -234,14 +234,14 @@ void DuckDBNodeUDFLauncher(Napi::Env env, Napi::Function jsudf, std::nullptr_t *
234234

235235
struct RegisterUdfTask : public Task {
236236
RegisterUdfTask(Connection &connection, std::string name, std::string return_type_name, Napi::Function callback)
237-
: Task(connection, callback), name(std::move(name)), return_type_name(std::move(return_type_name)) {
237+
: Task(connection, callback), name(std::move(name)), return_type_name(std::move(return_type_name)) {
238238
}
239239

240240
void DoWork() override {
241241
auto &connection = Get<Connection>();
242242
auto &udf_ptr = connection.udfs[name];
243243
duckdb::scalar_function_t udf_function = [&udf_ptr](duckdb::DataChunk &args, duckdb::ExpressionState &state,
244-
duckdb::Vector &result) -> void {
244+
duckdb::Vector &result) -> void {
245245
// here we can do only DuckDB stuff because we do not have a functioning env
246246

247247
// Flatten all args to simplify udfs
@@ -271,7 +271,7 @@ struct RegisterUdfTask : public Task {
271271
auto return_type = cast.cast_type;
272272

273273
connection.connection->CreateVectorizedFunction(name, vector<duckdb::LogicalType> {}, return_type, udf_function,
274-
duckdb::LogicalType::ANY);
274+
duckdb::LogicalType::ANY);
275275
}
276276
std::string name;
277277
std::string return_type_name;
@@ -296,22 +296,22 @@ Napi::Value Connection::RegisterUdf(const Napi::CallbackInfo &info) {
296296
}
297297

298298
auto udf = duckdb_node_udf_function_t::New(env, udf_callback, "duckdb_node_udf" + name, 0, 1, nullptr,
299-
[](Napi::Env, void *, std::nullptr_t *ctx) {});
299+
[](Napi::Env, void *, std::nullptr_t *ctx) {});
300300

301301
// we have to unref the udf because otherwise there is a circular ref with the connection somehow(?)
302302
// this took far too long to figure out
303303
udf.Unref(env);
304304
udfs[name] = udf;
305305

306306
database_ref->Schedule(info.Env(),
307-
duckdb::make_uniq<RegisterUdfTask>(*this, name, return_type_name, completion_callback));
307+
duckdb::make_uniq<RegisterUdfTask>(*this, name, return_type_name, completion_callback));
308308

309309
return Value();
310310
}
311311

312312
struct UnregisterUdfTask : public Task {
313313
UnregisterUdfTask(Connection &connection, std::string name, Napi::Function callback)
314-
: Task(connection, callback), name(std::move(name)) {
314+
: Task(connection, callback), name(std::move(name)) {
315315
}
316316

317317
void DoWork() override {
@@ -354,7 +354,7 @@ Napi::Value Connection::UnregisterUdf(const Napi::CallbackInfo &info) {
354354

355355
struct ExecTask : public Task {
356356
ExecTask(Connection &connection, std::string sql, Napi::Function callback)
357-
: Task(connection, callback), sql(std::move(sql)) {
357+
: Task(connection, callback), sql(std::move(sql)) {
358358
}
359359

360360
void DoWork() override {
@@ -395,8 +395,8 @@ struct ExecTask : public Task {
395395

396396
struct ExecTaskWithCallback : public ExecTask {
397397
ExecTaskWithCallback(Connection &connection, std::string sql, Napi::Function js_callback,
398-
std::function<void(void)> cpp_callback)
399-
: ExecTask(connection, sql, js_callback), cpp_callback(cpp_callback) {
398+
std::function<void(void)> cpp_callback)
399+
: ExecTask(connection, sql, js_callback), cpp_callback(cpp_callback) {
400400
}
401401

402402
void Callback() override {
@@ -456,24 +456,41 @@ Napi::Value Connection::Exec(const Napi::CallbackInfo &info) {
456456
}
457457

458458
struct CreateArrowViewTask : public Task {
459-
CreateArrowViewTask(Connection &connection, duckdb::vector<duckdb::Value>& parameters, std::string &view_name)
460-
: Task(connection), parameters(parameters), view_name(view_name) {
459+
CreateArrowViewTask(Connection &connection, duckdb::vector<duckdb::Value>& parameters, std::string &view_name, Napi::Function callback)
460+
: Task(connection, callback), parameters(parameters), view_name(view_name) {
461461
}
462462

463463
void DoWork() override {
464464
auto &connection = Get<Connection>();
465-
auto &con = *connection.connection;
466-
// Now we create a table function relation
467-
auto table_function_relation = duckdb::make_shared_ptr<duckdb::TableFunctionRelation>(con.context,"scan_arrow_ipc",parameters);
468-
// Creates a relation for a temporary view that does replace
469-
auto view_relation = table_function_relation->CreateView(view_name,true,true);
470-
471-
view_relation->Execute();
472-
465+
success = true;
466+
try {
467+
auto &con = *connection.connection;
468+
// Now we create a table function relation
469+
auto table_function_relation = duckdb::make_shared_ptr<duckdb::TableFunctionRelation>(con.context,"scan_arrow_ipc",parameters);
470+
// Creates a relation for a temporary view that does replace
471+
auto view_relation = table_function_relation->CreateView(view_name,true,true);
472+
auto res = view_relation->Execute();
473+
if (res->HasError()) {
474+
success = false;
475+
error = res->GetErrorObject();
476+
}
477+
} catch (duckdb::Exception &e) {
478+
success = false;
479+
error = duckdb::ErrorData(e);
480+
return;
481+
}
473482
}
474483

484+
void Callback() override {
485+
auto env = object.Env();
486+
Napi::HandleScope scope(env);
487+
callback.Value().MakeCallback(object.Value(), {success ? env.Null() : Utils::CreateError(env, error)});
488+
};
489+
475490
duckdb::vector<duckdb::Value> parameters;
476491
std::string view_name;
492+
bool success;
493+
duckdb::ErrorData error;
477494
};
478495

479496
// Register Arrow IPC buffers for scanning from DuckDB
@@ -512,20 +529,25 @@ Napi::Value Connection::RegisterBuffer(const Napi::CallbackInfo &info) {
512529
Napi::Uint8Array arr = v.As<Napi::Uint8Array>();
513530
auto raw_ptr = reinterpret_cast<uint64_t>(arr.ArrayBuffer().Data());
514531
auto length = (uint64_t)arr.ElementLength();
515-
duckdb::child_list_t<duckdb::Value> buffer_values;
516-
// This is a little bit evil, but allows us to support both libraries in between 1.2 and 1.3
517-
if (db.ExtensionIsLoaded("nanoarrow")){
518-
buffer_values.push_back({"ptr", duckdb::Value::POINTER(raw_ptr)});
519-
} else {
520-
buffer_values.push_back({"ptr", duckdb::Value::UBIGINT(raw_ptr)});
521-
}
532+
duckdb::child_list_t<duckdb::Value> buffer_values;
533+
// This is a little bit evil, but allows us to support both libraries in between 1.2 and 1.3
534+
if (db.ExtensionIsLoaded("nanoarrow")){
535+
buffer_values.push_back({"ptr", duckdb::Value::POINTER(raw_ptr)});
536+
} else {
537+
buffer_values.push_back({"ptr", duckdb::Value::UBIGINT(raw_ptr)});
538+
}
522539
buffer_values.push_back({"size", duckdb::Value::UBIGINT(length)});
523540
values.push_back(duckdb::Value::STRUCT(buffer_values));
524541
}
525542
duckdb::vector<duckdb::Value> list_value;
526-
list_value.push_back(duckdb::Value::LIST(values));
543+
list_value.push_back(duckdb::Value::LIST(values));
544+
545+
Napi::Function callback;
546+
if (info.Length() > 3 && info[3].IsFunction()) {
547+
callback = info[3].As<Napi::Function>();
548+
}
527549

528-
database_ref->Schedule(info.Env(), duckdb::make_uniq<CreateArrowViewTask>(*this, list_value, name));
550+
database_ref->Schedule(info.Env(), duckdb::make_uniq<CreateArrowViewTask>(*this, list_value, name, callback));
529551

530552
return Value();
531553
}
@@ -551,7 +573,7 @@ Napi::Value Connection::UnRegisterBuffer(const Napi::CallbackInfo &info) {
551573
};
552574

553575
database_ref->Schedule(info.Env(),
554-
duckdb::make_uniq<ExecTaskWithCallback>(*this, final_query, callback, cpp_callback));
576+
duckdb::make_uniq<ExecTaskWithCallback>(*this, final_query, callback, cpp_callback));
555577

556578
return Value();
557579
}

0 commit comments

Comments
 (0)