From 78a589f4dfd2f0deff1c93db81bcbe1e2f4480f3 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 12 Jan 2024 19:55:18 +0530 Subject: [PATCH] fix: config log reader to not wait for multi-line message --- uplink/src/base/mod.rs | 6 ++++ uplink/src/collector/log_reader.rs | 46 ++++++++++++++++-------------- 2 files changed, 31 insertions(+), 21 deletions(-) diff --git a/uplink/src/base/mod.rs b/uplink/src/base/mod.rs index c2f4b26b9..ba9110154 100644 --- a/uplink/src/base/mod.rs +++ b/uplink/src/base/mod.rs @@ -220,12 +220,18 @@ impl Default for DeviceShadowConfig { } } +fn default_true() -> bool { + true +} + #[derive(Clone, Debug, Deserialize)] pub struct LogReaderConfig { pub path: String, pub stream_name: String, pub log_template: String, pub timestamp_template: String, + #[serde(default = "default_true")] + pub multi_line: bool } #[derive(Clone, Debug, Deserialize)] diff --git a/uplink/src/collector/log_reader.rs b/uplink/src/collector/log_reader.rs index b553b197e..2da8a39a6 100644 --- a/uplink/src/collector/log_reader.rs +++ b/uplink/src/collector/log_reader.rs @@ -80,6 +80,7 @@ impl LogEntry { line: &str, log_template: &Regex, timestamp_template: &Regex, + multi_line: bool, ) -> Option { let to_string = |x: Match| x.as_str().to_string(); let line = line.trim().to_string(); @@ -96,12 +97,14 @@ impl LogEntry { let message = captures.name("message").map(to_string); return current_line.replace(LogEntry { line, tag, level, timestamp, message }); - } else if let Some(log_entry) = current_line { - log_entry.line += &format!("\n{line}"); - match &mut log_entry.message { - Some(msg) => *msg += &format!("\n{line}"), - _ => log_entry.message = Some(line), - }; + } else if multi_line { + if let Some(log_entry) = current_line { + log_entry.line += &format!("\n{line}"); + match &mut log_entry.message { + Some(msg) => *msg += &format!("\n{line}"), + _ => log_entry.message = Some(line), + }; + } } None @@ -132,13 +135,14 @@ impl LogParser { Self { lines, log_entry: None, log_template, timestamp_template } } - async fn next(&mut self) -> Option { + async fn next(&mut self, multi_line: bool) -> Option { while let Some(line) = self.lines.next_line().await.ok()? { if let Some(entry) = LogEntry::parse( &mut self.log_entry, &line, &self.log_template, &self.timestamp_template, + multi_line, ) { return Some(entry); } @@ -175,7 +179,7 @@ impl LogFileReader { let stream_name = self.config.stream_name.to_owned(); let tx = self.tx.clone(); - while let Some(entry) = parser.next().await { + while let Some(entry) = parser.next(self.config.multi_line).await { sequence += 1; let payload = entry.payload(stream_name.clone(), sequence); tx.send_payload(payload).await @@ -200,7 +204,7 @@ mod test { let timestamp_template = Regex::new(r#"^(?P\S+)-(?P\S+)-(?P\S+)T(?P\S+):(?P\S+):(?P\S+)\.(?P\S\S\S)"#).unwrap(); let mut parser = LogParser::new(lines, log_template, timestamp_template); - let entry = parser.next().await.unwrap(); + let entry = parser.next(false).await.unwrap(); assert_eq!( entry, LogEntry { @@ -213,7 +217,7 @@ mod test { } ); - assert!(parser.next().await.is_none()); + assert!(parser.next(false).await.is_none()); } #[tokio::test] @@ -225,7 +229,7 @@ mod test { let timestamp_template = Regex::new(r#"^(?P\S+)-(?P\S+)-(?P\S+)T(?P\S+):(?P\S+):(?P\S+)\.(?P\S\S\S)"#).unwrap(); let mut parser = LogParser::new(lines, log_template.clone(), timestamp_template); - let entry = parser.next().await.unwrap(); + let entry = parser.next(false).await.unwrap(); assert_eq!(entry.timestamp, 1688407162979); let raw = r#"23-07-11 18:03:32"#; @@ -234,7 +238,7 @@ mod test { let timestamp_template= Regex::new(r#"^(?P\S+)-(?P\S+)-(?P\S+)\s(?P\S+):(?P\S+):(?P\S+)"#).unwrap(); let mut parser = LogParser::new(lines, log_template.clone(), timestamp_template); - let entry = parser.next().await.unwrap(); + let entry = parser.next(false).await.unwrap(); assert_eq!(entry.timestamp, 1689098612000); } @@ -251,7 +255,7 @@ mod test { let timestamp_template= Regex::new(r#"^(?P\S+)-(?P\S+)-(?P\S+)T(?P\S+):(?P\S+):(?P\S+)\.(?P\S\S\S)"#).unwrap(); let mut parser = LogParser::new(lines, log_template, timestamp_template); - let entry = parser.next().await.unwrap(); + let entry = parser.next(true).await.unwrap(); assert_eq!( entry, LogEntry { @@ -264,7 +268,7 @@ mod test { } ); - let entry = parser.next().await.unwrap(); + let entry = parser.next(true).await.unwrap(); assert_eq!( entry, LogEntry { @@ -277,7 +281,7 @@ mod test { } ); - assert!(parser.next().await.is_none()); + assert!(parser.next(true).await.is_none()); } #[tokio::test] @@ -294,7 +298,7 @@ mod test { let timestamp_template= Regex::new(r#"^(?P\S+)-(?P\S+)-(?P\S+)T(?P\S+):(?P\S+):(?P\S+)\.(?P\S\S\S)"#).unwrap(); let mut parser = LogParser::new(lines, log_template.clone(), timestamp_template); - let entry = parser.next().await.unwrap(); + let entry = parser.next(true).await.unwrap(); assert_eq!( entry, LogEntry { @@ -306,7 +310,7 @@ mod test { } ); - let entry = parser.next().await.unwrap(); + let entry = parser.next(true).await.unwrap(); assert_eq!( entry, LogEntry { @@ -318,7 +322,7 @@ mod test { } ); - let entry = parser.next().await.unwrap(); + let entry = parser.next(true).await.unwrap(); assert_eq!( entry, LogEntry { @@ -330,7 +334,7 @@ mod test { } ); - assert!(parser.next().await.is_none()); + assert!(parser.next(true).await.is_none()); } #[tokio::test] @@ -344,7 +348,7 @@ mod test { let timestamp_template = Regex::new(r#"^(?P\S+)-(?P\S+)-(?P\S+)\s(?P\S+):(?P\S+):(?P\S+)"#).unwrap(); let mut parser = LogParser::new(lines, log_template.clone(), timestamp_template); - let entry = parser.next().await.unwrap(); + let entry = parser.next(true).await.unwrap(); assert_eq!( entry, LogEntry { @@ -356,6 +360,6 @@ mod test { } ); - assert!(parser.next().await.is_none()); + assert!(parser.next(true).await.is_none()); } }