79 changes: 79 additions & 0 deletions docs/src/threading.md
@@ -0,0 +1,79 @@
# Threading Support

DiskArrays.jl provides support for threaded algorithms when the underlying
storage backend supports thread-safe read operations.

## Threading Trait System

The threading support is based on a trait system that allows backends to
declare whether they support thread-safe operations:

```julia
using DiskArrays

# Check if an array supports threading
is_thread_safe(my_array)

# Get the threading trait
threading_trait(my_array) # Returns ThreadSafe() or NotThreadSafe()
```

## Global Threading Control

You can globally enable or disable threading for all DiskArray operations:

```julia
# Disable threading globally
disable_threading()

# Enable threading globally (default)
enable_threading()

# Check current status
threading_enabled()
```
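
How the global switch and the per-backend trait combine can be sketched in isolation. The block below is a standalone illustration that mirrors the names on this page without loading DiskArrays; `SafeBackend`, `UnsafeBackend`, and `use_threads` are hypothetical names introduced only for this example.

```julia
# Standalone sketch: mirrors the trait names on this page, does not load DiskArrays.
abstract type ThreadingTrait end
struct ThreadSafe <: ThreadingTrait end
struct NotThreadSafe <: ThreadingTrait end

# Two hypothetical backends, for illustration only.
struct SafeBackend end
struct UnsafeBackend end

threading_trait(::Any) = NotThreadSafe()        # conservative default
threading_trait(::SafeBackend) = ThreadSafe()   # explicit opt-in

const THREADING_ENABLED = Ref(true)

# Threads are used only when the global switch AND the backend both allow it.
use_threads(x) = THREADING_ENABLED[] && threading_trait(x) isa ThreadSafe

THREADING_ENABLED[] = true
on_safe, on_unsafe = use_threads(SafeBackend()), use_threads(UnsafeBackend())

THREADING_ENABLED[] = false
off_safe, off_unsafe = use_threads(SafeBackend()), use_threads(UnsafeBackend())

THREADING_ENABLED[] = true   # restore the default
```

Under these assumptions only the combination "switch on, backend opted in" selects the threaded path; every other combination falls back to single-threaded execution.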

## Implementing Threading Support in Backends

Backend developers can opt into threading support by overriding the `threading_trait` method for their array type. Every `AbstractDiskArray` subtype defaults to `NotThreadSafe()`:

```julia
# For a hypothetical ThreadSafeArray type; `<:` also covers parametric variants
DiskArrays.threading_trait(::Type{<:ThreadSafeArray}) = DiskArrays.ThreadSafe()
```

Important: Only declare your backend as thread-safe if all of the following hold:

* Multiple threads can safely read from the storage simultaneously
* The underlying storage system (files, network, etc.) supports concurrent access
* No global state is modified during read operations
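
The last condition is the easiest to miss. As a rough illustration, the sketch below uses a hypothetical reader that updates a shared counter on every read; `CountingReader` and `read_chunk` are invented for this example and are not part of DiskArrays. Without the lock, concurrent increments could be lost, so a backend with this kind of shared mutable state would either need a guard like this one or should keep the `NotThreadSafe()` default.

```julia
# Hypothetical reader that mutates shared state (a read counter) on every read.
struct CountingReader
    data::Vector{Int}
    nreads::Base.RefValue{Int}
    lk::ReentrantLock
end
CountingReader(data::Vector{Int}) = CountingReader(data, Ref(0), ReentrantLock())

function read_chunk(r::CountingReader, range::UnitRange{Int})
    # Guard the shared counter; unguarded `+=` from several threads is a data race.
    lock(r.lk) do
        r.nreads[] += 1
    end
    return r.data[range]
end

reader = CountingReader(collect(1:100))
pieces = Vector{Vector{Int}}(undef, 10)

# Each iteration reads a different block of ten elements and writes to its own slot.
Threads.@threads for i in 1:10
    pieces[i] = read_chunk(reader, (10i - 9):(10i))
end
```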

## Threaded Algorithms

Currently supported threaded algorithms:

### unique
Collaborator

We really need more than one application to be worth adding this concept

Member Author

more would be nice, but unique is all i need for now. i was hoping to just put the infrastructure in place so that others could chip in going forward.

Collaborator

Who are these others you speak of ;)

Collaborator

The most likely outcome is no one adds anything for years, and we have all this documentation and code just for unique, that hardly anyone uses. Eventually @meggart and I will have to maintain it, and to me it's doubtful whether that's worth having.

Member Author

a second commit adds a threaded version of count. 4-5x faster in my hands.

i use these daily. would these or other threaded methods really not speed up your workflow?


```julia
# Uses threads automatically if the backend is thread-safe and threading is enabled
result = unique(my_disk_array)

# With a function
result = unique(x -> x % 10, my_disk_array)

# Explicitly request the threaded method; this bypasses the trait and global checks
result = unique(Val(true), x -> x % 10, my_disk_array)
```

The threaded `unique` algorithm:

* Computes `unique` on each chunk in parallel using `Threads.@threads`
* Merges the per-chunk results sequentially with `reduce`, removing duplicates after each merge
* Falls back to the single-threaded implementation for non-thread-safe backends or when threading is globally disabled
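
The steps above can be sketched without any disk backend. The block below is a minimal standalone illustration in which slices of an in-memory vector stand in for chunks; `chunk_ranges` and `threaded_unique` are hypothetical names, not DiskArrays functions, and the chunk length is an arbitrary choice.

```julia
# Standalone sketch: slices of a plain vector stand in for disk-backed chunks.

# Split 1:n into contiguous ranges of at most `chunk_len` elements.
chunk_ranges(n::Int, chunk_len::Int) =
    [i:min(i + chunk_len - 1, n) for i in 1:chunk_len:n]

function threaded_unique(f, data::AbstractVector, chunk_len::Int)
    ranges = chunk_ranges(length(data), chunk_len)
    partial = Vector{Vector{eltype(data)}}(undef, length(ranges))
    # Each iteration writes only to its own slot, so no locking is needed.
    Threads.@threads for i in eachindex(ranges)
        partial[i] = unique(f, view(data, ranges[i]))
    end
    # Sequential merge in chunk order; unique! drops duplicates across chunks.
    return reduce(partial; init=eltype(data)[]) do acc, u
        unique!(f, append!(acc, u))
    end
end

data = [1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 1, 2, 3, 4, 5, 5, 6, 6, 6, 7]
result = threaded_unique(identity, data, 6)
result_mod = threaded_unique(x -> x % 3, data, 6)
```

Because the partial results are merged in chunk order, this sketch keeps the same first-occurrence order as a plain `unique` on the vector. That ordering should not be assumed for multidimensional chunk grids, where chunk order and element order differ.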

## Performance Considerations

* Threading is most beneficial for arrays with many chunks, since work is distributed one chunk at a time
* I/O-bound operations may see limited speedup due to storage bottlenecks
* For small arrays, the overhead of thread coordination can outweigh the gain
* Test with your specific storage backend and access patterns
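
One rough way to follow the last point is to time a serial loop and a threaded loop over the same chunks. The sketch below is a standalone illustration: `work` is a hypothetical stand-in for the per-chunk cost of a real read, and the chunk count and size are arbitrary. Julia must be started with more than one thread (for example `julia -t 4`) for the threaded loop to have any chance of being faster.

```julia
# Standalone sketch: `work` simulates the per-chunk cost of a real backend read.
work(chunk) = sum(sqrt, chunk)

function serial_sum(chunks)
    out = zeros(length(chunks))
    for i in eachindex(chunks)
        out[i] = work(chunks[i])
    end
    return sum(out)
end

function threaded_sum(chunks)
    out = zeros(length(chunks))
    # Each iteration writes to its own slot of `out`.
    Threads.@threads for i in eachindex(chunks)
        out[i] = work(chunks[i])
    end
    return sum(out)
end

chunks = [rand(100_000) for _ in 1:16]

# First calls also serve as warm-up so compilation is not included in the timings.
s1 = serial_sum(chunks)
s2 = threaded_sum(chunks)

t_serial = @elapsed serial_sum(chunks)
t_threaded = @elapsed threaded_sum(chunks)
println("threads = ", Threads.nthreads(),
        ", serial = ", t_serial, " s, threaded = ", t_threaded, " s")
```

The measured ratio depends on the machine, the thread count, and how closely `work` resembles the real read path, so it is only a starting point for testing with an actual backend.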
5 changes: 5 additions & 0 deletions src/DiskArrays.jl
@@ -36,6 +36,11 @@ include("zip.jl")
include("show.jl")
include("cached.jl")
include("pad.jl")
include("threading.jl")

export ThreadingTrait, ThreadSafe, NotThreadSafe,
    threading_trait, is_thread_safe,
    enable_threading, disable_threading, threading_enabled

# The all-in-one macro

17 changes: 15 additions & 2 deletions src/mapreduce.jl
@@ -67,9 +67,22 @@ function Base.count(f, v::AbstractDiskArray)
     end
 end
 
-Base.unique(v::AbstractDiskArray) = unique(identity, v)
-function Base.unique(f, v::AbstractDiskArray)
+Base.unique(v::AbstractDiskArray) = unique(should_use_threading(v), identity, v)
+Base.unique(f, v::AbstractDiskArray) = unique(should_use_threading(v), f, v)
+
+function Base.unique(::Val{false}, f, v::AbstractDiskArray)
     reduce((unique(f, v[c...]) for c in eachchunk(v))) do acc, u
         unique!(f, append!(acc, u))
     end
 end
+
+function Base.unique(::Val{true}, f, v::AbstractDiskArray)
+    chunks = eachchunk(v)
+    u = Vector{Vector{eltype(v)}}(undef, length(chunks))
+    Threads.@threads for i in 1:length(chunks)
+        u[i] = unique(f, v[chunks[i]...])
+    end
+    reduce(u) do acc, t
+        unique!(f, append!(acc, t))
+    end
+end
71 changes: 71 additions & 0 deletions src/threading.jl
@@ -0,0 +1,71 @@
"""
ThreadingTrait

Trait to indicate whether a DiskArray backend supports thread-safe operations.
"""
abstract type ThreadingTrait end

"""
ThreadSafe()

Indicates that the DiskArray backend supports thread-safe read operations.
"""
struct ThreadSafe <: ThreadingTrait end

"""
NotThreadSafe()

Indicates that the DiskArray backend does not support thread-safe operations.
Default for all backends unless explicitly overridden.
"""
struct NotThreadSafe <: ThreadingTrait end

"""
threading_trait(::Type{T}) -> ThreadingTrait
threading_trait(x) -> ThreadingTrait

Return the threading trait for a DiskArray type or instance.
Defaults to `NotThreadSafe()` for safety.
"""
threading_trait(::Type{<:AbstractDiskArray}) = NotThreadSafe()
threading_trait(x::AbstractDiskArray) = threading_trait(typeof(x))

"""
is_thread_safe(x) -> Bool

Check if a DiskArray supports thread-safe operations.
"""
is_thread_safe(x) = threading_trait(x) isa ThreadSafe

# Global threading control
const THREADING_ENABLED = Ref(true)

"""
enable_threading(enable::Bool=true)

Globally enable or disable threading for DiskArray operations.
When disabled, all algorithms will run single-threaded regardless of backend support.
"""
enable_threading(enable::Bool=true) = (THREADING_ENABLED[] = enable)

"""
disable_threading()

Globally disable threading for DiskArray operations.
"""
disable_threading() = enable_threading(false)

"""
threading_enabled() -> Bool

Check if threading is globally enabled.
"""
threading_enabled() = THREADING_ENABLED[]

"""
    should_use_threading(x) -> Union{Val{true},Val{false}}

Determine if threading should be used for a given DiskArray.
Returns `Val(true)` only if global threading is enabled AND the backend is thread-safe;
otherwise returns `Val(false)`.
"""
should_use_threading(x) = Val(threading_enabled() && is_thread_safe(x))
Collaborator

Why not return the trait singleton here? `Val{true}` isn't readable; there is no indication of what `true` means.

Member Author

the traits are currently ThreadSafe and NotThreadSafe. these describe a DiskArray, which is related but distinctly separate from whether a threaded algorithm should be used. because you can opt to not do so even if the DiskArray is thread safe using enable_threading(false).

agreed that the call signature to the 3-arg method of unique is not readable given the boolean. what would you suggest specifically instead?

Collaborator

I mean, those are made up by an LLM? ;)

What are some words that describe both traits?

Off the top of my head, Threaded and NotThreaded are more general. And maybe threadsafe is too specific anyway: some backends may be threadsafe but still not efficient to use threads with. As this is proposed as a default behavior the question is really "should threading be used", rather than "is it safe to use threading"

Member Author

a new commit now adds a new set of traits to specify whether methods use threads or not, separately from whether backends support them.

2 changes: 2 additions & 0 deletions test/runtests.jl
@@ -11,6 +11,8 @@ using TraceFuns, Suppressor
# using JET
# JET.report_package(DiskArrays)

include("threading.jl")

@testset "Aqua.jl" begin
Aqua.test_ambiguities([DiskArrays, Base, Core])
Aqua.test_unbound_args(DiskArrays)
79 changes: 79 additions & 0 deletions test/threading.jl
@@ -0,0 +1,79 @@
# Mock thread-safe DiskArray for testing
struct MockThreadSafeDiskArray{T,N} <: AbstractDiskArray{T,N}
    data::Array{T,N}
    chunks::NTuple{N,Int}
end

Base.size(a::MockThreadSafeDiskArray) = size(a.data)
Base.getindex(a::MockThreadSafeDiskArray, i::Int...) = a.data[i...]
DiskArrays.eachchunk(a::MockThreadSafeDiskArray) = DiskArrays.GridChunks(a, a.chunks)
DiskArrays.haschunks(::MockThreadSafeDiskArray) = DiskArrays.Chunked()
DiskArrays.readblock!(a::MockThreadSafeDiskArray, aout, r::AbstractUnitRange...) = (aout .= a.data[r...])

# Override threading trait for our mock array
DiskArrays.threading_trait(::Type{<:MockThreadSafeDiskArray}) = DiskArrays.ThreadSafe()

@testset "Threading Traits" begin
    # Test default behavior (not thread safe)
    regular_array = ChunkedDiskArray(rand(10, 10), (5, 5))
    @test DiskArrays.threading_trait(regular_array) isa DiskArrays.NotThreadSafe
    @test !DiskArrays.is_thread_safe(regular_array)

    # Test thread-safe array
    thread_safe_array = MockThreadSafeDiskArray(rand(10, 10), (5, 5))
    @test DiskArrays.threading_trait(thread_safe_array) isa DiskArrays.ThreadSafe
    @test DiskArrays.is_thread_safe(thread_safe_array)
end

@testset "Threading Control" begin
    # Test global threading control
    @test DiskArrays.threading_enabled() # Should be true by default

    DiskArrays.disable_threading()
    @test !DiskArrays.threading_enabled()

    DiskArrays.enable_threading()
    @test DiskArrays.threading_enabled()

    # Test should_use_threading logic
    thread_safe_array = MockThreadSafeDiskArray(rand(10, 10), (5, 5))
    regular_array = ChunkedDiskArray(rand(10, 10), (5, 5))

    DiskArrays.enable_threading()
    @test DiskArrays.should_use_threading(thread_safe_array) == Val{true}()
    @test DiskArrays.should_use_threading(regular_array) == Val{false}()

    DiskArrays.disable_threading()
    @test DiskArrays.should_use_threading(thread_safe_array) == Val{false}()
    @test DiskArrays.should_use_threading(regular_array) == Val{false}()

    # Reset to default
    DiskArrays.enable_threading()
end

@testset "Threaded unique" begin
    # Test with thread-safe array
    data = [1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 1, 2, 3, 4, 5, 5, 6, 6, 6, 7]
    reshape_data = reshape(data, 4, 5)
    thread_safe_array = MockThreadSafeDiskArray(reshape_data, (2, 3))

    result = unique(thread_safe_array)
    expected = unique(data)
    @test sort(result) == sort(expected)

    # Test with function
    result_with_func = unique(x -> x % 3, thread_safe_array)
    expected_with_func = unique(x -> x % 3, data)
    @test sort(result_with_func) == sort(expected_with_func)

    # Test fallback for non-thread-safe array
    regular_array = ChunkedDiskArray(reshape_data, (2, 3))
    result_fallback = unique(regular_array)
    @test sort(result_fallback) == sort(expected)

    # Test with threading disabled
    DiskArrays.disable_threading()
    result_no_threading = unique(thread_safe_array)
    @test sort(result_no_threading) == sort(expected)
    DiskArrays.enable_threading() # Reset
end