Skip to content

Commit

Permalink
Nidx export import (#2821)
Browse files Browse the repository at this point in the history
  • Loading branch information
javitonino authored Jan 30, 2025
1 parent 07e053e commit 67a6048
Show file tree
Hide file tree
Showing 20 changed files with 675 additions and 44 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 16 additions & 19 deletions nidx/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions nidx/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ tracing-core = { version = "0.1.33", optional = true }
tracing-serde = { version = "0.2.0", optional = true }
clap = { version = "4.5.21", features = ["derive"] }
arc-swap = "1.7.1"
tokio-stream = "0.1.17"
zstd = "0.13.2"

[features]
default = ["telemetry"]
Expand Down
4 changes: 2 additions & 2 deletions nidx/nidx_binding/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl NidxBinding {
let _ = tracing_subscriber::fmt::try_init();

// API server
let api_service = ApiServer::new(settings.metadata.clone()).into_service();
let api_service = ApiServer::new(&settings).into_router();
let api_server = GrpcServer::new("localhost:0").await?;
let api_port = api_server.port()?;
tokio::task::spawn(api_server.serve(api_service, shutdown.clone()));
Expand All @@ -124,7 +124,7 @@ impl NidxBinding {
let searcher_api = SearchServer::new(searcher.index_cache(), ShardSelector::new_single());
let searcher_server = GrpcServer::new("localhost:0").await?;
let searcher_port = searcher_server.port()?;
tokio::task::spawn(searcher_server.serve(searcher_api.into_service(), shutdown.clone()));
tokio::task::spawn(searcher_server.serve(searcher_api.into_router(), shutdown.clone()));
let settings_copy = settings.clone();
let shutdown2 = shutdown.clone();
tokio::task::spawn(async move {
Expand Down
4 changes: 1 addition & 3 deletions nidx/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ use tracing::debug;
use crate::{grpc_server::GrpcServer, Settings};

pub async fn run(settings: Settings, shutdown: CancellationToken) -> anyhow::Result<()> {
let meta = settings.metadata.clone();

let service = grpc::ApiServer::new(meta).into_service();
let service = grpc::ApiServer::new(&settings).into_router();
let server = GrpcServer::new("0.0.0.0:10000").await?;
debug!("Running API at port {}", server.port()?);
server.serve(service, shutdown).await?;
Expand Down
98 changes: 93 additions & 5 deletions nidx/src/api/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,51 @@
//

use std::collections::HashMap;
use std::io::Write;
use std::str::FromStr;
use std::sync::Arc;

use crate::errors::NidxError;
use crate::metadata::{Index, IndexKind, Shard};
use crate::metadata::{Index, IndexId, IndexKind, Shard};
use axum::body::{Body, Bytes};
use axum::extract::{Path, State};
use axum::http::StatusCode;
use axum::response::IntoResponse;
use nidx_protos::nidx::nidx_api_server::*;
use nidx_protos::*;
use nidx_vector::config::VectorConfig;
use object_store::DynObjectStore;
use tokio::sync::mpsc::Sender;
use tokio_stream::wrappers::ReceiverStream;
use tonic::service::Routes;
use tonic::{Request, Response, Result, Status};
use tracing::error;
use uuid::Uuid;

use crate::api::shards;
use crate::NidxMetadata;
use crate::{import_export, NidxMetadata, Settings};

/// State behind the nidx API: it is wrapped by `NidxApiServer` to serve gRPC
/// and is also handed, as axum state, to the HTTP download handlers.
///
/// `Clone` is derived because `into_router` clones the value once per
/// download route.
#[derive(Clone)]
pub struct ApiServer {
/// Metadata handle; the download handlers query it through `meta.pool`.
meta: NidxMetadata,
/// Object store passed to `import_export::export_shard` when exporting.
storage: Arc<DynObjectStore>,
}

impl ApiServer {
pub fn new(meta: NidxMetadata) -> Self {
pub fn new(settings: &Settings) -> Self {
Self {
meta,
meta: settings.metadata.clone(),
storage: settings.storage.as_ref().unwrap().object_store.clone(),
}
}

pub fn into_service(self) -> Routes {
/// Consumes the server and builds the axum router exposing it.
///
/// The router serves the gRPC `NidxApiServer` service plus two plain HTTP GET
/// endpoints that stream an export of a whole shard or of a single index.
/// Each endpoint receives its own clone of the server as handler state.
pub fn into_router(self) -> axum::Router {
    // Bind the state to each handler first, while `self` is still available.
    let shard_download = axum::routing::get(download_shard).with_state(self.clone());
    let index_download = axum::routing::get(download_index).with_state(self.clone());

    // The gRPC service takes ownership of the original value.
    Routes::new(NidxApiServer::new(self))
        .into_axum_router()
        .route("/api/shard/:shard_id/download", shard_download)
        .route("/api/index/:index_id/download", index_download)
}
}

Expand Down Expand Up @@ -182,3 +200,73 @@ impl NidxApi for ApiServer {
Ok(Response::new(NodeMetadata::default()))
}
}

/// Adapter exposing the sending half of a Tokio mpsc channel as a synchronous
/// [`Write`] sink: every successful write becomes one `Ok(Bytes)` item on the
/// channel.
struct ChannelWriter(Sender<anyhow::Result<Bytes>>);

impl Write for ChannelWriter {
    /// Copies `buf` into a fresh `Bytes` chunk and sends it through the channel.
    ///
    /// Always reports the whole buffer as written on success.
    ///
    /// # Errors
    /// Returns a `BrokenPipe` I/O error wrapping the channel error when the
    /// send fails.
    ///
    /// NOTE(review): Tokio documents `blocking_send` as panicking when called
    /// from an asynchronous execution context; presumably the exporter drives
    /// this writer from a blocking thread — confirm against `export_shard`.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let chunk = Bytes::copy_from_slice(buf);
        self.0
            .blocking_send(Ok(chunk))
            .map(|()| buf.len())
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))
    }

    /// No-op: the writer holds no buffer of its own, each write is sent
    /// to the channel immediately.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

/// HTTP handler that streams an export of a whole shard.
///
/// The shard is looked up first; if the lookup fails for any reason the
/// response is `404 Not Found` with the error text as body. Otherwise the
/// export is streamed, using the shard id as the download file name.
async fn download_shard(State(server): State<ApiServer>, Path(shard_id): Path<Uuid>) -> impl IntoResponse {
    // Only the outcome of the lookup matters; the shard row itself is unused.
    let lookup = Shard::get(&server.meta.pool, shard_id).await;

    match lookup {
        Err(e) => axum::response::Response::builder()
            .status(StatusCode::NOT_FOUND)
            .body(Body::from(e.to_string()))
            .unwrap(),
        // `None` as index id: no single index is selected for the export.
        Ok(_) => download_export(server, shard_id, None, shard_id.to_string()).await,
    }
}

/// HTTP handler that streams an export of a single index.
///
/// The index is looked up by its numeric id; if the lookup fails for any
/// reason the response is `404 Not Found` with the error text as body.
/// Otherwise the export of that index (within its owning shard) is streamed,
/// using the requested index id as the download file name.
async fn download_index(State(server): State<ApiServer>, Path(index_id): Path<i64>) -> impl IntoResponse {
    let lookup = Index::get(&server.meta.pool, index_id.into()).await;

    match lookup {
        Ok(index) => download_export(server, index.shard_id, Some(index.id), index_id.to_string()).await,
        Err(e) => axum::response::Response::builder()
            .status(StatusCode::NOT_FOUND)
            .body(Body::from(e.to_string()))
            .unwrap(),
    }
}

/// Starts an export in a background task and returns a response whose body
/// streams the exported bytes as they are produced.
///
/// * `shard_id` / `index_id` - forwarded to `import_export::export_shard`.
/// * `filename` - base name used in the `Content-Disposition` header; the
///   `.tar.zstd` suffix is appended here.
///
/// The export and the response body are connected by a bounded channel
/// (capacity 10): the export writes chunks through a `ChannelWriter`, and the
/// body reads them from the receiving side.
async fn download_export(
    server: ApiServer,
    shard_id: Uuid,
    index_id: Option<IndexId>,
    filename: String,
) -> axum::response::Response<axum::body::Body> {
    let (chunk_tx, chunk_rx) = tokio::sync::mpsc::channel(10);
    // Second sender handle, kept so the task can still report a failure after
    // the first one has been handed over to the writer.
    let error_tx = chunk_tx.clone();

    tokio::spawn(async move {
        let sink = ChannelWriter(chunk_tx);
        let outcome = import_export::export_shard(server.meta, server.storage, shard_id, index_id, sink).await;
        // A failure in the middle of the export is pushed into the stream so
        // axum aborts the download and the client sees an error. The status
        // code has already been sent as 200 by then, but an aborted transfer
        // is better than a silently truncated file.
        if let Err(e) = outcome {
            error!("Error exporting shard: {e:?}");
            let _ = error_tx.send(Err(e)).await;
        }
    });

    let disposition = format!("attachment; filename=\"{filename}.tar.zstd\"");
    let stream = ReceiverStream::new(chunk_rx);

    axum::response::Response::builder()
        .header("Content-Disposition", disposition)
        .body(Body::from_stream(stream))
        .unwrap()
}
Loading

0 comments on commit 67a6048

Please sign in to comment.