Skip to content

Commit

Permalink
[Enhancement] Run fast indexing on source creation and at higher prio…
Browse files Browse the repository at this point in the history
…rity (#583)

* Updated default job priorities for downloading queue

* Added the ability to set priority to various downloading helpers

* Sets sources to fast index on creation
  • Loading branch information
kieraneglin authored Jan 22, 2025
1 parent 704d29d commit 62214b8
Show file tree
Hide file tree
Showing 12 changed files with 113 additions and 60 deletions.
2 changes: 1 addition & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ config :pinchflat, Pinchflat.Repo,
config :pinchflat, Oban,
queues: [
default: 10,
fast_indexing: 6,
fast_indexing: yt_dlp_worker_count,
media_collection_indexing: yt_dlp_worker_count,
media_fetching: yt_dlp_worker_count,
remote_metadata: yt_dlp_worker_count,
Expand Down
12 changes: 7 additions & 5 deletions lib/pinchflat/downloading/downloading_helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ defmodule Pinchflat.Downloading.DownloadingHelpers do
Returns :ok
"""
def enqueue_pending_download_tasks(%Source{download_media: true} = source) do
def enqueue_pending_download_tasks(source, job_opts \\ [])

def enqueue_pending_download_tasks(%Source{download_media: true} = source, job_opts) do
source
|> Media.list_pending_media_items_for()
|> Enum.each(&MediaDownloadWorker.kickoff_with_task/1)
|> Enum.each(&MediaDownloadWorker.kickoff_with_task(&1, %{}, job_opts))
end

def enqueue_pending_download_tasks(%Source{download_media: false}) do
def enqueue_pending_download_tasks(%Source{download_media: false}, _job_opts) do
:ok
end

Expand All @@ -55,13 +57,13 @@ defmodule Pinchflat.Downloading.DownloadingHelpers do
Returns {:ok, %Task{}} | {:error, :should_not_download} | {:error, any()}
"""
def kickoff_download_if_pending(%MediaItem{} = media_item) do
def kickoff_download_if_pending(%MediaItem{} = media_item, job_opts \\ []) do
media_item = Repo.preload(media_item, :source)

if media_item.source.download_media && Media.pending_download?(media_item) do
Logger.info("Kicking off download for media item ##{media_item.id} (#{media_item.media_id})")

MediaDownloadWorker.kickoff_with_task(media_item)
MediaDownloadWorker.kickoff_with_task(media_item, %{}, job_opts)
else
{:error, :should_not_download}
end
Expand Down
1 change: 1 addition & 0 deletions lib/pinchflat/downloading/media_download_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule Pinchflat.Downloading.MediaDownloadWorker do

use Oban.Worker,
queue: :media_fetching,
priority: 5,
unique: [period: :infinity, states: [:available, :scheduled, :retryable, :executing]],
tags: ["media_item", "media_fetching", "show_in_dashboard"]

Expand Down
7 changes: 5 additions & 2 deletions lib/pinchflat/fast_indexing/fast_indexing_helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpers do
Returns [%MediaItem{}] where each item is a new media item that was created _but not necessarily
downloaded_.
"""
def kickoff_download_tasks_from_youtube_rss_feed(%Source{} = source) do
def index_and_kickoff_downloads(%Source{} = source) do
# The media_profile is needed to determine the quality options to _then_ determine a more
# accurate predicted filepath
source = Repo.preload(source, [:media_profile])
Expand All @@ -53,6 +53,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpers do
Enum.map(new_media_ids, fn media_id ->
case create_media_item_from_media_id(source, media_id) do
{:ok, media_item} ->
DownloadingHelpers.kickoff_download_if_pending(media_item, priority: 0)
media_item

err ->
Expand All @@ -61,7 +62,9 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpers do
end
end)

DownloadingHelpers.enqueue_pending_download_tasks(source)
# Pick up any stragglers. Intentionally has a lower priority than the per-media item
# kickoff above
DownloadingHelpers.enqueue_pending_download_tasks(source, priority: 1)

Enum.filter(maybe_new_media_items, & &1)
end
Expand Down
6 changes: 3 additions & 3 deletions lib/pinchflat/fast_indexing/fast_indexing_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ defmodule Pinchflat.FastIndexing.FastIndexingWorker do
Order of operations:
1. FastIndexingWorker (this module) periodically checks the YouTube RSS feed for new media.
with `FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed`
2. If the above `kickoff_download_tasks_from_youtube_rss_feed` finds new media items in the RSS feed,
with `FastIndexingHelpers.index_and_kickoff_downloads`
2. If the above `index_and_kickoff_downloads` finds new media items in the RSS feed,
it indexes them with a yt-dlp call to create the media item records then kicks off downloading
tasks (MediaDownloadWorker) for any new media items _that should be downloaded_.
3. Once downloads are kicked off, this worker sends a notification to the apprise server if applicable
Expand Down Expand Up @@ -67,7 +67,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingWorker do

new_media_items =
source
|> FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed()
|> FastIndexingHelpers.index_and_kickoff_downloads()
|> Enum.filter(&Media.pending_download?(&1))

if source.download_media do
Expand Down
1 change: 0 additions & 1 deletion lib/pinchflat/slow_indexing/slow_indexing_helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpers do
def kickoff_indexing_task(%Source{} = source, job_args \\ %{}, job_opts \\ []) do
job_offset_seconds = if job_args[:force], do: 0, else: calculate_job_offset_seconds(source)

Tasks.delete_pending_tasks_for(source, "FastIndexingWorker")
Tasks.delete_pending_tasks_for(source, "MediaCollectionIndexingWorker", include_executing: true)

MediaCollectionIndexingWorker.kickoff_with_task(source, job_args, job_opts ++ [schedule_in: job_offset_seconds])
Expand Down
4 changes: 4 additions & 0 deletions lib/pinchflat/sources/sources.ex
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,10 @@ defmodule Pinchflat.Sources do
%{__meta__: %{state: :built}} ->
SlowIndexingHelpers.kickoff_indexing_task(source)

if Ecto.Changeset.get_field(changeset, :fast_index) do
FastIndexingHelpers.kickoff_indexing_task(source)
end

# If the record has been persisted, only run indexing if the
# indexing frequency has been changed and is now greater than 0
%{__meta__: %{state: :loaded}} ->
Expand Down
29 changes: 23 additions & 6 deletions test/pinchflat/downloading/downloading_helpers_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ defmodule Pinchflat.Downloading.DownloadingHelpersTest do
alias Pinchflat.Downloading.MediaDownloadWorker

describe "enqueue_pending_download_tasks/1" do
test "it enqueues a job for each pending media item" do
test "enqueues a job for each pending media item" do
source = source_fixture()
media_item = media_item_fixture(source_id: source.id, media_filepath: nil)

Expand All @@ -19,7 +19,7 @@ defmodule Pinchflat.Downloading.DownloadingHelpersTest do
assert_enqueued(worker: MediaDownloadWorker, args: %{"id" => media_item.id})
end

test "it does not enqueue a job for media items with a filepath" do
test "does not enqueue a job for media items with a filepath" do
source = source_fixture()
_media_item = media_item_fixture(source_id: source.id, media_filepath: "some/filepath.mp4")

Expand All @@ -28,7 +28,7 @@ defmodule Pinchflat.Downloading.DownloadingHelpersTest do
refute_enqueued(worker: MediaDownloadWorker)
end

test "it attaches a task to each enqueued job" do
test "attaches a task to each enqueued job" do
source = source_fixture()
media_item = media_item_fixture(source_id: source.id, media_filepath: nil)

Expand All @@ -39,25 +39,34 @@ defmodule Pinchflat.Downloading.DownloadingHelpersTest do
assert [_] = Tasks.list_tasks_for(media_item)
end

test "it does not create a job if the source is set to not download" do
test "does not create a job if the source is set to not download" do
source = source_fixture(download_media: false)

assert :ok = DownloadingHelpers.enqueue_pending_download_tasks(source)

refute_enqueued(worker: MediaDownloadWorker)
end

test "it does not attach tasks if the source is set to not download" do
test "does not attach tasks if the source is set to not download" do
source = source_fixture(download_media: false)
media_item = media_item_fixture(source_id: source.id, media_filepath: nil)

assert :ok = DownloadingHelpers.enqueue_pending_download_tasks(source)
assert [] = Tasks.list_tasks_for(media_item)
end

test "can pass job options" do
source = source_fixture()
media_item = media_item_fixture(source_id: source.id, media_filepath: nil)

assert :ok = DownloadingHelpers.enqueue_pending_download_tasks(source, priority: 1)

assert_enqueued(worker: MediaDownloadWorker, args: %{"id" => media_item.id}, priority: 1)
end
end

describe "dequeue_pending_download_tasks/1" do
test "it deletes all pending tasks for a source's media items" do
test "deletes all pending tasks for a source's media items" do
source = source_fixture()
media_item = media_item_fixture(source_id: source.id, media_filepath: nil)

Expand Down Expand Up @@ -109,6 +118,14 @@ defmodule Pinchflat.Downloading.DownloadingHelpersTest do

refute_enqueued(worker: MediaDownloadWorker)
end

test "can pass job options" do
media_item = media_item_fixture(media_filepath: nil)

assert {:ok, _} = DownloadingHelpers.kickoff_download_if_pending(media_item, priority: 1)

assert_enqueued(worker: MediaDownloadWorker, args: %{"id" => media_item.id}, priority: 1)
end
end

describe "kickoff_redownload_for_existing_media/1" do
Expand Down
15 changes: 11 additions & 4 deletions test/pinchflat/downloading/media_download_worker_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,20 @@ defmodule Pinchflat.Downloading.MediaDownloadWorkerTest do
assert_enqueued(worker: MediaDownloadWorker, args: %{"id" => media_item.id, "force" => true})
end

test "can be called with additional job options", %{media_item: media_item} do
job_opts = [max_attempts: 5]
test "has a priority of 5 by default", %{media_item: media_item} do
assert {:ok, _} = MediaDownloadWorker.kickoff_with_task(media_item)

[job] = all_enqueued(worker: MediaDownloadWorker, args: %{"id" => media_item.id})

assert {:ok, _} = MediaDownloadWorker.kickoff_with_task(media_item, %{}, job_opts)
assert job.priority == 5
end

test "priority can be set", %{media_item: media_item} do
assert {:ok, _} = MediaDownloadWorker.kickoff_with_task(media_item, %{}, priority: 0)

[job] = all_enqueued(worker: MediaDownloadWorker, args: %{"id" => media_item.id})
assert job.max_attempts == 5

assert job.priority == 0
end
end

Expand Down
48 changes: 30 additions & 18 deletions test/pinchflat/fast_indexing/fast_indexing_helpers_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -38,44 +38,56 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do
end
end

describe "kickoff_download_tasks_from_youtube_rss_feed/1" do
test "enqueues a new worker for each new media_id in the source's RSS feed", %{source: source} do
describe "index_and_kickoff_downloads/1" do
test "enqueues a worker for each new media_id in the source's RSS feed", %{source: source} do
expect(HTTPClientMock, :get, fn _url -> {:ok, "<yt:videoId>test_1</yt:videoId>"} end)

assert [media_item] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source)
assert [media_item] = FastIndexingHelpers.index_and_kickoff_downloads(source)

assert [worker] = all_enqueued(worker: MediaDownloadWorker)
assert worker.args["id"] == media_item.id
assert worker.priority == 0
end

test "does not enqueue a new worker for the source's media IDs we already know about", %{source: source} do
expect(HTTPClientMock, :get, fn _url -> {:ok, "<yt:videoId>test_1</yt:videoId>"} end)
media_item_fixture(source_id: source.id, media_id: "test_1")

assert [] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source)
assert [] = FastIndexingHelpers.index_and_kickoff_downloads(source)

refute_enqueued(worker: MediaDownloadWorker)
end

test "kicks off a download task for all pending media but at a lower priority", %{source: source} do
pending_item = media_item_fixture(source_id: source.id, media_filepath: nil)
expect(HTTPClientMock, :get, fn _url -> {:ok, "<yt:videoId>test_1</yt:videoId>"} end)

assert [%MediaItem{}] = FastIndexingHelpers.index_and_kickoff_downloads(source)

assert [worker_1, _worker_2] = all_enqueued(worker: MediaDownloadWorker)
assert worker_1.args["id"] == pending_item.id
assert worker_1.priority == 1
end

test "returns the found media items", %{source: source} do
expect(HTTPClientMock, :get, fn _url -> {:ok, "<yt:videoId>test_1</yt:videoId>"} end)

assert [%MediaItem{}] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source)
assert [%MediaItem{}] = FastIndexingHelpers.index_and_kickoff_downloads(source)
end

test "does not enqueue a download job if the source does not allow it" do
expect(HTTPClientMock, :get, fn _url -> {:ok, "<yt:videoId>test_1</yt:videoId>"} end)
source = source_fixture(%{download_media: false})

assert [%MediaItem{}] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source)
assert [%MediaItem{}] = FastIndexingHelpers.index_and_kickoff_downloads(source)

refute_enqueued(worker: MediaDownloadWorker)
end

test "creates a download task record", %{source: source} do
expect(HTTPClientMock, :get, fn _url -> {:ok, "<yt:videoId>test_1</yt:videoId>"} end)

assert [media_item] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source)
assert [media_item] = FastIndexingHelpers.index_and_kickoff_downloads(source)

assert [_] = Tasks.list_tasks_for(media_item, "MediaDownloadWorker")
end
Expand All @@ -89,7 +101,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do
{:ok, media_attributes_return_fixture()}
end)

FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source)
FastIndexingHelpers.index_and_kickoff_downloads(source)
end

test "sets use_cookies if the source uses cookies" do
Expand All @@ -103,7 +115,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do

source = source_fixture(%{use_cookies: true})

assert [%MediaItem{}] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source)
assert [%MediaItem{}] = FastIndexingHelpers.index_and_kickoff_downloads(source)
end

test "does not set use_cookies if the source does not use cookies" do
Expand All @@ -117,7 +129,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do

source = source_fixture(%{use_cookies: false})

assert [%MediaItem{}] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source)
assert [%MediaItem{}] = FastIndexingHelpers.index_and_kickoff_downloads(source)
end

test "does not enqueue a download job if the media item does not match the format rules" do
Expand All @@ -142,7 +154,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do
{:ok, output}
end)

assert [%MediaItem{}] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source)
assert [%MediaItem{}] = FastIndexingHelpers.index_and_kickoff_downloads(source)

refute_enqueued(worker: MediaDownloadWorker)
end
Expand All @@ -154,7 +166,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do
{:ok, "{}"}
end)

assert [] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source)
assert [] = FastIndexingHelpers.index_and_kickoff_downloads(source)
end

test "does not blow up if a media item causes a yt-dlp error", %{source: source} do
Expand All @@ -164,11 +176,11 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do
{:error, "message", 1}
end)

assert [] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source)
assert [] = FastIndexingHelpers.index_and_kickoff_downloads(source)
end
end

describe "kickoff_download_tasks_from_youtube_rss_feed/1 when testing backends" do
describe "index_and_kickoff_downloads/1 when testing backends" do
test "uses the YouTube API if it is enabled", %{source: source} do
expect(HTTPClientMock, :get, fn url, _headers ->
assert url =~ "https://youtube.googleapis.com/youtube/v3/playlistItems"
Expand All @@ -178,7 +190,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do

Settings.set(youtube_api_key: "test_key")

assert [] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source)
assert [] = FastIndexingHelpers.index_and_kickoff_downloads(source)
end

test "the YouTube API creates records as expected", %{source: source} do
Expand All @@ -188,7 +200,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do

Settings.set(youtube_api_key: "test_key")

assert [%MediaItem{}] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source)
assert [%MediaItem{}] = FastIndexingHelpers.index_and_kickoff_downloads(source)
end

test "RSS is used as a backup if the API fails", %{source: source} do
Expand All @@ -197,7 +209,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do

Settings.set(youtube_api_key: "test_key")

assert [%MediaItem{}] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source)
assert [%MediaItem{}] = FastIndexingHelpers.index_and_kickoff_downloads(source)
end

test "RSS is used if the API is not enabled", %{source: source} do
Expand All @@ -209,7 +221,7 @@ defmodule Pinchflat.FastIndexing.FastIndexingHelpersTest do

Settings.set(youtube_api_key: nil)

assert [%MediaItem{}] = FastIndexingHelpers.kickoff_download_tasks_from_youtube_rss_feed(source)
assert [%MediaItem{}] = FastIndexingHelpers.index_and_kickoff_downloads(source)
end
end
end
Loading

0 comments on commit 62214b8

Please sign in to comment.