Skip to content

Commit 5868337

Browse files
authored
Make sure we can reach the user's requested FVM concurrency (#449)
We previously used the FVM's `ThreadedExecutor` to execute messages on separate threads because the FVM requires 64MiB of stack space. 1. The FVM v3 supported for 8 concurrent threads. 2. The FVM v4 supports up to the number of CPU threads available. Unfortunately, neither version was influenced by the `LOTUS_FVM_CONCURRENCY` environment variable. This patch fixes this by: 1. Moving the thread-pool to the FFI itself (sharing it between FVM versions). 2. Setting the thread-pool size equal to `LOTUS_FVM_CONCURRENCY`. It also defaults `LOTUS_FVM_CONCURRENCY` to the number of available CPU threads instead of the previous 4. NOTE: I've also tried increasing the stack size instead of using threads, but Go _does not_ like it when other foreign mess with the stack size of _its_ threads (but it has no problem if we create our own threads). fixes filecoin-project/lotus#11817
1 parent b715c94 commit 5868337

File tree

4 files changed

+80
-66
lines changed

4 files changed

+80
-66
lines changed

rust/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,14 @@ serde = "1.0.117"
4646
serde_tuple = "0.5"
4747
safer-ffi = { version = "0.0.7", features = ["proc_macros"] }
4848
filecoin-proofs-api = { version = "16.1", default-features = false }
49+
yastl = "0.1.2"
4950

5051
[dev-dependencies]
5152
memmap2 = "0.5"
5253
tempfile = "3.0.8"
5354

5455
[features]
55-
default = ["cuda", "multicore-sdr" ]
56+
default = ["cuda", "multicore-sdr"]
5657
blst-portable = ["bls-signatures/blst-portable", "blstrs/portable"]
5758
cuda = ["filecoin-proofs-api/cuda", "rust-gpu-tools/cuda", "fvm2/cuda", "fvm3/cuda", "fvm4/cuda"]
5859
cuda-supraseal = ["filecoin-proofs-api/cuda-supraseal", "rust-gpu-tools/cuda", "fvm3/cuda-supraseal", "fvm4/cuda-supraseal"]

rust/src/fvm/engine.rs

Lines changed: 10 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use std::collections::HashMap;
2-
use std::ops::RangeInclusive;
32
use std::sync::{Arc, Mutex};
43

54
use anyhow::anyhow;
@@ -18,7 +17,7 @@ use super::externs::CgoExterns;
1817
use super::types::*;
1918

2019
// Generic executor; uses the current (v3) engine types
21-
pub trait CgoExecutor {
20+
pub trait CgoExecutor: Send {
2221
fn execute_message(
2322
&mut self,
2423
msg: Message,
@@ -65,9 +64,6 @@ pub struct MultiEngineContainer {
6564
engines: Mutex<HashMap<EngineVersion, Arc<dyn AbstractMultiEngine + 'static>>>,
6665
}
6766

68-
const LOTUS_FVM_CONCURRENCY_ENV_NAME: &str = "LOTUS_FVM_CONCURRENCY";
69-
const VALID_CONCURRENCY_RANGE: RangeInclusive<u32> = 1..=128;
70-
7167
impl TryFrom<u32> for EngineVersion {
7268
type Error = anyhow::Error;
7369
fn try_from(value: u32) -> Result<Self, Self::Error> {
@@ -81,41 +77,6 @@ impl TryFrom<u32> for EngineVersion {
8177
}
8278

8379
impl MultiEngineContainer {
84-
/// Constructs a new multi-engine container with the default concurrency (4).
85-
pub fn new() -> MultiEngineContainer {
86-
Self::with_concurrency(4)
87-
}
88-
89-
/// Constructs a new multi-engine container with the concurrency specified in the
90-
/// `LOTUS_FVM_CONCURRENCY` environment variable.
91-
pub fn new_env() -> MultiEngineContainer {
92-
let valosstr = match std::env::var_os(LOTUS_FVM_CONCURRENCY_ENV_NAME) {
93-
Some(v) => v,
94-
None => return Self::new(),
95-
};
96-
let valstr = match valosstr.to_str() {
97-
Some(s) => s,
98-
None => {
99-
log::error!("{LOTUS_FVM_CONCURRENCY_ENV_NAME} has invalid value");
100-
return Self::new();
101-
}
102-
};
103-
let concurrency: u32 = match valstr.parse() {
104-
Ok(v) => v,
105-
Err(e) => {
106-
log::error!("{LOTUS_FVM_CONCURRENCY_ENV_NAME} has invalid value: {e}");
107-
return Self::new();
108-
}
109-
};
110-
if !VALID_CONCURRENCY_RANGE.contains(&concurrency) {
111-
log::error!(
112-
"{LOTUS_FVM_CONCURRENCY_ENV_NAME} must be in the range {VALID_CONCURRENCY_RANGE:?}, not {concurrency}"
113-
);
114-
return Self::new();
115-
}
116-
Self::with_concurrency(concurrency)
117-
}
118-
11980
pub fn with_concurrency(concurrency: u32) -> MultiEngineContainer {
12081
MultiEngineContainer {
12182
engines: Mutex::new(HashMap::new()),
@@ -146,12 +107,6 @@ impl MultiEngineContainer {
146107
}
147108
}
148109

149-
impl Default for MultiEngineContainer {
150-
fn default() -> MultiEngineContainer {
151-
MultiEngineContainer::new()
152-
}
153-
}
154-
155110
// fvm v4 implementation
156111
mod v4 {
157112
use anyhow::anyhow;
@@ -160,10 +115,7 @@ mod v4 {
160115

161116
use fvm4::call_manager::DefaultCallManager as DefaultCallManager4;
162117
use fvm4::engine::{EnginePool as EnginePool4, MultiEngine as MultiEngine4};
163-
use fvm4::executor::{
164-
ApplyKind, ApplyRet, DefaultExecutor as DefaultExecutor4,
165-
ThreadedExecutor as ThreadedExecutor4,
166-
};
118+
use fvm4::executor::{ApplyKind, ApplyRet, DefaultExecutor as DefaultExecutor4};
167119
use fvm4::kernel::filecoin::DefaultFilecoinKernel as DefaultFilecoinKernel4;
168120
use fvm4::machine::{DefaultMachine as DefaultMachine4, NetworkConfig};
169121
use fvm4_shared::{chainid::ChainID, clock::ChainEpoch, message::Message};
@@ -175,14 +127,13 @@ mod v4 {
175127
use super::Config;
176128

177129
type CgoMachine4 = DefaultMachine4<CgoBlockstore, CgoExterns>;
178-
type BaseExecutor4 = DefaultExecutor4<DefaultFilecoinKernel4<DefaultCallManager4<CgoMachine4>>>;
179-
type CgoExecutor4 = ThreadedExecutor4<BaseExecutor4>;
130+
type CgoExecutor4 = DefaultExecutor4<DefaultFilecoinKernel4<DefaultCallManager4<CgoMachine4>>>;
180131

181132
fn new_executor(
182133
engine_pool: EnginePool4,
183134
machine: CgoMachine4,
184135
) -> anyhow::Result<CgoExecutor4> {
185-
Ok(ThreadedExecutor4(BaseExecutor4::new(engine_pool, machine)?))
136+
CgoExecutor4::new(engine_pool, machine)
186137
}
187138

188139
impl CgoExecutor for CgoExecutor4 {
@@ -254,8 +205,7 @@ mod v3 {
254205
};
255206
use fvm3::engine::{EnginePool as EnginePool3, MultiEngine as MultiEngine3};
256207
use fvm3::executor::{
257-
ApplyFailure as ApplyFailure3, ApplyKind as ApplyKind3,
258-
DefaultExecutor as DefaultExecutor3, ThreadedExecutor as ThreadedExecutor3,
208+
ApplyFailure as ApplyFailure3, ApplyKind as ApplyKind3, DefaultExecutor as DefaultExecutor3,
259209
};
260210
use fvm3::machine::{DefaultMachine as DefaultMachine3, NetworkConfig as NetworkConfig3};
261211
use fvm3::trace::ExecutionEvent as ExecutionEvent3;
@@ -284,14 +234,13 @@ mod v3 {
284234
use super::Config;
285235

286236
type CgoMachine3 = DefaultMachine3<CgoBlockstore, CgoExterns>;
287-
type BaseExecutor3 = DefaultExecutor3<DefaultKernel3<DefaultCallManager3<CgoMachine3>>>;
288-
type CgoExecutor3 = ThreadedExecutor3<BaseExecutor3>;
237+
type CgoExecutor3 = DefaultExecutor3<DefaultKernel3<DefaultCallManager3<CgoMachine3>>>;
289238

290239
fn new_executor(
291240
engine_pool: EnginePool3,
292241
machine: CgoMachine3,
293242
) -> anyhow::Result<CgoExecutor3> {
294-
Ok(ThreadedExecutor3(BaseExecutor3::new(engine_pool, machine)?))
243+
CgoExecutor3::new(engine_pool, machine)
295244
}
296245

297246
impl CgoExecutor for CgoExecutor3 {
@@ -533,8 +482,7 @@ mod v2 {
533482
backtrace::Cause as Cause2, DefaultCallManager as DefaultCallManager2,
534483
};
535484
use fvm2::executor::{
536-
ApplyFailure as ApplyFailure2, ApplyKind as ApplyKind2,
537-
DefaultExecutor as DefaultExecutor2, ThreadedExecutor as ThreadedExecutor2,
485+
ApplyFailure as ApplyFailure2, ApplyKind as ApplyKind2, DefaultExecutor as DefaultExecutor2,
538486
};
539487
use fvm2::machine::{
540488
DefaultMachine as DefaultMachine2, MultiEngine as MultiEngine2,
@@ -565,11 +513,10 @@ mod v2 {
565513
use super::Config;
566514

567515
type CgoMachine2 = DefaultMachine2<CgoBlockstore, CgoExterns>;
568-
type BaseExecutor2 = DefaultExecutor2<DefaultKernel2<DefaultCallManager2<CgoMachine2>>>;
569-
type CgoExecutor2 = ThreadedExecutor2<BaseExecutor2>;
516+
type CgoExecutor2 = DefaultExecutor2<DefaultKernel2<DefaultCallManager2<CgoMachine2>>>;
570517

571518
fn new_executor(machine: CgoMachine2) -> CgoExecutor2 {
572-
ThreadedExecutor2(BaseExecutor2::new(machine))
519+
CgoExecutor2::new(machine)
573520
}
574521

575522
fn bytes_to_block(bytes: RawBytes) -> Option<IpldBlock> {

rust/src/fvm/machine.rs

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::borrow::Cow;
22
use std::convert::{TryFrom, TryInto};
3+
use std::ops::RangeInclusive;
34

45
use anyhow::{anyhow, Context};
56
use cid::Cid;
@@ -27,8 +28,54 @@ use super::types::*;
2728
use crate::destructor;
2829
use crate::util::types::{catch_panic_response, catch_panic_response_no_default, Result};
2930

31+
const STACK_SIZE: usize = 64 << 20; // 64MiB
32+
3033
lazy_static! {
31-
static ref ENGINES: MultiEngineContainer = MultiEngineContainer::new_env();
34+
static ref CONCURRENCY: u32 = get_concurrency();
35+
static ref ENGINES: MultiEngineContainer = MultiEngineContainer::with_concurrency(*CONCURRENCY);
36+
static ref THREAD_POOL: yastl::Pool = yastl::Pool::with_config(
37+
*CONCURRENCY as usize,
38+
yastl::ThreadConfig::new()
39+
.prefix("fvm")
40+
.stack_size(STACK_SIZE)
41+
);
42+
}
43+
44+
const LOTUS_FVM_CONCURRENCY_ENV_NAME: &str = "LOTUS_FVM_CONCURRENCY";
45+
const VALID_CONCURRENCY_RANGE: RangeInclusive<u32> = 1..=256;
46+
47+
fn available_parallelism() -> u32 {
48+
std::thread::available_parallelism()
49+
.map(usize::from)
50+
.unwrap_or(8) as u32
51+
}
52+
53+
fn get_concurrency() -> u32 {
54+
let valosstr = match std::env::var_os(LOTUS_FVM_CONCURRENCY_ENV_NAME) {
55+
Some(v) => v,
56+
None => return available_parallelism(),
57+
};
58+
let valstr = match valosstr.to_str() {
59+
Some(s) => s,
60+
None => {
61+
log::error!("{LOTUS_FVM_CONCURRENCY_ENV_NAME} has invalid value");
62+
return available_parallelism();
63+
}
64+
};
65+
let concurrency: u32 = match valstr.parse() {
66+
Ok(v) => v,
67+
Err(e) => {
68+
log::error!("{LOTUS_FVM_CONCURRENCY_ENV_NAME} has invalid value: {e}");
69+
return available_parallelism();
70+
}
71+
};
72+
if !VALID_CONCURRENCY_RANGE.contains(&concurrency) {
73+
log::error!(
74+
"{LOTUS_FVM_CONCURRENCY_ENV_NAME} must be in the range {VALID_CONCURRENCY_RANGE:?}, not {concurrency}"
75+
);
76+
return available_parallelism();
77+
}
78+
concurrency
3279
}
3380

3481
#[allow(clippy::too_many_arguments)]
@@ -181,14 +228,32 @@ fn create_fvm_debug_machine(
181228
)
182229
}
183230

231+
fn with_new_stack<F, T>(name: &str, pool: &yastl::Pool, callback: F) -> repr_c::Box<Result<T>>
232+
where
233+
T: Sized + Default + Send,
234+
F: FnOnce() -> anyhow::Result<T> + std::panic::UnwindSafe + Send,
235+
{
236+
let mut res = None;
237+
pool.scoped(|scope| scope.execute(|| res = Some(catch_panic_response(name, callback))));
238+
239+
res.unwrap_or_else(|| {
240+
repr_c::Box::new(Result::err(
241+
format!("failed to schedule {name}")
242+
.into_bytes()
243+
.into_boxed_slice(),
244+
))
245+
})
246+
}
247+
184248
#[ffi_export]
185249
fn fvm_machine_execute_message(
186250
executor: &'_ InnerFvmMachine,
187251
message: c_slice::Ref<u8>,
188252
chain_len: u64,
189253
apply_kind: u64, /* 0: Explicit, _: Implicit */
190254
) -> repr_c::Box<Result<FvmMachineExecuteResponse>> {
191-
catch_panic_response("fvm_machine_execute_message", || {
255+
// Execute in the thread-pool because we need a 64MiB stack.
256+
with_new_stack("fvm_machine_execute_message", &THREAD_POOL, || {
192257
let apply_kind = if apply_kind == 0 {
193258
ApplyKind::Explicit
194259
} else {

0 commit comments

Comments
 (0)