Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement H264 VideoWritter with gstreamer #135

Merged
merged 7 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions crates/kornia-imgproc/benches/bench_flip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,35 +86,47 @@ fn bench_flip(c: &mut Criterion) {
BenchmarkId::new("par_par_slicecopy", &parameter_string),
&(&image_f32, &output),
|b, i| {
let (src, mut dst) = (i.0.clone(), i.1.clone());
b.iter(|| black_box(par_par_slicecopy(&src, &mut dst)))
let (src, mut dst) = (i.0, i.1.clone());
b.iter(|| {
par_par_slicecopy(black_box(src), black_box(&mut dst));
black_box(())
})
},
);

group.bench_with_input(
BenchmarkId::new("par_loop_loop", &parameter_string),
&(&image_f32, &output),
|b, i| {
let (src, mut dst) = (i.0.clone(), i.1.clone());
b.iter(|| black_box(par_loop_loop(&src, &mut dst)))
let (src, mut dst) = (i.0, i.1.clone());
b.iter(|| {
par_loop_loop(black_box(src), black_box(&mut dst));
black_box(())
})
},
);

group.bench_with_input(
BenchmarkId::new("par_loop_slicecopy", &parameter_string),
&(&image_f32, &output),
|b, i| {
let (src, mut dst) = (i.0.clone(), i.1.clone());
b.iter(|| black_box(par_loop_slicecopy(&src, &mut dst)))
let (src, mut dst) = (i.0, i.1.clone());
b.iter(|| {
par_loop_slicecopy(black_box(src), black_box(&mut dst));
black_box(())
})
},
);

group.bench_with_input(
BenchmarkId::new("par_seq_slicecopy", &parameter_string),
&(&image_f32, &output),
|b, i| {
let (src, mut dst) = (i.0.clone(), i.1.clone());
b.iter(|| black_box(par_seq_slicecopy(&src, &mut dst)))
let (src, mut dst) = (i.0, i.1.clone());
b.iter(|| {
par_seq_slicecopy(black_box(src), black_box(&mut dst));
black_box(())
})
},
);

Expand All @@ -123,7 +135,7 @@ fn bench_flip(c: &mut Criterion) {
&(&image_f32, &output),
|b, i| {
let (src, mut dst) = (i.0, i.1.clone());
b.iter(|| black_box(flip::horizontal_flip(src, &mut dst)))
b.iter(|| flip::horizontal_flip(black_box(src), black_box(&mut dst)))
},
);
}
Expand Down
2 changes: 1 addition & 1 deletion crates/kornia-io/src/stream/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl StreamCapture {
fn get_appsink(&self) -> Result<gst_app::AppSink, StreamCaptureError> {
self.pipeline
.by_name("sink")
.ok_or_else(|| StreamCaptureError::DowncastAppSinkError)?
.ok_or_else(|| StreamCaptureError::GetElementByNameError)?
.dynamic_cast::<gst_app::AppSink>()
.map_err(StreamCaptureError::DowncastPipelineError)
}
Expand Down
8 changes: 6 additions & 2 deletions crates/kornia-io/src/stream/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ pub enum StreamCaptureError {
DowncastPipelineError(gst::Element),

/// An error occurred during GStreamer downcast of appsink.
#[error("Failed to downcast appsink")]
DowncastAppSinkError,
#[error("Failed to get an element by name")]
GetElementByNameError,

/// An error occurred during GStreamer to get the bus.
#[error("Failed to get the bus")]
Expand Down Expand Up @@ -67,4 +67,8 @@ pub enum StreamCaptureError {
/// An error for an invalid configuration.
#[error("Invalid configuration: {0}")]
InvalidConfig(String),

/// An error occurred during GStreamer to send end of stream event.
#[error("Error ocurred in the gstreamer flow")]
GstreamerFlowError(#[from] gst::FlowError),
}
4 changes: 4 additions & 0 deletions crates/kornia-io/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ pub mod rtsp;
/// A module for capturing video streams from v4l2 cameras.
pub mod v4l2;

/// A module for capturing video streams from video files.
pub mod video;

pub use crate::stream::camera::{CameraCapture, CameraCaptureConfig};
pub use crate::stream::capture::StreamCapture;
pub use crate::stream::error::StreamCaptureError;
pub use crate::stream::rtsp::RTSPCameraConfig;
pub use crate::stream::v4l2::V4L2CameraConfig;
pub use crate::stream::video::VideoWriter;
219 changes: 219 additions & 0 deletions crates/kornia-io/src/stream/video.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
use std::path::Path;

use futures::prelude::*;
use gst::prelude::*;

use kornia_image::{Image, ImageSize};

use super::StreamCaptureError;

/// The codec to use for the video writer.
pub enum VideoWriterCodec {
/// H.264 codec.
H264,
}

/// A struct for writing video files.
pub struct VideoWriter {
pipeline: gst::Pipeline,
appsrc: gst_app::AppSrc,
fps: i32,
counter: u64,
handle: Option<tokio::task::JoinHandle<()>>,
}

impl VideoWriter {
/// Create a new VideoWriter.
///
/// # Arguments
///
/// * `path` - The path to save the video file.
/// * `codec` - The codec to use for the video writer.
/// * `fps` - The frames per second of the video.
/// * `size` - The size of the video.
pub fn new(
path: impl AsRef<Path>,
codec: VideoWriterCodec,
fps: i32,
size: ImageSize,
) -> Result<Self, StreamCaptureError> {
gst::init()?;

// TODO: Add support for other codecs
#[allow(unreachable_patterns)]
let _codec = match codec {
VideoWriterCodec::H264 => "x264enc",
_ => {
return Err(StreamCaptureError::InvalidConfig(
"Unsupported codec".to_string(),
))
}
};

let path = path.as_ref().to_owned();

let pipeline_str = format!(
"appsrc name=src ! \
videoconvert ! video/x-raw,format=I420 ! \
x264enc ! \
video/x-h264,profile=main ! \
h264parse ! \
mp4mux ! \
filesink location={}",
path.to_string_lossy()
);

let pipeline = gst::parse::launch(&pipeline_str)?
.dynamic_cast::<gst::Pipeline>()
.map_err(StreamCaptureError::DowncastPipelineError)?;

let appsrc = pipeline
.by_name("src")
.ok_or_else(|| StreamCaptureError::GetElementByNameError)?
.dynamic_cast::<gst_app::AppSrc>()
.map_err(StreamCaptureError::DowncastPipelineError)?;

appsrc.set_format(gst::Format::Time);

let caps = gst::Caps::builder("video/x-raw")
.field("format", "RGB")
.field("width", size.width as i32)
.field("height", size.height as i32)
.field("framerate", gst::Fraction::new(fps, 1))
.build();

appsrc.set_caps(Some(&caps));

appsrc.set_is_live(true);
appsrc.set_property("block", false);

Ok(Self {
pipeline,
appsrc,
fps,
counter: 0,
handle: None,
})
}

/// Start the video writer
pub fn start(&mut self) -> Result<(), StreamCaptureError> {
self.pipeline.set_state(gst::State::Playing)?;

let bus = self.pipeline.bus().ok_or(StreamCaptureError::BusError)?;
let mut messages = bus.stream();

let handle = tokio::spawn(async move {
while let Some(msg) = messages.next().await {
match msg.view() {
gst::MessageView::Eos(..) => {
println!("EOS");
break;
}
gst::MessageView::Error(err) => {
eprintln!(
"Error from {:?}: {} ({:?})",
msg.src().map(|s| s.path_string()),
err.error(),
err.debug()
);
}
_ => {}
}
}
});

self.handle = Some(handle);

Ok(())
}

/// Stop the video writer
pub fn stop(&mut self) -> Result<(), StreamCaptureError> {
// Send end of stream to the appsrc
self.appsrc
.end_of_stream()
.map_err(StreamCaptureError::GstreamerFlowError)?;

// Take the handle and await it
// TODO: This is a blocking call, we need to make it non-blocking
if let Some(handle) = self.handle.take() {
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
if let Err(e) = handle.await {
eprintln!("Error waiting for handle: {:?}", e);
}
});
});
}

// Set the pipeline to null
self.pipeline.set_state(gst::State::Null)?;

Ok(())
}

/// Write an image to the video file.
///
/// # Arguments
///
/// * `img` - The image to write to the video file.
// TODO: support write_async
pub fn write(&mut self, img: &Image<u8, 3>) -> Result<(), StreamCaptureError> {
// TODO: verify is there is a cheaper way to copy the buffer
let mut buffer = gst::Buffer::from_mut_slice(img.as_slice().to_vec());

let pts = gst::ClockTime::from_nseconds(self.counter * 1_000_000_000 / self.fps as u64);
let duration = gst::ClockTime::from_nseconds(1_000_000_000 / self.fps as u64);

let buffer_ref = buffer.get_mut().expect("Failed to get buffer");
buffer_ref.set_pts(Some(pts));
buffer_ref.set_duration(Some(duration));

self.counter += 1;

if let Err(err) = self.appsrc.push_buffer(buffer) {
return Err(StreamCaptureError::InvalidConfig(err.to_string()));
}

Ok(())
}
}

impl Drop for VideoWriter {
fn drop(&mut self) {
self.stop().unwrap_or_else(|e| {
eprintln!("Error stopping video writer: {:?}", e);
});
}
}

#[cfg(test)]
mod tests {
use super::{VideoWriter, VideoWriterCodec};
use kornia_image::{Image, ImageSize};

#[test]
#[ignore = "TODO: fix this test as there's a race condition in the gstreamer flow"]
fn video_writer() -> Result<(), Box<dyn std::error::Error>> {
let tmp_dir = tempfile::tempdir()?;
std::fs::create_dir_all(tmp_dir.path())?;

let file_path = tmp_dir.path().join("test.mp4");

let size = ImageSize {
width: 6,
height: 4,
};
let mut writer = VideoWriter::new(&file_path, VideoWriterCodec::H264, 30, size)?;
writer.start()?;

let img = Image::new(size, vec![0; size.width * size.height * 3])?;
writer.write(&img)?;
writer.stop()?;

assert!(file_path.exists(), "File does not exist: {:?}", file_path);

Ok(())
}
}
14 changes: 14 additions & 0 deletions examples/video_write tasks/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "video_write_tasks"
version = "0.1.0"
authors = ["Edgar Riba <[email protected]>"]
license = "Apache-2.0"
edition = "2021"
publish = false

[dependencies]
clap = { version = "4.5.4", features = ["derive"] }
ctrlc = "3.4.4"
kornia = { workspace = true, features = ["gstreamer"] }
rerun = "0.18"
tokio = { version = "1" }
20 changes: 20 additions & 0 deletions examples/video_write tasks/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
Example showing how to write a video using different background tasks.

NOTE: This example requires the gstremer backend to be enabled. To enable the gstreamer backend, use the `gstreamer` feature flag when building the `kornia` crate and its dependencies.

```bash
Usage: video_write_tasks [OPTIONS] --output <OUTPUT>

Options:
-o, --output <OUTPUT>
-c, --camera-id <CAMERA_ID> [default: 0]
-f, --fps <FPS> [default: 30]
-d, --duration <DURATION>
-h, --help Print help Print help
```

Example:

```bash
cargo run --bin video_write_tasks --release -- --output output.mp4
```
Loading
Loading