Skip to content

Commit

Permalink
Code dans le bon module
Browse files Browse the repository at this point in the history
  • Loading branch information
ptitfred committed Dec 19, 2024
1 parent c3344a0 commit 2acbef3
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 43 deletions.
22 changes: 11 additions & 11 deletions apps/transport/lib/registry/engine.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ defmodule Transport.Registry.Engine do
Stream eligible resources and run extractors to produce a raw registry at the end.
"""

alias Transport.Registry.Extractor
alias Transport.Registry.GTFS
alias Transport.Registry.Model.Stop
alias Transport.Registry.Result

import Ecto.Query

Expand All @@ -19,13 +19,13 @@ defmodule Transport.Registry.Engine do
create_empty_csv_with_headers(output_file)

enumerate_gtfs_resources(limit, formats)
|> Extractor.map_result(&prepare_extractor/1)
|> Result.map_result(&prepare_extractor/1)
|> Task.async_stream(&download/1, max_concurrency: 10, timeout: 120_000)
# one for Task.async_stream
|> Extractor.cat_results()
|> Result.cat_results()
# one for download/1
|> Extractor.cat_results()
|> Extractor.map_result(&extract_from_archive/1)
|> Result.cat_results()
|> Result.map_result(&extract_from_archive/1)
|> dump_to_csv(output_file)
end

Expand Down Expand Up @@ -54,9 +54,9 @@ defmodule Transport.Registry.Engine do
Logger.debug("download #{extractor} #{url}")
tmp_path = System.tmp_dir!() |> Path.join("#{Ecto.UUID.generate()}.dat")

error_result = fn msg ->
safe_error = fn msg ->
File.rm(tmp_path)
{:error, msg}
Result.error(msg)
end

http_result =
Expand All @@ -68,23 +68,23 @@ defmodule Transport.Registry.Engine do

case http_result do
{:error, error} ->
error_result.("Unexpected error while downloading the resource from #{url}: #{Exception.message(error)}")
safe_error.("Unexpected error while downloading the resource from #{url}: #{Exception.message(error)}")

{:ok, %{status: status}} ->
cond do
status >= 200 && status < 300 ->
{:ok, {extractor, tmp_path}}

status > 400 ->
error_result.("Error #{status} while downloading the resource from #{url}")
safe_error.("Error #{status} while downloading the resource from #{url}")

true ->
error_result.("Unexpected HTTP error #{status} while downloading the resource from #{url}")
safe_error.("Unexpected HTTP error #{status} while downloading the resource from #{url}")
end
end
end

@spec extract_from_archive({module(), Path.t()}) :: Extractor.result([Stop.t()])
@spec extract_from_archive({module(), Path.t()}) :: Result.t([Stop.t()])
def extract_from_archive({extractor, file}) do
Logger.debug("extract_from_archive #{extractor} #{file}")
extractor.extract_from_archive(file)
Expand Down
18 changes: 2 additions & 16 deletions apps/transport/lib/registry/extractor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,7 @@ defmodule Transport.Registry.Extractor do
require Logger

alias Transport.Registry.Model.Stop
alias Transport.Registry.Result

@type result(positive) :: {:ok, positive} | {:error, binary()}

@callback extract_from_archive(path :: Path.t()) :: result([Stop.t()])

@spec cat_results(Stream.t(result(term()))) :: Stream.t(term())
def cat_results(enumerable), do: Stream.flat_map(enumerable, &keep_ok/1)

defp keep_ok({:ok, result}), do: [result]
defp keep_ok(_), do: []

@spec map_result(Stream.t(term()), (term() -> result(term()))) :: Stream.t(term())
def map_result(enumerable, mapper) do
enumerable
|> Stream.map(mapper)
|> cat_results()
end
@callback extract_from_archive(path :: Path.t()) :: Result.t([Stop.t()])
end
24 changes: 12 additions & 12 deletions apps/transport/lib/registry/gtfs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Transport.Registry.GTFS do

alias Transport.Registry.Model.Stop
alias Transport.Registry.Model.StopIdentifier
alias Transport.Registry.Result

alias Transport.GTFS.Utils

Expand All @@ -18,18 +19,16 @@ defmodule Transport.Registry.GTFS do
case file_stream(archive) do
{:error, error} ->
Logger.error(error)
{:error, error}
Result.error(error)

{:ok, content} ->
Logger.debug("Valid Zip archive")

stops =
content
|> Utils.to_stream_of_maps()
|> Stream.flat_map(&handle_stop/1)
|> Enum.to_list()

{:ok, stops}
content
|> Utils.to_stream_of_maps()
|> Stream.flat_map(&handle_stop/1)
|> Enum.to_list()
|> Result.ok()
end
end

Expand Down Expand Up @@ -63,18 +62,19 @@ defmodule Transport.Registry.GTFS do
case Unzip.new(zip_file) do
{:ok, unzip} ->
if has_stops?(unzip) do
{:ok, Unzip.file_stream!(unzip, "stops.txt")}
unzip |> Unzip.file_stream!("stops.txt") |> Result.ok()
else
{:error, "Missing stops.txt in #{archive}"}
Result.error("Missing stops.txt in #{archive}")
end

{:error, error} ->
{:error, "Error while unzipping archive #{archive}: #{error}"}
Result.error("Error while unzipping archive #{archive}: #{error}")
end
end

defp has_stops?(unzip) do
Unzip.list_entries(unzip)
unzip
|> Unzip.list_entries()
|> Enum.any?(&entry_of_name?("stops.txt", &1))
end

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
defmodule Transport.Registry.ExtractorTest do
defmodule Transport.Registry.ResultTest do
use ExUnit.Case, async: false

require Integer
alias Transport.Registry.Extractor
alias Transport.Registry.Result

test "cat_results" do
assert [] == cat_results([])
Expand All @@ -16,11 +16,11 @@ defmodule Transport.Registry.ExtractorTest do
end

defp cat_results(enumerable) do
enumerable |> Extractor.cat_results() |> Enum.to_list()
enumerable |> Result.cat_results() |> Enum.to_list()
end

defp map_result(enumerable, mapper) do
enumerable |> Extractor.map_result(mapper) |> Enum.to_list()
enumerable |> Result.map_result(mapper) |> Enum.to_list()
end

defp even_is_forbidden(i) when Integer.is_odd(i), do: {:ok, i}
Expand Down

0 comments on commit 2acbef3

Please sign in to comment.