Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Handle QuotaExceededError when pushing segments to the SourceBuffer #11

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -280,15 +280,14 @@ MSE API and buffer handling:
segments if they are needed again.
- [x] Discontinuity handling: Automatically skip "holes" in the buffer where
it is known that no segment will be pushed to fill them.
- [ ] Freezing handling: Detect when the browser is not making progress in the
content despite having media data to play and try to unstuck it.
_Priority: average_
- [ ] Proper handling of `QuotaExceededError` after pushing segments (when low
- [x] Proper handling of `QuotaExceededError` after pushing segments (when low
on memory).
This is generally not needed as the browser should already handle some kind of
garbage collection but some platforms still may have issues when memory is
constrained.
_Priority: low_
- [ ] Freezing handling: Detect when the browser is not making progress in the
content despite having media data to play and try to unstuck it.
_Priority: average_

Tracks:

Expand Down
3 changes: 2 additions & 1 deletion src/rs-core/bindings/js_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ extern "C" {

/// Function to call to indicate that an error arised when removing data from a
/// `SourceBuffer`.
pub fn jsSendRemovedBufferError(fatal: bool, media_type: MediaType, message: &str);
pub fn jsSendRemoveBufferError(fatal: bool, media_type: MediaType, message: &str);

/// Function to call to indicate that an uncategorized error happened.
pub fn jsSendOtherError(fatal: bool, code: OtherErrorCode, message: &str);
Expand Down Expand Up @@ -899,6 +899,7 @@ impl From<PushSegmentError> for SegmentParsingErrorCode {

/// Errors that can arise after a SourceBuffer's `appendBuffer` call.
#[wasm_bindgen]
#[derive(Eq, PartialEq, Clone, Copy, Debug)]
pub enum PushedSegmentErrorCode {
/// We could not push the segment because the `SourceBuffer`'s buffer seems full.
BufferFull,
Expand Down
52 changes: 48 additions & 4 deletions src/rs-core/dispatcher/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
jsAnnounceVariantUpdate, jsClearTimer, jsSendMediaPlaylistParsingError,
jsSendMediaPlaylistRequestError, jsSendMultivariantPlaylistParsingError,
jsSendMultivariantPlaylistRequestError, jsSendOtherError, jsSendPushedSegmentError,
jsSendRemovedBufferError, jsSendSegmentParsingError, jsSendSegmentRequestError,
jsSendRemoveBufferError, jsSendSegmentParsingError, jsSendSegmentRequestError,
jsSendSourceBufferCreationError, jsSetMediaSourceDuration, jsStartObservingPlayback,
jsStopObservingPlayback, jsTimer, jsUpdateContentInfo, AddSourceBufferErrorCode, MediaType,
MultivariantPlaylistParsingErrorCode, OtherErrorCode, PlaylistNature,
Expand Down Expand Up @@ -303,17 +303,55 @@ impl Dispatcher {
buffered: JsTimeRanges,
) {
self.media_element_ref
.on_source_buffer_update(source_buffer_id, buffered);
.on_source_buffer_update(source_buffer_id, buffered, true);
}

/// Method to call when a `SourceBuffer`'s `appendBuffer` call led to an `error` event.
pub(super) fn on_append_buffer_error_core(
&mut self,
source_buffer_id: SourceBufferId,
code: PushedSegmentErrorCode,
buffered: JsTimeRanges,
) {
self.media_element_ref
.on_source_buffer_update(source_buffer_id, buffered, false);

match self.media_element_ref.media_type_for(source_buffer_id) {
Some(mt) => {
if code == PushedSegmentErrorCode::BufferFull {
let wanted_pos = self.media_element_ref.wanted_position();
let min_pos = if wanted_pos < 10. {
0.
} else {
wanted_pos - 10.
};
let max_pos = wanted_pos + self.buffer_goal + 10.;

let has_segments_to_delete =
self.media_element_ref.inventory(mt).iter().any(|x| {
x.last_buffered_start() < min_pos || x.last_buffered_end() > max_pos
});
if has_segments_to_delete {
Logger::warn(&format!(
"BufferFull error received for {}. Cleaning < {}, > {}.",
mt, min_pos, max_pos
));
match (
self.media_element_ref.remove_data(mt, 0., min_pos),
self.media_element_ref.remove_data(mt, max_pos, f64::MAX),
) {
(Ok(_), Ok(_)) => {
self.segment_selectors
.restart_from_position(wanted_pos - 0.2);
return;
}
_ => {}
};
}

// TODO Reduce buffer goal instead of just failing here?
}

let message = match code {
PushedSegmentErrorCode::BufferFull => format!(
"The {mt} `SourceBuffer` was full and could not accept anymore segment"
Expand All @@ -334,12 +372,18 @@ impl Dispatcher {
}

/// Method to call when a `SourceBuffer`'s `remove` call led to an `error` event.
pub(super) fn on_remove_buffer_error_core(&mut self, source_buffer_id: SourceBufferId) {
pub(super) fn on_remove_buffer_error_core(
&mut self,
source_buffer_id: SourceBufferId,
buffered: JsTimeRanges,
) {
self.media_element_ref
.on_source_buffer_update(source_buffer_id, buffered, false);
match self.media_element_ref.media_type_for(source_buffer_id) {
Some(mt) => {
let message =
&format!("An error happened while calling `remove` on the {mt} `SourceBuffer`");
jsSendRemovedBufferError(true, mt, message);
jsSendRemoveBufferError(true, mt, message);
}
None => jsSendOtherError(
true,
Expand Down
11 changes: 8 additions & 3 deletions src/rs-core/dispatcher/event_listeners/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,9 @@ impl Dispatcher {
&mut self,
source_buffer_id: SourceBufferId,
code: PushedSegmentErrorCode,
buffered: JsTimeRanges,
) {
self.on_append_buffer_error_core(source_buffer_id, code);
self.on_append_buffer_error_core(source_buffer_id, code, buffered);
}

/// The JS code should call this method when a SourceBuffer emits an `error`
Expand All @@ -146,8 +147,12 @@ impl Dispatcher {
/// * `source_buffer_id` - The identifier given generated when the
/// SourceBuffer was created. This allows the `Dispatcher` to identify
/// which SourceBuffer actually emitted this event.
pub fn on_remove_buffer_error(&mut self, source_buffer_id: SourceBufferId) {
self.on_remove_buffer_error_core(source_buffer_id);
pub fn on_remove_buffer_error(
&mut self,
source_buffer_id: SourceBufferId,
buffered: JsTimeRanges,
) {
self.on_remove_buffer_error_core(source_buffer_id, buffered);
}

/// The JS code should call this method once regular playback "tick" are enabled
Expand Down
11 changes: 9 additions & 2 deletions src/rs-core/media_element/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,23 +523,30 @@ impl MediaElementReference {
&mut self,
source_buffer_id: SourceBufferId,
buffered: JsTimeRanges,
success: bool,
) {
if let Some(ref mut sb) = self.audio_buffer {
if sb.id() == source_buffer_id {
if !success {
sb.clear_queue();
}
if let Some(SourceBufferQueueElement::PushMedia((_, id))) = sb.on_operation_end() {
if let Some(media_offset) = self.media_offset {
self.audio_inventory
.validate_segment(id, &buffered, media_offset);
.validate_segment(id, &buffered, media_offset, success);
}
}
}
}
if let Some(ref mut sb) = self.video_buffer {
if sb.id() == source_buffer_id {
if !success {
sb.clear_queue();
}
if let Some(SourceBufferQueueElement::PushMedia((_, id))) = sb.on_operation_end() {
if let Some(media_offset) = self.media_offset {
self.video_inventory
.validate_segment(id, &buffered, media_offset);
.validate_segment(id, &buffered, media_offset, success);
}
}
}
Expand Down
27 changes: 24 additions & 3 deletions src/rs-core/media_element/segment_inventory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ impl SegmentInventory {
seg_id: u64,
buffered: &JsTimeRanges,
media_offset: f64,
success: bool,
) {
self.synchronize(buffered, media_offset);
let seg_idx = self
Expand Down Expand Up @@ -393,16 +394,35 @@ impl SegmentInventory {
let seg = self.inventory.get_mut(seg_idx).unwrap();
seg.start += start_correction;
seg.end += end_correction;
seg.last_buffered_start = seg.start;
seg.last_buffered_end = seg.end;
seg.validated = true;

if f64::abs(start_correction) >= 0.05 || f64::abs(end_correction) >= 0.05 {
Logger::debug(&format!(
"SI: corrected {} segment (s:{}, e:{}, cs:{}, ce:{})",
self.media_type, seg.start, seg.end, start_correction, end_correction
));
}

seg.validated = true;
seg.last_buffered_start = seg.start;
seg.last_buffered_end = seg.end;
if !success {
// Push operation failed, let's base ourselves on the current
// announced buffered time ranges here.
//
// In cases where the push operation succeeded, we don't synchronize
// right at validation time because there's a very small risk that
// the buffered time range is not up-to-date, in which case we might
// falsely consider it as garbage-collected by the browser.
// Having the same problem when the push operation failed is less
// problematic: reloading the segment in such rare conditions is not
// that much of an issue - and we would consequently prefer having
// the real buffered range sooner.
//
// TODO This might maybe be improved, as we still want the
// `SegmentInventory` to reflect as much as possible the real
// buffered time ranges, even when the operation failed.
self.synchronize(buffered, media_offset);
}
}

/// Push a new segment to the `SegmentInventory`.
Expand Down Expand Up @@ -710,6 +730,7 @@ impl SegmentInventory {

if range_end <= curr_seg.last_buffered_start {
// That range is before the current segment
// Go to the next range directly
return;
}

Expand Down
14 changes: 14 additions & 0 deletions src/rs-core/media_element/source_buffers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,20 @@ impl SourceBuffer {
jsRemoveBuffer(self.id, 0., f64::INFINITY);
}

/// SourceBuffers maintain a queue of planned operations such as push and remove to media
/// buffers.
///
/// In some rare scenarios, we could be left in a situation where all previously scheduled
/// operations are cancelled, such as when one of them fails.
/// This method allows to empty that SourceBuffer's queue in such situations.
pub(super) fn clear_queue(&mut self) {
Logger::info(&format!(
"Buffer {} ({}): clearing queue.",
self.id, self.typ
));
self.queue.clear();
}

/// Indicate to this `SourceBuffer` that the last chronological segment has been pushed.
pub(super) fn announce_last_segment_pushed(&mut self) {
self.last_segment_pushed = true;
Expand Down
2 changes: 2 additions & 0 deletions src/ts-common/QueuedSourceBuffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ export default class QueuedSourceBuffer {
if (this._pendingTask !== null) {
this._pendingTask.reject(error);
}
this._pendingTask = null;
this._queue = [];
}

/**
Expand Down
9 changes: 9 additions & 0 deletions src/ts-common/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1121,6 +1121,10 @@ export interface SourceBufferOperationSuccessMainMessage {
* `CreateSourceBufferWorkerMessage`.
*/
sourceBufferId: SourceBufferId;
/**
* Buffered TimeRanges at the time of the error once that operation was
* validated.
*/
buffered: Float64Array;
};
}
Expand All @@ -1147,6 +1151,11 @@ export interface SourceBufferOperationErrorMainMessage {
operation: SourceBufferOperation;
/** If `true` the error is due to the fact that the buffer is full. */
isBufferFull: boolean;
/**
* Buffered TimeRanges at the time of the error.
* Empty Float64Array if that data cannot be retrieved due to the error.
*/
buffered: Float64Array;
};
}

Expand Down
22 changes: 22 additions & 0 deletions src/ts-main/worker-message-handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,16 @@ export function onAppendBufferMessage(
err,
"Unknown error when appending data to the SourceBuffer",
);
let buffered = new Float64Array([]);
try {
if (sbObject !== undefined) {
buffered = timeRangesToFloat64Array(
sbObject.queuedSourceBuffer.getBufferedRanges(),
);
}
} catch (_) {
/* ignore error here */
}
postMessageToWorker(worker, {
type: MainMessageType.SourceBufferOperationError,
value: {
Expand All @@ -433,6 +443,7 @@ export function onAppendBufferMessage(
operation: SourceBufferOperation.Push,
isBufferFull:
err instanceof Error && err.name === "QuotaExceededError",
buffered,
},
});
}
Expand Down Expand Up @@ -487,6 +498,16 @@ export function onRemoveBufferMessage(
err,
"Unknown error when removing data to the SourceBuffer",
);
let buffered = new Float64Array([]);
try {
if (sbObject !== undefined) {
buffered = timeRangesToFloat64Array(
sbObject.queuedSourceBuffer.getBufferedRanges(),
);
}
} catch (_) {
/* ignore error here */
}
postMessageToWorker(worker, {
type: MainMessageType.SourceBufferOperationError,
value: {
Expand All @@ -495,6 +516,7 @@ export function onRemoveBufferMessage(
message,
operation: SourceBufferOperation.Remove,
isBufferFull: false,
buffered,
},
});
}
Expand Down
8 changes: 7 additions & 1 deletion src/ts-worker/MessageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -394,17 +394,23 @@ export default function MessageReceiver() {
) {
return;
}
const buffered = new JsTimeRanges(data.value.buffered);
if (data.value.operation === SourceBufferOperation.Remove) {
dispatcher.on_remove_buffer_error(data.value.sourceBufferId);
dispatcher.on_remove_buffer_error(
data.value.sourceBufferId,
buffered,
);
} else if (data.value.isBufferFull) {
dispatcher.on_append_buffer_error(
data.value.sourceBufferId,
PushedSegmentErrorCode.BufferFull,
buffered,
);
} else {
dispatcher.on_append_buffer_error(
data.value.sourceBufferId,
PushedSegmentErrorCode.UnknownError,
buffered,
);
}
}
Expand Down
Loading
Loading