Skip to content

Commit

Permalink
Take review comments into account.
Browse files Browse the repository at this point in the history
  • Loading branch information
fmassot committed Jan 18, 2024
1 parent ddfefa6 commit 6099917
Showing 1 changed file with 13 additions and 19 deletions.
32 changes: 13 additions & 19 deletions quickwit/quickwit-indexing/src/source/file_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::ops::Range;
use std::ffi::OsStr;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -153,29 +153,21 @@ impl TypedSourceFactory for FileSourceFactory {
let (dir_uri, file_name) = dir_and_filename(filepath)?;
let storage = ctx.storage_resolver.resolve(&dir_uri).await?;
let file_size = storage.file_num_bytes(file_name).await?.try_into().unwrap();
if offset > file_size {
return Err(anyhow::anyhow!(
"offset {} can't be greater than the file size {}",
offset,
file_size
));
}
// If it's a gzip file, we can't seek to a specific offset: we need to start from the
// beginning of the file, decompress, and skip the first `offset` bytes.
if filepath.extension().map_or(false, |ext| ext == "gz") {
let stream = storage
.get_slice_stream(
file_name,
Range {
start: 0,
end: file_size,
},
)
.await?;
assert!(file_size >= offset, "file size should be >= offset");
if filepath.extension() == Some(OsStr::new("gz")) {
let stream = storage.get_slice_stream(file_name, 0..file_size).await?;
FileSourceReader::new(Box::new(GzipDecoder::new(BufReader::new(stream))), offset)
} else {
let stream = storage
.get_slice_stream(
file_name,
Range {
start: offset,
end: file_size,
},
)
.get_slice_stream(file_name, offset..file_size)
.await?;
FileSourceReader::new(stream, 0)
}
Expand Down Expand Up @@ -210,6 +202,8 @@ impl FileSourceReader {
}
}

// This function is only called for GZIP files.
// Because we cannot seek into them, we have to scan through them to reach the right initial position.
async fn skip(&mut self) -> io::Result<()> {
// Allocate a 64 kB buffer once.
let mut buf = [0u8; 64000];
Expand Down

0 comments on commit 6099917

Please sign in to comment.