diff --git a/lib/membrane/core/element/buffer_controller.ex b/lib/membrane/core/element/buffer_controller.ex index bce916fb6..6d8d50c73 100644 --- a/lib/membrane/core/element/buffer_controller.ex +++ b/lib/membrane/core/element/buffer_controller.ex @@ -69,18 +69,19 @@ defmodule Membrane.Core.Element.BufferController do # we check if pad should be corcked before decrementing :demand field in PadData, # to avoid situation, when big chunk of data is stored in the queue only because it # exceeds auto_demand_size sufficiently - hard_corcked? = AutoFlowUtils.hard_corcked?(pad_ref, state) state = PadModel.set_data!(state, pad_ref, :demand, demand - buf_size) :atomics.put(stalker_metrics.demand, 1, demand - buf_size) - state = AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state) + state = + if AutoFlowUtils.hard_corcked?(pad_ref, state) do + IO.inspect({state.name, buffers}, label: "STORING IN QUEUE") + AutoFlowUtils.store_buffers_in_queue(pad_ref, buffers, state) + else + exec_buffer_callback(pad_ref, buffers, state) + end - if hard_corcked? do - AutoFlowUtils.store_buffers_in_queue(pad_ref, buffers, state) - else - exec_buffer_callback(pad_ref, buffers, state) - end + AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state) end defp do_handle_buffer(pad_ref, %{flow_control: :manual} = data, buffers, state) do diff --git a/lib/membrane/core/element/callback_context.ex b/lib/membrane/core/element/callback_context.ex index 71cee7c9b..e0eadd694 100644 --- a/lib/membrane/core/element/callback_context.ex +++ b/lib/membrane/core/element/callback_context.ex @@ -18,7 +18,8 @@ defmodule Membrane.Core.Element.CallbackContext do name: state.name, playback: state.playback, resource_guard: state.resource_guard, - utility_supervisor: state.subprocess_supervisor + utility_supervisor: state.subprocess_supervisor, + big_state: state }) end end diff --git a/lib/membrane/core/element/demand_controller/auto_flow_utils.ex b/lib/membrane/core/element/demand_controller/auto_flow_utils.ex index 93979661c..cb8535848 100644 --- a/lib/membrane/core/element/demand_controller/auto_flow_utils.ex +++ b/lib/membrane/core/element/demand_controller/auto_flow_utils.ex @@ -71,7 +71,14 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do pad_data = PadModel.get_data!(state, pad_ref) state.effective_flow_control == :pull and pad_data.direction == :input and - pad_data.flow_control == :auto and pad_data.demand < -1 * pad_data.auto_demand_size / 2 + pad_data.flow_control == :auto and + pad_data.demand < 0 + # Enum.all?(pad_data.associated_pads, &atomic_demand_positive?(&1, state)) + end + + @spec auto_flow_queue_empty?(Pad.ref(), State.t()) :: boolean() + def auto_flow_queue_empty?(pad_ref, state) do + PadModel.get_data!(state, pad_ref, :auto_flow_queue) == Qex.new() end @spec store_buffers_in_queue(Pad.ref(), [Buffer.t()], State.t()) :: State.t() @@ -90,7 +97,18 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do end defp store_in_queue(pad_ref, type, item, state) do - PadModel.update_data!(state, pad_ref, :auto_flow_queue, &Qex.push(&1, {type, item})) + old_q = PadModel.get_data!(state, pad_ref, :auto_flow_queue) + + state = PadModel.update_data!(state, pad_ref, :auto_flow_queue, &Qex.push(&1, {type, item})) + + new_q = PadModel.get_data!(state, pad_ref, :auto_flow_queue) + + IO.inspect({state.name, old_q, new_q}, label: "BEFORE AND AFTER") + + state + + # state = PadModel.update_data!(state, pad_ref, :auto_flow_queue, &Qex.push(&1, {type, item})) + # flush_auto_flow_queues([pad_ref], state) end @spec auto_adjust_atomic_demand(Pad.ref() | [Pad.ref()], State.t()) :: State.t() @@ -151,14 +169,16 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do defp flush_auto_flow_queues(pad_ref_list, state) do pad_to_queue_map = pad_ref_list - |> Enum.filter(&hard_corcked?(&1, state)) + |> Enum.reject(&hard_corcked?(&1, state)) |> Map.new(&{&1, PadModel.get_data!(state, &1, :auto_flow_queue)}) - state = handle_queued_items(pad_to_queue_map, state) + # if pad_to_queue_map != %{}, + # do: + # IO.inspect({state.name, pad_ref_list, hard_corcked?(:input, state), pad_to_queue_map}, + # label: "FLUSH AUTO FLOW QUEUES" + # ) - Enum.reduce(pad_ref_list, state, fn pad_ref, state -> - PadModel.set_data!(state, pad_ref, :auto_flow_queue, Qex.new()) - end) + handle_queued_items(pad_to_queue_map, state) end defp handle_queued_items(pad_to_queue_map, state) when pad_to_queue_map == %{}, do: state @@ -168,28 +188,30 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do case Qex.pop(queue) do {{:value, queue_item}, popped_queue} -> - state = do_handle_queue_item(pad_ref, queue_item, state) + state = exec_queue_item_callback(pad_ref, queue_item, state) pad_to_queue_map |> Map.put(pad_ref, popped_queue) |> handle_queued_items(state) - {:empty, _empty_queue} -> + {:empty, empty_queue} -> + state = PadModel.set_data!(state, pad_ref, :auto_flow_queue, empty_queue) + pad_to_queue_map |> Map.delete(pad_ref) |> handle_queued_items(state) end end - defp do_handle_queue_item(pad_ref, {:buffers, buffers}, state) do + defp exec_queue_item_callback(pad_ref, {:buffers, buffers}, state) do BufferController.exec_buffer_callback(pad_ref, buffers, state) end - defp do_handle_queue_item(pad_ref, {:event, event}, state) do + defp exec_queue_item_callback(pad_ref, {:event, event}, state) do EventController.exec_handle_event(pad_ref, event, state) end - defp do_handle_queue_item(pad_ref, {:stream_format, stream_format}, state) do + defp exec_queue_item_callback(pad_ref, {:stream_format, stream_format}, state) do StreamFormatController.exec_handle_stream_format(pad_ref, stream_format, state) end end diff --git a/logs.txt b/logs.txt new file mode 100644 index 000000000..5c120e161 --- /dev/null +++ b/logs.txt @@ -0,0 +1,3 @@ + +BREAK: (a)bort (A)bort with dump (c)ontinue (p)roc info (i)nfo + (l)oaded (v)ersion (k)ill (D)b-tables (d)istribution diff --git a/test/membrane/integration/auto_demands_test.exs b/test/membrane/integration/auto_demands_test.exs index 18bdd7b71..7edc8526e 100644 --- a/test/membrane/integration/auto_demands_test.exs +++ b/test/membrane/integration/auto_demands_test.exs @@ -20,7 +20,23 @@ defmodule Membrane.Integration.AutoDemandsTest do end @impl true - def handle_buffer(:input, buffer, _ctx, %{direction: :up} = state) do + def handle_buffer(:input, buffer, ctx, %{direction: :up} = state) do + # require Membrane.Logger + # if _ctx.name == {:filter, 1}, + # do: IO.puts("PAYLOAD #{inspect(buffer.payload)}") + + with %{last_payload: lp} <- state do + if lp != buffer.payload - 1 do + IO.inspect({ctx.name, buffer.payload, ctx.big_state.pads_data.input.auto_flow_queue, ctx.big_state}, + label: "RAISING", limit: :infinity + ) + + raise "DUPA #{inspect(buffer.payload)} #{inspect(ctx.name)}" + end + end + + state = Map.put(state, :last_payload, buffer.payload) + buffers = Enum.map(1..state.factor, fn _i -> buffer end) {[buffer: {:output, buffers}], state} end @@ -33,6 +49,11 @@ defmodule Membrane.Integration.AutoDemandsTest do {[buffer: {:output, buffer}], %{state | counter: 1}} end end + + @impl true + def handle_end_of_stream(:input, ctx, state) do + IO.inspect(state.max, label: "MAX #{inspect(ctx.name)}") + end end defmodule AutoDemandTee do @@ -46,11 +67,12 @@ defmodule Membrane.Integration.AutoDemandsTest do end [ - %{payloads: 1..100_000, factor: 1, direction: :up, filters: 10}, - %{payloads: 1..4, factor: 10, direction: :up, filters: 5}, - %{payloads: 1..4, factor: 10, direction: :down, filters: 5} + %{payloads: 1..100_000, factor: 1, direction: :up, filters: 10} + # %{payloads: 1..4, factor: 10, direction: :up, filters: 5} This doesn't work too + # %{payloads: 1..4, factor: 10, direction: :down, filters: 5} ] |> Enum.map(fn opts -> + @tag :dupa test "buffers pass through auto-demand filters; setup: #{inspect(opts)}" do %{payloads: payloads, factor: factor, direction: direction, filters: filters} = unquote(Macro.escape(opts))