diff --git a/crates/kornia-imgproc/benches/bench_flip.rs b/crates/kornia-imgproc/benches/bench_flip.rs index 96a3bc71..813c41bc 100644 --- a/crates/kornia-imgproc/benches/bench_flip.rs +++ b/crates/kornia-imgproc/benches/bench_flip.rs @@ -86,8 +86,11 @@ fn bench_flip(c: &mut Criterion) { BenchmarkId::new("par_par_slicecopy", ¶meter_string), &(&image_f32, &output), |b, i| { - let (src, mut dst) = (i.0.clone(), i.1.clone()); - b.iter(|| black_box(par_par_slicecopy(&src, &mut dst))) + let (src, mut dst) = (i.0, i.1.clone()); + b.iter(|| { + par_par_slicecopy(black_box(src), black_box(&mut dst)); + black_box(()) + }) }, ); @@ -95,8 +98,11 @@ fn bench_flip(c: &mut Criterion) { BenchmarkId::new("par_loop_loop", ¶meter_string), &(&image_f32, &output), |b, i| { - let (src, mut dst) = (i.0.clone(), i.1.clone()); - b.iter(|| black_box(par_loop_loop(&src, &mut dst))) + let (src, mut dst) = (i.0, i.1.clone()); + b.iter(|| { + par_loop_loop(black_box(src), black_box(&mut dst)); + black_box(()) + }) }, ); @@ -104,8 +110,11 @@ fn bench_flip(c: &mut Criterion) { BenchmarkId::new("par_loop_slicecopy", ¶meter_string), &(&image_f32, &output), |b, i| { - let (src, mut dst) = (i.0.clone(), i.1.clone()); - b.iter(|| black_box(par_loop_slicecopy(&src, &mut dst))) + let (src, mut dst) = (i.0, i.1.clone()); + b.iter(|| { + par_loop_slicecopy(black_box(src), black_box(&mut dst)); + black_box(()) + }) }, ); @@ -113,8 +122,11 @@ fn bench_flip(c: &mut Criterion) { BenchmarkId::new("par_seq_slicecopy", ¶meter_string), &(&image_f32, &output), |b, i| { - let (src, mut dst) = (i.0.clone(), i.1.clone()); - b.iter(|| black_box(par_seq_slicecopy(&src, &mut dst))) + let (src, mut dst) = (i.0, i.1.clone()); + b.iter(|| { + par_seq_slicecopy(black_box(src), black_box(&mut dst)); + black_box(()) + }) }, ); @@ -123,7 +135,7 @@ fn bench_flip(c: &mut Criterion) { &(&image_f32, &output), |b, i| { let (src, mut dst) = (i.0, i.1.clone()); - b.iter(|| black_box(flip::horizontal_flip(src, &mut dst))) + b.iter(|| flip::horizontal_flip(black_box(src), black_box(&mut dst))) }, ); } diff --git a/crates/kornia-io/src/stream/capture.rs b/crates/kornia-io/src/stream/capture.rs index 7a94c4fa..22219b18 100644 --- a/crates/kornia-io/src/stream/capture.rs +++ b/crates/kornia-io/src/stream/capture.rs @@ -161,7 +161,7 @@ impl StreamCapture { fn get_appsink(&self) -> Result { self.pipeline .by_name("sink") - .ok_or_else(|| StreamCaptureError::DowncastAppSinkError)? + .ok_or_else(|| StreamCaptureError::GetElementByNameError)? .dynamic_cast::() .map_err(StreamCaptureError::DowncastPipelineError) } diff --git a/crates/kornia-io/src/stream/error.rs b/crates/kornia-io/src/stream/error.rs index c3f79eff..3333ebd0 100644 --- a/crates/kornia-io/src/stream/error.rs +++ b/crates/kornia-io/src/stream/error.rs @@ -10,8 +10,8 @@ pub enum StreamCaptureError { DowncastPipelineError(gst::Element), /// An error occurred during GStreamer downcast of appsink. - #[error("Failed to downcast appsink")] - DowncastAppSinkError, + #[error("Failed to get an element by name")] + GetElementByNameError, /// An error occurred during GStreamer to get the bus. #[error("Failed to get the bus")] @@ -67,4 +67,8 @@ pub enum StreamCaptureError { /// An error for an invalid configuration. #[error("Invalid configuration: {0}")] InvalidConfig(String), + + /// An error occurred during GStreamer to send end of stream event. + #[error("Error ocurred in the gstreamer flow")] + GstreamerFlowError(#[from] gst::FlowError), } diff --git a/crates/kornia-io/src/stream/mod.rs b/crates/kornia-io/src/stream/mod.rs index 73572544..1cd11975 100644 --- a/crates/kornia-io/src/stream/mod.rs +++ b/crates/kornia-io/src/stream/mod.rs @@ -13,8 +13,12 @@ pub mod rtsp; /// A module for capturing video streams from v4l2 cameras. pub mod v4l2; +/// A module for capturing video streams from video files. +pub mod video; + pub use crate::stream::camera::{CameraCapture, CameraCaptureConfig}; pub use crate::stream::capture::StreamCapture; pub use crate::stream::error::StreamCaptureError; pub use crate::stream::rtsp::RTSPCameraConfig; pub use crate::stream::v4l2::V4L2CameraConfig; +pub use crate::stream::video::VideoWriter; diff --git a/crates/kornia-io/src/stream/video.rs b/crates/kornia-io/src/stream/video.rs new file mode 100644 index 00000000..116bd232 --- /dev/null +++ b/crates/kornia-io/src/stream/video.rs @@ -0,0 +1,219 @@ +use std::path::Path; + +use futures::prelude::*; +use gst::prelude::*; + +use kornia_image::{Image, ImageSize}; + +use super::StreamCaptureError; + +/// The codec to use for the video writer. +pub enum VideoWriterCodec { + /// H.264 codec. + H264, +} + +/// A struct for writing video files. +pub struct VideoWriter { + pipeline: gst::Pipeline, + appsrc: gst_app::AppSrc, + fps: i32, + counter: u64, + handle: Option>, +} + +impl VideoWriter { + /// Create a new VideoWriter. + /// + /// # Arguments + /// + /// * `path` - The path to save the video file. + /// * `codec` - The codec to use for the video writer. + /// * `fps` - The frames per second of the video. + /// * `size` - The size of the video. + pub fn new( + path: impl AsRef, + codec: VideoWriterCodec, + fps: i32, + size: ImageSize, + ) -> Result { + gst::init()?; + + // TODO: Add support for other codecs + #[allow(unreachable_patterns)] + let _codec = match codec { + VideoWriterCodec::H264 => "x264enc", + _ => { + return Err(StreamCaptureError::InvalidConfig( + "Unsupported codec".to_string(), + )) + } + }; + + let path = path.as_ref().to_owned(); + + let pipeline_str = format!( + "appsrc name=src ! \ + videoconvert ! video/x-raw,format=I420 ! \ + x264enc ! \ + video/x-h264,profile=main ! \ + h264parse ! \ + mp4mux ! \ + filesink location={}", + path.to_string_lossy() + ); + + let pipeline = gst::parse::launch(&pipeline_str)? + .dynamic_cast::() + .map_err(StreamCaptureError::DowncastPipelineError)?; + + let appsrc = pipeline + .by_name("src") + .ok_or_else(|| StreamCaptureError::GetElementByNameError)? + .dynamic_cast::() + .map_err(StreamCaptureError::DowncastPipelineError)?; + + appsrc.set_format(gst::Format::Time); + + let caps = gst::Caps::builder("video/x-raw") + .field("format", "RGB") + .field("width", size.width as i32) + .field("height", size.height as i32) + .field("framerate", gst::Fraction::new(fps, 1)) + .build(); + + appsrc.set_caps(Some(&caps)); + + appsrc.set_is_live(true); + appsrc.set_property("block", false); + + Ok(Self { + pipeline, + appsrc, + fps, + counter: 0, + handle: None, + }) + } + + /// Start the video writer + pub fn start(&mut self) -> Result<(), StreamCaptureError> { + self.pipeline.set_state(gst::State::Playing)?; + + let bus = self.pipeline.bus().ok_or(StreamCaptureError::BusError)?; + let mut messages = bus.stream(); + + let handle = tokio::spawn(async move { + while let Some(msg) = messages.next().await { + match msg.view() { + gst::MessageView::Eos(..) => { + println!("EOS"); + break; + } + gst::MessageView::Error(err) => { + eprintln!( + "Error from {:?}: {} ({:?})", + msg.src().map(|s| s.path_string()), + err.error(), + err.debug() + ); + } + _ => {} + } + } + }); + + self.handle = Some(handle); + + Ok(()) + } + + /// Stop the video writer + pub fn stop(&mut self) -> Result<(), StreamCaptureError> { + // Send end of stream to the appsrc + self.appsrc + .end_of_stream() + .map_err(StreamCaptureError::GstreamerFlowError)?; + + // Take the handle and await it + // TODO: This is a blocking call, we need to make it non-blocking + if let Some(handle) = self.handle.take() { + tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on(async { + if let Err(e) = handle.await { + eprintln!("Error waiting for handle: {:?}", e); + } + }); + }); + } + + // Set the pipeline to null + self.pipeline.set_state(gst::State::Null)?; + + Ok(()) + } + + /// Write an image to the video file. + /// + /// # Arguments + /// + /// * `img` - The image to write to the video file. + // TODO: support write_async + pub fn write(&mut self, img: &Image) -> Result<(), StreamCaptureError> { + // TODO: verify is there is a cheaper way to copy the buffer + let mut buffer = gst::Buffer::from_mut_slice(img.as_slice().to_vec()); + + let pts = gst::ClockTime::from_nseconds(self.counter * 1_000_000_000 / self.fps as u64); + let duration = gst::ClockTime::from_nseconds(1_000_000_000 / self.fps as u64); + + let buffer_ref = buffer.get_mut().expect("Failed to get buffer"); + buffer_ref.set_pts(Some(pts)); + buffer_ref.set_duration(Some(duration)); + + self.counter += 1; + + if let Err(err) = self.appsrc.push_buffer(buffer) { + return Err(StreamCaptureError::InvalidConfig(err.to_string())); + } + + Ok(()) + } +} + +impl Drop for VideoWriter { + fn drop(&mut self) { + self.stop().unwrap_or_else(|e| { + eprintln!("Error stopping video writer: {:?}", e); + }); + } +} + +#[cfg(test)] +mod tests { + use super::{VideoWriter, VideoWriterCodec}; + use kornia_image::{Image, ImageSize}; + + #[test] + #[ignore = "TODO: fix this test as there's a race condition in the gstreamer flow"] + fn video_writer() -> Result<(), Box> { + let tmp_dir = tempfile::tempdir()?; + std::fs::create_dir_all(tmp_dir.path())?; + + let file_path = tmp_dir.path().join("test.mp4"); + + let size = ImageSize { + width: 6, + height: 4, + }; + let mut writer = VideoWriter::new(&file_path, VideoWriterCodec::H264, 30, size)?; + writer.start()?; + + let img = Image::new(size, vec![0; size.width * size.height * 3])?; + writer.write(&img)?; + writer.stop()?; + + assert!(file_path.exists(), "File does not exist: {:?}", file_path); + + Ok(()) + } +} diff --git a/examples/video_write tasks/Cargo.toml b/examples/video_write tasks/Cargo.toml new file mode 100644 index 00000000..995b9a17 --- /dev/null +++ b/examples/video_write tasks/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "video_write_tasks" +version = "0.1.0" +authors = ["Edgar Riba "] +license = "Apache-2.0" +edition = "2021" +publish = false + +[dependencies] +clap = { version = "4.5.4", features = ["derive"] } +ctrlc = "3.4.4" +kornia = { workspace = true, features = ["gstreamer"] } +rerun = "0.18" +tokio = { version = "1" } diff --git a/examples/video_write tasks/README.md b/examples/video_write tasks/README.md new file mode 100644 index 00000000..b80929e1 --- /dev/null +++ b/examples/video_write tasks/README.md @@ -0,0 +1,20 @@ +Example showing how to write a video using different background tasks. + +NOTE: This example requires the gstremer backend to be enabled. To enable the gstreamer backend, use the `gstreamer` feature flag when building the `kornia` crate and its dependencies. + +```bash +Usage: video_write_tasks [OPTIONS] --output + +Options: + -o, --output + -c, --camera-id [default: 0] + -f, --fps [default: 30] + -d, --duration + -h, --help Print help Print help +``` + +Example: + +```bash +cargo run --bin video_write_tasks --release -- --output output.mp4 +``` diff --git a/examples/video_write tasks/src/main.rs b/examples/video_write tasks/src/main.rs new file mode 100644 index 00000000..02ef76ac --- /dev/null +++ b/examples/video_write tasks/src/main.rs @@ -0,0 +1,130 @@ +use clap::Parser; +use std::{path::PathBuf, sync::Arc}; +use tokio::signal; +use tokio::sync::Mutex; + +use kornia::{ + image::{Image, ImageSize}, + io::stream::{video::VideoWriterCodec, V4L2CameraConfig, VideoWriter}, +}; + +#[derive(Parser)] +struct Args { + #[arg(short, long)] + output: PathBuf, + + #[arg(short, long, default_value = "0")] + camera_id: u32, + + #[arg(short, long, default_value = "30")] + fps: i32, + + #[arg(short, long)] + duration: Option, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let args = Args::parse(); + + // Ensure the output path ends with .mp4 + if args.output.extension().and_then(|ext| ext.to_str()) != Some("mp4") { + return Err("Output file must have a .mp4 extension".into()); + } + + // start the recording stream + let rec = rerun::RecordingStreamBuilder::new("Kornia Video Write App").spawn()?; + + // allocate the image buffers + let frame_size = ImageSize { + width: 640, + height: 480, + }; + + // create a webcam capture object with camera id 0 + // and force the image size to 640x480 + let webcam = V4L2CameraConfig::new() + .with_camera_id(args.camera_id) + .with_fps(args.fps as u32) + .with_size(frame_size) + .build()?; + + // start the video writer + let video_writer = VideoWriter::new(args.output, VideoWriterCodec::H264, args.fps, frame_size)?; + let video_writer = Arc::new(Mutex::new(video_writer)); + video_writer.lock().await.start()?; + + // Create a channel to send frames to the video writer + let (tx, rx) = tokio::sync::mpsc::channel::>>>(32); + let rx = Arc::new(Mutex::new(rx)); + + // Spawn a task to read frames from the camera and send them to the video writer + let video_writer_task = tokio::spawn({ + let rx = rx.clone(); + let video_writer = video_writer.clone(); + async move { + while let Some(img) = rx.lock().await.recv().await { + // lock the image and write it to the video writer + let img = img.lock().await; + video_writer + .lock() + .await + .write(&img) + .expect("Failed to write image to video writer"); + } + Ok::<_, Box>(()) + } + }); + + // Visualization thread + let visualization_task = tokio::spawn({ + let rec = rec.clone(); + let rx = rx.clone(); + async move { + while let Some(img) = rx.lock().await.recv().await { + // lock the image and log it + let img = img.lock().await; + rec.log_static( + "image", + &rerun::Image::from_elements( + img.as_slice(), + img.size().into(), + rerun::ColorModel::RGB, + ), + )?; + } + Ok::<_, Box>(()) + } + }); + + // start grabbing frames from the camera + let capture = webcam.run_with_termination( + |img| { + let tx = tx.clone(); + async move { + // send the image to the video writer and the visualization + tx.send(Arc::new(Mutex::new(img))).await?; + Ok(()) + } + }, + async { + signal::ctrl_c().await.expect("Failed to listen for Ctrl+C"); + println!("👋 Finished recording. Closing app."); + }, + ); + + tokio::select! { + _ = capture => (), + _ = video_writer_task => (), + _ = visualization_task => (), + _ = signal::ctrl_c() => (), + } + + video_writer + .lock() + .await + .stop() + .expect("Failed to stop video writer"); + + Ok(()) +} diff --git a/examples/video_write/Cargo.toml b/examples/video_write/Cargo.toml new file mode 100644 index 00000000..3f8d5d00 --- /dev/null +++ b/examples/video_write/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "video_write" +version = "0.1.0" +authors = ["Edgar Riba "] +license = "Apache-2.0" +edition = "2021" +publish = false + +[dependencies] +clap = { version = "4.5.4", features = ["derive"] } +ctrlc = "3.4.4" +kornia = { workspace = true, features = ["gstreamer"] } +rerun = "0.18" +tokio = { version = "1" } diff --git a/examples/video_write/README.md b/examples/video_write/README.md new file mode 100644 index 00000000..35027082 --- /dev/null +++ b/examples/video_write/README.md @@ -0,0 +1,19 @@ +An example showing how to write a video file using the `kornia::io` module along with the webcam capture example. Visualizes the webcam feed in a [`rerun`](https://github.com/rerun-io/rerun) window. + +NOTE: This example requires the gstremer backend to be enabled. To enable the gstreamer backend, use the `gstreamer` feature flag when building the `kornia` crate and its dependencies. + +```bash +Usage: video_write [OPTIONS] --output + +Options: + -o, --output + -c, --camera-id [default: 0] + -f, --fps [default: 30] + -h, --help Print help +``` + +Example: + +```bash +cargo run --bin video_write --release -- --output ~/output.mp4 +``` diff --git a/examples/video_write/src/main.rs b/examples/video_write/src/main.rs new file mode 100644 index 00000000..a7f5db20 --- /dev/null +++ b/examples/video_write/src/main.rs @@ -0,0 +1,90 @@ +use clap::Parser; +use std::{path::PathBuf, sync::Arc}; +use tokio::signal; +use tokio::sync::Mutex; + +use kornia::{ + image::ImageSize, + io::stream::{video::VideoWriterCodec, V4L2CameraConfig, VideoWriter}, +}; + +#[derive(Parser)] +struct Args { + #[arg(short, long)] + output: PathBuf, + + #[arg(short, long, default_value = "0")] + camera_id: u32, + + #[arg(short, long, default_value = "30")] + fps: i32, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let args = Args::parse(); + + // Ensure the output path ends with .mp4 + if args.output.extension().and_then(|ext| ext.to_str()) != Some("mp4") { + return Err("Output file must have a .mp4 extension".into()); + } + + // start the recording stream + let rec = rerun::RecordingStreamBuilder::new("Kornia Video Write App").spawn()?; + + // allocate the image buffers + let frame_size = ImageSize { + width: 640, + height: 480, + }; + + // create a webcam capture object with camera id 0 + // and force the image size to 640x480 + let webcam = V4L2CameraConfig::new() + .with_camera_id(args.camera_id) + .with_fps(args.fps as u32) + .with_size(frame_size) + .build()?; + + // start the video writer + let video_writer = VideoWriter::new(args.output, VideoWriterCodec::H264, args.fps, frame_size)?; + let video_writer = Arc::new(Mutex::new(video_writer)); + video_writer.lock().await.start()?; + + // start grabbing frames from the camera + webcam + .run_with_termination( + |img| { + let rec = rec.clone(); + let video_writer = video_writer.clone(); + async move { + // write the image to the video writer + video_writer.lock().await.write(&img)?; + + // log the image + rec.log_static( + "image", + &rerun::Image::from_elements( + img.as_slice(), + img.size().into(), + rerun::ColorModel::RGB, + ), + )?; + Ok(()) + } + }, + async { + signal::ctrl_c().await.expect("Failed to listen for Ctrl+C"); + println!("👋 Finished recording. Closing app."); + }, + ) + .await?; + + video_writer + .lock() + .await + .stop() + .expect("Failed to stop video writer"); + + Ok(()) +}