@@ -17,7 +17,7 @@ use axum::{
1717} ;
1818use config:: { load_config, IssueBotConfig , ServerConfig } ;
1919use embeddings:: inference_endpoints:: EmbeddingApi ;
20- use futures:: { pin_mut, StreamExt } ;
20+ use futures:: { pin_mut, TryStreamExt } ;
2121use github:: GithubApi ;
2222use huggingface:: HuggingfaceApi ;
2323use metrics:: start_metrics_server;
@@ -30,6 +30,7 @@ use slack::Slack;
3030use sqlx:: {
3131 postgres:: { PgConnectOptions , PgPoolOptions } ,
3232 prelude:: FromRow ,
33+ types:: Json ,
3334 Pool , Postgres , QueryBuilder ,
3435} ;
3536use tokio:: {
@@ -244,11 +245,20 @@ struct ClosestIssue {
244245 title : String ,
245246 number : i32 ,
246247 html_url : String ,
247- body : String ,
248248 #[ allow( unused) ]
249249 cosine_similarity : f64 ,
250250}
251251
252+ #[ derive( Debug , Deserialize , FromRow ) ]
253+ struct JobData { // shape of the JSON held in the `data` column of the `jobs` table
254+ issues_page : i32 , // stored under the 'issues_page' key; indexing resumes from issues_page + 1
255+ }
256+
257+ #[ derive( Debug , FromRow ) ]
258+ struct Job { // row read from `jobs` for a single repository_id
259+ data : Json < JobData > , // `data` column, decoded from JSON into JobData via sqlx's Json wrapper
260+ }
261+
252262async fn handle_webhooks_wrapper (
253263 rx : Receiver < EventData > ,
254264 embedding_api : EmbeddingApi ,
@@ -282,7 +292,7 @@ async fn handle_webhooks(
282292 Vector :: from ( embedding_api. generate_embedding ( issue_text) . await ?) ;
283293
284294 let closest_issues: Vec < ClosestIssue > = sqlx:: query_as (
285- "select title, number, html_url, body, 1 - (embedding <=> $1) as cosine_similarity from issues order by embedding <=> $1 LIMIT 3" ,
295+ "select title, number, html_url, 1 - (embedding <=> $1) as cosine_similarity from issues order by embedding <=> $1 LIMIT 3" ,
286296 )
287297 . bind ( embedding. clone ( ) )
288298 . fetch_all ( & pool)
@@ -395,17 +405,17 @@ async fn handle_webhooks(
395405 source = repository. source. to_string( )
396406 ) ;
397407 info ! ( parent: & span, "indexing started" ) ;
398- let from_page = sqlx:: query!(
399- "select page from jobs where repository_id = $1" ,
408+ let job = sqlx:: query_as!(
409+ Job ,
410+ r#"select data as "data: Json<JobData>" from jobs where repository_id = $1"# ,
400411 repository. repo_id
401412 )
402- . map ( |row| row. page + 1 )
403413 . fetch_optional ( & pool)
404414 . await ?;
405- let issues = github_api. get_issues ( from_page, repository. clone ( ) ) ;
415+ let from_issues_page = job. as_ref ( ) . map ( |j| j. data . issues_page + 1 ) . unwrap_or ( 1 ) ;
416+ let issues = github_api. get_issues ( from_issues_page, repository. clone ( ) ) ;
406417 pin_mut ! ( issues) ;
407- while let Some ( issue) = issues. next ( ) . await {
408- let ( issue, page) = issue?;
418+ while let Some ( ( issue, page) ) = issues. try_next ( ) . await ? {
409419 let embedding_api = embedding_api. clone ( ) ;
410420 let pool = pool. clone ( ) ;
411421 let source = repository. source . to_string ( ) ;
@@ -461,19 +471,19 @@ async fn handle_webhooks(
461471 qb. build ( ) . execute ( & pool) . await ?;
462472 }
463473 if let Some ( page) = page {
464- sqlx:: query! (
465- r#"insert into jobs (repository_id, page )
466- values ($1, $2 )
474+ sqlx:: query (
475+ r#"insert into jobs (repository_id, data )
476+ values ($1, jsonb_build_object('issues_page', $2) )
467477 on conflict (repository_id)
468478 do update
469479 set
470- page = excluded.page ,
480+ data = jsonb_set(jobs.data, '{issues_page}', to_jsonb($2::int), true) ,
471481 updated_at = current_timestamp"# ,
472- repository . repo_id ,
473- page ,
474- )
475- . execute ( & pool)
476- . await ?;
482+ )
483+ . bind ( & repository . repo_id )
484+ . bind ( page )
485+ . execute ( & pool)
486+ . await ?;
477487 }
478488 }
479489 sqlx:: query!(
0 commit comments