Skip to content

Commit

Permalink
fix: config log reader to not wait for multi-line message
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed Jan 12, 2024
1 parent 4cd30d8 commit 78a589f
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 21 deletions.
6 changes: 6 additions & 0 deletions uplink/src/base/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,18 @@ impl Default for DeviceShadowConfig {
}
}

fn default_true() -> bool {
true
}

#[derive(Clone, Debug, Deserialize)]
pub struct LogReaderConfig {
pub path: String,
pub stream_name: String,
pub log_template: String,
pub timestamp_template: String,
#[serde(default = "default_true")]
pub multi_line: bool
}

#[derive(Clone, Debug, Deserialize)]
Expand Down
46 changes: 25 additions & 21 deletions uplink/src/collector/log_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ impl LogEntry {
line: &str,
log_template: &Regex,
timestamp_template: &Regex,
multi_line: bool,
) -> Option<Self> {
let to_string = |x: Match| x.as_str().to_string();
let line = line.trim().to_string();
Expand All @@ -96,12 +97,14 @@ impl LogEntry {
let message = captures.name("message").map(to_string);

return current_line.replace(LogEntry { line, tag, level, timestamp, message });
} else if let Some(log_entry) = current_line {
log_entry.line += &format!("\n{line}");
match &mut log_entry.message {
Some(msg) => *msg += &format!("\n{line}"),
_ => log_entry.message = Some(line),
};
} else if multi_line {
if let Some(log_entry) = current_line {
log_entry.line += &format!("\n{line}");
match &mut log_entry.message {
Some(msg) => *msg += &format!("\n{line}"),
_ => log_entry.message = Some(line),
};
}
}

None
Expand Down Expand Up @@ -132,13 +135,14 @@ impl<T: AsyncBufReadExt + Unpin> LogParser<T> {
Self { lines, log_entry: None, log_template, timestamp_template }
}

async fn next(&mut self) -> Option<LogEntry> {
async fn next(&mut self, multi_line: bool) -> Option<LogEntry> {
while let Some(line) = self.lines.next_line().await.ok()? {
if let Some(entry) = LogEntry::parse(
&mut self.log_entry,
&line,
&self.log_template,
&self.timestamp_template,
multi_line,
) {
return Some(entry);
}
Expand Down Expand Up @@ -175,7 +179,7 @@ impl LogFileReader {
let stream_name = self.config.stream_name.to_owned();
let tx = self.tx.clone();

while let Some(entry) = parser.next().await {
while let Some(entry) = parser.next(self.config.multi_line).await {
sequence += 1;
let payload = entry.payload(stream_name.clone(), sequence);
tx.send_payload(payload).await
Expand All @@ -200,7 +204,7 @@ mod test {
let timestamp_template = Regex::new(r#"^(?P<year>\S+)-(?P<month>\S+)-(?P<day>\S+)T(?P<hour>\S+):(?P<minute>\S+):(?P<second>\S+)\.(?P<millisecond>\S\S\S)"#).unwrap();
let mut parser = LogParser::new(lines, log_template, timestamp_template);

let entry = parser.next().await.unwrap();
let entry = parser.next(false).await.unwrap();
assert_eq!(
entry,
LogEntry {
Expand All @@ -213,7 +217,7 @@ mod test {
}
);

assert!(parser.next().await.is_none());
assert!(parser.next(false).await.is_none());
}

#[tokio::test]
Expand All @@ -225,7 +229,7 @@ mod test {
let timestamp_template = Regex::new(r#"^(?P<year>\S+)-(?P<month>\S+)-(?P<day>\S+)T(?P<hour>\S+):(?P<minute>\S+):(?P<second>\S+)\.(?P<subsecond>\S\S\S)"#).unwrap();
let mut parser = LogParser::new(lines, log_template.clone(), timestamp_template);

let entry = parser.next().await.unwrap();
let entry = parser.next(false).await.unwrap();
assert_eq!(entry.timestamp, 1688407162979);

let raw = r#"23-07-11 18:03:32"#;
Expand All @@ -234,7 +238,7 @@ mod test {
let timestamp_template= Regex::new(r#"^(?P<year>\S+)-(?P<month>\S+)-(?P<day>\S+)\s(?P<hour>\S+):(?P<minute>\S+):(?P<second>\S+)"#).unwrap();
let mut parser = LogParser::new(lines, log_template.clone(), timestamp_template);

let entry = parser.next().await.unwrap();
let entry = parser.next(false).await.unwrap();

assert_eq!(entry.timestamp, 1689098612000);
}
Expand All @@ -251,7 +255,7 @@ mod test {
let timestamp_template= Regex::new(r#"^(?P<year>\S+)-(?P<month>\S+)-(?P<day>\S+)T(?P<hour>\S+):(?P<minute>\S+):(?P<second>\S+)\.(?P<subsecond>\S\S\S)"#).unwrap();
let mut parser = LogParser::new(lines, log_template, timestamp_template);

let entry = parser.next().await.unwrap();
let entry = parser.next(true).await.unwrap();
assert_eq!(
entry,
LogEntry {
Expand All @@ -264,7 +268,7 @@ mod test {
}
);

let entry = parser.next().await.unwrap();
let entry = parser.next(true).await.unwrap();
assert_eq!(
entry,
LogEntry {
Expand All @@ -277,7 +281,7 @@ mod test {
}
);

assert!(parser.next().await.is_none());
assert!(parser.next(true).await.is_none());
}

#[tokio::test]
Expand All @@ -294,7 +298,7 @@ mod test {
let timestamp_template= Regex::new(r#"^(?P<year>\S+)-(?P<month>\S+)-(?P<day>\S+)T(?P<hour>\S+):(?P<minute>\S+):(?P<second>\S+)\.(?P<subsecond>\S\S\S)"#).unwrap();
let mut parser = LogParser::new(lines, log_template.clone(), timestamp_template);

let entry = parser.next().await.unwrap();
let entry = parser.next(true).await.unwrap();
assert_eq!(
entry,
LogEntry {
Expand All @@ -306,7 +310,7 @@ mod test {
}
);

let entry = parser.next().await.unwrap();
let entry = parser.next(true).await.unwrap();
assert_eq!(
entry,
LogEntry {
Expand All @@ -318,7 +322,7 @@ mod test {
}
);

let entry = parser.next().await.unwrap();
let entry = parser.next(true).await.unwrap();
assert_eq!(
entry,
LogEntry {
Expand All @@ -330,7 +334,7 @@ mod test {
}
);

assert!(parser.next().await.is_none());
assert!(parser.next(true).await.is_none());
}

#[tokio::test]
Expand All @@ -344,7 +348,7 @@ mod test {
let timestamp_template = Regex::new(r#"^(?P<year>\S+)-(?P<month>\S+)-(?P<day>\S+)\s(?P<hour>\S+):(?P<minute>\S+):(?P<second>\S+)"#).unwrap();
let mut parser = LogParser::new(lines, log_template.clone(), timestamp_template);

let entry = parser.next().await.unwrap();
let entry = parser.next(true).await.unwrap();
assert_eq!(
entry,
LogEntry {
Expand All @@ -356,6 +360,6 @@ mod test {
}
);

assert!(parser.next().await.is_none());
assert!(parser.next(true).await.is_none());
}
}

0 comments on commit 78a589f

Please sign in to comment.