Skip to content

Commit

Permalink
feat: add unsubscribe function
Browse files Browse the repository at this point in the history
  • Loading branch information
nelsonmestevao committed Oct 15, 2024
1 parent a1ea7ec commit 89739c1
Show file tree
Hide file tree
Showing 3 changed files with 256 additions and 0 deletions.
25 changes: 25 additions & 0 deletions lib/ecto_watch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,31 @@ defmodule EctoWatch do
end
end

@doc """
Unsubscribe to notifications from watchers that you previously subscribe. It
receives the same params for `subscribe/2`.
Examples:
iex> EctoWatch.unsubscribe({Comment, :updated})
iex> EctoWatch.unsubscribe({Comment, :updated}, {:post_id, post_id})
"""
@spec unsubscribe(watcher_identifier(), term()) :: :ok | {:error, term()}
def unsubscribe(watcher_identifier, id \\ nil) do
validate_ecto_watch_running!()

with :ok <- validate_identifier(watcher_identifier),
{:ok, {pub_sub_mod, channel_name, debug?}} <-
WatcherServer.pub_sub_subscription_details(watcher_identifier, id) do
if(debug?, do: debug_log(watcher_identifier, "Unsubscribing to watcher"))

Phoenix.PubSub.unsubscribe(pub_sub_mod, channel_name)
else
{:error, error} ->
raise ArgumentError, error
end
end

@doc """
Returns details about a watcher for reflection purposes
Expand Down
1 change: 1 addition & 0 deletions lib/ecto_watch/watcher_trigger_validator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ defmodule EctoWatch.WatcherTriggerValidator do
if MapSet.size(extra_found_triggers) > 0 do
log_extra_triggers(extra_found_triggers)
end

if MapSet.size(extra_found_functions) > 0 do
log_extra_functions(extra_found_functions)
end
Expand Down
230 changes: 230 additions & 0 deletions test/ecto_watch_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,49 @@ defmodule EctoWatchTest do
assert_receive {:things_inserted, %{id: 3}}
assert_receive {{Other, :inserted}, %{weird_id: 1234}}
assert_receive {:other_inserted, %{weird_id: 1234}}

EctoWatch.unsubscribe({Thing, :inserted})

Ecto.Adapters.SQL.query!(
TestRepo,
"INSERT INTO things (the_string, the_integer, the_float, inserted_at, updated_at) VALUES ('the value', 4455, 84.52, NOW(), NOW())",
[]
)

refute_receive {{Thing, :inserted}, _}
assert_receive {:things_inserted, %{id: 4}}

EctoWatch.unsubscribe(:things_inserted)

Ecto.Adapters.SQL.query!(
TestRepo,
"INSERT INTO things (the_string, the_integer, the_float, inserted_at, updated_at) VALUES ('the value', 4455, 84.52, NOW(), NOW())",
[]
)

refute_receive {{Thing, :inserted}, _}
refute_receive {:things_inserted, _}

EctoWatch.unsubscribe({Other, :inserted})

Ecto.Adapters.SQL.query!(
TestRepo,
"INSERT INTO \"0xabcd\".other (weird_id, the_string, inserted_at, updated_at) VALUES (2345, 'the value', NOW(), NOW())",
[]
)

refute_receive {{Other, :inserted}, %{weird_id: 2345}}
assert_receive {:other_inserted, %{weird_id: 2345}}

EctoWatch.unsubscribe(:other_inserted)

Ecto.Adapters.SQL.query!(
TestRepo,
"INSERT INTO \"0xabcd\".other (weird_id, the_string, inserted_at, updated_at) VALUES (3456, 'the value', NOW(), NOW())",
[]
)

refute_receive {:other_inserted, _}
end

test "empty extra_columns list" do
Expand Down Expand Up @@ -769,6 +812,28 @@ defmodule EctoWatchTest do
assert_receive {:things_inserted, %{id: 3}}
assert_receive {{Other, :inserted}, %{weird_id: 1234}}
assert_receive {:other_inserted, %{weird_id: 1234}}

EctoWatch.unsubscribe({Thing, :inserted})
EctoWatch.unsubscribe({Other, :inserted})
EctoWatch.unsubscribe(:things_inserted)
EctoWatch.unsubscribe(:other_inserted)

Ecto.Adapters.SQL.query!(
TestRepo,
"INSERT INTO things (the_string, the_integer, the_float, inserted_at, updated_at) VALUES ('the value', 4455, 84.52, NOW(), NOW())",
[]
)

Ecto.Adapters.SQL.query!(
TestRepo,
"INSERT INTO \"0xabcd\".other (weird_id, the_string, inserted_at, updated_at) VALUES (2345, 'the value', NOW(), NOW())",
[]
)

refute_receive {{Thing, :inserted}, _}
refute_receive {:things_inserted, _}
refute_receive {{Other, :inserted}, _}
refute_receive {:other_inserted, _}
end

test "inserts for an association column", %{already_existing_id2: already_existing_id2} do
Expand Down Expand Up @@ -803,6 +868,23 @@ defmodule EctoWatchTest do

assert_receive {:things_parent_id_inserted,
%{id: 3, parent_thing_id: ^already_existing_id2}}

EctoWatch.unsubscribe({Thing, :inserted}, {:parent_thing_id, already_existing_id2})

EctoWatch.unsubscribe(
:things_parent_id_inserted,
{:parent_thing_id, already_existing_id2}
)

Ecto.Adapters.SQL.query!(
TestRepo,
"INSERT INTO things (the_string, the_integer, the_float, parent_thing_id, extra_field, inserted_at, updated_at) VALUES ('the other value', 8900, 24.53, #{already_existing_id2}, 'hey', NOW(), NOW())",
[]
)

refute_receive {{Thing, :inserted}, _}

refute_receive {:things_parent_id_inserted, _}
end

test "column is not in list of extra_columns", %{already_existing_id2: already_existing_id2} do
Expand Down Expand Up @@ -920,6 +1002,31 @@ defmodule EctoWatchTest do

assert_receive {{Thing, :updated}, %{id: ^already_existing_id2}}
assert_receive {:things_updated, %{id: ^already_existing_id2}}

EctoWatch.unsubscribe({Thing, :updated})

Ecto.Adapters.SQL.query!(
TestRepo,
"UPDATE things SET the_string = 'the second new value'",
[]
)

refute_receive {{Thing, :updated}, %{id: ^already_existing_id1}}
assert_receive {:things_updated, %{id: ^already_existing_id1}}

refute_receive {{Thing, :updated}, %{id: ^already_existing_id2}}
assert_receive {:things_updated, %{id: ^already_existing_id2}}

EctoWatch.unsubscribe(:things_updated)

Ecto.Adapters.SQL.query!(
TestRepo,
"UPDATE things SET the_string = 'the third new value'",
[]
)

refute_receive {:things_updated, %{id: ^already_existing_id1}}
refute_receive {:things_updated, %{id: ^already_existing_id2}}
end

test "updates for the primary key", %{
Expand All @@ -942,6 +1049,12 @@ defmodule EctoWatchTest do
assert_receive {{Thing, :updated}, %{id: ^already_existing_id1}}

refute_receive {{Thing, :updated}, %{id: ^already_existing_id2}}

EctoWatch.unsubscribe({Thing, :updated}, already_existing_id1)

Ecto.Adapters.SQL.query!(TestRepo, "UPDATE things SET the_string = 'another new value'", [])

refute_receive {{Thing, :updated}, %{id: ^already_existing_id1}}
end

test "updates for an association column", %{
Expand Down Expand Up @@ -979,6 +1092,28 @@ defmodule EctoWatchTest do

assert_receive {:things_parent_id_updated,
%{id: ^already_existing_id2, parent_thing_id: ^already_existing_id1}}

EctoWatch.unsubscribe({Thing, :updated}, {:parent_thing_id, already_existing_id1})

Ecto.Adapters.SQL.query!(TestRepo, "UPDATE things SET the_string = 'another new value'", [])

refute_receive {{Thing, :updated}, %{id: ^already_existing_id2}}

assert_receive {:things_parent_id_updated,
%{id: ^already_existing_id2, parent_thing_id: ^already_existing_id1}}

EctoWatch.unsubscribe(
:things_parent_id_updated,
{:parent_thing_id, already_existing_id1}
)

Ecto.Adapters.SQL.query!(
TestRepo,
"UPDATE things SET the_string = 'yet another new value'",
[]
)

refute_receive {:things_parent_id_updated, %{id: ^already_existing_id2}}
end

test "column is not in list of extra_columns", %{already_existing_id2: already_existing_id2} do
Expand Down Expand Up @@ -1048,6 +1183,15 @@ defmodule EctoWatchTest do

assert_receive {:thing_custom_event, %{id: ^already_existing_id1}}
refute_receive {_, %{id: ^already_existing_id2}}

EctoWatch.unsubscribe(:thing_custom_event, already_existing_id1)

Ecto.Adapters.SQL.query!(TestRepo, "UPDATE things SET the_string = 'the new value'", [])
Ecto.Adapters.SQL.query!(TestRepo, "UPDATE things SET the_integer = 9998", [])
Ecto.Adapters.SQL.query!(TestRepo, "UPDATE things SET the_float = 99.899", [])

refute_receive {_, %{id: ^already_existing_id1}}
refute_receive {_, %{id: ^already_existing_id2}}
end

test "extra_columns option", %{
Expand Down Expand Up @@ -1093,6 +1237,26 @@ defmodule EctoWatchTest do
%{id: ^already_existing_id1, the_integer: 9999, the_float: 99.999}}

refute_receive {{_, :updated}, %{id: ^already_existing_id2}}

EctoWatch.unsubscribe({Thing, :updated}, already_existing_id1)

Ecto.Adapters.SQL.query!(
TestRepo,
"UPDATE things SET the_string = 'another new value' WHERE id = $1",
[already_existing_id1]
)

Ecto.Adapters.SQL.query!(TestRepo, "UPDATE things SET the_integer = 9999 WHERE id = $1", [
already_existing_id1
])

Ecto.Adapters.SQL.query!(TestRepo, "UPDATE things SET the_float = 99.999 WHERE id = $1", [
already_existing_id1
])

refute_receive {{Thing, :updated}, %{id: ^already_existing_id1}}

refute_receive {{_, :updated}, %{id: ^already_existing_id2}}
end

test "no notifications without subscribe", %{
Expand Down Expand Up @@ -1131,6 +1295,33 @@ defmodule EctoWatchTest do

assert_receive {{Thing, :deleted}, %{id: ^already_existing_id2}}
assert_receive {:things_deleted, %{id: ^already_existing_id2}}

EctoWatch.unsubscribe({Thing, :deleted})

Ecto.Adapters.SQL.query!(
TestRepo,
"INSERT INTO things (the_string, the_integer, the_float, inserted_at, updated_at) VALUES ('the value', 4455, 84.52, NOW(), NOW())",
[]
)

Ecto.Adapters.SQL.query!(TestRepo, "DELETE FROM things", [])

refute_receive {{Thing, :deleted}, _}

assert_receive {:things_deleted, %{id: 3}}

EctoWatch.unsubscribe(:things_deleted)

Ecto.Adapters.SQL.query!(
TestRepo,
"INSERT INTO things (the_string, the_integer, the_float, inserted_at, updated_at) VALUES ('the value', 4455, 84.52, NOW(), NOW())",
[]
)

Ecto.Adapters.SQL.query!(TestRepo, "DELETE FROM things", [])

refute_receive {{Thing, :deleted}, _}
refute_receive {:things_deleted, _}
end

test "empty extra_columns", %{
Expand All @@ -1156,6 +1347,32 @@ defmodule EctoWatchTest do

assert_receive {{Thing, :deleted}, %{id: ^already_existing_id2}}
assert_receive {:things_deleted, %{id: ^already_existing_id2}}

EctoWatch.unsubscribe({Thing, :deleted})

Ecto.Adapters.SQL.query!(
TestRepo,
"INSERT INTO things (the_string, the_integer, the_float, inserted_at, updated_at) VALUES ('the value', 4455, 84.52, NOW(), NOW())",
[]
)

Ecto.Adapters.SQL.query!(TestRepo, "DELETE FROM things", [])

refute_receive {{Thing, :deleted}, _}
assert_receive {:things_deleted, %{id: 3}}

EctoWatch.unsubscribe(:things_deleted)

Ecto.Adapters.SQL.query!(
TestRepo,
"INSERT INTO things (the_string, the_integer, the_float, inserted_at, updated_at) VALUES ('the value', 4455, 84.52, NOW(), NOW())",
[]
)

Ecto.Adapters.SQL.query!(TestRepo, "DELETE FROM things", [])

refute_receive {{Thing, :deleted}, _}
refute_receive {:things_deleted, _}
end

test "deletes for the primary key", %{
Expand All @@ -1178,6 +1395,19 @@ defmodule EctoWatchTest do
assert_receive {{Thing, :deleted}, %{id: ^already_existing_id1}}

refute_receive {{Thing, :deleted}, %{id: ^already_existing_id2}}

Ecto.Adapters.SQL.query!(
TestRepo,
"INSERT INTO things (the_string, the_integer, the_float, inserted_at, updated_at) VALUES ('the value', 4455, 84.52, NOW(), NOW())",
[]
)

EctoWatch.subscribe({Thing, :deleted}, 3)
EctoWatch.unsubscribe({Thing, :deleted}, 3)

Ecto.Adapters.SQL.query!(TestRepo, "DELETE FROM things", [])

refute_receive {{Thing, :deleted}, _}
end

test "deletes for an association column", %{
Expand Down

0 comments on commit 89739c1

Please sign in to comment.