Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add update queues for SCUs #747

Merged
merged 1 commit into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ if config_env() != :test do
s3_path: System.get_env("SIGNS_S3_PATH"),
api_v3_url: System.get_env("API_V3_URL"),
api_v3_key: System.get_env("API_V3_KEY"),
scully_api_key: System.get_env("SCULLY_API_KEY"),
scu_ip_map: System.get_env("SCU_IP_MAP", "null") |> Jason.decode!(),
chelsea_bridge_url: System.get_env("CHELSEA_BRIDGE_URL"),
chelsea_bridge_auth: System.get_env("CHELSEA_BRIDGE_AUTH"),
filter_uncertain_predictions?: System.get_env("FILTER_UNCERTAIN_PREDICTIONS", "false") == "true",
Expand Down
30 changes: 30 additions & 0 deletions lib/pa_ess/scu_queue.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
defmodule PaEss.ScuQueue do
use GenStage

def start_link(id) do
GenStage.start_link(__MODULE__, nil, name: stage_name(id))
end

def enqueue_message(id, message) do
GenStage.call(stage_name(id), {:enqueue_message, message})
end

@impl true
def init(_) do
{:producer, %{}}
end

@impl true
def handle_call({:enqueue_message, message}, _from, state) do
{:reply, :ok, [message], state}
end

@impl true
def handle_demand(_demand, state) do
{:noreply, [], state}
end

def stage_name(id) do
:"ScuQueue/#{id}"
end
end
92 changes: 92 additions & 0 deletions lib/pa_ess/scu_updater.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
defmodule PaEss.ScuUpdater do
use GenStage
require Logger

def start_link(id) do
GenStage.start_link(__MODULE__, id, name: :"ScuUpdater/#{id}")
end

@impl true
def init(id) do
{:consumer, %{}, subscribe_to: [{PaEss.ScuQueue.stage_name(id), []}]}
end

@impl true
def handle_events([{:message, scu_id, payload}], _from, state) do
body = Jason.encode!(payload)

if send_to_scu(scu_id, "/message", body) == :ok do
send_to_signs_ui(scu_id, "/message", body)
end

{:noreply, [], state}
end

def handle_events([{:background, scu_id, payload}], _from, state) do
body = Jason.encode!(payload)

if send_to_scu(scu_id, "/background", body) == :ok do
send_to_signs_ui(scu_id, "/background", body)
end

{:noreply, [], state}
end

defp send_to_scu(scu_id, path, body) do
http_poster = Application.get_env(:realtime_signs, :http_poster_mod)
scu_ip_map = Application.get_env(:realtime_signs, :scu_ip_map)
scully_api_key = Application.get_env(:realtime_signs, :scully_api_key)

if scu_ip_map do
http_poster.post(
"https://#{Map.fetch!(scu_ip_map, scu_id)}#{path}",
body,
[{"Content-type", "application/json"}, {"x-api-key", scully_api_key}],
hackney: [pool: :arinc_pool]
)
|> case do
{:ok, %HTTPoison.Response{status_code: status}} when status in 200..299 ->
:ok

{:ok, %HTTPoison.Response{status_code: status}} ->
Logger.warn("scu_error: status=#{inspect(status)}")
:error

{:error, %HTTPoison.Error{reason: reason}} ->
Logger.warn("scu_error: #{inspect(reason)}")
:error
end
else
:ok
end
end

defp send_to_signs_ui(scu_id, path, body) do
http_poster = Application.get_env(:realtime_signs, :http_poster_mod)
sign_ui_url = Application.get_env(:realtime_signs, :sign_ui_url)
sign_ui_api_key = Application.get_env(:realtime_signs, :sign_ui_api_key)

if sign_ui_url do
http_poster.post(
"https://#{sign_ui_url}#{path}",
body,
[
{"Content-type", "application/json"},
{"x-api-key", sign_ui_api_key},
{"x-scu-id", scu_id}
Copy link
Collaborator

@PaulJKim PaulJKim Apr 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, why do we need to send the scu_id to Signs UI as a header?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This endpoint doesn't exist yet, but Signs UI will need this in order to attribute the data to the right sign, since the payload itself is not sufficient to differentiate it globally.

],
hackney: [pool: :arinc_pool]
)
|> case do
{:ok, %HTTPoison.Response{status_code: status}} when status in 200..299 ->
nil

{:ok, %HTTPoison.Response{status_code: status}} ->
Logger.warn("signs_ui_error: status=#{inspect(status)}")

{:error, %HTTPoison.Error{reason: reason}} ->
Logger.warn("signs_ui_error: #{inspect(reason)}")
end
end
end
end
10 changes: 10 additions & 0 deletions lib/realtime_signs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ defmodule RealtimeSigns do

log_runtime_config()

scu_updater_children =
Signs.Utilities.SignsConfig.all_scu_ids()
|> Enum.flat_map(fn id ->
[
Supervisor.child_spec({PaEss.ScuQueue, id}, id: {PaEss.ScuQueue, id}),
Supervisor.child_spec({PaEss.ScuUpdater, id}, id: {PaEss.ScuUpdater, id})
]
end)

children =
[
:hackney_pool.child_spec(:default, []),
Expand All @@ -30,6 +39,7 @@ defmodule RealtimeSigns do
HeadwayAnalysis.Supervisor
] ++
http_updater_children() ++
scu_updater_children ++
[
Signs.Supervisor
]
Expand Down
7 changes: 7 additions & 0 deletions lib/signs/utilities/signs_config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ defmodule Signs.Utilities.SignsConfig do
Enum.uniq(train_routes ++ bus_routes)
end

def all_scu_ids do
for %{"scu_id" => scu_id} <- children_config(),
uniq: true do
scu_id
end
end

@spec get_stop_ids_for_sign(map()) :: [String.t()]
def get_stop_ids_for_sign(sign) do
sign["source_config"]
Expand Down
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ defmodule RealtimeSigns.Mixfile do
{:ex_aws, "~> 2.0"},
{:lcov_ex, "~> 0.2", only: [:dev, :test], runtime: false},
{:hackney, "== 1.17.4"},
{:gen_stage, "~> 1.2"},
{:httpoison, "~> 1.0"},
{:jason, "~> 1.4.0"},
{:logger_splunk_backend, "~> 2.0"},
Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"ex_aws": {:hex, :ex_aws, "2.4.1", "d1dc8965d1dc1c939dd4570e37f9f1d21e047e4ecd4f9373dc89cd4e45dce5ef", [:mix], [{:configparser_ex, "~> 4.0", [hex: :configparser_ex, repo: "hexpm", optional: true]}, {:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:jsx, "~> 2.8 or ~> 3.0", [hex: :jsx, repo: "hexpm", optional: true]}, {:mime, "~> 1.2 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:sweet_xml, "~> 0.7", [hex: :sweet_xml, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "803387db51b4e91be4bf0110ba999003ec6103de7028b808ee9b01f28dbb9eee"},
"ex_aws_s3": {:hex, :ex_aws_s3, "2.3.3", "61412e524616ea31d3f31675d8bc4c73f277e367dee0ae8245610446f9b778aa", [:mix], [{:ex_aws, "~> 2.0", [hex: :ex_aws, repo: "hexpm", optional: false]}, {:sweet_xml, ">= 0.0.0", [hex: :sweet_xml, repo: "hexpm", optional: true]}], "hexpm", "0044f0b6f9ce925666021eafd630de64c2b3404d79c85245cc7c8a9a32d7f104"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
"gen_stage": {:hex, :gen_stage, "1.1.2", "b1656cd4ba431ed02c5656fe10cb5423820847113a07218da68eae5d6a260c23", [:mix], [], "hexpm", "9e39af23140f704e2b07a3e29d8f05fd21c2aaf4088ff43cb82be4b9e3148d02"},
"gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"},
"gen_state_machine": {:hex, :gen_state_machine, "2.1.0", "a38b0e53fad812d29ec149f0d354da5d1bc0d7222c3711f3a0bd5aa608b42992", [:mix], [], "hexpm", "ae367038808db25cee2f2c4b8d0531522ea587c4995eb6f96ee73410a60fa06b"},
"gettext": {:hex, :gettext, "0.20.0", "75ad71de05f2ef56991dbae224d35c68b098dd0e26918def5bb45591d5c8d429", [:mix], [], "hexpm", "1c03b177435e93a47441d7f681a7040bd2a816ece9e2666d1c9001035121eb3d"},
"hackney": {:hex, :hackney, "1.17.4", "99da4674592504d3fb0cfef0db84c3ba02b4508bae2dff8c0108baa0d6e0977c", [:rebar3], [{:certifi, "~> 2.6.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "de16ff4996556c8548d512f4dbe22dd58a587bf3332e7fd362430a7ef3986b16"},
Expand Down
Loading