Commit 48ad83e

gautamg795 (Convex, Inc.) authored and committed
table iterator fixes
- add fallback handling for prev revs queries
- move database error check to helper fn
- make timeout retriable in table iterator

GitOrigin-RevId: 88cdb6e0eb7fd7ccc411fe00184725a8c20ed388
1 parent e7bd30e commit 48ad83e

3 files changed: +136, -39 lines

crates/common/src/knobs.rs

Lines changed: 2 additions & 1 deletion
@@ -185,7 +185,8 @@ pub static DEFAULT_DOCUMENTS_PAGE_SIZE: LazyLock<u32> =
 pub static DOCUMENTS_IN_MEMORY: LazyLock<usize> =
     LazyLock::new(|| env_config("DOCUMENTS_IN_MEMORY", 512));
 
-/// Number of times to retry on out-of-retention errors in TableIterator
+/// Number of times to retry on retriable errors (out-of-retention, database
+/// timeout) in TableIterator
 pub static TABLE_ITERATOR_MAX_RETRIES: LazyLock<u32> =
     LazyLock::new(|| env_config("TABLE_ITERATOR_MAX_RETRIES", 1));
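For context, a minimal, self-contained sketch of the knob pattern this file uses. The env_config below is a hypothetical stand-in for the crate's helper of the same name (read an override from the environment, otherwise use the default), not the actual Convex implementation.

use std::str::FromStr;
use std::sync::LazyLock;

// Hypothetical stand-in for the crate's `env_config` helper.
fn env_config<T: FromStr>(name: &str, default: T) -> T {
    std::env::var(name)
        .ok()
        .and_then(|value| value.parse().ok())
        .unwrap_or(default)
}

/// Number of times to retry on retriable errors (out-of-retention, database
/// timeout) in TableIterator.
pub static TABLE_ITERATOR_MAX_RETRIES: LazyLock<u32> =
    LazyLock::new(|| env_config("TABLE_ITERATOR_MAX_RETRIES", 1));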

crates/database/src/table_iteration.rs

Lines changed: 6 additions & 2 deletions
@@ -12,6 +12,7 @@ use anyhow::Context;
 use common::{
     bootstrap_model::index::database_index::IndexedFields,
     document::ResolvedDocument,
+    errors::DatabaseTimeoutError,
     index::{
         IndexKey,
         IndexKeyBytes,
@@ -584,8 +585,11 @@ impl<RT: Runtime> TableIteratorInner<RT> {
             );
             let documents_in_page: Vec<_> = match stream.take(self.page_size).try_collect().await {
                 Ok(docs) => docs,
-                Err(e) if attempt < *TABLE_ITERATOR_MAX_RETRIES && e.is_out_of_retention() => {
-                    tracing::warn!("TableIterator hit out-of-retention error {e}, retrying...");
+                Err(e)
+                    if attempt < *TABLE_ITERATOR_MAX_RETRIES
+                        && (e.is_out_of_retention() || e.is::<DatabaseTimeoutError>()) =>
+                {
+                    tracing::warn!("TableIterator hit retriable error {e}, retrying...");
                     continue;
                 },
                 Err(e) => return Err(e),
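As a rough illustration of the retry behavior this hunk enables, here is a self-contained sketch (not the crate's actual code): a stand-in DatabaseTimeoutError is recognized by downcasting the anyhow error with is::<T>(), and the loop retries up to a maximum attempt count before surfacing the error.

use anyhow::anyhow;

// Stand-in for common::errors::DatabaseTimeoutError.
#[derive(Debug)]
struct DatabaseTimeoutError;

impl std::fmt::Display for DatabaseTimeoutError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "database timed out")
    }
}

impl std::error::Error for DatabaseTimeoutError {}

const MAX_RETRIES: u32 = 1;

// Stand-in for collecting one page of documents from the stream.
fn load_page(attempt: u32) -> anyhow::Result<Vec<u64>> {
    if attempt == 0 {
        Err(anyhow!(DatabaseTimeoutError))
    } else {
        Ok(vec![1, 2, 3])
    }
}

fn load_with_retries() -> anyhow::Result<Vec<u64>> {
    let mut attempt = 0;
    loop {
        match load_page(attempt) {
            Ok(docs) => return Ok(docs),
            // Retriable errors are recognized by downcasting the anyhow error.
            Err(e) if attempt < MAX_RETRIES && e.is::<DatabaseTimeoutError>() => {
                eprintln!("hit retriable error {e}, retrying...");
                attempt += 1;
                continue;
            },
            Err(e) => return Err(e),
        }
    }
}

fn main() {
    assert_eq!(load_with_retries().unwrap(), vec![1, 2, 3]);
}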

crates/mysql/src/lib.rs

Lines changed: 128 additions & 36 deletions
@@ -129,6 +129,21 @@ use crate::{
     },
 };
 
+/// Checks if an error is the Vitess "message too large" error that occurs
+/// when query results exceed 64MiB.
+fn is_message_too_large_error(error: &anyhow::Error) -> Option<&mysql_async::ServerError> {
+    error
+        .chain()
+        .find_map(|e| e.downcast_ref::<mysql_async::ServerError>())
+        .filter(|db_err| {
+            db_err.state == "HY000"
+                && db_err.code == 1105
+                && db_err
+                    .message
+                    .contains("trying to send message larger than max")
+        })
+}
+
 #[derive(Clone, Debug)]
 pub struct MySqlInstanceName {
     raw: String,
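The new helper walks the whole anyhow error chain rather than inspecting only the top-level error. A self-contained sketch of that pattern, using a stand-in error type in place of mysql_async::ServerError (the field names and the sample message are illustrative only):

// Stand-in for mysql_async::ServerError, used only to illustrate the pattern.
#[derive(Debug)]
struct FakeServerError {
    code: u16,
    state: String,
    message: String,
}

impl std::fmt::Display for FakeServerError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "ERROR {} ({}): {}", self.code, self.state, self.message)
    }
}

impl std::error::Error for FakeServerError {}

// Walk the error chain and return the server error only when it matches the
// specific condition we want to handle.
fn find_too_large_error(error: &anyhow::Error) -> Option<&FakeServerError> {
    error
        .chain()
        .find_map(|e| e.downcast_ref::<FakeServerError>())
        .filter(|db_err| {
            db_err.state == "HY000"
                && db_err.code == 1105
                && db_err
                    .message
                    .contains("trying to send message larger than max")
        })
}

fn main() {
    let err = anyhow::Error::new(FakeServerError {
        code: 1105,
        state: "HY000".to_string(),
        message: "trying to send message larger than max allowed".to_string(),
    })
    .context("query_stream failed");

    // The check sees through the context layer added on top of the original error.
    assert!(find_too_large_error(&err).is_some());
}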
@@ -749,16 +764,7 @@ impl<RT: Runtime> MySqlReader<RT> {
             params.push((page_size as i64).into());
             let stream_result = match client.query_stream(query, params, page_size as usize).await {
                 Ok(stream) => Ok(stream),
-                Err(ref e)
-                    if let Some(db_err) = e
-                        .chain()
-                        .find_map(|e| e.downcast_ref::<mysql_async::ServerError>())
-                        && db_err.state == "HY000"
-                        && db_err.code == 1105
-                        && db_err
-                            .message
-                            .contains("trying to send message larger than max") =>
-                {
+                Err(ref e) if let Some(db_err) = is_message_too_large_error(e) => {
                     if page_size == 1 {
                         anyhow::bail!(
                             "Failed to load documents with minimum page size `1`: {}",
@@ -924,16 +930,7 @@ impl<RT: Runtime> MySqlReader<RT> {
                 .await
             {
                 Ok(stream) => Ok(stream),
-                Err(ref e)
-                    if let Some(db_err) = e
-                        .chain()
-                        .find_map(|e| e.downcast_ref::<mysql_async::ServerError>())
-                        && db_err.state == "HY000"
-                        && db_err.code == 1105
-                        && db_err
-                            .message
-                            .contains("trying to send message larger than max") =>
-                {
+                Err(ref e) if let Some(db_err) = is_message_too_large_error(e) => {
                     if batch_size == 1 {
                         anyhow::bail!(
                             "Failed to load index rows with minimum page size `1`: {}",
@@ -1223,18 +1220,34 @@ impl<RT: Runtime> PersistenceReader for MySqlReader<RT> {
     ) -> anyhow::Result<BTreeMap<DocumentPrevTsQuery, DocumentLogEntry>> {
         let timer = metrics::previous_revisions_of_documents_timer(self.read_pool.cluster_name());
 
-        let mut client = self
-            .read_pool
-            .acquire("previous_revisions_of_documents", &self.db_name)
-            .await?;
         let ids: Vec<_> = ids.into_iter().collect();
 
         let mut result = BTreeMap::new();
 
         let multitenant = self.multitenant;
         let instance_name: mysql_async::Value = (&self.instance_name.raw).into();
 
-        for chunk in smart_chunks(&ids) {
+        // Track remaining items to process and fallback chunk size
+        let mut remaining: &[DocumentPrevTsQuery] = &ids;
+        let mut fallback_chunk_size: Option<usize> = None;
+
+        while !remaining.is_empty() {
+            // Avoid holding connections across yield points, to limit lifetime
+            // and improve fairness.
+            let mut client = self
+                .read_pool
+                .acquire("previous_revisions_of_documents", &self.db_name)
+                .await?;
+
+            // Determine chunk - either use smart_chunks or fallback size
+            let chunk = if let Some(max_size) = fallback_chunk_size {
+                let len = remaining.len().min(max_size);
+                &remaining[..len]
+            } else {
+                // Use first chunk from smart_chunks
+                smart_chunks(remaining).next().unwrap()
+            };
+
             let mut params = Vec::with_capacity(
                 chunk.len() * (sql::EXACT_REV_CHUNK_PARAMS + multitenant as usize),
             );
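The loop introduced here replaces the plain for chunk in smart_chunks(&ids) iteration so the chunk size can shrink mid-stream: as a later hunk shows, remaining only advances after a chunk succeeds, so a continue on a too-large error retries the same items with the smaller size. A simplified, self-contained sketch of that control flow, where first_smart_chunk, query_chunk, and FALLBACK_CHUNK_SIZE are stand-ins for the real helpers and knob:

// Hypothetical stand-in for the fallback chunk size knob.
const FALLBACK_CHUNK_SIZE: usize = 2;

// Stand-in for `smart_chunks(remaining).next().unwrap()`.
fn first_smart_chunk<T>(items: &[T]) -> &[T] {
    &items[..items.len().min(4)]
}

// Stand-in for issuing the chunked query: pretend chunks bigger than the
// fallback size exceed the 64MiB limit.
fn query_chunk(chunk: &[u32]) -> anyhow::Result<Vec<u32>> {
    if chunk.len() > FALLBACK_CHUNK_SIZE {
        anyhow::bail!("trying to send message larger than max");
    }
    Ok(chunk.to_vec())
}

fn fetch_all(ids: &[u32]) -> anyhow::Result<Vec<u32>> {
    let mut out = Vec::new();
    let mut remaining = ids;
    let mut fallback_chunk_size: Option<usize> = None;

    while !remaining.is_empty() {
        // Pick the next chunk: normal sizing unless we have fallen back.
        let chunk = match fallback_chunk_size {
            Some(max) => &remaining[..remaining.len().min(max)],
            None => first_smart_chunk(remaining),
        };

        match query_chunk(chunk) {
            Ok(rows) => out.extend(rows),
            Err(_) => {
                // Shrink and retry the same items; remaining has not moved.
                fallback_chunk_size = Some(FALLBACK_CHUNK_SIZE);
                continue;
            },
        }

        // Only advance once the chunk was processed successfully.
        remaining = &remaining[chunk.len()..];
    }
    Ok(out)
}

fn main() -> anyhow::Result<()> {
    let ids: Vec<u32> = (0..10).collect();
    assert_eq!(fetch_all(&ids)?, ids);
    Ok(())
}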
@@ -1253,13 +1266,42 @@ impl<RT: Runtime> PersistenceReader for MySqlReader<RT> {
                 // deduplicate, so create a map from DB results back to queries
                 id_ts_to_query.entry((id, prev_ts)).or_default().push(*q);
             }
-            let result_stream = client
+            let result_stream = match client
                 .query_stream(
                     sql::exact_rev_chunk(chunk.len(), multitenant),
                     params,
                     chunk.len(),
                 )
-                .await?;
+                .await
+            {
+                Ok(stream) => Ok(stream),
+                Err(ref e) if let Some(db_err) = is_message_too_large_error(e) => {
+                    let current_size = fallback_chunk_size.unwrap_or(chunk.len());
+                    if current_size == 1 {
+                        anyhow::bail!(
+                            "Failed to load previous revisions of documents with minimum chunk \
+                             size `1`: {}",
+                            db_err.message,
+                        );
+                    }
+                    if current_size <= *MYSQL_FALLBACK_PAGE_SIZE as usize {
+                        tracing::warn!(
+                            "Falling back to chunk size `1` due to repeated server error: {}",
+                            db_err.message
+                        );
+                        fallback_chunk_size = Some(1);
+                    } else {
+                        tracing::warn!(
+                            "Falling back to chunk size `{}` due to server error: {}",
+                            *MYSQL_FALLBACK_PAGE_SIZE,
+                            db_err.message
+                        );
+                        fallback_chunk_size = Some(*MYSQL_FALLBACK_PAGE_SIZE as usize);
+                    }
+                    continue;
+                },
+                Err(e) => Err(e),
+            }?;
             pin_mut!(result_stream);
             while let Some(row) = result_stream.try_next().await? {
                 let (prev_ts, id, maybe_doc, prev_prev_ts) = self.row_to_document(&row)?;
@@ -1278,6 +1320,9 @@ impl<RT: Runtime> PersistenceReader for MySqlReader<RT> {
                     anyhow::ensure!(result.insert(q, entry).is_none());
                 }
             }
+
+            // Advance past the processed chunk
+            remaining = &remaining[chunk.len()..];
         }
 
         if let Some(min_ts) = ids.iter().map(|DocumentPrevTsQuery { ts, .. }| *ts).min() {
@@ -1298,10 +1343,6 @@ impl<RT: Runtime> PersistenceReader for MySqlReader<RT> {
     ) -> anyhow::Result<BTreeMap<(InternalDocumentId, Timestamp), DocumentLogEntry>> {
         let timer = metrics::prev_revisions_timer(self.read_pool.cluster_name());
 
-        let mut client = self
-            .read_pool
-            .acquire("previous_revisions", &self.db_name)
-            .await?;
         let ids: Vec<_> = ids.into_iter().collect();
 
         let mut result = BTreeMap::new();
@@ -1312,7 +1353,27 @@ impl<RT: Runtime> PersistenceReader for MySqlReader<RT> {
         let multitenant = self.multitenant;
         let instance_name: mysql_async::Value = (&self.instance_name.raw).into();
 
-        for chunk in smart_chunks(&ids) {
+        // Track remaining items to process and fallback chunk size
+        let mut remaining: &[(InternalDocumentId, Timestamp)] = &ids;
+        let mut fallback_chunk_size: Option<usize> = None;
+
+        while !remaining.is_empty() {
+            // Avoid holding connections across yield points, to limit lifetime
+            // and improve fairness.
+            let mut client = self
+                .read_pool
+                .acquire("previous_revisions", &self.db_name)
+                .await?;
+
+            // Determine chunk - either use smart_chunks or fallback size
+            let chunk = if let Some(max_size) = fallback_chunk_size {
+                let len = remaining.len().min(max_size);
+                &remaining[..len]
+            } else {
+                // Use first chunk from smart_chunks
+                smart_chunks(remaining).next().unwrap()
+            };
+
             let mut params = Vec::with_capacity(
                 chunk.len() * (sql::PREV_REV_CHUNK_PARAMS + multitenant as usize),
             );
@@ -1326,17 +1387,48 @@ impl<RT: Runtime> PersistenceReader for MySqlReader<RT> {
                 }
                 min_ts = cmp::min(*ts, min_ts);
             }
-            let result_stream = client
+            let result_stream = match client
                 .query_stream(
                     sql::prev_rev_chunk(chunk.len(), multitenant),
                     params,
                     chunk.len(),
                 )
-                .await?;
+                .await
+            {
+                Ok(stream) => Ok(stream),
+                Err(ref e) if let Some(db_err) = is_message_too_large_error(e) => {
+                    let current_size = fallback_chunk_size.unwrap_or(chunk.len());
+                    if current_size == 1 {
+                        anyhow::bail!(
+                            "Failed to load previous revisions with minimum chunk size `1`: {}",
+                            db_err.message,
+                        );
+                    }
+                    if current_size <= *MYSQL_FALLBACK_PAGE_SIZE as usize {
+                        tracing::warn!(
+                            "Falling back to chunk size `1` due to repeated server error: {}",
+                            db_err.message
+                        );
+                        fallback_chunk_size = Some(1);
+                    } else {
+                        tracing::warn!(
+                            "Falling back to chunk size `{}` due to server error: {}",
+                            *MYSQL_FALLBACK_PAGE_SIZE,
+                            db_err.message
+                        );
+                        fallback_chunk_size = Some(*MYSQL_FALLBACK_PAGE_SIZE as usize);
+                    }
+                    continue;
+                },
+                Err(e) => Err(e),
+            }?;
             pin_mut!(result_stream);
-            while let Some(result) = result_stream.try_next().await? {
-                results.push(result);
+            while let Some(row) = result_stream.try_next().await? {
+                results.push(row);
             }
+
+            // Advance past the processed chunk
+            remaining = &remaining[chunk.len()..];
         }
         for row in results.into_iter() {
             let ts: i64 = row.get_opt(6).context("row[6]")??;
