Skip to content

Commit

Permalink
Write auto demand queue test wip
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Oct 27, 2023
1 parent b075c70 commit b66ef9d
Showing 1 changed file with 52 additions and 4 deletions.
56 changes: 52 additions & 4 deletions test/membrane/integration/auto_demands_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Membrane.Integration.AutoDemandsTest do

alias Membrane.Testing.{Pipeline, Sink, Source}

defmodule AutoDemandFilter do
defmodule ExponentialAutoFilter do
use Membrane.Filter

def_input_pad :input, accepted_format: _any
Expand Down Expand Up @@ -35,6 +35,26 @@ defmodule Membrane.Integration.AutoDemandsTest do
end
end

defmodule NotifyingAutoFilter do
use Membrane.Fitler

def_input_pad :input, accepted_format: _any, availability: :on_request
def_output_pad :output, accepted_format: _any

@impl true
def handle_buffer(pad, buffer, _ctx, state) do
actions = [
notify_parent: {:handling_buffer, pad, buffer},
buffer: {:output, buffer}
]

{actions, state}
end

@impl true
def handle_end_of_stream(_ctx, state), do: {[], state}
end

defmodule AutoDemandTee do
use Membrane.Filter

Expand Down Expand Up @@ -64,7 +84,7 @@ defmodule Membrane.Integration.AutoDemandsTest do
:down -> {mult_payloads, payloads}
end

filter = %AutoDemandFilter{factor: factor, direction: direction}
filter = %ExponentialAutoFilter{factor: factor, direction: direction}

pipeline =
Pipeline.start_link_supervised!(
Expand Down Expand Up @@ -202,7 +222,7 @@ defmodule Membrane.Integration.AutoDemandsTest do
Pipeline.start_link_supervised!(
spec:
child(:source, PushSource)
|> child(:filter, AutoDemandFilter)
|> child(:filter, ExponentialAutoFilter)
|> child(:sink, Sink)
)

Expand Down Expand Up @@ -230,7 +250,7 @@ defmodule Membrane.Integration.AutoDemandsTest do
Pipeline.start_supervised!(
spec:
child(:source, PushSource)
|> child(:filter, AutoDemandFilter)
|> child(:filter, ExponentialAutoFilter)
|> child(:sink, %Sink{autodemand: false})
)

Expand All @@ -246,6 +266,34 @@ defmodule Membrane.Integration.AutoDemandsTest do
)
end

test "auto flow queue" do
# Testing.Source fed with such a buffer generator will produce buffers with incremenal
# sequence of numbers as payloads
buffer_generator =
fn counter, _size ->
actions = [
buffer: {:output, %Buffer{payload: counter}},
redemand: :output
]

{actions, counter + 1}
end

source_definiton = %Source{output: {0, buffer_generator}}

pipeline =
Pipeline.start_supervised!(
spec: [
child({:source, 0}, source_definiton)
|> child(:filter, NotifyingAutoFilter)
|> child(:sink, %Sink{autodemand: false}),
child({:source, 1}, source_definiton)
|> get_child(:filter)
]
)

end

defp reduce_link(link, enum, fun) do
Enum.reduce(enum, link, &fun.(&2, &1))
end
Expand Down

0 comments on commit b66ef9d

Please sign in to comment.