Skip to content

Commit

Permalink
Fix tests wip
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Jan 11, 2024
1 parent e32c255 commit 5e5f5b3
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 60 deletions.
2 changes: 2 additions & 0 deletions lib/membrane/core/element/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ defmodule Membrane.Core.Element.ActionHandler do
%State{type: type} = state
)
when type in [:source, :filter, :endpoint] and is_pad_ref(pad_ref) do
require Membrane.Logger, as: L
L.warning("HANDLING ACTION SEND STREAM FORMAT")
send_stream_format(pad_ref, stream_format, state)
end

Expand Down
2 changes: 1 addition & 1 deletion lib/membrane/core/element/buffer_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ defmodule Membrane.Core.Element.BufferController do
@doc """
Executes `handle_buffer` callback.
"""
@spec exec_buffer_callback( Pad.ref(), [Buffer.t()], State.t() ) :: State.t()
@spec exec_buffer_callback(Pad.ref(), [Buffer.t()], State.t()) :: State.t()
def exec_buffer_callback(pad_ref, buffers, %State{type: :filter} = state) do
Telemetry.report_metric("buffer", 1, inspect(pad_ref))

Expand Down
16 changes: 5 additions & 11 deletions lib/membrane/core/element/demand_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@ defmodule Membrane.Core.Element.DemandController do
if pad_data.direction == :input,
do: raise("cannot snapshot atomic counter in input pad")

if state.name == {:filter, 10}, do: IO.puts("snapshot_atomic_demand")
# if state.name == {:filter, 10}, do: IO.puts("snapshot_atomic_demand")
# IO.inspect(state)


# aktualnie bug polega na tym, ze w tescie z tagiem :dupa,
# ze mamy efc push, dodajemy go do satisfied auto output pads
# przechodzimy w pull, ale sciąganie rzeczy z kolejki jest obwarowane if set empty
# aktualnie bug polega na tym, ze w tescie z tagiem :dupa,
# ze mamy efc push, dodajemy go do satisfied auto output pads
# przechodzimy w pull, ale sciąganie rzeczy z kolejki jest obwarowane if set empty

do_snapshot_atomic_demand(pad_data, state)
else
Expand All @@ -51,29 +50,24 @@ defmodule Membrane.Core.Element.DemandController do
%{flow_control: :auto} = pad_data,
%{effective_flow_control: :pull} = state
) do

# last comment
# last comment
# if state.name == {:filter, 10} do
# IO.puts("ALA MA KOTA")
# AtomicDemand.get(pad_data.atomic_demand)
# |> IO.inspect()
# IO.inspect(state.satisfied_auto_output_pads)
# end


if AtomicDemand.get(pad_data.atomic_demand) > 0 do
# tutaj powinno mieć miejsce
# - usuniecie pada z mapsetu
# - sflushowanie kolejek, jesli mapset jest pusty
# zwroc uwage, czy gdzies w czyms w stylu handle_outgoing_buffers nie wjedzie ci tutaj jakas nieprzyjemna rekurencja
# kolejna rzecz: przerwanie rekurencji moze nastąpić, nawet wtedy, gdy kolejki będą miały w sobie bufory



state = Map.update!(state, :satisfied_auto_output_pads, &MapSet.delete(&1, pad_data.ref))
# IO.inspect(state.satisfied_auto_output_pads)


# dobra, wyglada git

AutoFlowUtils.pop_queues_and_bump_demand(state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,13 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do

@spec store_event_in_queue(Pad.ref(), Event.t(), State.t()) :: State.t()
def store_event_in_queue(pad_ref, event, state) do
queue_item = {:event, event}
queue_item = {:event, event}
PadModel.update_data!(state, pad_ref, :auto_flow_queue, &Qex.push(&1, queue_item))
end

@spec store_stream_format_in_queue(Pad.ref(), StreamFormat.t(), State.t()) :: State.t()
def store_stream_format_in_queue(pad_ref, stream_format, state) do
queue_item = {:stream_format, stream_format}
queue_item = {:stream_format, stream_format}
PadModel.update_data!(state, pad_ref, :auto_flow_queue, &Qex.push(&1, queue_item))
end

Expand Down
2 changes: 1 addition & 1 deletion lib/membrane/core/element/effective_flow_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ defmodule Membrane.Core.Element.EffectiveFlowController do
with %{effective_flow_control: :pull} <- state do
state.pads_data
|> Map.values()
|> Enum.filter(& &1.direction == :output and &1.flow_control == :auto)
|> Enum.filter(&(&1.direction == :output and &1.flow_control == :auto))
|> Enum.reduce(state, fn pad_data, state ->
DemandController.snapshot_atomic_demand(pad_data.ref, state)
end)
Expand Down
5 changes: 3 additions & 2 deletions lib/membrane/core/element/pad_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -496,8 +496,9 @@ defmodule Membrane.Core.Element.PadController do
PadModel.update_data!(state, pad, :associated_pads, &List.delete(&1, pad_data.ref))
end)
|> PadModel.set_data!(pad_ref, :associated_pads, [])
# |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref))
# |> Map.update!(:awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref))

# |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref))
# |> Map.update!(:awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref))

# |> AutoFlowUtils.pop_queues_and_bump_demand()

Expand Down
6 changes: 6 additions & 0 deletions lib/membrane/core/element/stream_format_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ defmodule Membrane.Core.Element.StreamFormatController do
"""
end

require Membrane.Logger, as: L

L.warning(
"VALIDATING STREAM FORMAT #{inspect({direction, stream_format, params}, pretty: true, limit: :infinity)}"
)

for {module, pad_name} <- params do
unless module.membrane_stream_format_match?(pad_name, stream_format) do
pattern_string =
Expand Down
85 changes: 42 additions & 43 deletions test/membrane/integration/auto_demands_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ defmodule Membrane.Integration.AutoDemandsTest do

assert_end_of_stream(pipeline, :sink)
refute_sink_buffer(pipeline, :sink, _buffer, 0)

Pipeline.terminate(pipeline)
end
end)

Expand Down Expand Up @@ -137,9 +139,10 @@ defmodule Membrane.Integration.AutoDemandsTest do
end)

refute_sink_buffer(pipeline, :left_sink, %{payload: 25_000})

Pipeline.terminate(pipeline)
end

@tag :asd
test "handle removed branch" do
pipeline =
Pipeline.start_link_supervised!(
Expand Down Expand Up @@ -231,6 +234,8 @@ defmodule Membrane.Integration.AutoDemandsTest do
{:buffer_arrived, %Membrane.Buffer{payload: ^payload}}
)
end

Pipeline.terminate(pipeline)
end
end)

Expand Down Expand Up @@ -336,68 +341,62 @@ defmodule Membrane.Integration.AutoDemandsTest do
]
)

# time for NotifyingAutoFilter to enter playing playback
Process.sleep(500)

[pipeline: pipeline]
end

describe "auto flow queue" do
setup :setup_pipeline_with_notifying_auto_filter

@tag :skip
defp receive_processed_buffers(pipeline, limit, acc \\ [])

defp receive_processed_buffers(_pipeline, limit, acc) when limit <= 0 do
Enum.reverse(acc)
end

defp receive_processed_buffers(pipeline, limit, acc) do
receive do
{Pipeline, ^pipeline,
{:handle_child_notification, {{:handling_buffer, _pad, buffer}, :filter}}} ->
receive_processed_buffers(pipeline, limit - 1, [buffer | acc])
after
500 -> Enum.reverse(acc)
end
end

@tag :asd
test "when there is no demand on the output pad", %{pipeline: pipeline} do
auto_demand_size = 400
manual_flow_queue_size = 40

assert_pipeline_notified(pipeline, :filter, :playing)

for i <- 1..auto_demand_size, source_idx <- [0, 1] do
expected_buffer = %Buffer{payload: i, metadata: %{creator: {:source, source_idx}}}
buffers = receive_processed_buffers(pipeline, 100)
assert length(buffers) == manual_flow_queue_size

assert_pipeline_notified(
pipeline,
:filter,
{:handling_buffer, _pad, ^expected_buffer}
)
end
demand = 10_000
Pipeline.message_child(pipeline, :sink, {:make_demand, demand})

for _source_idx <- [0, 1] do
refute_pipeline_notified(
pipeline,
:filter,
{:handling_buffer, _pad, %Buffer{}}
)
end
buffers = receive_processed_buffers(pipeline, 2 * demand)
buffers_number = length(buffers)

Pipeline.message_child(pipeline, :sink, {:make_demand, 2 * auto_demand_size})
# check if filter processed proper number of buffers
assert demand <= buffers_number
assert buffers_number <= demand + manual_flow_queue_size

for i <- 1..auto_demand_size, source_idx <- [0, 1] do
expected_buffer = %Buffer{payload: i, metadata: %{creator: {:source, source_idx}}}
assert_sink_buffer(pipeline, :sink, ^expected_buffer)
end
# check if filter processed buffers from both sources
buffers_by_creator = Enum.group_by(buffers, & &1.metadata.creator)
assert Enum.count(buffers_by_creator) == 2

for i <- (auto_demand_size + 1)..(auto_demand_size * 2), source_idx <- [0, 1] do
expected_buffer = %Buffer{payload: i, metadata: %{creator: {:source, source_idx}}}
# check if filter balanced procesesd buffers by their origin
counter_0 = Map.fetch!(buffers_by_creator, {:source, 0}) |> length()
counter_1 = Map.fetch!(buffers_by_creator, {:source, 1}) |> length()

assert_pipeline_notified(
pipeline,
:filter,
{:handling_buffer, _pad, ^expected_buffer}
)
end

for _source_idx <- [0, 1] do
refute_pipeline_notified(
pipeline,
:filter,
{:handling_buffer, _pad, %Buffer{}}
)
end
assert 0.8 < counter_0 / counter_1
assert 1.2 > counter_0 / counter_1

Pipeline.terminate(pipeline)
end

@tag :skip
# @tag :skip
test "when an element returns :pause_auto_demand action", %{pipeline: pipeline} do
auto_demand_size = 400

Expand Down

0 comments on commit 5e5f5b3

Please sign in to comment.