Skip to content

Commit

Permalink
Media item lifecycle improvements (#18)
Browse files Browse the repository at this point in the history
* Added jobs for enqueuing/dequeuing media based on source's status

* adds lifecycle columns to source and media_item tables
  • Loading branch information
kieraneglin authored Feb 10, 2024
1 parent 8bdd189 commit 219320c
Show file tree
Hide file tree
Showing 11 changed files with 141 additions and 23 deletions.
5 changes: 4 additions & 1 deletion lib/pinchflat/media/media_item.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ defmodule Pinchflat.Media.MediaItem do
media_id
original_url
livestream
media_downloaded_at
media_filepath
source_id
subtitle_filepaths
thumbnail_filepath
metadata_filepath
source_id
)a
@required_fields ~w(title original_url livestream media_id source_id)a

Expand All @@ -28,6 +29,8 @@ defmodule Pinchflat.Media.MediaItem do
field :media_id, :string
field :original_url, :string
field :livestream, :boolean, default: false
field :media_downloaded_at, :utc_datetime

field :media_filepath, :string
field :thumbnail_filepath, :string
field :metadata_filepath, :string
Expand Down
6 changes: 5 additions & 1 deletion lib/pinchflat/media_client/video_downloader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ defmodule Pinchflat.MediaClient.VideoDownloader do
case download_for_media_profile(media_item.media_id, media_profile, backend) do
{:ok, parsed_json} ->
parser = metadata_parser(backend)
parsed_attrs = parser.parse_for_media_item(parsed_json)

parsed_attrs =
parsed_json
|> parser.parse_for_media_item()
|> Map.merge(%{media_downloaded_at: DateTime.utc_now()})

# Don't forgor to use preloaded associations or updates to
# associations won't work!
Expand Down
35 changes: 30 additions & 5 deletions lib/pinchflat/media_source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ defmodule Pinchflat.MediaSource do
def create_source(attrs) do
%Source{}
|> change_source_from_url(attrs)
|> commit_and_start_indexing()
|> commit_and_handle_tasks()
end

@doc """
Expand All @@ -51,7 +51,7 @@ defmodule Pinchflat.MediaSource do
def update_source(%Source{} = source, attrs) do
source
|> change_source_from_url(attrs)
|> commit_and_start_indexing()
|> commit_and_handle_tasks()
end

@doc """
Expand Down Expand Up @@ -139,13 +139,38 @@ defmodule Pinchflat.MediaSource do
change_source(source, Map.merge(changes, collection_changes))
end

defp commit_and_start_indexing(changeset) do
defp commit_and_handle_tasks(changeset) do
case Repo.insert_or_update(changeset) do
{:ok, %Source{} = source} -> maybe_run_indexing_task(changeset, source)
err -> err
{:ok, %Source{} = source} ->
maybe_handle_media_tasks(changeset, source)
maybe_run_indexing_task(changeset, source)

err ->
err
end
end

# If the source is NOT new (ie: updated) and the download_media flag has changed,
# enqueue or dequeue media download tasks as necessary.
defp maybe_handle_media_tasks(changeset, source) do
case {changeset.data, changeset.changes} do
{%{__meta__: %{state: :loaded}}, %{download_media: true}} ->
SourceTasks.enqueue_pending_media_tasks(source)

{%{__meta__: %{state: :loaded}}, %{download_media: false}} ->
SourceTasks.dequeue_pending_media_tasks(source)

_ ->
:ok
end

{:ok, source}
end

# IDEA: this uses a pattern where `kickoff_indexing_task` controls whether
# it should run based on the source, but `maybe_handle_media_tasks` handles that
# logic itself. Consider updating one or the other to be consistent (once I've
# decided which I like more)
defp maybe_run_indexing_task(changeset, source) do
case changeset.data do
# If the changeset is new (not persisted), attempt indexing no matter what
Expand Down
11 changes: 10 additions & 1 deletion lib/pinchflat/media_source/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,19 @@ defmodule Pinchflat.MediaSource.Source do
friendly_name
index_frequency_minutes
download_media
last_indexed_at
original_url
media_profile_id
)a

@required_fields @allowed_fields -- ~w(index_frequency_minutes friendly_name)a
@required_fields ~w(
collection_name
collection_id
collection_type
download_media
original_url
media_profile_id
)a

schema "sources" do
field :friendly_name, :string
Expand All @@ -29,6 +37,7 @@ defmodule Pinchflat.MediaSource.Source do
field :collection_type, Ecto.Enum, values: [:channel, :playlist]
field :index_frequency_minutes, :integer, default: 60 * 24
field :download_media, :boolean, default: true
field :last_indexed_at, :utc_datetime
# This should only be used for user reference going forward
# as the collection_id should be used for all API calls
field :original_url, :string
Expand Down
8 changes: 2 additions & 6 deletions lib/pinchflat/tasks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,7 @@ defmodule Pinchflat.Tasks do
%MediaItem{} = media_item -> list_tasks_for(:media_item_id, media_item.id)
end

Enum.each(tasks, fn task ->
delete_task(task)
end)
Enum.each(tasks, &delete_task/1)
end

@doc """
Expand All @@ -134,9 +132,7 @@ defmodule Pinchflat.Tasks do
%MediaItem{} = media_item -> list_pending_tasks_for(:media_item_id, media_item.id)
end

Enum.each(tasks, fn task ->
delete_task(task)
end)
Enum.each(tasks, &delete_task/1)
end

@doc """
Expand Down
17 changes: 15 additions & 2 deletions lib/pinchflat/tasks/source_tasks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Pinchflat.Tasks.SourceTasks do

alias Pinchflat.Media
alias Pinchflat.Tasks
alias Pinchflat.MediaSource
alias Pinchflat.MediaSource.Source
alias Pinchflat.MediaClient.SourceDetails
alias Pinchflat.Workers.MediaIndexingWorker
Expand Down Expand Up @@ -42,6 +43,7 @@ defmodule Pinchflat.Tasks.SourceTasks do
"""
def index_media_items(%Source{} = source) do
{:ok, media_attributes} = SourceDetails.get_media_attributes(source.original_url)
MediaSource.update_source(source, %{last_indexed_at: DateTime.utc_now()})

media_attributes
|> Enum.map(fn media_attrs ->
Expand Down Expand Up @@ -73,7 +75,7 @@ defmodule Pinchflat.Tasks.SourceTasks do
Returns :ok
"""
def enqueue_pending_media_downloads(%Source{download_media: true} = source) do
def enqueue_pending_media_tasks(%Source{download_media: true} = source) do
source
|> Media.list_pending_media_items_for()
|> Enum.each(fn media_item ->
Expand All @@ -84,7 +86,18 @@ defmodule Pinchflat.Tasks.SourceTasks do
end)
end

def enqueue_pending_media_downloads(%Source{download_media: false} = _source) do
def enqueue_pending_media_tasks(%Source{download_media: false} = _source) do
:ok
end

@doc """
Deletes ALL pending tasks for a source's media items.
Returns :ok
"""
def dequeue_pending_media_tasks(%Source{} = source) do
source
|> Media.list_pending_media_items_for()
|> Enum.each(&Tasks.delete_pending_tasks_for/1)
end
end
2 changes: 1 addition & 1 deletion lib/pinchflat/workers/media_indexing_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ defmodule Pinchflat.Workers.MediaIndexingWorker do
defp index_media_and_reschedule(source) do
SourceTasks.index_media_items(source)
# This method handles the case where a source is set to not download media
SourceTasks.enqueue_pending_media_downloads(source)
SourceTasks.enqueue_pending_media_tasks(source)

source
|> Map.take([:id])
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
defmodule Pinchflat.Repo.Migrations.AddLifecycleColumnsToSourcesAndMedia do
use Ecto.Migration

def change do
alter table(:media_items) do
add :media_downloaded_at, :utc_datetime
add :details_updated_at, :utc_datetime
end

alter table(:sources) do
add :last_indexed_at, :utc_datetime
end
end
end
6 changes: 6 additions & 0 deletions test/pinchflat/media_client/video_downloader_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ defmodule Pinchflat.MediaClient.VideoDownloaderTest do
:ok
end

test "it sets the media_downloaded_at", %{media_item: media_item} do
assert media_item.media_downloaded_at == nil
assert {:ok, updated_media_item} = VideoDownloader.download_for_media_item(media_item)
assert DateTime.diff(DateTime.utc_now(), updated_media_item.media_downloaded_at) < 1
end

test "it extracts the title", %{media_item: media_item} do
assert {:ok, updated_media_item} = VideoDownloader.download_for_media_item(media_item)
assert updated_media_item.title == "Trying to Wheelie Without the Rear Brake"
Expand Down
24 changes: 24 additions & 0 deletions test/pinchflat/media_source_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ defmodule Pinchflat.MediaSourceTest do
use Pinchflat.DataCase
import Mox
import Pinchflat.TasksFixtures
import Pinchflat.MediaFixtures
import Pinchflat.ProfilesFixtures
import Pinchflat.MediaSourceFixtures

alias Pinchflat.MediaSource
alias Pinchflat.Tasks.SourceTasks
alias Pinchflat.MediaSource.Source
alias Pinchflat.Workers.MediaIndexingWorker
alias Pinchflat.Workers.VideoDownloadWorker

@invalid_source_attrs %{name: nil, collection_id: nil}

Expand Down Expand Up @@ -173,6 +176,27 @@ defmodule Pinchflat.MediaSourceTest do
refute_enqueued(worker: MediaIndexingWorker, args: %{"id" => source.id})
end

test "enabling the download_media attribute will schedule a download task" do
source = source_fixture(download_media: false)
media_item = media_item_fixture(source_id: source.id, media_filepath: nil)
update_attrs = %{download_media: true}

refute_enqueued(worker: VideoDownloadWorker)
assert {:ok, %Source{}} = MediaSource.update_source(source, update_attrs)
assert_enqueued(worker: VideoDownloadWorker, args: %{"id" => media_item.id})
end

test "disabling the download_media attribute will cancel the download task" do
source = source_fixture(download_media: true)
media_item = media_item_fixture(source_id: source.id, media_filepath: nil)
update_attrs = %{download_media: false}
SourceTasks.enqueue_pending_media_tasks(source)

assert_enqueued(worker: VideoDownloadWorker, args: %{"id" => media_item.id})
assert {:ok, %Source{}} = MediaSource.update_source(source, update_attrs)
refute_enqueued(worker: VideoDownloadWorker)
end

test "updates with invalid data returns error changeset" do
source = source_fixture()

Expand Down
36 changes: 30 additions & 6 deletions test/pinchflat/tasks/source_tasks_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,23 @@ defmodule Pinchflat.Tasks.SourceTasksTest do
assert Enum.all?(first_run, fn %MediaItem{} -> true end)
assert Enum.all?(duplicate_run, fn %Ecto.Changeset{} -> true end)
end

test "it updates the source's last_indexed_at field", %{source: source} do
assert source.last_indexed_at == nil

SourceTasks.index_media_items(source)
source = Repo.reload!(source)

assert DateTime.diff(DateTime.utc_now(), source.last_indexed_at) < 1
end
end

describe "enqueue_pending_media_downloads/1" do
describe "enqueue_pending_media_tasks/1" do
test "it enqueues a job for each pending media item" do
source = source_fixture()
media_item = media_item_fixture(source_id: source.id, media_filepath: nil)

assert :ok = SourceTasks.enqueue_pending_media_downloads(source)
assert :ok = SourceTasks.enqueue_pending_media_tasks(source)

assert_enqueued(worker: VideoDownloadWorker, args: %{"id" => media_item.id})
end
Expand All @@ -119,7 +128,7 @@ defmodule Pinchflat.Tasks.SourceTasksTest do
source = source_fixture()
_media_item = media_item_fixture(source_id: source.id, media_filepath: "some/filepath.mp4")

assert :ok = SourceTasks.enqueue_pending_media_downloads(source)
assert :ok = SourceTasks.enqueue_pending_media_tasks(source)

refute_enqueued(worker: VideoDownloadWorker)
end
Expand All @@ -130,15 +139,15 @@ defmodule Pinchflat.Tasks.SourceTasksTest do

assert [] = Tasks.list_tasks_for(:media_item_id, media_item.id)

assert :ok = SourceTasks.enqueue_pending_media_downloads(source)
assert :ok = SourceTasks.enqueue_pending_media_tasks(source)

assert [_] = Tasks.list_tasks_for(:media_item_id, media_item.id)
end

test "it does not create a job if the source is set to not download" do
source = source_fixture(download_media: false)

assert :ok = SourceTasks.enqueue_pending_media_downloads(source)
assert :ok = SourceTasks.enqueue_pending_media_tasks(source)

refute_enqueued(worker: VideoDownloadWorker)
end
Expand All @@ -147,7 +156,22 @@ defmodule Pinchflat.Tasks.SourceTasksTest do
source = source_fixture(download_media: false)
media_item = media_item_fixture(source_id: source.id, media_filepath: nil)

assert :ok = SourceTasks.enqueue_pending_media_downloads(source)
assert :ok = SourceTasks.enqueue_pending_media_tasks(source)
assert [] = Tasks.list_tasks_for(:media_item_id, media_item.id)
end
end

describe "dequeue_pending_media_tasks/1" do
test "it deletes all pending tasks for a source's media items" do
source = source_fixture()
media_item = media_item_fixture(source_id: source.id, media_filepath: nil)

SourceTasks.enqueue_pending_media_tasks(source)
assert_enqueued(worker: VideoDownloadWorker, args: %{"id" => media_item.id})

assert :ok = SourceTasks.dequeue_pending_media_tasks(source)

refute_enqueued(worker: VideoDownloadWorker)
assert [] = Tasks.list_tasks_for(:media_item_id, media_item.id)
end
end
Expand Down

0 comments on commit 219320c

Please sign in to comment.