Skip to content

Commit

Permalink
Add support for multiple column names (parseablehq#567)
Browse files Browse the repository at this point in the history
Add support for multiple column names to be passed in the 
message field of an alert

Fixes parseablehq#564

---------
Co-authored-by: theteachr <[email protected]>
  • Loading branch information
nitisht authored Dec 11, 2023
1 parent 68a9615 commit 40cddb5
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 32 deletions.
34 changes: 13 additions & 21 deletions server/src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,40 +135,32 @@ pub struct Message {

impl Message {
// checks if message (with a column name) is valid (i.e. the column name is present in the schema)
pub fn valid(&self, schema: &Schema, column: Option<&str>) -> bool {
if let Some(col) = column {
return get_field(&schema.fields, col).is_some();
}
true
pub fn valid(&self, schema: &Schema, column: &str) -> bool {
return get_field(&schema.fields, column).is_some();
}

pub fn extract_column_name(&self) -> Option<&str> {
let re = Regex::new(r"\{(.*?)\}").unwrap();
let tokens: Vec<&str> = re
pub fn extract_column_names(&self) -> Vec<&str> {
// the message can have either no column name ({column_name} not present) or any number of {column_name} present
Regex::new(r"\{(.*?)\}")
.unwrap()
.captures_iter(self.message.as_str())
.map(|cap| cap.get(1).unwrap().as_str())
.collect();
// the message can have either no column name ({column_name} not present) or one column name
// return Some only if there is exactly one column name present
if tokens.len() == 1 {
return Some(tokens[0]);
}
None
.collect()
}

// returns the message with the column name replaced with the value of the column
/// Returns the message with the column names replaced with the values in the column.
fn get(&self, event: RecordBatch) -> String {
if let Some(column) = self.extract_column_name() {
let mut replace_message = self.message.clone();
for column in self.extract_column_names() {
if let Some(value) = event.column_by_name(column) {
let arr = cast(value, &DataType::Utf8).unwrap();
let value = as_string_array(&arr).value(0);

return self
.message
.replace(&format!("{{{column}}}"), value.to_string().as_str());
replace_message =
replace_message.replace(&format!("{{{column}}}"), value.to_string().as_str());
}
}
self.message.clone()
replace_message
}
}

Expand Down
22 changes: 11 additions & 11 deletions server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,17 +154,17 @@ pub async fn put_alert(

let schema = STREAM_INFO.schema(&stream_name)?;
for alert in &alerts.alerts {
let column = alert.message.extract_column_name();
let is_valid = alert.message.valid(&schema, column);
if !is_valid {
let col = column.unwrap_or("");
return Err(StreamError::InvalidAlertMessage(
alert.name.to_owned(),
col.to_string(),
));
}
if !alert.rule.valid_for_schema(&schema) {
return Err(StreamError::InvalidAlert(alert.name.to_owned()));
for column in alert.message.extract_column_names() {
let is_valid = alert.message.valid(&schema, column);
if !is_valid {
return Err(StreamError::InvalidAlertMessage(
alert.name.to_owned(),
column.to_string(),
));
}
if !alert.rule.valid_for_schema(&schema) {
return Err(StreamError::InvalidAlert(alert.name.to_owned()));
}
}
}

Expand Down

0 comments on commit 40cddb5

Please sign in to comment.