diff --git a/apps/transport/lib/registry/engine.ex b/apps/transport/lib/registry/engine.ex index 0ebbeec0fb..ca70c753e1 100644 --- a/apps/transport/lib/registry/engine.ex +++ b/apps/transport/lib/registry/engine.ex @@ -3,9 +3,9 @@ defmodule Transport.Registry.Engine do Stream eligible resources and run extractors to produce a raw registry at the end. """ - alias Transport.Registry.Extractor alias Transport.Registry.GTFS alias Transport.Registry.Model.Stop + alias Transport.Registry.Result import Ecto.Query @@ -19,13 +19,13 @@ defmodule Transport.Registry.Engine do create_empty_csv_with_headers(output_file) enumerate_gtfs_resources(limit, formats) - |> Extractor.map_result(&prepare_extractor/1) + |> Result.map_result(&prepare_extractor/1) |> Task.async_stream(&download/1, max_concurrency: 10, timeout: 120_000) # one for Task.async_stream - |> Extractor.cat_results() + |> Result.cat_results() # one for download/1 - |> Extractor.cat_results() - |> Extractor.map_result(&extract_from_archive/1) + |> Result.cat_results() + |> Result.map_result(&extract_from_archive/1) |> dump_to_csv(output_file) end @@ -54,9 +54,9 @@ defmodule Transport.Registry.Engine do Logger.debug("download #{extractor} #{url}") tmp_path = System.tmp_dir!() |> Path.join("#{Ecto.UUID.generate()}.dat") - error_result = fn msg -> + safe_error = fn msg -> File.rm(tmp_path) - {:error, msg} + Result.error(msg) end http_result = @@ -68,7 +68,7 @@ defmodule Transport.Registry.Engine do case http_result do {:error, error} -> - error_result.("Unexpected error while downloading the resource from #{url}: #{Exception.message(error)}") + safe_error.("Unexpected error while downloading the resource from #{url}: #{Exception.message(error)}") {:ok, %{status: status}} -> cond do @@ -76,15 +76,15 @@ defmodule Transport.Registry.Engine do {:ok, {extractor, tmp_path}} status > 400 -> - error_result.("Error #{status} while downloading the resource from #{url}") + safe_error.("Error #{status} while downloading the resource from #{url}") true -> - error_result.("Unexpected HTTP error #{status} while downloading the resource from #{url}") + safe_error.("Unexpected HTTP error #{status} while downloading the resource from #{url}") end end end - @spec extract_from_archive({module(), Path.t()}) :: Extractor.result([Stop.t()]) + @spec extract_from_archive({module(), Path.t()}) :: Result.t([Stop.t()]) def extract_from_archive({extractor, file}) do Logger.debug("extract_from_archive #{extractor} #{file}") extractor.extract_from_archive(file) diff --git a/apps/transport/lib/registry/extractor.ex b/apps/transport/lib/registry/extractor.ex index 19ab45308b..0f5db7d9ec 100644 --- a/apps/transport/lib/registry/extractor.ex +++ b/apps/transport/lib/registry/extractor.ex @@ -6,21 +6,7 @@ defmodule Transport.Registry.Extractor do require Logger alias Transport.Registry.Model.Stop + alias Transport.Registry.Result - @type result(positive) :: {:ok, positive} | {:error, binary()} - - @callback extract_from_archive(path :: Path.t()) :: result([Stop.t()]) - - @spec cat_results(Stream.t(result(term()))) :: Stream.t(term()) - def cat_results(enumerable), do: Stream.flat_map(enumerable, &keep_ok/1) - - defp keep_ok({:ok, result}), do: [result] - defp keep_ok(_), do: [] - - @spec map_result(Stream.t(term()), (term() -> result(term()))) :: Stream.t(term()) - def map_result(enumerable, mapper) do - enumerable - |> Stream.map(mapper) - |> cat_results() - end + @callback extract_from_archive(path :: Path.t()) :: Result.t([Stop.t()]) end diff --git a/apps/transport/lib/registry/gtfs.ex b/apps/transport/lib/registry/gtfs.ex index 25ecb7bbea..80c875e0ee 100644 --- a/apps/transport/lib/registry/gtfs.ex +++ b/apps/transport/lib/registry/gtfs.ex @@ -5,6 +5,7 @@ defmodule Transport.Registry.GTFS do alias Transport.Registry.Model.Stop alias Transport.Registry.Model.StopIdentifier + alias Transport.Registry.Result alias Transport.GTFS.Utils @@ -18,18 +19,16 @@ defmodule Transport.Registry.GTFS do case file_stream(archive) do {:error, error} -> Logger.error(error) - {:error, error} + Result.error(error) {:ok, content} -> Logger.debug("Valid Zip archive") - stops = - content - |> Utils.to_stream_of_maps() - |> Stream.flat_map(&handle_stop/1) - |> Enum.to_list() - - {:ok, stops} + content + |> Utils.to_stream_of_maps() + |> Stream.flat_map(&handle_stop/1) + |> Enum.to_list() + |> Result.ok() end end @@ -63,18 +62,19 @@ defmodule Transport.Registry.GTFS do case Unzip.new(zip_file) do {:ok, unzip} -> if has_stops?(unzip) do - {:ok, Unzip.file_stream!(unzip, "stops.txt")} + unzip |> Unzip.file_stream!("stops.txt") |> Result.ok() else - {:error, "Missing stops.txt in #{archive}"} + Result.error("Missing stops.txt in #{archive}") end {:error, error} -> - {:error, "Error while unzipping archive #{archive}: #{error}"} + Result.error("Error while unzipping archive #{archive}: #{error}") end end defp has_stops?(unzip) do - Unzip.list_entries(unzip) + unzip + |> Unzip.list_entries() |> Enum.any?(&entry_of_name?("stops.txt", &1)) end diff --git a/apps/transport/test/registry/extractor_test.exs b/apps/transport/test/registry/result_test.exs similarity index 75% rename from apps/transport/test/registry/extractor_test.exs rename to apps/transport/test/registry/result_test.exs index da60497735..d48a8d587b 100644 --- a/apps/transport/test/registry/extractor_test.exs +++ b/apps/transport/test/registry/result_test.exs @@ -1,8 +1,8 @@ -defmodule Transport.Registry.ExtractorTest do +defmodule Transport.Registry.ResultTest do use ExUnit.Case, async: false require Integer - alias Transport.Registry.Extractor + alias Transport.Registry.Result test "cat_results" do assert [] == cat_results([]) @@ -16,11 +16,11 @@ defmodule Transport.Registry.ExtractorTest do end defp cat_results(enumerable) do - enumerable |> Extractor.cat_results() |> Enum.to_list() + enumerable |> Result.cat_results() |> Enum.to_list() end defp map_result(enumerable, mapper) do - enumerable |> Extractor.map_result(mapper) |> Enum.to_list() + enumerable |> Result.map_result(mapper) |> Enum.to_list() end defp even_is_forbidden(i) when Integer.is_odd(i), do: {:ok, i}