Skip to content

Commit

Permalink
New SourceConfig format with notifications field
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Jul 6, 2024
1 parent 97f7b7d commit c963ad4
Show file tree
Hide file tree
Showing 19 changed files with 363 additions and 224 deletions.
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ services:
environment:
SERVICES: kinesis,s3,sqs
PERSISTENCE: 1
# avoid using the localstack.cloud domain in the generated queue URLs
SQS_ENDPOINT_STRATEGY: path
volumes:
- .localstack:/etc/localstack/init/ready.d
- localstack_data:/var/lib/localstack
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-cli/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -821,12 +821,12 @@ mod tests {
let expected_sources = [
SourceRow {
source_id: "bar-source".to_string(),
source_type: "file".to_string(),
source_type: "stdin".to_string(),
enabled: "true".to_string(),
},
SourceRow {
source_id: "foo-source".to_string(),
source_type: "file".to_string(),
source_type: "stdin".to_string(),
enabled: "true".to_string(),
},
];
Expand Down
15 changes: 8 additions & 7 deletions quickwit/quickwit-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,13 @@ pub use quickwit_doc_mapper::DocMapping;
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json::Value as JsonValue;
use source_config::FileSourceParamsInner;
pub use source_config::{
load_source_config_from_user_config, FileSourceMessageType, FileSourceParams, FileSourceSqs,
FileSourceUri, KafkaSourceParams, KinesisSourceParams, PubSubSourceParams, PulsarSourceAuth,
PulsarSourceParams, RegionOrEndpoint, SourceConfig, SourceInputFormat, SourceParams,
SqsSourceParams, TransformConfig, VecSourceParams, VoidSourceParams, CLI_SOURCE_ID,
INGEST_API_SOURCE_ID, INGEST_V2_SOURCE_ID,
load_source_config_from_user_config, FileSourceMessageType, FileSourceNotification,
FileSourceParams, FileSourceSqs, KafkaSourceParams, KinesisSourceParams, PubSubSourceParams,
PulsarSourceAuth, PulsarSourceParams, RegionOrEndpoint, SourceConfig, SourceInputFormat,
SourceParams, SqsSourceParams, TransformConfig, VecSourceParams, VoidSourceParams,
CLI_SOURCE_ID, INGEST_API_SOURCE_ID, INGEST_V2_SOURCE_ID,
};
use tracing::warn;

Expand Down Expand Up @@ -114,9 +115,9 @@ pub fn disable_ingest_v1() -> bool {
SourceInputFormat,
SourceParams,
FileSourceMessageType,
FileSourceNotification,
FileSourceParamsInner,
FileSourceSqs,
FileSourceParams,
FileSourceUri,
SqsSourceParams,
PubSubSourceParams,
KafkaSourceParams,
Expand Down
152 changes: 114 additions & 38 deletions quickwit/quickwit-config/src/source_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

pub(crate) mod serialize;

use std::borrow::Cow;
use std::num::NonZeroUsize;
use std::str::FromStr;

Expand Down Expand Up @@ -81,6 +82,7 @@ impl SourceConfig {
SourceParams::Kinesis(_) => SourceType::Kinesis,
SourceParams::PubSub(_) => SourceType::PubSub,
SourceParams::Pulsar(_) => SourceType::Pulsar,
SourceParams::Stdin => SourceType::Stdin,
SourceParams::Sqs(_) => SourceType::Sqs,
SourceParams::Vec(_) => SourceType::Vec,
SourceParams::Void(_) => SourceType::Void,
Expand All @@ -98,6 +100,7 @@ impl SourceConfig {
SourceParams::Kafka(params) => serde_json::to_value(params),
SourceParams::Kinesis(params) => serde_json::to_value(params),
SourceParams::Pulsar(params) => serde_json::to_value(params),
SourceParams::Stdin => serde_json::to_value(()),
SourceParams::Sqs(params) => serde_json::to_value(params),
SourceParams::Vec(params) => serde_json::to_value(params),
SourceParams::Void(params) => serde_json::to_value(params),
Expand Down Expand Up @@ -215,6 +218,7 @@ impl FromStr for SourceInputFormat {
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(tag = "source_type", content = "params", rename_all = "snake_case")]
pub enum SourceParams {
#[schema(value_type = FileSourceParamsInner)]
File(FileSourceParams),
Ingest,
#[serde(rename = "ingest-api")]
Expand All @@ -226,22 +230,23 @@ pub enum SourceParams {
#[serde(rename = "pubsub")]
PubSub(PubSubSourceParams),
Pulsar(PulsarSourceParams),
Stdin,
Sqs(SqsSourceParams),
Vec(VecSourceParams),
Void(VoidSourceParams),
}

impl SourceParams {
pub fn file_from_uri(uri: Uri) -> Self {
Self::File(FileSourceParams::FileUri(FileSourceUri { filepath: uri }))
Self::File(FileSourceParams::Filepath(uri))
}

pub fn file_from_str<P: AsRef<str>>(filepath: P) -> anyhow::Result<Self> {
Uri::from_str(filepath.as_ref()).map(Self::file_from_uri)
}

pub fn stdin() -> Self {
Self::File(FileSourceParams::Stdin)
Self::Stdin
}

pub fn void() -> Self {
Expand All @@ -265,23 +270,73 @@ pub struct FileSourceSqs {
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
pub struct FileSourceUri {
#[schema(value_type = String)]
pub filepath: Uri,
#[serde(tag = "type", rename_all = "snake_case")]
pub enum FileSourceNotification {
Sqs(FileSourceSqs),
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(rename_all = "snake_case", tag = "mode")]
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
pub struct FileSourceParamsInner {
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub notifications: Vec<FileSourceNotification>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub filepath: Option<String>,
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[serde(try_from = "FileSourceParamsInner", into = "FileSourceParamsInner")]
pub enum FileSourceParams {
Stdin,
Sqs(FileSourceSqs),
#[serde(untagged)]
FileUri(FileSourceUri),
Notifications(FileSourceNotification),
Filepath(Uri),
}

impl TryFrom<FileSourceParamsInner> for FileSourceParams {
type Error = Cow<'static, str>;

fn try_from(mut value: FileSourceParamsInner) -> Result<Self, Self::Error> {
if value.filepath.is_some() && !value.notifications.is_empty() {
return Err(
"File source parameters `notifications` and `filepath` are mutually exclusive"
.into(),
);
}
if let Some(filepath) = value.filepath {
let uri = Uri::from_str(&filepath).map_err(|err| err.to_string())?;
Ok(FileSourceParams::Filepath(uri))
} else if value.notifications.len() == 1 {
Ok(FileSourceParams::Notifications(
value.notifications.remove(0),
))
} else if value.notifications.len() > 1 {
return Err("Only one notification can be specified for now".into());
} else {
return Err(
"Either `notifications` or `filepath` must be specified as file source parameters"
.into(),
);
}
}
}

impl From<FileSourceParams> for FileSourceParamsInner {
fn from(value: FileSourceParams) -> Self {
match value {
FileSourceParams::Filepath(uri) => Self {
filepath: Some(uri.to_string()),
notifications: vec![],
},
FileSourceParams::Notifications(notification) => Self {
filepath: None,
notifications: vec![notification],
},
}
}
}

impl FileSourceParams {
pub fn from_filepath<P: AsRef<str>>(filepath: P) -> anyhow::Result<Self> {
Uri::from_str(filepath.as_ref()).map(|uri| Self::FileUri(FileSourceUri { filepath: uri }))
Uri::from_str(filepath.as_ref()).map(Self::Filepath)
}
}

Expand Down Expand Up @@ -811,51 +866,72 @@ mod tests {
}

#[test]
fn test_file_source_params_deserialization() {
fn test_file_source_params_serde() {
{
let yaml = r#"
filepath: source-path.json
"#;
let file_params = serde_yaml::from_str::<FileSourceParams>(yaml).unwrap();
let file_params_deserialized = serde_yaml::from_str::<FileSourceParams>(yaml).unwrap();
let uri = Uri::from_str("source-path.json").unwrap();
assert_eq!(
file_params,
FileSourceParams::FileUri(FileSourceUri { filepath: uri })
);
assert_eq!(file_params_deserialized, FileSourceParams::Filepath(uri));
let file_params_reserialized = serde_json::to_value(file_params_deserialized).unwrap();
file_params_reserialized
.get("filepath")
.unwrap()
.as_str()
.unwrap()
.contains("source-path.json");
}
{
let yaml = r#"
mode: file_uri
filepath: source-path.json
notifications:
- type: sqs
queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/queue-name
message_type: s3_notification
"#;
let file_params = serde_yaml::from_str::<FileSourceParams>(yaml).unwrap();
let uri = Uri::from_str("source-path.json").unwrap();
let file_params_deserialized = serde_yaml::from_str::<FileSourceParams>(yaml).unwrap();
assert_eq!(
file_params,
FileSourceParams::FileUri(FileSourceUri { filepath: uri })
file_params_deserialized,
FileSourceParams::Notifications(FileSourceNotification::Sqs(FileSourceSqs {
queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/queue-name"
.to_string(),
message_type: FileSourceMessageType::S3Notification,
})),
);
let file_params_reserialized = serde_json::to_value(&file_params_deserialized).unwrap();
assert_eq!(
file_params_reserialized,
json!({"notifications": [{"type": "sqs", "queue_url": "https://sqs.us-east-1.amazonaws.com/123456789012/queue-name", "message_type": "s3_notification"}]})
);
}
{
let yaml = r#"
mode: stdin
filepath: source-path.json
notifications:
- type: sqs
queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/queue-name
message_type: s3_notification
"#;
let file_params = serde_yaml::from_str::<FileSourceParams>(yaml).unwrap();
assert_eq!(file_params, FileSourceParams::Stdin);
let error = serde_yaml::from_str::<FileSourceParams>(yaml).unwrap_err();
assert_eq!(
error.to_string(),
"File source parameters `notifications` and `filepath` are mutually exclusive"
);
}
{
let yaml = r#"
mode: sqs
queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/queue-name
message_type: s3_notification
notifications:
- type: sqs
queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/queue1
message_type: s3_notification
- type: sqs
queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/queue2
message_type: s3_notification
"#;
let file_params = serde_yaml::from_str::<FileSourceParams>(yaml).unwrap();
let error = serde_yaml::from_str::<FileSourceParams>(yaml).unwrap_err();
assert_eq!(
file_params,
FileSourceParams::Sqs(FileSourceSqs {
queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/queue-name"
.to_string(),
message_type: FileSourceMessageType::S3Notification,
})
error.to_string(),
"Only one notification can be specified for now"
);
}
}
Expand Down Expand Up @@ -1244,7 +1320,7 @@ mod tests {
"max_num_pipelines_per_indexer": 1,
"source_type": "file",
"params": {
"filepath": "/test_non_json_corpus.txt"
"filepath": "s3://mybucket/test_non_json_corpus.txt"
},
"input_format": "plain_text"
}"#;
Expand Down
32 changes: 16 additions & 16 deletions quickwit/quickwit-config/src/source_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,29 +68,26 @@ impl SourceConfigForSerialization {
/// Checks the validity of the `SourceConfig` as a "deserializable source".
///
/// Two remarks:
/// - This does not check connectivity. (See `check_connectivity(..)`)
/// This just validate configuration, without performing any IO.
/// - This is only here to validate user input.
/// When ingesting from stdin, we programmatically create an invalid `SourceConfig`.
///
/// TODO refactor #1065
/// - This does not check connectivity, it just validate configuration,
/// without performing any IO. See `check_connectivity(..)`.
/// - This is used each time the `SourceConfig` is deserialized (at creation but also during
/// communications with the metastore). When ingesting from stdin, we programmatically create
/// an invalid `SourceConfig` and only use it locally.
fn validate_and_build(self) -> anyhow::Result<SourceConfig> {
if !RESERVED_SOURCE_IDS.contains(&self.source_id.as_str()) {
validate_identifier("source", &self.source_id)?;
}
let num_pipelines = NonZeroUsize::new(self.num_pipelines)
.ok_or_else(|| anyhow::anyhow!("`desired_num_pipelines` must be strictly positive"))?;
match &self.source_params {
// We want to forbid source_config with no filepath
SourceParams::File(file_params) => {
if matches!(file_params, FileSourceParams::Stdin) {
bail!(
"source `{}` of type `file` must contain a filepath",
self.source_id
)
}
SourceParams::Stdin => {
bail!(
"stdin ingestions are limited to a local usage. Please use the CLI command \
`quickwit tool local-ingest`."
);
}
SourceParams::Kafka(_)
SourceParams::File(_)
| SourceParams::Kafka(_)
| SourceParams::Kinesis(_)
| SourceParams::Pulsar(_)
| SourceParams::Sqs(_) => {
Expand All @@ -104,7 +101,10 @@ impl SourceConfigForSerialization {
| SourceParams::Void(_) => {}
}
match &self.source_params {
SourceParams::PubSub(_) | SourceParams::Kafka(_) => {}
SourceParams::PubSub(_)
| SourceParams::Kafka(_)
| SourceParams::Sqs(_)
| SourceParams::File(FileSourceParams::Notifications(_)) => {}
_ => {
if self.num_pipelines > 1 {
bail!("Quickwit currently supports multiple pipelines only for GCP PubSub or Kafka sources. open an issue https://github.com/quickwit-oss/quickwit/issues if you need the feature for other source types");
Expand Down
6 changes: 3 additions & 3 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,9 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
continue;
}
match source_config.source_params {
SourceParams::File(FileSourceParams::FileUri(_))
| SourceParams::File(FileSourceParams::Stdin)
SourceParams::File(FileSourceParams::Filepath(_))
| SourceParams::IngestCli
| SourceParams::Stdin
| SourceParams::Void(_)
| SourceParams::Vec(_) => { // We don't need to schedule those.
}
Expand Down Expand Up @@ -212,7 +212,7 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
| SourceParams::PubSub(_)
| SourceParams::Sqs(_)
| SourceParams::Pulsar(_)
| SourceParams::File(FileSourceParams::Sqs(_)) => {
| SourceParams::File(FileSourceParams::Notifications(_)) => {
sources.push(SourceToSchedule {
source_uid,
source_type: SourceToScheduleType::NonSharded {
Expand Down
Loading

0 comments on commit c963ad4

Please sign in to comment.