diff --git a/quickwit/quickwit-cli/src/source.rs b/quickwit/quickwit-cli/src/source.rs index 7c8b8551838..3a14b51e2a8 100644 --- a/quickwit/quickwit-cli/src/source.rs +++ b/quickwit/quickwit-cli/src/source.rs @@ -26,9 +26,7 @@ use colored::Colorize; use itertools::Itertools; use quickwit_common::uri::Uri; use quickwit_common::GREEN_COLOR; -use quickwit_config::{ - validate_identifier, ConfigFormat, SourceConfig, CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID, -}; +use quickwit_config::{validate_identifier, ConfigFormat, SourceConfig}; use quickwit_metastore::checkpoint::SourceCheckpoint; use quickwit_rest_client::rest_client::{QuickwitClient, Transport}; use quickwit_storage::load_file; @@ -143,13 +141,6 @@ pub struct ToggleSourceArgs { pub enable: bool, } -#[derive(Debug, Eq, PartialEq)] -pub struct ToggleIngestApiArgs { - pub cluster_endpoint: Url, - pub index_id: String, - pub enable: bool, -} - #[derive(Debug, Eq, PartialEq)] pub struct DeleteSourceArgs { pub cluster_endpoint: Url, @@ -183,7 +174,6 @@ pub struct ResetCheckpointArgs { pub enum SourceCliCommand { CreateSource(CreateSourceArgs), ToggleSource(ToggleSourceArgs), - ToggleIngestApi(ToggleIngestApiArgs), DeleteSource(DeleteSourceArgs), DescribeSource(DescribeSourceArgs), ListSources(ListSourcesArgs), @@ -195,7 +185,6 @@ impl SourceCliCommand { match self { Self::CreateSource(args) => create_source_cli(args).await, Self::ToggleSource(args) => toggle_source_cli(args).await, - Self::ToggleIngestApi(args) => toggle_ingest_api_cli(args).await, Self::DeleteSource(args) => delete_source_cli(args).await, Self::DescribeSource(args) => describe_source_cli(args).await, Self::ListSources(args) => list_sources_cli(args).await, @@ -215,9 +204,6 @@ impl SourceCliCommand { "disable" => { Self::parse_toggle_source_args(subcommand, submatches).map(Self::ToggleSource) } - "ingest-api" => { - Self::parse_toggle_ingest_api_args(submatches).map(Self::ToggleIngestApi) - } "delete" => Self::parse_delete_args(submatches).map(Self::DeleteSource), "describe" => Self::parse_describe_args(submatches).map(Self::DescribeSource), "list" => Self::parse_list_args(submatches).map(Self::ListSources), @@ -273,23 +259,6 @@ impl SourceCliCommand { }) } - fn parse_toggle_ingest_api_args(matches: &ArgMatches) -> anyhow::Result { - let cluster_endpoint = matches - .value_of("endpoint") - .map(Url::from_str) - .expect("`endpoint` is a required arg.")?; - let index_id = matches - .value_of("index") - .expect("`index` is a required arg.") - .to_string(); - let enable = matches.is_present("enable"); - Ok(ToggleIngestApiArgs { - cluster_endpoint, - index_id, - enable, - }) - } - fn parse_delete_args(matches: &ArgMatches) -> anyhow::Result { let cluster_endpoint = matches .value_of("endpoint") @@ -388,13 +357,6 @@ async fn create_source_cli(args: CreateSourceArgs) -> anyhow::Result<()> { async fn toggle_source_cli(args: ToggleSourceArgs) -> anyhow::Result<()> { debug!(args=?args, "toggle-source"); println!("❯ Toggling source..."); - if args.source_id == CLI_INGEST_SOURCE_ID { - bail!( - "Source `{}` is managed by Quickwit, you cannot enable or disable a source managed by \ - Quickwit.", - args.source_id - ); - } let transport = Transport::new(args.cluster_endpoint); let qw_client = QuickwitClient::new(transport); qw_client @@ -412,37 +374,9 @@ async fn toggle_source_cli(args: ToggleSourceArgs) -> anyhow::Result<()> { Ok(()) } -pub async fn toggle_ingest_api_cli(args: ToggleIngestApiArgs) -> anyhow::Result<()> { - debug!(args=?args, "toggle-ingest-api"); - println!( - "❯ {}abling ingest API...", - if args.enable { "En" } else { "Dis" } - ); - let transport = Transport::new(args.cluster_endpoint); - let qw_client = QuickwitClient::new(transport); - qw_client - .sources(&args.index_id) - .toggle(INGEST_API_SOURCE_ID, args.enable) - .await - .context("Failed to update source")?; - let toggled_state_name = if args.enable { "enabled" } else { "disabled" }; - println!( - "{} Source successfully {}.", - toggled_state_name, - "✔".color(GREEN_COLOR) - ); - Ok(()) -} - async fn delete_source_cli(args: DeleteSourceArgs) -> anyhow::Result<()> { debug!(args=?args, "delete-source"); println!("❯ Deleting source..."); - if args.source_id == INGEST_API_SOURCE_ID || args.source_id == CLI_INGEST_SOURCE_ID { - bail!( - "Source `{}` is managed by Quickwit, you cannot delete a source managed by Quickwit.", - args.source_id - ); - } validate_identifier("Source ID", &args.source_id)?; if !args.assume_yes { @@ -735,63 +669,6 @@ mod tests { } } - #[test] - fn test_parse_toggle_ingest_api_args() { - { - let app = build_cli().no_binary_name(true); - let matches = app - .try_get_matches_from(vec![ - "source", - "ingest-api", - "--endpoint", - "https://quickwit-cluster.io", - "--index", - "foo", - "--enable", - ]) - .unwrap(); - let command = CliCommand::parse_cli_args(&matches).unwrap(); - let expected_command = - CliCommand::Source(SourceCliCommand::ToggleIngestApi(ToggleIngestApiArgs { - cluster_endpoint: Url::from_str("https://quickwit-cluster.io").unwrap(), - index_id: "foo".to_string(), - enable: true, - })); - assert_eq!(command, expected_command); - } - { - let app = build_cli().no_binary_name(true); - let matches = app - .try_get_matches_from(vec!["source", "ingest-api", "--index", "foo", "--disable"]) - .unwrap(); - let command = CliCommand::parse_cli_args(&matches).unwrap(); - let expected_command = - CliCommand::Source(SourceCliCommand::ToggleIngestApi(ToggleIngestApiArgs { - cluster_endpoint: Url::from_str("http://127.0.0.1:7280").unwrap(), - index_id: "foo".to_string(), - enable: false, - })); - assert_eq!(command, expected_command); - } - { - let app = build_cli().no_binary_name(true); - let matches = app.try_get_matches_from(vec![ - "source", - "ingest-api", - "--index", - "foo", - "--enable", - "--disable", - ]); - assert!(matches.is_err()); - } - { - let app = build_cli().no_binary_name(true); - let matches = app.try_get_matches_from(vec!["source", "ingest-api", "--index", "foo"]); - assert!(matches.is_err()); - } - } - #[test] fn test_parse_delete_source_args() { let app = build_cli().no_binary_name(true); diff --git a/quickwit/quickwit-cli/tests/cli.rs b/quickwit/quickwit-cli/tests/cli.rs index 19b9402c609..ce25d4570fb 100644 --- a/quickwit/quickwit-cli/tests/cli.rs +++ b/quickwit/quickwit-cli/tests/cli.rs @@ -34,7 +34,6 @@ use quickwit_cli::index::{ SearchIndexArgs, }; use quickwit_cli::service::RunCliCommand; -use quickwit_cli::source::{toggle_ingest_api_cli, ToggleIngestApiArgs}; use quickwit_cli::tool::{ garbage_collect_index_cli, local_ingest_docs_cli, GarbageCollectIndexArgs, LocalIngestDocsArgs, }; @@ -43,7 +42,7 @@ use quickwit_common::rand::append_random_suffix; use quickwit_common::uri::Uri; use quickwit_common::ChecklistError; use quickwit_config::service::QuickwitService; -use quickwit_config::{CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID}; +use quickwit_config::CLI_INGEST_SOURCE_ID; use quickwit_metastore::{quickwit_metastore_uri_resolver, Metastore, MetastoreError, SplitState}; use serde_json::{json, Number, Value}; use tokio::time::{sleep, Duration}; @@ -157,59 +156,6 @@ fn test_cmd_create_with_ill_formed_command() { ); } -#[tokio::test] -async fn test_cmd_toggle_ingest_api_source() { - quickwit_common::setup_logging_for_tests(); - let index_id = append_random_suffix("test-create-cmd"); - let test_env = create_test_env(index_id.clone(), TestStorageType::LocalFileSystem).unwrap(); - test_env.start_server().await.unwrap(); - create_logs_index(&test_env).await.unwrap(); - - // Disable - let toggle_args = ToggleIngestApiArgs { - cluster_endpoint: test_env.cluster_endpoint.clone(), - enable: false, - index_id: index_id.to_string(), - }; - toggle_ingest_api_cli(toggle_args).await.unwrap(); - let index_metadata = test_env - .metastore() - .await - .unwrap() - .index_metadata(&index_id) - .await - .unwrap(); - assert!( - !index_metadata - .sources - .get(INGEST_API_SOURCE_ID) - .unwrap() - .enabled - ); - - // Enable - let toggle_args = ToggleIngestApiArgs { - cluster_endpoint: test_env.cluster_endpoint.clone(), - enable: true, - index_id: index_id.to_string(), - }; - toggle_ingest_api_cli(toggle_args).await.unwrap(); - let index_metadata = test_env - .metastore() - .await - .unwrap() - .index_metadata(&index_id) - .await - .unwrap(); - assert!( - index_metadata - .sources - .get(INGEST_API_SOURCE_ID) - .unwrap() - .enabled - ); -} - #[tokio::test] async fn test_cmd_ingest_on_non_existing_index() { let index_id = append_random_suffix("index-does-not-exist"); diff --git a/quickwit/quickwit-metastore/src/error.rs b/quickwit/quickwit-metastore/src/error.rs index d1b800795a3..bf06b7c426e 100644 --- a/quickwit/quickwit-metastore/src/error.rs +++ b/quickwit/quickwit-metastore/src/error.rs @@ -110,7 +110,7 @@ impl ServiceError for MetastoreError { fn status_code(&self) -> ServiceErrorCode { match self { Self::ConnectionError { .. } => ServiceErrorCode::Internal, - Self::Forbidden { .. } => ServiceErrorCode::Internal, + Self::Forbidden { .. } => ServiceErrorCode::MethodNotAllowed, Self::IncompatibleCheckpointDelta(_) => ServiceErrorCode::BadRequest, Self::IndexAlreadyExists { .. } => ServiceErrorCode::BadRequest, Self::IndexDoesNotExist { .. } => ServiceErrorCode::NotFound, diff --git a/quickwit/quickwit-serve/src/index_api/rest_handler.rs b/quickwit/quickwit-serve/src/index_api/rest_handler.rs index 87a9be077b0..a6a25aac3c4 100644 --- a/quickwit/quickwit-serve/src/index_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/index_api/rest_handler.rs @@ -23,7 +23,9 @@ use bytes::Bytes; use hyper::header::CONTENT_TYPE; use quickwit_common::simple_list::{from_simple_list, to_simple_list}; use quickwit_common::FileEntry; -use quickwit_config::{ConfigFormat, QuickwitConfig, SourceConfig}; +use quickwit_config::{ + ConfigFormat, QuickwitConfig, SourceConfig, CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID, +}; use quickwit_core::{IndexService, IndexServiceError}; use quickwit_metastore::{ IndexMetadata, ListSplitsQuery, Metastore, MetastoreError, Split, SplitState, @@ -548,6 +550,15 @@ async fn toggle_source( metastore: Arc, ) -> Result<(), MetastoreError> { info!(index_id = %index_id, source_id = %source_id, enable = toggle_source.enable, "toggle-source"); + metastore.index_exists(&index_id).await?; + if [CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID].contains(&source_id.as_str()) { + return Err(MetastoreError::Forbidden { + message: format!( + "Source `{source_id}` is managed by Quickwit, you cannot enable or disable a \ + source managed by Quickwit." + ), + }); + } metastore .toggle_source(&index_id, &source_id, toggle_source.enable) .await @@ -583,6 +594,15 @@ async fn delete_source( metastore: Arc, ) -> Result<(), MetastoreError> { info!(index_id = %index_id, source_id = %source_id, "delete-source"); + metastore.index_exists(&index_id).await?; + if [INGEST_API_SOURCE_ID, CLI_INGEST_SOURCE_ID].contains(&source_id.as_str()) { + return Err(MetastoreError::Forbidden { + message: format!( + "Source `{source_id}` is managed by Quickwit, you cannot delete a source managed \ + by Quickwit." + ), + }); + } metastore.delete_source(&index_id, &source_id).await } @@ -1066,6 +1086,23 @@ mod tests { let index_metadata = metastore.index_metadata("hdfs-logs").await.unwrap(); assert!(!index_metadata.sources.contains_key("file-source")); + // Check cannot delete source managed by Quickwit. + let resp = warp::test::request() + .path(format!("/indexes/hdfs-logs/sources/{INGEST_API_SOURCE_ID}").as_str()) + .method("DELETE") + .body(&source_config_body) + .reply(&index_management_handler) + .await; + assert_eq!(resp.status(), 405); + + let resp = warp::test::request() + .path(format!("/indexes/hdfs-logs/sources/{CLI_INGEST_SOURCE_ID}").as_str()) + .method("DELETE") + .body(&source_config_body) + .reply(&index_management_handler) + .await; + assert_eq!(resp.status(), 405); + // Check get a non exising source returns 404. let resp = warp::test::request() .path("/indexes/hdfs-logs/sources/file-source") @@ -1244,6 +1281,9 @@ mod tests { "file:///path/to/index/quickwit-demo-index", )) }); + metastore + .expect_index_exists() + .return_once(|index_id: &str| Ok(index_id == "quickwit-demo-index")); metastore .expect_delete_source() .return_once(|index_id, source_id| { @@ -1305,6 +1345,10 @@ mod tests { #[tokio::test] async fn test_source_toggle() -> anyhow::Result<()> { let mut metastore = MockMetastore::new(); + metastore + .expect_index_exists() + .returning(|index_id| Ok(index_id == "quickwit-demo-index")) + .times(3); metastore.expect_toggle_source().return_once( |index_id: &str, source_id: &str, enable: bool| { if index_id == "quickwit-demo-index" && source_id == "source-to-toggle" && enable { @@ -1322,6 +1366,7 @@ mod tests { Arc::new(QuickwitConfig::for_test()), ) .recover(recover_fn); + // Check server returns 405 if sources root path is used. let resp = warp::test::request() .path("/indexes/quickwit-demo-index/sources/source-to-toggle") .method("PUT") @@ -1344,6 +1389,22 @@ mod tests { .reply(&index_management_handler) .await; assert_eq!(resp.status(), 400); + // Check cannot toggle source managed by Quickwit. + let resp = warp::test::request() + .path(format!("/indexes/hdfs-logs/sources/{INGEST_API_SOURCE_ID}/toggle").as_str()) + .method("PUT") + .body(r#"{"enable": true}"#) + .reply(&index_management_handler) + .await; + assert_eq!(resp.status(), 405); + + let resp = warp::test::request() + .path(format!("/indexes/hdfs-logs/sources/{CLI_INGEST_SOURCE_ID}/toggle").as_str()) + .method("PUT") + .body(r#"{"enable": true}"#) + .reply(&index_management_handler) + .await; + assert_eq!(resp.status(), 405); Ok(()) } }