Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions ext/PlotsExt.jl
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ logs_to_df(logs::Dict, ::Val{target}; kwargs...) where target =
Dagger.render_logs(logs::Dict, ::Val{:plots_gantt};
target=:execution,
colors, name_to_color, color_by=:fn,
show_task_ids=true,
kwargs...)

Render a Gantt chart of task execution in `logs` using Plots.
Expand All @@ -134,13 +135,16 @@ Keyword arguments affect rendering behavior:
- `colors`: A list of colors to use for rendering.
- `name_to_color`: A function mapping names to colors.
- `color_by`: Whether to color by function name (`:fn`) or processor name (`:proc`).
- `show_task_ids`: Whether to display task IDs on each task bar (default: `true`). Only applies to `:execution` target.
- `kwargs` are passed to `plot` directly.
"""
function Dagger.render_logs(logs::Dict, ::Val{:plots_gantt};
target=:execution,
colors=Dagger.Viz.default_colors,
name_to_color=Dagger.Viz.name_to_color,
color_by=:fn, kwargs...)
color_by=:fn,
show_task_ids=true,
kwargs...)
df = logs_to_df(logs, Val{target}(); colors, name_to_color, color_by)
y_elem = if target == :execution || target == :processor
:proc_name
Expand Down Expand Up @@ -181,12 +185,24 @@ function Dagger.render_logs(logs::Dict, ::Val{:plots_gantt};
end
end

return plot(r; color=permutedims(df.color), labels,
yticks=(1.5:(nrow(df) + 0.5), u),
xlabel="Time (seconds)", ylabel,
xlim=(0.0, (global_t_end - global_t_start) / 1e9),
legendalpha=0, legend=:outertopright,
kwargs...)
plt = plot(r; color=permutedims(df.color), labels,
yticks=(1.5:(nrow(df) + 0.5), u),
xlabel="Time (seconds)", ylabel,
xlim=(0.0, (global_t_end - global_t_start) / 1e9),
legendalpha=0, legend=:outertopright,
kwargs...)

# Add task ID annotations to each task bar
if show_task_ids && target == :execution
for (i, row) in enumerate(eachrow(df))
# Calculate center position of the task bar
x_center = t_start[i] + duration[i] / 2
y_center = dy[row[y_elem]] + 0.5
annotate!(plt, x_center, y_center, text(string(row.tid), :center, 8, :black))
end
end

return plt
end

end # module PlotsExt
2 changes: 1 addition & 1 deletion src/array/darray.jl
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ function Base.fetch(c::DArray{T}) where T
sz = size(thunks)
dmn = domain(c)
dmnchunks = domainchunks(c)
return fetch(Dagger.spawn(Options(meta=true), thunks...) do results...
return fetch(Dagger.spawn(Options(meta=true, name="fetch(DArray)"), thunks...) do results...
t = eltype(fetch(results[1]))
DArray(t, dmn, dmnchunks, reshape(Any[results...], sz),
c.partitioning, c.concat)
Expand Down
2 changes: 1 addition & 1 deletion src/array/indexing.jl
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ function Base.getindex(A::DArray, idx...)
sz = ntuple(i->length(inds[i]), nd)
# TODO: Pad out to same number of dims?
part = nd == length(A.partitioning.blocksize) ? A.partitioning : auto_blocks(sz)
B = zeros(part, eltype(A), sz) # FIXME: Use undef initializer
B = DArray{eltype(A)}(undef, part, sz)
copyto!(B, A_view)
if size(A_view) != sz
# N.B. Base automatically transposes a row vector to a column vector
Expand Down
2 changes: 1 addition & 1 deletion src/array/map-reduce.jl
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ function stage(ctx::Context, r::MapReduce{T,N}) where {T,N}
dims = r.dims === nothing ? Colon() : r.dims
reduced_parts = map(chunks(inp)) do part
if r.op_inner !== nothing
Dagger.@spawn r.op_inner(r.f, part; dims, init=r.init)
Dagger.@spawn name="mapreduce inner" r.op_inner(r.f, part; dims, init=r.init)
else
Dagger.@spawn mapreduce(r.f, r.op_outer, part; dims=dims, init=r.init)
end
Expand Down
126 changes: 55 additions & 71 deletions src/stencil.jl
Original file line number Diff line number Diff line change
Expand Up @@ -18,83 +18,77 @@ function validate_neigh_dist(neigh_dist, size)
end
end

function load_neighbor_edge(arr, dim, dir, neigh_dist)
# Load a halo region from a neighboring chunk
# region_code: N-tuple where each element is -1 (low), 0 (full extent), or +1 (high)
# For dimensions with code 0, we take the full extent of the array
# For dimensions with code -1, we take the last neigh_dist elements (to go to neighbor's low side)
# For dimensions with code +1, we take the first neigh_dist elements (to go to neighbor's high side)
function load_neighbor_region(arr, region_code::NTuple{N,Int}, neigh_dist) where N
validate_neigh_dist(neigh_dist, size(arr))
if dir == -1
start_idx = CartesianIndex(ntuple(i -> i == dim ? (lastindex(arr, i) - neigh_dist + 1) : firstindex(arr, i), ndims(arr)))
stop_idx = CartesianIndex(ntuple(i -> i == dim ? lastindex(arr, i) : lastindex(arr, i), ndims(arr)))
elseif dir == 1
start_idx = CartesianIndex(ntuple(i -> i == dim ? firstindex(arr, i) : firstindex(arr, i), ndims(arr)))
stop_idx = CartesianIndex(ntuple(i -> i == dim ? (firstindex(arr, i) + neigh_dist - 1) : lastindex(arr, i), ndims(arr)))
end
start_idx = CartesianIndex(ntuple(N) do i
if region_code[i] == -1
lastindex(arr, i) - neigh_dist + 1
else
firstindex(arr, i)
end
end)
stop_idx = CartesianIndex(ntuple(N) do i
if region_code[i] == +1
firstindex(arr, i) + neigh_dist - 1
else
lastindex(arr, i)
end
end)
# FIXME: Don't collect
return move(task_processor(), collect(@view arr[start_idx:stop_idx]))
end
function load_neighbor_corner(arr, corner_side, neigh_dist)
validate_neigh_dist(neigh_dist, size(arr))
start_idx = CartesianIndex(ntuple(i -> corner_side[i] == 0 ? (lastindex(arr, i) - neigh_dist + 1) : firstindex(arr, i), ndims(arr)))
stop_idx = CartesianIndex(ntuple(i -> corner_side[i] == 0 ? lastindex(arr, i) : (firstindex(arr, i) + neigh_dist - 1), ndims(arr)))
return move(task_processor(), collect(@view arr[start_idx:stop_idx]))
end
function select_neighborhood_chunks(chunks, idx, neigh_dist, boundary)
validate_neigh_dist(neigh_dist)

N = ndims(chunks)
# FIXME: Depends on neigh_dist and chunk size
chunk_dist = 1

# Get the center
accesses = Any[chunks[idx]]

# Get the edges
for dim in 1:ndims(chunks)
for dir in (-1, +1)
new_idx = idx + CartesianIndex(ntuple(i -> i == dim ? dir*chunk_dist : 0, ndims(chunks)))
if is_past_boundary(size(chunks), new_idx)
if boundary_has_transition(boundary)
new_idx = boundary_transition(boundary, new_idx, size(chunks))
else
new_idx = idx
end
chunk = chunks[new_idx]
push!(accesses, Dagger.@spawn load_boundary_edge(boundary, chunk, dim, dir, neigh_dist))
else
chunk = chunks[new_idx]
push!(accesses, Dagger.@spawn load_neighbor_edge(chunk, dim, dir, neigh_dist))
end
# Iterate over all 3^N - 1 halo regions (excluding center)
# Each region is identified by a code tuple where each element is -1, 0, or +1
for i in 0:(3^N - 1)
region_code = ntuple(N) do d
((i ÷ 3^(d-1)) % 3) - 1 # Maps 0,1,2 -> -1,0,+1
end
end
all(==(0), region_code) && continue # Skip center

# Get the corners
for corner_num in 1:(2^ndims(chunks))
corner_side = CartesianIndex(reverse(ntuple(ndims(chunks)) do i
((corner_num-1) >> (((ndims(chunks) - i) + 1) - 1)) & 1
end))
corner_new_idx = CartesianIndex(ntuple(ndims(chunks)) do i
corner_shift = iszero(corner_side[i]) ? -1 : 1
return idx[i] + corner_shift
# Compute the chunk offset for this region
# For each dimension: -1 means go to previous chunk, +1 means go to next chunk, 0 means same chunk
chunk_offset = CartesianIndex(ntuple(N) do d
region_code[d] * chunk_dist
end)
if is_past_boundary(size(chunks), corner_new_idx)
new_idx = idx + chunk_offset

if is_past_boundary(size(chunks), new_idx)
if boundary_has_transition(boundary)
corner_new_idx = boundary_transition(boundary, corner_new_idx, size(chunks))
new_idx = boundary_transition(boundary, new_idx, size(chunks))
else
corner_new_idx = idx
new_idx = idx
end
chunk = chunks[corner_new_idx]
push!(accesses, Dagger.@spawn load_boundary_corner(boundary, chunk, corner_side, neigh_dist))
chunk = chunks[new_idx]
push!(accesses, Dagger.@spawn load_boundary_region(boundary, chunk, region_code, neigh_dist))
else
chunk = chunks[corner_new_idx]
push!(accesses, Dagger.@spawn load_neighbor_corner(chunk, corner_side, neigh_dist))
chunk = chunks[new_idx]
push!(accesses, Dagger.@spawn load_neighbor_region(chunk, region_code, neigh_dist))
end
end

@assert length(accesses) == 1+2*ndims(chunks)+2^ndims(chunks) "Accesses mismatch: $(length(accesses))"
@assert length(accesses) == 3^N "Accesses mismatch: expected $(3^N), got $(length(accesses))"
return accesses
end
function build_halo(neigh_dist, boundary, center, all_neighbors...)
function build_halo(neigh_dist, boundary, center, all_halos...)
N = ndims(center)
edges = all_neighbors[1:(2*N)]
corners = all_neighbors[((2^N)+1):end]
@assert length(edges) == 2*N && length(corners) == 2^N "Halo mismatch: edges=$(length(edges)) corners=$(length(corners))"
return HaloArray(center, (edges...,), (corners...,), ntuple(_->neigh_dist, N))
expected_halos = 3^N - 1
@assert length(all_halos) == expected_halos "Halo mismatch: N=$N expected $expected_halos halos, got $(length(all_halos))"
return HaloArray(center, (all_halos...,), ntuple(_->neigh_dist, N))
end
function load_neighborhood(arr::HaloArray{T,N}, idx) where {T,N}
@assert all(arr.halo_width .== arr.halo_width[1])
Expand All @@ -121,31 +115,21 @@ struct Wrap end
boundary_has_transition(::Wrap) = true
boundary_transition(::Wrap, idx, size) =
CartesianIndex(ntuple(i -> mod1(idx[i], size[i]), length(size)))
load_boundary_edge(::Wrap, arr, dim, dir, neigh_dist) = load_neighbor_edge(arr, dim, dir, neigh_dist)
load_boundary_corner(::Wrap, arr, corner_side, neigh_dist) = load_neighbor_corner(arr, corner_side, neigh_dist)
load_boundary_region(::Wrap, arr, region_code, neigh_dist) = load_neighbor_region(arr, region_code, neigh_dist)

struct Pad{T}
padval::T
end
boundary_has_transition(::Pad) = false
function load_boundary_edge(pad::Pad, arr, dim, dir, neigh_dist)
if dir == -1
start_idx = CartesianIndex(ntuple(i -> i == dim ? (lastindex(arr, i) - neigh_dist + 1) : firstindex(arr, i), ndims(arr)))
stop_idx = CartesianIndex(ntuple(i -> i == dim ? lastindex(arr, i) : lastindex(arr, i), ndims(arr)))
elseif dir == 1
start_idx = CartesianIndex(ntuple(i -> i == dim ? firstindex(arr, i) : firstindex(arr, i), ndims(arr)))
stop_idx = CartesianIndex(ntuple(i -> i == dim ? (firstindex(arr, i) + neigh_dist - 1) : lastindex(arr, i), ndims(arr)))
function load_boundary_region(pad::Pad, arr, region_code::NTuple{N,Int}, neigh_dist) where N
# Compute the size of this halo region
# For dimensions with code 0, use full array size
# For dimensions with code -1 or +1, use neigh_dist
region_size = ntuple(N) do i
region_code[i] == 0 ? size(arr, i) : neigh_dist
end
edge_size = ntuple(i -> length(start_idx[i]:stop_idx[i]), ndims(arr))
# FIXME: return Fill(pad.padval, edge_size)
return move(task_processor(), fill(pad.padval, edge_size))
end
function load_boundary_corner(pad::Pad, arr, corner_side, neigh_dist)
start_idx = CartesianIndex(ntuple(i -> corner_side[i] == 0 ? (lastindex(arr, i) - neigh_dist + 1) : firstindex(arr, i), ndims(arr)))
stop_idx = CartesianIndex(ntuple(i -> corner_side[i] == 0 ? lastindex(arr, i) : (firstindex(arr, i) + neigh_dist - 1), ndims(arr)))
corner_size = ntuple(i -> length(start_idx[i]:stop_idx[i]), ndims(arr))
# FIXME: return Fill(pad.padval, corner_size)
return move(task_processor(), fill(pad.padval, corner_size))
# FIXME: return Fill(pad.padval, region_size)
return move(task_processor(), fill(pad.padval, region_size))
end

"""
Expand Down
12 changes: 2 additions & 10 deletions src/stream-transfer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@ function stream_push_values!(fetcher::RemoteChannelFetcher, T, our_store::Stream
our_uid = our_store.uid
their_uid = their_stream.uid
if _THEIR_TID[] == 0
_THEIR_TID[] = remotecall_fetch(1) do
lock(Sch.EAGER_ID_MAP) do id_map
id_map[their_uid]
end
end
_THEIR_TID[] = Int(their_uid)
end
their_tid = _THEIR_TID[]
@dagdebug our_tid :stream_push "taking output value: $our_tid -> $their_tid"
Expand Down Expand Up @@ -41,11 +37,7 @@ function stream_pull_values!(fetcher::RemoteChannelFetcher, T, our_store::Stream
our_uid = our_store.uid
their_uid = their_stream.uid
if _THEIR_TID[] == 0
_THEIR_TID[] = remotecall_fetch(1) do
lock(Sch.EAGER_ID_MAP) do id_map
id_map[their_uid]
end
end
_THEIR_TID[] = Int(their_uid)
end
their_tid = _THEIR_TID[]
@dagdebug our_tid :stream_pull "pulling input value: $their_tid -> $our_tid"
Expand Down
Loading