diff --git a/lib/membrane_file/sink.ex b/lib/membrane_file/sink.ex index ab1573c..99c88ef 100644 --- a/lib/membrane_file/sink.ex +++ b/lib/membrane_file/sink.ex @@ -11,6 +11,7 @@ defmodule Membrane.File.Sink do use Membrane.Sink alias Membrane.File.SeekEvent + alias Membrane.ResourceGuard @common_file Membrane.File.CommonFileBehaviour.get_impl() @@ -39,7 +40,8 @@ defmodule Membrane.File.Sink do Membrane.ResourceGuard.register( ctx.resource_guard, - fn -> @common_file.close!(fd) end + fn -> @common_file.close!(fd) end, + tag: {:fd, fd} ) {[], %{state | fd: fd}} @@ -57,52 +59,64 @@ defmodule Membrane.File.Sink do end @impl true - def handle_event(:input, %SeekEvent{insert?: insert?, position: position}, _ctx, state) do + def handle_event(:input, %SeekEvent{insert?: insert?, position: position}, ctx, state) do state = - if insert? do - split_file(state, position) - else - seek_file(state, position) - end + if insert?, + do: split_file(state, ctx.resource_guard, position), + else: seek_file(state, ctx.resource_guard, position) {[], state} end def handle_event(pad, event, ctx, state), do: super(pad, event, ctx, state) - defp seek_file(%{fd: fd} = state, position) do - state = maybe_merge_temporary(state) + defp seek_file(%{fd: fd} = state, resource_guard, position) do + state = maybe_merge_temporary(state, resource_guard) _position = @common_file.seek!(fd, position) state end - defp split_file(%{fd: fd} = state, position) do + defp split_file(%{fd: fd} = state, resource_guard, position) do state = state - |> seek_file(position) - |> open_temporary() + |> seek_file(resource_guard, position) + |> open_temporary(resource_guard) :ok = @common_file.split!(fd, state.temp_fd) state end - defp maybe_merge_temporary(%{temp_fd: nil} = state), do: state + defp maybe_merge_temporary(%{temp_fd: nil} = state, _resource_guard), do: state - defp maybe_merge_temporary(%{fd: fd, temp_fd: temp_fd} = state) do + defp maybe_merge_temporary( + %{fd: fd, temp_fd: temp_fd, temp_location: temp_location} = state, + resource_guard + ) do # TODO: Consider improving performance for multi-insertion scenarios by using # multiple temporary files and merging them only once on `handle_prepared_to_stopped/2`. - _bytes_copied = @common_file.copy!(temp_fd, fd) - remove_temporary(state) + ResourceGuard.unregister(resource_guard, {:temp_fd, temp_fd}) + copy_and_remove_temporary(fd, temp_fd, temp_location) + %{state | temp_fd: nil} end - defp open_temporary(%{temp_fd: nil, temp_location: temp_location} = state) do + defp open_temporary( + %{temp_fd: nil, fd: fd, temp_location: temp_location} = state, + resource_guard + ) do temp_fd = @common_file.open!(temp_location, [:read, :exclusive]) + + ResourceGuard.register( + resource_guard, + fn -> copy_and_remove_temporary(fd, temp_fd, temp_location) end, + tag: {:temp_fd, temp_fd} + ) + %{state | temp_fd: temp_fd} end - defp remove_temporary(%{temp_fd: temp_fd, temp_location: temp_location} = state) do + defp copy_and_remove_temporary(fd, temp_fd, temp_location) do + _bytes_copied = @common_file.copy!(temp_fd, fd) :ok = @common_file.close!(temp_fd) :ok = @common_file.rm!(temp_location) - %{state | temp_fd: nil} end end diff --git a/lib/membrane_file/sink_multi.ex b/lib/membrane_file/sink_multi.ex index a76cf34..74914b9 100644 --- a/lib/membrane_file/sink_multi.ex +++ b/lib/membrane_file/sink_multi.ex @@ -82,9 +82,6 @@ defmodule Membrane.File.Sink.Multi do {[demand: :input], state} end - # @impl true - # def handle_prepared_to_stopped(_ctx, state), do: {:ok, close(state)} - defp open(%{naming_fun: naming_fun, index: index} = state, ctx) do fd = @common_file.open!(naming_fun.(index), :write) diff --git a/test/membrane_file/sink_multi_test.exs b/test/membrane_file/sink_multi_test.exs index 0e9c954..d659a42 100644 --- a/test/membrane_file/sink_multi_test.exs +++ b/test/membrane_file/sink_multi_test.exs @@ -1,13 +1,15 @@ defmodule Membrane.File.Sink.MultiTest do use Membrane.File.TestCaseTemplate, module: Membrane.File.Sink.Multi, async: true + import Membrane.Testing.Assertions + alias Membrane.File.{CommonMock, SplitEvent} alias Membrane.Buffer @module Membrane.File.Sink.Multi defp state_and_ctx(_ctx) do - {:ok, resource_guard} = Membrane.ResourceGuard.start_link(self()) + {:ok, resource_guard} = Membrane.Testing.MockResourceGuard.start_link() %{ ctx: %{resource_guard: resource_guard}, @@ -46,19 +48,21 @@ defmodule Membrane.File.Sink.MultiTest do %{state: %{state | naming_fun: &Integer.to_string/1}, ctx: ctx} end - # test "should close current file and open new one if event type is state.split_on", %{ - # state: state, - # ctx: ctx - # } do - # %{fd: file} = state + test "should close current file and open new one if event type is state.split_on", %{ + state: state, + ctx: ctx + } do + %{fd: file} = state + + CommonMock + |> expect(:open!, fn "1", _modes -> :new_file end) - # CommonMock - # |> expect(:close!, fn ^file -> :ok end) - # |> expect(:open!, fn "1", _modes -> :new_file end) + assert {[], %{state | index: 1, fd: :new_file}} == + @module.handle_event(:input, %SplitEvent{}, ctx, state) - # assert {[], %{state | index: 1, fd: :new_file}} == - # @module.handle_event(:input, %SplitEvent{}, ctx, state) - # end + assert_resource_guard_cleanup(ctx.resource_guard, ^file) + assert_resource_guard_register(ctx.resource_guard, _fun, :new_file) + end test "should not close current file and open new one if event type is not state.split_on", %{ state: state, @@ -70,11 +74,4 @@ defmodule Membrane.File.Sink.MultiTest do @module.handle_event(:input, :whatever, ctx, state) end end - - # describe "handle_prepared_to_stopped" do - # test "should increment file index", %{state: state} do - # CommonMock |> expect(:close!, fn _fd -> :ok end) - # assert {[], %{index: 1, fd: nil}} = @module.handle_prepared_to_stopped(%{}, state) - # end - # end end diff --git a/test/membrane_file/sink_test.exs b/test/membrane_file/sink_test.exs index eece154..b878bbb 100644 --- a/test/membrane_file/sink_test.exs +++ b/test/membrane_file/sink_test.exs @@ -1,13 +1,15 @@ defmodule Membrane.File.SinkTest do use Membrane.File.TestCaseTemplate, module: Membrane.File.Sink, async: true + import Membrane.Testing.Assertions + alias Membrane.Buffer alias Membrane.File.{CommonMock, SeekEvent} @module Membrane.File.Sink defp state_and_ctx(_ctx) do - {:ok, resource_guard} = Membrane.ResourceGuard.start_link(self()) + {:ok, resource_guard} = Membrane.Testing.MockResourceGuard.start_link() %{ ctx: %{resource_guard: resource_guard}, @@ -68,6 +70,15 @@ defmodule Membrane.File.SinkTest do ctx, state ) + + assert_resource_guard_register(ctx.resource_guard, cleanup_function, {:temp_fd, :temporary}) + + CommonMock + |> expect(:copy!, fn :temporary, ^file -> 0 end) + |> expect(:close!, fn :temporary -> :ok end) + |> expect(:rm!, fn ^temp_location -> :ok end) + + cleanup_function.() end test "should write to main file if temporary descriptor is opened", %{state: state, ctx: ctx} do @@ -100,29 +111,24 @@ defmodule Membrane.File.SinkTest do end end - describe "on handle_prepared_to_stopped" do + describe "on handle_setup" do setup :inject_mock_fd - # test "should close file", %{state: state} do - # %{fd: file} = state + test "should register closing file", %{state: state, ctx: ctx} do + %{location: location} = state - # CommonMock |> expect(:close!, fn ^file -> :ok end) + CommonMock + |> expect(:open!, fn ^location, [:read, :write] -> :file end) + |> expect(:truncate!, fn :file -> :ok end) - # assert {[], %{state | fd: nil}} == @module.handle_prepared_to_stopped(nil, state) - # end + assert {[], %{fd: :file}} = @module.handle_setup(ctx, state) - # test "should handle temporary file if temporary descriptor is opened", %{state: state} do - # %{fd: file, temp_location: temp_location} = state - # state = %{state | temp_fd: :temporary} + assert_resource_guard_register(ctx.resource_guard, fd_cleanup_function, {:fd, :file}) - # CommonMock - # |> expect(:copy!, fn :temporary, ^file -> 0 end) - # |> expect(:close!, fn :temporary -> :ok end) - # |> expect(:rm!, fn ^temp_location -> :ok end) - # |> expect(:close!, fn ^file -> :ok end) + CommonMock + |> expect(:close!, fn :file -> :ok end) - # assert {[], %{state | fd: nil, temp_fd: nil}} == - # @module.handle_prepared_to_stopped(nil, state) - # end + fd_cleanup_function.() + end end end diff --git a/test/membrane_file/source_test.exs b/test/membrane_file/source_test.exs index 6e13483..aa666c1 100644 --- a/test/membrane_file/source_test.exs +++ b/test/membrane_file/source_test.exs @@ -9,7 +9,7 @@ defmodule Membrane.File.SourceTest do @module Membrane.File.Source defp state_and_ctx(_ctx) do - {:ok, resource_guard} = Membrane.ResourceGuard.start_link(self()) + {:ok, resource_guard} = Membrane.Testing.MockResourceGuard.start_link() %{ ctx: %{resource_guard: resource_guard}, diff --git a/test/support/membrane_file/test_case_template.ex b/test/support/membrane_file/test_case_template.ex index db1c67a..ecb0bdf 100644 --- a/test/support/membrane_file/test_case_template.ex +++ b/test/support/membrane_file/test_case_template.ex @@ -7,6 +7,7 @@ defmodule Membrane.File.TestCaseTemplate do quote do import Mox + import Membrane.Testing.Assertions alias Membrane.File.CommonMock alias Membrane.Element.CallbackContext.{Prepare, Stop} @@ -23,21 +24,15 @@ defmodule Membrane.File.TestCaseTemplate do |> stub(:truncate!, fn _fd -> :ok end) assert {[], %{fd: :file}} = unquote(module).handle_setup(ctx, state) - end - end - - # describe "template: handle_prepared_to_stopped" do - # setup :inject_mock_fd - # test "should close file", %{state: state} do - # %{fd: fd} = state + assert_resource_guard_register(ctx.resource_guard, cleanup_function, _tag) - # CommonMock - # |> expect(:close!, fn _fd -> :ok end) + CommonMock + |> expect(:close!, fn :file -> :ok end) - # assert {[], %{fd: nil}} = unquote(module).handle_prepared_to_stopped(%{}, state) - # end - # end + cleanup_function.() + end + end defp inject_mock_fd(%{state: state, ctx: ctx}) do %{state: %{state | fd: :file}, ctx: ctx}