Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Jan 11, 2024
1 parent e32c255 commit 4003425
Show file tree
Hide file tree
Showing 10 changed files with 62 additions and 30 deletions.
7 changes: 7 additions & 0 deletions lib/membrane/core/callback_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,13 @@ defmodule Membrane.Core.CallbackHandler do
e ->
Membrane.Logger.error("""
Error handling actions returned by callback #{inspect(state.module)}.#{callback}
""")
Membrane.Logger.error("""
Error handling actions returned by callback #{callback}
#{inspect(e, limit: :infinity, pretty: true)}
#{inspect(__STACKTRACE__, limit: :infinity, pretty: true)}
""")

reraise e, __STACKTRACE__
Expand Down
3 changes: 3 additions & 0 deletions lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,9 @@ defmodule Membrane.Core.Element do

defp do_handle_info(Message.new(:stream_format, stream_format, _opts) = msg, state) do
pad_ref = Message.for_pad(msg)
require Membrane.Logger, as: L
L.warning("DUPA RECEIVING STREAM FORMAT")

state = StreamFormatController.handle_stream_format(pad_ref, stream_format, state)
{:noreply, state}
end
Expand Down
2 changes: 2 additions & 0 deletions lib/membrane/core/element/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ defmodule Membrane.Core.Element.ActionHandler do
%State{type: type} = state
)
when type in [:source, :filter, :endpoint] and is_pad_ref(pad_ref) do
require Membrane.Logger, as: L
L.warning("HANDLING ACTION SEND STREAM FORMAT")
send_stream_format(pad_ref, stream_format, state)
end

Expand Down
2 changes: 1 addition & 1 deletion lib/membrane/core/element/demand_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ defmodule Membrane.Core.Element.DemandController do
if pad_data.direction == :input,
do: raise("cannot snapshot atomic counter in input pad")

if state.name == {:filter, 10}, do: IO.puts("snapshot_atomic_demand")
# if state.name == {:filter, 10}, do: IO.puts("snapshot_atomic_demand")
# IO.inspect(state)


Expand Down
4 changes: 4 additions & 0 deletions lib/membrane/core/element/stream_format_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ defmodule Membrane.Core.Element.StreamFormatController do
"""
end

require Membrane.Logger, as: L
L.warning("VALIDATING STREAM FORMAT #{inspect({direction, stream_format, params}, pretty: true, limit: :infinity)}")


for {module, pad_name} <- params do
unless module.membrane_stream_format_match?(pad_name, stream_format) do
pattern_string =
Expand Down
7 changes: 6 additions & 1 deletion test/membrane/integration/auto_demands_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ defmodule Membrane.Integration.AutoDemandsTest do

assert_end_of_stream(pipeline, :sink)
refute_sink_buffer(pipeline, :sink, _buffer, 0)

Pipeline.terminate(pipeline)
end
end)

Expand Down Expand Up @@ -137,9 +139,10 @@ defmodule Membrane.Integration.AutoDemandsTest do
end)

refute_sink_buffer(pipeline, :left_sink, %{payload: 25_000})

Pipeline.terminate(pipeline)
end

@tag :asd
test "handle removed branch" do
pipeline =
Pipeline.start_link_supervised!(
Expand Down Expand Up @@ -231,6 +234,8 @@ defmodule Membrane.Integration.AutoDemandsTest do
{:buffer_arrived, %Membrane.Buffer{payload: ^payload}}
)
end

Pipeline.terminate(pipeline)
end
end)

Expand Down
3 changes: 3 additions & 0 deletions test/membrane/integration/child_pad_removed_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ defmodule Membrane.Integration.ChildPadRemovedTest do
end
end

@tag :asd
test "sibling linked via static pad raises" do
for actions <- [
[remove_children: :source],
Expand All @@ -179,6 +180,8 @@ defmodule Membrane.Integration.ChildPadRemovedTest do
end
end

@tag :asd
@tag :dsa
test "and sibling linked via static pad is removed, pipeline is not raising" do
for bin_actions <- [
[remove_children: :source],
Expand Down
27 changes: 7 additions & 20 deletions test/membrane/integration/no_stream_format_crash_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ defmodule Membrane.FailWhenNoStreamFormatAreSent do
def_output_pad :output, accepted_format: _any, flow_control: :manual

@impl true
def handle_init(_ctx, _options) do
{[], %{}}
def handle_playing(_ctx, state) do
{[notify_parent: {:my_pid, self()}], state}
end

@impl true
Expand All @@ -26,36 +26,23 @@ defmodule Membrane.FailWhenNoStreamFormatAreSent do
{[buffer: {:output, %Membrane.Buffer{payload: "Something"}}], state}
end

@impl true
def handle_parent_notification({:send_your_pid, requester_pid}, _ctx, state) do
send(requester_pid, {:my_pid, self()})
{[], state}
end
end

@tag :asd
test "if pipeline crashes when the stream format are not sent before the first buffer" do
links = [
spec =
child(:source, SourceWhichDoesNotSendStreamFormat)
|> child(:sink, Sink)
]

options = [
spec: links
]

pipeline = Pipeline.start_supervised!(options)
Pipeline.message_child(pipeline, :source, {:send_your_pid, self()})

source_pid =
receive do
{:my_pid, pid} -> pid
end
pipeline = Pipeline.start_supervised!(spec: spec)

assert_pipeline_notified(pipeline, :source, {:my_pid, source_pid})
source_ref = Process.monitor(source_pid)

Pipeline.message_child(pipeline, :source, :send_buffer)
assert_receive {:DOWN, ^source_ref, :process, ^source_pid, {reason, _stack_trace}}
assert %Membrane.ElementError{message: action_error_msg} = reason
# IO.inspect(action_error_msg)
assert action_error_msg =~ ~r/buffer.*stream.*format.*not.*sent/
end

Expand Down
26 changes: 18 additions & 8 deletions test/membrane/integration/stream_format_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ defmodule Membrane.StreamFormatTest do
describe "Stream format should be accepted, when they match" do
test "input pad :accepted_format in bins" do
pipeline = start_test_pipeline(Source, OuterSinkBin, AcceptedByAll)
assert_strean_format_accepted(pipeline, Source, AcceptedByAll)
assert_stream_format_accepted(pipeline, Source, AcceptedByAll)
Pipeline.terminate(pipeline)
end

test "output pad :accepted_format in bins" do
pipeline = start_test_pipeline(OuterSourceBin, Sink, AcceptedByAll)
assert_strean_format_accepted(pipeline, Source, AcceptedByAll)
assert_stream_format_accepted(pipeline, Source, AcceptedByAll)
Pipeline.terminate(pipeline)
end
end

Expand All @@ -40,29 +42,37 @@ defmodule Membrane.StreamFormatTest do
assert_down(RestrictiveSink, source_module: Source)
end

@tag :one
@tag :asd
@tag :dsa
test "input pad :accepted_format in inner bin" do
start_test_pipeline(Source, OuterSinkBin, AcceptedByOuterBins)
pipeline = start_test_pipeline(Source, OuterSinkBin, AcceptedByOuterBins)
assert_down(Sink, source_module: Source)
Pipeline.terminate(pipeline)
end

test "input pad :accepted_format in outer bin" do
start_test_pipeline(Source, OuterSinkBin, AcceptedByInnerBins)
pipeline = start_test_pipeline(Source, OuterSinkBin, AcceptedByInnerBins)
assert_down(Sink, source_module: Source)
Pipeline.terminate(pipeline)
end

test "output pad :accepted_format in element" do
start_test_pipeline(RestrictiveSource, Sink, AcceptedByOuterBins)
pipeline = start_test_pipeline(RestrictiveSource, Sink, AcceptedByOuterBins)
assert_down(RestrictiveSource)
Pipeline.terminate(pipeline)
end

test "output pad :accepted_format in inner bin" do
start_test_pipeline(OuterSourceBin, Sink, AcceptedByOuterBins)
pipeline = start_test_pipeline(OuterSourceBin, Sink, AcceptedByOuterBins)
assert_down(Source)
Pipeline.terminate(pipeline)
end

test "output pad :accepted_format in outer bin" do
start_test_pipeline(OuterSourceBin, Sink, AcceptedByInnerBins)
pipeline = start_test_pipeline(OuterSourceBin, Sink, AcceptedByInnerBins)
assert_down(Source)
Pipeline.terminate(pipeline)
end
end

Expand Down Expand Up @@ -99,7 +109,7 @@ defmodule Membrane.StreamFormatTest do
assert_down(module, source_module: module)
end

defp assert_strean_format_accepted(pipeline, source_module, stream_format_format) do
defp assert_stream_format_accepted(pipeline, source_module, stream_format_format) do
assert_receive({:my_pid, ^source_module, pid})
send(pid, :send_stream_format)

Expand Down
11 changes: 11 additions & 0 deletions test/support/accepted_format_test/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ defmodule Membrane.Support.AcceptedFormatTest.Source do
Sends stream format passed in opts, after entering the `:playing` playback.
"""

require Membrane.Logger, as: L

use Membrane.Source

alias Membrane.Support.AcceptedFormatTest.StreamFormat
Expand All @@ -18,17 +20,26 @@ defmodule Membrane.Support.AcceptedFormatTest.Source do

@impl true
def handle_init(_ctx, %__MODULE__{test_pid: test_pid, stream_format: stream_format}) do
L.warning("DUPA HANDLE INIT")
{[], %{test_pid: test_pid, stream_format: stream_format}}
end

@impl true
def handle_setup(_ctx, state) do
L.warning("DUPA HANDLE SETUP")
{[], state}
end

@impl true
def handle_playing(_ctx, state) do
L.warning("DUPA HANDLE PLAYING")
send(state.test_pid, {:my_pid, __MODULE__, self()})
{[], state}
end

@impl true
def handle_info(:send_stream_format, _ctx, state) do
L.warning("DUPA HANDLE INFO SEND STREAM FORMAT")
{[stream_format: {:output, state.stream_format}], state}
end
end

0 comments on commit 4003425

Please sign in to comment.