Skip to content

Commit

Permalink
implemented blocking when queue is full (for file processing)
Browse files Browse the repository at this point in the history
  • Loading branch information
bwsw committed Apr 26, 2024
1 parent 4b81f55 commit 9245138
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 40 deletions.
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ repository = "https://github.com/insight-platform/FFmpeg-Input"
readme = "README.md"
keywords = ["FFmpeg", "Video"]
categories = ["computer-vision"]
version = "0.1.23"
version = "0.1.24"
edition = "2021"
license="Apache-2.0"
license = "Apache-2.0"
rust-version = "1.62"

[lib]
Expand All @@ -22,15 +22,15 @@ env_logger = "0.11"
parking_lot = "0.12"

[dependencies.ffmpeg-next]
version = "6"
version = "7"
features = ["default"]

[dependencies.pyo3]
version = "0.20"
features = ["extension-module"]

[build-dependencies]
pyo3-build-config = { version = "0.20"}
pyo3-build-config = { version = "0.20" }

[profile.release]
opt-level = 3
Expand Down
78 changes: 45 additions & 33 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,15 @@ impl Drop for FFMpegSource {
}
}

#[allow(clippy::too_many_arguments)]
fn handle(
uri: String,
params: HashMap<String, String>,
tx: Sender<VideoFrameEnvelope>,
signal: Arc<Mutex<bool>>,
decode: bool,
autoconvert_raw_formats_to_rgb24: bool,
block_if_queue_full: bool,
log_level: Arc<Mutex<Option<Level>>>,
) {
let mut queue_full_skipped_count = 0;
Expand Down Expand Up @@ -317,41 +319,48 @@ fn handle(
codec, fps, avg_fps, width, height, key_frame, raw_frame.len(),
pts, dts, corrupted, pixel_format);

if !tx.is_full() {
let frame_processed_ts = i64::try_from(
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis(),
)
.expect("Milliseconds must fit i64");

let res = tx.send(VideoFrameEnvelope {
codec,
frame_width: i64::from(width),
frame_height: i64::from(height),
key_frame,
pts,
dts,
corrupted,
time_base: (time_base_r.0 as i64, time_base_r.1 as i64),
fps,
avg_fps,
pixel_format,
queue_full_skipped_count,
payload: raw_frame,
frame_received_ts,
frame_processed_ts,
queue_len: i64::try_from(tx.len()).unwrap(),
});

if let Err(e) = res {
error!("Unable to send data to upstream. Error is: {:?}", e);
break;
let frame_processed_ts = i64::try_from(
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis(),
)
.expect("Milliseconds must fit i64");

let frame_envelope = VideoFrameEnvelope {
codec,
frame_width: i64::from(width),
frame_height: i64::from(height),
key_frame,
pts,
dts,
corrupted,
time_base: (time_base_r.0 as i64, time_base_r.1 as i64),
fps,
avg_fps,
pixel_format,
queue_full_skipped_count,
payload: raw_frame,
frame_received_ts,
frame_processed_ts,
queue_len: i64::try_from(tx.len()).unwrap(),
};

if !block_if_queue_full {
if !tx.is_full() {
let res = tx.send(frame_envelope);

if let Err(e) = res {
error!("Unable to send data to upstream. Error is: {:?}", e);
break;
}
} else {
dbg!("Sink queue is full, the frame is skipped.");
queue_full_skipped_count += 1;
}
} else {
warn!("Sink queue is full, the frame is skipped.");
queue_full_skipped_count += 1;
tx.send(frame_envelope)
.expect("Unable to send data to upstream");
}
}
}
Expand All @@ -378,6 +387,7 @@ impl FFMpegSource {
queue_len = 32,
decode = false,
autoconvert_raw_formats_to_rgb24 = false,
block_if_queue_full = false,
ffmpeg_log_level = FFmpegLogLevel::Info)
)]
pub fn new(
Expand All @@ -386,6 +396,7 @@ impl FFMpegSource {
queue_len: i64,
decode: bool,
autoconvert_raw_formats_to_rgb24: bool,
block_if_queue_full: bool,
ffmpeg_log_level: FFmpegLogLevel,
) -> Self {
assert!(queue_len > 0, "Queue length must be a positive number");
Expand All @@ -406,6 +417,7 @@ impl FFMpegSource {
thread_exit_signal,
decode,
autoconvert_raw_formats_to_rgb24,
block_if_queue_full,
thread_ll,
)
}));
Expand Down
8 changes: 5 additions & 3 deletions test.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
from ffmpeg_input import FFMpegSource, FFmpegLogLevel
from time import sleep

if __name__ == '__main__':
s = FFMpegSource("/home/ivan/road-traffic-processed.mp4", params={"fflags": "+genpts"},
queue_len=100, decode=False,
s = FFMpegSource("/home/ivan/file.mp4", params={},
queue_len=10, decode=False,
block_if_queue_full=True,
ffmpeg_log_level=FFmpegLogLevel.Debug)
s.log_level = FFmpegLogLevel.Trace
while True:
try:
sleep(1)
p = s.video_frame()
print("Codec: {}, Geometry: {}x{}".format(p.codec, p.frame_width, p.frame_height))
print("System ts, when the frame was received from the source:", p.frame_received_ts)
print("Current queue length:", p.queue_len)
print("Time base, pts, dts:", p.time_base, p.pts, p.dts)
print("Skipped frames because of queue overflow:", p.queue_full_skipped_count)
print("Payload length:", len(p.payload_as_bytes()))
break
except BrokenPipeError:
print("EOS")
break

0 comments on commit 9245138

Please sign in to comment.