Skip to content

Commit f8ce467

Browse files
committed
fix: Prefetch messages in limited batches (#6915)
I have logs from a user where messages are prefetched for long minutes, and while it's not a problem on its own, we can't rely that the connection overlives such a period, so make `fetch_new_messages()` prefetch (and then actually download) messages in batches of 500 messages.
1 parent 4e47ebd commit f8ce467

File tree

2 files changed

+44
-19
lines changed

2 files changed

+44
-19
lines changed

src/imap.rs

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -555,10 +555,38 @@ impl Imap {
555555
}
556556
session.new_mail = false;
557557

558+
let mut read_cnt = 0;
559+
loop {
560+
let (n, fetch_more) = self
561+
.fetch_new_msg_batch(context, session, folder, folder_meaning)
562+
.await?;
563+
read_cnt += n;
564+
if !fetch_more {
565+
return Ok(read_cnt > 0);
566+
}
567+
}
568+
}
569+
570+
/// Returns number of messages processed and whether the function should be called again.
571+
async fn fetch_new_msg_batch(
572+
&mut self,
573+
context: &Context,
574+
session: &mut Session,
575+
folder: &str,
576+
folder_meaning: FolderMeaning,
577+
) -> Result<(usize, bool)> {
558578
let uid_validity = get_uidvalidity(context, folder).await?;
559579
let old_uid_next = get_uid_next(context, folder).await?;
580+
info!(
581+
context,
582+
"fetch_new_messages({folder}): UIDVALIDITY={uid_validity}, UIDNEXT={old_uid_next}."
583+
);
560584

561-
let msgs = session.prefetch(old_uid_next).await.context("prefetch")?;
585+
let uids_to_prefetch = 500;
586+
let msgs = session
587+
.prefetch(old_uid_next, uids_to_prefetch)
588+
.await
589+
.context("prefetch")?;
562590
let read_cnt = msgs.len();
563591

564592
let download_limit = context.download_limit().await?;
@@ -718,7 +746,8 @@ impl Imap {
718746
largest_uid_fetched
719747
};
720748

721-
let actually_download_messages_future = async move {
749+
let actually_download_messages_future = async {
750+
let sender = sender;
722751
let mut uids_fetch_in_batch = Vec::with_capacity(max(uids_fetch.len(), 1));
723752
let mut fetch_partially = false;
724753
uids_fetch.push((0, !uids_fetch.last().unwrap_or(&(0, false)).1));
@@ -753,14 +782,17 @@ impl Imap {
753782
// if the message has arrived after selecting mailbox
754783
// and determining its UIDNEXT and before prefetch.
755784
let mut new_uid_next = largest_uid_fetched + 1;
756-
if fetch_res.is_ok() {
785+
let fetch_more = fetch_res.is_ok() && {
786+
let prefetch_uid_next = old_uid_next + uids_to_prefetch;
757787
// If we have successfully fetched all messages we planned during prefetch,
758788
// then we have covered at least the range between old UIDNEXT
759789
// and UIDNEXT of the mailbox at the time of selecting it.
760-
new_uid_next = max(new_uid_next, mailbox_uid_next);
790+
new_uid_next = max(new_uid_next, min(prefetch_uid_next, mailbox_uid_next));
761791

762792
new_uid_next = max(new_uid_next, largest_uid_skipped.unwrap_or(0) + 1);
763-
}
793+
794+
prefetch_uid_next < mailbox_uid_next
795+
};
764796
if new_uid_next > old_uid_next {
765797
set_uid_next(context, folder, new_uid_next).await?;
766798
}
@@ -777,7 +809,7 @@ impl Imap {
777809
// establish a new session if this one is broken.
778810
fetch_res?;
779811

780-
Ok(read_cnt > 0)
812+
Ok((read_cnt, fetch_more))
781813
}
782814

783815
/// Read the recipients from old emails sent by the user and add them as contacts.

src/imap/session.rs

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -110,14 +110,16 @@ impl Session {
110110
Ok(list)
111111
}
112112

113-
/// Prefetch all messages greater than or equal to `uid_next`. Returns a list of fetch results
114-
/// in the order of ascending delivery time to the server (INTERNALDATE).
113+
/// Prefetch `n_uids` messages starting from `uid_next`. Returns a list of fetch results in the
114+
/// order of ascending delivery time to the server (INTERNALDATE).
115115
pub(crate) async fn prefetch(
116116
&mut self,
117117
uid_next: u32,
118+
n_uids: u32,
118119
) -> Result<Vec<(u32, async_imap::types::Fetch)>> {
120+
let uid_last = uid_next.saturating_add(n_uids - 1);
119121
// fetch messages with larger UID than the last one seen
120-
let set = format!("{uid_next}:*");
122+
let set = format!("{uid_next}:{uid_last}");
121123
let mut list = self
122124
.uid_fetch(set, PREFETCH_FLAGS)
123125
.await
@@ -126,16 +128,7 @@ impl Session {
126128
let mut msgs = BTreeMap::new();
127129
while let Some(msg) = list.try_next().await? {
128130
if let Some(msg_uid) = msg.uid {
129-
// If the mailbox is not empty, results always include
130-
// at least one UID, even if last_seen_uid+1 is past
131-
// the last UID in the mailbox. It happens because
132-
// uid:* is interpreted the same way as *:uid.
133-
// See <https://tools.ietf.org/html/rfc3501#page-61> for
134-
// standard reference. Therefore, sometimes we receive
135-
// already seen messages and have to filter them out.
136-
if msg_uid >= uid_next {
137-
msgs.insert((msg.internal_date(), msg_uid), msg);
138-
}
131+
msgs.insert((msg.internal_date(), msg_uid), msg);
139132
}
140133
}
141134

0 commit comments

Comments
 (0)