Skip to content

Commit

Permalink
Address PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
PaulJKim committed Apr 24, 2024
1 parent 9984b40 commit 473e2f6
Show file tree
Hide file tree
Showing 6 changed files with 253 additions and 156 deletions.
133 changes: 48 additions & 85 deletions lib/engine/last_trip.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,16 @@ defmodule Engine.LastTrip do
end
end

def update_last_trips(last_trips) do
GenServer.cast(__MODULE__, {:update_last_trips, last_trips})
end

def update_recent_departures(new_recent_departures) do
GenServer.cast(__MODULE__, {:update_recent_departures, new_recent_departures})
end

@impl true
def init(_) do
schedule_update(self())
schedule_clean(self())

state = %{
Expand All @@ -56,98 +63,59 @@ defmodule Engine.LastTrip do
end

@impl true
def handle_info(:update, %{last_modified: last_modified} = state) do
schedule_update(self())

http_client = Application.get_env(:realtime_signs, :http_client)

new_last_modified =
case http_client.get(
Application.get_env(:realtime_signs, :trip_update_url),
if(last_modified, do: [{"If-Modified-Since", last_modified}], else: []),
timeout: 2000,
recv_timeout: 2000
) do
{:ok, %HTTPoison.Response{body: body, status_code: 200, headers: headers}} ->
predictions_feed = Predictions.Predictions.parse_json_response(body)

predictions_feed["entity"]
|> Stream.map(& &1["trip_update"])
|> Stream.reject(&(&1["trip"]["schedule_relationship"] == "CANCELED"))
|> tap(fn trips ->
insert_last_trips(trips, state)
end)
|> Stream.map(&{&1["trip"]["trip_id"], &1["stop_time_update"]})
|> tap(fn predictions_by_trip ->
insert_recent_departures(predictions_by_trip, state)
end)

Enum.find_value(headers, fn {key, value} -> if(key == "Last-Modified", do: value) end)

{:ok, %HTTPoison.Response{status_code: 304}} ->
last_modified

{_, response} ->
Logger.warn("Could not fetch predictions: #{inspect(response)}")
last_modified
end
def handle_cast({:update_last_trips, last_trips}, %{last_trips: last_trips_table} = state) do
current_time = Timex.now()

{:noreply, %{state | last_modified: new_last_modified}}
end
last_trips =
Enum.map(last_trips, fn {trip_id, route_id} -> {trip_id, route_id, current_time} end)

@impl true
def handle_info(:clean_old_data, state) do
schedule_clean(self())
clean_last_trips(state)
clean_recent_departures(state)
:ets.insert(last_trips_table, last_trips)

{:noreply, state}
end

defp insert_last_trips(trips, state) do
current_time = Timex.now()

Stream.filter(trips, &(&1["trip"]["last_trip"] == true))
|> Stream.map(&{&1["trip"]["trip_id"], current_time})
|> Enum.each(fn {trip_id, timestamp} ->
:ets.insert(state.last_trips, {trip_id, timestamp})
@impl true
def handle_cast(
{:update_recent_departures, new_recent_departures},
%{recent_departures: recent_departures_table} = state
) do
current_recent_departures =
:ets.tab2list(recent_departures_table)
|> Stream.map(&{elem(&1, 0), elem(&1, 1)})
|> Map.new()

Enum.reduce(new_recent_departures, current_recent_departures, fn {stop_id, trip_id, route_id,
departure_time},
acc ->
Map.get_and_update(acc, {stop_id, route_id}, fn recent_departures ->
if recent_departures do
{recent_departures, Map.put(recent_departures, trip_id, departure_time)}
else
{recent_departures, Map.new([{trip_id, departure_time}])}
end
end)
|> elem(1)
end)
|> Map.to_list()
|> then(&:ets.insert(recent_departures_table, &1))

{:noreply, state}
end

defp insert_recent_departures(predictions_by_trip, state) do
current_time = Timex.now()
@impl true
def handle_info(:clean_old_data, state) do
schedule_clean(self())
clean_last_trips(state)
clean_recent_departures(state)

for {trip_id, predictions} <- predictions_by_trip,
prediction <- predictions,
prediction["departure"] do
seconds_until_departure = prediction["departure"]["time"] - DateTime.to_unix(current_time)

if seconds_until_departure <= 0 and abs(seconds_until_departure) <= @hour_in_seconds do
:ets.tab2list(state.recent_departures)
|> Stream.map(&{elem(&1, 0), elem(&1, 1)})
|> Map.new()
|> Map.get_and_update(prediction["stop_id"], fn recent_departures ->
if recent_departures do
{recent_departures,
Map.put(recent_departures, trip_id, prediction["departure"]["time"])}
else
{recent_departures, Map.new([{trip_id, prediction["departure"]["time"]}])}
end
end)
|> elem(1)
|> Map.to_list()
|> then(&:ets.insert(state.recent_departures, &1))
end
end
{:noreply, state}
end

defp clean_last_trips(state) do
:ets.tab2list(state.last_trips)
|> Enum.each(fn {trip_id, timestamp} ->
if Timex.diff(Timex.now(), timestamp, :seconds) > @hour_in_seconds do
if Timex.diff(Timex.now(), timestamp, :seconds) > @hour_in_seconds * 2 do
:ets.delete(state.last_trips, trip_id)
else
:ok
end
end)
end
Expand All @@ -156,21 +124,16 @@ defmodule Engine.LastTrip do
current_time = Timex.now()

:ets.tab2list(state.recent_departures)
|> Enum.each(fn {stop_id, departures} ->
|> Enum.each(fn {key, departures} ->
departures_within_last_hour =
Enum.filter(departures, fn {_, departed_time} ->
Map.filter(departures, fn {_, departed_time} ->
DateTime.to_unix(current_time) - departed_time <= @hour_in_seconds
end)
|> Map.new()

:ets.insert(state.recent_departures, {stop_id, departures_within_last_hour})
:ets.insert(state.recent_departures, {key, departures_within_last_hour})
end)
end

defp schedule_update(pid) do
Process.send_after(pid, :update, 1_000)
end

defp schedule_clean(pid) do
Process.send_after(pid, :clean_old_data, 1_000)
end
Expand Down
11 changes: 9 additions & 2 deletions lib/engine/predictions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,16 @@ defmodule Engine.Predictions do
recv_timeout: 2000
) do
{:ok, %HTTPoison.Response{body: body, status_code: 200, headers: headers}} ->
parsed_json = Predictions.Predictions.parse_json_response(body)

{new_predictions, vehicles_running_revenue_trips} =
Predictions.Predictions.parse_json_response(body)
|> Predictions.Predictions.get_all(current_time)
Predictions.Predictions.get_all(parsed_json, current_time)

Predictions.LastTrip.get_last_trips(parsed_json)
|> Engine.LastTrip.update_last_trips()

Predictions.LastTrip.get_recent_departures(parsed_json)
|> Engine.LastTrip.update_recent_departures()

:ets.tab2list(state.trip_updates_table)
|> Enum.map(&{elem(&1, 0), []})
Expand Down
42 changes: 42 additions & 0 deletions lib/predictions/last_trip.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
defmodule Predictions.LastTrip do
@hour_in_seconds 3600

def parse_json_response("") do
%{"entity" => []}
end

def parse_json_response(body) do
Jason.decode!(body)
end

defp get_running_trips(predictions_feed) do
predictions_feed["entity"]
|> Stream.map(& &1["trip_update"])
|> Enum.reject(&(&1["trip"]["schedule_relationship"] == "CANCELED"))
end

def get_last_trips(predictions_feed) do
get_running_trips(predictions_feed)
|> Stream.filter(&(&1["trip"]["last_trip"] == true))
|> Enum.map(& &1["trip"]["trip_id"])
end

def get_recent_departures(predictions_feed) do
current_time = Timex.now()

predictions_by_trip =
get_running_trips(predictions_feed)
|> Enum.map(&{&1["trip"]["trip_id"], &1["trip"]["route_id"], &1["stop_time_update"]})

for {trip_id, route_id, predictions} <- predictions_by_trip,
prediction <- predictions,
prediction["departure"] do
seconds_until_departure = prediction["departure"]["time"] - DateTime.to_unix(current_time)

if seconds_until_departure in -@hour_in_seconds..0 do
{prediction["stop_id"], trip_id, route_id, prediction["departure"]["time"]}
end
end
|> Enum.reject(&is_nil/1)
end
end
42 changes: 40 additions & 2 deletions lib/signs/realtime.ex
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,12 @@ defmodule Signs.Realtime do
end

defp has_service_ended_for_source?(sign, source, current_time) do
SourceConfig.sign_stop_ids(source)
|> Enum.all?(&has_last_trip_departed_stop?(&1, sign, current_time))
stop_ids = SourceConfig.sign_stop_ids(source)

# Red Line trunk stops will have two last trips in both directions
if Enum.all?(stop_ids, &is_red_line_trunk_stop?/1),
do: Enum.count(stop_ids, &has_last_trip_departed_stop?(&1, sign, current_time)) >= 2,
else: Enum.any?(stop_ids, &has_last_trip_departed_stop?(&1, sign, current_time))
end

defp has_last_trip_departed_stop?(stop_id, sign, current_time) do
Expand Down Expand Up @@ -285,4 +289,38 @@ defmodule Signs.Realtime do
def decrement_ticks(sign) do
%{sign | tick_read: sign.tick_read - 1}
end

defp is_red_line_trunk_stop?(stop_id) do
stop_id in [
"70061",
"Alewife-01",
"Alewife-02",
"70064",
"70063",
"70066",
"70065",
"70068",
"70067",
"70070",
"70069",
"70072",
"70071",
"70074",
"70073",
"70076",
"70075",
"70078",
"70077",
"70080",
"70079",
"70082",
"70081",
"70084",
"70083",
"70085",
"70095",
"70086",
"70096"
]
end
end
Loading

0 comments on commit 473e2f6

Please sign in to comment.