Skip to content

Commit

Permalink
Kills running slow-indexes when a new slow-index is enqueued
Browse files Browse the repository at this point in the history
  • Loading branch information
kieraneglin committed Sep 10, 2024
1 parent 82c6e52 commit d8d9cc8
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 64 deletions.
2 changes: 1 addition & 1 deletion lib/pinchflat/slow_indexing/slow_indexing_helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpers do
"""
def kickoff_indexing_task(%Source{} = source, job_args \\ %{}, job_opts \\ []) do
Tasks.delete_pending_tasks_for(source, "FastIndexingWorker")
Tasks.delete_pending_tasks_for(source, "MediaCollectionIndexingWorker")
Tasks.delete_pending_tasks_for(source, "MediaCollectionIndexingWorker", include_executing: true)

MediaCollectionIndexingWorker.kickoff_with_task(source, job_args, job_opts)
end
Expand Down
30 changes: 9 additions & 21 deletions lib/pinchflat/tasks/tasks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,6 @@ defmodule Pinchflat.Tasks do
)
end

@doc """
Returns the list of pending tasks for a given record type and ID. Optionally allows you to specify
which worker to include.
Returns [%Task{}, ...]
"""
def list_pending_tasks_for(record, worker_name \\ nil) do
list_tasks_for(
record,
worker_name,
[:available, :scheduled, :retryable]
)
end

@doc """
Gets a single task.
Expand Down Expand Up @@ -127,13 +113,13 @@ defmodule Pinchflat.Tasks do

@doc """
Deletes all tasks attached to a given record, cancelling any attached jobs.
Optionally allows you to specify which worker to include.
Optionally allows you to specify which worker and job states to include.
Returns :ok
"""
def delete_tasks_for(record, worker_name \\ nil) do
def delete_tasks_for(record, worker_name \\ nil, job_states \\ Oban.Job.states()) do
record
|> list_tasks_for(worker_name)
|> list_tasks_for(worker_name, job_states)
|> Enum.each(&delete_task/1)
end

Expand All @@ -143,10 +129,12 @@ defmodule Pinchflat.Tasks do
Returns :ok
"""
def delete_pending_tasks_for(record, worker_name \\ nil) do
record
|> list_pending_tasks_for(worker_name)
|> Enum.each(&delete_task/1)
def delete_pending_tasks_for(record, worker_name \\ nil, opts \\ []) do
include_executing = Keyword.get(opts, :include_executing, false)
base_job_states = [:available, :scheduled, :retryable]
job_states = if include_executing, do: base_job_states ++ [:executing], else: base_job_states

delete_tasks_for(record, worker_name, job_states)
end

@doc """
Expand Down
Binary file modified priv/repo/erd.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
11 changes: 11 additions & 0 deletions test/pinchflat/slow_indexing/slow_indexing_helpers_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,17 @@ defmodule Pinchflat.SlowIndexing.SlowIndexingHelpersTest do
assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end
end

test "deletes any executing media collection tasks for the source" do
source = source_fixture()
{:ok, job} = Oban.insert(MediaCollectionIndexingWorker.new(%{"id" => source.id}))
task = task_fixture(source_id: source.id, job_id: job.id)
Repo.update_all(from(Oban.Job, where: [id: ^task.job_id], update: [set: [state: "executing"]]), [])

assert {:ok, _} = SlowIndexingHelpers.kickoff_indexing_task(source)

assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end
end

test "deletes any pending media tasks for the source" do
source = source_fixture()
{:ok, job} = Oban.insert(FastIndexingWorker.new(%{"id" => source.id}))
Expand Down
84 changes: 42 additions & 42 deletions test/pinchflat/tasks_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ defmodule Pinchflat.TasksTest do
@invalid_attrs %{job_id: nil}

describe "schema" do
test "it deletes a task when the job gets deleted" do
test "deletes a task when the job gets deleted" do
task = Repo.preload(task_fixture(), [:job])

{:ok, _} = Repo.delete(task.job)

assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end
end

test "it does not delete the other record when a job gets deleted" do
test "does not delete the other record when a job gets deleted" do
task = Repo.preload(task_fixture(), [:source, :job])

{:ok, _} = Repo.delete(task.job)
Expand All @@ -30,70 +30,46 @@ defmodule Pinchflat.TasksTest do
end

describe "list_tasks/0" do
test "it returns all tasks" do
test "returns all tasks" do
task = task_fixture()
assert Tasks.list_tasks() == [task]
end
end

describe "list_tasks_for/3" do
test "it lets you specify which record type/ID to join on" do
test "lets you specify which record type/ID to join on" do
source = source_fixture()
task = task_fixture(source_id: source.id)

assert Tasks.list_tasks_for(source, nil, [:available]) == [task]
end

test "it lets you specify which job states to include" do
test "lets you specify which job states to include" do
source = source_fixture()
task = task_fixture(source_id: source.id)

assert Tasks.list_tasks_for(source, nil, [:available]) == [task]
assert Tasks.list_tasks_for(source, nil, [:cancelled]) == []
end

test "it lets you specify which worker to include" do
test "lets you specify which worker to include" do
source = source_fixture()
task = task_fixture(source_id: source.id)

assert Tasks.list_tasks_for(source, "TestJobWorker") == [task]
assert Tasks.list_tasks_for(source, "FooBarWorker") == []
end

test "it includes all workers if no worker is specified" do
test "includes all workers if no worker is specified" do
source = source_fixture()
task = task_fixture(source_id: source.id)

assert Tasks.list_tasks_for(source, nil) == [task]
end
end

describe "list_pending_tasks_for/3" do
test "it lists pending tasks" do
source = source_fixture()
task = task_fixture(source_id: source.id)

assert Tasks.list_pending_tasks_for(source) == [task]
end

test "it does not list non-pending tasks" do
task = Repo.preload(task_fixture(), [:job, :source])
:ok = Oban.cancel_job(task.job)

assert Tasks.list_pending_tasks_for(task.source) == []
end

test "it lets you specify which worker to include" do
source = source_fixture()
task = task_fixture(source_id: source.id)

assert Tasks.list_pending_tasks_for(source, "TestJobWorker") == [task]
assert Tasks.list_pending_tasks_for(source, "FooBarWorker") == []
end
end

describe "get_task!/1" do
test "it returns the task with given id" do
test "returns the task with given id" do
task = task_fixture()
assert Tasks.get_task!(task.id) == task
end
Expand Down Expand Up @@ -132,31 +108,31 @@ defmodule Pinchflat.TasksTest do
end

describe "create_job_with_task/2" do
test "it enqueues the given job" do
test "enqueues the given job" do
media_item = media_item_fixture()

refute_enqueued(worker: TestJobWorker)
assert {:ok, %Task{}} = Tasks.create_job_with_task(TestJobWorker.new(%{}), media_item)
assert_enqueued(worker: TestJobWorker)
end

test "it creates a task record if successful" do
test "creates a task record if successful" do
source = source_fixture()

assert {:ok, %Task{} = task} = Tasks.create_job_with_task(TestJobWorker.new(%{}), source)

assert task.source_id == source.id
end

test "it returns an error if the job already exists" do
test "returns an error if the job already exists" do
source = source_fixture()
job = TestJobWorker.new(%{foo: "bar"}, unique: [period: :infinity])

assert {:ok, %Task{}} = Tasks.create_job_with_task(job, source)
assert {:error, :duplicate_job} = Tasks.create_job_with_task(job, source)
end

test "it returns an error if the job fails to enqueue" do
test "returns an error if the job fails to enqueue" do
source = source_fixture()

assert {:error, %Ecto.Changeset{}} = Tasks.create_job_with_task(%Ecto.Changeset{}, source)
Expand All @@ -181,15 +157,15 @@ defmodule Pinchflat.TasksTest do
end

describe "delete_tasks_for/2" do
test "it deletes tasks attached to a source" do
test "deletes tasks attached to a source" do
source = source_fixture()
task = task_fixture(source_id: source.id)

assert :ok = Tasks.delete_tasks_for(source)
assert_raise Ecto.NoResultsError, fn -> Tasks.get_task!(task.id) end
end

test "it deletes the tasks attached to a media_item" do
test "deletes the tasks attached to a media_item" do
media_item = media_item_fixture()
task = task_fixture(media_item_id: media_item.id)

Expand All @@ -208,6 +184,17 @@ defmodule Pinchflat.TasksTest do
assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end
end

test "deletion can specify which states to include" do
source = source_fixture()
task = task_fixture(source_id: source.id)

assert :ok = Tasks.delete_tasks_for(source, nil, [:executing])
assert Repo.reload!(task)

assert :ok = Tasks.delete_tasks_for(source, nil, [:available])
assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end
end

test "deletion does not impact unintended records" do
source = source_fixture()
task = task_fixture(source_id: source.id)
Expand All @@ -221,15 +208,15 @@ defmodule Pinchflat.TasksTest do
end

describe "delete_pending_tasks_for/1" do
test "it deletes pending tasks attached to a source" do
test "deletes pending tasks attached to a source" do
source = source_fixture()
task = task_fixture(source_id: source.id)

assert :ok = Tasks.delete_pending_tasks_for(source)
assert_raise Ecto.NoResultsError, fn -> Tasks.get_task!(task.id) end
end

test "it does not delete non-pending tasks" do
test "does not delete non-pending tasks" do
source = source_fixture()
task = Repo.preload(task_fixture(source_id: source.id), :job)
:ok = Oban.cancel_job(task.job)
Expand All @@ -238,7 +225,7 @@ defmodule Pinchflat.TasksTest do
assert Tasks.get_task!(task.id)
end

test "it works on media_items" do
test "works on media_items" do
media_item = media_item_fixture()
pending_task = task_fixture(media_item_id: media_item.id)
cancelled_task = Repo.preload(task_fixture(media_item_id: media_item.id), :job)
Expand All @@ -259,10 +246,23 @@ defmodule Pinchflat.TasksTest do
assert :ok = Tasks.delete_pending_tasks_for(media_item, "TestJobWorker")
assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end
end

test "deletion can optionall include executing tasks" do
source = source_fixture()
task = task_fixture(source_id: source.id)

from(Oban.Job, where: [id: ^task.job_id], update: [set: [state: "executing"]])
|> Repo.update_all([])

assert :ok = Tasks.delete_pending_tasks_for(source, nil, include_executing: false)
assert Repo.reload!(task)
assert :ok = Tasks.delete_pending_tasks_for(source, nil, include_executing: true)
assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end
end
end

describe "change_task/1" do
test "it returns a task changeset" do
test "returns a task changeset" do
task = task_fixture()
assert %Ecto.Changeset{} = Tasks.change_task(task)
end
Expand Down

0 comments on commit d8d9cc8

Please sign in to comment.