Skip to content

Commit 81df556

Browse files
committed
feat: run indexation in background thread
1 parent af51b72 commit 81df556

File tree

1 file changed

+109
-63
lines changed

1 file changed

+109
-63
lines changed

issue-bot/src/main.rs

Lines changed: 109 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use axum::{
1717
};
1818
use config::{load_config, IssueBotConfig, ServerConfig};
1919
use embeddings::inference_endpoints::EmbeddingApi;
20-
use futures::{pin_mut, TryStreamExt};
20+
use futures::{pin_mut, StreamExt};
2121
use github::GithubApi;
2222
use huggingface::HuggingfaceApi;
2323
use metrics::start_metrics_server;
@@ -41,7 +41,7 @@ use tokio::{
4141
};
4242
use tower::{BoxError, ServiceBuilder};
4343
use tower_http::trace::TraceLayer;
44-
use tracing::{info, info_span, Span};
44+
use tracing::{error, info, info_span, Instrument, Span};
4545
use tracing_subscriber::EnvFilter;
4646

4747
mod config;
@@ -399,48 +399,80 @@ async fn handle_webhooks(
399399
}
400400
}
401401
EventData::Indexation(repository) => {
402+
let embedding_api = embedding_api.clone();
403+
let github_api = github_api.clone();
404+
let pool = pool.clone();
402405
let span = info_span!(
403406
"indexation",
404407
repository_id = repository.repo_id,
405408
source = repository.source.to_string()
406409
);
407-
info!(parent: &span, "indexing started");
408-
let job = sqlx::query_as!(
409-
Job,
410-
r#"select data as "data: Json<JobData>" from jobs where repository_id = $1"#,
411-
repository.repo_id
412-
)
413-
.fetch_optional(&pool)
414-
.await?;
415-
let from_issues_page = job.as_ref().map(|j| j.data.issues_page + 1).unwrap_or(1);
416-
let issues = github_api.get_issues(from_issues_page, repository.clone());
417-
pin_mut!(issues);
418-
while let Some((issue, page)) = issues.try_next().await? {
419-
let embedding_api = embedding_api.clone();
420-
let pool = pool.clone();
421-
let source = repository.source.to_string();
422-
let comment_string = format!(
423-
"\n----\nComment: {}",
424-
issue
425-
.comments
426-
.iter()
427-
.map(|c| c.body.to_owned())
428-
.collect::<Vec<String>>()
429-
.join("\n----\nComment: ")
430-
);
431-
let issue_text = format!("# {}\n{}{}", issue.title, issue.body, comment_string);
432-
let embedding =
433-
Vector::from(embedding_api.generate_embedding(issue_text).await?);
434-
let issue_id: Option<i32> = sqlx::query_scalar!(
435-
"select id from issues where source_id = $1",
436-
issue.id.to_string()
410+
tokio::spawn(async move {
411+
info!("indexing started");
412+
let job = match sqlx::query_as!(
413+
Job,
414+
r#"select data as "data: Json<JobData>" from jobs where repository_id = $1"#,
415+
repository.repo_id
437416
)
438417
.fetch_optional(&pool)
439-
.await?;
440-
let issue_id = if let Some(id) = issue_id {
441-
id
442-
} else {
443-
sqlx::query_scalar(
418+
.await {
419+
Ok(job) => job,
420+
Err(err) => {
421+
error!(err = err.to_string(), "error fetching job");
422+
return;
423+
}
424+
};
425+
let from_issues_page =
426+
job.as_ref().map(|j| j.data.issues_page + 1).unwrap_or(1);
427+
let issues = github_api.get_issues(from_issues_page, repository.clone());
428+
pin_mut!(issues);
429+
while let Some(issue) = issues.next().await {
430+
let (issue, page) = match issue {
431+
Ok(issue) => issue,
432+
Err(err) => {
433+
error!(err = err.to_string(), "error fetching next item from issues stream");
434+
continue;
435+
}
436+
};
437+
let embedding_api = embedding_api.clone();
438+
let pool = pool.clone();
439+
let source = repository.source.to_string();
440+
let comment_string = format!(
441+
"\n----\nComment: {}",
442+
issue
443+
.comments
444+
.iter()
445+
.map(|c| c.body.to_owned())
446+
.collect::<Vec<String>>()
447+
.join("\n----\nComment: ")
448+
);
449+
let issue_text =
450+
format!("# {}\n{}{}", issue.title, issue.body, comment_string);
451+
let raw_embedding = match embedding_api.generate_embedding(issue_text).await {
452+
Ok(embedding) => embedding,
453+
Err(err) => {
454+
error!(issue_number = issue.number, err = err.to_string(), "generate embedding error");
455+
continue;
456+
}
457+
};
458+
let embedding =
459+
Vector::from(raw_embedding);
460+
let issue_id: Option<i32> = match sqlx::query_scalar!(
461+
"select id from issues where source_id = $1",
462+
issue.id.to_string()
463+
)
464+
.fetch_optional(&pool)
465+
.await {
466+
Ok(id) => id,
467+
Err(err) => {
468+
error!(issue_number = issue.number, err = err.to_string(), "failed to fetch issue id");
469+
continue;
470+
}
471+
};
472+
let issue_id = if let Some(id) = issue_id {
473+
id
474+
} else {
475+
match sqlx::query_scalar(
444476
r#"insert into issues (source_id, source, title, body, is_pull_request, number, html_url, url, embedding)
445477
values ($1, $2, $3, $4, $5, $6, $7, $8, $9)
446478
returning id"#
@@ -455,23 +487,31 @@ async fn handle_webhooks(
455487
.bind(issue.url)
456488
.bind(embedding)
457489
.fetch_one(&pool)
458-
.await?
459-
};
460-
if !issue.comments.is_empty() {
461-
let mut qb = QueryBuilder::new(
462-
"insert into comments (source_id, body, url, issue_id)",
463-
);
464-
qb.push_values(issue.comments, |mut b, comment| {
465-
b.push_bind(comment.id)
466-
.push_bind(comment.body)
467-
.push_bind(comment.url)
468-
.push_bind(issue_id);
469-
});
470-
qb.push("on conflict do nothing");
471-
qb.build().execute(&pool).await?;
472-
}
473-
if let Some(page) = page {
474-
sqlx::query(
490+
.await {
491+
Ok(id) => id,
492+
Err(err) => {
493+
error!(issue_number = issue.number, err = err.to_string(), "error inserting issue");
494+
continue;
495+
}
496+
}
497+
};
498+
if !issue.comments.is_empty() {
499+
let mut qb = QueryBuilder::new(
500+
"insert into comments (source_id, body, url, issue_id)",
501+
);
502+
qb.push_values(issue.comments, |mut b, comment| {
503+
b.push_bind(comment.id)
504+
.push_bind(comment.body)
505+
.push_bind(comment.url)
506+
.push_bind(issue_id);
507+
});
508+
qb.push("on conflict do nothing");
509+
if let Err(err) = qb.build().execute(&pool).await {
510+
error!(issue_number = issue.number, err = err.to_string(), "error inserting comments");
511+
}
512+
}
513+
if let Some(page) = page {
514+
if let Err(err) = sqlx::query(
475515
r#"insert into jobs (repository_id, data)
476516
values ($1, jsonb_build_object('issues_page', $2))
477517
on conflict (repository_id)
@@ -483,16 +523,22 @@ async fn handle_webhooks(
483523
.bind(&repository.repo_id)
484524
.bind(page)
485525
.execute(&pool)
486-
.await?;
526+
.await {
527+
error!(issue_number = issue.number, err = err.to_string(), "error inserting job")
528+
}
529+
}
487530
}
488-
}
489-
sqlx::query!(
490-
"delete from jobs where repository_id = $1",
491-
repository.repo_id
492-
)
493-
.execute(&pool)
494-
.await?;
495-
info!(parent: &span, "finished indexing {repository}");
531+
if let Err(err) = sqlx::query!(
532+
"delete from jobs where repository_id = $1",
533+
repository.repo_id
534+
)
535+
.execute(&pool)
536+
.await {
537+
error!(err = err.to_string(), "failed to delete job");
538+
return;
539+
}
540+
info!("finished indexing");
541+
}.instrument(span));
496542
None
497543
}
498544
};

0 commit comments

Comments
 (0)