diff --git a/cpp/include/legate_dataframe/csv.hpp b/cpp/include/legate_dataframe/csv.hpp index 7f08541..9904f3e 100644 --- a/cpp/include/legate_dataframe/csv.hpp +++ b/cpp/include/legate_dataframe/csv.hpp @@ -38,6 +38,8 @@ class CSVWrite : public Task { .with_has_allocations(true) .with_elide_device_ctx_sync(true) .with_has_side_effect(true); + static constexpr auto CPU_VARIANT_OPTIONS = + legate::VariantOptions{}.with_has_allocations(true).with_has_side_effect(true); static void cpu_variant(legate::TaskContext context); static void gpu_variant(legate::TaskContext context); }; diff --git a/cpp/include/legate_dataframe/filling.hpp b/cpp/include/legate_dataframe/filling.hpp index 796195d..f8f657d 100644 --- a/cpp/include/legate_dataframe/filling.hpp +++ b/cpp/include/legate_dataframe/filling.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,9 +17,20 @@ #pragma once #include +#include namespace legate::dataframe { +namespace task { + +class SequenceTask : public Task { + public: + static void cpu_variant(legate::TaskContext context); + static void gpu_variant(legate::TaskContext context); +}; + +} // namespace task + /** * @brief Fills a column with a sequence of int64 values * @@ -36,9 +47,6 @@ namespace legate::dataframe { * Notice, this is primarily for C++ testing and examples for now. TODO: implement * all of the cudf features * - * @throws cudf::logic_error if @p init is not numeric. - * @throws cudf::logic_error if @p size is < 0. - * * @param size Size of the output column * @param init First value in the sequence * @return The result column (int64) containing the generated sequence diff --git a/cpp/include/legate_dataframe/groupby_aggregation.hpp b/cpp/include/legate_dataframe/groupby_aggregation.hpp index 8c1e244..d08efb5 100644 --- a/cpp/include/legate_dataframe/groupby_aggregation.hpp +++ b/cpp/include/legate_dataframe/groupby_aggregation.hpp @@ -19,11 +19,23 @@ #include #include -#include - +#include #include namespace legate::dataframe { +namespace task { +class GroupByAggregationTask : public Task { + public: + static constexpr auto GPU_VARIANT_OPTIONS = legate::VariantOptions{} + .with_has_allocations(true) + .with_concurrent(true) + .with_elide_device_ctx_sync(true); + static constexpr auto CPU_VARIANT_OPTIONS = + legate::VariantOptions{}.with_has_allocations(true).with_concurrent(true); + static void cpu_variant(legate::TaskContext context); + static void gpu_variant(legate::TaskContext context); +}; +} // namespace task /** * @brief Perform a groupby and aggregation in a single operation. diff --git a/cpp/include/legate_dataframe/join.hpp b/cpp/include/legate_dataframe/join.hpp index 70e1e3c..2a007de 100644 --- a/cpp/include/legate_dataframe/join.hpp +++ b/cpp/include/legate_dataframe/join.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023-2024, NVIDIA CORPORATION. + * Copyright (c) 2023-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,13 +20,39 @@ #include // cudf::null_equality +#include #include namespace legate::dataframe { - enum class JoinType : int32_t { INNER = 0, LEFT, FULL }; enum class BroadcastInput : int32_t { AUTO = 0, LEFT, RIGHT }; +namespace task { +template +class JoinTask : public Task, + needs_communication ? 
OpCode::JoinConcurrent : OpCode::Join> { + public: + static constexpr auto GPU_VARIANT_OPTIONS = legate::VariantOptions{} + .with_has_allocations(true) + .with_concurrent(needs_communication) + .with_elide_device_ctx_sync(true); + static constexpr auto CPU_VARIANT_OPTIONS = + legate::VariantOptions{}.with_has_allocations(true).with_concurrent(needs_communication); + + static void cpu_variant(legate::TaskContext context); + static void gpu_variant(legate::TaskContext context); +}; +/** + * @brief Help function to determine if we need to repartition the tables + * + * If legate broadcast the left- or right-hand side table, we might not need to + * repartition them. This depends on the join type and which table is broadcasted. + */ +bool is_repartition_not_needed(const TaskContext& ctx, + JoinType join_type, + bool lhs_broadcasted, + bool rhs_broadcasted); +} // namespace task /** * @brief Perform a join between the specified tables. * diff --git a/cpp/src/filling.cpp b/cpp/src/filling.cpp index 43e6be6..3d26340 100644 --- a/cpp/src/filling.cpp +++ b/cpp/src/filling.cpp @@ -16,9 +16,6 @@ #include -#include -#include - #include #include #include @@ -27,55 +24,29 @@ namespace legate::dataframe { namespace task { -class SequenceTask : public Task { - public: - static void cpu_variant(legate::TaskContext context) - { - TaskContext ctx{context}; - auto global_size = argument::get_next_scalar(ctx); - auto global_init = argument::get_next_scalar(ctx); - auto output = argument::get_next_output(ctx); - argument::get_parallel_launch_task(ctx); - auto [local_start, local_size] = evenly_partition_work(global_size, ctx.rank, ctx.nranks); - auto local_init = global_init + local_start; - - if (local_size == 0) { - output.bind_empty_data(); - return; - } - - arrow::Int64Builder long_builder = arrow::Int64Builder(); - auto status = long_builder.Reserve(local_size); - for (size_t i = 0; i < local_size; i++) { - long_builder.UnsafeAppend(local_init + i); - } - auto local_array = ARROW_RESULT(long_builder.Finish()); - output.move_into(std::move(local_array)); +/*static*/ void SequenceTask::cpu_variant(legate::TaskContext context) +{ + TaskContext ctx{context}; + auto global_size = argument::get_next_scalar(ctx); + auto global_init = argument::get_next_scalar(ctx); + auto output = argument::get_next_output(ctx); + argument::get_parallel_launch_task(ctx); + auto [local_start, local_size] = evenly_partition_work(global_size, ctx.rank, ctx.nranks); + auto local_init = global_init + local_start; + + if (local_size == 0) { + output.bind_empty_data(); + return; } - static void gpu_variant(legate::TaskContext context) - { - TaskContext ctx{context}; - auto global_size = argument::get_next_scalar(ctx); - auto global_init = argument::get_next_scalar(ctx); - auto output = argument::get_next_output(ctx); - argument::get_parallel_launch_task(ctx); - - auto [local_start, local_size] = evenly_partition_work(global_size, ctx.rank, ctx.nranks); - auto local_init = global_init + local_start; - - if (local_size == 0) { - output.bind_empty_data(); - return; - } - - cudf::numeric_scalar cudf_init(local_init, true, ctx.stream(), ctx.mr()); - auto res = cudf::sequence(local_size, cudf_init, ctx.stream(), ctx.mr()); - - output.move_into(std::move(res)); + arrow::Int64Builder long_builder = arrow::Int64Builder(); + auto status = long_builder.Reserve(local_size); + for (size_t i = 0; i < local_size; i++) { + long_builder.UnsafeAppend(local_init + i); } -}; - + auto local_array = ARROW_RESULT(long_builder.Finish()); + 
output.move_into(std::move(local_array)); +} } // namespace task LogicalColumn sequence(size_t size, int64_t init) diff --git a/cpp/src/filling.cu b/cpp/src/filling.cu new file mode 100644 index 0000000..e3712e8 --- /dev/null +++ b/cpp/src/filling.cu @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2024-2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include + +#include +#include +#include + +namespace legate::dataframe::task { +/*static*/ void SequenceTask::gpu_variant(legate::TaskContext context) +{ + TaskContext ctx{context}; + auto global_size = argument::get_next_scalar(ctx); + auto global_init = argument::get_next_scalar(ctx); + auto output = argument::get_next_output(ctx); + argument::get_parallel_launch_task(ctx); + + auto [local_start, local_size] = evenly_partition_work(global_size, ctx.rank, ctx.nranks); + auto local_init = global_init + local_start; + + if (local_size == 0) { + output.bind_empty_data(); + return; + } + + cudf::numeric_scalar cudf_init(local_init, true, ctx.stream(), ctx.mr()); + auto res = cudf::sequence(local_size, cudf_init, ctx.stream(), ctx.mr()); + + output.move_into(std::move(res)); +} +} // namespace legate::dataframe::task diff --git a/cpp/src/groupby_aggregation.cpp b/cpp/src/groupby_aggregation.cpp index 2d94dad..e3280bd 100644 --- a/cpp/src/groupby_aggregation.cpp +++ b/cpp/src/groupby_aggregation.cpp @@ -26,11 +26,6 @@ #include #include -#include // cudf::detail::target_type -#include -#include - -#include #include #include @@ -43,241 +38,46 @@ std::string arrow_aggregation_name(std::string name) return name; } -std::unique_ptr make_groupby_aggregation(cudf::aggregation::Kind kind) +/*static*/ void GroupByAggregationTask::cpu_variant(legate::TaskContext context) { - switch (kind) { - case cudf::aggregation::Kind::SUM: { - return cudf::make_sum_aggregation(); - } - case cudf::aggregation::Kind::PRODUCT: { - return cudf::make_product_aggregation(); - } - case cudf::aggregation::Kind::MIN: { - return cudf::make_min_aggregation(); - } - case cudf::aggregation::Kind::MAX: { - return cudf::make_max_aggregation(); - } - case cudf::aggregation::Kind::COUNT_VALID: { - return cudf::make_count_aggregation(); - } - case cudf::aggregation::Kind::MEAN: { - return cudf::make_mean_aggregation(); - } - // 0 degrees of freedom instead of default 1 to match Arrow's behavior - case cudf::aggregation::Kind::VARIANCE: { - return cudf::make_variance_aggregation(0); - } - case cudf::aggregation::Kind::STD: { - return cudf::make_std_aggregation(0); - } - case cudf::aggregation::Kind::MEDIAN: { - return cudf::make_median_aggregation(); - } - case cudf::aggregation::Kind::NUNIQUE: { - return cudf::make_nunique_aggregation(); - } - default: { - throw std::invalid_argument("Unsupported groupby aggregation"); - } + TaskContext ctx{context}; + auto table = argument::get_next_input(ctx); + auto output = argument::get_next_output(ctx); + auto _key_col_idx = argument::get_next_scalar_vector(ctx); + std::vector 
key_col_idx(_key_col_idx.begin(), _key_col_idx.end()); + + // Get the `column_aggs` task argument + std::vector aggregates; + auto column_aggs_size = argument::get_next_scalar(ctx); + for (size_t i = 0; i < column_aggs_size; ++i) { + auto in_col_idx = argument::get_next_scalar(ctx); + auto kind = argument::get_next_scalar(ctx); + auto out_col_idx = argument::get_next_scalar(ctx); + aggregates.push_back( + {arrow_aggregation_name(kind), std::to_string(in_col_idx), std::to_string(i)}); } -} - -cudf::aggregation::Kind arrow_to_cudf_aggregation(const std::string& agg_name) -{ - std::map agg_map = { - // Direct mappings - {"sum", cudf::aggregation::Kind::SUM}, - {"product", cudf::aggregation::Kind::PRODUCT}, - {"min", cudf::aggregation::Kind::MIN}, - {"max", cudf::aggregation::Kind::MAX}, - {"count", cudf::aggregation::Kind::COUNT_VALID}, - {"mean", cudf::aggregation::Kind::MEAN}, - {"variance", cudf::aggregation::Kind::VARIANCE}, - {"stddev", cudf::aggregation::Kind::STD}, - {"approximate_median", cudf::aggregation::Kind::MEDIAN}, - {"count_distinct", cudf::aggregation::Kind::NUNIQUE}}; - - // {"count_all", cudf::aggregation::Kind::COUNT_ALL}, - // "count_all" could be supported but needs some work as it has 0 inputs - - // Don't do these as we don't support nested types at the moment - // {"list", cudf::aggregation::Kind::COLLECT_LIST}, - // {"tdigest", cudf::aggregation::Kind::TDIGEST} - - // Arrow aggregations with no direct cuDF equivalent: - // any - cudf has as a reduction aggregation but not groupby aggregation - // all - cudf has as a reduction aggregation but not groupby aggregation - // distinct - // first - could map to NTH_ELEMENT with n=0 - // first_last - no equivalent - // kurtosis - no equivalent - // last - could map to NTH_ELEMENT with n=-1 - // min_max - no single equivalent (would need separate MIN/MAX) - // one - no equivalent - // pivot_wider - no equivalent - // skew - no equivalent - // cuDF aggregations with no direct Arrow equivalent: - // SUM_OF_SQUARES - no equivalent - // M2 - no equivalent - // QUANTILE - no equivalent - // ARGMAX - no equivalent - // ARGMIN - no equivalent - // NTH_ELEMENT - no equivalent - // ROW_NUMBER - no equivalent - // EWMA - no equivalent - // RANK - no equivalent - // COLLECT_SET - no equivalent - // LEAD - no equivalent - // LAG - no equivalent - // PTX - no equivalent - // CUDA - no equivalent - // HOST_UDF - no equivalent - // MERGE_LISTS - no equivalent - // MERGE_SETS - no equivalent - // MERGE_M2 - no equivalent - // COVARIANCE - no equivalent - // CORRELATION - no equivalent - // MERGE_TDIGEST - no equivalent - // HISTOGRAM - no equivalent - // MERGE_HISTOGRAM - no equivalent - // BITWISE_AGG - no equivalent - if (agg_map.count(agg_name) == 0) { - throw std::invalid_argument("Unsupported aggregation: " + agg_name); + std::vector dummy_column_names; + for (int i = 0; i < table.num_columns(); i++) { + dummy_column_names.push_back(std::to_string(i)); } - return agg_map.at(agg_name); -} - -class GroupByAggregationTask : public Task { - public: - static constexpr auto GPU_VARIANT_OPTIONS = legate::VariantOptions{} - .with_has_allocations(true) - .with_concurrent(true) - .with_elide_device_ctx_sync(true); - static void cpu_variant(legate::TaskContext context) - { - TaskContext ctx{context}; - auto table = argument::get_next_input(ctx); - auto output = argument::get_next_output(ctx); - auto _key_col_idx = argument::get_next_scalar_vector(ctx); - std::vector key_col_idx(_key_col_idx.begin(), _key_col_idx.end()); + auto table_view = 
table.arrow_table_view(dummy_column_names); - // Get the `column_aggs` task argument - std::vector aggregates; - auto column_aggs_size = argument::get_next_scalar(ctx); - for (size_t i = 0; i < column_aggs_size; ++i) { - auto in_col_idx = argument::get_next_scalar(ctx); - auto kind = argument::get_next_scalar(ctx); - auto out_col_idx = argument::get_next_scalar(ctx); - aggregates.push_back( - {arrow_aggregation_name(kind), std::to_string(in_col_idx), std::to_string(i)}); - } - - std::vector dummy_column_names; - for (int i = 0; i < table.num_columns(); i++) { - dummy_column_names.push_back(std::to_string(i)); - } - auto table_view = table.arrow_table_view(dummy_column_names); + // Repartition `table` based on the keys such that each node can do a local groupby. + auto repartitioned = repartition_by_hash(ctx, table_view, key_col_idx); - // Repartition `table` based on the keys such that each node can do a local groupby. - auto repartitioned = repartition_by_hash(ctx, table_view, key_col_idx); - - // Do the groupby - std::vector key_names; - for (auto idx : key_col_idx) { - key_names.push_back(std::to_string(idx)); - } - arrow::acero::Declaration plan = arrow::acero::Declaration::Sequence( - {{"table_source", arrow::acero::TableSourceNodeOptions(repartitioned)}, - {"aggregate", arrow::acero::AggregateNodeOptions(aggregates, key_names)}}); - auto result = ARROW_RESULT(arrow::acero::DeclarationToTable(std::move(plan))); - - output.move_into(std::move(result)); + // Do the groupby + std::vector key_names; + for (auto idx : key_col_idx) { + key_names.push_back(std::to_string(idx)); } + arrow::acero::Declaration plan = arrow::acero::Declaration::Sequence( + {{"table_source", arrow::acero::TableSourceNodeOptions(repartitioned)}, + {"aggregate", arrow::acero::AggregateNodeOptions(aggregates, key_names)}}); + auto result = ARROW_RESULT(arrow::acero::DeclarationToTable(std::move(plan))); - static void gpu_variant(legate::TaskContext context) - { - TaskContext ctx{context}; - auto table = argument::get_next_input(ctx); - auto output = argument::get_next_output(ctx); - auto _key_col_idx = argument::get_next_scalar_vector(ctx); - std::vector key_col_idx(_key_col_idx.begin(), _key_col_idx.end()); - - // Get the `column_aggs` task argument - std::vector> column_aggs; - auto column_aggs_size = argument::get_next_scalar(ctx); - for (size_t i = 0; i < column_aggs_size; ++i) { - auto in_col_idx = argument::get_next_scalar(ctx); - auto kind = argument::get_next_scalar(ctx); - auto out_col_idx = argument::get_next_scalar(ctx); - column_aggs.push_back({in_col_idx, arrow_to_cudf_aggregation(kind), out_col_idx}); - } - - // Repartition `table` based on the keys such that each node can do a local groupby. - auto repartitioned = repartition_by_hash(ctx, table.table_view(), key_col_idx); - - // In order to create the aggregation requests, we walk through `column_aggs` and for - // each unique input-column-index, we create an aggregation request and append the - // aggregation-kinds found in `column_aggs`. - std::vector requests; - std::map> out_col_to_request_and_agg_idx; - { - std::map in_col_to_request_idx; - for (const auto& [in_col_idx, kind, out_col_idx] : column_aggs) { - // If this is the first time we see `in_col_idx`, we create a new `aggregation_request` - // with `values` set to the column of `in_col_idx` and an empty aggregation vector. 
- if (in_col_to_request_idx.find(in_col_idx) == in_col_to_request_idx.end()) { - in_col_to_request_idx[in_col_idx] = requests.size(); - requests.push_back(cudf::groupby::aggregation_request{ - .values = repartitioned->get_column(in_col_idx), .aggregations = {}}); - } - - // Find the `aggregation_request` that belongs to `in_col_idx` - size_t request_idx = in_col_to_request_idx.at(in_col_idx); - auto& request = requests.at(request_idx); - // Add the aggregation kind to the request - request.aggregations.push_back(make_groupby_aggregation(kind)); - - // Record in which index in `requests` and `request.aggregations`, the - // aggregation was added. - out_col_to_request_and_agg_idx[out_col_idx] = {request_idx, - request.aggregations.size() - 1}; - } - } - - // Do a local groupby - cudf::groupby::groupby gb_obj(repartitioned->select(key_col_idx), cudf::null_policy::INCLUDE); - auto [unique_keys, agg_result] = gb_obj.aggregate(requests, ctx.stream(), ctx.mr()); - - // Gather the output columns. The key columns goes first. - auto output_columns = unique_keys->release(); - - // Then we add the columns in `agg_result` using the order recorded - // in `out_col_to_request_and_agg_idx`. - output_columns.resize(output_columns.size() + out_col_to_request_and_agg_idx.size()); - auto out_types = output.cudf_types(); - for (auto [out_col_idx, request_and_agg_idx] : out_col_to_request_and_agg_idx) { - auto [request_idx, agg_idx] = request_and_agg_idx; - - output_columns.at(out_col_idx) = std::move(agg_result.at(request_idx).results.at(agg_idx)); - - // Cast the cudf output to be consistent with the output, which has output types according to - // arrow convention - if (output_columns.at(out_col_idx)->type() != out_types.at(out_col_idx)) { - output_columns.at(out_col_idx) = cudf::cast(output_columns.at(out_col_idx)->view(), - out_types.at(out_col_idx), - ctx.stream(), - ctx.mr()); - } - - std::shared_ptr arrow_type = - to_arrow_type(output_columns.at(out_col_idx)->type().id()); - } - - output.move_into(std::move(output_columns)); - } -}; + output.move_into(std::move(result)); +} } // namespace task diff --git a/cpp/src/groupby_aggregation.cu b/cpp/src/groupby_aggregation.cu new file mode 100644 index 0000000..85177bd --- /dev/null +++ b/cpp/src/groupby_aggregation.cu @@ -0,0 +1,206 @@ +/* + * Copyright (c) 2024-2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include // cudf::detail::target_type +#include +#include +#include +#include + +namespace legate::dataframe::task { + +std::unique_ptr make_groupby_aggregation(cudf::aggregation::Kind kind) +{ + switch (kind) { + case cudf::aggregation::Kind::SUM: { + return cudf::make_sum_aggregation(); + } + case cudf::aggregation::Kind::PRODUCT: { + return cudf::make_product_aggregation(); + } + case cudf::aggregation::Kind::MIN: { + return cudf::make_min_aggregation(); + } + case cudf::aggregation::Kind::MAX: { + return cudf::make_max_aggregation(); + } + case cudf::aggregation::Kind::COUNT_VALID: { + return cudf::make_count_aggregation(); + } + case cudf::aggregation::Kind::MEAN: { + return cudf::make_mean_aggregation(); + } + // 0 degrees of freedom instead of default 1 to match Arrow's behavior + case cudf::aggregation::Kind::VARIANCE: { + return cudf::make_variance_aggregation(0); + } + case cudf::aggregation::Kind::STD: { + return cudf::make_std_aggregation(0); + } + case cudf::aggregation::Kind::MEDIAN: { + return cudf::make_median_aggregation(); + } + case cudf::aggregation::Kind::NUNIQUE: { + return cudf::make_nunique_aggregation(); + } + default: { + throw std::invalid_argument("Unsupported groupby aggregation"); + } + } +} + +cudf::aggregation::Kind arrow_to_cudf_aggregation(const std::string& agg_name) +{ + std::map agg_map = { + // Direct mappings + {"sum", cudf::aggregation::Kind::SUM}, + {"product", cudf::aggregation::Kind::PRODUCT}, + {"min", cudf::aggregation::Kind::MIN}, + {"max", cudf::aggregation::Kind::MAX}, + {"count", cudf::aggregation::Kind::COUNT_VALID}, + {"mean", cudf::aggregation::Kind::MEAN}, + {"variance", cudf::aggregation::Kind::VARIANCE}, + {"stddev", cudf::aggregation::Kind::STD}, + {"approximate_median", cudf::aggregation::Kind::MEDIAN}, + {"count_distinct", cudf::aggregation::Kind::NUNIQUE}}; + + // {"count_all", cudf::aggregation::Kind::COUNT_ALL}, + // "count_all" could be supported but needs some work as it has 0 inputs + + // Don't do these as we don't support nested types at the moment + // {"list", cudf::aggregation::Kind::COLLECT_LIST}, + // {"tdigest", cudf::aggregation::Kind::TDIGEST} + + // Arrow aggregations with no direct cuDF equivalent: + // any - cudf has as a reduction aggregation but not groupby aggregation + // all - cudf has as a reduction aggregation but not groupby aggregation + // distinct + // first - could map to NTH_ELEMENT with n=0 + // first_last - no equivalent + // kurtosis - no equivalent + // last - could map to NTH_ELEMENT with n=-1 + // min_max - no single equivalent (would need separate MIN/MAX) + // one - no equivalent + // pivot_wider - no equivalent + // skew - no equivalent + + // cuDF aggregations with no direct Arrow equivalent: + // SUM_OF_SQUARES - no equivalent + // M2 - no equivalent + // QUANTILE - no equivalent + // ARGMAX - no equivalent + // ARGMIN - no equivalent + // NTH_ELEMENT - no equivalent + // ROW_NUMBER - no equivalent + // EWMA - no equivalent + // RANK - no equivalent + // COLLECT_SET - no equivalent + // LEAD - no equivalent + // LAG - no equivalent + // PTX - no equivalent + // CUDA - no equivalent + // HOST_UDF - no equivalent + // MERGE_LISTS - no equivalent + // MERGE_SETS - no equivalent + // MERGE_M2 - no equivalent + // COVARIANCE - no equivalent + // CORRELATION - no equivalent + // MERGE_TDIGEST - no equivalent + // HISTOGRAM - no equivalent + // MERGE_HISTOGRAM - no equivalent + // BITWISE_AGG - no equivalent + if (agg_map.count(agg_name) == 0) { + throw std::invalid_argument("Unsupported 
aggregation: " + agg_name); + } + return agg_map.at(agg_name); +} + +/*static*/ void GroupByAggregationTask::gpu_variant(legate::TaskContext context) +{ + TaskContext ctx{context}; + auto table = argument::get_next_input(ctx); + auto output = argument::get_next_output(ctx); + auto _key_col_idx = argument::get_next_scalar_vector(ctx); + std::vector key_col_idx(_key_col_idx.begin(), _key_col_idx.end()); + + // Get the `column_aggs` task argument + std::vector> column_aggs; + auto column_aggs_size = argument::get_next_scalar(ctx); + for (size_t i = 0; i < column_aggs_size; ++i) { + auto in_col_idx = argument::get_next_scalar(ctx); + auto kind = argument::get_next_scalar(ctx); + auto out_col_idx = argument::get_next_scalar(ctx); + column_aggs.push_back({in_col_idx, arrow_to_cudf_aggregation(kind), out_col_idx}); + } + + // Repartition `table` based on the keys such that each node can do a local groupby. + auto repartitioned = repartition_by_hash(ctx, table.table_view(), key_col_idx); + + // In order to create the aggregation requests, we walk through `column_aggs` and for + // each unique input-column-index, we create an aggregation request and append the + // aggregation-kinds found in `column_aggs`. + std::vector requests; + std::map> out_col_to_request_and_agg_idx; + { + std::map in_col_to_request_idx; + for (const auto& [in_col_idx, kind, out_col_idx] : column_aggs) { + // If this is the first time we see `in_col_idx`, we create a new `aggregation_request` + // with `values` set to the column of `in_col_idx` and an empty aggregation vector. + if (in_col_to_request_idx.find(in_col_idx) == in_col_to_request_idx.end()) { + in_col_to_request_idx[in_col_idx] = requests.size(); + requests.push_back(cudf::groupby::aggregation_request{ + .values = repartitioned->get_column(in_col_idx), .aggregations = {}}); + } + + // Find the `aggregation_request` that belongs to `in_col_idx` + size_t request_idx = in_col_to_request_idx.at(in_col_idx); + auto& request = requests.at(request_idx); + // Add the aggregation kind to the request + request.aggregations.push_back(make_groupby_aggregation(kind)); + + // Record in which index in `requests` and `request.aggregations`, the + // aggregation was added. + out_col_to_request_and_agg_idx[out_col_idx] = {request_idx, request.aggregations.size() - 1}; + } + } + + // Do a local groupby + cudf::groupby::groupby gb_obj(repartitioned->select(key_col_idx), cudf::null_policy::INCLUDE); + auto [unique_keys, agg_result] = gb_obj.aggregate(requests, ctx.stream(), ctx.mr()); + + // Gather the output columns. The key columns goes first. + auto output_columns = unique_keys->release(); + + // Then we add the columns in `agg_result` using the order recorded + // in `out_col_to_request_and_agg_idx`. 
+ output_columns.resize(output_columns.size() + out_col_to_request_and_agg_idx.size()); + auto out_types = output.cudf_types(); + for (auto [out_col_idx, request_and_agg_idx] : out_col_to_request_and_agg_idx) { + auto [request_idx, agg_idx] = request_and_agg_idx; + + output_columns.at(out_col_idx) = std::move(agg_result.at(request_idx).results.at(agg_idx)); + + // Cast the cudf output to be consistent with the output, which has output types according to + // arrow convention + if (output_columns.at(out_col_idx)->type() != out_types.at(out_col_idx)) { + output_columns.at(out_col_idx) = cudf::cast( + output_columns.at(out_col_idx)->view(), out_types.at(out_col_idx), ctx.stream(), ctx.mr()); + } + } + + output.move_into(std::move(output_columns)); +} +} // namespace legate::dataframe::task diff --git a/cpp/src/join.cpp b/cpp/src/join.cpp index a6e2a34..421684e 100644 --- a/cpp/src/join.cpp +++ b/cpp/src/join.cpp @@ -25,165 +25,11 @@ #include #include #include - -#include -#include -#include -#include - -#include #include #include namespace legate::dataframe { namespace task { -namespace { - -/** - * @brief Help function to perform a cudf join operation. - * - * Since cudf's public join API doesn't accept a stream argument, we use the detail API. - */ -std::pair>, - std::unique_ptr>> -cudf_join(TaskContext& ctx, - cudf::table_view lhs, - cudf::table_view rhs, - const std::vector& lhs_keys, - const std::vector& rhs_keys, - cudf::null_equality null_equality, - JoinType join_type) -{ - cudf::hash_join joiner(rhs.select(rhs_keys), null_equality, ctx.stream()); - - switch (join_type) { - case JoinType::INNER: { - return joiner.inner_join( - lhs.select(lhs_keys), std::optional{}, ctx.stream(), ctx.mr()); - } - case JoinType::LEFT: { - return joiner.left_join( - lhs.select(lhs_keys), std::optional{}, ctx.stream(), ctx.mr()); - } - case JoinType::FULL: { - return joiner.full_join( - lhs.select(lhs_keys), std::optional{}, ctx.stream(), ctx.mr()); - } - default: { - throw std::invalid_argument("Unknown JoinType"); - } - } -} - -/** - * @brief Help function to get the left and right out-of-bounds-policy for the specified join type - */ -std::pair out_of_bounds_policy_by_join_type( - JoinType join_type) -{ - switch (join_type) { - case JoinType::INNER: { - return std::make_pair(cudf::out_of_bounds_policy::DONT_CHECK, - cudf::out_of_bounds_policy::DONT_CHECK); - } - case JoinType::LEFT: { - return std::make_pair(cudf::out_of_bounds_policy::DONT_CHECK, - cudf::out_of_bounds_policy::NULLIFY); - } - case JoinType::FULL: { - return std::make_pair(cudf::out_of_bounds_policy::NULLIFY, - cudf::out_of_bounds_policy::NULLIFY); - } - default: { - throw std::invalid_argument("Unknown JoinType"); - } - } -} - -/** - * @brief Help function to perform a cudf join and gather operation. - * - * The result is written to the physical table output - * - * Note that `lhs_table` is only passed for cleanup. 
- */ -void cudf_join_and_gather(TaskContext& ctx, - cudf::table_view lhs, - cudf::table_view rhs, - const std::vector lhs_keys, - const std::vector rhs_keys, - JoinType join_type, - cudf::null_equality null_equality, - const std::vector lhs_out_cols, - const std::vector rhs_out_cols, - PhysicalTable& output, - std::unique_ptr lhs_table = std::unique_ptr()) -{ - // Perform the join and convert (zero-copy) the resulting indices to columns - auto [lhs_row_idx, rhs_row_idx] = - cudf_join(ctx, lhs, rhs, lhs_keys, rhs_keys, null_equality, join_type); - auto left_indices_span = cudf::device_span{*lhs_row_idx}; - auto right_indices_span = cudf::device_span{*rhs_row_idx}; - auto left_indices_col = cudf::column_view{left_indices_span}; - auto right_indices_col = cudf::column_view{right_indices_span}; - - // Use the index columns to gather the result from the original left and right input columns - auto [left_policy, right_policy] = out_of_bounds_policy_by_join_type(join_type); - - auto left_result = - cudf::gather(lhs.select(lhs_out_cols), left_indices_col, left_policy, ctx.stream(), ctx.mr()); - // Clean up left indices and columns as quickly as possible to reduce peak memory. - // (This is the only reason for passing `lhs_table`.) - lhs_row_idx.reset(); - lhs_table.reset(); - - auto right_result = - cudf::gather(rhs.select(rhs_out_cols), right_indices_col, right_policy, ctx.stream(), ctx.mr()); - - // Finally, create a vector of both the left and right results and move it into the output table - if (get_prefer_eager_allocations() && - !output.unbound()) { // hard to guess if bound so just inspect - output.copy_into(std::move(concat(left_result->release(), right_result->release()))); - } else { - output.move_into(std::move(concat(left_result->release(), right_result->release()))); - } -} - -/** - * @brief Help function to create an empty cudf table with no rows - */ -std::unique_ptr no_rows_table_like(const PhysicalTable& other) -{ - std::vector> columns; - for (const auto& dtype : other.cudf_types()) { - columns.emplace_back(cudf::make_empty_column(dtype)); - } - return std::make_unique(std::move(columns)); -} - -/** - * @brief Help function to "revert" a broadcasted table - * - * The table is passed through on rank 0 and on the other ranks, an empty table is returned. - * The `owners` argument is used to keep new cudf allocations alive - */ -cudf::table_view revert_broadcast_cudf(TaskContext& ctx, - const PhysicalTable& table, - std::vector>& owners) -{ - if (ctx.rank == 0 || table.is_partitioned()) { - return table.table_view(); - } else { - owners.push_back(no_rows_table_like(table)); - return owners.back()->view(); - } -} -/** - * @brief Help function to determine if we need to repartition the tables - * - * If legate broadcast the left- or right-hand side table, we might not need to - * repartition them. This depends on the join type and which table is broadcasted. - */ bool is_repartition_not_needed(const TaskContext& ctx, JoinType join_type, bool lhs_broadcasted, @@ -204,6 +50,8 @@ bool is_repartition_not_needed(const TaskContext& ctx, } } +namespace { + /** * @brief Help function to map legate-dataframe join type to Arrow join type */ @@ -376,131 +224,67 @@ std::shared_ptr arrow_join_and_gather(TaskContext& ctx, } template -class JoinTask : public Task, - needs_communication ? 
OpCode::JoinConcurrent : OpCode::Join> { - public: - static constexpr auto GPU_VARIANT_OPTIONS = legate::VariantOptions{} - .with_has_allocations(true) - .with_concurrent(needs_communication) - .with_elide_device_ctx_sync(true); - - static void cpu_variant(legate::TaskContext context) - { - TaskContext ctx{context}; - const auto lhs = argument::get_next_input(ctx); - const auto rhs = argument::get_next_input(ctx); - const auto lhs_keys = argument::get_next_scalar_vector(ctx); - const auto rhs_keys = argument::get_next_scalar_vector(ctx); - auto join_type = argument::get_next_scalar(ctx); - auto null_equality = argument::get_next_scalar(ctx); - const auto lhs_out_cols = argument::get_next_scalar_vector(ctx); - const auto rhs_out_cols = argument::get_next_scalar_vector(ctx); - auto output = argument::get_next_output(ctx); - - /* Use "is_paritioned" to check if the table is broadcast. */ - const bool lhs_broadcasted = !lhs.is_partitioned(); - const bool rhs_broadcasted = !rhs.is_partitioned(); - if (lhs_broadcasted && rhs_broadcasted && ctx.nranks != 1) { - throw std::runtime_error("join(): cannot have both the lhs and the rhs broadcasted"); - } - - auto arrow_lhs = lhs.arrow_table_view(); - auto arrow_rhs = rhs.arrow_table_view(); - - std::shared_ptr result; - if (is_repartition_not_needed(ctx, join_type, lhs_broadcasted, rhs_broadcasted)) { - result = arrow_join_and_gather(ctx, - - arrow_lhs, - arrow_rhs, - integer_to_string_vector(lhs_keys), - integer_to_string_vector(rhs_keys), - join_type, - null_equality == cudf::null_equality::EQUAL, - integer_to_string_vector(lhs_out_cols), - integer_to_string_vector(rhs_out_cols)); - } else { - // All-to-all repartition to one hash bucket per rank. Matching rows from - // both tables then guaranteed to be on the same rank. - auto repartitioned_lhs = repartition_by_hash(ctx, revert_broadcast_arrow(ctx, lhs), lhs_keys); - auto repartitioned_rhs = repartition_by_hash(ctx, revert_broadcast_arrow(ctx, rhs), rhs_keys); - - result = arrow_join_and_gather(ctx, - repartitioned_lhs, - repartitioned_rhs, - integer_to_string_vector(lhs_keys), - integer_to_string_vector(rhs_keys), - join_type, - null_equality == cudf::null_equality::EQUAL, - integer_to_string_vector(lhs_out_cols), - integer_to_string_vector(rhs_out_cols)); - } - // Finally, create a vector of both the left and right results and move it into the output table - if (get_prefer_eager_allocations() && - !output.unbound()) { // hard to guess if bound so just inspect - output.copy_into(std::move(result)); - } else { - output.move_into(std::move(result)); - } +/*static*/ void JoinTask::cpu_variant(legate::TaskContext context) +{ + TaskContext ctx{context}; + const auto lhs = argument::get_next_input(ctx); + const auto rhs = argument::get_next_input(ctx); + const auto lhs_keys = argument::get_next_scalar_vector(ctx); + const auto rhs_keys = argument::get_next_scalar_vector(ctx); + auto join_type = argument::get_next_scalar(ctx); + auto null_equality = argument::get_next_scalar(ctx); + const auto lhs_out_cols = argument::get_next_scalar_vector(ctx); + const auto rhs_out_cols = argument::get_next_scalar_vector(ctx); + auto output = argument::get_next_output(ctx); + + /* Use "is_paritioned" to check if the table is broadcast. 
*/ + const bool lhs_broadcasted = !lhs.is_partitioned(); + const bool rhs_broadcasted = !rhs.is_partitioned(); + if (lhs_broadcasted && rhs_broadcasted && ctx.nranks != 1) { + throw std::runtime_error("join(): cannot have both the lhs and the rhs broadcasted"); } - static void gpu_variant(legate::TaskContext context) - { - TaskContext ctx{context}; - const auto lhs = argument::get_next_input(ctx); - const auto rhs = argument::get_next_input(ctx); - const auto lhs_keys = argument::get_next_scalar_vector(ctx); - const auto rhs_keys = argument::get_next_scalar_vector(ctx); - auto join_type = argument::get_next_scalar(ctx); - auto null_equality = argument::get_next_scalar(ctx); - const auto lhs_out_cols = argument::get_next_scalar_vector(ctx); - const auto rhs_out_cols = argument::get_next_scalar_vector(ctx); - auto output = argument::get_next_output(ctx); - - /* Use "is_paritioned" to check if the table is broadcast. */ - const bool lhs_broadcasted = !lhs.is_partitioned(); - const bool rhs_broadcasted = !rhs.is_partitioned(); - if (lhs_broadcasted && rhs_broadcasted && ctx.nranks != 1) { - throw std::runtime_error("join(): cannot have both the lhs and the rhs broadcasted"); - } - if (is_repartition_not_needed(ctx, join_type, lhs_broadcasted, rhs_broadcasted)) { - cudf_join_and_gather(ctx, - - lhs.table_view(), - rhs.table_view(), - lhs_keys, - rhs_keys, - join_type, - null_equality, - lhs_out_cols, - rhs_out_cols, - output); - } else { - std::vector> owners; - - // All-to-all repartition to one hash bucket per rank. Matching rows from - // both tables then guaranteed to be on the same rank. - auto cudf_lhs = repartition_by_hash(ctx, revert_broadcast_cudf(ctx, lhs, owners), lhs_keys); - auto cudf_rhs = repartition_by_hash(ctx, revert_broadcast_cudf(ctx, rhs, owners), rhs_keys); - - auto lhs_view = cudf_lhs->view(); // cudf_lhs unique pointer is moved. - cudf_join_and_gather(ctx, - - lhs_view, - cudf_rhs->view(), - lhs_keys, - rhs_keys, - join_type, - null_equality, - lhs_out_cols, - rhs_out_cols, - output, - std::move(cudf_lhs) // to allow early cleanup - ); - } + auto arrow_lhs = lhs.arrow_table_view(); + auto arrow_rhs = rhs.arrow_table_view(); + + std::shared_ptr result; + if (is_repartition_not_needed(ctx, join_type, lhs_broadcasted, rhs_broadcasted)) { + result = arrow_join_and_gather(ctx, + + arrow_lhs, + arrow_rhs, + integer_to_string_vector(lhs_keys), + integer_to_string_vector(rhs_keys), + join_type, + null_equality == cudf::null_equality::EQUAL, + integer_to_string_vector(lhs_out_cols), + integer_to_string_vector(rhs_out_cols)); + } else { + // All-to-all repartition to one hash bucket per rank. Matching rows from + // both tables then guaranteed to be on the same rank. 
+ auto repartitioned_lhs = repartition_by_hash(ctx, revert_broadcast_arrow(ctx, lhs), lhs_keys); + auto repartitioned_rhs = repartition_by_hash(ctx, revert_broadcast_arrow(ctx, rhs), rhs_keys); + + result = arrow_join_and_gather(ctx, + repartitioned_lhs, + repartitioned_rhs, + integer_to_string_vector(lhs_keys), + integer_to_string_vector(rhs_keys), + join_type, + null_equality == cudf::null_equality::EQUAL, + integer_to_string_vector(lhs_out_cols), + integer_to_string_vector(rhs_out_cols)); } -}; - + // Finally, create a vector of both the left and right results and move it into the output table + if (get_prefer_eager_allocations() && + !output.unbound()) { // hard to guess if bound so just inspect + output.copy_into(std::move(result)); + } else { + output.move_into(std::move(result)); + } +} +template class JoinTask; +template class JoinTask; } // namespace task namespace { diff --git a/cpp/src/join.cu b/cpp/src/join.cu new file mode 100644 index 0000000..ca7c091 --- /dev/null +++ b/cpp/src/join.cu @@ -0,0 +1,225 @@ +/* + * Copyright (c) 2023-2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +#include +#include + +namespace legate::dataframe::task { +/** + * @brief Help function to perform a cudf join operation. + * + * Since cudf's public join API doesn't accept a stream argument, we use the detail API. + */ +std::pair>, + std::unique_ptr>> +cudf_join(TaskContext& ctx, + cudf::table_view lhs, + cudf::table_view rhs, + const std::vector& lhs_keys, + const std::vector& rhs_keys, + cudf::null_equality null_equality, + JoinType join_type) +{ + cudf::hash_join joiner(rhs.select(rhs_keys), null_equality, ctx.stream()); + + switch (join_type) { + case JoinType::INNER: { + return joiner.inner_join( + lhs.select(lhs_keys), std::optional{}, ctx.stream(), ctx.mr()); + } + case JoinType::LEFT: { + return joiner.left_join( + lhs.select(lhs_keys), std::optional{}, ctx.stream(), ctx.mr()); + } + case JoinType::FULL: { + return joiner.full_join( + lhs.select(lhs_keys), std::optional{}, ctx.stream(), ctx.mr()); + } + default: { + throw std::invalid_argument("Unknown JoinType"); + } + } +} + +/** + * @brief Help function to get the left and right out-of-bounds-policy for the specified join type + */ +std::pair out_of_bounds_policy_by_join_type( + JoinType join_type) +{ + switch (join_type) { + case JoinType::INNER: { + return std::make_pair(cudf::out_of_bounds_policy::DONT_CHECK, + cudf::out_of_bounds_policy::DONT_CHECK); + } + case JoinType::LEFT: { + return std::make_pair(cudf::out_of_bounds_policy::DONT_CHECK, + cudf::out_of_bounds_policy::NULLIFY); + } + case JoinType::FULL: { + return std::make_pair(cudf::out_of_bounds_policy::NULLIFY, + cudf::out_of_bounds_policy::NULLIFY); + } + default: { + throw std::invalid_argument("Unknown JoinType"); + } + } +} + +/** + * @brief Help function to perform a cudf join and gather operation. 
+ * + * The result is written to the physical table output + * + * Note that `lhs_table` is only passed for cleanup. + */ +void cudf_join_and_gather(TaskContext& ctx, + cudf::table_view lhs, + cudf::table_view rhs, + const std::vector lhs_keys, + const std::vector rhs_keys, + JoinType join_type, + cudf::null_equality null_equality, + const std::vector lhs_out_cols, + const std::vector rhs_out_cols, + PhysicalTable& output, + std::unique_ptr lhs_table = std::unique_ptr()) +{ + // Perform the join and convert (zero-copy) the resulting indices to columns + auto [lhs_row_idx, rhs_row_idx] = + cudf_join(ctx, lhs, rhs, lhs_keys, rhs_keys, null_equality, join_type); + auto left_indices_span = cudf::device_span{*lhs_row_idx}; + auto right_indices_span = cudf::device_span{*rhs_row_idx}; + auto left_indices_col = cudf::column_view{left_indices_span}; + auto right_indices_col = cudf::column_view{right_indices_span}; + + // Use the index columns to gather the result from the original left and right input columns + auto [left_policy, right_policy] = out_of_bounds_policy_by_join_type(join_type); + + auto left_result = + cudf::gather(lhs.select(lhs_out_cols), left_indices_col, left_policy, ctx.stream(), ctx.mr()); + // Clean up left indices and columns as quickly as possible to reduce peak memory. + // (This is the only reason for passing `lhs_table`.) + lhs_row_idx.reset(); + lhs_table.reset(); + + auto right_result = + cudf::gather(rhs.select(rhs_out_cols), right_indices_col, right_policy, ctx.stream(), ctx.mr()); + + // Finally, create a vector of both the left and right results and move it into the output table + if (get_prefer_eager_allocations() && + !output.unbound()) { // hard to guess if bound so just inspect + output.copy_into(std::move(concat(left_result->release(), right_result->release()))); + } else { + output.move_into(std::move(concat(left_result->release(), right_result->release()))); + } +} + +/** + * @brief Help function to create an empty cudf table with no rows + */ +std::unique_ptr no_rows_table_like(const PhysicalTable& other) +{ + std::vector> columns; + for (const auto& dtype : other.cudf_types()) { + columns.emplace_back(cudf::make_empty_column(dtype)); + } + return std::make_unique(std::move(columns)); +} + +/** + * @brief Help function to "revert" a broadcasted table + * + * The table is passed through on rank 0 and on the other ranks, an empty table is returned. + * The `owners` argument is used to keep new cudf allocations alive + */ +cudf::table_view revert_broadcast_cudf(TaskContext& ctx, + const PhysicalTable& table, + std::vector>& owners) +{ + if (ctx.rank == 0 || table.is_partitioned()) { + return table.table_view(); + } else { + owners.push_back(no_rows_table_like(table)); + return owners.back()->view(); + } +} + +template +/* static */ void JoinTask::gpu_variant(legate::TaskContext context) +{ + TaskContext ctx{context}; + const auto lhs = argument::get_next_input(ctx); + const auto rhs = argument::get_next_input(ctx); + const auto lhs_keys = argument::get_next_scalar_vector(ctx); + const auto rhs_keys = argument::get_next_scalar_vector(ctx); + auto join_type = argument::get_next_scalar(ctx); + auto null_equality = argument::get_next_scalar(ctx); + const auto lhs_out_cols = argument::get_next_scalar_vector(ctx); + const auto rhs_out_cols = argument::get_next_scalar_vector(ctx); + auto output = argument::get_next_output(ctx); + + /* Use "is_paritioned" to check if the table is broadcast. 
*/ + const bool lhs_broadcasted = !lhs.is_partitioned(); + const bool rhs_broadcasted = !rhs.is_partitioned(); + if (lhs_broadcasted && rhs_broadcasted && ctx.nranks != 1) { + throw std::runtime_error("join(): cannot have both the lhs and the rhs broadcasted"); + } + + if (is_repartition_not_needed(ctx, join_type, lhs_broadcasted, rhs_broadcasted)) { + cudf_join_and_gather(ctx, + + lhs.table_view(), + rhs.table_view(), + lhs_keys, + rhs_keys, + join_type, + null_equality, + lhs_out_cols, + rhs_out_cols, + output); + } else { + std::vector> owners; + + // All-to-all repartition to one hash bucket per rank. Matching rows from + // both tables then guaranteed to be on the same rank. + auto cudf_lhs = repartition_by_hash(ctx, revert_broadcast_cudf(ctx, lhs, owners), lhs_keys); + auto cudf_rhs = repartition_by_hash(ctx, revert_broadcast_cudf(ctx, rhs, owners), rhs_keys); + + auto lhs_view = cudf_lhs->view(); // cudf_lhs unique pointer is moved. + cudf_join_and_gather(ctx, + + lhs_view, + cudf_rhs->view(), + lhs_keys, + rhs_keys, + join_type, + null_equality, + lhs_out_cols, + rhs_out_cols, + output, + std::move(cudf_lhs) // to allow early cleanup + ); + } +} +template class JoinTask; +template class JoinTask; +} // namespace legate::dataframe::task diff --git a/cpp/tests/test_groupby_aggregation.cpp b/cpp/tests/test_groupby_aggregation.cpp index d5a7318..41c11e3 100644 --- a/cpp/tests/test_groupby_aggregation.cpp +++ b/cpp/tests/test_groupby_aggregation.cpp @@ -78,8 +78,7 @@ void assert_arrow_tables_equal(const std::vector& keys, TYPED_TEST(GroupByAggregationTest, single_sum_with_nulls) { - using V = TypeParam; - auto SUM = cudf::aggregation::Kind::SUM; + using V = TypeParam; auto keys_column = LogicalColumn(narrow({1, 2, 3, 1, 2, 2, 1, 3, 3, 2})); auto values_column = @@ -104,9 +103,7 @@ TYPED_TEST(GroupByAggregationTest, single_sum_with_nulls) TYPED_TEST(GroupByAggregationTest, nunique_and_max) { - using V = TypeParam; - auto NUNIQUE = cudf::aggregation::Kind::NUNIQUE; - auto MAX = cudf::aggregation::Kind::MAX; + using V = TypeParam; auto keys_column = LogicalColumn(narrow({1, 2, 3, 1, 2, 2, 1, 3, 3, 2})); auto vals1_column = LogicalColumn(narrow({0, 1, 2, 3, 4, 5, 6, 7, 8, 9})); diff --git a/cpp/tests/test_task.cpp b/cpp/tests/test_task.cpp index 1a8d78e..8dcdb00 100644 --- a/cpp/tests/test_task.cpp +++ b/cpp/tests/test_task.cpp @@ -96,7 +96,7 @@ struct TaskArgumentMix : public legate::LegateTask { } }; -legate::Library get_library() +legate::Library get_test_library() { static bool prepared = false; auto runtime = legate::Runtime::get_runtime(); @@ -112,7 +112,7 @@ legate::Library get_library() void check_global_row_offset(LogicalTable& input) { auto runtime = legate::Runtime::get_runtime(); - auto task = runtime->create_task(get_library(), GlobalRowOffsetTask::TASK_CONFIG.task_id()); + auto task = runtime->create_task(get_test_library(), GlobalRowOffsetTask::TASK_CONFIG.task_id()); // Launch task LogicalColumn res = @@ -175,7 +175,7 @@ TEST(TaskTest, GlobalRowOffsetEmpty) TEST(TaskTest, TaskArgumentMix) { auto runtime = legate::Runtime::get_runtime(); - auto task = runtime->create_task(get_library(), TaskArgumentMix::TASK_CONFIG.task_id()); + auto task = runtime->create_task(get_test_library(), TaskArgumentMix::TASK_CONFIG.task_id()); auto input = sequence(100, 0); auto output = LogicalColumn::empty_like(input);
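
The same pattern repeats for every task touched by this patch: the task class is declared once in the public header with its variant options, the Arrow-based cpu_variant stays in the .cpp, and the cudf-based gpu_variant moves into a new .cu translation unit. The sketch below is not part of the patch; "ExampleTask" and "OpCode::Example" are hypothetical placeholders, and the template arguments on the argument helpers are assumed to follow the existing legate-dataframe conventions seen above.

// ---- example_task.hpp (public header: declaration + variant options) ----
class ExampleTask : public Task<ExampleTask, OpCode::Example> {
 public:
  // GPU variant allocates and manages its own device-context synchronization.
  static constexpr auto GPU_VARIANT_OPTIONS =
    legate::VariantOptions{}.with_has_allocations(true).with_elide_device_ctx_sync(true);
  static constexpr auto CPU_VARIANT_OPTIONS =
    legate::VariantOptions{}.with_has_allocations(true);

  static void cpu_variant(legate::TaskContext context);  // defined in example_task.cpp
  static void gpu_variant(legate::TaskContext context);  // defined in example_task.cu
};

// ---- example_task.cpp (host-only translation unit, Arrow path) ----------
/*static*/ void ExampleTask::cpu_variant(legate::TaskContext context)
{
  TaskContext ctx{context};
  auto output = argument::get_next_output<PhysicalColumn>(ctx);
  // ... build the result with Arrow builders, as SequenceTask::cpu_variant does above ...
  output.bind_empty_data();  // placeholder body for the sketch
}

// ---- example_task.cu (CUDA translation unit, libcudf path) --------------
/*static*/ void ExampleTask::gpu_variant(legate::TaskContext context)
{
  TaskContext ctx{context};
  auto output = argument::get_next_output<PhysicalColumn>(ctx);
  // ... build the result with libcudf on ctx.stream() and ctx.mr(), as in filling.cu ...
  output.bind_empty_data();  // placeholder body for the sketch
}

// For a templated task such as JoinTask, both translation units end with explicit
// instantiations (template class JoinTask<true>; template class JoinTask<false>;),
// as the patch does in join.cpp and join.cu.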