Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Absinthe Dataloader use :tuples get_policy to propagate errors #4380

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions lib/sanbase/clickhouse/metric/metric_adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -322,12 +322,12 @@ defmodule Sanbase.Clickhouse.MetricAdapter do
end

defp get_aggregated_timeseries_data(metric, slugs, from, to, aggregation, filters)
when is_list(slugs) and length(slugs) > 50 do
when is_list(slugs) and length(slugs) > 1000 do
result =
Enum.chunk_every(slugs, 50)
Enum.chunk_every(slugs, 1000)
|> Sanbase.Parallel.map(
&get_aggregated_timeseries_data(metric, &1, from, to, aggregation, filters),
timeout: 25_000,
timeout: 55_000,
max_concurrency: 8,
ordered: false,
on_timeout: :kill_task
Expand Down
6 changes: 3 additions & 3 deletions lib/sanbase/prices/price.ex
Original file line number Diff line number Diff line change
Expand Up @@ -305,14 +305,14 @@ defmodule Sanbase.Price do
def aggregated_metric_timeseries_data([], _, _, _, _), do: {:ok, %{}}

def aggregated_metric_timeseries_data(slugs, metric, from, to, opts)
when is_list(slugs) and length(slugs) > 50 do
when is_list(slugs) and length(slugs) > 1000 do
# Break here, otherwise the Enum.filter/2 will remove all errors and report a wrong result
with {:ok, _source} <- opts_to_source(opts) do
result =
Enum.chunk_every(slugs, 50)
Enum.chunk_every(slugs, 1000)
|> Sanbase.Parallel.map(
&aggregated_metric_timeseries_data(&1, metric, from, to, opts),
timeout: 25_000,
timeout: 55_000,
max_concurrency: 8,
ordered: false
)
Expand Down
6 changes: 3 additions & 3 deletions lib/sanbase/prices/price_pair/price_pair.ex
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,12 @@ defmodule Sanbase.PricePair do
def aggregated_timeseries_data([], _, _, _, _), do: {:ok, []}

def aggregated_timeseries_data(slugs, quote_asset, from, to, opts)
when is_list(slugs) and length(slugs) > 50 do
when is_list(slugs) and length(slugs) > 1000 do
result =
Enum.chunk_every(slugs, 50)
Enum.chunk_every(slugs, 1000)
|> Sanbase.Parallel.map(
&aggregated_timeseries_data(&1, quote_asset, from, to, opts),
timeout: 25_000,
timeout: 55_000,
max_concurrency: 8,
ordered: false
)
Expand Down
34 changes: 21 additions & 13 deletions lib/sanbase_web/graphql/dataloader/clickhouse_dataloader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,36 @@ defmodule SanbaseWeb.Graphql.ClickhouseDataloader do

args_list
|> Enum.group_by(fn %{selector: selector} -> selector end)
|> Sanbase.Parallel.map(fn {selector, group} ->
{metric, from, to, opts} = selector
slugs = Enum.map(group, & &1.slug)
|> Sanbase.Parallel.map(
fn {selector, group} ->
{metric, from, to, opts} = selector
slugs = Enum.map(group, & &1.slug)

data =
case Metric.aggregated_timeseries_data(metric, %{slug: slugs}, from, to, opts) do
{:ok, result} -> result
{:error, error} -> {:error, error}
end
data =
case Metric.aggregated_timeseries_data(metric, %{slug: slugs}, from, to, opts) do
{:ok, result} -> result
{:error, error} -> {:error, error}
end

{selector, data}
end)
{selector, data}
end,
timeout: 60_000,
ordered: false
)
|> Map.new()
end

def query(:average_daily_active_addresses, args) do
args
|> Enum.to_list()
|> Enum.group_by(fn %{from: from, to: to} -> {from, to} end)
|> Sanbase.Parallel.map(fn {{from, to}, group} ->
{{from, to}, average_daily_active_addresses(group, from, to)}
end)
|> Sanbase.Parallel.map(
fn {{from, to}, group} ->
{{from, to}, average_daily_active_addresses(group, from, to)}
end,
timeout: 60_000,
ordered: false
)
|> Map.new()
end

Expand Down
45 changes: 23 additions & 22 deletions lib/sanbase_web/graphql/resolvers/blockchain_address_resolver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -269,28 +269,29 @@ defmodule SanbaseWeb.Graphql.Resolvers.BlockchainAddressResolver do
loader
|> Dataloader.load(SanbaseDataloader, :address_labels, address)
|> on_load(fn loader ->
santiment_labels =
Dataloader.get(loader, SanbaseDataloader, :address_labels, address) ||
[]

# The root can be built either from a BlockchainAddress in case the
# `blockchain_address` query is used, or from a BlockchainAddressUserPair
# in case the address is part of a watchlist. In the second case, the root
# has an additional `labels` key which holds the list of user-defined labels
# for that address. The santiment defined labels from CH are provided with a
# `origin: "santiment"` key-value pair so they could be distinguished from
# the user-defined labels.
user_labels =
Map.get(root, :labels, [])
|> Enum.map(fn label ->
label |> Map.put(:origin, "user")
end)

labels =
(user_labels ++ santiment_labels)
|> Enum.sort_by(& &1.name)

{:ok, labels}
with {:ok, santiment_labels} <-
Dataloader.get(loader, SanbaseDataloader, :address_labels, address) do
santiment_labels = santiment_labels || []

# The root can be built either from a BlockchainAddress in case the
# `blockchain_address` query is used, or from a BlockchainAddressUserPair
# in case the address is part of a watchlist. In the second case, the root
# has an additional `labels` key which holds the list of user-defined labels
# for that address. The santiment defined labels from CH are provided with a
# `origin: "santiment"` key-value pair so they could be distinguished from
# the user-defined labels.
user_labels =
Map.get(root, :labels, [])
|> Enum.map(fn label ->
label |> Map.put(:origin, "user")
end)

labels =
(user_labels ++ santiment_labels)
|> Enum.sort_by(& &1.name)

{:ok, labels}
end
end)
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ defmodule SanbaseWeb.Graphql.Resolvers.CommentEntityIdResolver do
loader
|> Dataloader.load(SanbaseDataloader, entity_id_name, id)
|> on_load(fn loader ->
{:ok, Dataloader.get(loader, SanbaseDataloader, entity_id_name, id)}
Dataloader.get(loader, SanbaseDataloader, entity_id_name, id)
end)
end
end
16 changes: 12 additions & 4 deletions lib/sanbase_web/graphql/resolvers/insight_resolver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,14 @@ defmodule SanbaseWeb.Graphql.Resolvers.InsightResolver do
loader
|> Dataloader.load(SanbaseDataloader, :insights_count_per_user, id)
|> on_load(fn loader ->
{:ok,
Dataloader.get(loader, SanbaseDataloader, :insights_count_per_user, id) ||
%{total_count: 0, draft_count: 0, pulse_count: 0, paywall_count: 0}}
case Dataloader.get(loader, SanbaseDataloader, :insights_count_per_user, id) do
{:ok, val} ->
val = val || %{total_count: 0, draft_count: 0, pulse_count: 0, paywall_count: 0}
{:ok, val}

{:error, error} ->
{:error, error}
end
end)
end

Expand All @@ -228,7 +233,10 @@ defmodule SanbaseWeb.Graphql.Resolvers.InsightResolver do
loader
|> Dataloader.load(SanbaseDataloader, :insights_comments_count, id)
|> on_load(fn loader ->
{:ok, Dataloader.get(loader, SanbaseDataloader, :insights_comments_count, id) || 0}
case Dataloader.get(loader, SanbaseDataloader, :insights_comments_count, id) do
{:ok, val} -> {:ok, val || 0}
{:error, error} -> {:error, error}
end
end)
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,25 +93,28 @@ defmodule SanbaseWeb.Graphql.Resolvers.ProjectMetricsResolver do
selector: {metric, from, to, opts}
}

error_on_failed_fetch = Map.get(args, :error_on_failed_fetch, false)

loader
|> Dataloader.load(SanbaseDataloader, :aggregated_metric, data)
|> on_load(&aggregated_metric_from_loader(&1, data))
|> on_load(&aggregated_metric_from_loader(&1, data, error_on_failed_fetch))
end
end

# Private functions

defp aggregated_metric_from_loader(loader, data) do
defp aggregated_metric_from_loader(loader, data, error_on_failed_fetch) do
%{selector: selector, slug: slug, metric: metric} = data

loader
|> Dataloader.get(SanbaseDataloader, :aggregated_metric, selector)
|> case do
map when is_map(map) ->
case Dataloader.get(loader, SanbaseDataloader, :aggregated_metric, selector) do
{:ok, map} when is_list(map) ->
aggregated_metric_from_loader_map(map, slug, metric, data[:opts])

_ ->
_ignored when error_on_failed_fetch == false ->
{:nocache, {:ok, nil}}

_ignored when error_on_failed_fetch == true ->
{:error, "Failed to fetch #{metric} for #{slug} while executing aggregatedTimeseriesData"}
end
end

Expand Down
3 changes: 2 additions & 1 deletion lib/sanbase_web/graphql/resolvers/short_url_resolver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ defmodule SanbaseWeb.Graphql.Resolvers.ShortUrlResolver do
loader
|> Dataloader.load(SanbaseDataloader, :short_urls_comments_count, id)
|> on_load(fn loader ->
{:ok, Dataloader.get(loader, SanbaseDataloader, :short_urls_comments_count, id) || 0}
Dataloader.get(loader, SanbaseDataloader, :short_urls_comments_count, id) || 0
end)
|> dbg()
end

# Private functions
Expand Down
2 changes: 1 addition & 1 deletion lib/sanbase_web/graphql/resolvers/social_data_resolver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ defmodule SanbaseWeb.Graphql.Resolvers.SocialDataResolver do
loader
|> Dataloader.load(SanbaseDataloader, :project_by_slug, slug)
|> on_load(fn loader ->
{:ok, Dataloader.get(loader, SanbaseDataloader, :project_by_slug, slug)}
Dataloader.get(loader, SanbaseDataloader, :project_by_slug, slug)
end)
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ defmodule SanbaseWeb.Graphql.Resolvers.TimelineEventResolver do
loader
|> Dataloader.load(SanbaseDataloader, :timeline_events_comments_count, id)
|> on_load(fn loader ->
{:ok, Dataloader.get(loader, SanbaseDataloader, :timeline_events_comments_count, id) || 0}
case Dataloader.get(loader, SanbaseDataloader, :timeline_events_comments_count, id) do
{:ok, val} -> {:ok, val || 0}
{:error, error} -> {:error, error}
end
end)
end
end
2 changes: 1 addition & 1 deletion lib/sanbase_web/graphql/schema.ex
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ defmodule SanbaseWeb.Graphql.Schema do
import_types(Graphql.Schema.WidgetQueries)

def dataloader() do
Dataloader.new(timeout: :timer.seconds(20), get_policy: :return_nil_on_error)
Dataloader.new(timeout: :timer.seconds(20), get_policy: :tuples)
|> Dataloader.add_source(SanbaseRepo, SanbaseRepo.data())
|> Dataloader.add_source(SanbaseDataloader, SanbaseDataloader.data())
end
Expand Down
1 change: 1 addition & 0 deletions lib/sanbase_web/graphql/schema/types/project_types.ex
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ defmodule SanbaseWeb.Graphql.ProjectTypes do
arg(:aggregation, :aggregation, default_value: nil)
arg(:include_incomplete_data, :boolean, default_value: false)
arg(:caching_params, :caching_params_input_object)
arg(:error_on_failed_fetch, :boolean, default_value: false)

complexity(&Complexity.from_to_interval/3)
middleware(AccessControl)
Expand Down