diff --git a/lib/sanbase/clickhouse/metric/metric_adapter.ex b/lib/sanbase/clickhouse/metric/metric_adapter.ex index 6dc1122fb4..8fb2f43510 100644 --- a/lib/sanbase/clickhouse/metric/metric_adapter.ex +++ b/lib/sanbase/clickhouse/metric/metric_adapter.ex @@ -322,12 +322,12 @@ defmodule Sanbase.Clickhouse.MetricAdapter do end defp get_aggregated_timeseries_data(metric, slugs, from, to, aggregation, filters) - when is_list(slugs) and length(slugs) > 50 do + when is_list(slugs) and length(slugs) > 1000 do result = - Enum.chunk_every(slugs, 50) + Enum.chunk_every(slugs, 1000) |> Sanbase.Parallel.map( &get_aggregated_timeseries_data(metric, &1, from, to, aggregation, filters), - timeout: 25_000, + timeout: 55_000, max_concurrency: 8, ordered: false, on_timeout: :kill_task diff --git a/lib/sanbase/prices/price.ex b/lib/sanbase/prices/price.ex index 04594e9b87..04e6fb9f98 100644 --- a/lib/sanbase/prices/price.ex +++ b/lib/sanbase/prices/price.ex @@ -305,14 +305,14 @@ defmodule Sanbase.Price do def aggregated_metric_timeseries_data([], _, _, _, _), do: {:ok, %{}} def aggregated_metric_timeseries_data(slugs, metric, from, to, opts) - when is_list(slugs) and length(slugs) > 50 do + when is_list(slugs) and length(slugs) > 1000 do # Break here otherwise the Enum.filter/2 will remove all errors and report a wrong result with {:ok, _source} <- opts_to_source(opts) do result = - Enum.chunk_every(slugs, 50) + Enum.chunk_every(slugs, 1000) |> Sanbase.Parallel.map( &aggregated_metric_timeseries_data(&1, metric, from, to, opts), - timeout: 25_000, + timeout: 55_000, max_concurrency: 8, ordered: false ) diff --git a/lib/sanbase/prices/price_pair/price_pair.ex b/lib/sanbase/prices/price_pair/price_pair.ex index a2d9c42885..0da597df8a 100644 --- a/lib/sanbase/prices/price_pair/price_pair.ex +++ b/lib/sanbase/prices/price_pair/price_pair.ex @@ -103,12 +103,12 @@ defmodule Sanbase.PricePair do def aggregated_timeseries_data([], _, _, _, _), do: {:ok, []} def aggregated_timeseries_data(slugs, quote_asset, from, to, opts) - when is_list(slugs) and length(slugs) > 50 do + when is_list(slugs) and length(slugs) > 1000 do result = - Enum.chunk_every(slugs, 50) + Enum.chunk_every(slugs, 1000) |> Sanbase.Parallel.map( &aggregated_timeseries_data(&1, quote_asset, from, to, opts), - timeout: 25_000, + timeout: 55_000, max_concurrency: 8, ordered: false ) diff --git a/lib/sanbase_web/graphql/dataloader/clickhouse_dataloader.ex b/lib/sanbase_web/graphql/dataloader/clickhouse_dataloader.ex index ba6eb1eeaf..32c82a7400 100644 --- a/lib/sanbase_web/graphql/dataloader/clickhouse_dataloader.ex +++ b/lib/sanbase_web/graphql/dataloader/clickhouse_dataloader.ex @@ -10,18 +10,22 @@ defmodule SanbaseWeb.Graphql.ClickhouseDataloader do args_list |> Enum.group_by(fn %{selector: selector} -> selector end) - |> Sanbase.Parallel.map(fn {selector, group} -> - {metric, from, to, opts} = selector - slugs = Enum.map(group, & &1.slug) + |> Sanbase.Parallel.map( + fn {selector, group} -> + {metric, from, to, opts} = selector + slugs = Enum.map(group, & &1.slug) - data = - case Metric.aggregated_timeseries_data(metric, %{slug: slugs}, from, to, opts) do - {:ok, result} -> result - {:error, error} -> {:error, error} - end + data = + case Metric.aggregated_timeseries_data(metric, %{slug: slugs}, from, to, opts) do + {:ok, result} -> result + {:error, error} -> {:error, error} + end - {selector, data} - end) + {selector, data} + end, + timeout: 60_000, + ordered: false + ) |> Map.new() end @@ -29,9 +33,13 @@ defmodule SanbaseWeb.Graphql.ClickhouseDataloader do args |> Enum.to_list() |> Enum.group_by(fn %{from: from, to: to} -> {from, to} end) - |> Sanbase.Parallel.map(fn {{from, to}, group} -> - {{from, to}, average_daily_active_addresses(group, from, to)} - end) + |> Sanbase.Parallel.map( + fn {{from, to}, group} -> + {{from, to}, average_daily_active_addresses(group, from, to)} + end, + timeout: 60_000, + ordered: false + ) |> Map.new() end diff --git a/lib/sanbase_web/graphql/resolvers/blockchain_address_resolver.ex b/lib/sanbase_web/graphql/resolvers/blockchain_address_resolver.ex index 5692a49413..277ceb04bd 100644 --- a/lib/sanbase_web/graphql/resolvers/blockchain_address_resolver.ex +++ b/lib/sanbase_web/graphql/resolvers/blockchain_address_resolver.ex @@ -269,28 +269,29 @@ defmodule SanbaseWeb.Graphql.Resolvers.BlockchainAddressResolver do loader |> Dataloader.load(SanbaseDataloader, :address_labels, address) |> on_load(fn loader -> - santiment_labels = - Dataloader.get(loader, SanbaseDataloader, :address_labels, address) || - [] - - # The root can be built either from a BlockchainAddress in case the - # `blockchain_address` query is used, or from a BlockchainAddressUserPair - # in casethe address is part of a watchlist. In the second case, the root - # has an additional `labels` key which holds the list of user-defined labels - # for that address. The santiment defined labels from CH are provided with a - # `origin: "santiment"` key-value pair so they could be distinguished from - # the user-defined labels. - user_labels = - Map.get(root, :labels, []) - |> Enum.map(fn label -> - label |> Map.put(:origin, "user") - end) - - labels = - (user_labels ++ santiment_labels) - |> Enum.sort_by(& &1.name) - - {:ok, labels} + with {:ok, santiment_labels} <- + Dataloader.get(loader, SanbaseDataloader, :address_labels, address) do + santiment_labels = santiment_labels || [] + + # The root can be built either from a BlockchainAddress in case the + # `blockchain_address` query is used, or from a BlockchainAddressUserPair + # in casethe address is part of a watchlist. In the second case, the root + # has an additional `labels` key which holds the list of user-defined labels + # for that address. The santiment defined labels from CH are provided with a + # `origin: "santiment"` key-value pair so they could be distinguished from + # the user-defined labels. + user_labels = + Map.get(root, :labels, []) + |> Enum.map(fn label -> + label |> Map.put(:origin, "user") + end) + + labels = + (user_labels ++ santiment_labels) + |> Enum.sort_by(& &1.name) + + {:ok, labels} + end end) end end diff --git a/lib/sanbase_web/graphql/resolvers/comment_entity_id_resolver.ex b/lib/sanbase_web/graphql/resolvers/comment_entity_id_resolver.ex index f58a8cc864..2b89cce34c 100644 --- a/lib/sanbase_web/graphql/resolvers/comment_entity_id_resolver.ex +++ b/lib/sanbase_web/graphql/resolvers/comment_entity_id_resolver.ex @@ -36,7 +36,7 @@ defmodule SanbaseWeb.Graphql.Resolvers.CommentEntityIdResolver do loader |> Dataloader.load(SanbaseDataloader, entity_id_name, id) |> on_load(fn loader -> - {:ok, Dataloader.get(loader, SanbaseDataloader, entity_id_name, id)} + Dataloader.get(loader, SanbaseDataloader, entity_id_name, id) end) end end diff --git a/lib/sanbase_web/graphql/resolvers/insight_resolver.ex b/lib/sanbase_web/graphql/resolvers/insight_resolver.ex index 25d9a7dd51..e35ff1bd8e 100644 --- a/lib/sanbase_web/graphql/resolvers/insight_resolver.ex +++ b/lib/sanbase_web/graphql/resolvers/insight_resolver.ex @@ -209,9 +209,14 @@ defmodule SanbaseWeb.Graphql.Resolvers.InsightResolver do loader |> Dataloader.load(SanbaseDataloader, :insights_count_per_user, id) |> on_load(fn loader -> - {:ok, - Dataloader.get(loader, SanbaseDataloader, :insights_count_per_user, id) || - %{total_count: 0, draft_count: 0, pulse_count: 0, paywall_count: 0}} + case Dataloader.get(loader, SanbaseDataloader, :insights_count_per_user, id) do + {:ok, val} -> + val = val || %{total_count: 0, draft_count: 0, pulse_count: 0, paywall_count: 0} + {:ok, val} + + {:error, error} -> + {:error, error} + end end) end @@ -228,7 +233,10 @@ defmodule SanbaseWeb.Graphql.Resolvers.InsightResolver do loader |> Dataloader.load(SanbaseDataloader, :insights_comments_count, id) |> on_load(fn loader -> - {:ok, Dataloader.get(loader, SanbaseDataloader, :insights_comments_count, id) || 0} + case Dataloader.get(loader, SanbaseDataloader, :insights_comments_count, id) do + {:ok, val} -> {:ok, val || 0} + {:error, error} -> {:error, error} + end end) end diff --git a/lib/sanbase_web/graphql/resolvers/project/project_metrics_resolver.ex b/lib/sanbase_web/graphql/resolvers/project/project_metrics_resolver.ex index dc1819d1a9..033f941684 100644 --- a/lib/sanbase_web/graphql/resolvers/project/project_metrics_resolver.ex +++ b/lib/sanbase_web/graphql/resolvers/project/project_metrics_resolver.ex @@ -93,25 +93,28 @@ defmodule SanbaseWeb.Graphql.Resolvers.ProjectMetricsResolver do selector: {metric, from, to, opts} } + error_on_failed_fetch = Map.get(args, :error_on_failed_fetch, false) + loader |> Dataloader.load(SanbaseDataloader, :aggregated_metric, data) - |> on_load(&aggregated_metric_from_loader(&1, data)) + |> on_load(&aggregated_metric_from_loader(&1, data, error_on_failed_fetch)) end end # Private functions - defp aggregated_metric_from_loader(loader, data) do + defp aggregated_metric_from_loader(loader, data, error_on_failed_fetch) do %{selector: selector, slug: slug, metric: metric} = data - loader - |> Dataloader.get(SanbaseDataloader, :aggregated_metric, selector) - |> case do - map when is_map(map) -> + case Dataloader.get(loader, SanbaseDataloader, :aggregated_metric, selector) do + {:ok, map} when is_list(map) -> aggregated_metric_from_loader_map(map, slug, metric, data[:opts]) - _ -> + _ignored when error_on_failed_fetch == false -> {:nocache, {:ok, nil}} + + _ignored when error_on_failed_fetch == true -> + {:error, "Failed to fetch #{metric} for #{slug} while executing aggregatedTimeseriesData"} end end diff --git a/lib/sanbase_web/graphql/resolvers/short_url_resolver.ex b/lib/sanbase_web/graphql/resolvers/short_url_resolver.ex index eea9a4ec87..b22768db5c 100644 --- a/lib/sanbase_web/graphql/resolvers/short_url_resolver.ex +++ b/lib/sanbase_web/graphql/resolvers/short_url_resolver.ex @@ -42,8 +42,9 @@ defmodule SanbaseWeb.Graphql.Resolvers.ShortUrlResolver do loader |> Dataloader.load(SanbaseDataloader, :short_urls_comments_count, id) |> on_load(fn loader -> - {:ok, Dataloader.get(loader, SanbaseDataloader, :short_urls_comments_count, id) || 0} + Dataloader.get(loader, SanbaseDataloader, :short_urls_comments_count, id) || 0 end) + |> dbg() end # Private functions diff --git a/lib/sanbase_web/graphql/resolvers/social_data_resolver.ex b/lib/sanbase_web/graphql/resolvers/social_data_resolver.ex index a2267483c6..3ed668a653 100644 --- a/lib/sanbase_web/graphql/resolvers/social_data_resolver.ex +++ b/lib/sanbase_web/graphql/resolvers/social_data_resolver.ex @@ -17,7 +17,7 @@ defmodule SanbaseWeb.Graphql.Resolvers.SocialDataResolver do loader |> Dataloader.load(SanbaseDataloader, :project_by_slug, slug) |> on_load(fn loader -> - {:ok, Dataloader.get(loader, SanbaseDataloader, :project_by_slug, slug)} + Dataloader.get(loader, SanbaseDataloader, :project_by_slug, slug) end) end diff --git a/lib/sanbase_web/graphql/resolvers/timeline/timeline_event_resolver.ex b/lib/sanbase_web/graphql/resolvers/timeline/timeline_event_resolver.ex index d08787700f..7ab4a2f4a9 100644 --- a/lib/sanbase_web/graphql/resolvers/timeline/timeline_event_resolver.ex +++ b/lib/sanbase_web/graphql/resolvers/timeline/timeline_event_resolver.ex @@ -68,7 +68,10 @@ defmodule SanbaseWeb.Graphql.Resolvers.TimelineEventResolver do loader |> Dataloader.load(SanbaseDataloader, :timeline_events_comments_count, id) |> on_load(fn loader -> - {:ok, Dataloader.get(loader, SanbaseDataloader, :timeline_events_comments_count, id) || 0} + case Dataloader.get(loader, SanbaseDataloader, :timeline_events_comments_count, id) do + {:ok, val} -> {:ok, val || 0} + {:error, error} -> {:error, error} + end end) end end diff --git a/lib/sanbase_web/graphql/schema.ex b/lib/sanbase_web/graphql/schema.ex index e17bbf7f4e..c9a2fe18e5 100644 --- a/lib/sanbase_web/graphql/schema.ex +++ b/lib/sanbase_web/graphql/schema.ex @@ -131,7 +131,7 @@ defmodule SanbaseWeb.Graphql.Schema do import_types(Graphql.Schema.WidgetQueries) def dataloader() do - Dataloader.new(timeout: :timer.seconds(20), get_policy: :return_nil_on_error) + Dataloader.new(timeout: :timer.seconds(20), get_policy: :tuples) |> Dataloader.add_source(SanbaseRepo, SanbaseRepo.data()) |> Dataloader.add_source(SanbaseDataloader, SanbaseDataloader.data()) end diff --git a/lib/sanbase_web/graphql/schema/types/project_types.ex b/lib/sanbase_web/graphql/schema/types/project_types.ex index 7a288074fb..e0a09a8c0b 100644 --- a/lib/sanbase_web/graphql/schema/types/project_types.ex +++ b/lib/sanbase_web/graphql/schema/types/project_types.ex @@ -323,6 +323,7 @@ defmodule SanbaseWeb.Graphql.ProjectTypes do arg(:aggregation, :aggregation, default_value: nil) arg(:include_incomplete_data, :boolean, default_value: false) arg(:caching_params, :caching_params_input_object) + arg(:error_on_failed_fetch, :boolean, default_value: false) complexity(&Complexity.from_to_interval/3) middleware(AccessControl)