From ec71ddcb32feeaea57383c3fece43fe33e728c5b Mon Sep 17 00:00:00 2001 From: Kieran Eglin Date: Fri, 26 Jan 2024 10:42:04 -0800 Subject: [PATCH] Tied together tasks with jobs and channels --- lib/pinchflat/media/media_item.ex | 3 +- lib/pinchflat/media_source.ex | 10 +- lib/pinchflat/tasks.ex | 84 ++++++++++++- lib/pinchflat/tasks/channel_tasks.ex | 11 +- .../workers/media_indexing_worker.ex | 8 +- .../20240125212753_create_tasks.exs | 3 +- test/pinchflat/media_source_test.exs | 9 ++ test/pinchflat/tasks/channel_tasks_test.exs | 19 +++ test/pinchflat/tasks_test.exs | 116 ++++++++++++++++++ .../workers/media_indexing_worker_test.exs | 12 ++ 10 files changed, 259 insertions(+), 16 deletions(-) diff --git a/lib/pinchflat/media/media_item.ex b/lib/pinchflat/media/media_item.ex index aa729998..5856a4b8 100644 --- a/lib/pinchflat/media/media_item.ex +++ b/lib/pinchflat/media/media_item.ex @@ -11,8 +11,7 @@ defmodule Pinchflat.Media.MediaItem do @required_fields ~w(media_id channel_id)a @allowed_fields ~w(title media_id video_filepath channel_id)a - # TODO: consider making an attached `metadata` model to store the JSON response from whatever backend is used - # TODO: make a tasks model to track the jobs spawned + # IDEA: consider making an attached `metadata` model to store the JSON response from whatever backend is used schema "media_items" do field :title, :string diff --git a/lib/pinchflat/media_source.ex b/lib/pinchflat/media_source.ex index 6a73d080..44847e7b 100644 --- a/lib/pinchflat/media_source.ex +++ b/lib/pinchflat/media_source.ex @@ -6,6 +6,7 @@ defmodule Pinchflat.MediaSource do import Ecto.Query, warn: false alias Pinchflat.Repo + alias Pinchflat.Tasks alias Pinchflat.Media alias Pinchflat.Tasks.ChannelTasks alias Pinchflat.MediaSource.Channel @@ -63,8 +64,8 @@ defmodule Pinchflat.MediaSource do original_url (if changed). May attempt to start indexing the channel's media if the indexing frequency has been changed. - TODO: ensure that indexing is cancelled/rescheduled if the indexing frequency - has been changed. + Existing indexing tasks will be cancelled if the indexing frequency has been + changed (logic in `ChannelTasks.kickoff_indexing_task`) Returns {:ok, %Channel{}} | {:error, %Ecto.Changeset{}} """ @@ -75,9 +76,12 @@ defmodule Pinchflat.MediaSource do end @doc """ - Deletes a channel. Returns {:ok, %Channel{}} | {:error, %Ecto.Changeset{}} + Deletes a channel and it's associated tasks (of any state). + + Returns {:ok, %Channel{}} | {:error, %Ecto.Changeset{}} """ def delete_channel(%Channel{} = channel) do + Tasks.delete_tasks_for(channel) Repo.delete(channel) end diff --git a/lib/pinchflat/tasks.ex b/lib/pinchflat/tasks.ex index 2b1b81d5..f19461cf 100644 --- a/lib/pinchflat/tasks.ex +++ b/lib/pinchflat/tasks.ex @@ -7,6 +7,7 @@ defmodule Pinchflat.Tasks do alias Pinchflat.Repo alias Pinchflat.Tasks.Task + alias Pinchflat.MediaSource.Channel @doc """ Returns the list of tasks. Returns [%Task{}, ...] @@ -15,6 +16,36 @@ defmodule Pinchflat.Tasks do Repo.all(Task) end + @doc """ + Returns the list of tasks for a given record type and ID. Optionally allows you to specify + which job states to include. + + Returns [%Task{}, ...] + """ + def list_tasks_for(attached_record_type, attached_record_id, job_states \\ Oban.Job.states()) do + stringified_states = Enum.map(job_states, &to_string/1) + + Repo.all( + from t in Task, + join: j in assoc(t, :job), + where: field(t, ^attached_record_type) == ^attached_record_id, + where: j.state in ^stringified_states + ) + end + + @doc """ + Returns the list of pending tasks for a given record type and ID. + + Returns [%Task{}, ...] + """ + def list_pending_tasks_for(attached_record_type, attached_record_id) do + list_tasks_for( + attached_record_type, + attached_record_id, + [:available, :scheduled, :retryable] + ) + end + @doc """ Gets a single task. @@ -31,13 +62,64 @@ defmodule Pinchflat.Tasks do |> Repo.insert() end + # This one's function signature is designed to help simplify + # usage of `create_job_with_task/2` + def create_task(%Oban.Job{} = job, %Channel{} = channel) do + %Task{} + |> Task.changeset(%{job_id: job.id, channel_id: channel.id}) + |> Repo.insert() + end + + @doc """ + Creates a job from given attrs, creating a task with an attached record + if successful. + + Returns {:ok, %Task{}} | {:error, %Ecto.Changeset{}}. + """ + def create_job_with_task(job_attrs, task_attached_record) do + case Oban.insert(job_attrs) do + {:ok, job} -> create_task(job, task_attached_record) + err -> err + end + end + @doc """ - Deletes a task. Returns {:ok, %Task{}} | {:error, %Ecto.Changeset{}}. + Deletes a task. Also cancels any attached job. + + Returns {:ok, %Task{}} | {:error, %Ecto.Changeset{}}. """ def delete_task(%Task{} = task) do + :ok = Oban.cancel_job(task.job_id) + Repo.delete(task) end + @doc """ + Deletes all tasks attached to a given record, cancelling any attached jobs. + + Returns :ok + """ + def delete_tasks_for(%Channel{} = channel) do + tasks = list_tasks_for(:channel_id, channel.id) + + Enum.each(tasks, fn task -> + delete_task(task) + end) + end + + @doc """ + Deletes all _pending_ tasks attached to a given record, cancelling any attached jobs. + + Returns :ok + """ + def delete_pending_tasks_for(%Channel{} = channel) do + tasks = list_pending_tasks_for(:channel_id, channel.id) + + Enum.each(tasks, fn task -> + delete_task(task) + end) + end + @doc """ Returns an `%Ecto.Changeset{}` for tracking task changes. """ diff --git a/lib/pinchflat/tasks/channel_tasks.ex b/lib/pinchflat/tasks/channel_tasks.ex index a23f608c..095c90f5 100644 --- a/lib/pinchflat/tasks/channel_tasks.ex +++ b/lib/pinchflat/tasks/channel_tasks.ex @@ -3,17 +3,16 @@ defmodule Pinchflat.Tasks.ChannelTasks do This module contains methods for managing tasks (workers) related to channels. """ + alias Pinchflat.Tasks alias Pinchflat.MediaSource.Channel alias Pinchflat.Workers.MediaIndexingWorker @doc """ - Starts tasks for indexing a channel's media. - - TODO: modify so that updates cancel/reschedule existing tasks as-needed - TODO: modify so that deletion cancels existing tasks (or maybe can do from Postgres?) - TODO: modify so that starting a worker adds a Task record (not implemented yet) + Starts tasks for indexing a channel's media. Returns {:ok, :should_not_index} | {:ok, %Task{}}. """ def kickoff_indexing_task(%Channel{} = channel) do + Tasks.delete_pending_tasks_for(channel) + if channel.index_frequency_minutes <= 0 do {:ok, :should_not_index} else @@ -21,7 +20,7 @@ defmodule Pinchflat.Tasks.ChannelTasks do |> Map.take([:id]) # Schedule this one immediately, but future ones will be on an interval |> MediaIndexingWorker.new() - |> Oban.insert() + |> Tasks.create_job_with_task(channel) end end end diff --git a/lib/pinchflat/workers/media_indexing_worker.ex b/lib/pinchflat/workers/media_indexing_worker.ex index 966f2ccb..69c14d32 100644 --- a/lib/pinchflat/workers/media_indexing_worker.ex +++ b/lib/pinchflat/workers/media_indexing_worker.ex @@ -3,9 +3,11 @@ defmodule Pinchflat.Workers.MediaIndexingWorker do use Oban.Worker, queue: :media_indexing, - unique: [period: :infinity, states: [:available, :scheduled]] + unique: [period: :infinity, states: [:available, :scheduled, :retryable]], + tags: ["media_source", "media_indexing"] alias __MODULE__ + alias Pinchflat.Tasks alias Pinchflat.MediaSource @impl Oban.Worker @@ -23,7 +25,7 @@ defmodule Pinchflat.Workers.MediaIndexingWorker do actually run every 1 hour and 30 minutes. The tradeoff of not inundating the API with requests and also not overlapping jobs is worth it, IMO. - Returns :ok | {:ok, %Oban.Job{}}. Not that it matters. + Returns :ok | {:ok, %Task{}}. Not that it matters. """ def perform(%Oban.Job{args: %{"id" => channel_id}}) do channel = MediaSource.get_channel!(channel_id) @@ -41,6 +43,6 @@ defmodule Pinchflat.Workers.MediaIndexingWorker do channel |> Map.take([:id]) |> MediaIndexingWorker.new(schedule_in: channel.index_frequency_minutes * 60) - |> Oban.insert() + |> Tasks.create_job_with_task(channel) end end diff --git a/priv/repo/migrations/20240125212753_create_tasks.exs b/priv/repo/migrations/20240125212753_create_tasks.exs index 79b63abd..7a7b5e34 100644 --- a/priv/repo/migrations/20240125212753_create_tasks.exs +++ b/priv/repo/migrations/20240125212753_create_tasks.exs @@ -4,7 +4,8 @@ defmodule Pinchflat.Repo.Migrations.CreateTasks do def change do create table(:tasks) do add :job_id, references(:oban_jobs, on_delete: :delete_all), null: false - add :channel_id, references(:channels, on_delete: :delete_all), null: true + # `restrict` because we need to be sure to delete pending tasks when a channel is deleted + add :channel_id, references(:channels, on_delete: :restrict), null: true timestamps(type: :utc_datetime) end diff --git a/test/pinchflat/media_source_test.exs b/test/pinchflat/media_source_test.exs index dc6c12f0..3956186f 100644 --- a/test/pinchflat/media_source_test.exs +++ b/test/pinchflat/media_source_test.exs @@ -1,6 +1,7 @@ defmodule Pinchflat.MediaSourceTest do use Pinchflat.DataCase import Mox + import Pinchflat.TasksFixtures import Pinchflat.ProfilesFixtures import Pinchflat.MediaSourceFixtures @@ -218,6 +219,14 @@ defmodule Pinchflat.MediaSourceTest do channel = channel_fixture() assert %Ecto.Changeset{} = MediaSource.change_channel(channel) end + + test "deletion also deletes all associated tasks" do + channel = channel_fixture() + task = task_fixture(channel_id: channel.id) + + assert {:ok, %Channel{}} = MediaSource.delete_channel(channel) + assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end + end end describe "change_channel/2" do diff --git a/test/pinchflat/tasks/channel_tasks_test.exs b/test/pinchflat/tasks/channel_tasks_test.exs index 3df2f61b..23145232 100644 --- a/test/pinchflat/tasks/channel_tasks_test.exs +++ b/test/pinchflat/tasks/channel_tasks_test.exs @@ -1,8 +1,10 @@ defmodule Pinchflat.Tasks.ChannelTasksTest do use Pinchflat.DataCase + import Pinchflat.TasksFixtures import Pinchflat.MediaSourceFixtures + alias Pinchflat.Tasks.Task alias Pinchflat.Tasks.ChannelTasks alias Pinchflat.Workers.MediaIndexingWorker @@ -22,5 +24,22 @@ defmodule Pinchflat.Tasks.ChannelTasksTest do assert_enqueued(worker: MediaIndexingWorker, args: %{"id" => channel.id}) end + + test "it creates and attaches a task if the interval is > 0" do + channel = channel_fixture(index_frequency_minutes: 1) + + assert {:ok, %Task{} = task} = ChannelTasks.kickoff_indexing_task(channel) + + assert task.channel_id == channel.id + end + + test "it deletes any pending tasks for the channel" do + channel = channel_fixture() + task = task_fixture(channel_id: channel.id) + + assert {:ok, _} = ChannelTasks.kickoff_indexing_task(channel) + + assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end + end end end diff --git a/test/pinchflat/tasks_test.exs b/test/pinchflat/tasks_test.exs index 76355283..9228c7d9 100644 --- a/test/pinchflat/tasks_test.exs +++ b/test/pinchflat/tasks_test.exs @@ -2,12 +2,32 @@ defmodule Pinchflat.TasksTest do use Pinchflat.DataCase import Pinchflat.JobFixtures import Pinchflat.TasksFixtures + import Pinchflat.MediaSourceFixtures alias Pinchflat.Tasks alias Pinchflat.Tasks.Task + alias Pinchflat.JobFixtures.TestJobWorker @invalid_attrs %{job_id: nil} + describe "schema" do + test "it deletes a task when the job gets deleted" do + task = Repo.preload(task_fixture(), [:job]) + + {:ok, _} = Repo.delete(task.job) + + assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end + end + + test "it does not delete the other record when a job gets deleted" do + task = Repo.preload(task_fixture(), [:channel, :job]) + + {:ok, _} = Repo.delete(task.job) + + assert Repo.reload!(task.channel) + end + end + describe "list_tasks/0" do test "it returns all tasks" do task = task_fixture() @@ -15,6 +35,36 @@ defmodule Pinchflat.TasksTest do end end + describe "list_tasks_for/3" do + test "it lets you specify which record type/ID to join on" do + task = task_fixture() + + assert Tasks.list_tasks_for(:channel_id, task.channel_id) == [task] + end + + test "it lets you specify which job states to include" do + task = task_fixture() + + assert Tasks.list_tasks_for(:channel_id, task.channel_id, [:available]) == [task] + assert Tasks.list_tasks_for(:channel_id, task.channel_id, [:cancelled]) == [] + end + end + + describe "list_pending_tasks_for/2" do + test "it lists pending tasks" do + task = task_fixture() + + assert Tasks.list_pending_tasks_for(:channel_id, task.channel_id) == [task] + end + + test "it does not list non-pending tasks" do + task = Repo.preload(task_fixture(), :job) + :ok = Oban.cancel_job(task.job) + + assert Tasks.list_pending_tasks_for(:channel_id, task.channel_id) == [] + end + end + describe "get_task!/1" do test "it returns the task with given id" do task = task_fixture() @@ -32,6 +82,34 @@ defmodule Pinchflat.TasksTest do test "creation with invalid data returns error changeset" do assert {:error, %Ecto.Changeset{}} = Tasks.create_task(@invalid_attrs) end + + test "accepts a job and channel" do + job = job_fixture() + channel = channel_fixture() + + assert {:ok, %Task{} = task} = Tasks.create_task(job, channel) + + assert task.job_id == job.id + assert task.channel_id == channel.id + end + end + + describe "create_job_with_task/2" do + test "it enqueues the given job" do + channel = channel_fixture() + + refute_enqueued(worker: TestJobWorker) + assert {:ok, %Task{}} = Tasks.create_job_with_task(TestJobWorker.new(%{}), channel) + assert_enqueued(worker: TestJobWorker) + end + + test "it creates a task record if successful" do + channel = channel_fixture() + + assert {:ok, %Task{} = task} = Tasks.create_job_with_task(TestJobWorker.new(%{}), channel) + + assert task.channel_id == channel.id + end end describe "delete_task/1" do @@ -40,6 +118,44 @@ defmodule Pinchflat.TasksTest do assert {:ok, %Task{}} = Tasks.delete_task(task) assert_raise Ecto.NoResultsError, fn -> Tasks.get_task!(task.id) end end + + test "deletion also cancels the attached job" do + task = Repo.preload(task_fixture(), :job) + + assert {:ok, %Task{}} = Tasks.delete_task(task) + job = Repo.reload!(task.job) + + assert job.state == "cancelled" + end + end + + describe "delete_tasks_for/1" do + test "it deletes tasks attached to a channel" do + channel = channel_fixture() + task = task_fixture(channel_id: channel.id) + + assert :ok = Tasks.delete_tasks_for(channel) + assert_raise Ecto.NoResultsError, fn -> Tasks.get_task!(task.id) end + end + end + + describe "delete_pending_tasks_for/1" do + test "it deletes pending tasks attached to a channel" do + channel = channel_fixture() + task = task_fixture(channel_id: channel.id) + + assert :ok = Tasks.delete_pending_tasks_for(channel) + assert_raise Ecto.NoResultsError, fn -> Tasks.get_task!(task.id) end + end + + test "it does not delete non-pending tasks" do + channel = channel_fixture() + task = Repo.preload(task_fixture(channel_id: channel.id), :job) + :ok = Oban.cancel_job(task.job) + + assert :ok = Tasks.delete_pending_tasks_for(channel) + assert Tasks.get_task!(task.id) + end end describe "change_task/1" do diff --git a/test/pinchflat/workers/media_indexing_worker_test.exs b/test/pinchflat/workers/media_indexing_worker_test.exs index 35ae5243..625d2777 100644 --- a/test/pinchflat/workers/media_indexing_worker_test.exs +++ b/test/pinchflat/workers/media_indexing_worker_test.exs @@ -4,6 +4,7 @@ defmodule Pinchflat.Workers.MediaIndexingWorkerTest do import Mox import Pinchflat.MediaSourceFixtures + alias Pinchflat.Tasks alias Pinchflat.Workers.MediaIndexingWorker setup :verify_on_exit! @@ -47,6 +48,17 @@ defmodule Pinchflat.Workers.MediaIndexingWorkerTest do ) end + test "it creates a task for the rescheduled job" do + expect(YtDlpRunnerMock, :run, 1, fn _url, _opts -> {:ok, ""} end) + + channel = channel_fixture(index_frequency_minutes: 10) + task_count_fetcher = fn -> Enum.count(Tasks.list_tasks()) end + + assert_changed([from: 0, to: 1], task_count_fetcher, fn -> + perform_job(MediaIndexingWorker, %{id: channel.id}) + end) + end + test "it creates the basic media_item records" do expect(YtDlpRunnerMock, :run, 1, fn _url, _opts -> {:ok, "video1\nvideo2"} end)