Skip to content

Commit

Permalink
Migrate from RTR to GTFS-rt (#810)
Browse files Browse the repository at this point in the history
* Migrate to GTFS-rt predictions and locations feeds

* PR feedback

* simplify abstraction

* Update .envrc.template

Co-authored-by: Brett Heath-Wlaz <[email protected]>

* Filter out past departures and adjust last trip filtering

* Remove some references to RTR

* Move route_id check to a shared helper

* Log prediction details for terminal predictions

* add inspects

* add inspect

* fix typo

* only log when seconds til boarding is under terminal brd seconds

* Add some logging to stops after terminals

* Try adding a buffer to account for potential latency between RTR and concentrate

* Log more details

* Log stopped_at_predicted_stop

* increase buffer to account for negative departures

* Account for skipped predictions when calculating seconds_until_passthrough

* Add filter for determining if prediction has passed

* Remove extra logging

* read passthrough_time field directly

* oops forgot the actual variable

* don't fiter out passthroughs

* use the right field

* remove stops_away

* Remove required concentrate urls from envrc template

---------

Co-authored-by: Brett Heath-Wlaz <[email protected]>
  • Loading branch information
PaulJKim and panentheos authored Nov 25, 2024
1 parent f9909ac commit 9d5d608
Show file tree
Hide file tree
Showing 11 changed files with 125 additions and 273 deletions.
7 changes: 3 additions & 4 deletions .envrc.template
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ export API_V3_URL=https://api-dev-green.mbtace.com
#export CHELSEA_BRIDGE_URL=
#export CHELSEA_BRIDGE_AUTH=

# URLs of the enhanced trip-update and vehicle-position feeds. Default to the real feed URLs if
# not set here.
#export TRIP_UPDATE_URL=
#export VEHICLE_POSITIONS_URL=
# URLs of the enhanced trip-update and vehicle-position feeds.
#export TRIP_UPDATE_URL="https://s3.amazonaws.com/mbta-gtfs-s3/concentrate/TripUpdates_enhanced.json"
#export VEHICLE_POSITIONS_URL="https://s3.amazonaws.com/mbta-gtfs-s3/concentrate/VehiclePositions_enhanced.json"
4 changes: 2 additions & 2 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ if config_env() != :test do
trip_update_url:
System.get_env(
"TRIP_UPDATE_URL",
"https://s3.amazonaws.com/mbta-gtfs-s3/rtr/TripUpdates_enhanced.json"
"https://s3.amazonaws.com/mbta-gtfs-s3/concentrate/TripUpdates_enhanced.json"
),
vehicle_positions_url:
System.get_env(
"VEHICLE_POSITIONS_URL",
"https://s3.amazonaws.com/mbta-gtfs-s3/rtr/VehiclePositions_enhanced.json"
"https://s3.amazonaws.com/mbta-gtfs-s3/concentrate/VehiclePositions_enhanced.json"
),
s3_bucket: System.get_env("SIGNS_S3_BUCKET"),
s3_path: System.get_env("SIGNS_S3_PATH"),
Expand Down
3 changes: 1 addition & 2 deletions lib/content/message/predictions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,11 @@ defmodule Content.Message.Predictions do
end

min = round(sec / 60)
stopped_at? = prediction.stops_away == 0
reverse_prediction? = Signs.Utilities.Predictions.reverse_prediction?(prediction, terminal?)

{minutes, approximate?} =
cond do
stopped_at? and (!terminal? or sec <= 30) -> {:boarding, false}
prediction.stopped_at_predicted_stop? and (!terminal? or sec <= 30) -> {:boarding, false}
!terminal? and sec <= 30 -> {:arriving, false}
!terminal? and sec <= 60 -> {:approaching, false}
min > 60 -> {60, true}
Expand Down
21 changes: 5 additions & 16 deletions lib/fake/httpoison.ex
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,11 @@ defmodule Fake.HTTPoison do
%{
"entity" => [
%{
"alert" => nil,
"id" => "1490783458_32568935",
"is_deleted" => false,
"trip_update" => %{
"delay" => nil,
"stop_time_update" => [
%{
"arrival" => %{
"delay" => nil,
"time" => 1_491_570_120,
"uncertainty" => nil
},
Expand All @@ -84,7 +80,6 @@ defmodule Fake.HTTPoison do
},
%{
"arrival" => %{
"delay" => nil,
"time" => 1_491_570_180,
"uncertainty" => nil
},
Expand Down Expand Up @@ -243,23 +238,17 @@ defmodule Fake.HTTPoison do
%{
"entity" => [
%{
"alert" => nil,
"id" => "1490783458_32568935",
"is_deleted" => false,
"trip_update" => %{
"delay" => nil,
"stop_time_update" => [
%{
"arrival" => %{
"delay" => nil,
"time" => 1_491_570_180,
"uncertainty" => nil
"uncertainty" => 60
},
"departure" => nil,
"schedule_relationship" => "SCHEDULED",
"stop_id" => "stop_to_update",
"stop_sequence" => 1,
"stops_away" => 0
"stop_sequence" => 1
}
],
"timestamp" => nil,
Expand All @@ -269,15 +258,15 @@ defmodule Fake.HTTPoison do
"schedule_relationship" => "SCHEDULED",
"start_date" => "20170329",
"start_time" => nil,
"trip_id" => "32568935"
"trip_id" => "32568935",
"revenue" => true
},
"vehicle" => %{
"id" => "G-10040",
"label" => "3260",
"license_plate" => nil
}
},
"vehicle" => nil
}
}
],
"header" => %{
Expand Down
7 changes: 6 additions & 1 deletion lib/predictions/last_trip.ex
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
defmodule Predictions.LastTrip do
alias Predictions.Predictions

defp get_running_trips(predictions_feed) do
predictions_feed["entity"]
|> Stream.map(& &1["trip_update"])
|> Enum.reject(&(&1["trip"]["schedule_relationship"] == "CANCELED"))
|> Stream.filter(
&(Predictions.relevant_rail_route?(&1["trip"]["route_id"]) and
&1["trip"]["schedule_relationship"] != "CANCELED")
)
end

def get_last_trips(predictions_feed) do
Expand Down
6 changes: 2 additions & 4 deletions lib/predictions/prediction.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ defmodule Predictions.Prediction do
route_id: nil,
trip_id: nil,
destination_stop_id: nil,
stopped?: false,
stops_away: 0,
stopped_at_predicted_stop?: false,
boarding_status: nil,
revenue_trip?: true,
vehicle_id: nil
Expand All @@ -30,8 +29,7 @@ defmodule Predictions.Prediction do
route_id: String.t(),
trip_id: trip_id() | nil,
destination_stop_id: String.t(),
stopped?: boolean(),
stops_away: integer(),
stopped_at_predicted_stop?: boolean(),
boarding_status: String.t() | nil,
revenue_trip?: boolean(),
vehicle_id: String.t() | nil
Expand Down
66 changes: 46 additions & 20 deletions lib/predictions/predictions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,28 @@ defmodule Predictions.Predictions do
def get_all(feed_message, current_time) do
predictions =
feed_message["entity"]
|> Enum.map(& &1["trip_update"])
|> Enum.reject(&(&1["trip"]["schedule_relationship"] == "CANCELED"))
|> Enum.flat_map(&transform_stop_time_updates/1)
|> Enum.filter(fn {update, _, _, _, _, _, _} ->
((update["arrival"] || update["departure"]) &&
not is_nil(update["stops_away"])) || update["passthrough_time"]
|> Stream.map(& &1["trip_update"])
|> Stream.filter(
&(relevant_rail_route?(&1["trip"]["route_id"]) and
&1["trip"]["schedule_relationship"] != "CANCELED")
)
|> Stream.flat_map(&transform_stop_time_updates/1)
|> Stream.filter(fn {update, _, _, _, _, _, _} ->
(update["arrival"] && update["arrival"]["uncertainty"]) ||
(update["departure"] && update["departure"]["uncertainty"]) ||
update["passthrough_time"]
end)
|> Enum.map(&prediction_from_update(&1, current_time))
|> Stream.map(&prediction_from_update(&1, current_time))
|> Enum.reject(
&(is_nil(&1.seconds_until_arrival) and is_nil(&1.seconds_until_departure) and
is_nil(&1.seconds_until_passthrough))
&((is_nil(&1.seconds_until_arrival) and is_nil(&1.seconds_until_departure) and
is_nil(&1.seconds_until_passthrough)) or
has_departed?(&1))
)

vehicles_running_revenue_trips =
predictions
|> Enum.filter(& &1.revenue_trip?)
|> Enum.map(& &1.vehicle_id)
|> Stream.filter(& &1.revenue_trip?)
|> Stream.map(& &1.vehicle_id)
|> MapSet.new()

{Enum.group_by(predictions, fn prediction ->
Expand All @@ -43,15 +48,12 @@ defmodule Predictions.Predictions do
end)
|> Map.get("stop_id")

revenue_trip? =
Enum.any?(trip_update["stop_time_update"], &(&1["schedule_relationship"] != "SKIPPED"))

vehicle_id = get_in(trip_update, ["vehicle", "id"])

Enum.map(
trip_update["stop_time_update"],
&{&1, last_stop_id, trip_update["trip"]["route_id"], trip_update["trip"]["direction_id"],
trip_update["trip"]["trip_id"], revenue_trip?, vehicle_id}
trip_update["trip"]["trip_id"], trip_update["trip"]["revenue"], vehicle_id}
)
end

Expand All @@ -67,6 +69,9 @@ defmodule Predictions.Predictions do
) do
current_time_seconds = DateTime.to_unix(current_time)

schedule_relationship =
translate_schedule_relationship(stop_time_update["schedule_relationship"])

seconds_until_arrival =
if stop_time_update["arrival"] &&
sufficient_certainty?(stop_time_update["arrival"], route_id),
Expand All @@ -84,21 +89,23 @@ defmodule Predictions.Predictions do
do: stop_time_update["passthrough_time"] - current_time_seconds,
else: nil

vehicle_location = Engine.Locations.for_vehicle(vehicle_id)

%Prediction{
stop_id: stop_time_update["stop_id"],
direction_id: direction_id,
seconds_until_arrival: max(0, seconds_until_arrival),
arrival_certainty: stop_time_update["arrival"]["uncertainty"],
seconds_until_departure: max(0, seconds_until_departure),
seconds_until_departure: seconds_until_departure,
departure_certainty: stop_time_update["departure"]["uncertainty"],
seconds_until_passthrough: max(0, seconds_until_passthrough),
schedule_relationship:
translate_schedule_relationship(stop_time_update["schedule_relationship"]),
schedule_relationship: schedule_relationship,
route_id: route_id,
trip_id: trip_id,
destination_stop_id: last_stop_id,
stopped?: stop_time_update["stopped?"],
stops_away: stop_time_update["stops_away"],
stopped_at_predicted_stop?:
not is_nil(vehicle_location) and vehicle_location.status == :stopped_at and
stop_time_update["stop_id"] == vehicle_location.stop_id,
boarding_status: stop_time_update["boarding_status"],
revenue_trip?: revenue_trip?,
vehicle_id: vehicle_id
Expand All @@ -113,6 +120,19 @@ defmodule Predictions.Predictions do
Jason.decode!(body)
end

def relevant_rail_route?(route_id) do
route_id in [
"Red",
"Blue",
"Orange",
"Green-B",
"Green-C",
"Green-D",
"Green-E",
"Mattapan"
]
end

@spec translate_schedule_relationship(String.t()) :: :skipped | :scheduled
defp translate_schedule_relationship("SKIPPED") do
:skipped
Expand All @@ -135,4 +155,10 @@ defmodule Predictions.Predictions do
true
end
end

@spec has_departed?(Predictions.Prediction.t()) :: boolean()
defp has_departed?(prediction) do
prediction.seconds_until_departure && prediction.seconds_until_departure < 0 &&
not prediction.stopped_at_predicted_stop?
end
end
7 changes: 3 additions & 4 deletions lib/signs/utilities/predictions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@ defmodule Signs.Utilities.Predictions do
{if terminal? do
0
else
case prediction.stops_away do
0 -> 0
_ -> 1
end
if prediction.stopped_at_predicted_stop?,
do: 0,
else: 1
end, prediction.seconds_until_departure, prediction.seconds_until_arrival}
end)
|> filter_large_red_line_gaps()
Expand Down
Loading

0 comments on commit 9d5d608

Please sign in to comment.