Skip to content

Commit f3c0b88

Browse files
authored
Fix the stream listener loop (#18)
1 parent ed90748 commit f3c0b88

File tree

3 files changed

+70
-61
lines changed

3 files changed

+70
-61
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "pythnet-watcher"
3-
version = "1.1.0"
3+
version = "1.1.1"
44
edition = "2021"
55

66
[dependencies]

src/main.rs

Lines changed: 68 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -156,69 +156,74 @@ async fn run_listener(input: RunListenerInput) -> anyhow::Result<()> {
156156
)
157157
.await?;
158158

159-
tokio::select! {
160-
update = stream.next() => {
161-
let Some(update) = update else {
162-
tracing::error!("Failed to receive update");
163-
tokio::spawn(async move { unsubscribe().await });
164-
bail!("Stream ended");
165-
};
166-
let started = Instant::now();
167-
let unreliable_data = decode_and_verify_update(&input.wormhole_pid, &input.accumulator_address, update);
168-
let status = if unreliable_data.is_err() {
169-
"error"
170-
} else {
171-
"success"
172-
};
173-
let duration = started.elapsed();
174-
metrics::histogram!("decode_and_verify_observed_messages_duration").record(
175-
duration.as_secs_f64(),
176-
);
177-
metrics::counter!("decode_and_verify_observed_messages", &[("status", status)]).increment(1);
178-
if let Ok(unreliable_data) = unreliable_data {
179-
tokio::spawn({
180-
let (api_clients, signer) = (input.api_clients.clone(), input.signer.clone());
181-
async move {
182-
let started = Instant::now();
183-
let body = message_data_to_body(&unreliable_data);
184-
let status = match Observation::try_new(body.clone(), signer.clone()).await {
185-
Ok(observation) => {
186-
join_all(api_clients.iter().map(|api_client| {
187-
let observation = observation.clone();
188-
let api_client = api_client.clone();
189-
async move {
190-
if let Err(e) = api_client.post_observation(observation).await {
191-
tracing::warn!(url = api_client.get_base_url().to_string(), error = ?e, "Failed to post observation");
192-
} else {
193-
tracing::info!(url = api_client.get_base_url().to_string(), "Observation posted successfully");
159+
let mut exit_receiver = EXIT.subscribe();
160+
// Check if the exit flag is already set, if so, we don't need to wait.
161+
if *exit_receiver.borrow() {
162+
tracing::info!("Received exit signal, stopping pythnet watcher");
163+
return Ok(());
164+
}
165+
166+
loop {
167+
tokio::select! {
168+
update = stream.next() => {
169+
let Some(update) = update else {
170+
tracing::error!("Failed to receive update");
171+
tokio::spawn(async move { unsubscribe().await });
172+
bail!("Stream ended");
173+
};
174+
let started = Instant::now();
175+
let unreliable_data = decode_and_verify_update(&input.wormhole_pid, &input.accumulator_address, update);
176+
let status = if unreliable_data.is_err() {
177+
"error"
178+
} else {
179+
"success"
180+
};
181+
let duration = started.elapsed();
182+
metrics::histogram!("decode_and_verify_observed_messages_duration").record(
183+
duration.as_secs_f64(),
184+
);
185+
metrics::counter!("decode_and_verify_observed_messages", &[("status", status)]).increment(1);
186+
if let Ok(unreliable_data) = unreliable_data {
187+
tokio::spawn({
188+
let (api_clients, signer) = (input.api_clients.clone(), input.signer.clone());
189+
async move {
190+
let started = Instant::now();
191+
let body = message_data_to_body(&unreliable_data);
192+
let status = match Observation::try_new(body.clone(), signer.clone()).await {
193+
Ok(observation) => {
194+
join_all(api_clients.iter().map(|api_client| {
195+
let observation = observation.clone();
196+
let api_client = api_client.clone();
197+
async move {
198+
if let Err(e) = api_client.post_observation(observation).await {
199+
tracing::warn!(url = api_client.get_base_url().to_string(), error = ?e, "Failed to post observation");
200+
} else {
201+
tracing::info!(url = api_client.get_base_url().to_string(), "Observation posted successfully");
202+
}
194203
}
195-
}
196-
})).await;
197-
"success"
198-
}
199-
Err(e) => {
200-
tracing::error!(error = ?e, "Failed to create observation");
201-
"error"
202-
}
203-
};
204-
let duration = started.elapsed();
205-
metrics::histogram!("create_and_post_observation_duration").record(
206-
duration.as_secs_f64(),
207-
);
208-
metrics::counter!("create_and_post_observation", &[("status", status)]).increment(1);
209-
}
210-
});
204+
})).await;
205+
"success"
206+
}
207+
Err(e) => {
208+
tracing::error!(error = ?e, "Failed to create observation");
209+
"error"
210+
}
211+
};
212+
let duration = started.elapsed();
213+
metrics::histogram!("create_and_post_observation_duration").record(
214+
duration.as_secs_f64(),
215+
);
216+
metrics::counter!("create_and_post_observation", &[("status", status)]).increment(1);
217+
}
218+
});
219+
}
220+
}
221+
_ = exit_receiver.changed() => {
222+
tracing::info!("Received exit signal, stopping pythnet watcher");
223+
return Ok(())
211224
}
212-
}
213-
_ = wait_for_exit() => {
214-
tracing::info!("Received exit signal, stopping pythnet watcher");
215-
return Ok(())
216225
}
217226
}
218-
219-
tokio::spawn(async move { unsubscribe().await });
220-
221-
bail!("Stream ended")
222227
}
223228

224229
async fn get_signer(run_options: config::RunOptions) -> anyhow::Result<Arc<dyn Signer>> {
@@ -293,6 +298,10 @@ where
293298
break;
294299
}
295300
}
301+
302+
if *EXIT.borrow() {
303+
break;
304+
}
296305
}
297306
}
298307

0 commit comments

Comments
 (0)