Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests-cuda.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,4 @@ jobs:
run: |
cargo run --release --example trace_committer
cargo run --release --example fibonacci
cargo run --release --example keccakf
NUM_THREADS=3 NUM_TASKS=10 cargo run --release --example keccakf
8 changes: 8 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ ff = { version = "0.13.0", default-features = false }
rayon = "1.10"
parking_lot = "0.12.2"
tracing = "0.1.40"
tracing-subscriber = "0.3.17"
tracing-forest = "0.1.6"
serde_json = "1.0.117"
lazy_static = "1.5.0"
once_cell = "1.19.0"
Expand Down Expand Up @@ -122,6 +124,12 @@ glob = "0.3.2"
bytesize = "2.0"
cc = { version = "1.0", default-features = false }
ctor = "0.5"
tokio = { version = "1.47.1", default-features = false }
toml = "0.8.14"
metrics-tracing-context = "0.16.0"
metrics-util = "0.17.0"
metrics-exporter-prometheus = "0.15.3"
rustls = { version = "0.23", default-features = false }

# default-features = false for no_std
itertools = { version = "0.14.0", default-features = false }
Expand Down
1 change: 1 addition & 0 deletions crates/cuda-backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ openvm-cuda-builder.workspace = true
clap = { workspace = true }
rand = { workspace = true }
p3-keccak-air = { workspace = true }
tokio = { workspace = true, features = ["full"] }

[features]
touchemall = ["openvm-cuda-common/touchemall"]
82 changes: 51 additions & 31 deletions crates/cuda-backend/examples/keccakf.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{sync::Arc, thread};
use std::sync::Arc;

use openvm_cuda_backend::engine::GpuBabyBearPoseidon2Engine;
use openvm_cuda_common::stream::current_stream_id;
Expand Down Expand Up @@ -42,48 +42,64 @@ const LOG_BLOWUP: usize = 2;
const NUM_PERMUTATIONS: usize = 1 << 10;

fn main() {
std::panic::set_hook(Box::new(|info| {
eprintln!("Thread panicked: {}", info);
std::process::abort();
}));
setup_tracing();

// Allow override via env var NUM_THREADS; default = 4.
// NUM_THREADS: number of OS threads = CUDA streams to use
let num_threads: usize = std::env::var("NUM_THREADS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(8);

// ----- CPU keygen once, shared by all threads -----
let air = TestAir(KeccakAir {});
let engine_cpu = BabyBearPoseidon2Engine::new(
FriParameters::standard_with_100_bits_conjectured_security(LOG_BLOWUP),
);

let mut keygen_builder = engine_cpu.keygen_builder();
let air_id = keygen_builder.add_air(Arc::new(air));
let pk_host = Arc::new(keygen_builder.generate_pk());
let vk = Arc::new(pk_host.get_vk());

// Base seed to derive per-thread RNGs deterministically but independently.
let mut master_rng = create_seeded_rng();
let base_seed: u64 = master_rng.gen();

// ----- Run N independent proofs in parallel -----
thread::scope(|scope| {
for t in 0..num_threads {
.unwrap_or(2);

// NUM_TASKS: number of proofs to run
let num_tasks: usize = std::env::var("NUM_TASKS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(num_threads * 4);

let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(num_threads)
.enable_all()
.build()
.unwrap();

runtime.block_on(async {
// ----- CPU keygen once, shared by all threads -----
let air = TestAir(KeccakAir {});
let engine_cpu = BabyBearPoseidon2Engine::new(
FriParameters::standard_with_100_bits_conjectured_security(LOG_BLOWUP),
);

let mut keygen_builder = engine_cpu.keygen_builder();
let air_id = keygen_builder.add_air(Arc::new(air));
let pk_host = Arc::new(keygen_builder.generate_pk());
let vk = Arc::new(pk_host.get_vk());

// Base seed to derive per-thread RNGs deterministically but independently.
let mut master_rng = create_seeded_rng();
let base_seed: u64 = master_rng.gen();

let mut handles = Vec::new();

for t in 0..num_tasks {
let pk_host = pk_host.clone();
let vk = vk.clone();

scope.spawn(move || {
// Derive a per-thread RNG from a stable base seed + thread index
let thread_seed = base_seed ^ ((t as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15));
let mut rng = StdRng::seed_from_u64(thread_seed);
let handle = tokio::task::spawn(async move {
let task_seed = base_seed ^ ((t as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15));
let mut rng = StdRng::seed_from_u64(task_seed);

// Per-thread random inputs + CPU trace
let inputs = (0..NUM_PERMUTATIONS).map(|_| rng.gen()).collect::<Vec<_>>();
let trace = info_span!("generate_trace", thread=%t)
let trace = info_span!("generate_trace", task=%t)
.in_scope(|| p3_keccak_air::generate_trace_rows::<BabyBear>(inputs, 0));
let cpu_trace = Arc::new(trace);

// GPU: build a per-thread engine (uses per-thread default stream)
println!("[thread {t}] Starting GPU proof");
println!("[task {t}] Starting GPU proof");
let engine_gpu = GpuBabyBearPoseidon2Engine::new(
FriParameters::standard_with_100_bits_conjectured_security(LOG_BLOWUP),
);
Expand All @@ -99,12 +115,16 @@ fn main() {
let proof = engine_gpu.prove(&pk_dev, gpu_ctx);
engine_gpu.verify(&vk, &proof).expect("verification failed");
println!(
"[thread {t} - stream {}] Proof verified ✅",
"[task {t} - stream {}] Proof verified ✅",
current_stream_id().unwrap()
);
});
handles.push(handle);
}
});

println!("\nAll {num_threads} threads completed.");
for handle in handles {
handle.await.expect("task failed");
}
println!("\nAll {num_tasks} tasks completed on {num_threads} threads.");
});
}
8 changes: 5 additions & 3 deletions crates/cuda-common/cuda/src/vpmm_shim.cu
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@ int _vpmm_map_and_set_access(CUdeviceptr va, size_t bytes, CUmemGenericAllocatio
return (int)cuMemSetAccess(va, bytes, &acc, 1);
}

int _vpmm_unmap_release(CUdeviceptr va, size_t bytes, CUmemGenericAllocationHandle h) {
CUresult r = cuMemUnmap(va, bytes);
if (r != CUDA_SUCCESS) return (int)r;
// Unmap the virtual address range [va, va + bytes) via cuMemUnmap.
// Returns the raw CUresult as an int (CUDA_SUCCESS == 0); the caller is
// responsible for releasing the physical handle separately (_vpmm_release).
int _vpmm_unmap(CUdeviceptr va, size_t bytes) {
return (int)cuMemUnmap(va, bytes);
}

// Release a physical memory allocation handle via cuMemRelease.
// Returns the raw CUresult as an int (CUDA_SUCCESS == 0).
int _vpmm_release(CUmemGenericAllocationHandle h) {
return (int)cuMemRelease(h);
}

Expand Down
15 changes: 8 additions & 7 deletions crates/cuda-common/src/memory_manager/cuda.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ extern "C" {
h: CUmemGenericAllocationHandle,
device_ordinal: i32,
) -> i32;
fn _vpmm_unmap_release(va: CUdeviceptr, bytes: usize, h: CUmemGenericAllocationHandle) -> i32;
fn _vpmm_unmap(va: CUdeviceptr, bytes: usize) -> i32;
fn _vpmm_release(h: CUmemGenericAllocationHandle) -> i32;
}

pub(super) unsafe fn vpmm_check_support(device_ordinal: i32) -> Result<bool, CudaError> {
Expand Down Expand Up @@ -67,10 +68,10 @@ pub(super) unsafe fn vpmm_map_and_set_access(
CudaError::from_result(_vpmm_map_and_set_access(va, bytes, h, device_ordinal))
}

pub(super) unsafe fn vpmm_unmap_release(
va: CUdeviceptr,
bytes: usize,
h: CUmemGenericAllocationHandle,
) -> Result<(), CudaError> {
CudaError::from_result(_vpmm_unmap_release(va, bytes, h))
/// Unmaps the virtual address range `[va, va + bytes)` (thin wrapper over the
/// C shim `_vpmm_unmap`, i.e. `cuMemUnmap`), converting the raw status code
/// into a `CudaError` result.
///
/// # Safety
/// `va`/`bytes` must describe a range that is currently mapped; the caller is
/// responsible for ensuring no outstanding device work still uses the range.
pub(super) unsafe fn vpmm_unmap(va: CUdeviceptr, bytes: usize) -> Result<(), CudaError> {
CudaError::from_result(_vpmm_unmap(va, bytes))
}

/// Releases a physical allocation handle (thin wrapper over the C shim
/// `_vpmm_release`, i.e. `cuMemRelease`).
///
/// # Safety
/// `h` must be a valid allocation handle that is no longer mapped anywhere.
pub(super) unsafe fn vpmm_release(h: CUmemGenericAllocationHandle) -> Result<(), CudaError> {
CudaError::from_result(_vpmm_release(h))
}
18 changes: 15 additions & 3 deletions crates/cuda-common/src/memory_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use bytesize::ByteSize;
use crate::{
error::{check, MemoryError},
stream::{
cudaStreamPerThread, cudaStream_t, current_stream_id, default_stream_sync, CudaStreamId,
cudaStreamPerThread, cudaStream_t, current_stream_id, current_stream_sync, CudaStreamId,
},
};

Expand All @@ -22,6 +22,8 @@ use vm_pool::VirtualMemoryPool;
extern "C" {
fn cudaMallocAsync(dev_ptr: *mut *mut c_void, size: usize, stream: cudaStream_t) -> i32;
fn cudaFreeAsync(dev_ptr: *mut c_void, stream: cudaStream_t) -> i32;
fn cudaDeviceGetDefaultMemPool(memPool: *mut *mut c_void, device: i32) -> i32;
fn cudaMemPoolTrimTo(pool: *mut c_void, minBytesToKeep: usize) -> i32;
}

static MM_MAP: OnceLock<Mutex<HashMap<CudaStreamId, MemoryManager>>> = OnceLock::new();
Expand Down Expand Up @@ -101,6 +103,15 @@ impl MemoryManager {
/// True when this manager holds no outstanding async allocations and its
/// virtual pool has no live allocations — the condition under which the
/// per-stream manager can be cleaned up after a free.
fn is_empty(&self) -> bool {
self.allocated_ptrs.is_empty() && self.pool.is_empty()
}

/// Trims the device's default memory pool down to zero bytes so the CUDA
/// runtime can return memory cached by `cudaMallocAsync`/`cudaFreeAsync`.
/// Both runtime calls are best-effort: failures are deliberately ignored
/// (this runs from `Drop`, where there is no good way to surface an error).
fn trim_async_pool(&self) {
unsafe {
let mut pool: *mut c_void = std::ptr::null_mut();
// Only trim if the default pool for this device can be obtained (0 == cudaSuccess).
if cudaDeviceGetDefaultMemPool(&mut pool, self.pool.device_id) == 0 {
cudaMemPoolTrimTo(pool, 0);
}
}
}
}

impl Drop for MemoryManager {
Expand All @@ -117,6 +128,7 @@ impl Drop for MemoryManager {
self.allocated_ptrs.len()
);
}
self.trim_async_pool();
}
}

Expand Down Expand Up @@ -145,13 +157,13 @@ pub unsafe fn d_free(ptr: *mut c_void) -> Result<(), MemoryError> {
mm.d_free(ptr)?;
// Auto-cleanup pool if everything is freed
if mm.is_empty() {
default_stream_sync()?;
current_stream_sync()?;
tracing::info!(
"GPU mem ({}): Auto-cleanup pool {}",
stream_id,
ByteSize::b(mm.pool.memory_usage() as u64)
);
mm.pool.clear();
mm_map.remove(&stream_id);
}
} else {
panic!(
Expand Down
14 changes: 9 additions & 5 deletions crates/cuda-common/src/memory_manager/vm_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub(super) struct VirtualMemoryPool {
pub(super) page_size: usize,

// Device ordinal
device_id: i32,
pub(super) device_id: i32,
}

unsafe impl Send for VirtualMemoryPool {}
Expand Down Expand Up @@ -129,7 +129,7 @@ impl VirtualMemoryPool {
if self.curr_end == self.root {
self.create_new_pages(requested)?;
}
assert!(
debug_assert!(
requested != 0 && requested % self.page_size == 0,
"Requested size must be a multiple of the page size"
);
Expand Down Expand Up @@ -273,8 +273,13 @@ impl VirtualMemoryPool {
"Some allocations are still in use"
);
unsafe {
for (&va, &handle) in &self.active_pages {
vpmm_unmap_release(va, self.page_size, handle).unwrap();
if let Err(e) = vpmm_unmap(self.root, (self.curr_end - self.root) as usize) {
tracing::error!("Failed to unmap VA range: {:?}", e);
}
for &handle in self.active_pages.values() {
if let Err(e) = vpmm_release(handle) {
tracing::error!("Failed to release handle: {:?}", e);
}
}
}
self.active_pages.clear();
Expand All @@ -288,7 +293,6 @@ impl Drop for VirtualMemoryPool {
fn drop(&mut self) {
// Unmap pages and release their physical handles first...
self.clear();
unsafe {
// ...then free the virtual address reservation itself.
vpmm_release_va(self.root, VIRTUAL_POOL_SIZE).unwrap();
}
}
}
Expand Down
18 changes: 9 additions & 9 deletions crates/cuda-common/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,6 @@ extern "C" {
fn cudaEventElapsedTime(ms: *mut f32, start: cudaEvent_t, end: cudaEvent_t) -> i32;
}

pub type CudaStreamId = u64;

pub fn current_stream_id() -> Result<CudaStreamId, CudaError> {
let mut id = 0;
check(unsafe { cudaStreamGetId(cudaStreamPerThread, &mut id) })?;
Ok(id)
}

#[allow(non_camel_case_types)]
pub type cudaStream_t = *mut c_void;

Expand Down Expand Up @@ -75,7 +67,15 @@ pub type cudaEvent_t = *mut c_void;
#[allow(non_upper_case_globals)]
pub const cudaStreamPerThread: cudaStream_t = 0x02 as cudaStream_t;

pub fn default_stream_sync() -> Result<(), CudaError> {
pub type CudaStreamId = u64;

/// Returns the CUDA-assigned id of the calling thread's per-thread default
/// stream (`cudaStreamPerThread`), obtained via `cudaStreamGetId`.
pub fn current_stream_id() -> Result<CudaStreamId, CudaError> {
let mut id = 0;
check(unsafe { cudaStreamGetId(cudaStreamPerThread, &mut id) })?;
Ok(id)
}

/// Blocks until all work queued on the calling thread's per-thread default
/// stream (`cudaStreamPerThread`) has completed.
pub fn current_stream_sync() -> Result<(), CudaError> {
check(unsafe { cudaStreamSynchronize(cudaStreamPerThread) })
}

Expand Down
Loading
Loading