Skip to content

Commit 615c24a

Browse files
committed
Only use internal runtime in VssStore
We previously attempted to drop the internal runtime from `VssStore`, resulting in blocking behavior. While we recently made changes that improved our situation (having VSS CI pass again pretty reliably), we just ran into yet another case where the VSS CI hung (cf. https://github.com/lightningdevkit/vss-server/actions/runs/19023212819/job/54322173817?pr=59). Here we attempt to restore even more of the original pre-ab3d78d / #623 behavior to get rid of the reappearing blocking behavior, i.e., only use the internal runtime in `VssStore`.
1 parent eaad8f5 commit 615c24a

File tree

2 files changed

+37
-43
lines changed

2 files changed

+37
-43
lines changed

src/builder.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -731,8 +731,7 @@ impl NodeBuilder {
731731

732732
let vss_seed_bytes: [u8; 32] = vss_xprv.private_key.secret_bytes();
733733

734-
let vss_store =
735-
VssStore::new(vss_url, store_id, vss_seed_bytes, header_provider, Arc::clone(&runtime));
734+
let vss_store = VssStore::new(vss_url, store_id, vss_seed_bytes, header_provider);
736735
build_with_store_internal(
737736
config,
738737
self.chain_data_source_config.as_ref(),

src/io/vss_store.rs

Lines changed: 36 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ use vss_client::util::retry::{
3535
use vss_client::util::storable_builder::{EntropySource, StorableBuilder};
3636

3737
use crate::io::utils::check_namespace_key_validity;
38-
use crate::runtime::Runtime;
3938

4039
type CustomRetryPolicy = FilteredRetryPolicy<
4140
JitteredRetryPolicy<
@@ -55,7 +54,6 @@ pub struct VssStore {
5554
// Version counter to ensure that writes are applied in the correct order. It is assumed that read and list
5655
// operations aren't sensitive to the order of execution.
5756
next_version: AtomicU64,
58-
runtime: Arc<Runtime>,
5957
// A VSS-internal runtime we use to avoid any deadlocks we could hit when waiting on a spawned
6058
// blocking task to finish while the blocked thread had acquired the reactor. In particular,
6159
// this works around a previously-hit case where a concurrent call to
@@ -68,7 +66,7 @@ pub struct VssStore {
6866
impl VssStore {
6967
pub(crate) fn new(
7068
base_url: String, store_id: String, vss_seed: [u8; 32],
71-
header_provider: Arc<dyn VssHeaderProvider>, runtime: Arc<Runtime>,
69+
header_provider: Arc<dyn VssHeaderProvider>,
7270
) -> Self {
7371
let inner = Arc::new(VssStoreInner::new(base_url, store_id, vss_seed, header_provider));
7472
let next_version = AtomicU64::new(1);
@@ -86,7 +84,7 @@ impl VssStore {
8684
.unwrap(),
8785
);
8886

89-
Self { inner, next_version, runtime, internal_runtime }
87+
Self { inner, next_version, internal_runtime }
9088
}
9189

9290
// Same logic as for the obfuscated keys below, but just for locking, using the plaintext keys
@@ -133,13 +131,14 @@ impl KVStoreSync for VssStore {
133131
async move { inner.read_internal(primary_namespace, secondary_namespace, key).await };
134132
// TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
135133
// times out.
136-
let spawned_fut = internal_runtime.spawn(async move {
137-
tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| {
138-
let msg = "VssStore::read timed out";
139-
Error::new(ErrorKind::Other, msg)
140-
})
141-
});
142-
self.runtime.block_on(spawned_fut).expect("We should always finish")?
134+
tokio::task::block_in_place(move || {
135+
internal_runtime.block_on(async move {
136+
tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| {
137+
let msg = "VssStore::read timed out";
138+
Error::new(ErrorKind::Other, msg)
139+
})
140+
})?
141+
})
143142
}
144143

145144
fn write(
@@ -171,13 +170,14 @@ impl KVStoreSync for VssStore {
171170
};
172171
// TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
173172
// times out.
174-
let spawned_fut = internal_runtime.spawn(async move {
175-
tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| {
176-
let msg = "VssStore::write timed out";
177-
Error::new(ErrorKind::Other, msg)
178-
})
179-
});
180-
self.runtime.block_on(spawned_fut).expect("We should always finish")?
173+
tokio::task::block_in_place(move || {
174+
internal_runtime.block_on(async move {
175+
tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| {
176+
let msg = "VssStore::write timed out";
177+
Error::new(ErrorKind::Other, msg)
178+
})
179+
})?
180+
})
181181
}
182182

183183
fn remove(
@@ -208,13 +208,14 @@ impl KVStoreSync for VssStore {
208208
};
209209
// TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
210210
// times out.
211-
let spawned_fut = internal_runtime.spawn(async move {
212-
tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| {
213-
let msg = "VssStore::remove timed out";
214-
Error::new(ErrorKind::Other, msg)
215-
})
216-
});
217-
self.runtime.block_on(spawned_fut).expect("We should always finish")?
211+
tokio::task::block_in_place(move || {
212+
internal_runtime.block_on(async move {
213+
tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| {
214+
let msg = "VssStore::remove timed out";
215+
Error::new(ErrorKind::Other, msg)
216+
})
217+
})?
218+
})
218219
}
219220

220221
fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> {
@@ -229,13 +230,14 @@ impl KVStoreSync for VssStore {
229230
let fut = async move { inner.list_internal(primary_namespace, secondary_namespace).await };
230231
// TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
231232
// times out.
232-
let spawned_fut = internal_runtime.spawn(async move {
233-
tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| {
234-
let msg = "VssStore::list timed out";
235-
Error::new(ErrorKind::Other, msg)
236-
})
237-
});
238-
self.runtime.block_on(spawned_fut).expect("We should always finish")?
233+
tokio::task::block_in_place(move || {
234+
internal_runtime.block_on(async move {
235+
tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| {
236+
let msg = "VssStore::list timed out";
237+
Error::new(ErrorKind::Other, msg)
238+
})
239+
})?
240+
})
239241
}
240242
}
241243

@@ -610,7 +612,6 @@ mod tests {
610612

611613
use super::*;
612614
use crate::io::test_utils::do_read_write_remove_list_persist;
613-
use crate::logger::Logger;
614615

615616
#[test]
616617
fn vss_read_write_remove_list_persist() {
@@ -620,10 +621,7 @@ mod tests {
620621
let mut vss_seed = [0u8; 32];
621622
rng.fill_bytes(&mut vss_seed);
622623
let header_provider = Arc::new(FixedHeaders::new(HashMap::new()));
623-
let logger = Arc::new(Logger::new_log_facade());
624-
let runtime = Arc::new(Runtime::new(logger).unwrap());
625-
let vss_store =
626-
VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime);
624+
let vss_store = VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider);
627625

628626
do_read_write_remove_list_persist(&vss_store);
629627
}
@@ -636,10 +634,7 @@ mod tests {
636634
let mut vss_seed = [0u8; 32];
637635
rng.fill_bytes(&mut vss_seed);
638636
let header_provider = Arc::new(FixedHeaders::new(HashMap::new()));
639-
let logger = Arc::new(Logger::new_log_facade());
640-
let runtime = Arc::new(Runtime::new(logger).unwrap());
641-
let vss_store =
642-
VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime);
637+
let vss_store = VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider);
643638

644639
do_read_write_remove_list_persist(&vss_store);
645640
drop(vss_store)

0 commit comments

Comments
 (0)