Skip to content
This repository was archived by the owner on Dec 4, 2024. It is now read-only.

Commit 03d9bdb

Browse files
committed
reintroduce retry with enhanced error handling
1 parent c0dc02f commit 03d9bdb

File tree

1 file changed

+76
-107
lines changed

1 file changed

+76
-107
lines changed

romeo/src/stacks_client.rs

Lines changed: 76 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ use blockstack_lib::{
1717
ContractName,
1818
},
1919
};
20-
use futures::{stream::FuturesUnordered, Future, StreamExt};
20+
use futures::{stream::FuturesUnordered, StreamExt};
2121
use rand::{distributions::Alphanumeric, thread_rng, Rng};
22-
use reqwest::{Request, RequestBuilder, Response, StatusCode};
22+
use reqwest::{RequestBuilder, Response};
2323
use serde::de::DeserializeOwned;
2424
use serde_json::Value;
2525
use stacks_core::{codec::Codec, uint::Uint256, wallet::Credentials};
@@ -59,34 +59,26 @@ impl StacksClient {
5959
}
6060
}
6161

62-
async fn send_request<T>(&self, request: Request) -> anyhow::Result<T>
62+
async fn send_request<T>(
63+
&self,
64+
builder: RequestBuilder,
65+
) -> anyhow::Result<T>
6366
where
6467
T: DeserializeOwned,
6568
{
66-
let request = self.add_stacks_api_key(request);
67-
// TODO; reintroduce retry
68-
let res = self.http_client.execute(request).await?;
69+
let res = self.retry(self.add_stacks_api_key(builder)).await?;
70+
res.error_for_status_ref().expect("retry propagates errors");
6971

70-
match res.error_for_status() {
71-
Ok(res) => {
72-
let body = res.text().await?;
72+
let body = res.text().await?;
7373

74-
Ok(serde_json::from_str(&body)
75-
.map_err(|e| anyhow!("{e}: body {body}"))?)
76-
}
77-
Err(e) => Err(anyhow!(e)),
78-
}
74+
Ok(serde_json::from_str(&body)
75+
.map_err(|e| anyhow!("{e}: body {body}"))?)
7976
}
8077

8178
/// if hiro_api_key is set, add it to the request
82-
fn add_stacks_api_key(&self, request: Request) -> Request {
79+
fn add_stacks_api_key(&self, request: RequestBuilder) -> RequestBuilder {
8380
match &self.hiro_api_key {
84-
Some(api_key) => {
85-
RequestBuilder::from_parts(self.http_client.clone(), request)
86-
.header("x-hiro-api-key", api_key)
87-
.build()
88-
.unwrap()
89-
}
81+
Some(api_key) => request.header("x-hiro-api-key", api_key),
9082
None => request,
9183
}
9284
}
@@ -132,8 +124,6 @@ impl StacksClient {
132124
.post(self.transaction_url())
133125
.header("Content-type", "application/octet-stream")
134126
.body(tx_bytes)
135-
.build()
136-
.unwrap()
137127
})
138128
.await?;
139129

@@ -149,17 +139,15 @@ impl StacksClient {
149139
.send_request(
150140
self.http_client
151141
.get(self.cachebust(self.get_transation_details_url(txid)))
152-
.header("Accept", "application/json")
153-
.build()
154-
.unwrap(),
142+
.header("Accept", "application/json"),
155143
)
156144
.await;
157145

158146
let tx_status_str = match res {
159147
Ok(json) => json["tx_status"]
160148
.as_str()
161-
.map(|s| s.to_string())
162-
.expect("Could not get raw transaction from response"),
149+
.expect("Could not get raw transaction from response")
150+
.to_string(),
163151
// Stacks node sometimes returns 404 for pending transactions
164152
// :shrug:
165153
Err(err) if err.to_string().contains("404 Not Found") => {
@@ -178,10 +166,7 @@ impl StacksClient {
178166

179167
async fn get_nonce_info(&self) -> anyhow::Result<NonceInfo> {
180168
self.send_request(
181-
self.http_client
182-
.get(self.cachebust(self.nonce_url()))
183-
.build()
184-
.unwrap(),
169+
self.http_client.get(self.cachebust(self.nonce_url())),
185170
)
186171
.await
187172
}
@@ -200,13 +185,11 @@ impl StacksClient {
200185
name,
201186
);
202187

203-
let req = self
204-
.http_client
205-
.get(self.contract_info_url(id.to_string()))
206-
.build()
207-
.unwrap();
188+
let req_builder =
189+
self.http_client.get(self.contract_info_url(id.to_string()));
208190

209-
self.send_error_guarded_request(req, "block_height").await
191+
self.send_error_guarded_request(req_builder, "block_height")
192+
.await
210193
}
211194

212195
/// Get the Bitcoin block height for a Stacks block height
@@ -215,10 +198,7 @@ impl StacksClient {
215198
block_height: u32,
216199
) -> anyhow::Result<u32> {
217200
self.send_error_guarded_request::<u32>(
218-
self.http_client
219-
.get(self.block_by_height_url(block_height))
220-
.build()
221-
.unwrap(),
201+
self.http_client.get(self.block_by_height_url(block_height)),
222202
"burn_block_height",
223203
)
224204
.await
@@ -233,9 +213,7 @@ impl StacksClient {
233213
let maybe_response: Result<Value, Error> = self
234214
.send_error_guarded_request(
235215
self.http_client
236-
.get(self.block_by_height_url(block_height))
237-
.build()
238-
.unwrap(),
216+
.get(self.block_by_height_url(block_height)),
239217
"txs",
240218
)
241219
.await;
@@ -278,9 +256,7 @@ impl StacksClient {
278256
.send_error_guarded_request(
279257
self.http_client
280258
.get(self.get_raw_transaction_url(id))
281-
.header("Accept", "application/octet-stream")
282-
.build()
283-
.unwrap(),
259+
.header("Accept", "application/octet-stream"),
284260
"raw_tx",
285261
)
286262
.await?;
@@ -302,9 +278,7 @@ impl StacksClient {
302278
.send_error_guarded_request(
303279
self.http_client
304280
.get(self.block_by_bitcoin_height_url(height))
305-
.header("Accept", "application/json")
306-
.build()
307-
.unwrap(),
281+
.header("Accept", "application/json"),
308282
"hash",
309283
)
310284
.await?;
@@ -403,7 +377,7 @@ impl StacksClient {
403377

404378
async fn send_error_guarded_request<T>(
405379
&self,
406-
req: Request,
380+
req: RequestBuilder,
407381
index: &str,
408382
) -> anyhow::Result<T>
409383
where
@@ -418,56 +392,57 @@ impl StacksClient {
418392
Ok(serde_json::from_value(res[index].clone())?)
419393
}
420394
}
395+
396+
async fn retry(&self, builder: RequestBuilder) -> anyhow::Result<Response> {
397+
use backoff::Error as BackOffError;
398+
399+
let operation = || async {
400+
let request = builder
401+
.try_clone()
402+
.expect("not a stream")
403+
.build()
404+
.map_err(|e| BackOffError::permanent(anyhow!(e)))?;
405+
406+
self.http_client
407+
.execute(request)
408+
.await
409+
.and_then(Response::error_for_status)
410+
.map_err(|e| {
411+
if e.is_request() {
412+
BackOffError::transient(anyhow!(e))
413+
} else if e.is_status() {
414+
match e
415+
.status()
416+
.expect("Is status <-> has status: qed")
417+
.as_u16()
418+
{
419+
429 | 522 => BackOffError::transient(anyhow!(e)),
420+
_ => BackOffError::permanent(anyhow!(e)),
421+
}
422+
} else {
423+
BackOffError::permanent(anyhow!(e))
424+
}
425+
})
426+
};
427+
428+
let notify = |err, duration| {
429+
warn!("Retrying in {:?} after error: {:?}", duration, err);
430+
};
431+
432+
backoff::future::retry_notify(
433+
backoff::ExponentialBackoff::default(),
434+
operation,
435+
notify,
436+
)
437+
.await
438+
}
421439
}
422440

423441
#[derive(serde::Deserialize)]
424442
struct NonceInfo {
425443
possible_next_nonce: u64,
426444
}
427445

428-
async fn retry<O, Fut>(operation: O) -> anyhow::Result<Response>
429-
where
430-
O: Clone + Fn() -> Fut,
431-
Fut: Future<Output = Result<Response, reqwest::Error>>,
432-
{
433-
let operation = || async {
434-
operation.clone()()
435-
.await
436-
.and_then(Response::error_for_status)
437-
.map_err(|err| {
438-
if err.is_request() {
439-
backoff::Error::transient(anyhow::anyhow!(err))
440-
} else if err.is_status() {
441-
// Impossible not to have a status code at this section. May
442-
// as well be a teapot.
443-
let status_code_number = err
444-
.status()
445-
.unwrap_or(StatusCode::IM_A_TEAPOT)
446-
.as_u16();
447-
match status_code_number {
448-
429 | 522 => {
449-
backoff::Error::transient(anyhow::anyhow!(err))
450-
}
451-
_ => backoff::Error::permanent(anyhow::anyhow!(err)),
452-
}
453-
} else {
454-
backoff::Error::permanent(anyhow::anyhow!(err))
455-
}
456-
})
457-
};
458-
459-
let notify = |err, duration| {
460-
warn!("Retrying in {:?} after error: {:?}", duration, err);
461-
};
462-
463-
backoff::future::retry_notify(
464-
backoff::ExponentialBackoff::default(),
465-
operation,
466-
notify,
467-
)
468-
.await
469-
}
470-
471446
#[cfg(test)]
472447
mod tests {
473448

@@ -595,13 +570,10 @@ mod tests {
595570
reqwest::Client::new(),
596571
);
597572

598-
let request = stacks_client
599-
.http_client
600-
.get(server.url().add(&path))
601-
.build()
602-
.unwrap();
573+
let req_builder =
574+
stacks_client.http_client.get(server.url().add(&path));
603575

604-
assert_matches!(stacks_client.send_request::<u32>(request).await, Err(e)=>{
576+
assert_matches!(stacks_client.send_request::<u32>(req_builder).await, Err(e)=>{
605577
assert!(e.to_string().contains(body));
606578
});
607579

@@ -669,14 +641,11 @@ mod tests {
669641
reqwest::Client::new(),
670642
);
671643

672-
let request = stacks_client
673-
.http_client
674-
.get(server.url().add(&path))
675-
.build()
676-
.unwrap();
644+
let req_builder =
645+
stacks_client.http_client.get(server.url().add(&path));
677646

678647
let error = stacks_client
679-
.send_error_guarded_request::<()>(request, "any")
648+
.send_error_guarded_request::<()>(req_builder, "any")
680649
.await
681650
.expect_err("response body contains an error field");
682651
assert!(error.to_string().contains("reason"));

0 commit comments

Comments
 (0)