Skip to content

Commit 1bb1665

Browse files
authored
feat: replace notification batching with debouncing (#58)
Instead of batching, where notifications may be delayed by up to 24 hours, we now send notifications about check failures immediately, avoiding sending multiple notifications within a 24-hour period to the same member.
2 parents 1831073 + 88f151f commit 1bb1665

File tree

4 files changed

+207
-182
lines changed

4 files changed

+207
-182
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ serde = { version = "1.0.219", features = ["derive"] }
3939
serde_json = "1.0.141"
4040
tera = { version = "1.20.0", default-features = false, features = [ "urlencode" ] }
4141
thiserror = { version = "2.0.12", default-features = false }
42-
tokio = { version = "1.47.0", features = ["macros", "rt-multi-thread", "test-util"] }
42+
tokio = { version = "1.47.0", features = ["macros", "rt-multi-thread"] }
4343
toml = { version = "0.9.2", default-features = false, features = ["preserve_order", "parse", "serde", "std"] }
4444
tower-http = { version = "0.6.6", features = ["cors", "fs", "catch-panic", "trace"] }
4545
tracing = "0.1.41"
@@ -52,3 +52,4 @@ httpmock = { version = "0.7.0", default-features = false }
5252
pretty_assertions = "1.4.1"
5353
tempfile = "3.20.0"
5454
tower = "0.5.2"
55+
tokio = { version = "1.47.0", features = ["macros", "rt-multi-thread", "test-util"] }

src/discord.rs

Lines changed: 24 additions & 154 deletions
Original file line numberDiff line numberDiff line change
@@ -4,25 +4,19 @@
44
//!
55
//! [webhook-api-ref]: https://discord.com/developers/docs/resources/webhook#execute-webhook
66
7-
use std::{
8-
collections::HashMap,
9-
fmt::Display,
10-
str::FromStr,
11-
sync::{Mutex, MutexGuard},
12-
time::Duration,
13-
};
7+
use std::{fmt::Display, str::FromStr, time::Duration};
148

15-
use eyre::{Context, bail};
9+
use eyre::{Context, eyre};
1610
use reqwest::{
1711
Client, Response, StatusCode, Url,
1812
header::{self, HeaderValue},
1913
};
2014
use serde::{Deserialize, Serialize};
2115
use serde_json::json;
22-
use tracing::{Instrument, error, info, info_span, instrument, warn};
16+
use tracing::{error, instrument, warn};
2317

24-
/// Maximum number of attempts to send a message before giving up.
25-
const MAX_DELIVERY_ATTEMPTS: usize = 5;
18+
/// The period within which at most one notification can be sent to each member about their site.
19+
pub const NOTIFICATION_DEBOUNCE_PERIOD: Duration = Duration::from_secs(60 * 60 * 24); // 24 hours
2620

2721
/// A user agent representing our program. [Required by Discord][api-doc-ua].
2822
///
@@ -69,15 +63,12 @@ impl Display for Snowflake {
6963
}
7064

7165
/// A notifier that sends messages to a Discord channel using a webhook URL.
72-
#[derive(Debug)]
66+
#[derive(Clone, Debug)]
7367
pub struct DiscordNotifier {
7468
/// The webhook URL to send messages to.
7569
webhook_url: reqwest::Url,
7670
/// HTTP client used to send requests.
7771
client: Client,
78-
/// Set of pending messages to be sent.
79-
/// Map from recipient -> (message, attempt count).
80-
message_queue: Mutex<HashMap<Option<Snowflake>, (String, usize)>>,
8172
}
8273

8374
impl DiscordNotifier {
@@ -87,102 +78,37 @@ impl DiscordNotifier {
8778
Self {
8879
webhook_url: webhook_url.to_owned(),
8980
client: Client::new(),
90-
message_queue: Mutex::new(HashMap::new()),
91-
}
92-
}
93-
94-
/// Lock the message queue, recovering from a poisoned lock if necessary.
95-
fn lock_message_queue(&self) -> MutexGuard<'_, HashMap<Option<Snowflake>, (String, usize)>> {
96-
// If the lock is poisoned, we clear it and return a new lock.
97-
self.message_queue.lock().unwrap_or_else(|mut err| {
98-
error!("Discord notification queue was poisoned; some notifications may be lost");
99-
**err.get_mut() = HashMap::new();
100-
self.message_queue.clear_poison();
101-
err.into_inner()
102-
})
103-
}
104-
105-
/// Enqueue a message to be sent to the Discord channel the next time notifications are
106-
/// dispatched.
107-
///
108-
/// Only one message is sent per user per notification dispatch, so if a message was already
109-
/// enqueued with the same `ping` value, it will be replaced with the new message.
110-
pub fn enqueue_message(&self, ping: Option<Snowflake>, message: String) {
111-
self.lock_message_queue().insert(ping, (message, 0));
112-
}
113-
114-
/// Dispatch all messages in the queue, sending them to Discord.
115-
///
116-
/// Returns `(n_sent, n_failed)` where `n_sent` is the number of messages that were sent
117-
/// successfully, and `n_failed` is the number of messages that failed to send. Errors are
118-
/// logged but not returned.
119-
#[instrument(name = "discord.dispatch_messages", skip(self))]
120-
pub async fn dispatch_messages(&self) -> (usize, usize) {
121-
// Take the map to avoid race conditions while sending messages.
122-
let mut map = {
123-
let mut lock = self.lock_message_queue();
124-
std::mem::take(&mut *lock)
125-
};
126-
let mut sent = 0;
127-
let mut failed = 0;
128-
for (ping, (message, attempts)) in map.drain() {
129-
let span = info_span!("discord.dispatch_message", ?ping, prev_attempts = attempts);
130-
{
131-
let _enter = span.enter();
132-
if attempts >= MAX_DELIVERY_ATTEMPTS {
133-
error!(
134-
channel = %ping.map_or("channel".to_string(), |id| id.to_string()),
135-
attempts,
136-
"Failed to send message to channel after several attempts; giving up",
137-
);
138-
continue;
139-
}
140-
}
141-
let result = self
142-
.send_message(ping, &message)
143-
.instrument(span.clone())
144-
.await;
145-
let _enter = span.enter();
146-
if result.is_err() {
147-
// If no new message was enqueued for the same recipient, retry this one.
148-
let mut lock = self.lock_message_queue();
149-
if lock.get(&ping).is_none() {
150-
lock.insert(ping, (message, attempts + 1));
151-
}
152-
failed += 1;
153-
} else {
154-
sent += 1;
155-
}
156-
}
157-
if sent > 0 || failed > 0 {
158-
info!(sent, failed, "Discord notifications dispatched");
15981
}
160-
(sent, failed)
16182
}
16283

16384
/// Send a message in the channel this notifier is registered to.
16485
#[instrument(name = "discord.send_message", skip(self, message), err(Display))]
165-
async fn send_message(&self, ping: Option<Snowflake>, message: &str) -> eyre::Result<()> {
86+
pub async fn send_message(&self, ping: Option<Snowflake>, message: &str) -> eyre::Result<()> {
87+
let result;
16688
loop {
16789
let response = self.send_single_message(ping, message).await?;
16890
// If we get rate limited, try again.
16991
// See https://discord.com/developers/docs/topics/rate-limits.
17092
if response.status() == StatusCode::TOO_MANY_REQUESTS {
171-
match response.headers().get(header::RETRY_AFTER) {
172-
Some(value) => {
173-
warn!("Retry-After" = ?value, "Hit Discord API rate limit; retrying after delay");
174-
sleep_from_retry_after(value).await?;
175-
continue;
176-
}
177-
None => bail!("Got rate-limited but response contains no Retry-After header"),
93+
if let Some(value) = response.headers().get(header::RETRY_AFTER) {
94+
warn!("Retry-After" = ?value, "Hit Discord API rate limit; retrying after delay");
95+
sleep_from_retry_after(value).await?;
96+
continue;
17897
}
98+
result = Err(eyre!(
99+
"Got rate-limited but response contains no Retry-After header"
100+
));
101+
} else {
102+
result = response
103+
.error_for_status()
104+
.wrap_err("Discord webhook returned error");
179105
}
180-
response
181-
.error_for_status()
182-
.wrap_err("Discord webhook returned error")?;
183106
break;
184107
}
185-
Ok(())
108+
if let Err(err) = &result {
109+
error!(%err, "Failed to send Discord message");
110+
}
111+
result.map(|_| ())
186112
}
187113

188114
/// Sends a single message to the Discord webhook.
@@ -238,10 +164,9 @@ async fn sleep_from_retry_after(header_val: &HeaderValue) -> eyre::Result<()> {
238164

239165
#[cfg(test)]
240166
mod tests {
241-
use httpmock::{Method, MockServer};
242167
use reqwest::Url;
243168

244-
use super::{DiscordNotifier, MAX_DELIVERY_ATTEMPTS};
169+
use super::DiscordNotifier;
245170

246171
/// Tests sending a message in Discord.
247172
///
@@ -265,59 +190,4 @@ mod tests {
265190
.await
266191
.unwrap();
267192
}
268-
269-
/// Tests that messages destined for the same user are deduplicated
270-
#[tokio::test]
271-
async fn test_dispatch_deduplicate() {
272-
let server = MockServer::start_async().await;
273-
let mock = server.mock(|when, then| {
274-
when.method(Method::POST);
275-
then.status(200);
276-
});
277-
let notifier = DiscordNotifier::new(&Url::parse(&server.base_url()).unwrap());
278-
279-
notifier.enqueue_message(Some(1234.into()), "first message".to_string());
280-
// Next message should replace the first one
281-
notifier.enqueue_message(Some(1234.into()), "second message".to_string());
282-
notifier.enqueue_message(Some(4321.into()), "message for other user".to_string());
283-
notifier.enqueue_message(None, "message for no user".to_string());
284-
285-
let (sent, failed) = notifier.dispatch_messages().await;
286-
287-
assert_eq!(3, sent);
288-
assert_eq!(0, failed);
289-
assert_eq!(3, mock.hits_async().await);
290-
291-
// Ensure sent messages were removed
292-
let (sent, failed) = notifier.dispatch_messages().await;
293-
assert_eq!(0, sent);
294-
assert_eq!(0, failed);
295-
assert_eq!(3, mock.hits_async().await); // 3 were sent last round
296-
}
297-
298-
#[tokio::test]
299-
async fn test_give_up_after_max_attempts() {
300-
let server = MockServer::start_async().await;
301-
let mock = server.mock(|when, then| {
302-
when.method(Method::POST);
303-
then.status(500);
304-
});
305-
let notifier = DiscordNotifier::new(&Url::parse(&server.base_url()).unwrap());
306-
307-
notifier.enqueue_message(Some(1234.into()), "test message".to_string());
308-
309-
// Expect MAX_DELIVERY_ATTEMPTS attempts
310-
for i in 0..MAX_DELIVERY_ATTEMPTS {
311-
let (sent, failed) = notifier.dispatch_messages().await;
312-
assert_eq!(0, sent);
313-
assert_eq!(1, failed);
314-
assert_eq!(i + 1, mock.hits_async().await);
315-
}
316-
317-
// Expect no more attempts
318-
let (sent, failed) = notifier.dispatch_messages().await;
319-
assert_eq!(0, sent);
320-
assert_eq!(0, failed);
321-
assert_eq!(MAX_DELIVERY_ATTEMPTS, mock.hits_async().await);
322-
}
323193
}

src/main.rs

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -99,18 +99,6 @@ async fn main() -> ExitCode {
9999
});
100100
}
101101

102-
// Send Discord notifications every 24 hours
103-
if let Some(notifier) = webring.notifier() {
104-
const DISCORD_NOTIFICATION_INTERVAL: Duration = Duration::from_secs(60 * 60 * 24);
105-
let notifier = Arc::clone(notifier);
106-
tokio::spawn(async move {
107-
loop {
108-
notifier.dispatch_messages().await;
109-
tokio::time::sleep(DISCORD_NOTIFICATION_INTERVAL).await;
110-
}
111-
});
112-
}
113-
114102
webring.enable_ip_pruning(chrono::Duration::hours(1));
115103
if let Err(err) = webring.enable_reloading(&cli.config_file) {
116104
error!(%err, "Failed to watch configuration files for changes");

0 commit comments

Comments
 (0)