diff --git a/lib/membrane/core/element/buffer_controller.ex b/lib/membrane/core/element/buffer_controller.ex index bce916fb6..ef56f6832 100644 --- a/lib/membrane/core/element/buffer_controller.ex +++ b/lib/membrane/core/element/buffer_controller.ex @@ -66,21 +66,23 @@ defmodule Membrane.Core.Element.BufferController do %{demand: demand, demand_unit: demand_unit, stalker_metrics: stalker_metrics} = data buf_size = Buffer.Metric.from_unit(demand_unit).buffers_size(buffers) - # we check if pad should be corcked before decrementing :demand field in PadData, - # to avoid situation, when big chunk of data is stored in the queue only because it - # exceeds auto_demand_size sufficiently + # we check if pad should be corcked before decrementing :demand field in PadData + # 1) to avoid situation, when big chunk of data is stored in the queue only because it + # exceeds auto_demand_size + # 2) to handle start of stream caused by first buffer arrival possibly early hard_corcked? = AutoFlowUtils.hard_corcked?(pad_ref, state) state = PadModel.set_data!(state, pad_ref, :demand, demand - buf_size) :atomics.put(stalker_metrics.demand, 1, demand - buf_size) - state = AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state) + state = + if hard_corcked? do + AutoFlowUtils.store_buffers_in_queue(pad_ref, buffers, state) + else + exec_buffer_callback(pad_ref, buffers, state) + end - if hard_corcked? do - AutoFlowUtils.store_buffers_in_queue(pad_ref, buffers, state) - else - exec_buffer_callback(pad_ref, buffers, state) - end + AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state) end defp do_handle_buffer(pad_ref, %{flow_control: :manual} = data, buffers, state) do diff --git a/lib/membrane/core/element/callback_context.ex b/lib/membrane/core/element/callback_context.ex index 71cee7c9b..e0eadd694 100644 --- a/lib/membrane/core/element/callback_context.ex +++ b/lib/membrane/core/element/callback_context.ex @@ -18,7 +18,8 @@ defmodule Membrane.Core.Element.CallbackContext do name: state.name, playback: state.playback, resource_guard: state.resource_guard, - utility_supervisor: state.subprocess_supervisor + utility_supervisor: state.subprocess_supervisor, + big_state: state }) end end diff --git a/lib/membrane/core/element/demand_controller/auto_flow_utils.ex b/lib/membrane/core/element/demand_controller/auto_flow_utils.ex index 93979661c..3040925c8 100644 --- a/lib/membrane/core/element/demand_controller/auto_flow_utils.ex +++ b/lib/membrane/core/element/demand_controller/auto_flow_utils.ex @@ -71,7 +71,12 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do pad_data = PadModel.get_data!(state, pad_ref) state.effective_flow_control == :pull and pad_data.direction == :input and - pad_data.flow_control == :auto and pad_data.demand < -1 * pad_data.auto_demand_size / 2 + pad_data.flow_control == :auto and pad_data.demand < 0 + end + + @spec auto_flow_queue_empty?(Pad.ref(), State.t()) :: boolean() + def auto_flow_queue_empty?(pad_ref, state) do + PadModel.get_data!(state, pad_ref, :auto_flow_queue) == Qex.new() end @spec store_buffers_in_queue(Pad.ref(), [Buffer.t()], State.t()) :: State.t() @@ -95,18 +100,30 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do @spec auto_adjust_atomic_demand(Pad.ref() | [Pad.ref()], State.t()) :: State.t() def auto_adjust_atomic_demand(pad_ref_list, state) when is_list(pad_ref_list) do - state = Enum.reduce(pad_ref_list, state, &do_auto_adjust_atomic_demand/2) - flush_auto_flow_queues(pad_ref_list, state) - end + {bumped_pads, state} = + pad_ref_list + |> Enum.flat_map_reduce(state, fn pad_ref, state -> + PadModel.get_data!(state, pad_ref) + |> do_auto_adjust_atomic_demand(state) + |> case do + {:increased, state} -> {[pad_ref], state} + {:unchanged, state} -> {[], state} + end + end) - def auto_adjust_atomic_demand(pad_ref, state) when Pad.is_pad_ref(pad_ref) do - state = do_auto_adjust_atomic_demand(pad_ref, state) - flush_auto_flow_queues([pad_ref], state) + flush_auto_flow_queues(bumped_pads, state) end - defp do_auto_adjust_atomic_demand(pad_ref, state) when Pad.is_pad_ref(pad_ref) do + def auto_adjust_atomic_demand(pad_ref, state) when Pad.is_pad_ref(pad_ref) do PadModel.get_data!(state, pad_ref) |> do_auto_adjust_atomic_demand(state) + |> case do + {:increased, state} -> + flush_auto_flow_queues([pad_ref], state) + + {:unchanged, state} -> + state + end end defp do_auto_adjust_atomic_demand(pad_data, state) when is_input_auto_pad_data(pad_data) do @@ -123,9 +140,11 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do :ok = AtomicDemand.increase(atomic_demand, diff) :atomics.put(stalker_metrics.demand, 1, auto_demand_size) - PadModel.set_data!(state, ref, :demand, auto_demand_size) + + state = PadModel.set_data!(state, ref, :demand, auto_demand_size) + {:increased, state} else - state + {:unchanged, state} end end @@ -149,47 +168,44 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do end defp flush_auto_flow_queues(pad_ref_list, state) do - pad_to_queue_map = - pad_ref_list - |> Enum.filter(&hard_corcked?(&1, state)) - |> Map.new(&{&1, PadModel.get_data!(state, &1, :auto_flow_queue)}) - - state = handle_queued_items(pad_to_queue_map, state) - - Enum.reduce(pad_ref_list, state, fn pad_ref, state -> - PadModel.set_data!(state, pad_ref, :auto_flow_queue, Qex.new()) - end) + pad_ref_list + |> Enum.reject(&hard_corcked?(&1, state)) + |> do_flush_auto_flow_queues(state) end - defp handle_queued_items(pad_to_queue_map, state) when pad_to_queue_map == %{}, do: state + defp do_flush_auto_flow_queues([], state), do: state - defp handle_queued_items(pad_to_queue_map, state) do - {pad_ref, queue} = Enum.random(pad_to_queue_map) + defp do_flush_auto_flow_queues(pads_to_flush, state) do + selected_pad = Enum.random(pads_to_flush) - case Qex.pop(queue) do + PadModel.get_data!(state, selected_pad, :auto_flow_queue) + |> Qex.pop() + |> case do {{:value, queue_item}, popped_queue} -> - state = do_handle_queue_item(pad_ref, queue_item, state) + state = + exec_queue_item_callback(selected_pad, queue_item, state) + |> PadModel.set_data!(selected_pad, :auto_flow_queue, popped_queue) + + do_flush_auto_flow_queues(pads_to_flush, state) - pad_to_queue_map - |> Map.put(pad_ref, popped_queue) - |> handle_queued_items(state) + {:empty, empty_queue} -> + state = PadModel.set_data!(state, selected_pad, :auto_flow_queue, empty_queue) - {:empty, _empty_queue} -> - pad_to_queue_map - |> Map.delete(pad_ref) - |> handle_queued_items(state) + pads_to_flush + |> List.delete(selected_pad) + |> do_flush_auto_flow_queues(state) end end - defp do_handle_queue_item(pad_ref, {:buffers, buffers}, state) do + defp exec_queue_item_callback(pad_ref, {:buffers, buffers}, state) do BufferController.exec_buffer_callback(pad_ref, buffers, state) end - defp do_handle_queue_item(pad_ref, {:event, event}, state) do + defp exec_queue_item_callback(pad_ref, {:event, event}, state) do EventController.exec_handle_event(pad_ref, event, state) end - defp do_handle_queue_item(pad_ref, {:stream_format, stream_format}, state) do + defp exec_queue_item_callback(pad_ref, {:stream_format, stream_format}, state) do StreamFormatController.exec_handle_stream_format(pad_ref, stream_format, state) end end diff --git a/logs.txt b/logs.txt new file mode 100644 index 000000000..5c120e161 --- /dev/null +++ b/logs.txt @@ -0,0 +1,3 @@ + +BREAK: (a)bort (A)bort with dump (c)ontinue (p)roc info (i)nfo + (l)oaded (v)ersion (k)ill (D)b-tables (d)istribution