From bf7786ef8b43094d40e482ac39fb1352b28bc454 Mon Sep 17 00:00:00 2001 From: Brian Underwood Date: Thu, 1 Aug 2024 11:45:21 +0200 Subject: [PATCH] add: Allow watchers without an ecto schema --- lib/ecto_watch/options.ex | 2 +- lib/ecto_watch/options/watcher_options.ex | 224 ++++++++++++++++++++++ lib/ecto_watch/watcher_options.ex | 118 ------------ lib/ecto_watch/watcher_server.ex | 175 +++++++---------- lib/test_repo.ex | 1 - test/ecto_watch_test.exs | 137 +++++++++++-- 6 files changed, 415 insertions(+), 242 deletions(-) create mode 100644 lib/ecto_watch/options/watcher_options.ex delete mode 100644 lib/ecto_watch/watcher_options.ex diff --git a/lib/ecto_watch/options.ex b/lib/ecto_watch/options.ex index 1398e8d..decbd87 100644 --- a/lib/ecto_watch/options.ex +++ b/lib/ecto_watch/options.ex @@ -3,7 +3,7 @@ defmodule EctoWatch.Options do Logic for processing the `EctoWatch` options passed by the end user's config """ - alias EctoWatch.WatcherOptions + alias EctoWatch.Options.WatcherOptions defstruct [:repo_mod, :pub_sub_mod, :watchers] diff --git a/lib/ecto_watch/options/watcher_options.ex b/lib/ecto_watch/options/watcher_options.ex new file mode 100644 index 0000000..56bc377 --- /dev/null +++ b/lib/ecto_watch/options/watcher_options.ex @@ -0,0 +1,224 @@ +defmodule EctoWatch.Options.WatcherOptions do + @moduledoc """ + Logic for processing the `EctoWatch` postgres notification watcher options + which are passed in by the end user's config + """ + defstruct [:schema_definition, :update_type, :label, :trigger_columns, :extra_columns] + + def validate_list(list) when is_list(list) do + result = + list + |> Enum.map(&validate/1) + + first_error = + result + |> Enum.find(&match?({:error, _}, &1)) + + first_error || {:ok, Enum.map(result, fn {:ok, value} -> value end)} + end + + def validate_list(_) do + {:error, "should be a list"} + end + + defmodule SchemaDefinition do + @moduledoc """ + Generic representation of an app schema. Contains important details about a postgres table, + whether it's create from an Ecto schema module or from a map. + """ + defstruct [ + :pg_schema_name, + :table_name, + :primary_key, + :columns, + :association_columns, + :label + ] + + def new(schema_mod) when is_atom(schema_mod) do + pg_schema_name = + case schema_mod.__schema__(:prefix) do + nil -> "public" + prefix -> prefix + end + + table_name = "#{schema_mod.__schema__(:source)}" + [primary_key] = schema_mod.__schema__(:primary_key) + + fields = schema_mod.__schema__(:fields) + + association_columns = + schema_mod.__schema__(:associations) + |> Enum.map(&schema_mod.__schema__(:association, &1)) + |> Enum.map(& &1.owner_key) + + %__MODULE__{ + pg_schema_name: pg_schema_name, + table_name: table_name, + primary_key: primary_key, + columns: fields, + association_columns: association_columns, + label: schema_mod + } + end + + def new(%__MODULE__{}) do + raise "There is a bug! SchemaDefinition struct was passed to new/1" + end + + def new(opts) when is_map(opts) do + pg_schema_name = opts[:pg_schema_name] || "public" + + %__MODULE__{ + pg_schema_name: pg_schema_name, + table_name: opts.table_name, + primary_key: opts.primary_key, + columns: opts.columns, + association_columns: opts[:association_columns] || [], + label: "#{pg_schema_name}|#{opts.table_name}" + } + end + end + + def validate({schema_definition, update_type}) do + validate({schema_definition, update_type, []}) + end + + def validate({schema_definition, update_type, opts}) do + with {:ok, schema_definition} <- validate_schema_definition(schema_definition, opts[:label]), + {:ok, update_type} <- validate_update_type(update_type), + {:ok, opts} <- validate_opts(opts, schema_definition, update_type) do + {:ok, {schema_definition, update_type, opts}} + end + end + + def validate(other) do + {:error, + "should be either `{schema_definition, update_type}` or `{schema_definition, update_type, opts}`. Got: #{inspect(other)}"} + end + + def validate_schema_definition(schema_mod, _label_opt) when is_atom(schema_mod) do + if EctoWatch.Helpers.ecto_schema_mod?(schema_mod) do + {:ok, schema_mod} + else + {:error, "Expected atom to be an Ecto schema module. Got: #{inspect(schema_mod)}"} + end + end + + # FIXME: TESTS! + def validate_schema_definition(opts, label_opt) when is_map(opts) do + schema = [ + pg_schema_name: [ + type: :string, + required: false, + default: "public" + ], + table_name: [ + type: :string, + required: true + ], + primary_key: [ + type: :string, + required: false, + default: "id" + ], + columns: [ + type: {:list, :string}, + required: false, + default: [] + ], + association_columns: [ + type: {:list, :string}, + required: false, + default: [] + ] + ] + + if label_opt do + with {:error, error} <- NimbleOptions.validate(opts, NimbleOptions.new!(schema)) do + {:error, Exception.message(error)} + end + else + {:error, "Label must be used when passing in a map for schema_definition"} + end + end + + def validate_schema_definition(_, _), do: {:error, "should be an ecto schema module name"} + + def validate_update_type(update_type) do + if update_type in ~w[inserted updated deleted]a do + {:ok, update_type} + else + {:error, "update_type was not one of :inserted, :updated, or :deleted"} + end + end + + def validate_opts(opts, schema_definition, update_type) do + schema_definition = SchemaDefinition.new(schema_definition) + + schema = [ + label: [ + type: :atom, + required: false + ], + trigger_columns: [ + type: + {:custom, __MODULE__, :validate_trigger_columns, + [opts[:label], schema_definition, update_type]}, + required: false + ], + extra_columns: [ + type: {:custom, __MODULE__, :validate_columns, [schema_definition]}, + required: false + ] + ] + + with {:error, error} <- NimbleOptions.validate(opts, schema) do + {:error, Exception.message(error)} + end + end + + def validate_trigger_columns(columns, label, schema_definition, update_type) do + cond do + update_type != :updated -> + {:error, "Cannot listen to trigger_columns for `#{update_type}` events."} + + label == nil -> + {:error, "Label must be used when trigger_columns are specified."} + + true -> + validate_columns(columns, schema_definition) + end + end + + def validate_columns([], _schema_mod), + do: {:error, "List must not be empty"} + + def validate_columns(columns, schema_definition) do + Enum.reject(columns, &(&1 in schema_definition.columns)) + |> case do + [] -> + {:ok, columns} + + extra_fields -> + {:error, + "Invalid columns: #{inspect(extra_fields)} (expected to be in #{inspect(schema_definition.columns)})"} + end + end + + def new({schema_definition, update_type}) do + new({schema_definition, update_type, []}) + end + + def new({schema_definition, update_type, opts}) do + schema_definition = SchemaDefinition.new(schema_definition) + + %__MODULE__{ + schema_definition: schema_definition, + update_type: update_type, + label: opts[:label], + trigger_columns: opts[:trigger_columns], + extra_columns: opts[:extra_columns] || [] + } + end +end diff --git a/lib/ecto_watch/watcher_options.ex b/lib/ecto_watch/watcher_options.ex deleted file mode 100644 index a0ae7a0..0000000 --- a/lib/ecto_watch/watcher_options.ex +++ /dev/null @@ -1,118 +0,0 @@ -defmodule EctoWatch.WatcherOptions do - @moduledoc """ - Logic for processing the `EctoWatch` postgres notification watcher options - which are passed in by the end user's config - """ - defstruct [:schema_mod, :update_type, :label, :trigger_columns, :extra_columns] - - def validate_list(list) when is_list(list) do - result = - list - |> Enum.map(&validate/1) - |> Enum.find(&match?({:error, _}, &1)) - - result || {:ok, list} - end - - def validate_list(_) do - {:error, "should be a list"} - end - - def validate({schema_mod, update_type}) do - validate({schema_mod, update_type, []}) - end - - def validate({schema_mod, update_type, opts}) do - opts = - opts - |> Keyword.put(:schema_mod, schema_mod) - |> Keyword.put(:update_type, update_type) - - schema = [ - schema_mod: [ - type: {:custom, __MODULE__, :validate_schema_mod, []}, - required: true - ], - update_type: [ - type: {:in, ~w[inserted updated deleted]a}, - required: true - ], - label: [ - type: :atom, - required: false - ], - trigger_columns: [ - type: - {:custom, __MODULE__, :validate_trigger_columns, - [opts[:label], schema_mod, update_type]}, - required: false - ], - extra_columns: [ - type: {:custom, __MODULE__, :validate_columns, [schema_mod]}, - required: false - ] - ] - - with {:error, error} <- NimbleOptions.validate(opts, schema) do - {:error, Exception.message(error)} - end - end - - def validate(other) do - {:error, - "should be either `{schema_mod, update_type}` or `{schema_mod, update_type, opts}`. Got: #{inspect(other)}"} - end - - def validate_schema_mod(schema_mod) when is_atom(schema_mod) do - if EctoWatch.Helpers.ecto_schema_mod?(schema_mod) do - {:ok, schema_mod} - else - {:error, "Expected schema_mod to be an Ecto schema module. Got: #{inspect(schema_mod)}"} - end - end - - def validate_schema_mod(_), do: {:error, "should be an atom"} - - def validate_trigger_columns(columns, label, schema_mod, update_type) do - cond do - update_type != :updated -> - {:error, "Cannot listen to trigger_columns for `#{update_type}` events."} - - label == nil -> - {:error, "Label must be used when trigger_columns are specified."} - - true -> - validate_columns(columns, schema_mod) - end - end - - def validate_columns([], _schema_mod), - do: {:error, "List must not be empty"} - - def validate_columns(columns, schema_mod) do - schema_fields = schema_mod.__schema__(:fields) - - Enum.reject(columns, &(&1 in schema_fields)) - |> case do - [] -> - {:ok, columns} - - extra_fields -> - {:error, "Invalid columns for #{inspect(schema_mod)}: #{inspect(extra_fields)}"} - end - end - - def new({schema_mod, update_type}) do - new({schema_mod, update_type, []}) - end - - def new({schema_mod, update_type, opts}) do - %__MODULE__{ - schema_mod: schema_mod, - update_type: update_type, - label: opts[:label], - trigger_columns: opts[:trigger_columns], - extra_columns: opts[:extra_columns] || [] - } - end -end diff --git a/lib/ecto_watch/watcher_server.ex b/lib/ecto_watch/watcher_server.ex index da1501b..00ef456 100644 --- a/lib/ecto_watch/watcher_server.ex +++ b/lib/ecto_watch/watcher_server.ex @@ -4,7 +4,7 @@ defmodule EctoWatch.WatcherServer do """ alias EctoWatch.Helpers - alias EctoWatch.WatcherOptions + alias EctoWatch.Options.WatcherOptions use GenServer @@ -21,100 +21,17 @@ defmodule EctoWatch.WatcherServer do end end - defmodule EctoSchemaDetails do - @moduledoc """ - Struct holding pre-processed details about Ecto schemas for use in the watcher server - """ - - defstruct ~w[schema_mod pg_schema_name table_name primary_key]a - - def from_watcher_options(watcher_options) do - pg_schema_name = - case watcher_options.schema_mod.__schema__(:prefix) do - nil -> "public" - prefix -> prefix - end - - table_name = "#{watcher_options.schema_mod.__schema__(:source)}" - - # TODO: Raise an "unsupported" error if primary key is more than one column - # Or maybe multiple columns could be supported? - [primary_key] = watcher_options.schema_mod.__schema__(:primary_key) - - %__MODULE__{ - schema_mod: watcher_options.schema_mod, - pg_schema_name: pg_schema_name, - table_name: table_name, - primary_key: primary_key - } - end - end - def start_link({repo_mod, pub_sub_mod, watcher_options}) do - unique_label = "#{unique_label(watcher_options)}" - - ecto_schema_details = EctoSchemaDetails.from_watcher_options(watcher_options) - GenServer.start_link( __MODULE__, - {repo_mod, pub_sub_mod, ecto_schema_details, watcher_options, unique_label, - watcher_options.label}, + {repo_mod, pub_sub_mod, watcher_options}, name: unique_label(watcher_options) ) end - def handle_call( - {:pub_sub_subscription_details, schema_mod_or_label, update_type, identifier_value}, - _from, - state - ) do - {column, value} = - case identifier_value do - {key, value} -> - {key, value} + def init({repo_mod, pub_sub_mod, options}) do + unique_label = "#{unique_label(options)}" - nil -> - {nil, nil} - - identifier_value -> - {state.ecto_schema_details.primary_key, identifier_value} - end - - result = - with :ok <- validate_subscription(state, update_type, column) do - unique_label = unique_label(schema_mod_or_label, update_type) - - channel_name = - if column && value do - "#{unique_label}|#{column}|#{value}" - else - "#{unique_label}" - end - - {:ok, {state.pub_sub_mod, channel_name}} - end - - {:reply, result, state} - end - - defp validate_subscription(state, update_type, column) do - cond do - update_type == :inserted && column == state.ecto_schema_details.primary_key -> - {:error, "Cannot subscribe to primary_key for inserted records"} - - column && not MapSet.member?(state.identifier_columns, column) -> - {:error, "Column #{column} is not an association column"} - - column && column != state.ecto_schema_details.primary_key && - column not in state.options.extra_columns -> - {:error, "Column #{column} is not in the list of extra columns"} - - true -> - :ok - end - end - - def init({repo_mod, pub_sub_mod, ecto_schema_details, options, unique_label, label}) do update_keyword = case options.update_type do :inserted -> @@ -132,13 +49,13 @@ defmodule EctoWatch.WatcherServer do end columns_sql = - [ecto_schema_details.primary_key | options.extra_columns] + [options.schema_definition.primary_key | options.extra_columns] |> Enum.map_join(",", &"'#{&1}',row.#{&1}") Ecto.Adapters.SQL.query!( repo_mod, """ - CREATE OR REPLACE FUNCTION \"#{ecto_schema_details.pg_schema_name}\".#{unique_label}_func() + CREATE OR REPLACE FUNCTION \"#{options.schema_definition.pg_schema_name}\".#{unique_label}_func() RETURNS trigger AS $trigger$ DECLARE row record; @@ -159,7 +76,7 @@ defmodule EctoWatch.WatcherServer do Ecto.Adapters.SQL.query!( repo_mod, """ - DROP TRIGGER IF EXISTS #{unique_label}_trigger on \"#{ecto_schema_details.pg_schema_name}\".\"#{ecto_schema_details.table_name}\"; + DROP TRIGGER IF EXISTS #{unique_label}_trigger on \"#{options.schema_definition.pg_schema_name}\".\"#{options.schema_definition.table_name}\"; """, [] ) @@ -168,8 +85,8 @@ defmodule EctoWatch.WatcherServer do repo_mod, """ CREATE TRIGGER #{unique_label}_trigger - AFTER #{update_keyword} ON \"#{ecto_schema_details.pg_schema_name}\".\"#{ecto_schema_details.table_name}\" FOR EACH ROW - EXECUTE PROCEDURE \"#{ecto_schema_details.pg_schema_name}\".#{unique_label}_func(); + AFTER #{update_keyword} ON \"#{options.schema_definition.pg_schema_name}\".\"#{options.schema_definition.table_name}\" FOR EACH ROW + EXECUTE PROCEDURE \"#{options.schema_definition.pg_schema_name}\".#{unique_label}_func(); """, [] ) @@ -181,22 +98,64 @@ defmodule EctoWatch.WatcherServer do %{ pub_sub_mod: pub_sub_mod, unique_label: unique_label, - ecto_schema_details: ecto_schema_details, identifier_columns: MapSet.put( - association_columns(ecto_schema_details.schema_mod), - ecto_schema_details.primary_key + MapSet.new(options.schema_definition.association_columns), + options.schema_definition.primary_key ), - options: options, - schema_mod_or_label: label || ecto_schema_details.schema_mod + options: options }} end - defp association_columns(schema_mod) do - schema_mod.__schema__(:associations) - |> Enum.map(&schema_mod.__schema__(:association, &1)) - |> Enum.map(& &1.owner_key) - |> MapSet.new() + def handle_call( + {:pub_sub_subscription_details, schema_mod_or_label, update_type, identifier_value}, + _from, + state + ) do + {column, value} = + case identifier_value do + {key, value} -> + {key, value} + + nil -> + {nil, nil} + + identifier_value -> + {state.options.schema_definition.primary_key, identifier_value} + end + + result = + with :ok <- validate_subscription(state, update_type, column) do + unique_label = unique_label(schema_mod_or_label, update_type) + + channel_name = + if column && value do + "#{unique_label}|#{column}|#{value}" + else + "#{unique_label}" + end + + {:ok, {state.pub_sub_mod, channel_name}} + end + + {:reply, result, state} + end + + defp validate_subscription(state, update_type, column) do + cond do + update_type == :inserted && column == state.options.schema_definition.primary_key -> + {:error, "Cannot subscribe to primary_key for inserted records"} + + column && not MapSet.member?(state.identifier_columns, column) -> + {:error, "Column #{column} is not an association column"} + + column && column != state.options.schema_definition.primary_key && + column not in state.options.extra_columns -> + {:error, "Column #{column} is not in the list of extra columns"} + + true -> + :ok + end end def handle_info({:notification, _pid, _ref, channel_name, payload}, state) do @@ -210,7 +169,8 @@ defmodule EctoWatch.WatcherServer do type = String.to_existing_atom(type) - message = {type, state.schema_mod_or_label, returned_values} + message = + {type, state.options.label || state.options.schema_definition.label, returned_values} for topic <- topics( @@ -243,15 +203,10 @@ defmodule EctoWatch.WatcherServer do # that can be used as the watcher process name, trigger name, trigger function name, # and Phoenix.PubSub channel name. def unique_label(%WatcherOptions{} = options) do - unique_label( - options.label || options.schema_mod, - options.update_type - ) + :"ew_#{options.update_type}_for_#{Helpers.label(options.label || options.schema_definition.label)}" end defp unique_label(schema_mod_or_label, update_type) do - label = Helpers.label(schema_mod_or_label) - - :"ew_#{update_type}_for_#{label}" + :"ew_#{update_type}_for_#{Helpers.label(schema_mod_or_label)}" end end diff --git a/lib/test_repo.ex b/lib/test_repo.ex index 429da8e..99595fb 100644 --- a/lib/test_repo.ex +++ b/lib/test_repo.ex @@ -19,4 +19,3 @@ defmodule EctoWatch.TestRepo do )} end end - diff --git a/test/ecto_watch_test.exs b/test/ecto_watch_test.exs index 7398e14..fe21fe2 100644 --- a/test/ecto_watch_test.exs +++ b/test/ecto_watch_test.exs @@ -188,6 +188,7 @@ defmodule EctoWatchTest do end end + # FIXME: Require `label` for schemaless configuration test "watcher option validations" do assert_raise ArgumentError, ~r/required :watchers option not found/, fn -> EctoWatch.start_link( @@ -207,7 +208,7 @@ defmodule EctoWatchTest do end assert_raise ArgumentError, - ~r/invalid value for :watchers option: invalid value for :schema_mod option: Expected schema_mod to be an Ecto schema module. Got: NotASchema/, + ~r/invalid value for :watchers option: Expected atom to be an Ecto schema module. Got: NotASchema/, fn -> EctoWatch.start_link( repo: TestRepo, @@ -219,7 +220,7 @@ defmodule EctoWatchTest do end assert_raise ArgumentError, - ~r/invalid value for :watchers option: invalid value for :update_type option: expected one of \[:inserted, :updated, :deleted\], got: :bad_update_type/, + ~r/invalid value for :watchers option: update_type was not one of :inserted, :updated, or :deleted/, fn -> EctoWatch.start_link( repo: TestRepo, @@ -231,7 +232,7 @@ defmodule EctoWatchTest do end assert_raise ArgumentError, - ~r/invalid value for :watchers option: should be either `{schema_mod, update_type}` or `{schema_mod, update_type, opts}`. Got: {EctoWatchTest.Thing}/, + ~r/invalid value for :watchers option: should be either `{schema_definition, update_type}` or `{schema_definition, update_type, opts}`. Got: {EctoWatchTest.Thing}/, fn -> EctoWatch.start_link( repo: TestRepo, @@ -243,7 +244,7 @@ defmodule EctoWatchTest do end assert_raise ArgumentError, - ~r/invalid value for :watchers option: should be either `{schema_mod, update_type}` or `{schema_mod, update_type, opts}`. Got: {EctoWatchTest.Thing, :inserted, \[\], :blah}/, + ~r/invalid value for :watchers option: should be either `{schema_definition, update_type}` or `{schema_definition, update_type, opts}`. Got: {EctoWatchTest.Thing, :inserted, \[\], :blah}/, fn -> EctoWatch.start_link( repo: TestRepo, @@ -253,6 +254,92 @@ defmodule EctoWatchTest do ] ) end + + assert_raise ArgumentError, + ~r/invalid value for :watchers option: required :table_name option not found, received options: \[\]/, + fn -> + EctoWatch.start_link( + repo: TestRepo, + pub_sub: TestPubSub, + watchers: [ + {%{}, :inserted, [label: :foo]} + ] + ) + end + + assert_raise ArgumentError, + ~r/invalid value for :watchers option: Label must be used when passing in a map for schema_definition/, + fn -> + EctoWatch.start_link( + repo: TestRepo, + pub_sub: TestPubSub, + watchers: [ + {%{table_name: "things"}, :inserted, []} + ] + ) + end + + assert_raise ArgumentError, + ~r/invalid value for :watchers option: invalid value for :primary_key option: expected string, got: 1/, + fn -> + EctoWatch.start_link( + repo: TestRepo, + pub_sub: TestPubSub, + watchers: [ + {%{table_name: "things", primary_key: 1}, :inserted, [label: :foo]} + ] + ) + end + + assert_raise ArgumentError, + ~r/invalid value for :watchers option: invalid value for :columns option: expected list, got: 1/, + fn -> + EctoWatch.start_link( + repo: TestRepo, + pub_sub: TestPubSub, + watchers: [ + {%{table_name: "things", columns: 1}, :inserted, [label: :foo]} + ] + ) + end + + assert_raise ArgumentError, + ~r/invalid value for :watchers option: invalid list in :columns option: invalid value for list element at position 0: expected string, got: 1/, + fn -> + EctoWatch.start_link( + repo: TestRepo, + pub_sub: TestPubSub, + watchers: [ + {%{table_name: "things", columns: [1]}, :inserted, [label: :foo]} + ] + ) + end + + assert_raise ArgumentError, + ~r/invalid value for :watchers option: invalid value for :association_columns option: expected list, got: 1/, + fn -> + EctoWatch.start_link( + repo: TestRepo, + pub_sub: TestPubSub, + watchers: [ + {%{table_name: "things", association_columns: 1}, :inserted, + [label: :foo]} + ] + ) + end + + assert_raise ArgumentError, + ~r/invalid value for :watchers option: invalid list in :association_columns option: invalid value for list element at position 0: expected string, got: 1/, + fn -> + EctoWatch.start_link( + repo: TestRepo, + pub_sub: TestPubSub, + watchers: [ + {%{table_name: "things", association_columns: [1]}, :inserted, + [label: :foo]} + ] + ) + end end test "trigger_columns option only allowed for `updated`" do @@ -298,7 +385,7 @@ defmodule EctoWatchTest do test "columns must be in schema" do assert_raise ArgumentError, - ~r/invalid value for :watchers option: invalid value for :trigger_columns option: Invalid columns for EctoWatchTest.Thing: \[:not_a_column, :another_bad_column\]/, + ~r/invalid value for :watchers option: invalid value for :trigger_columns option: Invalid columns: \[:not_a_column, :another_bad_column\] \(expected to be in \[:id, :the_string, :the_integer, :the_float, :parent_thing_id, :other_parent_thing_id, :inserted_at, :updated_at\]\)/, fn -> EctoWatch.start_link( repo: TestRepo, @@ -317,7 +404,7 @@ defmodule EctoWatchTest do end assert_raise ArgumentError, - ~r/invalid value for :watchers option: invalid value for :extra_columns option: Invalid columns for EctoWatchTest.Thing: \[:not_a_column, :another_bad_column\]/, + ~r/invalid value for :watchers option: invalid value for :extra_columns option: Invalid columns: \[:not_a_column, :another_bad_column\] \(expected to be in \[:id, :the_string, :the_integer, :the_float, :parent_thing_id, :other_parent_thing_id, :inserted_at, :updated_at\]\)/, fn -> EctoWatch.start_link( repo: TestRepo, @@ -367,12 +454,7 @@ defmodule EctoWatchTest do end test "Empty list of watcher is allowed" do - start_supervised!( - {EctoWatch, - repo: TestRepo, - pub_sub: TestPubSub, - watchers: []} - ) + start_supervised!({EctoWatch, repo: TestRepo, pub_sub: TestPubSub, watchers: []}) end test "subscribe requires proper Ecto schema", %{ @@ -495,6 +577,37 @@ defmodule EctoWatchTest do assert_receive {:inserted, Other, %{weird_id: 1234}} end + test "schemaless definition" do + start_supervised!( + {EctoWatch, + repo: TestRepo, + pub_sub: TestPubSub, + watchers: [ + {%{table_name: "things"}, :inserted, label: :things_inserted}, + {%{table_name: "other", pg_schema_name: "0xabcd", primary_key: "weird_id"}, :inserted, + label: :other_inserted} + ]} + ) + + EctoWatch.subscribe(:things_inserted, :inserted) + EctoWatch.subscribe(:other_inserted, :inserted) + + Ecto.Adapters.SQL.query!( + TestRepo, + "INSERT INTO things (the_string, the_integer, the_float, inserted_at, updated_at) VALUES ('the value', 4455, 84.52, NOW(), NOW())", + [] + ) + + Ecto.Adapters.SQL.query!( + TestRepo, + "INSERT INTO \"0xabcd\".other (weird_id, the_string, inserted_at, updated_at) VALUES (1234, 'the value', NOW(), NOW())", + [] + ) + + assert_receive {:inserted, :things_inserted, %{id: 3}} + assert_receive {:inserted, :other_inserted, %{weird_id: 1234}} + end + test "inserts for an association column", %{already_existing_id2: already_existing_id2} do start_supervised!( {EctoWatch,