-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add: Allow watchers without an ecto schema
- Loading branch information
1 parent
12f8727
commit bf7786e
Showing
6 changed files
with
415 additions
and
242 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,224 @@ | ||
defmodule EctoWatch.Options.WatcherOptions do | ||
@moduledoc """ | ||
Logic for processing the `EctoWatch` postgres notification watcher options | ||
which are passed in by the end user's config | ||
""" | ||
defstruct [:schema_definition, :update_type, :label, :trigger_columns, :extra_columns] | ||
|
||
def validate_list(list) when is_list(list) do | ||
result = | ||
list | ||
|> Enum.map(&validate/1) | ||
|
||
first_error = | ||
result | ||
|> Enum.find(&match?({:error, _}, &1)) | ||
|
||
first_error || {:ok, Enum.map(result, fn {:ok, value} -> value end)} | ||
end | ||
|
||
def validate_list(_) do | ||
{:error, "should be a list"} | ||
end | ||
|
||
defmodule SchemaDefinition do | ||
@moduledoc """ | ||
Generic representation of an app schema. Contains important details about a postgres table, | ||
whether it's create from an Ecto schema module or from a map. | ||
""" | ||
defstruct [ | ||
:pg_schema_name, | ||
:table_name, | ||
:primary_key, | ||
:columns, | ||
:association_columns, | ||
:label | ||
] | ||
|
||
def new(schema_mod) when is_atom(schema_mod) do | ||
pg_schema_name = | ||
case schema_mod.__schema__(:prefix) do | ||
nil -> "public" | ||
prefix -> prefix | ||
end | ||
|
||
table_name = "#{schema_mod.__schema__(:source)}" | ||
[primary_key] = schema_mod.__schema__(:primary_key) | ||
|
||
fields = schema_mod.__schema__(:fields) | ||
|
||
association_columns = | ||
schema_mod.__schema__(:associations) | ||
|> Enum.map(&schema_mod.__schema__(:association, &1)) | ||
|> Enum.map(& &1.owner_key) | ||
|
||
%__MODULE__{ | ||
pg_schema_name: pg_schema_name, | ||
table_name: table_name, | ||
primary_key: primary_key, | ||
columns: fields, | ||
association_columns: association_columns, | ||
label: schema_mod | ||
} | ||
end | ||
|
||
def new(%__MODULE__{}) do | ||
raise "There is a bug! SchemaDefinition struct was passed to new/1" | ||
end | ||
|
||
def new(opts) when is_map(opts) do | ||
pg_schema_name = opts[:pg_schema_name] || "public" | ||
|
||
%__MODULE__{ | ||
pg_schema_name: pg_schema_name, | ||
table_name: opts.table_name, | ||
primary_key: opts.primary_key, | ||
columns: opts.columns, | ||
association_columns: opts[:association_columns] || [], | ||
label: "#{pg_schema_name}|#{opts.table_name}" | ||
} | ||
end | ||
end | ||
|
||
def validate({schema_definition, update_type}) do | ||
validate({schema_definition, update_type, []}) | ||
end | ||
|
||
def validate({schema_definition, update_type, opts}) do | ||
with {:ok, schema_definition} <- validate_schema_definition(schema_definition, opts[:label]), | ||
{:ok, update_type} <- validate_update_type(update_type), | ||
{:ok, opts} <- validate_opts(opts, schema_definition, update_type) do | ||
{:ok, {schema_definition, update_type, opts}} | ||
end | ||
end | ||
|
||
def validate(other) do | ||
{:error, | ||
"should be either `{schema_definition, update_type}` or `{schema_definition, update_type, opts}`. Got: #{inspect(other)}"} | ||
end | ||
|
||
def validate_schema_definition(schema_mod, _label_opt) when is_atom(schema_mod) do | ||
if EctoWatch.Helpers.ecto_schema_mod?(schema_mod) do | ||
{:ok, schema_mod} | ||
else | ||
{:error, "Expected atom to be an Ecto schema module. Got: #{inspect(schema_mod)}"} | ||
end | ||
end | ||
|
||
# FIXME: TESTS! | ||
def validate_schema_definition(opts, label_opt) when is_map(opts) do | ||
schema = [ | ||
pg_schema_name: [ | ||
type: :string, | ||
required: false, | ||
default: "public" | ||
], | ||
table_name: [ | ||
type: :string, | ||
required: true | ||
], | ||
primary_key: [ | ||
type: :string, | ||
required: false, | ||
default: "id" | ||
], | ||
columns: [ | ||
type: {:list, :string}, | ||
required: false, | ||
default: [] | ||
], | ||
association_columns: [ | ||
type: {:list, :string}, | ||
required: false, | ||
default: [] | ||
] | ||
] | ||
|
||
if label_opt do | ||
with {:error, error} <- NimbleOptions.validate(opts, NimbleOptions.new!(schema)) do | ||
{:error, Exception.message(error)} | ||
end | ||
else | ||
{:error, "Label must be used when passing in a map for schema_definition"} | ||
end | ||
end | ||
|
||
def validate_schema_definition(_, _), do: {:error, "should be an ecto schema module name"} | ||
|
||
def validate_update_type(update_type) do | ||
if update_type in ~w[inserted updated deleted]a do | ||
{:ok, update_type} | ||
else | ||
{:error, "update_type was not one of :inserted, :updated, or :deleted"} | ||
end | ||
end | ||
|
||
def validate_opts(opts, schema_definition, update_type) do | ||
schema_definition = SchemaDefinition.new(schema_definition) | ||
|
||
schema = [ | ||
label: [ | ||
type: :atom, | ||
required: false | ||
], | ||
trigger_columns: [ | ||
type: | ||
{:custom, __MODULE__, :validate_trigger_columns, | ||
[opts[:label], schema_definition, update_type]}, | ||
required: false | ||
], | ||
extra_columns: [ | ||
type: {:custom, __MODULE__, :validate_columns, [schema_definition]}, | ||
required: false | ||
] | ||
] | ||
|
||
with {:error, error} <- NimbleOptions.validate(opts, schema) do | ||
{:error, Exception.message(error)} | ||
end | ||
end | ||
|
||
def validate_trigger_columns(columns, label, schema_definition, update_type) do | ||
cond do | ||
update_type != :updated -> | ||
{:error, "Cannot listen to trigger_columns for `#{update_type}` events."} | ||
|
||
label == nil -> | ||
{:error, "Label must be used when trigger_columns are specified."} | ||
|
||
true -> | ||
validate_columns(columns, schema_definition) | ||
end | ||
end | ||
|
||
def validate_columns([], _schema_mod), | ||
do: {:error, "List must not be empty"} | ||
|
||
def validate_columns(columns, schema_definition) do | ||
Enum.reject(columns, &(&1 in schema_definition.columns)) | ||
|> case do | ||
[] -> | ||
{:ok, columns} | ||
|
||
extra_fields -> | ||
{:error, | ||
"Invalid columns: #{inspect(extra_fields)} (expected to be in #{inspect(schema_definition.columns)})"} | ||
end | ||
end | ||
|
||
def new({schema_definition, update_type}) do | ||
new({schema_definition, update_type, []}) | ||
end | ||
|
||
def new({schema_definition, update_type, opts}) do | ||
schema_definition = SchemaDefinition.new(schema_definition) | ||
|
||
%__MODULE__{ | ||
schema_definition: schema_definition, | ||
update_type: update_type, | ||
label: opts[:label], | ||
trigger_columns: opts[:trigger_columns], | ||
extra_columns: opts[:extra_columns] || [] | ||
} | ||
end | ||
end |
This file was deleted.
Oops, something went wrong.
Oops, something went wrong.