diff --git a/docker/Dockerfile.manylinux_2_28_ARM64 b/docker/Dockerfile.manylinux_2_28_ARM64 index 399d608..805e1e8 100644 --- a/docker/Dockerfile.manylinux_2_28_ARM64 +++ b/docker/Dockerfile.manylinux_2_28_ARM64 @@ -1,4 +1,4 @@ -FROM ghcr.io/insight-platform/manylinux_2_28_arm64:v0.0.5 as builder +FROM ghcr.io/insight-platform/manylinux_2_28_arm64:v0.0.5 AS builder WORKDIR /opt COPY . . @@ -7,5 +7,5 @@ ARG PYTHON_INTERPRETER RUN bash /opt/docker/build-manylinux.sh RUN rm -rf target -FROM alpine:3.18 as dist +FROM alpine:3.18 AS dist COPY --from=builder /opt/dist /opt/dist diff --git a/docker/Dockerfile.manylinux_2_28_X64 b/docker/Dockerfile.manylinux_2_28_X64 index e58a692..95b1216 100644 --- a/docker/Dockerfile.manylinux_2_28_X64 +++ b/docker/Dockerfile.manylinux_2_28_X64 @@ -1,4 +1,4 @@ -FROM ghcr.io/insight-platform/manylinux_2_28_x64:v0.0.5 as builder +FROM ghcr.io/insight-platform/manylinux_2_28_x64:v0.0.5 AS builder WORKDIR /opt COPY . . @@ -7,5 +7,5 @@ ARG PYTHON_INTERPRETER RUN bash /opt/docker/build-manylinux.sh RUN rm -rf target -FROM alpine:3.18 as dist +FROM alpine:3.18 AS dist COPY --from=builder /opt/dist /opt/dist \ No newline at end of file diff --git a/docker/Dockerfile.u2204 b/docker/Dockerfile.u2204 index cbbcd1d..4af18bb 100644 --- a/docker/Dockerfile.u2204 +++ b/docker/Dockerfile.u2204 @@ -1,10 +1,10 @@ -FROM ubuntu:22.04 as base +FROM ubuntu:22.04 AS base WORKDIR /opt COPY docker/install-basic-deps-u2204.sh . RUN bash /opt/install-basic-deps-u2204.sh -FROM base as chef +FROM base AS chef ENV PATH="/root/.cargo/bin:$PATH" RUN rustc -V @@ -25,5 +25,5 @@ RUN maturin build --release --manylinux off --out dist RUN python3 -m pip install --upgrade pip RUN python3 -m pip install dist/*.whl -FROM alpine:3.18 as dist +FROM alpine:3.18 AS dist COPY --from=builder /opt/dist /opt/dist diff --git a/src/lib.rs b/src/lib.rs index f267eb3..2a8f916 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,11 +10,14 @@ use derive_builder::Builder; use ffmpeg::util::frame::video::Video; use ffmpeg_next as ffmpeg; use ffmpeg_next::codec::{Id, Parameters}; -use ffmpeg_next::ffi::{av_bsf_alloc, av_bsf_init, AVBSFContext, AVERROR, AVERROR_EOF}; +use ffmpeg_next::ffi::{ + av_bsf_alloc, av_bsf_flush, av_bsf_init, AVBSFContext, AVERROR, AVERROR_EOF, +}; use ffmpeg_next::format::{input_with_dictionary, Pixel}; use ffmpeg_next::log::Level; use ffmpeg_next::packet::Mut; use ffmpeg_next::software::converter; +use ffmpeg_next::software::scaling::Context; use ffmpeg_next::sys::{ av_bsf_get_by_name, av_bsf_receive_packet, av_bsf_send_packet, av_opt_set, avcodec_parameters_copy, AV_OPT_SEARCH_CHILDREN, EAGAIN, @@ -272,13 +275,18 @@ fn process_bsf( for mut packet in packets.drain(..) { debug!("Source packet size: {}", packet.size()); unsafe { - if av_bsf_send_packet(filter.ptr, packet.as_mut_ptr()) < 0 { - error!("Unable to send packet to bitstream filter"); + if !packet.is_empty() { + if av_bsf_send_packet(filter.ptr, packet.as_mut_ptr()) < 0 { + error!("Unable to send packet to bitstream filter"); + } + } else { + av_bsf_flush(filter.ptr); } loop { let mut new_packet = Packet::empty(); let ret = av_bsf_receive_packet(filter.ptr, new_packet.as_mut_ptr()); + debug!("Receive packet result: {}", ret); if ret == AVERROR(EAGAIN) { debug!("Required extra packets"); break; @@ -295,7 +303,6 @@ fn process_bsf( "New packet size: {}", new_packet.data().as_ref().unwrap().len() ); - new_packet.set_stream(packet.stream()); new_packets.push(new_packet.clone()); } } @@ -333,75 +340,75 @@ fn handle(params: HandleParams) -> anyhow::Result<()> { error!("Unable to open input URI. Error is: {:?}", e); e })?; + let (mut video_decoder, mut video_filters, mut converter) = { + let video_input = match ictx.streams().best(ffmpeg_next::media::Type::Video) { + Some(s) => s, + None => { + let msg = "Cannot discover the best suitable video stream to work with."; + error!("{}", msg); + bail!(msg); + } + }; + let video_parameters = video_input.parameters(); + let time_base = video_input.time_base(); + + info!("Codec: {:?}", video_input.codec().id()); + + let mut video_filters = Vec::new(); + + let codec_name = video_input.codec().id().name(); + for f in ¶ms.bsf_filters { + if f.codec != codec_name { + info!( + "Skipping filter {} as it is not applicable to codec {}, must match {}", + f.name, codec_name, f.codec + ); + continue; + } - let video_input = match ictx.streams().best(ffmpeg_next::media::Type::Video) { - Some(s) => s, - None => { - let msg = "Cannot discover the best suitable video stream to work with."; - error!("{}", msg); - bail!(msg); - } - }; - let video_parameters = video_input.parameters(); - let time_base = video_input.time_base(); - - info!("Codec: {:?}", video_input.codec().id()); - - let video_stream_index = video_input.index(); - - let mut video_filters = Vec::new(); - - let codec_name = video_input.codec().id().name(); - for f in ¶ms.bsf_filters { - if f.codec != codec_name { info!( - "Skipping filter {} as it is not applicable to codec {}, must match {}", - f.name, codec_name, f.codec + "Initializing filter: {} with parameters {:?}", + f.name, f.params ); - continue; - } - - info!( - "Initializing filter: {} with parameters {:?}", - f.name, f.params - ); - - video_filters.push(init_bsf( - f.name.as_str(), - &video_parameters, - time_base, - &f.params, - )?); - } - - let mut video_decoder = - ffmpeg::codec::context::Context::from_parameters(video_input.parameters()) - .and_then(|c| c.decoder().video()) - .map_err(|e| { - error!("Unable to get video decoder. Error is: {:?}", e); - e - })?; - let mut converter = converter( - (video_decoder.width(), video_decoder.height()), - video_decoder.format(), - DECODING_FORMAT, - ) - .map_err(|e| { - error!("Unable to get video converter. Error is: {:?}", e); - e - })?; + video_filters.push(init_bsf( + f.name.as_str(), + &video_parameters, + time_base, + &f.params, + )?); + } - let audio_stream_index_opt = ictx - .streams() - .best(ffmpeg_next::media::Type::Audio) - .map(|s| s.index()); + let video_decoder = + ffmpeg::codec::context::Context::from_parameters(video_input.parameters()) + .and_then(|c| c.decoder().video()) + .map_err(|e| { + error!("Unable to get video decoder. Error is: {:?}", e); + e + })?; + + let converter = converter( + (video_decoder.width(), video_decoder.height()), + video_decoder.format(), + DECODING_FORMAT, + ) + .map_err(|e| { + error!("Unable to get video converter. Error is: {:?}", e); + e + })?; - let audio_opt = ictx - .streams() - .best(ffmpeg_next::media::Type::Audio) - .and_then(|s| ffmpeg::codec::context::Context::from_parameters(s.parameters()).ok()) - .and_then(|c| c.decoder().audio().ok()); + (video_decoder, video_filters, converter) + }; + // let audio_stream_index_opt = ictx + // .streams() + // .best(ffmpeg_next::media::Type::Audio) + // .map(|s| s.index()); + // + // let audio_opt = ictx + // .streams() + // .best(ffmpeg_next::media::Type::Audio) + // .and_then(|s| ffmpeg::codec::context::Context::from_parameters(s.parameters()).ok()) + // .and_then(|c| c.decoder().audio().ok()); let mut skip_until_first_key_frame = true; params.init_complete.send(()).map_err(|e| { @@ -425,163 +432,208 @@ fn handle(params: HandleParams) -> anyhow::Result<()> { ffmpeg::log::set_level(l); } - let frame_received_ts = i64::try_from( - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_millis(), - ) - .map_err(|e| { - error!("Unable to get current time. Error is: {:?}", e); - e - })?; + process_packet( + ¶ms, + &mut queue_full_skipped_count, + &mut skip_until_first_key_frame, + &mut video_decoder, + &mut converter, + &mut video_filters, + &packet, + &stream, + )?; + } - if let Some(index) = audio_stream_index_opt { - if index == stream.index() { - if let Some(name) = audio_opt - .as_ref() - .and_then(|a| a.codec().as_ref().map(|c| String::from(c.name()))) - { - debug!("Audio streams are not supported yet. Codec is {}", name); - } - } - } + let video_input = ictx + .streams() + .best(ffmpeg_next::media::Type::Video) + .unwrap(); + + let mut packet = Packet::empty(); + packet.set_stream(video_input.index()); + + process_packet( + ¶ms, + &mut queue_full_skipped_count, + &mut skip_until_first_key_frame, + &mut video_decoder, + &mut converter, + &mut video_filters, + &packet, + &video_input, + )?; - if stream.index() == video_stream_index { - let modified_packets = process_bsf(&mut video_filters, &packet)?; - for p in &modified_packets { - let time_base_r = stream.time_base(); + Ok(()) +} - let has_key_frames = match is_stream_key_framed(stream.codec().id()) { - Ok(res) => res, - Err(e) => { - bail!( - "Unsupported video codec detected: {:?}, exit the application.", - e - ); - } - }; +#[allow(clippy::too_many_arguments)] +fn process_packet( + params: &HandleParams, + queue_full_skipped_count: &mut i64, + skip_until_first_key_frame: &mut bool, + video_decoder: &mut ffmpeg::codec::decoder::Video, + converter: &mut Context, + video_filters: &mut Vec, + packet: &Packet, + video_stream: &ffmpeg::Stream, +) -> anyhow::Result<()> { + let frame_received_ts = i64::try_from( + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis(), + ) + .map_err(|e| { + error!("Unable to get current time. Error is: {:?}", e); + e + })?; - if has_key_frames { - if p.is_key() { - skip_until_first_key_frame = false; - } - } else { - skip_until_first_key_frame = false; + // if let Some(audio_stream) = audio_opt.map(|a| a.) { + // if audio_stream.index() == packet.stream() { + // if let Some(name) = audio_opt + // .as_ref() + // .and_then(|a| a.codec().as_ref().map(|c| String::from(c.name()))) + // { + // debug!("Audio streams are not supported yet. Codec is {}", name); + // return Ok(()); + // } + // } + // } + + if packet.stream() == video_stream.index() { + let modified_packets = process_bsf(video_filters, packet)?; + for p in &modified_packets { + let time_base_r = video_stream.time_base(); + + let has_key_frames = match is_stream_key_framed(video_stream.codec().id()) { + Ok(res) => res, + Err(e) => { + bail!( + "Unsupported video codec detected: {:?}, exit the application.", + e + ); } + }; - if skip_until_first_key_frame { - debug!("Skipping until the first key frame"); - continue; + if has_key_frames { + if p.is_key() { + *skip_until_first_key_frame = false; } + } else { + *skip_until_first_key_frame = false; + } - let decode = params.decode - || (params.autoconvert_raw_formats_to_rgb24 - && video_decoder.codec().map(|c| c.id()) == Some(Id::RAWVIDEO)); + if *skip_until_first_key_frame { + debug!("Skipping until the first key frame"); + return Ok(()); + } - let raw_frames = if decode { - let mut raw_frames = Vec::new(); - video_decoder.send_packet(p).map_err(|e| { - error!("Unable to send packet to decoder. Error is: {:?}", e); + let decode = params.decode + || (params.autoconvert_raw_formats_to_rgb24 + && video_decoder.codec().map(|c| c.id()) == Some(Id::RAWVIDEO)); + + let raw_frames = if decode { + let mut raw_frames = Vec::new(); + video_decoder.send_packet(p).map_err(|e| { + error!("Unable to send packet to decoder. Error is: {:?}", e); + e + })?; + let mut decoded = Video::empty(); + while video_decoder.receive_frame(&mut decoded).is_ok() { + let mut rgb_frame = Video::empty(); + converter.run(&decoded, &mut rgb_frame).map_err(|e| { + error!("Unable to convert frame to RGB. Error is: {:?}", e); e })?; - let mut decoded = Video::empty(); - while video_decoder.receive_frame(&mut decoded).is_ok() { - let mut rgb_frame = Video::empty(); - converter.run(&decoded, &mut rgb_frame).map_err(|e| { - error!("Unable to convert frame to RGB. Error is: {:?}", e); - e - })?; - raw_frames.push(( - rgb_frame.data(0).to_vec(), - rgb_frame.stride(0) as u32 / DECODED_PIX_BYTES, - rgb_frame.plane_height(0), - )); + raw_frames.push(( + rgb_frame.data(0).to_vec(), + rgb_frame.stride(0) as u32 / DECODED_PIX_BYTES, + rgb_frame.plane_height(0), + )); + } + raw_frames + } else { + vec![( + p.data().unwrap_or(&[]).to_vec(), + video_decoder.width(), + video_decoder.height(), + )] + }; + + for (raw_frame, width, height) in raw_frames { + let codec = if !decode { + match video_decoder.codec() { + Some(c) => String::from(c.name()), + None => bail!("Unable to get codec name"), } - raw_frames } else { - vec![( - p.data().unwrap_or(&[]).to_vec(), - video_decoder.width(), - video_decoder.height(), - )] + String::from(Id::RAWVIDEO.name()) }; - for (raw_frame, width, height) in raw_frames { - let codec = if !decode { - match video_decoder.codec() { - Some(c) => String::from(c.name()), - None => bail!("Unable to get codec name"), - } - } else { - String::from(Id::RAWVIDEO.name()) - }; - - let pixel_format = if !decode { - format!("{:?}", video_decoder.format()) - } else { - format!("{:?}", DECODING_FORMAT) - }; + let pixel_format = if !decode { + format!("{:?}", video_decoder.format()) + } else { + format!("{:?}", DECODING_FORMAT) + }; - let key_frame = p.is_key(); - let pts = p.pts(); - let dts = p.dts(); - let corrupted = p.is_corrupt(); - let fps = stream.rate().to_string(); - let avg_fps = stream.avg_frame_rate().to_string(); + let key_frame = p.is_key(); + let pts = p.pts(); + let dts = p.dts(); + let corrupted = p.is_corrupt(); + let fps = video_stream.rate().to_string(); + let avg_fps = video_stream.avg_frame_rate().to_string(); - debug!("Frame info: codec_name={:?}, FPS={:?}, AVG_FPS={:?}, width={}, height={}, is_key={}, len={}, pts={:?}, dts={:?}, is_corrupt={}, pixel_format={}", + debug!("Frame info: codec_name={:?}, FPS={:?}, AVG_FPS={:?}, width={}, height={}, is_key={}, len={}, pts={:?}, dts={:?}, is_corrupt={}, pixel_format={}", codec, fps, avg_fps, width, height, key_frame, raw_frame.len(), pts, dts, corrupted, pixel_format); - let frame_processed_ts = i64::try_from( - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_millis(), - ) - .map_err(|e| { - error!("Unable to get current time. Error is: {:?}", e); - e - })?; + let frame_processed_ts = i64::try_from( + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis(), + ) + .map_err(|e| { + error!("Unable to get current time. Error is: {:?}", e); + e + })?; + + let frame_envelope = VideoFrameEnvelope { + codec, + frame_width: i64::from(width), + frame_height: i64::from(height), + key_frame, + pts, + dts, + corrupted, + time_base: (time_base_r.0 as i64, time_base_r.1 as i64), + fps, + avg_fps, + pixel_format, + queue_full_skipped_count: *queue_full_skipped_count, + payload: raw_frame, + frame_received_ts, + frame_processed_ts, + queue_len: i64::try_from(params.tx.len()).unwrap(), + }; + + if !params.block_if_queue_full { + if !params.tx.is_full() { + let res = params.tx.send(frame_envelope); - let frame_envelope = VideoFrameEnvelope { - codec, - frame_width: i64::from(width), - frame_height: i64::from(height), - key_frame, - pts, - dts, - corrupted, - time_base: (time_base_r.0 as i64, time_base_r.1 as i64), - fps, - avg_fps, - pixel_format, - queue_full_skipped_count, - payload: raw_frame, - frame_received_ts, - frame_processed_ts, - queue_len: i64::try_from(params.tx.len()).unwrap(), - }; - - if !params.block_if_queue_full { - if !params.tx.is_full() { - let res = params.tx.send(frame_envelope); - - if let Err(e) = res { - error!("Unable to send data to upstream. Error is: {:?}", e); - break; - } - } else { - dbg!("Sink queue is full, the frame is skipped."); - queue_full_skipped_count += 1; + if let Err(e) = res { + error!("Unable to send data to upstream. Error is: {:?}", e); + break; } } else { - params.tx.send(frame_envelope).map_err(|e| { - error!("Unable to send data to upstream. Error is: {:?}", e); - e - })?; + dbg!("Sink queue is full, the frame is skipped."); + *queue_full_skipped_count += 1; } + } else { + params.tx.send(frame_envelope).map_err(|e| { + error!("Unable to send data to upstream. Error is: {:?}", e); + e + })?; } } }