Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[test] Add offline processing integration test #661

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
0082b85
Video MP4 output (#646)
WojciechBarczynski Jul 25, 2024
22f5e7a
Add AAC encoder, fixed MP4 output (#655)
WojciechBarczynski Aug 7, 2024
5d1fc2f
Update CHANGELOG
WojciechBarczynski Aug 7, 2024
7f0a591
Update comment
WojciechBarczynski Aug 7, 2024
2c609e6
Merge branch 'master' into @WojciechBarczynski/mp4
WojciechBarczynski Aug 7, 2024
7aa5cef
fmt
WojciechBarczynski Aug 7, 2024
4500f62
Standardize errors
WojciechBarczynski Aug 7, 2024
ede799a
Simplify type conversions
WojciechBarczynski Aug 7, 2024
d4f7186
Emit EOS events on input unregister
WojciechBarczynski Aug 8, 2024
820d225
Add output EOS event
WojciechBarczynski Aug 8, 2024
19a0e76
Merge branch '@WojciechBarczynski/mp4' into @WojciechBarczynski/emit_…
WojciechBarczynski Aug 8, 2024
e0086d3
Add mp4 output eos event
WojciechBarczynski Aug 8, 2024
ae149b1
RM input EOS on unregister
WojciechBarczynski Aug 8, 2024
6b9fea5
Update CHANGELOG
WojciechBarczynski Aug 8, 2024
d1f07e8
Unify CHANGELOG
WojciechBarczynski Aug 8, 2024
b43730d
Merge branch '@WojciechBarczynski/mp4' into @WojciechBarczynski/emit_…
WojciechBarczynski Aug 8, 2024
2eda02b
WIP
WojciechBarczynski Aug 8, 2024
581a546
Merge branch '@WojciechBarczynski/emit_eos_on_input_unregister' into …
WojciechBarczynski Aug 9, 2024
d712cb7
Add mp4 check
WojciechBarczynski Aug 9, 2024
5b48f0b
Specify offline processing config
WojciechBarczynski Aug 9, 2024
ce88a3c
Pass config to compositor instance
WojciechBarczynski Aug 9, 2024
df3f6a9
Merge branch 'master' into @WojciechBarczynski/offline_processing_int…
WojciechBarczynski Aug 12, 2024
3263fe7
Add output duration and sample rate check
WojciechBarczynski Aug 13, 2024
2161ad5
Merge branch 'master' into @WojciechBarczynski/offline_processing_int…
WojciechBarczynski Aug 13, 2024
053d68b
Extract test utils
WojciechBarczynski Aug 13, 2024
e8cd803
Review
WojciechBarczynski Aug 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions integration_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,4 @@ signal-hook = { workspace = true }
tokio-tungstenite = "0.21.0"
wgpu = { workspace = true }
image = { workspace = true }
regex = "1.10.6"
7 changes: 4 additions & 3 deletions integration_tests/src/compositor_instance.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::{anyhow, Result};
use crossbeam_channel::Sender;
use live_compositor::{
config::{read_config, LoggerConfig, LoggerFormat},
config::{read_config, Config, LoggerConfig, LoggerFormat},
logger::{self, FfmpegLogLevel},
server::run_api,
state::ApiState,
Expand Down Expand Up @@ -31,10 +31,11 @@ impl Drop for CompositorInstance {
}

impl CompositorInstance {
pub fn start() -> Self {
/// api port in config is overwritten
pub fn start(config: Option<Config>) -> Self {
init_compositor_prerequisites();
let api_port = get_free_port();
let mut config = read_config();
let mut config = config.unwrap_or(read_config());
config.api_port = api_port;

info!("Starting LiveCompositor Integration Test with config:\n{config:#?}",);
Expand Down
70 changes: 70 additions & 0 deletions integration_tests/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,76 @@
mod audio_only;
mod offline_processing;
mod push_input_before_start;
mod required_inputs;
mod schedule_update;
mod unregistering;
mod video_audio;

use crossbeam_channel::Sender;
use futures_util::{SinkExt as _, StreamExt as _};
use tokio_tungstenite::tungstenite;

pub fn start_server_msg_listener(port: u16, event_sender: Sender<tungstenite::Message>) {
std::thread::Builder::new()
.name("Websocket Thread".to_string())
.spawn(move || {
tokio::runtime::Runtime::new()
.unwrap()
.block_on(async { server_msg_listener(port, event_sender).await });
})
.unwrap();
}

async fn server_msg_listener(port: u16, event_sender: Sender<tungstenite::Message>) {
let url = format!("ws://127.0.0.1:{}/ws", port);

let (ws_stream, _) = tokio_tungstenite::connect_async(url)
.await
.expect("Failed to connect");

let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let (mut outgoing, mut incoming) = ws_stream.split();

let sender_task = tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
if let tungstenite::Message::Close(None) = &msg {
let _ = outgoing.send(msg).await;
return;
}
match outgoing.send(msg).await {
Ok(()) => (),
Err(e) => {
println!("Send Loop: {:?}", e);
let _ = outgoing.send(tungstenite::Message::Close(None)).await;
return;
}
}
}
});

let receiver_task = tokio::spawn(async move {
while let Some(result) = incoming.next().await {
match result {
Ok(tungstenite::Message::Close(_)) => {
let _ = tx.send(tungstenite::Message::Close(None));
return;
}
Ok(tungstenite::Message::Ping(data)) => {
if tx.send(tungstenite::Message::Pong(data)).is_err() {
return;
}
}
Err(_) => {
let _ = tx.send(tungstenite::Message::Close(None));
return;
}
Ok(msg) => {
event_sender.send(msg).unwrap();
}
}
}
});

sender_task.await.unwrap();
receiver_task.await.unwrap();
}
8 changes: 4 additions & 4 deletions integration_tests/src/tests/audio_only.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
#[test]
pub fn audio_mixing_with_offset() -> Result<()> {
const OUTPUT_DUMP_FILE: &str = "audio_mixing_with_offset_output.rtp";
let instance = CompositorInstance::start();
let instance = CompositorInstance::start(None);
let input_1_port = instance.get_port();
let input_2_port = instance.get_port();
let output_port = instance.get_port();
Expand Down Expand Up @@ -112,7 +112,7 @@ pub fn audio_mixing_with_offset() -> Result<()> {
#[test]
pub fn audio_mixing_no_offset() -> Result<()> {
const OUTPUT_DUMP_FILE: &str = "audio_mixing_no_offset_output.rtp";
let instance = CompositorInstance::start();
let instance = CompositorInstance::start(None);
let input_1_port = instance.get_port();
let input_2_port = instance.get_port();
let output_port = instance.get_port();
Expand Down Expand Up @@ -213,7 +213,7 @@ pub fn audio_mixing_no_offset() -> Result<()> {
#[test]
pub fn single_input_opus() -> Result<()> {
const OUTPUT_DUMP_FILE: &str = "single_input_opus_output.rtp";
let instance = CompositorInstance::start();
let instance = CompositorInstance::start(None);
let input_1_port = instance.get_port();
let output_port = instance.get_port();

Expand Down Expand Up @@ -295,7 +295,7 @@ pub fn single_input_opus() -> Result<()> {
#[test]
pub fn single_input_aac() -> Result<()> {
const OUTPUT_DUMP_FILE: &str = "single_input_aac_output.rtp";
let instance = CompositorInstance::start();
let instance = CompositorInstance::start(None);
let input_1_port = instance.get_port();
let output_port = instance.get_port();

Expand Down
149 changes: 149 additions & 0 deletions integration_tests/src/tests/offline_processing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
use std::{
fs::{self},
path::Path,
process::Command,
};

use anyhow::{anyhow, Result};
use live_compositor::config::read_config;
use log::info;
use regex::Regex;
use serde_json::json;
use tokio_tungstenite::tungstenite;

use crate::{tests::start_server_msg_listener, CompositorInstance};

const BUNNY_URL: &str =
"https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4";

#[test]
pub fn offline_processing() -> Result<()> {
const OUTPUT_FILE: &str = "/tmp/offline_processing_output.mp4";
if Path::new(OUTPUT_FILE).exists() {
fs::remove_file(OUTPUT_FILE)?;
};

let mut config = read_config();
config.queue_options.ahead_of_time_processing = true;
config.queue_options.never_drop_output_frames = true;
let instance = CompositorInstance::start(Some(config));
let (msg_sender, msg_receiver) = crossbeam_channel::unbounded();
start_server_msg_listener(instance.api_port, msg_sender);

instance.send_request(
"input/input_1/register",
json!({
"type": "mp4",
"url": BUNNY_URL,
"required": true
}),
)?;

instance.send_request(
"output/output_1/register",
json!({
"type": "mp4",
"path": OUTPUT_FILE,
"video": {
"resolution": {
"width": 640,
"height": 320
},
"encoder": {
"type": "ffmpeg_h264",
"preset": "ultrafast",
},
"initial": {
"root": {
"type": "view",
"children": [{
"type": "rescaler",
"child": {
"type": "input_stream",
"input_id": "input_1"
}
}]
}
},
"send_eos_when": { "all_inputs": true }
},
"audio": {
"encoder": {
"type": "aac",
"channels": "stereo"
},
"initial": {
"inputs": [{ "input_id": "input_1" }]
},
"send_eos_when": { "all_inputs": true }
}
}),
)?;

instance.send_request(
"input/input_1/unregister",
json!({
"schedule_time_ms": 2000
}),
)?;
instance.send_request(
"output/output_1/unregister",
json!({
"schedule_time_ms": 2000
}),
)?;

instance.send_request("start", json!({}))?;

for msg in msg_receiver.iter() {
if let tungstenite::Message::Text(msg) = msg {
if msg.contains("\"type\":\"OUTPUT_DONE\",\"output_id\":\"output_1\"") {
info!("breaking");
break;
}
}
}

let command_output = Command::new("ffprobe")
.args(["-v", "error", "-show_format", OUTPUT_FILE])
.output()
.map_err(|e| anyhow!("Invalid mp4 file. FFprobe error: {}", e))?;

if !command_output.status.success() {
return Err(anyhow!(
"Invalid mp4 file. FFprobe error: {}",
String::from_utf8_lossy(&command_output.stderr)
));
}

let output_str = String::from_utf8_lossy(&command_output.stdout);
let (duration, bit_rate) = extract_ffprobe_info(&output_str)?;

if !(1.9..=2.1).contains(&duration) {
return Err(anyhow!("Invalid duration: {}", duration));
}
if !(950_000..=980_000).contains(&bit_rate) {
return Err(anyhow!("Invalid bit rate: {}", bit_rate));
}

Ok(())
}

fn extract_ffprobe_info(output: &str) -> Result<(f64, u64)> {
let re_duration = Regex::new(r"duration=(\d+\.\d+)").unwrap();
let re_bit_rate = Regex::new(r"bit_rate=(\d+)").unwrap();

let duration: f64 = re_duration
.captures(output)
.and_then(|caps| caps.get(1))
.map(|m| m.as_str().parse().unwrap_or(0.0))
.ok_or_else(|| anyhow!("Failed to extract duration"))?;

let bit_rate: u64 = re_bit_rate
.captures(output)
.and_then(|caps| caps.get(1))
.map(|m| m.as_str().parse().unwrap_or(0))
.ok_or_else(|| anyhow!("Failed to extract bit rate"))?;

Ok((duration, bit_rate))
}
8 changes: 4 additions & 4 deletions integration_tests/src/tests/push_input_before_start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{
#[test]
pub fn push_input_before_start_tcp() -> Result<()> {
const OUTPUT_DUMP_FILE: &str = "push_entire_input_before_start_tcp.rtp";
let instance = CompositorInstance::start();
let instance = CompositorInstance::start(None);
let input_port = instance.get_port();
let output_port = instance.get_port();

Expand Down Expand Up @@ -103,7 +103,7 @@ pub fn push_input_before_start_tcp() -> Result<()> {
#[test]
pub fn push_input_before_start_udp() -> Result<()> {
const OUTPUT_DUMP_FILE: &str = "push_entire_input_before_start_udp.rtp";
let instance = CompositorInstance::start();
let instance = CompositorInstance::start(None);
let input_port = instance.get_port();
let output_port = instance.get_port();

Expand Down Expand Up @@ -188,7 +188,7 @@ pub fn push_input_before_start_udp() -> Result<()> {
#[test]
pub fn push_input_before_start_tcp_no_offset() -> Result<()> {
const OUTPUT_DUMP_FILE: &str = "push_entire_input_before_start_tcp_without_offset.rtp";
let instance = CompositorInstance::start();
let instance = CompositorInstance::start(None);
let input_port = instance.get_port();
let output_port = instance.get_port();

Expand Down Expand Up @@ -273,7 +273,7 @@ pub fn push_input_before_start_tcp_no_offset() -> Result<()> {
#[test]
pub fn push_input_before_start_udp_no_offset() -> Result<()> {
const OUTPUT_DUMP_FILE: &str = "push_entire_input_before_start_udp_without_offset.rtp";
let instance = CompositorInstance::start();
let instance = CompositorInstance::start(None);
let input_port = instance.get_port();
let output_port = instance.get_port();

Expand Down
6 changes: 3 additions & 3 deletions integration_tests/src/tests/required_inputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use serde_json::json;
#[test]
pub fn required_inputs_no_offset() -> Result<()> {
const OUTPUT_DUMP_FILE: &str = "required_inputs_no_offset_output.rtp";
let instance = CompositorInstance::start();
let instance = CompositorInstance::start(None);
let input_1_port = instance.get_port();
let input_2_port = instance.get_port();
let output_port = instance.get_port();
Expand Down Expand Up @@ -131,7 +131,7 @@ pub fn required_inputs_no_offset() -> Result<()> {
#[test]
pub fn required_inputs_with_offset() -> Result<()> {
const OUTPUT_DUMP_FILE: &str = "required_inputs_with_offset_output.rtp";
let instance = CompositorInstance::start();
let instance = CompositorInstance::start(None);
let input_1_port = instance.get_port();
let input_2_port = instance.get_port();
let output_port = instance.get_port();
Expand Down Expand Up @@ -252,7 +252,7 @@ pub fn required_inputs_with_offset() -> Result<()> {
#[test]
pub fn optional_inputs_no_offset_flaky() -> Result<()> {
const OUTPUT_DUMP_FILE: &str = "optional_inputs_no_offset_output.rtp";
let instance = CompositorInstance::start();
let instance = CompositorInstance::start(None);
let input_1_port = instance.get_port();
let input_2_port = instance.get_port();
let output_port = instance.get_port();
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/src/tests/schedule_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use serde_json::json;
#[test]
pub fn schedule_update() -> Result<()> {
const OUTPUT_DUMP_FILE: &str = "schedule_update_output.rtp";
let instance = CompositorInstance::start();
let instance = CompositorInstance::start(None);
let input_1_port = instance.get_port();
let input_2_port = instance.get_port();
let output_port = instance.get_port();
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/src/tests/unregistering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use serde_json::json;
#[test]
pub fn unregistering() -> Result<()> {
const OUTPUT_DUMP_FILE: &str = "unregistering_test_output.rtp";
let instance = CompositorInstance::start();
let instance = CompositorInstance::start(None);
let input_port = instance.get_port();
let output_port = instance.get_port();

Expand Down
Loading