Skip to content

Commit

Permalink
seperate video decoding thread for each display (rustdesk#9968)
Browse files Browse the repository at this point in the history
* seperate video decoding thread for each display

1. Separate Video Decoding Thread for Each Display
2. Fix Decode Errors When Clearing the Queue
Previously, on-flight frames after clearing the queue could not be decoded successfully. This issue can be resolved by setting a discard_queue flag when sending a refresh message. The flag will be reset upon receiving a keyframe.

Signed-off-by: 21pages <[email protected]>

* update video format along with fps to flutter

Signed-off-by: 21pages <[email protected]>

* Fix keyframe interval when auto record outgoing sessions

Signed-off-by: 21pages <[email protected]>

---------

Signed-off-by: 21pages <[email protected]>
  • Loading branch information
21pages authored Nov 21, 2024
1 parent 1c99eb5 commit 64654ee
Show file tree
Hide file tree
Showing 4 changed files with 253 additions and 299 deletions.
173 changes: 53 additions & 120 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,7 @@ impl AudioBuffer {
}
self.2[i] += 1;

#[allow(non_upper_case_globals)]
static mut tms: i64 = 0;
let dt = Local::now().timestamp_millis();
unsafe {
Expand Down Expand Up @@ -2274,74 +2275,60 @@ impl LoginConfigHandler {

/// Media data.
pub enum MediaData {
VideoQueue(usize),
VideoQueue,
VideoFrame(Box<VideoFrame>),
AudioFrame(Box<AudioFrame>),
AudioFormat(AudioFormat),
Reset(Option<usize>),
Reset,
RecordScreen(bool),
}

pub type MediaSender = mpsc::Sender<MediaData>;

struct VideoHandlerController {
handler: VideoHandler,
skip_beginning: u32,
}

/// Start video and audio thread.
/// Return two [`MediaSender`], they should be given to the media producer.
/// Start video thread.
///
/// # Arguments
///
/// * `video_callback` - The callback for video frame. Being called when a video frame is ready.
pub fn start_video_audio_threads<F, T>(
pub fn start_video_thread<F, T>(
session: Session<T>,
display: usize,
video_receiver: mpsc::Receiver<MediaData>,
video_queue: Arc<RwLock<ArrayQueue<VideoFrame>>>,
fps: Arc<RwLock<Option<usize>>>,
chroma: Arc<RwLock<Option<Chroma>>>,
discard_queue: Arc<RwLock<bool>>,
video_callback: F,
) -> (
MediaSender,
MediaSender,
Arc<RwLock<HashMap<usize, ArrayQueue<VideoFrame>>>>,
Arc<RwLock<Option<usize>>>,
Arc<RwLock<Option<Chroma>>>,
)
where
) where
F: 'static + FnMut(usize, &mut scrap::ImageRgb, *mut c_void, bool) + Send,
T: InvokeUiSession,
{
let (video_sender, video_receiver) = mpsc::channel::<MediaData>();
let video_queue_map: Arc<RwLock<HashMap<usize, ArrayQueue<VideoFrame>>>> = Default::default();
let video_queue_map_cloned = video_queue_map.clone();
let mut video_callback = video_callback;

let fps = Arc::new(RwLock::new(None));
let decode_fps_map = fps.clone();
let chroma = Arc::new(RwLock::new(None));
let chroma_cloned = chroma.clone();
let mut last_chroma = None;

std::thread::spawn(move || {
#[cfg(windows)]
sync_cpu_usage();
get_hwcodec_config();
let mut handler_controller_map = HashMap::new();
let mut video_handler = None;
let mut count = 0;
let mut duration = std::time::Duration::ZERO;
let mut skip_beginning = 0;
loop {
if let Ok(data) = video_receiver.recv() {
match data {
MediaData::VideoFrame(_) | MediaData::VideoQueue(_) => {
MediaData::VideoFrame(_) | MediaData::VideoQueue => {
let vf = match data {
MediaData::VideoFrame(vf) => *vf,
MediaData::VideoQueue(display) => {
if let Some(video_queue) =
video_queue_map.read().unwrap().get(&display)
{
if let Some(vf) = video_queue.pop() {
vf
} else {
MediaData::VideoFrame(vf) => {
*discard_queue.write().unwrap() = false;
*vf
}
MediaData::VideoQueue => {
if let Some(vf) = video_queue.read().unwrap().pop() {
if discard_queue.read().unwrap().clone() {
continue;
}
vf
} else {
continue;
}
Expand All @@ -2354,36 +2341,25 @@ where
let display = vf.display as usize;
let start = std::time::Instant::now();
let format = CodecFormat::from(&vf);
if !handler_controller_map.contains_key(&display) {
if video_handler.is_none() {
let mut handler = VideoHandler::new(format, display);
let record = session.lc.read().unwrap().record;
let id = session.lc.read().unwrap().id.clone();
if record {
handler.record_screen(record, id, display);
}
handler_controller_map.insert(
display,
VideoHandlerController {
handler,
skip_beginning: 0,
},
);
video_handler = Some(handler);
}
if let Some(handler_controller) = handler_controller_map.get_mut(&display) {
if let Some(handler) = video_handler.as_mut() {
let mut pixelbuffer = true;
let mut tmp_chroma = None;
let format_changed =
handler_controller.handler.decoder.format() != format;
match handler_controller.handler.handle_frame(
vf,
&mut pixelbuffer,
&mut tmp_chroma,
) {
let format_changed = handler.decoder.format() != format;
match handler.handle_frame(vf, &mut pixelbuffer, &mut tmp_chroma) {
Ok(true) => {
video_callback(
display,
&mut handler_controller.handler.rgb,
handler_controller.handler.texture.texture,
&mut handler.rgb,
handler.texture.texture,
pixelbuffer,
);

Expand All @@ -2395,7 +2371,7 @@ where

// fps calculation
fps_calculate(
handler_controller,
&mut skip_beginning,
&fps,
format_changed,
start.elapsed(),
Expand All @@ -2417,78 +2393,43 @@ where
// to-do: fix the error
log::error!("handle video frame error, {}", e);
session.refresh_video(display as _);
#[cfg(feature = "hwcodec")]
if format == CodecFormat::H265 {
if let Some(&scrap::hwcodec::ERR_HEVC_POC) =
e.downcast_ref::<i32>()
{
for (i, handler_controler) in
handler_controller_map.iter_mut()
{
if *i != display
&& handler_controler.handler.decoder.format()
== CodecFormat::H265
{
log::info!("refresh video {} due to hevc poc not found", i);
session.refresh_video(*i as _);
}
}
}
}
}
_ => {}
}
}

// check invalid decoders
let mut should_update_supported = false;
handler_controller_map
.iter()
.map(|(_, h)| {
if !h.handler.decoder.valid() || h.handler.fail_counter >= MAX_DECODE_FAIL_COUNTER {
let mut lc = session.lc.write().unwrap();
let format = h.handler.decoder.format();
if !lc.mark_unsupported.contains(&format) {
lc.mark_unsupported.push(format);
should_update_supported = true;
log::info!("mark {format:?} decoder as unsupported, valid:{}, fail_counter:{}, all unsupported:{:?}", h.handler.decoder.valid(), h.handler.fail_counter, lc.mark_unsupported);
}
if let Some(handler) = video_handler.as_mut() {
if !handler.decoder.valid()
|| handler.fail_counter >= MAX_DECODE_FAIL_COUNTER
{
let mut lc = session.lc.write().unwrap();
let format = handler.decoder.format();
if !lc.mark_unsupported.contains(&format) {
lc.mark_unsupported.push(format);
should_update_supported = true;
log::info!("mark {format:?} decoder as unsupported, valid:{}, fail_counter:{}, all unsupported:{:?}", handler.decoder.valid(), handler.fail_counter, lc.mark_unsupported);
}
})
.count();
}
}
if should_update_supported {
session.send(Data::Message(
session.lc.read().unwrap().update_supported_decodings(),
));
}
}
MediaData::Reset(display) => {
if let Some(display) = display {
if let Some(handler_controler) =
handler_controller_map.get_mut(&display)
{
handler_controler.handler.reset(None);
}
} else {
for (_, handler_controler) in handler_controller_map.iter_mut() {
handler_controler.handler.reset(None);
}
MediaData::Reset => {
if let Some(handler) = video_handler.as_mut() {
handler.reset(None);
}
}
MediaData::RecordScreen(start) => {
log::info!("record screen command: start: {start}");
let record = session.lc.read().unwrap().record;
session.update_record_status(start);
if record != start {
session.lc.write().unwrap().record = start;
let id = session.lc.read().unwrap().id.clone();
for (display, handler_controler) in handler_controller_map.iter_mut() {
handler_controler.handler.record_screen(
start,
id.clone(),
*display,
);
}
let id = session.lc.read().unwrap().id.clone();
if let Some(handler) = video_handler.as_mut() {
handler.record_screen(start, id, display);
}
}
_ => {}
Expand All @@ -2499,14 +2440,6 @@ where
}
log::info!("Video decoder loop exits");
});
let audio_sender = start_audio_thread();
return (
video_sender,
audio_sender,
video_queue_map_cloned,
decode_fps_map,
chroma_cloned,
);
}

/// Start an audio thread
Expand Down Expand Up @@ -2538,7 +2471,7 @@ pub fn start_audio_thread() -> MediaSender {

#[inline]
fn fps_calculate(
handler_controller: &mut VideoHandlerController,
skip_beginning: &mut usize,
fps: &Arc<RwLock<Option<usize>>>,
format_changed: bool,
elapsed: std::time::Duration,
Expand All @@ -2548,11 +2481,11 @@ fn fps_calculate(
if format_changed {
*count = 0;
*duration = std::time::Duration::ZERO;
handler_controller.skip_beginning = 0;
*skip_beginning = 0;
}
// // The first frame will be very slow
if handler_controller.skip_beginning < 3 {
handler_controller.skip_beginning += 1;
if *skip_beginning < 3 {
*skip_beginning += 1;
return;
}
*duration += elapsed;
Expand Down
Loading

0 comments on commit 64654ee

Please sign in to comment.