Skip to content

Commit

Permalink
fix(indexer): fan out when build index chunks
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Zhang <[email protected]>
  • Loading branch information
zwpaper committed Dec 2, 2024
1 parent 5c15a7c commit 3e510cf
Showing 1 changed file with 32 additions and 18 deletions.
50 changes: 32 additions & 18 deletions crates/tabby-index/src/indexer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashSet;
use std::{collections::HashSet, sync::mpsc::sync_channel};

use anyhow::{bail, Result};
use async_stream::stream;
Expand Down Expand Up @@ -76,35 +76,49 @@ impl<T: ToIndexId> TantivyDocBuilder<T> {
let doc_id = id.clone();
let doc_attributes = self.builder.build_attributes(&document).await;
let s = stream! {
let mut failed_count: u64 = 0;
let (tx, rx) = sync_channel(32);

for await chunk_doc in self.build_chunks(cloned_id, source_id.clone(), updated_at, document).await {
match chunk_doc.await {
Ok(Ok(doc)) => {
yield tokio::spawn(async move { Some(doc) });
}
Ok(Err(e)) => {
warn!("Failed to build chunk for document '{}': {}", doc_id, e);
failed_count += 1;
}
Err(e) => {
warn!("Failed to call build chunk '{}': {}", doc_id, e);
failed_count += 1;
let tx = tx.clone();
let doc_id = doc_id.clone();
yield tokio::spawn(async move {
match chunk_doc.await {
Ok(Ok(doc)) => {
Some(doc)
}
Ok(Err(e)) => {
warn!("Failed to build chunk for document '{}': {}", doc_id, e);
tx.send(1).unwrap();
None
}
Err(e) => {
warn!("Failed to call build chunk '{}': {}", doc_id, e);
tx.send(1).unwrap();
None
}
}
}
});
};

// Drop the original tx so the channel closes once every clone is gone, which lets rx.iter() finish.
// Each cloned sender is dropped when the spawned task that owns it completes.
drop(tx);

let mut doc = doc! {
schema.field_id => doc_id,
schema.field_source_id => source_id,
schema.field_corpus => self.corpus,
schema.field_attributes => doc_attributes,
schema.field_updated_at => updated_at,
};
if failed_count > 0 {
doc.add_u64(schema.field_failed_chunks_count, failed_count);
}

yield tokio::spawn(async move { Some(doc) });
yield tokio::spawn(async move {
let failed_count = rx.iter().count();
if failed_count > 0 {
doc.add_u64(schema.field_failed_chunks_count, failed_count as u64);
}
Some(doc)
});
};

(id, s)
Expand Down

0 comments on commit 3e510cf

Please sign in to comment.