diff --git a/lib/ecto_watch.ex b/lib/ecto_watch.ex index 9fef30a..23d4ce1 100644 --- a/lib/ecto_watch.ex +++ b/lib/ecto_watch.ex @@ -147,8 +147,10 @@ defmodule EctoWatch do validate_ecto_watch_running!() with :ok <- validate_identifier(watcher_identifier), - {:ok, {pub_sub_mod, channel_name}} <- + {:ok, {pub_sub_mod, channel_name, debug?}} <- WatcherServer.pub_sub_subscription_details(watcher_identifier, id) do + if(debug?, do: debug_log(watcher_identifier, "Subscribing to watcher")) + Phoenix.PubSub.subscribe(pub_sub_mod, channel_name) else {:error, error} -> @@ -291,4 +293,8 @@ defmodule EctoWatch do |> Enum.filter(fn {_, values} -> length(values) >= 2 end) |> Enum.map(fn {_, [value | _]} -> value end) end + + defp debug_log(watcher_identifier, message) do + EctoWatch.Helpers.debug_log(watcher_identifier, message) + end end diff --git a/lib/ecto_watch/helpers.ex b/lib/ecto_watch/helpers.ex index e498d6c..26e4207 100644 --- a/lib/ecto_watch/helpers.ex +++ b/lib/ecto_watch/helpers.ex @@ -1,6 +1,8 @@ defmodule EctoWatch.Helpers do @moduledoc false + require Logger + def label(schema_mod_or_label) do if ecto_schema_mod?(schema_mod_or_label) do module_to_label(schema_mod_or_label) @@ -39,4 +41,8 @@ defmodule EctoWatch.Helpers do def validate_list(_, _) do {:error, "should be a list"} end + + def debug_log(watcher_identifier, message) do + Logger.debug("EctoWatch | #{inspect(watcher_identifier)} | #{inspect(self())} | #{message}") + end end diff --git a/lib/ecto_watch/options.ex b/lib/ecto_watch/options.ex index e6cb425..a2912ce 100644 --- a/lib/ecto_watch/options.ex +++ b/lib/ecto_watch/options.ex @@ -3,13 +3,16 @@ defmodule EctoWatch.Options do alias EctoWatch.Options.WatcherOptions - defstruct [:repo_mod, :pub_sub_mod, :watchers] + defstruct [:repo_mod, :pub_sub_mod, :watchers, :debug?] def new(opts) do %__MODULE__{ repo_mod: opts[:repo], pub_sub_mod: opts[:pub_sub], - watchers: Enum.map(opts[:watchers], &WatcherOptions.new/1) + watchers: + Enum.map(opts[:watchers], fn watcher_opts -> + WatcherOptions.new(watcher_opts, opts[:debug?]) + end) } end @@ -26,6 +29,11 @@ defmodule EctoWatch.Options do watchers: [ type: {:custom, WatcherOptions, :validate_list, []}, required: true + ], + debug?: [ + type: :boolean, + required: false, + default: false ] ] diff --git a/lib/ecto_watch/options/watcher_options.ex b/lib/ecto_watch/options/watcher_options.ex index 0d23f26..7e992ec 100644 --- a/lib/ecto_watch/options/watcher_options.ex +++ b/lib/ecto_watch/options/watcher_options.ex @@ -3,7 +3,7 @@ defmodule EctoWatch.Options.WatcherOptions do alias EctoWatch.Helpers - defstruct [:schema_definition, :update_type, :label, :trigger_columns, :extra_columns] + defstruct [:schema_definition, :update_type, :label, :trigger_columns, :extra_columns, :debug?] def validate_list(list) do Helpers.validate_list(list, &validate/1) @@ -157,6 +157,11 @@ defmodule EctoWatch.Options.WatcherOptions do type: {:custom, __MODULE__, :validate_columns, [schema_definition]}, required: false, default: [] + ], + debug?: [ + type: :boolean, + required: false, + default: false ] ] @@ -198,11 +203,11 @@ defmodule EctoWatch.Options.WatcherOptions do end) end - def new({schema_definition, update_type}) do - new({schema_definition, update_type, []}) + def new({schema_definition, update_type}, debug?) do + new({schema_definition, update_type, []}, debug?) end - def new({schema_definition, update_type, opts}) do + def new({schema_definition, update_type, opts}, debug?) do schema_definition = SchemaDefinition.new(schema_definition) %__MODULE__{ @@ -210,7 +215,8 @@ defmodule EctoWatch.Options.WatcherOptions do update_type: update_type, label: opts[:label], trigger_columns: opts[:trigger_columns] || [], - extra_columns: opts[:extra_columns] || [] + extra_columns: opts[:extra_columns] || [], + debug?: debug? || opts[:debug?] } end end diff --git a/lib/ecto_watch/watcher_server.ex b/lib/ecto_watch/watcher_server.ex index e679f31..0a86fe5 100644 --- a/lib/ecto_watch/watcher_server.ex +++ b/lib/ecto_watch/watcher_server.ex @@ -45,6 +45,8 @@ defmodule EctoWatch.WatcherServer do @impl true def init({repo_mod, pub_sub_mod, options}) do + debug_log(options, "Starting server") + unique_label = "#{unique_label(options)}" update_keyword = @@ -152,7 +154,7 @@ defmodule EctoWatch.WatcherServer do "#{state.unique_label}" end - {:ok, {state.pub_sub_mod, channel_name}} + {:ok, {state.pub_sub_mod, channel_name, state.options.debug?}} end {:reply, result, state} @@ -182,6 +184,11 @@ defmodule EctoWatch.WatcherServer do @impl true def handle_info({:notification, _pid, _ref, channel_name, payload}, state) do + debug_log( + state.options, + "Received Postgrex notification on channel `#{channel_name}`: #{payload}" + ) + details = watcher_details(state) if channel_name != details.notify_channel do @@ -210,6 +217,11 @@ defmodule EctoWatch.WatcherServer do returned_values, state.identifier_columns ) do + debug_log( + state.options, + "Broadcasting to Phoenix PubSub topic `#{topic}`: #{inspect(message)}" + ) + Phoenix.PubSub.broadcast(state.pub_sub_mod, topic, message) end @@ -244,11 +256,9 @@ defmodule EctoWatch.WatcherServer do # that can be used as the watcher process name, trigger name, trigger function name, # and Phoenix.PubSub channel name. defp unique_label(%WatcherOptions{} = options) do - if options.label do - unique_label(options.label) - else - unique_label({options.schema_definition.label, options.update_type}) - end + options + |> identifier() + |> unique_label() end defp unique_label({schema_mod, update_type}) do @@ -258,4 +268,18 @@ defmodule EctoWatch.WatcherServer do defp unique_label(label) do :"ew_for_#{Helpers.label(label)}" end + + defp identifier(%WatcherOptions{} = options) do + if options.label do + options.label + else + {options.schema_definition.label, options.update_type} + end + end + + defp debug_log(%{debug?: debug_value} = options, message) do + if debug_value do + Helpers.debug_log(identifier(options), message) + end + end end diff --git a/test/ecto_watch_test.exs b/test/ecto_watch_test.exs index 19dde02..1757354 100644 --- a/test/ecto_watch_test.exs +++ b/test/ecto_watch_test.exs @@ -3,6 +3,8 @@ defmodule EctoWatchTest do use ExUnit.Case, async: false + import ExUnit.CaptureLog + # TODO: Long module names (testing for limits of postgres labels) # TODO: More tests for label option # TODO: Pass non-lists to `extra_columns` @@ -572,8 +574,6 @@ defmodule EctoWatchTest do end describe "trigger cleanup" do - import ExUnit.CaptureLog - setup do Ecto.Adapters.SQL.query!( TestRepo, @@ -1333,4 +1333,162 @@ defmodule EctoWatchTest do assert details.notify_channel == "ew_for_thing_custom_event" end end + + describe "debug? option" do + test "option on specific watcher" do + log = + capture_log([level: :debug], fn -> + start_supervised!( + {EctoWatch, + repo: TestRepo, + pub_sub: TestPubSub, + watchers: [ + {Thing, :updated, + extra_columns: [:the_string], label: :custom_event1, debug?: true}, + {Thing, :updated, extra_columns: [:the_integer], label: :custom_event2} + ]} + ) + end) + + assert log =~ ~r/EctoWatch \| :custom_event1 \| #PID<\d+\.\d+\.\d+> \| Starting server/ + refute log =~ ~r/EctoWatch \| custom_event2/ + + log = + capture_log([level: :debug], fn -> + Ecto.Adapters.SQL.query!(TestRepo, "UPDATE things SET the_string = 'the new value'", []) + + Process.sleep(1_000) + end) + + assert log =~ + ~r/Received Postgrex notification on channel `ew_for_custom_event1`: {"type": "updated", "values": {"id": 1, "the_string": "the new value"}}/ + + refute log =~ ~r/Received Postgrex notification on channel `ew_for_custom_event2`/ + + assert log =~ + ~r/Broadcasting to Phoenix PubSub topic `ew_for_custom_event1`: {:custom_event1, %{id: 1, the_string: "the new value"}}/ + + refute log =~ ~r/Broadcasting to Phoenix PubSub topic `ew_for_custom_event2`/ + + log = + capture_log([level: :info], fn -> + Ecto.Adapters.SQL.query!( + TestRepo, + "UPDATE things SET the_string = 'the new new value'", + [] + ) + + Process.sleep(1_000) + end) + + refute log =~ ~r/Received Postgrex notification on channel/ + refute log =~ ~r/Broadcasting to Phoenix PubSub topic/ + + log = + capture_log([level: :debug], fn -> + EctoWatch.subscribe(:custom_event1) + end) + + assert log =~ + ~r/EctoWatch \| :custom_event1 \| #PID<\d+\.\d+\.\d+> \| Subscribing to watcher/ + + refute log =~ ~r/EctoWatch \| :custom_event2/ + + log = + capture_log([level: :debug], fn -> + EctoWatch.subscribe(:custom_event2) + end) + + refute log =~ ~r/EctoWatch \| :custom_event1/ + refute log =~ ~r/EctoWatch \| :custom_event2/ + + log = + capture_log([level: :info], fn -> + EctoWatch.subscribe(:custom_event1) + EctoWatch.subscribe(:custom_event2) + end) + + refute log =~ ~r/EctoWatch \| :custom_event/ + end + + test "global option" do + log = + capture_log([level: :debug], fn -> + start_supervised!( + {EctoWatch, + repo: TestRepo, + pub_sub: TestPubSub, + debug?: true, + watchers: [ + {Thing, :updated, extra_columns: [:the_string], label: :custom_event1}, + {Thing, :updated, extra_columns: [:the_integer], label: :custom_event2} + ]} + ) + end) + + assert log =~ ~r/EctoWatch \| :custom_event1 \| #PID<\d+\.\d+\.\d+> \| Starting server/ + assert log =~ ~r/EctoWatch \| :custom_event2 \| #PID<\d+\.\d+\.\d+> \| Starting server/ + + log = + capture_log([level: :debug], fn -> + Ecto.Adapters.SQL.query!(TestRepo, "UPDATE things SET the_string = 'the new value'", []) + + Process.sleep(1_000) + end) + + assert log =~ + ~r/Received Postgrex notification on channel `ew_for_custom_event1`: {"type": "updated", "values": {"id": 1, "the_string": "the new value"}}/ + + assert log =~ + ~r/Received Postgrex notification on channel `ew_for_custom_event2`: {"type": "updated", "values": {"id": 1, "the_integer": 4455}}/ + + assert log =~ + ~r/Broadcasting to Phoenix PubSub topic `ew_for_custom_event1`: {:custom_event1, %{id: 1, the_string: "the new value"}}/ + + assert log =~ + ~r/Broadcasting to Phoenix PubSub topic `ew_for_custom_event2`: {:custom_event2, %{id: 1, the_integer: 4455}}/ + + log = + capture_log([level: :info], fn -> + Ecto.Adapters.SQL.query!( + TestRepo, + "UPDATE things SET the_string = 'the new new value'", + [] + ) + + Process.sleep(1_000) + end) + + refute log =~ ~r/Received Postgrex notification on channel/ + refute log =~ ~r/Broadcasting to Phoenix PubSub topic/ + + log = + capture_log([level: :debug], fn -> + EctoWatch.subscribe(:custom_event1) + end) + + assert log =~ + ~r/EctoWatch \| :custom_event1 \| #PID<\d+\.\d+\.\d+> \| Subscribing to watcher/ + + refute log =~ ~r/EctoWatch \| :custom_event2/ + + log = + capture_log([level: :debug], fn -> + EctoWatch.subscribe(:custom_event2) + end) + + refute log =~ ~r/EctoWatch \| :custom_event1/ + + assert log =~ + ~r/EctoWatch \| :custom_event2 \| #PID<\d+\.\d+\.\d+> \| Subscribing to watcher/ + + log = + capture_log([level: :info], fn -> + EctoWatch.subscribe(:custom_event1) + EctoWatch.subscribe(:custom_event2) + end) + + refute log =~ ~r/EctoWatch \| :custom_event/ + end + end end