Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable communication with new SCU app #760

Merged
merged 3 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ if config_env() != :test do
api_v3_url: System.get_env("API_V3_URL"),
api_v3_key: System.get_env("API_V3_KEY"),
scully_api_key: System.get_env("SCULLY_API_KEY"),
watts_url: System.get_env("WATTS_URL"),
watts_api_key: System.get_env("WATTS_API_KEY"),
scu_ip_map: System.get_env("SCU_IP_MAP", "null") |> Jason.decode!(),
chelsea_bridge_url: System.get_env("CHELSEA_BRIDGE_URL"),
chelsea_bridge_auth: System.get_env("CHELSEA_BRIDGE_AUTH"),
Expand Down
3 changes: 2 additions & 1 deletion lib/content/audio.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ defprotocol Content.Audio do

@type language :: :english | :spanish
@type value :: canned_message() | ad_hoc_message() | nil
@type tts_value :: {audio :: String.t(), visual :: Content.Message.pages() | nil}

@doc "Converts an audio struct to the mid/vars params for the PA system"
@spec to_params(Content.Audio.t()) :: value()
def to_params(audio)
@spec to_tts(Content.Audio.t()) :: {String.t(), [{String.t(), String.t(), integer()}] | nil}
@spec to_tts(Content.Audio.t()) :: tts_value()
def to_tts(audio)
end
1 change: 1 addition & 0 deletions lib/content/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ defprotocol Content.Message do
"""

@type value :: String.t() | [{String.t(), non_neg_integer()}]
@type pages :: [{top :: String.t(), bottom :: String.t(), duration :: integer()}]

@doc "converts a content message to a string for display on a sign"
@spec to_string(Content.Message.t()) :: value()
Expand Down
13 changes: 11 additions & 2 deletions lib/pa_ess/scu_updater.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ defmodule PaEss.ScuUpdater do
end

@impl true
def handle_events([{:message, scu_id, payload}], _from, state) do
def handle_events([{:message, scu_id, payload, logs}], _from, state) do
body = Jason.encode!(payload)
log("play_message", logs)

if send_to_scu(scu_id, "/message", body) == :ok do
send_to_signs_ui(scu_id, "/message", body)
Expand All @@ -22,8 +23,9 @@ defmodule PaEss.ScuUpdater do
{:noreply, [], state}
end

def handle_events([{:background, scu_id, payload}], _from, state) do
def handle_events([{:background, scu_id, payload, logs}], _from, state) do
body = Jason.encode!(payload)
log("set_background_message", logs)

if send_to_scu(scu_id, "/background", body) == :ok do
send_to_signs_ui(scu_id, "/background", body)
Expand Down Expand Up @@ -89,4 +91,11 @@ defmodule PaEss.ScuUpdater do
end
end
end

defp log(token, items) do
fields =
Enum.map([pid: inspect(self())] ++ items, fn {k, v} -> "#{k}=#{v}" end) |> Enum.join(" ")

Logger.info("#{token}: #{fields}")
end
end
84 changes: 82 additions & 2 deletions lib/pa_ess/updater.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,18 @@ defmodule PaEss.Updater do
bottom
) do
if config_engine.scu_migrated?(scu_id) do
Logger.error("Error sending to new SCU, not implemented")
pages = zip_pages(top, bottom)

PaEss.ScuQueue.enqueue_message(
scu_id,
{:background, scu_id,
%{
visual_zones: [text_zone],
visual_data: format_pages(pages),
expiration: 180,
tag: nil
}, [sign_id: id, visual: inspect(pages)]}
)
else
MessageQueue.update_sign({pa_ess_loc, text_zone}, top, bottom, 180, :now, id)
end
Expand All @@ -32,12 +43,81 @@ defmodule PaEss.Updater do
config_engine: config_engine
},
audios,
tts_audios,
extra_logs
) do
if config_engine.scu_migrated?(scu_id) do
Logger.error("Error sending to new SCU, not implemented")
Task.Supervisor.start_child(PaEss.TaskSupervisor, fn ->
files =
Enum.map(tts_audios, fn {text, _} ->
Task.async(fn -> fetch_tts(text) end)
end)
|> Task.await_many()

Enum.zip([files, tts_audios, extra_logs])
|> Enum.each(fn {file, {text, pages}, logs} ->
PaEss.ScuQueue.enqueue_message(
scu_id,
{:message, scu_id,
%{
visual_zones: audio_zones,
visual_data: format_pages(pages),
audio_zones: audio_zones,
audio_data: [Base.encode64(file)],
expiration: 30,
tag: nil
}, [sign_id: id, audio: inspect(text), visual: inspect(pages)] ++ logs}
)
end)
end)
else
MessageQueue.send_audio({pa_ess_loc, audio_zones}, audios, 5, 60, id, extra_logs)
end
end

defp zip_pages(top, bottom) do
max_length =
Enum.map([top, bottom], fn
str when is_binary(str) -> 1
list -> length(list)
end)
|> Enum.max()

Enum.map(0..(max_length - 1), fn i ->
[{top, top_duration}, {bottom, bottom_duration}] =
Enum.map([top, bottom], fn
str when is_binary(str) -> {str, 6}
list -> Enum.at(list, i, List.last(list))
end)

^top_duration = bottom_duration
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will crash the process if the top and bottom durations don't match. Is that desired, or would we want to just default to the top duration and log an error instead?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I put that in there as an assertion to make sure they match. Looking at the code, I think everything is the same now (6 seconds), and the intention is to enforce that going forward. If this seems to harsh, though, we could log an error instead.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think we loosely standardized on 6 seconds a while back so I guess this assertion would rarely, if ever, fail. But at least in concept, it does seem a bit harsh since every message gets funneled through this process which I think means a few consecutive crashes could be pretty disruptive.

{top, bottom, top_duration}
end)
end

defp format_pages(nil), do: nil

defp format_pages(pages) do
%{
pages:
Enum.map(pages, fn {top, bottom, duration} ->
%{top: top, bottom: bottom, duration: duration}
end)
}
end

defp fetch_tts(text) do
http_poster = Application.get_env(:realtime_signs, :http_poster_mod)
watts_url = Application.get_env(:realtime_signs, :watts_url)
watts_api_key = Application.get_env(:realtime_signs, :watts_api_key)

http_poster.post("#{watts_url}/tts", %{text: text, voice_id: "Matthew"} |> Jason.encode!(), [
{"Content-type", "application/json"},
{"x-api-key", watts_api_key}
])
|> case do
{:ok, %HTTPoison.Response{status_code: status, body: body}} when status in 200..299 ->
body
end
end
end
7 changes: 6 additions & 1 deletion lib/pa_ess/updater_api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ defmodule PaEss.UpdaterAPI do
Content.Message.value()
) :: :ok

@callback play_message(Signs.Realtime.t() | Signs.Bus.t(), [Content.Audio.value()], [keyword()]) ::
@callback play_message(
Signs.Realtime.t() | Signs.Bus.t(),
[Content.Audio.value()],
[Content.Audio.tts_value()],
[keyword()]
) ::
:ok
end
2 changes: 1 addition & 1 deletion lib/pa_ess/utilities.ex
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,7 @@ defmodule PaEss.Utilities do
def audio_take({:route, route}), do: @route_take_lookup[route]
def audio_take(atom) when is_atom(atom), do: @atom_take_lookup[atom]

@spec paginate_text(String.t(), integer()) :: [{String.t(), String.t(), integer()}]
@spec paginate_text(String.t(), integer()) :: Content.Message.pages()
def paginate_text(text, max_length \\ 24) do
String.split(text)
|> Stream.chunk_while(
Expand Down
1 change: 1 addition & 0 deletions lib/realtime_signs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ defmodule RealtimeSigns do
[
:hackney_pool.child_spec(:default, []),
:hackney_pool.child_spec(:arinc_pool, []),
{Task.Supervisor, name: PaEss.TaskSupervisor},
Engine.Health,
Engine.Config,
Engine.Locations,
Expand Down
2 changes: 2 additions & 0 deletions lib/signs/bus.ex
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,8 @@ defmodule Signs.Bus do
sign_updater.play_message(
state,
audios,
# TODO: Implement TTS for bus audio
[],
Enum.map(audios, fn _ -> [message_type: "Bus"] end)
)
end
Expand Down
1 change: 1 addition & 0 deletions lib/signs/utilities/audio.ex
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ defmodule Signs.Utilities.Audio do
sign.sign_updater.play_message(
sign,
Enum.map(audios, &Content.Audio.to_params(&1)),
Enum.map(audios, &Content.Audio.to_tts(&1)),
Enum.map(audios, fn audio ->
[
message_type: to_string(audio.__struct__) |> String.split(".") |> List.last(),
Expand Down
2 changes: 1 addition & 1 deletion test/signs/bus_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ defmodule Signs.BusTest do
end

defp expect_audios(audios) do
expect(PaEss.Updater.Mock, :play_message, fn _, list, _ ->
expect(PaEss.Updater.Mock, :play_message, fn _, list, _, _ ->
assert list == audios
:ok
end)
Expand Down
2 changes: 1 addition & 1 deletion test/signs/realtime_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1538,7 +1538,7 @@ defmodule Signs.RealtimeTest do
end

defp expect_audios(audios) do
expect(PaEss.Updater.Mock, :play_message, fn _, list, _ ->
expect(PaEss.Updater.Mock, :play_message, fn _, list, _, _ ->
assert list == audios
:ok
end)
Expand Down
Loading