Skip to content
This repository was archived by the owner on Dec 4, 2024. It is now read-only.

Commit 76bcb4c

Browse files
committed
reintroduce retry with enhanced error handling
1 parent c0dc02f commit 76bcb4c

File tree

1 file changed

+75
-107
lines changed

1 file changed

+75
-107
lines changed

romeo/src/stacks_client.rs

Lines changed: 75 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ use blockstack_lib::{
1717
ContractName,
1818
},
1919
};
20-
use futures::{stream::FuturesUnordered, Future, StreamExt};
20+
use futures::{stream::FuturesUnordered, StreamExt};
2121
use rand::{distributions::Alphanumeric, thread_rng, Rng};
22-
use reqwest::{Request, RequestBuilder, Response, StatusCode};
22+
use reqwest::{RequestBuilder, Response};
2323
use serde::de::DeserializeOwned;
2424
use serde_json::Value;
2525
use stacks_core::{codec::Codec, uint::Uint256, wallet::Credentials};
@@ -59,34 +59,25 @@ impl StacksClient {
5959
}
6060
}
6161

62-
async fn send_request<T>(&self, request: Request) -> anyhow::Result<T>
62+
async fn send_request<T>(
63+
&self,
64+
builder: RequestBuilder,
65+
) -> anyhow::Result<T>
6366
where
6467
T: DeserializeOwned,
6568
{
66-
let request = self.add_stacks_api_key(request);
67-
// TODO; reintroduce retry
68-
let res = self.http_client.execute(request).await?;
69+
let res = self.retry(self.add_stacks_api_key(builder)).await?;
70+
res.error_for_status_ref().expect("retry propagates errors");
6971

70-
match res.error_for_status() {
71-
Ok(res) => {
72-
let body = res.text().await?;
72+
let body = res.text().await?;
7373

74-
Ok(serde_json::from_str(&body)
75-
.map_err(|e| anyhow!("{e}: body {body}"))?)
76-
}
77-
Err(e) => Err(anyhow!(e)),
78-
}
74+
serde_json::from_str(&body).map_err(|e| anyhow!("{e}: body {body}"))
7975
}
8076

8177
/// if hiro_api_key is set, add it to the request
82-
fn add_stacks_api_key(&self, request: Request) -> Request {
78+
fn add_stacks_api_key(&self, request: RequestBuilder) -> RequestBuilder {
8379
match &self.hiro_api_key {
84-
Some(api_key) => {
85-
RequestBuilder::from_parts(self.http_client.clone(), request)
86-
.header("x-hiro-api-key", api_key)
87-
.build()
88-
.unwrap()
89-
}
80+
Some(api_key) => request.header("x-hiro-api-key", api_key),
9081
None => request,
9182
}
9283
}
@@ -132,8 +123,6 @@ impl StacksClient {
132123
.post(self.transaction_url())
133124
.header("Content-type", "application/octet-stream")
134125
.body(tx_bytes)
135-
.build()
136-
.unwrap()
137126
})
138127
.await?;
139128

@@ -149,17 +138,15 @@ impl StacksClient {
149138
.send_request(
150139
self.http_client
151140
.get(self.cachebust(self.get_transation_details_url(txid)))
152-
.header("Accept", "application/json")
153-
.build()
154-
.unwrap(),
141+
.header("Accept", "application/json"),
155142
)
156143
.await;
157144

158145
let tx_status_str = match res {
159146
Ok(json) => json["tx_status"]
160147
.as_str()
161-
.map(|s| s.to_string())
162-
.expect("Could not get raw transaction from response"),
148+
.expect("Could not get raw transaction from response")
149+
.to_string(),
163150
// Stacks node sometimes returns 404 for pending transactions
164151
// :shrug:
165152
Err(err) if err.to_string().contains("404 Not Found") => {
@@ -178,10 +165,7 @@ impl StacksClient {
178165

179166
async fn get_nonce_info(&self) -> anyhow::Result<NonceInfo> {
180167
self.send_request(
181-
self.http_client
182-
.get(self.cachebust(self.nonce_url()))
183-
.build()
184-
.unwrap(),
168+
self.http_client.get(self.cachebust(self.nonce_url())),
185169
)
186170
.await
187171
}
@@ -200,13 +184,11 @@ impl StacksClient {
200184
name,
201185
);
202186

203-
let req = self
204-
.http_client
205-
.get(self.contract_info_url(id.to_string()))
206-
.build()
207-
.unwrap();
187+
let req_builder =
188+
self.http_client.get(self.contract_info_url(id.to_string()));
208189

209-
self.send_error_guarded_request(req, "block_height").await
190+
self.send_error_guarded_request(req_builder, "block_height")
191+
.await
210192
}
211193

212194
/// Get the Bitcoin block height for a Stacks block height
@@ -215,10 +197,7 @@ impl StacksClient {
215197
block_height: u32,
216198
) -> anyhow::Result<u32> {
217199
self.send_error_guarded_request::<u32>(
218-
self.http_client
219-
.get(self.block_by_height_url(block_height))
220-
.build()
221-
.unwrap(),
200+
self.http_client.get(self.block_by_height_url(block_height)),
222201
"burn_block_height",
223202
)
224203
.await
@@ -233,9 +212,7 @@ impl StacksClient {
233212
let maybe_response: Result<Value, Error> = self
234213
.send_error_guarded_request(
235214
self.http_client
236-
.get(self.block_by_height_url(block_height))
237-
.build()
238-
.unwrap(),
215+
.get(self.block_by_height_url(block_height)),
239216
"txs",
240217
)
241218
.await;
@@ -278,9 +255,7 @@ impl StacksClient {
278255
.send_error_guarded_request(
279256
self.http_client
280257
.get(self.get_raw_transaction_url(id))
281-
.header("Accept", "application/octet-stream")
282-
.build()
283-
.unwrap(),
258+
.header("Accept", "application/octet-stream"),
284259
"raw_tx",
285260
)
286261
.await?;
@@ -302,9 +277,7 @@ impl StacksClient {
302277
.send_error_guarded_request(
303278
self.http_client
304279
.get(self.block_by_bitcoin_height_url(height))
305-
.header("Accept", "application/json")
306-
.build()
307-
.unwrap(),
280+
.header("Accept", "application/json"),
308281
"hash",
309282
)
310283
.await?;
@@ -403,7 +376,7 @@ impl StacksClient {
403376

404377
async fn send_error_guarded_request<T>(
405378
&self,
406-
req: Request,
379+
req: RequestBuilder,
407380
index: &str,
408381
) -> anyhow::Result<T>
409382
where
@@ -418,56 +391,57 @@ impl StacksClient {
418391
Ok(serde_json::from_value(res[index].clone())?)
419392
}
420393
}
394+
395+
async fn retry(&self, builder: RequestBuilder) -> anyhow::Result<Response> {
396+
use backoff::Error as BackOffError;
397+
398+
let operation = || async {
399+
let request = builder
400+
.try_clone()
401+
.expect("not a stream")
402+
.build()
403+
.map_err(|e| BackOffError::permanent(anyhow!(e)))?;
404+
405+
self.http_client
406+
.execute(request)
407+
.await
408+
.and_then(Response::error_for_status)
409+
.map_err(|e| {
410+
if e.is_request() {
411+
BackOffError::transient(anyhow!(e))
412+
} else if e.is_status() {
413+
match e
414+
.status()
415+
.expect("Is status <-> has status: qed")
416+
.as_u16()
417+
{
418+
429 | 522 => BackOffError::transient(anyhow!(e)),
419+
_ => BackOffError::permanent(anyhow!(e)),
420+
}
421+
} else {
422+
BackOffError::permanent(anyhow!(e))
423+
}
424+
})
425+
};
426+
427+
let notify = |err, duration| {
428+
warn!("Retrying in {:?} after error: {:?}", duration, err);
429+
};
430+
431+
backoff::future::retry_notify(
432+
backoff::ExponentialBackoff::default(),
433+
operation,
434+
notify,
435+
)
436+
.await
437+
}
421438
}
422439

423440
#[derive(serde::Deserialize)]
424441
struct NonceInfo {
425442
possible_next_nonce: u64,
426443
}
427444

428-
async fn retry<O, Fut>(operation: O) -> anyhow::Result<Response>
429-
where
430-
O: Clone + Fn() -> Fut,
431-
Fut: Future<Output = Result<Response, reqwest::Error>>,
432-
{
433-
let operation = || async {
434-
operation.clone()()
435-
.await
436-
.and_then(Response::error_for_status)
437-
.map_err(|err| {
438-
if err.is_request() {
439-
backoff::Error::transient(anyhow::anyhow!(err))
440-
} else if err.is_status() {
441-
// Impossible not to have a status code at this section. May
442-
// as well be a teapot.
443-
let status_code_number = err
444-
.status()
445-
.unwrap_or(StatusCode::IM_A_TEAPOT)
446-
.as_u16();
447-
match status_code_number {
448-
429 | 522 => {
449-
backoff::Error::transient(anyhow::anyhow!(err))
450-
}
451-
_ => backoff::Error::permanent(anyhow::anyhow!(err)),
452-
}
453-
} else {
454-
backoff::Error::permanent(anyhow::anyhow!(err))
455-
}
456-
})
457-
};
458-
459-
let notify = |err, duration| {
460-
warn!("Retrying in {:?} after error: {:?}", duration, err);
461-
};
462-
463-
backoff::future::retry_notify(
464-
backoff::ExponentialBackoff::default(),
465-
operation,
466-
notify,
467-
)
468-
.await
469-
}
470-
471445
#[cfg(test)]
472446
mod tests {
473447

@@ -595,13 +569,10 @@ mod tests {
595569
reqwest::Client::new(),
596570
);
597571

598-
let request = stacks_client
599-
.http_client
600-
.get(server.url().add(&path))
601-
.build()
602-
.unwrap();
572+
let req_builder =
573+
stacks_client.http_client.get(server.url().add(&path));
603574

604-
assert_matches!(stacks_client.send_request::<u32>(request).await, Err(e)=>{
575+
assert_matches!(stacks_client.send_request::<u32>(req_builder).await, Err(e)=>{
605576
assert!(e.to_string().contains(body));
606577
});
607578

@@ -669,14 +640,11 @@ mod tests {
669640
reqwest::Client::new(),
670641
);
671642

672-
let request = stacks_client
673-
.http_client
674-
.get(server.url().add(&path))
675-
.build()
676-
.unwrap();
643+
let req_builder =
644+
stacks_client.http_client.get(server.url().add(&path));
677645

678646
let error = stacks_client
679-
.send_error_guarded_request::<()>(request, "any")
647+
.send_error_guarded_request::<()>(req_builder, "any")
680648
.await
681649
.expect_err("response body contains an error field");
682650
assert!(error.to_string().contains("reason"));

0 commit comments

Comments
 (0)