Skip to content

Commit

Permalink
fix clock selection
Browse files Browse the repository at this point in the history
  • Loading branch information
mat-hek committed Oct 23, 2023
1 parent 9269612 commit f849f80
Show file tree
Hide file tree
Showing 9 changed files with 30 additions and 66 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
# Changelog

<<<<<<< HEAD
## 1.0.1
* Specify the order in which state fields will be printed in the error logs. [#614](https://github.com/membraneframework/membrane_core/pull/614)
* Fix clock selection [#626](https://github.com/membraneframework/membrane_core/pull/626)

## 1.0.0
=======
## 1.0.0-rc1
>>>>>>> 3df8e641 (disable automatic clock selection)
* Introduce `:remove_link` action in pipelines and bins.
* Add children groups - a mechanism that allows refering to multiple children with a single identifier.
* Rename `remove_child` action into `remove_children` and allow for removing a children group with a single action.
Expand Down
2 changes: 1 addition & 1 deletion lib/membrane/core/bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ defmodule Membrane.Core.Bin do
parent_clock: options.parent_clock,
timers: %{},
clock: clock,
clock_provider: %{clock: nil, provider: nil, choice: :auto},
clock_provider: %{clock: nil, provider: nil},
clock_proxy: clock_proxy,
# This is a sync for siblings. This is not yet allowed.
stream_sync: Membrane.Sync.no_sync(),
Expand Down
3 changes: 1 addition & 2 deletions lib/membrane/core/bin/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ defmodule Membrane.Core.Bin.State do
clock_proxy: Clock.t(),
clock_provider: %{
clock: Clock.t() | nil,
provider: Child.name() | nil,
choice: :auto | :manual
provider: Child.name() | nil
}
},
children_log_metadata: Keyword.t(),
Expand Down
31 changes: 8 additions & 23 deletions lib/membrane/core/parent/clock_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ defmodule Membrane.Core.Parent.ClockHandler do
Core.Parent.state()
) ::
Core.Parent.state() | no_return
def choose_clock(_children, nil, state) do
state
end

def choose_clock(children, provider, state) do
%{synchronization: synchronization} = state

Expand All @@ -20,17 +24,13 @@ defmodule Membrane.Core.Parent.ClockHandler do
end

components = components ++ children

cond do
provider != nil -> set_clock_provider(get_clock_from_provider(components, provider), state)
synchronization.clock_provider.choice == :manual -> state
true -> choose_clock_provider(components) |> set_clock_provider(state)
end
clock = get_clock_from_provider(components, provider)
set_clock_provider(clock, state)
end

@spec reset_clock(Core.Parent.state()) :: Core.Parent.state()
def reset_clock(state),
do: set_clock_provider(%{clock: nil, provider: nil, choice: :auto}, state)
do: set_clock_provider(%{clock: nil, provider: nil}, state)

defp set_clock_provider(clock_provider, state) do
Clock.proxy_for(state.synchronization.clock_proxy, clock_provider.clock)
Expand All @@ -48,22 +48,7 @@ defmodule Membrane.Core.Parent.ClockHandler do
raise ParentError, "#{inspect(provider)} is not a clock provider"

%{clock: clock} ->
%{clock: clock, provider: provider, choice: :manual}
end
end

defp choose_clock_provider(components) do
case components |> Enum.filter(& &1.clock) do
[] ->
%{clock: nil, provider: nil, choice: :auto}

[%{name: name, clock: clock}] ->
%{clock: clock, provider: name, choice: :auto}

components ->
raise ParentError, """
Cannot choose clock for the parent, as multiple components provide one, namely: #{Enum.map_join(components, ", ", & &1.name)}. Please explicitly select the clock by setting `ChildrenSpec.clock_provider` parameter.
"""
%{clock: clock, provider: provider}
end
end
end
2 changes: 1 addition & 1 deletion lib/membrane/core/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ defmodule Membrane.Core.Pipeline do
module: params.module,
synchronization: %{
clock_proxy: clock_proxy,
clock_provider: %{clock: nil, provider: nil, choice: :auto},
clock_provider: %{clock: nil, provider: nil},
timers: %{}
},
subprocess_supervisor: subprocess_supervisor,
Expand Down
3 changes: 1 addition & 2 deletions lib/membrane/core/pipeline/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ defmodule Membrane.Core.Pipeline.State do
timers: %{Timer.id() => Timer.t()},
clock_provider: %{
clock: Membrane.Clock.t() | nil,
provider: Child.name() | nil,
choice: :auto | :manual
provider: Child.name() | nil
},
clock_proxy: Membrane.Clock.t()
},
Expand Down
37 changes: 6 additions & 31 deletions test/membrane/clock_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ defmodule Membrane.ClockTest do
synchronization: %{clock_proxy: result_proxy, clock_provider: result_provider}
} = ClockHandler.choose_clock(children, :el2, dummy_pipeline_state)

assert %{choice: :manual, clock: ^clock, provider: :el2} = result_provider
assert %{clock: ^clock, provider: :el2} = result_provider
assert ^proxy_clock = result_proxy
end

Expand All @@ -188,15 +188,15 @@ defmodule Membrane.ClockTest do
dummy_pipeline_state =
struct(State,
module: __MODULE__,
synchronization: %{clock_proxy: proxy_clock, clock_provider: %{choice: :auto}}
synchronization: %{clock_proxy: proxy_clock, clock_provider: %{}}
)

assert_raise Membrane.ParentError, ~r/.*el1.*clock provider/, fn ->
ClockHandler.choose_clock(children, :el1, dummy_pipeline_state)
end
end

test "when provider is not specified and there are multiple clock providers among children" do
test "when provider is not specified" do
{:ok, clock} = Clock.start_link()
{:ok, clock2} = Clock.start_link()
{:ok, proxy_clock} = Clock.start_link(proxy: true)
Expand All @@ -210,36 +210,11 @@ defmodule Membrane.ClockTest do
dummy_pipeline_state =
struct(State,
module: __MODULE__,
synchronization: %{clock_proxy: proxy_clock, clock_provider: %{choice: :auto}}
synchronization: %{clock_proxy: proxy_clock, clock_provider: %{}}
)

assert_raise Membrane.ParentError, ~r/.*multiple components.*/, fn ->
ClockHandler.choose_clock(children, nil, dummy_pipeline_state)
end
end

test "when there is no clock provider and there is exactly one clock provider among children" do
{:ok, clock} = Clock.start_link()
{:ok, proxy_clock} = Clock.start_link(proxy: true)

children = [
%ChildEntry{name: :el1, pid: :c.pid(0, 1, 0)},
%ChildEntry{name: :el2, clock: clock, pid: :c.pid(0, 2, 0)},
%ChildEntry{name: :el3, pid: :c.pid(0, 3, 0)}
]

dummy_pipeline_state =
struct(State,
module: __MODULE__,
synchronization: %{clock_proxy: proxy_clock, clock_provider: %{choice: :auto}}
)

assert %State{
synchronization: %{clock_proxy: result_proxy, clock_provider: result_provider}
} = ClockHandler.choose_clock(children, nil, dummy_pipeline_state)

assert %{choice: :auto, clock: ^clock, provider: :el2} = result_provider
assert ^proxy_clock = result_proxy
assert dummy_pipeline_state ==
ClockHandler.choose_clock(children, nil, dummy_pipeline_state)
end
end

Expand Down
4 changes: 2 additions & 2 deletions test/membrane/integration/bin_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -241,15 +241,15 @@ defmodule Membrane.Core.BinTest do
%Membrane.Core.Pipeline.State{synchronization: %{clock_provider: pipeline_clock_provider}} =
state = :sys.get_state(pid)

assert %{choice: :manual, clock: clock1, provider: :bin_child} = pipeline_clock_provider
assert %{clock: clock1, provider: :bin_child} = pipeline_clock_provider
refute is_nil(clock1)

%{pid: bin_pid} = state.children[:bin_child]

%Membrane.Core.Bin.State{synchronization: %{clock_provider: bin_clock_provider}} =
:sys.get_state(bin_pid)

assert %{choice: :manual, clock: clock2, provider: :element_child} = bin_clock_provider
assert %{clock: clock2, provider: :element_child} = bin_clock_provider
refute is_nil(clock2)

assert proxy_for?(clock1, clock2)
Expand Down
9 changes: 5 additions & 4 deletions test/membrane/integration/sync_test/ticking_pace.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@ defmodule Membrane.Integration.SyncTest.TickingPace do
actual_report_interval = 100
reported_interval = 300

links = [
spec = {
child(:source, %Sync.Source{
tick_interval: tick_interval |> Time.milliseconds(),
test_process: self()
})
|> child(:sink, Sync.Sink)
]
|> child(:sink, Sync.Sink),
clock_provider: :sink
}

pipeline = Testing.Pipeline.start_link_supervised!(spec: links)
pipeline = Testing.Pipeline.start_link_supervised!(spec: spec)

%{synchronization: %{clock_provider: %{clock: original_clock, provider: :sink}}} =
:sys.get_state(pipeline)
Expand Down

0 comments on commit f849f80

Please sign in to comment.