Skip to content

Commit

Permalink
change: Don't require specifying update type for watchers with labels
Browse files Browse the repository at this point in the history
  • Loading branch information
cheerfulstoic committed Aug 6, 2024
1 parent e9824a0 commit 9626a49
Show file tree
Hide file tree
Showing 6 changed files with 235 additions and 144 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.8.0] - 2023-08-01

### Changed

- BREAKING: Don't require specifying update type for watchers with labels (#19)

### Added

- Allow watchers without an ecto schema (thanks @frerich / #18)

## [0.7.0] - 2023-07-31

### Changed
Expand Down
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ You can also setup the database to trigger only on specific column changes on `:
]}

# subscribing
EctoWatch.subscribe(:user_contact_info_updated, :updated)
EctoWatch.subscribe(:user_contact_info_updated)
# or...
EctoWatch.subscribe(:user_contact_info_updated, :updated, package.id)
EctoWatch.subscribe(:user_contact_info_updated, package.id)

# handling messages
def handle_info({:updated, :user_contact_info_updated, %{id: id}}, socket) do
def handle_info({:user_contact_info_updated, %{id: id}}, socket) do
```

A label is required for two reasons:
Expand All @@ -137,12 +137,12 @@ You can also use labels in general without tracking specific columns:
]}

# subscribing
EctoWatch.subscribe(:user_update, :updated)
EctoWatch.subscribe(:user_update)
# or...
EctoWatch.subscribe(:user_update, :updated, package.id)
EctoWatch.subscribe(:user_update, package.id)

# handling messages
def handle_info({:updated, :user_update, %{id: id}}, socket) do
def handle_info({:user_update, %{id: id}}, socket) do
```

## Getting additional values
Expand Down
95 changes: 72 additions & 23 deletions lib/ecto_watch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,95 @@ defmodule EctoWatch do
@moduledoc false

alias EctoWatch.WatcherServer
alias EctoWatch.Helpers

use Supervisor

def subscribe(schema_mod_or_label, update_type, id \\ nil) do
if !Process.whereis(__MODULE__) do
raise "EctoWatch is not running. Please start it by adding it to your supervision tree or using EctoWatch.start_link/1"
def subscribe(schema_mod_or_label, update_type, id) when is_atom(schema_mod_or_label) do
if Helpers.ecto_schema_mod?(schema_mod_or_label) do
raise ArgumentError,
"""
This way of subscribing was removed in version 0.8.0. Instead call:
subscribe({#{inspect(schema_mod_or_label)}, #{inspect(update_type)}}, #{id})
See the updated documentation for subscribing."
"""
else
raise ArgumentError,
"""
This way of subscribing was removed in version 0.8.0. Instead call:
subscribe(#{inspect(schema_mod_or_label)}, #{id})
See the updated documentation for subscribing."
"""
end
end

def subscribe(identifier, id \\ nil) do
if is_atom(identifier) && id in ~w[inserted updated deleted]a do
if Helpers.ecto_schema_mod?(identifier) do
raise ArgumentError,
"""
This way of subscribing was removed in version 0.8.0. Instead call:
subscribe({#{inspect(identifier)}, #{inspect(id)}})
See the updated documentation for subscribing."
"""
else
raise ArgumentError,
"""
This way of subscribing was removed in version 0.8.0. Instead call:
subscribe(#{inspect(identifier)})
See the updated documentation for subscribing."
"""
end
end

with :ok <- check_update_args(update_type, id),
validate_ecto_watch_running!()

with :ok <- validate_identifier(identifier),
{:ok, {pub_sub_mod, channel_name}} <-
WatcherServer.pub_sub_subscription_details(schema_mod_or_label, update_type, id) do
WatcherServer.pub_sub_subscription_details(identifier, id) do
Phoenix.PubSub.subscribe(pub_sub_mod, channel_name)
else
{:error, error} ->
raise ArgumentError, error
end
end

def check_update_args(update_type, id) do
case {update_type, id} do
{:inserted, _} ->
:ok
defp validate_identifier({schema_mod, update_type})
when is_atom(schema_mod) and is_atom(update_type) do
cond do
!EctoWatch.Helpers.ecto_schema_mod?(schema_mod) ->
raise ArgumentError,
"Expected atom to be an Ecto schema module. Got: #{inspect(schema_mod)}"

{:updated, _} ->
:ok
update_type not in ~w[inserted updated deleted]a ->
raise ArgumentError,
"Unexpected update_type: #{inspect(update_type)}. Expected :inserted, :updated, or :deleted"

{:deleted, _} ->
true ->
:ok
end
end

{other, _} ->
raise ArgumentError,
"Unexpected update_type: #{inspect(other)}. Expected :inserted, :updated, or :deleted"
defp validate_identifier(label) when is_atom(label) do
:ok
end

defp validate_identifier(other) do
raise ArgumentError,
"Invalid subscription (expected either `{schema_module, :inserted | :updated | :deleted}` or a label): #{inspect(other)}"
end

defp validate_ecto_watch_running! do
if !Process.whereis(__MODULE__) do
raise "EctoWatch is not running. Please start it by adding it to your supervision tree or using EctoWatch.start_link/1"
end
end

Expand Down Expand Up @@ -63,14 +120,6 @@ defmodule EctoWatch do
{EctoWatch.WatcherSupervisor, options}
]

# children = children ++
# Enum.map(options.watchers, fn watcher_options ->
# %{
# id: WatcherServer.name(watcher_options),
# start: {WatcherServer, :start_link, [{options.repo_mod, options.pub_sub_mod, watcher_options}]}
# }
# end)

Supervisor.init(children, strategy: :rest_for_one)
end
end
60 changes: 37 additions & 23 deletions lib/ecto_watch/watcher_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,18 @@ defmodule EctoWatch.WatcherServer do

use GenServer

def pub_sub_subscription_details(schema_mod_or_label, update_type, identifier_value) do
name = unique_label(schema_mod_or_label, update_type)

if Process.whereis(name) do
GenServer.call(
name,
{:pub_sub_subscription_details, schema_mod_or_label, update_type, identifier_value}
)
else
{:error, "No watcher found for #{inspect(schema_mod_or_label)} / #{inspect(update_type)}"}
def pub_sub_subscription_details(identifier, identifier_value) do
with {:ok, pid} <- find(identifier) do
GenServer.call(pid, {:pub_sub_subscription_details, identifier, identifier_value})
end
end

defp find(identifier) do
name = unique_label(identifier)

case Process.whereis(name) do
nil -> {:error, "No watcher found for #{inspect(identifier)}"}
pid -> {:ok, pid}
end
end

Expand Down Expand Up @@ -108,7 +110,7 @@ defmodule EctoWatch.WatcherServer do
end

def handle_call(
{:pub_sub_subscription_details, schema_mod_or_label, update_type, identifier_value},
{:pub_sub_subscription_details, identifier, identifier_value},
_from,
state
) do
Expand All @@ -125,14 +127,12 @@ defmodule EctoWatch.WatcherServer do
end

result =
with :ok <- validate_subscription(state, update_type, column) do
unique_label = unique_label(schema_mod_or_label, update_type)

with :ok <- validate_subscription(state, identifier, column) do
channel_name =
if column && value do
"#{unique_label}|#{column}|#{value}"
"#{state.unique_label}|#{column}|#{value}"
else
"#{unique_label}"
"#{state.unique_label}"
end

{:ok, {state.pub_sub_mod, channel_name}}
Expand All @@ -141,9 +141,9 @@ defmodule EctoWatch.WatcherServer do
{:reply, result, state}
end

defp validate_subscription(state, update_type, column) do
defp validate_subscription(state, identifier, column) do
cond do
update_type == :inserted && column == state.options.schema_definition.primary_key ->
match?({_, :inserted}, identifier) && column == state.options.schema_definition.primary_key ->
{:error, "Cannot subscribe to primary_key for inserted records"}

column && not MapSet.member?(state.identifier_columns, column) ->
Expand All @@ -170,7 +170,13 @@ defmodule EctoWatch.WatcherServer do
type = String.to_existing_atom(type)

message =
{type, state.options.label || state.options.schema_definition.label, returned_values}
case state.options.label do
nil ->
{{state.options.label || state.options.schema_definition.label, type}, returned_values}

label ->
{label, returned_values}
end

for topic <-
topics(
Expand Down Expand Up @@ -202,11 +208,19 @@ defmodule EctoWatch.WatcherServer do
# To make things simple: generate a single string which is unique for each watcher
# that can be used as the watcher process name, trigger name, trigger function name,
# and Phoenix.PubSub channel name.
def unique_label(%WatcherOptions{} = options) do
:"ew_#{options.update_type}_for_#{Helpers.label(options.label || options.schema_definition.label)}"
defp unique_label(%WatcherOptions{} = options) do
if options.label do
unique_label(options.label)
else
unique_label({options.schema_definition.label, options.update_type})
end
end

defp unique_label({schema_mod, update_type}) do
:"ew_#{update_type}_for_#{Helpers.label(schema_mod)}"
end

defp unique_label(schema_mod_or_label, update_type) do
:"ew_#{update_type}_for_#{Helpers.label(schema_mod_or_label)}"
defp unique_label(label) do
:"ew_for_#{Helpers.label(label)}"
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule EctoWatch.MixProject do
def project do
[
app: :ecto_watch,
version: "0.7.0",
version: "0.8.0",
elixir: "~> 1.10",
description:
"EctoWatch allows you to easily get Phoenix.PubSub notifications directly from postgresql.",
Expand Down
Loading

0 comments on commit 9626a49

Please sign in to comment.