From 89739c17b3c916a16c10a53a1df13b3a5b87825a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nelson=20Estev=C3=A3o?= Date: Tue, 15 Oct 2024 13:32:22 +0100 Subject: [PATCH] feat: add unsubscribe function --- lib/ecto_watch.ex | 25 +++ lib/ecto_watch/watcher_trigger_validator.ex | 1 + test/ecto_watch_test.exs | 230 ++++++++++++++++++++ 3 files changed, 256 insertions(+) diff --git a/lib/ecto_watch.ex b/lib/ecto_watch.ex index 53fd688..d42259e 100644 --- a/lib/ecto_watch.ex +++ b/lib/ecto_watch.ex @@ -189,6 +189,31 @@ defmodule EctoWatch do end end + @doc """ + Unsubscribe to notifications from watchers that you previously subscribe. It + receives the same params for `subscribe/2`. + + Examples: + + iex> EctoWatch.unsubscribe({Comment, :updated}) + iex> EctoWatch.unsubscribe({Comment, :updated}, {:post_id, post_id}) + """ + @spec unsubscribe(watcher_identifier(), term()) :: :ok | {:error, term()} + def unsubscribe(watcher_identifier, id \\ nil) do + validate_ecto_watch_running!() + + with :ok <- validate_identifier(watcher_identifier), + {:ok, {pub_sub_mod, channel_name, debug?}} <- + WatcherServer.pub_sub_subscription_details(watcher_identifier, id) do + if(debug?, do: debug_log(watcher_identifier, "Unsubscribing to watcher")) + + Phoenix.PubSub.unsubscribe(pub_sub_mod, channel_name) + else + {:error, error} -> + raise ArgumentError, error + end + end + @doc """ Returns details about a watcher for reflection purposes diff --git a/lib/ecto_watch/watcher_trigger_validator.ex b/lib/ecto_watch/watcher_trigger_validator.ex index 40d187e..08a846a 100644 --- a/lib/ecto_watch/watcher_trigger_validator.ex +++ b/lib/ecto_watch/watcher_trigger_validator.ex @@ -40,6 +40,7 @@ defmodule EctoWatch.WatcherTriggerValidator do if MapSet.size(extra_found_triggers) > 0 do log_extra_triggers(extra_found_triggers) end + if MapSet.size(extra_found_functions) > 0 do log_extra_functions(extra_found_functions) end diff --git a/test/ecto_watch_test.exs b/test/ecto_watch_test.exs index 1757354..ec400b7 100644 --- a/test/ecto_watch_test.exs +++ b/test/ecto_watch_test.exs @@ -733,6 +733,49 @@ defmodule EctoWatchTest do assert_receive {:things_inserted, %{id: 3}} assert_receive {{Other, :inserted}, %{weird_id: 1234}} assert_receive {:other_inserted, %{weird_id: 1234}} + + EctoWatch.unsubscribe({Thing, :inserted}) + + Ecto.Adapters.SQL.query!( + TestRepo, + "INSERT INTO things (the_string, the_integer, the_float, inserted_at, updated_at) VALUES ('the value', 4455, 84.52, NOW(), NOW())", + [] + ) + + refute_receive {{Thing, :inserted}, _} + assert_receive {:things_inserted, %{id: 4}} + + EctoWatch.unsubscribe(:things_inserted) + + Ecto.Adapters.SQL.query!( + TestRepo, + "INSERT INTO things (the_string, the_integer, the_float, inserted_at, updated_at) VALUES ('the value', 4455, 84.52, NOW(), NOW())", + [] + ) + + refute_receive {{Thing, :inserted}, _} + refute_receive {:things_inserted, _} + + EctoWatch.unsubscribe({Other, :inserted}) + + Ecto.Adapters.SQL.query!( + TestRepo, + "INSERT INTO \"0xabcd\".other (weird_id, the_string, inserted_at, updated_at) VALUES (2345, 'the value', NOW(), NOW())", + [] + ) + + refute_receive {{Other, :inserted}, %{weird_id: 2345}} + assert_receive {:other_inserted, %{weird_id: 2345}} + + EctoWatch.unsubscribe(:other_inserted) + + Ecto.Adapters.SQL.query!( + TestRepo, + "INSERT INTO \"0xabcd\".other (weird_id, the_string, inserted_at, updated_at) VALUES (3456, 'the value', NOW(), NOW())", + [] + ) + + refute_receive {:other_inserted, _} end test "empty extra_columns list" do @@ -769,6 +812,28 @@ defmodule EctoWatchTest do assert_receive {:things_inserted, %{id: 3}} assert_receive {{Other, :inserted}, %{weird_id: 1234}} assert_receive {:other_inserted, %{weird_id: 1234}} + + EctoWatch.unsubscribe({Thing, :inserted}) + EctoWatch.unsubscribe({Other, :inserted}) + EctoWatch.unsubscribe(:things_inserted) + EctoWatch.unsubscribe(:other_inserted) + + Ecto.Adapters.SQL.query!( + TestRepo, + "INSERT INTO things (the_string, the_integer, the_float, inserted_at, updated_at) VALUES ('the value', 4455, 84.52, NOW(), NOW())", + [] + ) + + Ecto.Adapters.SQL.query!( + TestRepo, + "INSERT INTO \"0xabcd\".other (weird_id, the_string, inserted_at, updated_at) VALUES (2345, 'the value', NOW(), NOW())", + [] + ) + + refute_receive {{Thing, :inserted}, _} + refute_receive {:things_inserted, _} + refute_receive {{Other, :inserted}, _} + refute_receive {:other_inserted, _} end test "inserts for an association column", %{already_existing_id2: already_existing_id2} do @@ -803,6 +868,23 @@ defmodule EctoWatchTest do assert_receive {:things_parent_id_inserted, %{id: 3, parent_thing_id: ^already_existing_id2}} + + EctoWatch.unsubscribe({Thing, :inserted}, {:parent_thing_id, already_existing_id2}) + + EctoWatch.unsubscribe( + :things_parent_id_inserted, + {:parent_thing_id, already_existing_id2} + ) + + Ecto.Adapters.SQL.query!( + TestRepo, + "INSERT INTO things (the_string, the_integer, the_float, parent_thing_id, extra_field, inserted_at, updated_at) VALUES ('the other value', 8900, 24.53, #{already_existing_id2}, 'hey', NOW(), NOW())", + [] + ) + + refute_receive {{Thing, :inserted}, _} + + refute_receive {:things_parent_id_inserted, _} end test "column is not in list of extra_columns", %{already_existing_id2: already_existing_id2} do @@ -920,6 +1002,31 @@ defmodule EctoWatchTest do assert_receive {{Thing, :updated}, %{id: ^already_existing_id2}} assert_receive {:things_updated, %{id: ^already_existing_id2}} + + EctoWatch.unsubscribe({Thing, :updated}) + + Ecto.Adapters.SQL.query!( + TestRepo, + "UPDATE things SET the_string = 'the second new value'", + [] + ) + + refute_receive {{Thing, :updated}, %{id: ^already_existing_id1}} + assert_receive {:things_updated, %{id: ^already_existing_id1}} + + refute_receive {{Thing, :updated}, %{id: ^already_existing_id2}} + assert_receive {:things_updated, %{id: ^already_existing_id2}} + + EctoWatch.unsubscribe(:things_updated) + + Ecto.Adapters.SQL.query!( + TestRepo, + "UPDATE things SET the_string = 'the third new value'", + [] + ) + + refute_receive {:things_updated, %{id: ^already_existing_id1}} + refute_receive {:things_updated, %{id: ^already_existing_id2}} end test "updates for the primary key", %{ @@ -942,6 +1049,12 @@ defmodule EctoWatchTest do assert_receive {{Thing, :updated}, %{id: ^already_existing_id1}} refute_receive {{Thing, :updated}, %{id: ^already_existing_id2}} + + EctoWatch.unsubscribe({Thing, :updated}, already_existing_id1) + + Ecto.Adapters.SQL.query!(TestRepo, "UPDATE things SET the_string = 'another new value'", []) + + refute_receive {{Thing, :updated}, %{id: ^already_existing_id1}} end test "updates for an association column", %{ @@ -979,6 +1092,28 @@ defmodule EctoWatchTest do assert_receive {:things_parent_id_updated, %{id: ^already_existing_id2, parent_thing_id: ^already_existing_id1}} + + EctoWatch.unsubscribe({Thing, :updated}, {:parent_thing_id, already_existing_id1}) + + Ecto.Adapters.SQL.query!(TestRepo, "UPDATE things SET the_string = 'another new value'", []) + + refute_receive {{Thing, :updated}, %{id: ^already_existing_id2}} + + assert_receive {:things_parent_id_updated, + %{id: ^already_existing_id2, parent_thing_id: ^already_existing_id1}} + + EctoWatch.unsubscribe( + :things_parent_id_updated, + {:parent_thing_id, already_existing_id1} + ) + + Ecto.Adapters.SQL.query!( + TestRepo, + "UPDATE things SET the_string = 'yet another new value'", + [] + ) + + refute_receive {:things_parent_id_updated, %{id: ^already_existing_id2}} end test "column is not in list of extra_columns", %{already_existing_id2: already_existing_id2} do @@ -1048,6 +1183,15 @@ defmodule EctoWatchTest do assert_receive {:thing_custom_event, %{id: ^already_existing_id1}} refute_receive {_, %{id: ^already_existing_id2}} + + EctoWatch.unsubscribe(:thing_custom_event, already_existing_id1) + + Ecto.Adapters.SQL.query!(TestRepo, "UPDATE things SET the_string = 'the new value'", []) + Ecto.Adapters.SQL.query!(TestRepo, "UPDATE things SET the_integer = 9998", []) + Ecto.Adapters.SQL.query!(TestRepo, "UPDATE things SET the_float = 99.899", []) + + refute_receive {_, %{id: ^already_existing_id1}} + refute_receive {_, %{id: ^already_existing_id2}} end test "extra_columns option", %{ @@ -1093,6 +1237,26 @@ defmodule EctoWatchTest do %{id: ^already_existing_id1, the_integer: 9999, the_float: 99.999}} refute_receive {{_, :updated}, %{id: ^already_existing_id2}} + + EctoWatch.unsubscribe({Thing, :updated}, already_existing_id1) + + Ecto.Adapters.SQL.query!( + TestRepo, + "UPDATE things SET the_string = 'another new value' WHERE id = $1", + [already_existing_id1] + ) + + Ecto.Adapters.SQL.query!(TestRepo, "UPDATE things SET the_integer = 9999 WHERE id = $1", [ + already_existing_id1 + ]) + + Ecto.Adapters.SQL.query!(TestRepo, "UPDATE things SET the_float = 99.999 WHERE id = $1", [ + already_existing_id1 + ]) + + refute_receive {{Thing, :updated}, %{id: ^already_existing_id1}} + + refute_receive {{_, :updated}, %{id: ^already_existing_id2}} end test "no notifications without subscribe", %{ @@ -1131,6 +1295,33 @@ defmodule EctoWatchTest do assert_receive {{Thing, :deleted}, %{id: ^already_existing_id2}} assert_receive {:things_deleted, %{id: ^already_existing_id2}} + + EctoWatch.unsubscribe({Thing, :deleted}) + + Ecto.Adapters.SQL.query!( + TestRepo, + "INSERT INTO things (the_string, the_integer, the_float, inserted_at, updated_at) VALUES ('the value', 4455, 84.52, NOW(), NOW())", + [] + ) + + Ecto.Adapters.SQL.query!(TestRepo, "DELETE FROM things", []) + + refute_receive {{Thing, :deleted}, _} + + assert_receive {:things_deleted, %{id: 3}} + + EctoWatch.unsubscribe(:things_deleted) + + Ecto.Adapters.SQL.query!( + TestRepo, + "INSERT INTO things (the_string, the_integer, the_float, inserted_at, updated_at) VALUES ('the value', 4455, 84.52, NOW(), NOW())", + [] + ) + + Ecto.Adapters.SQL.query!(TestRepo, "DELETE FROM things", []) + + refute_receive {{Thing, :deleted}, _} + refute_receive {:things_deleted, _} end test "empty extra_columns", %{ @@ -1156,6 +1347,32 @@ defmodule EctoWatchTest do assert_receive {{Thing, :deleted}, %{id: ^already_existing_id2}} assert_receive {:things_deleted, %{id: ^already_existing_id2}} + + EctoWatch.unsubscribe({Thing, :deleted}) + + Ecto.Adapters.SQL.query!( + TestRepo, + "INSERT INTO things (the_string, the_integer, the_float, inserted_at, updated_at) VALUES ('the value', 4455, 84.52, NOW(), NOW())", + [] + ) + + Ecto.Adapters.SQL.query!(TestRepo, "DELETE FROM things", []) + + refute_receive {{Thing, :deleted}, _} + assert_receive {:things_deleted, %{id: 3}} + + EctoWatch.unsubscribe(:things_deleted) + + Ecto.Adapters.SQL.query!( + TestRepo, + "INSERT INTO things (the_string, the_integer, the_float, inserted_at, updated_at) VALUES ('the value', 4455, 84.52, NOW(), NOW())", + [] + ) + + Ecto.Adapters.SQL.query!(TestRepo, "DELETE FROM things", []) + + refute_receive {{Thing, :deleted}, _} + refute_receive {:things_deleted, _} end test "deletes for the primary key", %{ @@ -1178,6 +1395,19 @@ defmodule EctoWatchTest do assert_receive {{Thing, :deleted}, %{id: ^already_existing_id1}} refute_receive {{Thing, :deleted}, %{id: ^already_existing_id2}} + + Ecto.Adapters.SQL.query!( + TestRepo, + "INSERT INTO things (the_string, the_integer, the_float, inserted_at, updated_at) VALUES ('the value', 4455, 84.52, NOW(), NOW())", + [] + ) + + EctoWatch.subscribe({Thing, :deleted}, 3) + EctoWatch.unsubscribe({Thing, :deleted}, 3) + + Ecto.Adapters.SQL.query!(TestRepo, "DELETE FROM things", []) + + refute_receive {{Thing, :deleted}, _} end test "deletes for an association column", %{