Skip to content

Commit

Permalink
add GenServer which handles job processing
Browse files Browse the repository at this point in the history
  • Loading branch information
electronicbites committed Sep 1, 2024
1 parent bd04d74 commit d8cd54c
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 2 deletions.
6 changes: 5 additions & 1 deletion lib/radiator/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule Radiator.Application do

alias Radiator.Outline.CommandProcessor
alias Radiator.Outline.EventProducer
alias Radiator.Outline.NodeChangeListener

@impl true
def start(_type, _args) do
Expand All @@ -22,7 +23,10 @@ defmodule Radiator.Application do
# Start to serve requests, typically the last entry
RadiatorWeb.Endpoint,
{EventProducer, name: EventProducer},
{CommandProcessor, name: CommandProcessor, subscribe_to: [{EventProducer, max_demand: 1}]}
{CommandProcessor, name: CommandProcessor, subscribe_to: [{EventProducer, max_demand: 1}]},
{NodeChangeListener, name: NodeChangeListener},
{DynamicSupervisor, strategy: :one_for_one, name: Radiator.JobRunner}
# {Workers, name: Workers}
]

# See https://hexdocs.pm/elixir/Supervisor.html
Expand Down
67 changes: 67 additions & 0 deletions lib/radiator/job.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# test with
# GenServer.start(Radiator.Job, work: fn -> Process.sleep(5000);{:ok,[]} end)
defmodule Radiator.Job do
@moduledoc """
WIP: Job module to handle work in a GenServer
idea taken from https://pragprog.com/titles/sgdpelixir/concurrent-data-processing-in-elixir/
"""
use GenServer, restart: :transient
require Logger

defstruct [:work, :id, :max_retries, retries: 0, status: "new"]

def start_link(args) do
GenServer.start_link(__MODULE__, args)
end

def init(args) do
work = Keyword.fetch!(args, :work)
id = Keyword.get(args, :id, random_job_id())
max_retries = Keyword.get(args, :max_retries, 3)

state = %Radiator.Job{id: id, work: work, max_retries: max_retries}
{:ok, state, {:continue, :run}}
end

def handle_continue(:run, state) do
new_state = state.work.() |> handle_job_result(state)

if new_state.status == "errored" do
Process.send_after(self(), :retry, 5000)
{:noreply, new_state}
else
Logger.info("Job exiting #{state.id}")
{:stop, :normal, new_state}
end
end

def handle_info(:retry, state) do
# Delegate work to the `handle_continue/2` callback.
{:noreply, state, {:continue, :run}}
end

defp handle_job_result({:ok, _data}, state) do
Logger.info("Job completed #{state.id}")
%Radiator.Job{state | status: "done"}
end

defp handle_job_result(:error, %{status: "new"} = state) do
Logger.warning("Job errored #{state.id}")
%Radiator.Job{state | status: "errored"}
end

defp handle_job_result(:error, %{status: "errored"} = state) do
Logger.warning("Job retry failed #{state.id}")
new_state = %Radiator.Job{state | retries: state.retries + 1}

if new_state.retries == state.max_retries do
%Radiator.Job{new_state | status: "failed"}
else
new_state
end
end

defp random_job_id do
:crypto.strong_rand_bytes(5) |> Base.url_encode64(padding: false)
end
end
33 changes: 33 additions & 0 deletions lib/radiator/outline/node_change_listener.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
defmodule Radiator.Outline.NodeChangeListener do
@moduledoc """
Genserver that listens to change events and starts jobs
It is an eventconsumer that listens to changes in the outline and starts workers
"""
use GenServer
alias Radiator.Outline.Dispatch

def start_link(_) do
GenServer.start_link(__MODULE__, :ok, [])
end

def init(_) do
Dispatch.subscribe()
{:ok, []}
end

def handle_info(%Radiator.Outline.Event.NodeContentChangedEvent{} = _event, state) do
{:noreply, state}
end

def handle_info(%Radiator.Outline.Event.NodeInsertedEvent{} = _event, state) do
{:noreply, state}
end

def handle_info(%Radiator.Outline.Event.NodeMovedEvent{} = _event, state) do
{:noreply, state}
end

def handle_info(%Radiator.Outline.Event.NodeDeletedEvent{} = _event, state) do
{:noreply, state}
end
end
12 changes: 12 additions & 0 deletions lib/radiator/url_extractor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,18 @@ defmodule Radiator.UrlExtractor do
"""
@url_regex ~r/((([A-Za-z]{3,9}:(?:\/\/)?)(?:[-;:&=\+\$,\w]+@)?[A-Za-z0-9.-]+|(?:www.|[-;:&=\+\$,\w]+@)[A-Za-z0-9.-]+)((?:\/[\+~%\/.\w-_]*)?\??(?:[-\+=&;%@.\w_]*)#?(?:[\w]*))?)/

def extract_urls(text) do
text
|> extract_url_positions
|> Enum.map(fn {start_bytes, size_bytes} ->
%{
start_bytes: start_bytes,
size_bytes: size_bytes,
url: String.byte_slice(text, start_bytes, size_bytes)
}
end)
end

# should return two URLs that we can parse/scrape later
# @return [{Integer.t, Integer.t}] list of positions of URLs in the text
def extract_url_positions(text) do
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ defmodule Radiator.MixProject do
def application do
[
mod: {Radiator.Application, []},
extra_applications: [:logger, :runtime_tools]
extra_applications: [:logger, :runtime_tools, :crypto]
]
end

Expand Down

0 comments on commit d8cd54c

Please sign in to comment.