Skip to content

Commit

Permalink
support communication with both SCU types (#751)
Browse files Browse the repository at this point in the history
  • Loading branch information
panentheos authored May 14, 2024
1 parent 6b3c844 commit f4d6c15
Show file tree
Hide file tree
Showing 18 changed files with 169 additions and 84 deletions.
1 change: 0 additions & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ config :realtime_signs,
http_client: HTTPoison,
posts_log_dir: "log/posts/",
time_zone: "America/New_York",
sign_updater_mod: MessageQueue,
http_poster_mod: HTTPoison,
scheduled_headway_requester: Headway.Request,
external_config_getter: ExternalConfig.Local,
Expand Down
13 changes: 13 additions & 0 deletions lib/engine/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ defmodule Engine.Config do
@table_signs :config_engine_signs
@table_headways :config_engine_headways
@table_chelsea_bridge :config_engine_chelsea_bridge
@table_scus_migrated :config_engine_scus_migrated

def start_link([]) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
Expand All @@ -44,6 +45,14 @@ defmodule Engine.Config do
Headways.get_headway(table_name, {headway_group, time_period})
end

@impl true
def scu_migrated?(table_name \\ @table_scus_migrated, scu_id) do
case :ets.lookup(table_name, scu_id) do
[{^scu_id, value}] -> value
_ -> false
end
end

@spec chelsea_bridge_config(:ets.tab()) :: :off | :auto
def chelsea_bridge_config(table_name \\ @table_chelsea_bridge) do
case :ets.lookup(table_name, :status) do
Expand All @@ -62,6 +71,7 @@ defmodule Engine.Config do
table_name_signs: @table_signs,
table_name_headways: @table_headways,
table_name_chelsea_bridge: @table_chelsea_bridge,
table_name_scus_migrated: @table_scus_migrated,
current_version: nil,
time_fetcher: opts[:time_fetcher] || fn -> DateTime.utc_now() end
}
Expand All @@ -77,6 +87,7 @@ defmodule Engine.Config do
def create_tables(state) do
:ets.new(state.table_name_signs, [:set, :protected, :named_table, read_concurrency: true])
Headways.create_table(state.table_name_headways)
:ets.new(state.table_name_scus_migrated, [:named_table, read_concurrency: true])

:ets.new(state.table_name_chelsea_bridge, [
:set,
Expand Down Expand Up @@ -105,11 +116,13 @@ defmodule Engine.Config do
|> Map.get("configured_headways", %{})
|> Headways.parse()

scus_migrated = Map.get(config, "scus_migrated", %{})
config_chelsea_bridge = Map.get(config, "chelsea_bridge_announcements", "auto")

:ets.insert(state.table_name_signs, Enum.into(config_signs, []))
:ok = Headways.update_table(state.table_name_headways, config_headways)
:ets.insert(state.table_name_chelsea_bridge, {:status, config_chelsea_bridge})
:ets.insert(state.table_name_scus_migrated, Map.to_list(scus_migrated))

version

Expand Down
1 change: 1 addition & 0 deletions lib/engine/config_api.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
defmodule Engine.ConfigAPI do
@callback sign_config(String.t()) :: Engine.Config.sign_config()
@callback headway_config(String.t(), DateTime.t()) :: Engine.Config.Headway.t() | nil
@callback scu_migrated?(String.t()) :: boolean()
end
24 changes: 19 additions & 5 deletions lib/message_queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ defmodule MessageQueue do
message to accommodate the new one.
"""

@behaviour PaEss.Updater

@type message :: {
:update_sign | :send_audio,
[term()]
Expand All @@ -34,15 +32,31 @@ defmodule MessageQueue do
{:ok, %{queue: :queue.new(), length: 0}}
end

@impl PaEss.Updater
@spec update_sign(
GenServer.server(),
PaEss.text_id(),
Content.Message.value(),
Content.Message.value(),
integer(),
integer() | :now,
String.t()
) :: :ok
def update_sign(pid \\ __MODULE__, text_id, top_line, bottom_line, duration, start, sign_id) do
GenServer.call(
pid,
{:queue_update, {:update_sign, [text_id, top_line, bottom_line, duration, start, sign_id]}}
)
end

@impl PaEss.Updater
@spec send_audio(
GenServer.server(),
PaEss.audio_id(),
[Content.Audio.value()],
integer(),
integer(),
String.t(),
[keyword()]
) :: :ok
def send_audio(pid \\ __MODULE__, audio_id, audios, priority, timeout, sign_id, extra_logs) do
GenServer.call(
pid,
Expand Down Expand Up @@ -72,7 +86,7 @@ defmodule MessageQueue do

queue = :queue.in(msg, queue)

{:reply, {:ok, :sent}, %{state | queue: queue, length: length + 1}}
{:reply, :ok, %{state | queue: queue, length: length + 1}}
end

def handle_call(:get_message, _from, state) do
Expand Down
54 changes: 40 additions & 14 deletions lib/pa_ess/updater.ex
Original file line number Diff line number Diff line change
@@ -1,17 +1,43 @@
defmodule PaEss.Updater do
@callback update_sign(PaEss.text_id(), top_line, bottom_line, duration, start, sign_id) ::
{:ok, :sent} | {:error, :bad_status} | {:error, :post_error}
when top_line: Content.Message.t(),
bottom_line: Content.Message.t(),
duration: integer(),
start: integer() | :now,
sign_id: String.t()
@behaviour PaEss.UpdaterAPI

@callback send_audio(PaEss.audio_id(), audios, priority, timeout, sign_id, extra_logs) ::
{:ok, :sent} | {:error, any()}
when priority: integer(),
timeout: integer(),
audios: [Content.Audio.t()],
sign_id: String.t(),
extra_logs: list
require Logger

@impl true
def set_background_message(
%{
id: id,
scu_id: scu_id,
pa_ess_loc: pa_ess_loc,
text_zone: text_zone,
config_engine: config_engine
},
top,
bottom
) do
if config_engine.scu_migrated?(scu_id) do
Logger.error("Error sending to new SCU, not implemented")
else
MessageQueue.update_sign({pa_ess_loc, text_zone}, top, bottom, 180, :now, id)
end
end

@impl true
def play_message(
%{
id: id,
scu_id: scu_id,
pa_ess_loc: pa_ess_loc,
audio_zones: audio_zones,
config_engine: config_engine
},
audios,
extra_logs
) do
if config_engine.scu_migrated?(scu_id) do
Logger.error("Error sending to new SCU, not implemented")
else
MessageQueue.send_audio({pa_ess_loc, audio_zones}, audios, 5, 60, id, extra_logs)
end
end
end
10 changes: 10 additions & 0 deletions lib/pa_ess/updater_api.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
defmodule PaEss.UpdaterAPI do
@callback set_background_message(
Signs.Realtime.t() | Signs.Bus.t(),
Content.Message.value(),
Content.Message.value()
) :: :ok

@callback play_message(Signs.Realtime.t() | Signs.Bus.t(), [Content.Audio.value()], [keyword()]) ::
:ok
end
45 changes: 34 additions & 11 deletions lib/signs/bus.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ defmodule Signs.Bus do
@enforce_keys [
:id,
:pa_ess_loc,
:scu_id,
:text_zone,
:audio_zones,
:max_minutes,
Expand All @@ -37,10 +38,38 @@ defmodule Signs.Bus do
]
defstruct @enforce_keys

@type t :: %__MODULE__{
id: String.t(),
pa_ess_loc: String.t(),
scu_id: String.t(),
text_zone: String.t(),
audio_zones: [String.t()],
max_minutes: integer(),
configs: list(),
top_configs: list(),
bottom_configs: list(),
extra_audio_configs: list(),
chelsea_bridge: String.t() | nil,
read_loop_interval: integer(),
read_loop_offset: integer(),
config_engine: module(),
prediction_engine: module(),
bridge_engine: module(),
alerts_engine: module(),
routes_engine: module(),
sign_updater: module(),
prev_predictions: list(),
prev_bridge_status: nil | map(),
current_messages: tuple(),
last_update: nil | DateTime.t(),
last_read_time: DateTime.t()
}

def start_link(sign, opts \\ []) do
state = %__MODULE__{
id: Map.fetch!(sign, "id"),
pa_ess_loc: Map.fetch!(sign, "pa_ess_loc"),
scu_id: Map.fetch!(sign, "scu_id"),
text_zone: Map.fetch!(sign, "text_zone"),
audio_zones: Map.fetch!(sign, "audio_zones"),
max_minutes: Map.fetch!(sign, "max_minutes"),
Expand All @@ -56,7 +85,7 @@ defmodule Signs.Bus do
bridge_engine: opts[:bridge_engine] || Engine.ChelseaBridge,
alerts_engine: opts[:alerts_engine] || Engine.Alerts,
routes_engine: Engine.Routes,
sign_updater: opts[:sign_updater] || MessageQueue,
sign_updater: PaEss.Updater,
prev_predictions: [],
prev_bridge_status: nil,
current_messages: {nil, nil},
Expand Down Expand Up @@ -98,14 +127,11 @@ defmodule Signs.Bus do

%__MODULE__{
id: id,
pa_ess_loc: pa_ess_loc,
text_zone: text_zone,
configs: configs,
config_engine: config_engine,
prediction_engine: prediction_engine,
bridge_engine: bridge_engine,
alerts_engine: alerts_engine,
sign_updater: sign_updater,
prev_predictions: prev_predictions
} = state

Expand Down Expand Up @@ -191,7 +217,7 @@ defmodule Signs.Bus do
state
|> then(fn state ->
if should_update?({top, bottom}, current_time, state) do
sign_updater.update_sign({pa_ess_loc, text_zone}, top, bottom, 180, :now, state.id)
state.sign_updater.set_background_message(state, top, bottom)
%{state | current_messages: {top, bottom}, last_update: current_time}
else
state
Expand Down Expand Up @@ -803,15 +829,12 @@ defmodule Signs.Bus do
end

defp send_audio(audios, state) do
%{pa_ess_loc: pa_ess_loc, audio_zones: audio_zones, sign_updater: sign_updater} = state
%{audio_zones: audio_zones, sign_updater: sign_updater} = state

if audios != [] && audio_zones != [] do
sign_updater.send_audio(
{pa_ess_loc, audio_zones},
sign_updater.play_message(
state,
audios,
5,
180,
state.id,
Enum.map(audios, fn _ -> [message_type: "Bus"] end)
)
end
Expand Down
30 changes: 14 additions & 16 deletions lib/signs/realtime.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ defmodule Signs.Realtime do

@enforce_keys [
:id,
:text_id,
:audio_id,
:pa_ess_loc,
:scu_id,
:text_zone,
:audio_zones,
:source_config,
:current_content_top,
:current_content_bottom,
Expand Down Expand Up @@ -56,8 +58,10 @@ defmodule Signs.Realtime do

@type t :: %__MODULE__{
id: String.t(),
text_id: PaEss.text_id(),
audio_id: PaEss.audio_id(),
pa_ess_loc: String.t(),
scu_id: String.t(),
text_zone: String.t(),
audio_zones: [String.t()],
source_config: SourceConfig.config() | {SourceConfig.config(), SourceConfig.config()},
current_content_top: Content.Message.value(),
current_content_bottom: Content.Message.value(),
Expand Down Expand Up @@ -93,12 +97,13 @@ defmodule Signs.Realtime do
config_engine = opts[:config_engine] || Engine.Config
alerts_engine = opts[:alerts_engine] || Engine.Alerts
last_trip_engine = opts[:last_trip_engine] || Engine.LastTrip
sign_updater = opts[:sign_updater] || Application.get_env(:realtime_signs, :sign_updater_mod)

sign = %__MODULE__{
id: Map.fetch!(config, "id"),
text_id: {Map.fetch!(config, "pa_ess_loc"), Map.fetch!(config, "text_zone")},
audio_id: {Map.fetch!(config, "pa_ess_loc"), Map.fetch!(config, "audio_zones")},
pa_ess_loc: Map.fetch!(config, "pa_ess_loc"),
scu_id: Map.fetch!(config, "scu_id"),
text_zone: Map.fetch!(config, "text_zone"),
audio_zones: Map.fetch!(config, "audio_zones"),
source_config: source_config,
current_content_top: "",
current_content_bottom: "",
Expand All @@ -114,7 +119,7 @@ defmodule Signs.Realtime do
time_zone = Application.get_env(:realtime_signs, :time_zone)
DateTime.utc_now() |> DateTime.shift_zone!(time_zone)
end,
sign_updater: sign_updater,
sign_updater: PaEss.Updater,
last_update: nil,
tick_read: 240 + Map.fetch!(config, "read_loop_offset"),
read_period_seconds: 240,
Expand Down Expand Up @@ -262,14 +267,7 @@ defmodule Signs.Realtime do
Utilities.Predictions.get_passthrough_train_audio(predictions)
|> Enum.reduce(sign, fn audio, sign ->
if audio.trip_id not in sign.announced_passthroughs do
sign.sign_updater.send_audio(
sign.audio_id,
[Content.Audio.to_params(audio)],
5,
60,
sign.id,
[Utilities.Audio.audio_log_details(audio)]
)
Signs.Utilities.Audio.send_audio(sign, [audio])

update_in(sign.announced_passthroughs, fn list ->
Enum.take([audio.trip_id | list], @announced_history_length)
Expand Down
17 changes: 12 additions & 5 deletions lib/signs/utilities/audio.ex
Original file line number Diff line number Diff line change
Expand Up @@ -388,10 +388,17 @@ defmodule Signs.Utilities.Audio do
end)
end

def audio_log_details(audio) do
[
message_type: to_string(audio.__struct__) |> String.split(".") |> List.last(),
message_details: Map.from_struct(audio) |> inspect()
]
@spec send_audio(Signs.Realtime.t(), [Content.Audio.t()]) :: :ok
def send_audio(sign, audios) do
sign.sign_updater.play_message(
sign,
Enum.map(audios, &Content.Audio.to_params(&1)),
Enum.map(audios, fn audio ->
[
message_type: to_string(audio.__struct__) |> String.split(".") |> List.last(),
message_details: Map.from_struct(audio) |> inspect()
]
end)
)
end
end
2 changes: 1 addition & 1 deletion lib/signs/utilities/messages.ex
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ defmodule Signs.Utilities.Messages do
Content.Message.Empty.new()
end

defp get_alert_messages(alert_status, %{text_id: {"GUNS", _}}) do
defp get_alert_messages(alert_status, %{pa_ess_loc: "GUNS"}) do
if alert_status in [:none, :alert_along_route],
do: nil,
else: {%Alert.NoService{}, %Alert.UseRoutes{}}
Expand Down
2 changes: 1 addition & 1 deletion lib/signs/utilities/predictions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ defmodule Signs.Utilities.Predictions do
defp prediction_messages(
predictions,
%{sources: sources},
%{text_id: {station_code, zone}} = sign
%{pa_ess_loc: station_code, text_zone: zone} = sign
) do
predictions
|> Enum.filter(fn p ->
Expand Down
Loading

0 comments on commit f4d6c15

Please sign in to comment.