From 59cead74b32b9881aaf2a19243fbb33de0755265 Mon Sep 17 00:00:00 2001 From: Carlos Alejandro Gutierrez Sandoval Date: Tue, 24 Oct 2023 14:39:01 -0600 Subject: [PATCH 1/5] fix arc-mutexes --- romeo/src/stacks_client.rs | 42 ++++++++++++-------------------------- romeo/src/system.rs | 42 ++++++++++++++------------------------ 2 files changed, 28 insertions(+), 56 deletions(-) diff --git a/romeo/src/stacks_client.rs b/romeo/src/stacks_client.rs index a0820be0..5077eae1 100644 --- a/romeo/src/stacks_client.rs +++ b/romeo/src/stacks_client.rs @@ -1,6 +1,6 @@ //! Stacks client -use std::{io::Cursor, sync::Arc, time::Duration}; +use std::{io::Cursor, time::Duration}; use anyhow::{anyhow, Error}; use blockstack_lib::{ @@ -23,10 +23,7 @@ use reqwest::{Request, RequestBuilder, Response, StatusCode}; use serde::de::DeserializeOwned; use serde_json::Value; use stacks_core::{codec::Codec, uint::Uint256}; -use tokio::{ - sync::{Mutex, MutexGuard}, - time::sleep, -}; +use tokio::time::sleep; use tracing::{debug, trace, warn}; use crate::{config::Config, event::TransactionStatus}; @@ -34,27 +31,14 @@ use crate::{config::Config, event::TransactionStatus}; const BLOCK_POLLING_INTERVAL: Duration = Duration::from_secs(5); /// Wrapped Stacks Client which can be shared safely between threads. -#[derive(Clone, Debug)] -pub struct LockedClient(Arc>); -impl LockedClient { - /// Lock and obtain a handle to the inner stacks client - pub async fn lock(&self) -> MutexGuard { - self.0.lock().await - } -} - -impl From for LockedClient { - fn from(client: StacksClient) -> Self { - Self(Arc::new(Mutex::new(client))) - } -} +pub type LockedClient = StacksClient; /// Stateful client for creating and broadcasting Stacks transactions /// /// This client keeps track of the last executed nonce for the given /// key. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct StacksClient { config: Config, http_client: reqwest::Client, @@ -126,7 +110,7 @@ impl StacksClient { /// Sign and broadcast an unsigned stacks transaction pub async fn sign_and_broadcast( - &mut self, + &self, mut tx: StacksTransaction, ) -> anyhow::Result { #[cfg(debug_assertions)] @@ -179,7 +163,7 @@ impl StacksClient { /// Get transaction status for a given txid pub async fn get_transation_status( - &mut self, + &self, txid: StacksTxId, ) -> anyhow::Result { let res: anyhow::Result = self @@ -213,7 +197,7 @@ impl StacksClient { }) } - async fn get_nonce_info(&mut self) -> anyhow::Result { + async fn get_nonce_info(&self) -> anyhow::Result { self.send_request(|| { self.http_client .get(self.cachebust(self.nonce_url())) @@ -225,7 +209,7 @@ impl StacksClient { /// Get the block height of the contract pub async fn get_contract_block_height( - &mut self, + &self, name: ContractName, ) -> anyhow::Result { let addr = self.config.stacks_credentials.address(); @@ -255,7 +239,7 @@ impl StacksClient { /// Get the Bitcoin block height for a Stacks block height pub async fn get_bitcoin_block_height( - &mut self, + &self, block_height: u32, ) -> anyhow::Result { let res: Value = self @@ -272,7 +256,7 @@ impl StacksClient { /// Get the block at height pub async fn get_block( - &mut self, + &self, block_height: u32, ) -> anyhow::Result> { let res: Value = loop { @@ -322,7 +306,7 @@ impl StacksClient { /// Get the block at height pub async fn get_transaction( - &mut self, + &self, id: StacksTxId, ) -> anyhow::Result { let res: Value = self @@ -347,7 +331,7 @@ impl StacksClient { /// Get the block hash for a given Bitcoin height pub async fn get_block_hash_from_bitcoin_height( - &mut self, + &self, height: u32, ) -> anyhow::Result { let res: Value = self @@ -525,7 +509,7 @@ mod tests { .expect("Failed to find config file"); let http_client = reqwest::Client::new(); - let mut stacks_client = StacksClient::new(config, http_client); + let stacks_client = StacksClient::new(config, http_client); let nonce_info = stacks_client.get_nonce_info().await.unwrap(); assert_eq!(nonce_info.possible_next_nonce, 122); diff --git a/romeo/src/system.rs b/romeo/src/system.rs index 30d27b25..0334cd5e 100644 --- a/romeo/src/system.rs +++ b/romeo/src/system.rs @@ -28,7 +28,7 @@ use crate::{ config::Config, event::Event, proof_data::{ProofData, ProofDataClarityValues}, - stacks_client::{LockedClient, StacksClient}, + stacks_client::StacksClient, state, state::{DepositInfo, WithdrawalInfo}, task::Task, @@ -48,7 +48,7 @@ pub async fn run(config: Config) { let (tx, mut rx) = mpsc::channel::(128); // TODO: Make capacity configurable let bitcoin_client = BitcoinClient::new(config.clone()) .expect("Failed to instantiate bitcoin client"); - let stacks_client: LockedClient = + let stacks_client: StacksClient = StacksClient::new(config.clone(), reqwest::Client::new()).into(); info!("Starting replay of persisted events"); @@ -130,7 +130,7 @@ impl Storage { fn spawn( config: Config, bitcoin_client: BitcoinClient, - stacks_client: LockedClient, + stacks_client: StacksClient, task: Task, result: mpsc::Sender, ) -> JoinHandle<()> { @@ -146,15 +146,15 @@ fn spawn( async fn run_task( config: &Config, bitcoin_client: BitcoinClient, - stacks_client: LockedClient, + stacks_client: StacksClient, task: Task, ) -> Event { match task { Task::GetContractBlockHeight => { - get_contract_block_height(config, stacks_client).await + get_contract_block_height(config, &stacks_client).await } Task::UpdateContractPublicKey => { - update_contract_public_key(config, stacks_client).await + update_contract_public_key(config, &stacks_client).await } Task::CreateMint(deposit_info) => { mint_asset(config, bitcoin_client, stacks_client, deposit_info) @@ -190,18 +190,14 @@ async fn run_task( async fn get_contract_block_height( config: &Config, - client: LockedClient, + client: &StacksClient, ) -> Event { let block_height = client - .lock() - .await .get_contract_block_height(config.contract_name.clone()) .await .expect("Could not get block height. Binary needs to be restarted after contract deployment."); let bitcoin_block_height = client - .lock() - .await .get_bitcoin_block_height(block_height) .await .expect("Could not get burnchain block height. Binary needs to be restarted after bitcoin node is online again."); @@ -211,7 +207,7 @@ async fn get_contract_block_height( async fn update_contract_public_key( config: &Config, - stacks_client: LockedClient, + stacks_client: &StacksClient, ) -> Event { let public_key = StacksPublicKey::from_slice( &config.stacks_credentials.public_key().serialize(), @@ -253,8 +249,6 @@ async fn update_contract_public_key( let tx = StacksTransaction::new(tx_version, tx_auth, tx_payload); let txid = stacks_client - .lock() - .await .sign_and_broadcast(tx) .await .expect("Unable to sign and broadcast the set public key transaction"); @@ -265,7 +259,7 @@ async fn update_contract_public_key( async fn mint_asset( config: &Config, bitcoin_client: BitcoinClient, - stacks_client: LockedClient, + stacks_client: StacksClient, deposit_info: DepositInfo, ) -> Event { let proof_data = get_tx_proof( @@ -314,7 +308,7 @@ async fn mint_asset( let tx = StacksTransaction::new(tx_version, tx_auth, tx_payload); - match stacks_client.lock().await.sign_and_broadcast(tx).await { + match stacks_client.sign_and_broadcast(tx).await { Ok(txid) => Event::MintBroadcasted(deposit_info, txid), Err(err) => { if config.strict { @@ -333,7 +327,7 @@ async fn mint_asset( async fn burn_asset( config: &Config, bitcoin_client: BitcoinClient, - stacks_client: LockedClient, + stacks_client: StacksClient, withdrawal_info: WithdrawalInfo, ) -> Event { let proof_data = get_tx_proof( @@ -382,7 +376,7 @@ async fn burn_asset( let tx = StacksTransaction::new(tx_version, tx_auth, tx_payload); - match stacks_client.lock().await.sign_and_broadcast(tx).await { + match stacks_client.sign_and_broadcast(tx).await { Ok(txid) => Event::BurnBroadcasted(withdrawal_info, txid), Err(err) => { if config.strict { @@ -401,12 +395,10 @@ async fn burn_asset( async fn fulfill_asset( config: &Config, bitcoin_client: BitcoinClient, - stacks_client: LockedClient, + stacks_client: StacksClient, withdrawal_info: WithdrawalInfo, ) -> Event { let stacks_chain_tip = stacks_client - .lock() - .await .get_block_hash_from_bitcoin_height(withdrawal_info.block_height) .await .expect("Unable to get stacks block hash"); @@ -462,12 +454,10 @@ async fn check_bitcoin_transaction_status( } async fn check_stacks_transaction_status( - client: LockedClient, + client: StacksClient, txid: StacksTxId, ) -> Event { let status = client - .lock() - .await .get_transation_status(txid) .await .expect("Could not get Stacks transaction status"); @@ -475,10 +465,8 @@ async fn check_stacks_transaction_status( Event::StacksTransactionUpdate(txid, status) } -async fn fetch_stacks_block(client: LockedClient, block_height: u32) -> Event { +async fn fetch_stacks_block(client: StacksClient, block_height: u32) -> Event { let txs = client - .lock() - .await .get_block(block_height) .await .expect("Failed to get Stacks block"); From 4e2aade8f7f18eb40bab060161227b481ef2a483 Mon Sep 17 00:00:00 2001 From: Carlos Alejandro Gutierrez Sandoval Date: Tue, 24 Oct 2023 15:25:18 -0600 Subject: [PATCH 2/5] stacks_client's fields --- romeo/src/stacks_client.rs | 66 +++++++++++++++++--------------------- romeo/src/system.rs | 2 +- 2 files changed, 30 insertions(+), 38 deletions(-) diff --git a/romeo/src/stacks_client.rs b/romeo/src/stacks_client.rs index 5077eae1..b32ead56 100644 --- a/romeo/src/stacks_client.rs +++ b/romeo/src/stacks_client.rs @@ -22,33 +22,40 @@ use rand::{distributions::Alphanumeric, thread_rng, Rng}; use reqwest::{Request, RequestBuilder, Response, StatusCode}; use serde::de::DeserializeOwned; use serde_json::Value; -use stacks_core::{codec::Codec, uint::Uint256}; +use stacks_core::{codec::Codec, uint::Uint256, wallet::Credentials}; use tokio::time::sleep; use tracing::{debug, trace, warn}; +use url::Url; use crate::{config::Config, event::TransactionStatus}; const BLOCK_POLLING_INTERVAL: Duration = Duration::from_secs(5); -/// Wrapped Stacks Client which can be shared safely between threads. - -pub type LockedClient = StacksClient; - /// Stateful client for creating and broadcasting Stacks transactions /// /// This client keeps track of the last executed nonce for the given /// key. #[derive(Debug, Clone)] pub struct StacksClient { - config: Config, + hiro_api_key: Option, + stacks_node_url: Url, + stacks_credentials: Credentials, http_client: reqwest::Client, } impl StacksClient { /// Create a new StacksClient - pub fn new(config: Config, http_client: reqwest::Client) -> Self { + pub fn new(config: &Config, http_client: reqwest::Client) -> Self { + let Config { + hiro_api_key, + stacks_node_url, + stacks_credentials, + .. + } = config.clone(); Self { - config, + hiro_api_key, + stacks_node_url, + stacks_credentials, http_client, } } @@ -97,7 +104,7 @@ impl StacksClient { /// if hiro_api_key is set, add it to the request fn add_stacks_api_key(&self, request: Request) -> Request { - match &self.config.hiro_api_key { + match &self.hiro_api_key { Some(api_key) => { RequestBuilder::from_parts(self.http_client.clone(), request) .header("x-hiro-api-key", api_key) @@ -130,11 +137,7 @@ impl StacksClient { signer .sign_origin( &StacksPrivateKey::from_slice( - &self - .config - .stacks_credentials - .private_key() - .secret_bytes(), + &self.stacks_credentials.private_key().secret_bytes(), ) .unwrap(), ) @@ -212,7 +215,7 @@ impl StacksClient { &self, name: ContractName, ) -> anyhow::Result { - let addr = self.config.stacks_credentials.address(); + let addr = self.stacks_credentials.address(); let id = QualifiedContractIdentifier::new( StandardPrincipalData( addr.version() as u8, @@ -366,29 +369,23 @@ impl StacksClient { } fn transaction_url(&self) -> reqwest::Url { - self.config - .stacks_node_url - .join("/v2/transactions") - .unwrap() + self.stacks_node_url.join("/v2/transactions").unwrap() } fn get_raw_transaction_url(&self, txid: StacksTxId) -> reqwest::Url { - self.config - .stacks_node_url + self.stacks_node_url .join(&format!("/extended/v1/tx/{}/raw", txid)) .unwrap() } fn block_by_height_url(&self, height: u32) -> reqwest::Url { - self.config - .stacks_node_url + self.stacks_node_url .join(&format!("/extended/v1/block/by_height/{}", height)) .unwrap() } fn block_by_bitcoin_height_url(&self, height: u32) -> reqwest::Url { - self.config - .stacks_node_url + self.stacks_node_url .join(&format!( "/extended/v1/block/by_burn_block_height/{}", height @@ -397,15 +394,13 @@ impl StacksClient { } fn contract_info_url(&self, id: impl AsRef) -> reqwest::Url { - self.config - .stacks_node_url + self.stacks_node_url .join(&format!("/extended/v1/contract/{}", id.as_ref())) .unwrap() } fn get_transation_details_url(&self, txid: StacksTxId) -> reqwest::Url { - self.config - .stacks_node_url + self.stacks_node_url .join(&format!("/extended/v1/tx/{}", txid)) .unwrap() } @@ -433,17 +428,14 @@ impl StacksClient { fn nonce_url(&self) -> reqwest::Url { let path = format!( "/extended/v1/address/{}/nonces", - self.config.stacks_credentials.address(), + self.stacks_credentials.address(), ); - self.config.stacks_node_url.join(&path).unwrap() + self.stacks_node_url.join(&path).unwrap() } fn fee_url(&self) -> reqwest::Url { - self.config - .stacks_node_url - .join("/v2/fees/transfer") - .unwrap() + self.stacks_node_url.join("/v2/fees/transfer").unwrap() } } @@ -509,7 +501,7 @@ mod tests { .expect("Failed to find config file"); let http_client = reqwest::Client::new(); - let stacks_client = StacksClient::new(config, http_client); + let stacks_client = StacksClient::new(&config, http_client); let nonce_info = stacks_client.get_nonce_info().await.unwrap(); assert_eq!(nonce_info.possible_next_nonce, 122); @@ -522,7 +514,7 @@ mod tests { .expect("Failed to find config file"); let http_client = reqwest::Client::new(); - let stacks_client = StacksClient::new(config, http_client); + let stacks_client = StacksClient::new(&config, http_client); stacks_client.calculate_fee(123).await.unwrap(); } diff --git a/romeo/src/system.rs b/romeo/src/system.rs index 0334cd5e..d9a23ea1 100644 --- a/romeo/src/system.rs +++ b/romeo/src/system.rs @@ -49,7 +49,7 @@ pub async fn run(config: Config) { let bitcoin_client = BitcoinClient::new(config.clone()) .expect("Failed to instantiate bitcoin client"); let stacks_client: StacksClient = - StacksClient::new(config.clone(), reqwest::Client::new()).into(); + StacksClient::new(&config, reqwest::Client::new()).into(); info!("Starting replay of persisted events"); From c0dc02fd00c019e59a7b10804d674e6b8391b950 Mon Sep 17 00:00:00 2001 From: Carlos Alejandro Gutierrez Sandoval Date: Wed, 25 Oct 2023 12:14:47 -0600 Subject: [PATCH 3/5] send_request + guarded send request --- romeo/Cargo.toml | 4 + romeo/src/stacks_client.rs | 399 ++++++++++++++++++++++++++----------- romeo/src/system.rs | 14 +- 3 files changed, 298 insertions(+), 119 deletions(-) diff --git a/romeo/Cargo.toml b/romeo/Cargo.toml index 58483c26..7c40bab6 100644 --- a/romeo/Cargo.toml +++ b/romeo/Cargo.toml @@ -23,3 +23,7 @@ tracing-subscriber.workspace = true tracing.workspace = true url.workspace = true rs_merkle.workspace = true + +[dev-dependencies] +assert_matches = "1.5.0" +mockito = "1.2.0" diff --git a/romeo/src/stacks_client.rs b/romeo/src/stacks_client.rs index b32ead56..5ffdff4c 100644 --- a/romeo/src/stacks_client.rs +++ b/romeo/src/stacks_client.rs @@ -17,17 +17,17 @@ use blockstack_lib::{ ContractName, }, }; -use futures::Future; +use futures::{stream::FuturesUnordered, Future, StreamExt}; use rand::{distributions::Alphanumeric, thread_rng, Rng}; use reqwest::{Request, RequestBuilder, Response, StatusCode}; use serde::de::DeserializeOwned; use serde_json::Value; use stacks_core::{codec::Codec, uint::Uint256, wallet::Credentials}; use tokio::time::sleep; -use tracing::{debug, trace, warn}; +use tracing::{trace, warn}; use url::Url; -use crate::{config::Config, event::TransactionStatus}; +use crate::event::TransactionStatus; const BLOCK_POLLING_INTERVAL: Duration = Duration::from_secs(5); @@ -45,13 +45,12 @@ pub struct StacksClient { impl StacksClient { /// Create a new StacksClient - pub fn new(config: &Config, http_client: reqwest::Client) -> Self { - let Config { - hiro_api_key, - stacks_node_url, - stacks_credentials, - .. - } = config.clone(); + pub fn new( + hiro_api_key: Option, + stacks_node_url: Url, + stacks_credentials: Credentials, + http_client: reqwest::Client, + ) -> Self { Self { hiro_api_key, stacks_node_url, @@ -60,46 +59,23 @@ impl StacksClient { } } - async fn send_request(&self, request_builder: B) -> anyhow::Result + async fn send_request(&self, request: Request) -> anyhow::Result where - B: Clone + Fn() -> Request, T: DeserializeOwned, { - let request_url = request_builder().url().to_string(); + let request = self.add_stacks_api_key(request); + // TODO; reintroduce retry + let res = self.http_client.execute(request).await?; - let res = retry(|| { - self.http_client - .execute(self.add_stacks_api_key(request_builder())) - }) - .await?; - - let status = res.status(); - let body = res.text().await?; - - serde_json::from_str(&body).map_err(|err| { - let error_details = serde_json::from_str::(&body).ok().map(|details| { - let error = details["error"].as_str(); - let reason = details["reason"].as_str(); - - format!( - "{}: {}", - error.unwrap_or_default(), - reason.unwrap_or_default() - ) - }); - - if error_details.is_none() { - debug!("Failed request response body: {:?}", body); - } - - anyhow!( - "Could not parse response JSON, URL is {}, status is {}: {:?}: {}", - request_url, - status, - err, - error_details.unwrap_or_default() - ) - }) + match res.error_for_status() { + Ok(res) => { + let body = res.text().await?; + + Ok(serde_json::from_str(&body) + .map_err(|e| anyhow!("{e}: body {body}"))?) + } + Err(e) => Err(anyhow!(e)), + } } /// if hiro_api_key is set, add it to the request @@ -149,7 +125,7 @@ impl StacksClient { tx.consensus_serialize(&mut tx_bytes).unwrap(); let res = self - .send_request(|| { + .send_request({ let tx_bytes = tx_bytes.clone(); self.http_client @@ -170,13 +146,13 @@ impl StacksClient { txid: StacksTxId, ) -> anyhow::Result { let res: anyhow::Result = self - .send_request(|| { + .send_request( self.http_client .get(self.cachebust(self.get_transation_details_url(txid))) .header("Accept", "application/json") .build() - .unwrap() - }) + .unwrap(), + ) .await; let tx_status_str = match res { @@ -201,12 +177,12 @@ impl StacksClient { } async fn get_nonce_info(&self) -> anyhow::Result { - self.send_request(|| { + self.send_request( self.http_client .get(self.cachebust(self.nonce_url())) .build() - .unwrap() - }) + .unwrap(), + ) .await } @@ -224,20 +200,13 @@ impl StacksClient { name, ); - let res: Value = self - .send_request(|| { - self.http_client - .get(self.contract_info_url(id.to_string())) - .build() - .unwrap() - }) - .await?; + let req = self + .http_client + .get(self.contract_info_url(id.to_string())) + .build() + .unwrap(); - if let Some(err) = res["error"].as_str() { - Err(Error::msg(err.to_string())) - } else { - Ok(res["block_height"].as_u64().unwrap() as u32) - } + self.send_error_guarded_request(req, "block_height").await } /// Get the Bitcoin block height for a Stacks block height @@ -245,16 +214,14 @@ impl StacksClient { &self, block_height: u32, ) -> anyhow::Result { - let res: Value = self - .send_request(|| { - self.http_client - .get(self.block_by_height_url(block_height)) - .build() - .unwrap() - }) - .await?; - - Ok(res["burn_block_height"].as_u64().unwrap() as u32) + self.send_error_guarded_request::( + self.http_client + .get(self.block_by_height_url(block_height)) + .build() + .unwrap(), + "burn_block_height", + ) + .await } /// Get the block at height @@ -262,20 +229,21 @@ impl StacksClient { &self, block_height: u32, ) -> anyhow::Result> { - let res: Value = loop { + let raw_txids: Value = loop { let maybe_response: Result = self - .send_request(|| { + .send_error_guarded_request( self.http_client .get(self.block_by_height_url(block_height)) .build() - .unwrap() - }) + .unwrap(), + "txs", + ) .await; - if let Ok(inner_response) = maybe_response { - if inner_response["txs"].is_array() { + if let Ok(txs_value) = maybe_response { + if txs_value.is_array() { trace!("Found Stacks block of height {}", block_height); - break inner_response; + break txs_value; } } @@ -283,28 +251,22 @@ impl StacksClient { sleep(BLOCK_POLLING_INTERVAL).await; }; - let tx_ids: Vec = res["txs"] + raw_txids .as_array() - .unwrap_or_else(|| { - panic!("Could not get txs from response: {:?}", res) - }) + .expect("An array, found {raw_txids:?") .iter() .map(|id| { - let mut id = id.as_str().unwrap().to_string(); - id = id.replace("0x", ""); - - StacksTxId::from_hex(&id).unwrap() + StacksTxId::from_hex( + id.as_str().unwrap().trim_start_matches("0x"), + ) + .unwrap() }) - .collect(); - - let mut txs = Vec::with_capacity(tx_ids.len()); - - for id in tx_ids { - let tx = self.get_transaction(id).await?; - txs.push(tx); - } - - Ok(txs) + .map(|txid| self.get_transaction(txid)) + .collect::>() + .collect::>() + .await + .into_iter() + .collect::, _>>() } /// Get the block at height @@ -313,18 +275,17 @@ impl StacksClient { id: StacksTxId, ) -> anyhow::Result { let res: Value = self - .send_request(|| { + .send_error_guarded_request( self.http_client .get(self.get_raw_transaction_url(id)) .header("Accept", "application/octet-stream") .build() - .unwrap() - }) + .unwrap(), + "raw_tx", + ) .await?; - let mut raw_tx: String = res["raw_tx"].as_str().unwrap().to_string(); - raw_tx = raw_tx.replace("0x", ""); - + let raw_tx = res.as_str().unwrap().trim_start_matches("0x"); let bytes = hex::decode(raw_tx).unwrap(); let tx = StacksTransaction::consensus_deserialize(&mut &bytes[..]).unwrap(); @@ -338,19 +299,21 @@ impl StacksClient { height: u32, ) -> anyhow::Result { let res: Value = self - .send_request(|| { + .send_error_guarded_request( self.http_client .get(self.block_by_bitcoin_height_url(height)) .header("Accept", "application/json") .build() - .unwrap() - }) + .unwrap(), + "hash", + ) .await?; - let hash_str = res["hash"] + let hash_str = res .as_str() - .unwrap_or_else(|| panic!("Could not get block hash: {:?}", res)); - let hash_bytes = hex::decode(hash_str.replace("0x", ""))?; + .expect("Could not get block hash: {res:?}") + .trim_start_matches("0x"); + let hash_bytes = hex::decode(hash_str)?; Ok(Uint256::deserialize(&mut Cursor::new(hash_bytes))?) } @@ -437,6 +400,24 @@ impl StacksClient { fn fee_url(&self) -> reqwest::Url { self.stacks_node_url.join("/v2/fees/transfer").unwrap() } + + async fn send_error_guarded_request( + &self, + req: Request, + index: &str, + ) -> anyhow::Result + where + T: DeserializeOwned, + { + let res: Value = self.send_request(req).await?; + + if let Some(err) = res["error"].as_str() { + let reason = res["reason"].as_str(); + Err(anyhow!("{err}; reason: {reason:?}")) + } else { + Ok(serde_json::from_value(res[index].clone())?) + } + } } #[derive(serde::Deserialize)] @@ -489,6 +470,11 @@ where #[cfg(test)] mod tests { + + use std::ops::Add; + + use assert_matches::assert_matches; + use super::*; use crate::config::Config; @@ -497,11 +483,21 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 1)] #[ignore] async fn get_nonce_info() { - let config = Config::from_path("./testing/config.json") + let Config { + hiro_api_key, + stacks_node_url, + stacks_credentials, + .. + } = Config::from_path("./testing/config.json") .expect("Failed to find config file"); let http_client = reqwest::Client::new(); - let stacks_client = StacksClient::new(&config, http_client); + let stacks_client = StacksClient::new( + hiro_api_key, + stacks_node_url, + stacks_credentials, + http_client, + ); let nonce_info = stacks_client.get_nonce_info().await.unwrap(); assert_eq!(nonce_info.possible_next_nonce, 122); @@ -510,12 +506,181 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 1)] #[ignore] async fn get_fee_rate() { - let config = Config::from_path("./testing/config.json") + let Config { + hiro_api_key, + stacks_node_url, + stacks_credentials, + .. + } = Config::from_path("./testing/config.json") .expect("Failed to find config file"); let http_client = reqwest::Client::new(); - let stacks_client = StacksClient::new(&config, http_client); + let stacks_client = StacksClient::new( + hiro_api_key, + stacks_node_url, + stacks_credentials, + http_client, + ); stacks_client.calculate_fee(123).await.unwrap(); } + + #[tokio::test] + async fn missing_contract_errors() { + let Config { + hiro_api_key, + stacks_credentials, + .. + } = Config::from_path("../devenv/sbtc/docker/config.json") + .expect("Failed to find config file"); + + let contract = "missingcontract"; + + let mut server = mockito::Server::new(); + let m = server + .mock( + "GET", + format!( + "/extended/v1/contract/{}.{contract}", + stacks_credentials.address() + ) + .as_str(), + ) + .with_status(404) + .create(); + + let stacks_client = StacksClient::new( + hiro_api_key, + server.url().parse().unwrap(), + stacks_credentials, + reqwest::Client::new(), + ); + + assert_eq!( + stacks_client + .get_contract_block_height(contract.try_into().unwrap(),) + .await + .expect_err("contract not deployed") + .downcast::() + .unwrap() + .status() + .unwrap(), + 404 + ); + + m.assert(); + } + + #[tokio::test] + async fn send_request_prints_body_on_err() { + let Config { + stacks_credentials, .. + } = Config::from_path("../devenv/sbtc/docker/config.json") + .expect("Failed to find config file"); + + let contract = "missingcontract"; + + let mut server = mockito::Server::new(); + let path = format!( + "/extended/v1/contract/{}.{contract}", + stacks_credentials.address() + ); + let body = r#"{ "block_height": 2 }"#; + let m = server.mock("GET", path.as_str()).with_body(body).create(); + + let stacks_client = StacksClient::new( + None, + server.url().parse().unwrap(), + stacks_credentials, + reqwest::Client::new(), + ); + + let request = stacks_client + .http_client + .get(server.url().add(&path)) + .build() + .unwrap(); + + assert_matches!(stacks_client.send_request::(request).await, Err(e)=>{ + assert!(e.to_string().contains(body)); + }); + + m.assert(); + } + + #[tokio::test] + async fn get_contract_block_height_positive() { + let Config { + stacks_credentials, .. + } = Config::from_path("../devenv/sbtc/docker/config.json") + .expect("Failed to find config file"); + + let contract = "contractname"; + + let mut server = mockito::Server::new(); + let path = format!( + "/extended/v1/contract/{}.{contract}", + stacks_credentials.address() + ); + let m = server + .mock("GET", path.as_str()) + .with_body(r#"{ "block_height": 2 }"#) + .create(); + + let stacks_client = StacksClient::new( + None, + server.url().parse().unwrap(), + stacks_credentials, + reqwest::Client::new(), + ); + + assert_eq!( + stacks_client + .get_contract_block_height(contract.try_into().unwrap(),) + .await + .unwrap(), + 2 + ); + + m.assert(); + } + + #[tokio::test] + async fn send_error_guarded_request_errors_if_error_field_present() { + let Config { + stacks_credentials, .. + } = Config::from_path("../devenv/sbtc/docker/config.json") + .expect("Failed to find config file"); + + let mut server = mockito::Server::new(); + let path = format!( + "/extended/v1/contract/{}.contract", + stacks_credentials.address() + ); + let m = server + .mock("GET", path.as_str()) + .with_body(r#"{ "any": 1, "error": "Oops", "reason": "Ay!" }"#) + .create(); + + let stacks_client = StacksClient::new( + None, + server.url().parse().unwrap(), + stacks_credentials, + reqwest::Client::new(), + ); + + let request = stacks_client + .http_client + .get(server.url().add(&path)) + .build() + .unwrap(); + + let error = stacks_client + .send_error_guarded_request::<()>(request, "any") + .await + .expect_err("response body contains an error field"); + assert!(error.to_string().contains("reason")); + + m.assert(); + } } diff --git a/romeo/src/system.rs b/romeo/src/system.rs index d9a23ea1..e12446b9 100644 --- a/romeo/src/system.rs +++ b/romeo/src/system.rs @@ -48,8 +48,18 @@ pub async fn run(config: Config) { let (tx, mut rx) = mpsc::channel::(128); // TODO: Make capacity configurable let bitcoin_client = BitcoinClient::new(config.clone()) .expect("Failed to instantiate bitcoin client"); - let stacks_client: StacksClient = - StacksClient::new(&config, reqwest::Client::new()).into(); + let Config { + hiro_api_key, + stacks_node_url, + stacks_credentials, + .. + } = config.clone(); + let stacks_client: StacksClient = StacksClient::new( + hiro_api_key, + stacks_node_url, + stacks_credentials, + reqwest::Client::new(), + ); info!("Starting replay of persisted events"); From 76bcb4cf7a384061fa4cf0a067207e348569fd13 Mon Sep 17 00:00:00 2001 From: Carlos Alejandro Gutierrez Sandoval Date: Wed, 25 Oct 2023 15:33:18 -0600 Subject: [PATCH 4/5] reintroduce retry with enhanced error handling --- romeo/src/stacks_client.rs | 182 +++++++++++++++---------------------- 1 file changed, 75 insertions(+), 107 deletions(-) diff --git a/romeo/src/stacks_client.rs b/romeo/src/stacks_client.rs index 5ffdff4c..bf4896e2 100644 --- a/romeo/src/stacks_client.rs +++ b/romeo/src/stacks_client.rs @@ -17,9 +17,9 @@ use blockstack_lib::{ ContractName, }, }; -use futures::{stream::FuturesUnordered, Future, StreamExt}; +use futures::{stream::FuturesUnordered, StreamExt}; use rand::{distributions::Alphanumeric, thread_rng, Rng}; -use reqwest::{Request, RequestBuilder, Response, StatusCode}; +use reqwest::{RequestBuilder, Response}; use serde::de::DeserializeOwned; use serde_json::Value; use stacks_core::{codec::Codec, uint::Uint256, wallet::Credentials}; @@ -59,34 +59,25 @@ impl StacksClient { } } - async fn send_request(&self, request: Request) -> anyhow::Result + async fn send_request( + &self, + builder: RequestBuilder, + ) -> anyhow::Result where T: DeserializeOwned, { - let request = self.add_stacks_api_key(request); - // TODO; reintroduce retry - let res = self.http_client.execute(request).await?; + let res = self.retry(self.add_stacks_api_key(builder)).await?; + res.error_for_status_ref().expect("retry propagates errors"); - match res.error_for_status() { - Ok(res) => { - let body = res.text().await?; + let body = res.text().await?; - Ok(serde_json::from_str(&body) - .map_err(|e| anyhow!("{e}: body {body}"))?) - } - Err(e) => Err(anyhow!(e)), - } + serde_json::from_str(&body).map_err(|e| anyhow!("{e}: body {body}")) } /// if hiro_api_key is set, add it to the request - fn add_stacks_api_key(&self, request: Request) -> Request { + fn add_stacks_api_key(&self, request: RequestBuilder) -> RequestBuilder { match &self.hiro_api_key { - Some(api_key) => { - RequestBuilder::from_parts(self.http_client.clone(), request) - .header("x-hiro-api-key", api_key) - .build() - .unwrap() - } + Some(api_key) => request.header("x-hiro-api-key", api_key), None => request, } } @@ -132,8 +123,6 @@ impl StacksClient { .post(self.transaction_url()) .header("Content-type", "application/octet-stream") .body(tx_bytes) - .build() - .unwrap() }) .await?; @@ -149,17 +138,15 @@ impl StacksClient { .send_request( self.http_client .get(self.cachebust(self.get_transation_details_url(txid))) - .header("Accept", "application/json") - .build() - .unwrap(), + .header("Accept", "application/json"), ) .await; let tx_status_str = match res { Ok(json) => json["tx_status"] .as_str() - .map(|s| s.to_string()) - .expect("Could not get raw transaction from response"), + .expect("Could not get raw transaction from response") + .to_string(), // Stacks node sometimes returns 404 for pending transactions // :shrug: Err(err) if err.to_string().contains("404 Not Found") => { @@ -178,10 +165,7 @@ impl StacksClient { async fn get_nonce_info(&self) -> anyhow::Result { self.send_request( - self.http_client - .get(self.cachebust(self.nonce_url())) - .build() - .unwrap(), + self.http_client.get(self.cachebust(self.nonce_url())), ) .await } @@ -200,13 +184,11 @@ impl StacksClient { name, ); - let req = self - .http_client - .get(self.contract_info_url(id.to_string())) - .build() - .unwrap(); + let req_builder = + self.http_client.get(self.contract_info_url(id.to_string())); - self.send_error_guarded_request(req, "block_height").await + self.send_error_guarded_request(req_builder, "block_height") + .await } /// Get the Bitcoin block height for a Stacks block height @@ -215,10 +197,7 @@ impl StacksClient { block_height: u32, ) -> anyhow::Result { self.send_error_guarded_request::( - self.http_client - .get(self.block_by_height_url(block_height)) - .build() - .unwrap(), + self.http_client.get(self.block_by_height_url(block_height)), "burn_block_height", ) .await @@ -233,9 +212,7 @@ impl StacksClient { let maybe_response: Result = self .send_error_guarded_request( self.http_client - .get(self.block_by_height_url(block_height)) - .build() - .unwrap(), + .get(self.block_by_height_url(block_height)), "txs", ) .await; @@ -278,9 +255,7 @@ impl StacksClient { .send_error_guarded_request( self.http_client .get(self.get_raw_transaction_url(id)) - .header("Accept", "application/octet-stream") - .build() - .unwrap(), + .header("Accept", "application/octet-stream"), "raw_tx", ) .await?; @@ -302,9 +277,7 @@ impl StacksClient { .send_error_guarded_request( self.http_client .get(self.block_by_bitcoin_height_url(height)) - .header("Accept", "application/json") - .build() - .unwrap(), + .header("Accept", "application/json"), "hash", ) .await?; @@ -403,7 +376,7 @@ impl StacksClient { async fn send_error_guarded_request( &self, - req: Request, + req: RequestBuilder, index: &str, ) -> anyhow::Result where @@ -418,6 +391,50 @@ impl StacksClient { Ok(serde_json::from_value(res[index].clone())?) } } + + async fn retry(&self, builder: RequestBuilder) -> anyhow::Result { + use backoff::Error as BackOffError; + + let operation = || async { + let request = builder + .try_clone() + .expect("not a stream") + .build() + .map_err(|e| BackOffError::permanent(anyhow!(e)))?; + + self.http_client + .execute(request) + .await + .and_then(Response::error_for_status) + .map_err(|e| { + if e.is_request() { + BackOffError::transient(anyhow!(e)) + } else if e.is_status() { + match e + .status() + .expect("Is status <-> has status: qed") + .as_u16() + { + 429 | 522 => BackOffError::transient(anyhow!(e)), + _ => BackOffError::permanent(anyhow!(e)), + } + } else { + BackOffError::permanent(anyhow!(e)) + } + }) + }; + + let notify = |err, duration| { + warn!("Retrying in {:?} after error: {:?}", duration, err); + }; + + backoff::future::retry_notify( + backoff::ExponentialBackoff::default(), + operation, + notify, + ) + .await + } } #[derive(serde::Deserialize)] @@ -425,49 +442,6 @@ struct NonceInfo { possible_next_nonce: u64, } -async fn retry(operation: O) -> anyhow::Result -where - O: Clone + Fn() -> Fut, - Fut: Future>, -{ - let operation = || async { - operation.clone()() - .await - .and_then(Response::error_for_status) - .map_err(|err| { - if err.is_request() { - backoff::Error::transient(anyhow::anyhow!(err)) - } else if err.is_status() { - // Impossible not to have a status code at this section. May - // as well be a teapot. - let status_code_number = err - .status() - .unwrap_or(StatusCode::IM_A_TEAPOT) - .as_u16(); - match status_code_number { - 429 | 522 => { - backoff::Error::transient(anyhow::anyhow!(err)) - } - _ => backoff::Error::permanent(anyhow::anyhow!(err)), - } - } else { - backoff::Error::permanent(anyhow::anyhow!(err)) - } - }) - }; - - let notify = |err, duration| { - warn!("Retrying in {:?} after error: {:?}", duration, err); - }; - - backoff::future::retry_notify( - backoff::ExponentialBackoff::default(), - operation, - notify, - ) - .await -} - #[cfg(test)] mod tests { @@ -595,13 +569,10 @@ mod tests { reqwest::Client::new(), ); - let request = stacks_client - .http_client - .get(server.url().add(&path)) - .build() - .unwrap(); + let req_builder = + stacks_client.http_client.get(server.url().add(&path)); - assert_matches!(stacks_client.send_request::(request).await, Err(e)=>{ + assert_matches!(stacks_client.send_request::(req_builder).await, Err(e)=>{ assert!(e.to_string().contains(body)); }); @@ -669,14 +640,11 @@ mod tests { reqwest::Client::new(), ); - let request = stacks_client - .http_client - .get(server.url().add(&path)) - .build() - .unwrap(); + let req_builder = + stacks_client.http_client.get(server.url().add(&path)); let error = stacks_client - .send_error_guarded_request::<()>(request, "any") + .send_error_guarded_request::<()>(req_builder, "any") .await .expect_err("response body contains an error field"); assert!(error.to_string().contains("reason")); From 41990f1c6490c450a9661d3f11106dd72240ac8e Mon Sep 17 00:00:00 2001 From: Carlos Alejandro Gutierrez Sandoval Date: Wed, 15 Nov 2023 23:57:42 -0600 Subject: [PATCH 5/5] don't discard body upon status error --- romeo/src/stacks_client.rs | 66 +++++++++++++++++++++++++------------- 1 file changed, 44 insertions(+), 22 deletions(-) diff --git a/romeo/src/stacks_client.rs b/romeo/src/stacks_client.rs index bf4896e2..1f53f397 100644 --- a/romeo/src/stacks_client.rs +++ b/romeo/src/stacks_client.rs @@ -402,26 +402,41 @@ impl StacksClient { .build() .map_err(|e| BackOffError::permanent(anyhow!(e)))?; - self.http_client - .execute(request) - .await - .and_then(Response::error_for_status) - .map_err(|e| { - if e.is_request() { - BackOffError::transient(anyhow!(e)) - } else if e.is_status() { - match e - .status() - .expect("Is status <-> has status: qed") - .as_u16() - { - 429 | 522 => BackOffError::transient(anyhow!(e)), - _ => BackOffError::permanent(anyhow!(e)), + match self.http_client.execute(request).await { + Ok(r) if r.error_for_status_ref().is_err() => { + let e = r + .error_for_status_ref() + .expect_err("Is status error; qed"); + let raw_body = r.text().await.unwrap_or(format!( + "body parsing failed {}:{}", + file!(), + line!() + )); + Err((e, raw_body)) + } + Ok(r) => Ok(r), + Err(e) => Err((e, String::new())), + } + .map_err(|(e, reason)| { + if e.is_request() { + BackOffError::transient(anyhow!(e).context(reason)) + } else if e.is_status() { + match e + .status() + .expect("Is status <-> has status: qed") + .as_u16() + { + 429 | 522 => { + BackOffError::transient(anyhow!(e).context(reason)) + } + _ => { + BackOffError::permanent(anyhow!(e).context(reason)) } - } else { - BackOffError::permanent(anyhow!(e)) } - }) + } else { + BackOffError::permanent(anyhow!(e).context(reason)) + } + }) }; let notify = |err, duration| { @@ -521,6 +536,7 @@ mod tests { .as_str(), ) .with_status(404) + .with_body(r#"{"error":"transaction rejected","reason":"ConflictingNonceInMempool","txid":"375c9048dae6564059344502a2a8e9b2ecad6ba597877b3b48de482309c11dcd"}"#) .create(); let stacks_client = StacksClient::new( @@ -530,11 +546,17 @@ mod tests { reqwest::Client::new(), ); + let error = stacks_client + .get_contract_block_height(contract.try_into().unwrap()) + .await + .expect_err("contract not deployed"); + + assert!(error.to_string().contains( + r#""error":"transaction rejected","reason":"ConflictingNonceInMempool""# + )); + assert_eq!( - stacks_client - .get_contract_block_height(contract.try_into().unwrap(),) - .await - .expect_err("contract not deployed") + error .downcast::() .unwrap() .status()