Skip to content

Commit

Permalink
Create empty shards if necessary (#4375)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Jan 11, 2024
1 parent bb7043a commit 9a6685f
Showing 1 changed file with 14 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::collections::HashMap;

use itertools::Itertools;
use quickwit_doc_mapper::{BinaryFormat, FieldMappingType};
use quickwit_proto::metastore::SourceType;
use quickwit_proto::types::SourceId;
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -59,6 +60,7 @@ pub(crate) struct FileBackedIndexV0_7 {
#[serde(rename = "index")]
metadata: IndexMetadata,
splits: Vec<Split>,
// TODO: Remove `skip_serializing_if` when we release ingest v2.
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
shards: HashMap<SourceId, SerdeShards>,
#[serde(default)]
Expand All @@ -76,6 +78,7 @@ impl From<FileBackedIndex> for FileBackedIndexV0_7 {
.per_source_shards
.into_iter()
.filter_map(|(source_id, shards)| {
// TODO: Remove this filter when we release ingest v2.
// Skip serializing empty shards since the feature is hidden and disabled by
// default. This way, we can still modify the serialization format without worrying
// about backward compatibility post `0.7`.
Expand Down Expand Up @@ -130,7 +133,7 @@ impl From<FileBackedIndexV0_7> for FileBackedIndex {
split.split_metadata.index_uid = index.metadata.index_uid.clone();
}
}
let shards = index
let mut shards: HashMap<SourceId, Shards> = index
.shards
.into_iter()
.map(|(source_id, serde_shards)| {
Expand All @@ -141,6 +144,16 @@ impl From<FileBackedIndexV0_7> for FileBackedIndex {
)
})
.collect();
// TODO: Remove this when we release ingest v2.
for source in index.metadata.sources.values() {
if source.source_type() == SourceType::IngestV2
&& !shards.contains_key(&source.source_id)
{
let index_uid = index.metadata.index_uid.clone();
let source_id = source.source_id.clone();
shards.insert(source_id.clone(), Shards::empty(index_uid, source_id));
}
}
Self::new(index.metadata, index.splits, shards, index.delete_tasks)
}
}

0 comments on commit 9a6685f

Please sign in to comment.