Skip to content

Commit

Permalink
Disabled toggling of ingest api source and protect againt toggle/dele…
Browse files Browse the repository at this point in the history
…te on server side. (#2914)
  • Loading branch information
fmassot authored Mar 2, 2023
1 parent add9820 commit 81af21f
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 181 deletions.
125 changes: 1 addition & 124 deletions quickwit/quickwit-cli/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ use colored::Colorize;
use itertools::Itertools;
use quickwit_common::uri::Uri;
use quickwit_common::GREEN_COLOR;
use quickwit_config::{
validate_identifier, ConfigFormat, SourceConfig, CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID,
};
use quickwit_config::{validate_identifier, ConfigFormat, SourceConfig};
use quickwit_metastore::checkpoint::SourceCheckpoint;
use quickwit_rest_client::rest_client::{QuickwitClient, Transport};
use quickwit_storage::load_file;
Expand Down Expand Up @@ -143,13 +141,6 @@ pub struct ToggleSourceArgs {
pub enable: bool,
}

#[derive(Debug, Eq, PartialEq)]
pub struct ToggleIngestApiArgs {
pub cluster_endpoint: Url,
pub index_id: String,
pub enable: bool,
}

#[derive(Debug, Eq, PartialEq)]
pub struct DeleteSourceArgs {
pub cluster_endpoint: Url,
Expand Down Expand Up @@ -183,7 +174,6 @@ pub struct ResetCheckpointArgs {
pub enum SourceCliCommand {
CreateSource(CreateSourceArgs),
ToggleSource(ToggleSourceArgs),
ToggleIngestApi(ToggleIngestApiArgs),
DeleteSource(DeleteSourceArgs),
DescribeSource(DescribeSourceArgs),
ListSources(ListSourcesArgs),
Expand All @@ -195,7 +185,6 @@ impl SourceCliCommand {
match self {
Self::CreateSource(args) => create_source_cli(args).await,
Self::ToggleSource(args) => toggle_source_cli(args).await,
Self::ToggleIngestApi(args) => toggle_ingest_api_cli(args).await,
Self::DeleteSource(args) => delete_source_cli(args).await,
Self::DescribeSource(args) => describe_source_cli(args).await,
Self::ListSources(args) => list_sources_cli(args).await,
Expand All @@ -215,9 +204,6 @@ impl SourceCliCommand {
"disable" => {
Self::parse_toggle_source_args(subcommand, submatches).map(Self::ToggleSource)
}
"ingest-api" => {
Self::parse_toggle_ingest_api_args(submatches).map(Self::ToggleIngestApi)
}
"delete" => Self::parse_delete_args(submatches).map(Self::DeleteSource),
"describe" => Self::parse_describe_args(submatches).map(Self::DescribeSource),
"list" => Self::parse_list_args(submatches).map(Self::ListSources),
Expand Down Expand Up @@ -273,23 +259,6 @@ impl SourceCliCommand {
})
}

fn parse_toggle_ingest_api_args(matches: &ArgMatches) -> anyhow::Result<ToggleIngestApiArgs> {
let cluster_endpoint = matches
.value_of("endpoint")
.map(Url::from_str)
.expect("`endpoint` is a required arg.")?;
let index_id = matches
.value_of("index")
.expect("`index` is a required arg.")
.to_string();
let enable = matches.is_present("enable");
Ok(ToggleIngestApiArgs {
cluster_endpoint,
index_id,
enable,
})
}

fn parse_delete_args(matches: &ArgMatches) -> anyhow::Result<DeleteSourceArgs> {
let cluster_endpoint = matches
.value_of("endpoint")
Expand Down Expand Up @@ -388,13 +357,6 @@ async fn create_source_cli(args: CreateSourceArgs) -> anyhow::Result<()> {
async fn toggle_source_cli(args: ToggleSourceArgs) -> anyhow::Result<()> {
debug!(args=?args, "toggle-source");
println!("❯ Toggling source...");
if args.source_id == CLI_INGEST_SOURCE_ID {
bail!(
"Source `{}` is managed by Quickwit, you cannot enable or disable a source managed by \
Quickwit.",
args.source_id
);
}
let transport = Transport::new(args.cluster_endpoint);
let qw_client = QuickwitClient::new(transport);
qw_client
Expand All @@ -412,37 +374,9 @@ async fn toggle_source_cli(args: ToggleSourceArgs) -> anyhow::Result<()> {
Ok(())
}

pub async fn toggle_ingest_api_cli(args: ToggleIngestApiArgs) -> anyhow::Result<()> {
debug!(args=?args, "toggle-ingest-api");
println!(
"❯ {}abling ingest API...",
if args.enable { "En" } else { "Dis" }
);
let transport = Transport::new(args.cluster_endpoint);
let qw_client = QuickwitClient::new(transport);
qw_client
.sources(&args.index_id)
.toggle(INGEST_API_SOURCE_ID, args.enable)
.await
.context("Failed to update source")?;
let toggled_state_name = if args.enable { "enabled" } else { "disabled" };
println!(
"{} Source successfully {}.",
toggled_state_name,
"✔".color(GREEN_COLOR)
);
Ok(())
}

async fn delete_source_cli(args: DeleteSourceArgs) -> anyhow::Result<()> {
debug!(args=?args, "delete-source");
println!("❯ Deleting source...");
if args.source_id == INGEST_API_SOURCE_ID || args.source_id == CLI_INGEST_SOURCE_ID {
bail!(
"Source `{}` is managed by Quickwit, you cannot delete a source managed by Quickwit.",
args.source_id
);
}
validate_identifier("Source ID", &args.source_id)?;

if !args.assume_yes {
Expand Down Expand Up @@ -735,63 +669,6 @@ mod tests {
}
}

#[test]
fn test_parse_toggle_ingest_api_args() {
{
let app = build_cli().no_binary_name(true);
let matches = app
.try_get_matches_from(vec![
"source",
"ingest-api",
"--endpoint",
"https://quickwit-cluster.io",
"--index",
"foo",
"--enable",
])
.unwrap();
let command = CliCommand::parse_cli_args(&matches).unwrap();
let expected_command =
CliCommand::Source(SourceCliCommand::ToggleIngestApi(ToggleIngestApiArgs {
cluster_endpoint: Url::from_str("https://quickwit-cluster.io").unwrap(),
index_id: "foo".to_string(),
enable: true,
}));
assert_eq!(command, expected_command);
}
{
let app = build_cli().no_binary_name(true);
let matches = app
.try_get_matches_from(vec!["source", "ingest-api", "--index", "foo", "--disable"])
.unwrap();
let command = CliCommand::parse_cli_args(&matches).unwrap();
let expected_command =
CliCommand::Source(SourceCliCommand::ToggleIngestApi(ToggleIngestApiArgs {
cluster_endpoint: Url::from_str("http://127.0.0.1:7280").unwrap(),
index_id: "foo".to_string(),
enable: false,
}));
assert_eq!(command, expected_command);
}
{
let app = build_cli().no_binary_name(true);
let matches = app.try_get_matches_from(vec![
"source",
"ingest-api",
"--index",
"foo",
"--enable",
"--disable",
]);
assert!(matches.is_err());
}
{
let app = build_cli().no_binary_name(true);
let matches = app.try_get_matches_from(vec!["source", "ingest-api", "--index", "foo"]);
assert!(matches.is_err());
}
}

#[test]
fn test_parse_delete_source_args() {
let app = build_cli().no_binary_name(true);
Expand Down
56 changes: 1 addition & 55 deletions quickwit/quickwit-cli/tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use quickwit_cli::index::{
SearchIndexArgs,
};
use quickwit_cli::service::RunCliCommand;
use quickwit_cli::source::{toggle_ingest_api_cli, ToggleIngestApiArgs};
use quickwit_cli::tool::{
garbage_collect_index_cli, local_ingest_docs_cli, GarbageCollectIndexArgs, LocalIngestDocsArgs,
};
Expand All @@ -43,7 +42,7 @@ use quickwit_common::rand::append_random_suffix;
use quickwit_common::uri::Uri;
use quickwit_common::ChecklistError;
use quickwit_config::service::QuickwitService;
use quickwit_config::{CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID};
use quickwit_config::CLI_INGEST_SOURCE_ID;
use quickwit_metastore::{quickwit_metastore_uri_resolver, Metastore, MetastoreError, SplitState};
use serde_json::{json, Number, Value};
use tokio::time::{sleep, Duration};
Expand Down Expand Up @@ -157,59 +156,6 @@ fn test_cmd_create_with_ill_formed_command() {
);
}

#[tokio::test]
async fn test_cmd_toggle_ingest_api_source() {
quickwit_common::setup_logging_for_tests();
let index_id = append_random_suffix("test-create-cmd");
let test_env = create_test_env(index_id.clone(), TestStorageType::LocalFileSystem).unwrap();
test_env.start_server().await.unwrap();
create_logs_index(&test_env).await.unwrap();

// Disable
let toggle_args = ToggleIngestApiArgs {
cluster_endpoint: test_env.cluster_endpoint.clone(),
enable: false,
index_id: index_id.to_string(),
};
toggle_ingest_api_cli(toggle_args).await.unwrap();
let index_metadata = test_env
.metastore()
.await
.unwrap()
.index_metadata(&index_id)
.await
.unwrap();
assert!(
!index_metadata
.sources
.get(INGEST_API_SOURCE_ID)
.unwrap()
.enabled
);

// Enable
let toggle_args = ToggleIngestApiArgs {
cluster_endpoint: test_env.cluster_endpoint.clone(),
enable: true,
index_id: index_id.to_string(),
};
toggle_ingest_api_cli(toggle_args).await.unwrap();
let index_metadata = test_env
.metastore()
.await
.unwrap()
.index_metadata(&index_id)
.await
.unwrap();
assert!(
index_metadata
.sources
.get(INGEST_API_SOURCE_ID)
.unwrap()
.enabled
);
}

#[tokio::test]
async fn test_cmd_ingest_on_non_existing_index() {
let index_id = append_random_suffix("index-does-not-exist");
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-metastore/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl ServiceError for MetastoreError {
fn status_code(&self) -> ServiceErrorCode {
match self {
Self::ConnectionError { .. } => ServiceErrorCode::Internal,
Self::Forbidden { .. } => ServiceErrorCode::Internal,
Self::Forbidden { .. } => ServiceErrorCode::MethodNotAllowed,
Self::IncompatibleCheckpointDelta(_) => ServiceErrorCode::BadRequest,
Self::IndexAlreadyExists { .. } => ServiceErrorCode::BadRequest,
Self::IndexDoesNotExist { .. } => ServiceErrorCode::NotFound,
Expand Down
63 changes: 62 additions & 1 deletion quickwit/quickwit-serve/src/index_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use bytes::Bytes;
use hyper::header::CONTENT_TYPE;
use quickwit_common::simple_list::{from_simple_list, to_simple_list};
use quickwit_common::FileEntry;
use quickwit_config::{ConfigFormat, QuickwitConfig, SourceConfig};
use quickwit_config::{
ConfigFormat, QuickwitConfig, SourceConfig, CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID,
};
use quickwit_core::{IndexService, IndexServiceError};
use quickwit_metastore::{
IndexMetadata, ListSplitsQuery, Metastore, MetastoreError, Split, SplitState,
Expand Down Expand Up @@ -548,6 +550,15 @@ async fn toggle_source(
metastore: Arc<dyn Metastore>,
) -> Result<(), MetastoreError> {
info!(index_id = %index_id, source_id = %source_id, enable = toggle_source.enable, "toggle-source");
metastore.index_exists(&index_id).await?;
if [CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID].contains(&source_id.as_str()) {
return Err(MetastoreError::Forbidden {
message: format!(
"Source `{source_id}` is managed by Quickwit, you cannot enable or disable a \
source managed by Quickwit."
),
});
}
metastore
.toggle_source(&index_id, &source_id, toggle_source.enable)
.await
Expand Down Expand Up @@ -583,6 +594,15 @@ async fn delete_source(
metastore: Arc<dyn Metastore>,
) -> Result<(), MetastoreError> {
info!(index_id = %index_id, source_id = %source_id, "delete-source");
metastore.index_exists(&index_id).await?;
if [INGEST_API_SOURCE_ID, CLI_INGEST_SOURCE_ID].contains(&source_id.as_str()) {
return Err(MetastoreError::Forbidden {
message: format!(
"Source `{source_id}` is managed by Quickwit, you cannot delete a source managed \
by Quickwit."
),
});
}
metastore.delete_source(&index_id, &source_id).await
}

Expand Down Expand Up @@ -1066,6 +1086,23 @@ mod tests {
let index_metadata = metastore.index_metadata("hdfs-logs").await.unwrap();
assert!(!index_metadata.sources.contains_key("file-source"));

// Check cannot delete source managed by Quickwit.
let resp = warp::test::request()
.path(format!("/indexes/hdfs-logs/sources/{INGEST_API_SOURCE_ID}").as_str())
.method("DELETE")
.body(&source_config_body)
.reply(&index_management_handler)
.await;
assert_eq!(resp.status(), 405);

let resp = warp::test::request()
.path(format!("/indexes/hdfs-logs/sources/{CLI_INGEST_SOURCE_ID}").as_str())
.method("DELETE")
.body(&source_config_body)
.reply(&index_management_handler)
.await;
assert_eq!(resp.status(), 405);

// Check get a non exising source returns 404.
let resp = warp::test::request()
.path("/indexes/hdfs-logs/sources/file-source")
Expand Down Expand Up @@ -1244,6 +1281,9 @@ mod tests {
"file:///path/to/index/quickwit-demo-index",
))
});
metastore
.expect_index_exists()
.return_once(|index_id: &str| Ok(index_id == "quickwit-demo-index"));
metastore
.expect_delete_source()
.return_once(|index_id, source_id| {
Expand Down Expand Up @@ -1305,6 +1345,10 @@ mod tests {
#[tokio::test]
async fn test_source_toggle() -> anyhow::Result<()> {
let mut metastore = MockMetastore::new();
metastore
.expect_index_exists()
.returning(|index_id| Ok(index_id == "quickwit-demo-index"))
.times(3);
metastore.expect_toggle_source().return_once(
|index_id: &str, source_id: &str, enable: bool| {
if index_id == "quickwit-demo-index" && source_id == "source-to-toggle" && enable {
Expand All @@ -1322,6 +1366,7 @@ mod tests {
Arc::new(QuickwitConfig::for_test()),
)
.recover(recover_fn);
// Check server returns 405 if sources root path is used.
let resp = warp::test::request()
.path("/indexes/quickwit-demo-index/sources/source-to-toggle")
.method("PUT")
Expand All @@ -1344,6 +1389,22 @@ mod tests {
.reply(&index_management_handler)
.await;
assert_eq!(resp.status(), 400);
// Check cannot toggle source managed by Quickwit.
let resp = warp::test::request()
.path(format!("/indexes/hdfs-logs/sources/{INGEST_API_SOURCE_ID}/toggle").as_str())
.method("PUT")
.body(r#"{"enable": true}"#)
.reply(&index_management_handler)
.await;
assert_eq!(resp.status(), 405);

let resp = warp::test::request()
.path(format!("/indexes/hdfs-logs/sources/{CLI_INGEST_SOURCE_ID}/toggle").as_str())
.method("PUT")
.body(r#"{"enable": true}"#)
.reply(&index_management_handler)
.await;
assert_eq!(resp.status(), 405);
Ok(())
}
}

0 comments on commit 81af21f

Please sign in to comment.