From 067ab404da0e2eec3ac2504d6161b6b78263eb5c Mon Sep 17 00:00:00 2001 From: Brian Underwood Date: Thu, 1 Aug 2024 11:45:21 +0200 Subject: [PATCH] add: Allow watchers without an ecto schema --- README.md | 31 ++++ lib/ecto_watch/helpers.ex | 16 ++ lib/ecto_watch/options.ex | 2 +- lib/ecto_watch/options/watcher_options.ex | 216 +++++++++++++++++++++ lib/ecto_watch/watcher_options.ex | 118 ------------ lib/ecto_watch/watcher_server.ex | 177 +++++++----------- lib/test_repo.ex | 1 - test/ecto_watch_test.exs | 217 ++++++++++++++++++---- 8 files changed, 507 insertions(+), 271 deletions(-) create mode 100644 lib/ecto_watch/options/watcher_options.ex delete mode 100644 lib/ecto_watch/watcher_options.ex diff --git a/README.md b/README.md index 34b7e1f..6fcc908 100644 --- a/README.md +++ b/README.md @@ -170,6 +170,37 @@ If you would like to get more than just the `id` from the record, you can use th def handle_info({:updated, MyApp.Posts.Comment, %{id: id, post_id: post_id}}, socket) do ``` +## Watching without a schema + +Since ecto supports working with tables withoun needed a schema, you may also want to create EctoWatch watchers without needing to create a schema like so: + +```elixir + # setup + {EctoWatch, + repo: MyApp.Repo, + pub_sub: MyApp.PubSub, + watchers: [ + { + %{ + table_name: "comments", + primary_key: :ID, + columns: [:title, :body, :author_id, :post_id], + association_columns: [:author_id, :post_id] + }, :updated, extra_columns: [:post_id] + } + ]} +``` + +Everything works the same as with a schema, though make sure to specify your association columns if you want to subscribe to an association column. + +Supported keys for configuring a table without a schema: + + * `schema_prefix` (optional, defaults to `public`) + * `table_name` (required) + * `primary_key` (optional, defaults to `id`) + * `columns` (optional, defaults to `[]`) + * `association_columns` (optional, defaults to `[]`) + ## Example use-cases * Updating LiveView in real-time diff --git a/lib/ecto_watch/helpers.ex b/lib/ecto_watch/helpers.ex index dbdae18..b66db7a 100644 --- a/lib/ecto_watch/helpers.ex +++ b/lib/ecto_watch/helpers.ex @@ -25,4 +25,20 @@ defmodule EctoWatch.Helpers do rescue UndefinedFunctionError -> false end + + def validate_list(list, func) when is_list(list) do + result = + list + |> Enum.map(func) + + first_error = + result + |> Enum.find(&match?({:error, _}, &1)) + + first_error || {:ok, Enum.map(result, fn {:ok, value} -> value end)} + end + + def validate_list(_, _) do + {:error, "should be a list"} + end end diff --git a/lib/ecto_watch/options.ex b/lib/ecto_watch/options.ex index 1398e8d..decbd87 100644 --- a/lib/ecto_watch/options.ex +++ b/lib/ecto_watch/options.ex @@ -3,7 +3,7 @@ defmodule EctoWatch.Options do Logic for processing the `EctoWatch` options passed by the end user's config """ - alias EctoWatch.WatcherOptions + alias EctoWatch.Options.WatcherOptions defstruct [:repo_mod, :pub_sub_mod, :watchers] diff --git a/lib/ecto_watch/options/watcher_options.ex b/lib/ecto_watch/options/watcher_options.ex new file mode 100644 index 0000000..5f90778 --- /dev/null +++ b/lib/ecto_watch/options/watcher_options.ex @@ -0,0 +1,216 @@ +defmodule EctoWatch.Options.WatcherOptions do + alias EctoWatch.Helpers + + @moduledoc """ + Logic for processing the `EctoWatch` postgres notification watcher options + which are passed in by the end user's config + """ + defstruct [:schema_definition, :update_type, :label, :trigger_columns, :extra_columns] + + def validate_list(list) do + Helpers.validate_list(list, &validate/1) + end + + defmodule SchemaDefinition do + @moduledoc """ + Generic representation of an app schema. Contains important details about a postgres table, + whether it's create from an Ecto schema module or from a map. + """ + defstruct [ + :schema_prefix, + :table_name, + :primary_key, + :columns, + :association_columns, + :label + ] + + def new(schema_mod) when is_atom(schema_mod) do + schema_prefix = + case schema_mod.__schema__(:prefix) do + nil -> "public" + prefix -> prefix + end + + table_name = "#{schema_mod.__schema__(:source)}" + [primary_key] = schema_mod.__schema__(:primary_key) + + fields = schema_mod.__schema__(:fields) + + association_columns = + schema_mod.__schema__(:associations) + |> Enum.map(&schema_mod.__schema__(:association, &1)) + |> Enum.map(& &1.owner_key) + + %__MODULE__{ + schema_prefix: schema_prefix, + table_name: table_name, + primary_key: primary_key, + columns: fields, + association_columns: association_columns, + label: schema_mod + } + end + + def new(%__MODULE__{}) do + raise "There is a bug! SchemaDefinition struct was passed to new/1" + end + + def new(opts) when is_map(opts) do + schema_prefix = opts[:schema_prefix] || "public" + + %__MODULE__{ + schema_prefix: to_string(schema_prefix), + table_name: to_string(opts.table_name), + primary_key: opts.primary_key, + columns: opts.columns, + association_columns: opts[:association_columns] || [], + label: "#{schema_prefix}|#{opts.table_name}" + } + end + end + + def validate({schema_definition, update_type}) do + validate({schema_definition, update_type, []}) + end + + def validate({schema_definition, update_type, opts}) do + with {:ok, schema_definition} <- validate_schema_definition(schema_definition, opts[:label]), + {:ok, update_type} <- validate_update_type(update_type), + {:ok, opts} <- validate_opts(opts, schema_definition, update_type) do + {:ok, {schema_definition, update_type, opts}} + end + end + + def validate(other) do + {:error, + "should be either `{schema_definition, update_type}` or `{schema_definition, update_type, opts}`. Got: #{inspect(other)}"} + end + + def validate_schema_definition(schema_mod, _label_opt) when is_atom(schema_mod) do + if EctoWatch.Helpers.ecto_schema_mod?(schema_mod) do + {:ok, schema_mod} + else + {:error, "Expected atom to be an Ecto schema module. Got: #{inspect(schema_mod)}"} + end + end + + def validate_schema_definition(opts, label_opt) when is_map(opts) do + schema = [ + schema_prefix: [ + type: {:or, ~w[string atom]a}, + required: false, + default: :public + ], + table_name: [ + type: {:or, ~w[string atom]a}, + required: true + ], + primary_key: [ + type: :atom, + required: false, + default: :id + ], + columns: [ + type: {:list, :atom}, + required: false, + default: [] + ], + association_columns: [ + type: {:list, :atom}, + required: false, + default: [] + ] + ] + + if label_opt do + with {:error, error} <- NimbleOptions.validate(opts, NimbleOptions.new!(schema)) do + {:error, Exception.message(error)} + end + else + {:error, "Label must be used when passing in a map for schema_definition"} + end + end + + def validate_schema_definition(_, _), do: {:error, "should be an ecto schema module name"} + + def validate_update_type(update_type) do + if update_type in ~w[inserted updated deleted]a do + {:ok, update_type} + else + {:error, "update_type was not one of :inserted, :updated, or :deleted"} + end + end + + def validate_opts(opts, schema_definition, update_type) do + schema_definition = SchemaDefinition.new(schema_definition) + + schema = [ + label: [ + type: :atom, + required: false + ], + trigger_columns: [ + type: + {:custom, __MODULE__, :validate_trigger_columns, + [opts[:label], schema_definition, update_type]}, + required: false + ], + extra_columns: [ + type: {:custom, __MODULE__, :validate_columns, [schema_definition]}, + required: false + ] + ] + + with {:error, error} <- NimbleOptions.validate(opts, schema) do + {:error, Exception.message(error)} + end + end + + def validate_trigger_columns(columns, label, schema_definition, update_type) do + cond do + update_type != :updated -> + {:error, "Cannot listen to trigger_columns for `#{update_type}` events."} + + label == nil -> + {:error, "Label must be used when trigger_columns are specified."} + + true -> + validate_columns(columns, schema_definition) + end + end + + def validate_columns([], _schema_mod), + do: {:error, "List must not be empty"} + + def validate_columns(columns, schema_definition) do + Helpers.validate_list(columns, fn + column when is_atom(column) -> + if column in schema_definition.columns do + {:ok, column} + else + {:error, + "Invalid column: #{inspect(column)} (expected to be in #{inspect(schema_definition.columns)})"} + end + + column -> + {:error, "Invalid column: #{inspect(column)} (expected to be an atom)"} + end) + end + + def new({schema_definition, update_type}) do + new({schema_definition, update_type, []}) + end + + def new({schema_definition, update_type, opts}) do + schema_definition = SchemaDefinition.new(schema_definition) + + %__MODULE__{ + schema_definition: schema_definition, + update_type: update_type, + label: opts[:label], + trigger_columns: opts[:trigger_columns] || [], + extra_columns: opts[:extra_columns] || [] + } + end +end diff --git a/lib/ecto_watch/watcher_options.ex b/lib/ecto_watch/watcher_options.ex deleted file mode 100644 index a0ae7a0..0000000 --- a/lib/ecto_watch/watcher_options.ex +++ /dev/null @@ -1,118 +0,0 @@ -defmodule EctoWatch.WatcherOptions do - @moduledoc """ - Logic for processing the `EctoWatch` postgres notification watcher options - which are passed in by the end user's config - """ - defstruct [:schema_mod, :update_type, :label, :trigger_columns, :extra_columns] - - def validate_list(list) when is_list(list) do - result = - list - |> Enum.map(&validate/1) - |> Enum.find(&match?({:error, _}, &1)) - - result || {:ok, list} - end - - def validate_list(_) do - {:error, "should be a list"} - end - - def validate({schema_mod, update_type}) do - validate({schema_mod, update_type, []}) - end - - def validate({schema_mod, update_type, opts}) do - opts = - opts - |> Keyword.put(:schema_mod, schema_mod) - |> Keyword.put(:update_type, update_type) - - schema = [ - schema_mod: [ - type: {:custom, __MODULE__, :validate_schema_mod, []}, - required: true - ], - update_type: [ - type: {:in, ~w[inserted updated deleted]a}, - required: true - ], - label: [ - type: :atom, - required: false - ], - trigger_columns: [ - type: - {:custom, __MODULE__, :validate_trigger_columns, - [opts[:label], schema_mod, update_type]}, - required: false - ], - extra_columns: [ - type: {:custom, __MODULE__, :validate_columns, [schema_mod]}, - required: false - ] - ] - - with {:error, error} <- NimbleOptions.validate(opts, schema) do - {:error, Exception.message(error)} - end - end - - def validate(other) do - {:error, - "should be either `{schema_mod, update_type}` or `{schema_mod, update_type, opts}`. Got: #{inspect(other)}"} - end - - def validate_schema_mod(schema_mod) when is_atom(schema_mod) do - if EctoWatch.Helpers.ecto_schema_mod?(schema_mod) do - {:ok, schema_mod} - else - {:error, "Expected schema_mod to be an Ecto schema module. Got: #{inspect(schema_mod)}"} - end - end - - def validate_schema_mod(_), do: {:error, "should be an atom"} - - def validate_trigger_columns(columns, label, schema_mod, update_type) do - cond do - update_type != :updated -> - {:error, "Cannot listen to trigger_columns for `#{update_type}` events."} - - label == nil -> - {:error, "Label must be used when trigger_columns are specified."} - - true -> - validate_columns(columns, schema_mod) - end - end - - def validate_columns([], _schema_mod), - do: {:error, "List must not be empty"} - - def validate_columns(columns, schema_mod) do - schema_fields = schema_mod.__schema__(:fields) - - Enum.reject(columns, &(&1 in schema_fields)) - |> case do - [] -> - {:ok, columns} - - extra_fields -> - {:error, "Invalid columns for #{inspect(schema_mod)}: #{inspect(extra_fields)}"} - end - end - - def new({schema_mod, update_type}) do - new({schema_mod, update_type, []}) - end - - def new({schema_mod, update_type, opts}) do - %__MODULE__{ - schema_mod: schema_mod, - update_type: update_type, - label: opts[:label], - trigger_columns: opts[:trigger_columns], - extra_columns: opts[:extra_columns] || [] - } - end -end diff --git a/lib/ecto_watch/watcher_server.ex b/lib/ecto_watch/watcher_server.ex index da1501b..181bcf1 100644 --- a/lib/ecto_watch/watcher_server.ex +++ b/lib/ecto_watch/watcher_server.ex @@ -4,7 +4,7 @@ defmodule EctoWatch.WatcherServer do """ alias EctoWatch.Helpers - alias EctoWatch.WatcherOptions + alias EctoWatch.Options.WatcherOptions use GenServer @@ -21,107 +21,24 @@ defmodule EctoWatch.WatcherServer do end end - defmodule EctoSchemaDetails do - @moduledoc """ - Struct holding pre-processed details about Ecto schemas for use in the watcher server - """ - - defstruct ~w[schema_mod pg_schema_name table_name primary_key]a - - def from_watcher_options(watcher_options) do - pg_schema_name = - case watcher_options.schema_mod.__schema__(:prefix) do - nil -> "public" - prefix -> prefix - end - - table_name = "#{watcher_options.schema_mod.__schema__(:source)}" - - # TODO: Raise an "unsupported" error if primary key is more than one column - # Or maybe multiple columns could be supported? - [primary_key] = watcher_options.schema_mod.__schema__(:primary_key) - - %__MODULE__{ - schema_mod: watcher_options.schema_mod, - pg_schema_name: pg_schema_name, - table_name: table_name, - primary_key: primary_key - } - end - end - def start_link({repo_mod, pub_sub_mod, watcher_options}) do - unique_label = "#{unique_label(watcher_options)}" - - ecto_schema_details = EctoSchemaDetails.from_watcher_options(watcher_options) - GenServer.start_link( __MODULE__, - {repo_mod, pub_sub_mod, ecto_schema_details, watcher_options, unique_label, - watcher_options.label}, + {repo_mod, pub_sub_mod, watcher_options}, name: unique_label(watcher_options) ) end - def handle_call( - {:pub_sub_subscription_details, schema_mod_or_label, update_type, identifier_value}, - _from, - state - ) do - {column, value} = - case identifier_value do - {key, value} -> - {key, value} + def init({repo_mod, pub_sub_mod, options}) do + unique_label = "#{unique_label(options)}" - nil -> - {nil, nil} - - identifier_value -> - {state.ecto_schema_details.primary_key, identifier_value} - end - - result = - with :ok <- validate_subscription(state, update_type, column) do - unique_label = unique_label(schema_mod_or_label, update_type) - - channel_name = - if column && value do - "#{unique_label}|#{column}|#{value}" - else - "#{unique_label}" - end - - {:ok, {state.pub_sub_mod, channel_name}} - end - - {:reply, result, state} - end - - defp validate_subscription(state, update_type, column) do - cond do - update_type == :inserted && column == state.ecto_schema_details.primary_key -> - {:error, "Cannot subscribe to primary_key for inserted records"} - - column && not MapSet.member?(state.identifier_columns, column) -> - {:error, "Column #{column} is not an association column"} - - column && column != state.ecto_schema_details.primary_key && - column not in state.options.extra_columns -> - {:error, "Column #{column} is not in the list of extra columns"} - - true -> - :ok - end - end - - def init({repo_mod, pub_sub_mod, ecto_schema_details, options, unique_label, label}) do update_keyword = case options.update_type do :inserted -> "INSERT" :updated -> - if options.trigger_columns do + if options.trigger_columns && options.trigger_columns != [] do "UPDATE OF #{Enum.join(options.trigger_columns, ", ")}" else "UPDATE" @@ -132,13 +49,13 @@ defmodule EctoWatch.WatcherServer do end columns_sql = - [ecto_schema_details.primary_key | options.extra_columns] + [options.schema_definition.primary_key | options.extra_columns] |> Enum.map_join(",", &"'#{&1}',row.#{&1}") Ecto.Adapters.SQL.query!( repo_mod, """ - CREATE OR REPLACE FUNCTION \"#{ecto_schema_details.pg_schema_name}\".#{unique_label}_func() + CREATE OR REPLACE FUNCTION \"#{options.schema_definition.schema_prefix}\".#{unique_label}_func() RETURNS trigger AS $trigger$ DECLARE row record; @@ -159,7 +76,7 @@ defmodule EctoWatch.WatcherServer do Ecto.Adapters.SQL.query!( repo_mod, """ - DROP TRIGGER IF EXISTS #{unique_label}_trigger on \"#{ecto_schema_details.pg_schema_name}\".\"#{ecto_schema_details.table_name}\"; + DROP TRIGGER IF EXISTS #{unique_label}_trigger on \"#{options.schema_definition.schema_prefix}\".\"#{options.schema_definition.table_name}\"; """, [] ) @@ -168,8 +85,8 @@ defmodule EctoWatch.WatcherServer do repo_mod, """ CREATE TRIGGER #{unique_label}_trigger - AFTER #{update_keyword} ON \"#{ecto_schema_details.pg_schema_name}\".\"#{ecto_schema_details.table_name}\" FOR EACH ROW - EXECUTE PROCEDURE \"#{ecto_schema_details.pg_schema_name}\".#{unique_label}_func(); + AFTER #{update_keyword} ON \"#{options.schema_definition.schema_prefix}\".\"#{options.schema_definition.table_name}\" FOR EACH ROW + EXECUTE PROCEDURE \"#{options.schema_definition.schema_prefix}\".#{unique_label}_func(); """, [] ) @@ -181,22 +98,64 @@ defmodule EctoWatch.WatcherServer do %{ pub_sub_mod: pub_sub_mod, unique_label: unique_label, - ecto_schema_details: ecto_schema_details, identifier_columns: MapSet.put( - association_columns(ecto_schema_details.schema_mod), - ecto_schema_details.primary_key + MapSet.new(options.schema_definition.association_columns), + options.schema_definition.primary_key ), - options: options, - schema_mod_or_label: label || ecto_schema_details.schema_mod + options: options }} end - defp association_columns(schema_mod) do - schema_mod.__schema__(:associations) - |> Enum.map(&schema_mod.__schema__(:association, &1)) - |> Enum.map(& &1.owner_key) - |> MapSet.new() + def handle_call( + {:pub_sub_subscription_details, schema_mod_or_label, update_type, identifier_value}, + _from, + state + ) do + {column, value} = + case identifier_value do + {key, value} -> + {key, value} + + nil -> + {nil, nil} + + identifier_value -> + {state.options.schema_definition.primary_key, identifier_value} + end + + result = + with :ok <- validate_subscription(state, update_type, column) do + unique_label = unique_label(schema_mod_or_label, update_type) + + channel_name = + if column && value do + "#{unique_label}|#{column}|#{value}" + else + "#{unique_label}" + end + + {:ok, {state.pub_sub_mod, channel_name}} + end + + {:reply, result, state} + end + + defp validate_subscription(state, update_type, column) do + cond do + update_type == :inserted && column == state.options.schema_definition.primary_key -> + {:error, "Cannot subscribe to primary_key for inserted records"} + + column && not MapSet.member?(state.identifier_columns, column) -> + {:error, "Column #{column} is not an association column"} + + column && column != state.options.schema_definition.primary_key && + column not in state.options.extra_columns -> + {:error, "Column #{column} is not in the list of extra columns"} + + true -> + :ok + end end def handle_info({:notification, _pid, _ref, channel_name, payload}, state) do @@ -210,7 +169,8 @@ defmodule EctoWatch.WatcherServer do type = String.to_existing_atom(type) - message = {type, state.schema_mod_or_label, returned_values} + message = + {type, state.options.label || state.options.schema_definition.label, returned_values} for topic <- topics( @@ -243,15 +203,10 @@ defmodule EctoWatch.WatcherServer do # that can be used as the watcher process name, trigger name, trigger function name, # and Phoenix.PubSub channel name. def unique_label(%WatcherOptions{} = options) do - unique_label( - options.label || options.schema_mod, - options.update_type - ) + :"ew_#{options.update_type}_for_#{Helpers.label(options.label || options.schema_definition.label)}" end defp unique_label(schema_mod_or_label, update_type) do - label = Helpers.label(schema_mod_or_label) - - :"ew_#{update_type}_for_#{label}" + :"ew_#{update_type}_for_#{Helpers.label(schema_mod_or_label)}" end end diff --git a/lib/test_repo.ex b/lib/test_repo.ex index 429da8e..99595fb 100644 --- a/lib/test_repo.ex +++ b/lib/test_repo.ex @@ -19,4 +19,3 @@ defmodule EctoWatch.TestRepo do )} end end - diff --git a/test/ecto_watch_test.exs b/test/ecto_watch_test.exs index 7398e14..f8b86b8 100644 --- a/test/ecto_watch_test.exs +++ b/test/ecto_watch_test.exs @@ -207,7 +207,7 @@ defmodule EctoWatchTest do end assert_raise ArgumentError, - ~r/invalid value for :watchers option: invalid value for :schema_mod option: Expected schema_mod to be an Ecto schema module. Got: NotASchema/, + ~r/invalid value for :watchers option: Expected atom to be an Ecto schema module. Got: NotASchema/, fn -> EctoWatch.start_link( repo: TestRepo, @@ -219,7 +219,7 @@ defmodule EctoWatchTest do end assert_raise ArgumentError, - ~r/invalid value for :watchers option: invalid value for :update_type option: expected one of \[:inserted, :updated, :deleted\], got: :bad_update_type/, + ~r/invalid value for :watchers option: update_type was not one of :inserted, :updated, or :deleted/, fn -> EctoWatch.start_link( repo: TestRepo, @@ -231,7 +231,7 @@ defmodule EctoWatchTest do end assert_raise ArgumentError, - ~r/invalid value for :watchers option: should be either `{schema_mod, update_type}` or `{schema_mod, update_type, opts}`. Got: {EctoWatchTest.Thing}/, + ~r/invalid value for :watchers option: should be either `{schema_definition, update_type}` or `{schema_definition, update_type, opts}`. Got: {EctoWatchTest.Thing}/, fn -> EctoWatch.start_link( repo: TestRepo, @@ -243,7 +243,7 @@ defmodule EctoWatchTest do end assert_raise ArgumentError, - ~r/invalid value for :watchers option: should be either `{schema_mod, update_type}` or `{schema_mod, update_type, opts}`. Got: {EctoWatchTest.Thing, :inserted, \[\], :blah}/, + ~r/invalid value for :watchers option: should be either `{schema_definition, update_type}` or `{schema_definition, update_type, opts}`. Got: {EctoWatchTest.Thing, :inserted, \[\], :blah}/, fn -> EctoWatch.start_link( repo: TestRepo, @@ -253,6 +253,92 @@ defmodule EctoWatchTest do ] ) end + + assert_raise ArgumentError, + ~r/invalid value for :watchers option: required :table_name option not found, received options: \[\]/, + fn -> + EctoWatch.start_link( + repo: TestRepo, + pub_sub: TestPubSub, + watchers: [ + {%{}, :inserted, [label: :foo]} + ] + ) + end + + assert_raise ArgumentError, + ~r/invalid value for :watchers option: Label must be used when passing in a map for schema_definition/, + fn -> + EctoWatch.start_link( + repo: TestRepo, + pub_sub: TestPubSub, + watchers: [ + {%{table_name: :things}, :inserted, []} + ] + ) + end + + assert_raise ArgumentError, + ~r/invalid value for :watchers option: invalid value for :primary_key option: expected atom, got: 1/, + fn -> + EctoWatch.start_link( + repo: TestRepo, + pub_sub: TestPubSub, + watchers: [ + {%{table_name: :things, primary_key: 1}, :inserted, [label: :foo]} + ] + ) + end + + assert_raise ArgumentError, + ~r/invalid value for :watchers option: invalid value for :columns option: expected list, got: 1/, + fn -> + EctoWatch.start_link( + repo: TestRepo, + pub_sub: TestPubSub, + watchers: [ + {%{table_name: :things, columns: 1}, :inserted, [label: :foo]} + ] + ) + end + + assert_raise ArgumentError, + ~r/invalid value for :watchers option: invalid list in :columns option: invalid value for list element at position 0: expected atom, got: 1/, + fn -> + EctoWatch.start_link( + repo: TestRepo, + pub_sub: TestPubSub, + watchers: [ + {%{table_name: :things, columns: [1]}, :inserted, [label: :foo]} + ] + ) + end + + assert_raise ArgumentError, + ~r/invalid value for :watchers option: invalid value for :association_columns option: expected list, got: 1/, + fn -> + EctoWatch.start_link( + repo: TestRepo, + pub_sub: TestPubSub, + watchers: [ + {%{table_name: :things, association_columns: 1}, :inserted, + [label: :foo]} + ] + ) + end + + assert_raise ArgumentError, + ~r/invalid value for :watchers option: invalid list in :association_columns option: invalid value for list element at position 0: expected atom, got: 1/, + fn -> + EctoWatch.start_link( + repo: TestRepo, + pub_sub: TestPubSub, + watchers: [ + {%{table_name: :things, association_columns: [1]}, :inserted, + [label: :foo]} + ] + ) + end end test "trigger_columns option only allowed for `updated`" do @@ -298,7 +384,7 @@ defmodule EctoWatchTest do test "columns must be in schema" do assert_raise ArgumentError, - ~r/invalid value for :watchers option: invalid value for :trigger_columns option: Invalid columns for EctoWatchTest.Thing: \[:not_a_column, :another_bad_column\]/, + ~r/invalid value for :watchers option: invalid value for :trigger_columns option: Invalid column: :not_a_column \(expected to be in \[:id, :the_string, :the_integer, :the_float, :parent_thing_id, :other_parent_thing_id, :inserted_at, :updated_at\]\)/, fn -> EctoWatch.start_link( repo: TestRepo, @@ -317,7 +403,7 @@ defmodule EctoWatchTest do end assert_raise ArgumentError, - ~r/invalid value for :watchers option: invalid value for :extra_columns option: Invalid columns for EctoWatchTest.Thing: \[:not_a_column, :another_bad_column\]/, + ~r/invalid value for :watchers option: invalid value for :extra_columns option: Invalid column: :not_a_column \(expected to be in \[:id, :the_string, :the_integer, :the_float, :parent_thing_id, :other_parent_thing_id, :inserted_at, :updated_at\]\)/, fn -> EctoWatch.start_link( repo: TestRepo, @@ -367,12 +453,7 @@ defmodule EctoWatchTest do end test "Empty list of watcher is allowed" do - start_supervised!( - {EctoWatch, - repo: TestRepo, - pub_sub: TestPubSub, - watchers: []} - ) + start_supervised!({EctoWatch, repo: TestRepo, pub_sub: TestPubSub, watchers: []}) end test "subscribe requires proper Ecto schema", %{ @@ -466,18 +547,22 @@ defmodule EctoWatchTest do describe "inserts" do test "get notification about inserts" do - start_supervised!( - {EctoWatch, - repo: TestRepo, - pub_sub: TestPubSub, - watchers: [ - {Thing, :inserted}, - {Other, :inserted} - ]} - ) + start_supervised!({EctoWatch, + repo: TestRepo, + pub_sub: TestPubSub, + watchers: [ + {Thing, :inserted}, + {Other, :inserted}, + # schemaless definition + {%{table_name: :things}, :inserted, label: :things_inserted}, + {%{table_name: :other, schema_prefix: "0xabcd", primary_key: :weird_id}, :inserted, + label: :other_inserted} + ]}) EctoWatch.subscribe(Thing, :inserted) EctoWatch.subscribe(Other, :inserted) + EctoWatch.subscribe(:things_inserted, :inserted) + EctoWatch.subscribe(:other_inserted, :inserted) Ecto.Adapters.SQL.query!( TestRepo, @@ -492,7 +577,9 @@ defmodule EctoWatchTest do ) assert_receive {:inserted, Thing, %{id: 3}} + assert_receive {:inserted, :things_inserted, %{id: 3}} assert_receive {:inserted, Other, %{weird_id: 1234}} + assert_receive {:inserted, :other_inserted, %{weird_id: 1234}} end test "inserts for an association column", %{already_existing_id2: already_existing_id2} do @@ -501,12 +588,23 @@ defmodule EctoWatchTest do repo: TestRepo, pub_sub: TestPubSub, watchers: [ - {Thing, :inserted, extra_columns: [:parent_thing_id]} + {Thing, :inserted, extra_columns: [:parent_thing_id]}, + {%{ + table_name: :things, + columns: [:parent_thing_id], + association_columns: [:parent_thing_id] + }, :inserted, extra_columns: [:parent_thing_id], label: :things_parent_id_inserted} ]} ) EctoWatch.subscribe(Thing, :inserted, {:parent_thing_id, already_existing_id2}) + EctoWatch.subscribe( + :things_parent_id_inserted, + :inserted, + {:parent_thing_id, already_existing_id2} + ) + Ecto.Adapters.SQL.query!( TestRepo, "INSERT INTO things (the_string, the_integer, the_float, parent_thing_id, extra_field, inserted_at, updated_at) VALUES ('the other value', 8900, 24.53, #{already_existing_id2}, 'hey', NOW(), NOW())", @@ -514,6 +612,9 @@ defmodule EctoWatchTest do ) assert_receive {:inserted, Thing, %{id: 3, parent_thing_id: ^already_existing_id2}} + + assert_receive {:inserted, :things_parent_id_inserted, + %{id: 3, parent_thing_id: ^already_existing_id2}} end test "column is not in list of extra_columns", %{already_existing_id2: already_existing_id2} do @@ -587,22 +688,25 @@ defmodule EctoWatchTest do already_existing_id1: already_existing_id1, already_existing_id2: already_existing_id2 } do - start_supervised!( - {EctoWatch, - repo: TestRepo, - pub_sub: TestPubSub, - watchers: [ - {Thing, :updated} - ]} - ) + start_supervised!({EctoWatch, + repo: TestRepo, + pub_sub: TestPubSub, + watchers: [ + {Thing, :updated}, + # schemaless definition + {%{table_name: :things}, :updated, label: :things_updated} + ]}) EctoWatch.subscribe(Thing, :updated) + EctoWatch.subscribe(:things_updated, :updated) Ecto.Adapters.SQL.query!(TestRepo, "UPDATE things SET the_string = 'the new value'", []) assert_receive {:updated, Thing, %{id: ^already_existing_id1}} + assert_receive {:updated, :things_updated, %{id: ^already_existing_id1}} assert_receive {:updated, Thing, %{id: ^already_existing_id2}} + assert_receive {:updated, :things_updated, %{id: ^already_existing_id2}} end test "updates for the primary key", %{ @@ -636,18 +740,33 @@ defmodule EctoWatchTest do repo: TestRepo, pub_sub: TestPubSub, watchers: [ - {Thing, :updated, extra_columns: [:parent_thing_id]} + {Thing, :updated, extra_columns: [:parent_thing_id]}, + {%{ + table_name: :things, + columns: [:parent_thing_id], + association_columns: [:parent_thing_id] + }, :updated, extra_columns: [:parent_thing_id], label: :things_parent_id_updated} ]} ) EctoWatch.subscribe(Thing, :updated, {:parent_thing_id, already_existing_id1}) + EctoWatch.subscribe( + :things_parent_id_updated, + :updated, + {:parent_thing_id, already_existing_id1} + ) + Ecto.Adapters.SQL.query!(TestRepo, "UPDATE things SET the_string = 'the new value'", []) refute_receive {:updated, Thing, %{id: ^already_existing_id1}} + refute_receive {:things_parent_id_updated, Thing, %{id: ^already_existing_id1}} assert_receive {:updated, Thing, %{id: ^already_existing_id2, parent_thing_id: ^already_existing_id1}} + + assert_receive {:updated, :things_parent_id_updated, + %{id: ^already_existing_id2, parent_thing_id: ^already_existing_id1}} end test "column is not in list of extra_columns", %{already_existing_id2: already_existing_id2} do @@ -782,22 +901,25 @@ defmodule EctoWatchTest do already_existing_id1: already_existing_id1, already_existing_id2: already_existing_id2 } do - start_supervised!( - {EctoWatch, - repo: TestRepo, - pub_sub: TestPubSub, - watchers: [ - {Thing, :deleted} - ]} - ) + start_supervised!({EctoWatch, + repo: TestRepo, + pub_sub: TestPubSub, + watchers: [ + {Thing, :deleted}, + # schemaless definition + {%{table_name: :things}, :deleted, label: :things_deleted} + ]}) EctoWatch.subscribe(Thing, :deleted) + EctoWatch.subscribe(:things_deleted, :deleted) Ecto.Adapters.SQL.query!(TestRepo, "DELETE FROM things", []) assert_receive {:deleted, Thing, %{id: ^already_existing_id1}} + assert_receive {:deleted, :things_deleted, %{id: ^already_existing_id1}} assert_receive {:deleted, Thing, %{id: ^already_existing_id2}} + assert_receive {:deleted, :things_deleted, %{id: ^already_existing_id2}} end test "deletes for the primary key", %{ @@ -831,18 +953,33 @@ defmodule EctoWatchTest do repo: TestRepo, pub_sub: TestPubSub, watchers: [ - {Thing, :deleted, extra_columns: [:parent_thing_id]} + {Thing, :deleted, extra_columns: [:parent_thing_id]}, + {%{ + table_name: :things, + columns: [:parent_thing_id], + association_columns: [:parent_thing_id] + }, :deleted, extra_columns: [:parent_thing_id], label: :things_parent_id_deleted} ]} ) EctoWatch.subscribe(Thing, :deleted, {:parent_thing_id, already_existing_id1}) + EctoWatch.subscribe( + :things_parent_id_deleted, + :deleted, + {:parent_thing_id, already_existing_id1} + ) + Ecto.Adapters.SQL.query!(TestRepo, "DELETE FROM things", []) refute_receive {:deleted, Thing, %{id: ^already_existing_id1}} + refute_receive {:deleted, :things_parent_id_deleted, %{id: ^already_existing_id1}} assert_receive {:deleted, Thing, %{id: ^already_existing_id2, parent_thing_id: ^already_existing_id1}} + + assert_receive {:deleted, :things_parent_id_deleted, + %{id: ^already_existing_id2, parent_thing_id: ^already_existing_id1}} end test "column is not in list of extra_columns", %{