diff --git a/lib/radiator/application.ex b/lib/radiator/application.ex
index c930803f..77b4712f 100644
--- a/lib/radiator/application.ex
+++ b/lib/radiator/application.ex
@@ -7,6 +7,7 @@ defmodule Radiator.Application do
 
   alias Radiator.Outline.CommandProcessor
   alias Radiator.Outline.EventProducer
+  alias Radiator.Outline.NodeChangeListener
 
   @impl true
   def start(_type, _args) do
@@ -22,7 +23,10 @@ defmodule Radiator.Application do
       # Start to serve requests, typically the last entry
      RadiatorWeb.Endpoint,
       {EventProducer, name: EventProducer},
-      {CommandProcessor, name: CommandProcessor, subscribe_to: [{EventProducer, max_demand: 1}]}
+      {CommandProcessor, name: CommandProcessor, subscribe_to: [{EventProducer, max_demand: 1}]},
+      {NodeChangeListener, name: NodeChangeListener},
+      {DynamicSupervisor, strategy: :one_for_one, name: Radiator.JobRunner}
+      # {Workers, name: Workers}
     ]
 
     # See https://hexdocs.pm/elixir/Supervisor.html
diff --git a/lib/radiator/job.ex b/lib/radiator/job.ex
new file mode 100644
index 00000000..31a2f646
--- /dev/null
+++ b/lib/radiator/job.ex
@@ -0,0 +1,67 @@
+# test with:
+# GenServer.start(Radiator.Job, work: fn -> Process.sleep(5000); {:ok, []} end)
+defmodule Radiator.Job do
+  @moduledoc """
+  WIP: Job module to handle work in a GenServer.
+  Idea taken from https://pragprog.com/titles/sgdpelixir/concurrent-data-processing-in-elixir/
+  """
+  use GenServer, restart: :transient
+  require Logger
+
+  defstruct [:work, :id, :max_retries, retries: 0, status: "new"]
+
+  def start_link(args) do
+    GenServer.start_link(__MODULE__, args)
+  end
+
+  def init(args) do
+    work = Keyword.fetch!(args, :work)
+    id = Keyword.get(args, :id, random_job_id())
+    max_retries = Keyword.get(args, :max_retries, 3)
+
+    state = %Radiator.Job{id: id, work: work, max_retries: max_retries}
+    {:ok, state, {:continue, :run}}
+  end
+
+  def handle_continue(:run, state) do
+    new_state = state.work.() |> handle_job_result(state)
+
+    if new_state.status == "errored" do
+      Process.send_after(self(), :retry, 5000)
+      {:noreply, new_state}
+    else
+      Logger.info("Job exiting #{state.id}")
+      {:stop, :normal, new_state}
+    end
+  end
+
+  def handle_info(:retry, state) do
+    # Delegate work to the `handle_continue/2` callback.
+    {:noreply, state, {:continue, :run}}
+  end
+
+  defp handle_job_result({:ok, _data}, state) do
+    Logger.info("Job completed #{state.id}")
+    %Radiator.Job{state | status: "done"}
+  end
+
+  defp handle_job_result(:error, %{status: "new"} = state) do
+    Logger.warning("Job errored #{state.id}")
+    %Radiator.Job{state | status: "errored"}
+  end
+
+  defp handle_job_result(:error, %{status: "errored"} = state) do
+    Logger.warning("Job retry failed #{state.id}")
+    new_state = %Radiator.Job{state | retries: state.retries + 1}
+
+    if new_state.retries == state.max_retries do
+      %Radiator.Job{new_state | status: "failed"}
+    else
+      new_state
+    end
+  end
+
+  defp random_job_id do
+    :crypto.strong_rand_bytes(5) |> Base.url_encode64(padding: false)
+  end
+end
diff --git a/lib/radiator/outline/node_change_listener.ex b/lib/radiator/outline/node_change_listener.ex
new file mode 100644
index 00000000..bf159821
--- /dev/null
+++ b/lib/radiator/outline/node_change_listener.ex
@@ -0,0 +1,33 @@
+defmodule Radiator.Outline.NodeChangeListener do
+  @moduledoc """
+  GenServer that listens to change events and starts jobs.
+  It is an event consumer that listens to changes in the outline and starts workers.
+  """
+  use GenServer
+  alias Radiator.Outline.Dispatch
+
+  def start_link(_) do
+    GenServer.start_link(__MODULE__, :ok, [])
+  end
+
+  def init(_) do
+    Dispatch.subscribe()
+    {:ok, []}
+  end
+
+  def handle_info(%Radiator.Outline.Event.NodeContentChangedEvent{} = _event, state) do
+    {:noreply, state}
+  end
+
+  def handle_info(%Radiator.Outline.Event.NodeInsertedEvent{} = _event, state) do
+    {:noreply, state}
+  end
+
+  def handle_info(%Radiator.Outline.Event.NodeMovedEvent{} = _event, state) do
+    {:noreply, state}
+  end
+
+  def handle_info(%Radiator.Outline.Event.NodeDeletedEvent{} = _event, state) do
+    {:noreply, state}
+  end
+end
diff --git a/lib/radiator/url_extractor.ex b/lib/radiator/url_extractor.ex
index 3bfb6128..418ab405 100644
--- a/lib/radiator/url_extractor.ex
+++ b/lib/radiator/url_extractor.ex
@@ -4,6 +4,18 @@ defmodule Radiator.UrlExtractor do
   """
 
   @url_regex ~r/((([A-Za-z]{3,9}:(?:\/\/)?)(?:[-;:&=\+\$,\w]+@)?[A-Za-z0-9.-]+|(?:www.|[-;:&=\+\$,\w]+@)[A-Za-z0-9.-]+)((?:\/[\+~%\/.\w-_]*)?\??(?:[-\+=&;%@.\w_]*)#?(?:[\w]*))?)/
+  def extract_urls(text) do
+    text
+    |> extract_url_positions()
+    |> Enum.map(fn {start_bytes, size_bytes} ->
+      %{
+        start_bytes: start_bytes,
+        size_bytes: size_bytes,
+        url: String.byte_slice(text, start_bytes, size_bytes)
+      }
+    end)
+  end
+
   # should return two URLs that we can parse/scrape later
   # @return [{Integer.t, Integer.t}] list of positions of URLs in the text
   def extract_url_positions(text) do
diff --git a/mix.exs b/mix.exs
index eeff958f..79376c92 100644
--- a/mix.exs
+++ b/mix.exs
@@ -19,7 +19,7 @@ defmodule Radiator.MixProject do
   def application do
     [
       mod: {Radiator.Application, []},
-      extra_applications: [:logger, :runtime_tools]
+      extra_applications: [:logger, :runtime_tools, :crypto]
     ]
   end
 
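For reviewers, a minimal usage sketch (not part of the changeset) of how the new pieces could be exercised together, e.g. from IEx or a NodeChangeListener callback. It assumes only names added in this diff (`Radiator.Job`, `Radiator.JobRunner`, `Radiator.UrlExtractor.extract_urls/1`) and relies on the child spec that `use GenServer, restart: :transient` generates for `Radiator.Job`; the sample text and options are illustrative only.

    # Hypothetical sketch, not part of the diff: run a one-off job under the
    # new DynamicSupervisor. The work function must return {:ok, data} or
    # :error, matching the clauses of handle_job_result/2.
    DynamicSupervisor.start_child(
      Radiator.JobRunner,
      {Radiator.Job,
       work: fn -> {:ok, Radiator.UrlExtractor.extract_urls("see https://example.com")} end,
       max_retries: 2}
    )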