Skip to content

Commit 46e33b0

Browse files
authored
Separate more cuda code (#99)
1 parent 6f808df commit 46e33b0

File tree

12 files changed

+652
-573
lines changed

12 files changed

+652
-573
lines changed

cpp/include/legate_dataframe/csv.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ class CSVWrite : public Task<CSVWrite, OpCode::CSVWrite> {
3838
.with_has_allocations(true)
3939
.with_elide_device_ctx_sync(true)
4040
.with_has_side_effect(true);
41+
static constexpr auto CPU_VARIANT_OPTIONS =
42+
legate::VariantOptions{}.with_has_allocations(true).with_has_side_effect(true);
4143
static void cpu_variant(legate::TaskContext context);
4244
static void gpu_variant(legate::TaskContext context);
4345
};

cpp/include/legate_dataframe/filling.hpp

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2024, NVIDIA CORPORATION.
2+
* Copyright (c) 2024-2025, NVIDIA CORPORATION.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,9 +17,20 @@
1717
#pragma once
1818

1919
#include <legate_dataframe/core/column.hpp>
20+
#include <legate_dataframe/core/library.hpp>
2021

2122
namespace legate::dataframe {
2223

24+
namespace task {
25+
26+
class SequenceTask : public Task<SequenceTask, OpCode::Sequence> {
27+
public:
28+
static void cpu_variant(legate::TaskContext context);
29+
static void gpu_variant(legate::TaskContext context);
30+
};
31+
32+
} // namespace task
33+
2334
/**
2435
* @brief Fills a column with a sequence of int64 values
2536
*
@@ -36,9 +47,6 @@ namespace legate::dataframe {
3647
* Notice, this is primarily for C++ testing and examples for now. TODO: implement
3748
* all of the cudf features <https://github.com/rapidsai/legate-dataframe/issues/74>
3849
*
39-
* @throws cudf::logic_error if @p init is not numeric.
40-
* @throws cudf::logic_error if @p size is < 0.
41-
*
4250
* @param size Size of the output column
4351
* @param init First value in the sequence
4452
* @return The result column (int64) containing the generated sequence

cpp/include/legate_dataframe/groupby_aggregation.hpp

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,23 @@
1919
#include <string>
2020
#include <vector>
2121

22-
#include <cudf/aggregation.hpp>
23-
22+
#include <legate_dataframe/core/library.hpp>
2423
#include <legate_dataframe/core/table.hpp>
2524

2625
namespace legate::dataframe {
26+
namespace task {
27+
class GroupByAggregationTask : public Task<GroupByAggregationTask, OpCode::GroupByAggregation> {
28+
public:
29+
static constexpr auto GPU_VARIANT_OPTIONS = legate::VariantOptions{}
30+
.with_has_allocations(true)
31+
.with_concurrent(true)
32+
.with_elide_device_ctx_sync(true);
33+
static constexpr auto CPU_VARIANT_OPTIONS =
34+
legate::VariantOptions{}.with_has_allocations(true).with_concurrent(true);
35+
static void cpu_variant(legate::TaskContext context);
36+
static void gpu_variant(legate::TaskContext context);
37+
};
38+
} // namespace task
2739

2840
/**
2941
* @brief Perform a groupby and aggregation in a single operation.

cpp/include/legate_dataframe/join.hpp

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
2+
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,13 +20,39 @@
2020

2121
#include <cudf/types.hpp> // cudf::null_equality
2222

23+
#include <legate_dataframe/core/library.hpp>
2324
#include <legate_dataframe/core/table.hpp>
2425

2526
namespace legate::dataframe {
26-
2727
enum class JoinType : int32_t { INNER = 0, LEFT, FULL };
2828
enum class BroadcastInput : int32_t { AUTO = 0, LEFT, RIGHT };
2929

30+
namespace task {
31+
template <bool needs_communication>
32+
class JoinTask : public Task<JoinTask<needs_communication>,
33+
needs_communication ? OpCode::JoinConcurrent : OpCode::Join> {
34+
public:
35+
static constexpr auto GPU_VARIANT_OPTIONS = legate::VariantOptions{}
36+
.with_has_allocations(true)
37+
.with_concurrent(needs_communication)
38+
.with_elide_device_ctx_sync(true);
39+
static constexpr auto CPU_VARIANT_OPTIONS =
40+
legate::VariantOptions{}.with_has_allocations(true).with_concurrent(needs_communication);
41+
42+
static void cpu_variant(legate::TaskContext context);
43+
static void gpu_variant(legate::TaskContext context);
44+
};
45+
/**
46+
* @brief Helper function to determine if we need to repartition the tables
47+
*
48+
* If Legate broadcasts the left- or right-hand side table, we might not need to
49+
* repartition them. This depends on the join type and which table is broadcast.
50+
*/
51+
bool is_repartition_not_needed(const TaskContext& ctx,
52+
JoinType join_type,
53+
bool lhs_broadcasted,
54+
bool rhs_broadcasted);
55+
} // namespace task
3056
/**
3157
* @brief Perform a join between the specified tables.
3258
*

cpp/src/filling.cpp

Lines changed: 20 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@
1616

1717
#include <legate.h>
1818

19-
#include <cudf/filling.hpp>
20-
#include <cudf/scalar/scalar.hpp>
21-
2219
#include <legate_dataframe/core/library.hpp>
2320
#include <legate_dataframe/core/task_argument.hpp>
2421
#include <legate_dataframe/core/task_context.hpp>
@@ -27,55 +24,29 @@
2724
namespace legate::dataframe {
2825
namespace task {
2926

30-
class SequenceTask : public Task<SequenceTask, OpCode::Sequence> {
31-
public:
32-
static void cpu_variant(legate::TaskContext context)
33-
{
34-
TaskContext ctx{context};
35-
auto global_size = argument::get_next_scalar<size_t>(ctx);
36-
auto global_init = argument::get_next_scalar<int64_t>(ctx);
37-
auto output = argument::get_next_output<PhysicalColumn>(ctx);
38-
argument::get_parallel_launch_task(ctx);
39-
auto [local_start, local_size] = evenly_partition_work(global_size, ctx.rank, ctx.nranks);
40-
auto local_init = global_init + local_start;
41-
42-
if (local_size == 0) {
43-
output.bind_empty_data();
44-
return;
45-
}
46-
47-
arrow::Int64Builder long_builder = arrow::Int64Builder();
48-
auto status = long_builder.Reserve(local_size);
49-
for (size_t i = 0; i < local_size; i++) {
50-
long_builder.UnsafeAppend(local_init + i);
51-
}
52-
auto local_array = ARROW_RESULT(long_builder.Finish());
53-
output.move_into(std::move(local_array));
27+
/*static*/ void SequenceTask::cpu_variant(legate::TaskContext context)
28+
{
29+
TaskContext ctx{context};
30+
auto global_size = argument::get_next_scalar<size_t>(ctx);
31+
auto global_init = argument::get_next_scalar<int64_t>(ctx);
32+
auto output = argument::get_next_output<PhysicalColumn>(ctx);
33+
argument::get_parallel_launch_task(ctx);
34+
auto [local_start, local_size] = evenly_partition_work(global_size, ctx.rank, ctx.nranks);
35+
auto local_init = global_init + local_start;
36+
37+
if (local_size == 0) {
38+
output.bind_empty_data();
39+
return;
5440
}
5541

56-
static void gpu_variant(legate::TaskContext context)
57-
{
58-
TaskContext ctx{context};
59-
auto global_size = argument::get_next_scalar<size_t>(ctx);
60-
auto global_init = argument::get_next_scalar<int64_t>(ctx);
61-
auto output = argument::get_next_output<PhysicalColumn>(ctx);
62-
argument::get_parallel_launch_task(ctx);
63-
64-
auto [local_start, local_size] = evenly_partition_work(global_size, ctx.rank, ctx.nranks);
65-
auto local_init = global_init + local_start;
66-
67-
if (local_size == 0) {
68-
output.bind_empty_data();
69-
return;
70-
}
71-
72-
cudf::numeric_scalar<int64_t> cudf_init(local_init, true, ctx.stream(), ctx.mr());
73-
auto res = cudf::sequence(local_size, cudf_init, ctx.stream(), ctx.mr());
74-
75-
output.move_into(std::move(res));
42+
arrow::Int64Builder long_builder = arrow::Int64Builder();
43+
auto status = long_builder.Reserve(local_size);
44+
for (size_t i = 0; i < local_size; i++) {
45+
long_builder.UnsafeAppend(local_init + i);
7646
}
77-
};
78-
47+
auto local_array = ARROW_RESULT(long_builder.Finish());
48+
output.move_into(std::move(local_array));
49+
}
7950
} // namespace task
8051

8152
LogicalColumn sequence(size_t size, int64_t init)

cpp/src/filling.cu

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright (c) 2024-2025, NVIDIA CORPORATION.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include <legate.h>
18+
19+
#include <cudf/filling.hpp>
20+
#include <cudf/scalar/scalar.hpp>
21+
22+
#include <legate_dataframe/core/task_argument.hpp>
23+
#include <legate_dataframe/core/task_context.hpp>
24+
#include <legate_dataframe/filling.hpp>
25+
26+
namespace legate::dataframe::task {
27+
/*static*/ void SequenceTask::gpu_variant(legate::TaskContext context)
28+
{
29+
TaskContext ctx{context};
30+
auto global_size = argument::get_next_scalar<size_t>(ctx);
31+
auto global_init = argument::get_next_scalar<int64_t>(ctx);
32+
auto output = argument::get_next_output<PhysicalColumn>(ctx);
33+
argument::get_parallel_launch_task(ctx);
34+
35+
auto [local_start, local_size] = evenly_partition_work(global_size, ctx.rank, ctx.nranks);
36+
auto local_init = global_init + local_start;
37+
38+
if (local_size == 0) {
39+
output.bind_empty_data();
40+
return;
41+
}
42+
43+
cudf::numeric_scalar<int64_t> cudf_init(local_init, true, ctx.stream(), ctx.mr());
44+
auto res = cudf::sequence(local_size, cudf_init, ctx.stream(), ctx.mr());
45+
46+
output.move_into(std::move(res));
47+
}
48+
} // namespace legate::dataframe::task

0 commit comments

Comments
 (0)