Skip to content

Commit

Permalink
Don't crash when one endpoint is down, better paginate, better timeline
Browse files Browse the repository at this point in the history
  • Loading branch information
prevostc committed Jun 24, 2024
1 parent d673557 commit b44d117
Show file tree
Hide file tree
Showing 9 changed files with 202 additions and 146 deletions.
17 changes: 13 additions & 4 deletions src/queries/InvestorTimeline.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ fragment InvestorTimelineClmPosition on CLM {

fragment InvestorTimelineClmPositionInteraction on ClmPositionInteraction {
id

timestamp
blockNumber
createdWith {
Expand All @@ -48,23 +49,31 @@ fragment InvestorTimelineClmPositionInteraction on ClmPositionInteraction {
}

query InvestorTimeline($investor_address: String!, $first: Int = 1000, $skip: Int = 0) {
clmPositions(skip: 0, first: 1000, where: { investor: $investor_address, totalBalance_gt: 0 }) {
clmPositions(skip: $skip, first: $first, where: { investor: $investor_address, totalBalance_gt: 0 }) {
id

managerBalance
rewardPoolBalance
totalBalance
clm {
...InvestorTimelineClmPosition
}
interactions(
}

clmPositionInteractions(
first: $first
skip: $skip
orderBy: timestamp
orderDirection: asc
where: {
investor: $investor_address,
type_in: [MANAGER_DEPOSIT, MANAGER_WITHDRAW, REWARD_POOL_STAKE, REWARD_POOL_UNSTAKE]
}
) {
...InvestorTimelineClmPositionInteraction
) {
...InvestorTimelineClmPositionInteraction

investorPosition {
id
}
}
}
101 changes: 50 additions & 51 deletions src/routes/v1/investor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ import { addressSchema, transactionHashSchema } from '../../schema/address';
import { bigDecimalSchema } from '../../schema/bigint';
import { getAsyncCache } from '../../utils/async-lock';
import type { Address, Hex } from '../../utils/scalar-types';
import { getClmTimeline } from '../../utils/timeline';
import { type TimelineClmInteraction, actionsEnumSchema } from '../../utils/timeline-types';
import { actionsEnumSchema, getClmTimeline } from '../../utils/timeline';

export default async function (
instance: FastifyInstance,
Expand Down Expand Up @@ -99,53 +98,53 @@ const timelineClmInteractionOutputSchema = Type.Union([
]);
type TimelineClmInteractionOutput = Static<typeof timelineClmInteractionOutputSchema>;

function clmInteractionToOutput(interaction: TimelineClmInteraction): TimelineClmInteractionOutput {
const { rewardPoolToken, rewardPool } = interaction;
const hasRewardPool = !!rewardPoolToken && !!rewardPool;
// ensure we don't include partial reward pool data
const rewardPoolFields: ClmInteractionRewardPool | undefined = hasRewardPool
? {
reward_pool_address: rewardPoolToken.address,
reward_pool_balance: rewardPool.balance.toString(),
reward_pool_diff: rewardPool.delta.toString(),
}
: undefined;

return {
datetime: interaction.datetime.toISOString(),
product_key: `beefy:vault:${interaction.chain}:${interaction.managerToken.address}`,
display_name: interaction.managerToken.name || interaction.managerToken.address,
chain: interaction.chain,
is_eol: false,
is_dashboard_eol: false,
transaction_hash: interaction.transactionHash,

token0_to_usd: interaction.token0ToUsd.toString(),
token1_to_usd: interaction.token1ToUsd.toString(),

// legacy: share -> total
share_balance: interaction.total.balance.toString(),
share_diff: interaction.total.delta.toString(),

manager_address: interaction.managerToken.address,
manager_balance: interaction.manager.balance.toString(),
manager_diff: interaction.manager.delta.toString(),

...rewardPoolFields,

underlying0_balance: interaction.underlying0.balance.toString(),
underlying0_diff: interaction.underlying0.delta.toString(),

underlying1_balance: interaction.underlying1.balance.toString(),
underlying1_diff: interaction.underlying1.delta.toString(),

usd_balance: interaction.usd.balance.toString(),
usd_diff: interaction.usd.delta.toString(),

actions: interaction.actions,
};
}

async function getTimeline(investor_address: Address) {
return getClmTimeline(investor_address, clmInteractionToOutput);
async function getTimeline(investor_address: Address): Promise<TimelineClmInteractionOutput[]> {
const timeline = await getClmTimeline(investor_address);

return timeline.map((interaction): TimelineClmInteractionOutput => {
const { rewardPoolToken, rewardPool } = interaction;
const hasRewardPool = !!rewardPoolToken && !!rewardPool;
// ensure we don't include partial reward pool data
const rewardPoolFields: ClmInteractionRewardPool | undefined = hasRewardPool
? {
reward_pool_address: rewardPoolToken.address,
reward_pool_balance: rewardPool.balance.toString(),
reward_pool_diff: rewardPool.delta.toString(),
}
: undefined;

return {
datetime: interaction.datetime.toISOString(),
product_key: `beefy:vault:${interaction.chain}:${interaction.managerToken.address}`,
display_name: interaction.managerToken.name || interaction.managerToken.address,
chain: interaction.chain,
is_eol: false,
is_dashboard_eol: false,
transaction_hash: interaction.transactionHash,

token0_to_usd: interaction.token0ToUsd.toString(),
token1_to_usd: interaction.token1ToUsd.toString(),

// legacy: share -> total
share_balance: interaction.total.balance.toString(),
share_diff: interaction.total.delta.toString(),

manager_address: interaction.managerToken.address,
manager_balance: interaction.manager.balance.toString(),
manager_diff: interaction.manager.delta.toString(),

...rewardPoolFields,

underlying0_balance: interaction.underlying0.balance.toString(),
underlying0_diff: interaction.underlying0.delta.toString(),

underlying1_balance: interaction.underlying1.balance.toString(),
underlying1_diff: interaction.underlying1.delta.toString(),

usd_balance: interaction.usd.balance.toString(),
usd_diff: interaction.usd.delta.toString(),

actions: interaction.actions,
};
});
}
11 changes: 3 additions & 8 deletions src/routes/v1/status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { FastifyInstance, FastifyPluginOptions, FastifySchema } from 'fasti
import { type ChainId, chainIdAsKeySchema } from '../../config/chains';
import { timestampNumberSchema } from '../../schema/bigint';
import { getAsyncCache } from '../../utils/async-lock';
import { getAllSdks, sdkContextSchema } from '../../utils/sdk';
import { executeOnAllSdks, sdkContextSchema } from '../../utils/sdk';

export default async function (
instance: FastifyInstance,
Expand Down Expand Up @@ -45,14 +45,9 @@ const statusSchema = Type.Record(chainIdAsKeySchema, Type.Array(endpointStatusSc
type Status = Static<typeof statusSchema>;

async function getStatus(): Promise<Status> {
const sdks = getAllSdks();
const results = await Promise.all(
sdks.map(async sdk => {
return await sdk.Status();
})
);
const res = await executeOnAllSdks(sdk => sdk.Status());

return results
return res.results
.map((res): EndpointStatus & { chain: ChainId } => ({
chain: res.chain,
subgraph: res.subgraph,
Expand Down
12 changes: 5 additions & 7 deletions src/routes/v1/vault.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { type Period, getPeriodSeconds, periodSchema } from '../../schema/period
import { getAsyncCache } from '../../utils/async-lock';
import { interpretAsDecimal } from '../../utils/decimal';
import type { Address, Hex } from '../../utils/scalar-types';
import { getSdksForChain, paginateSdkCalls } from '../../utils/sdk';
import { getSdksForChain, paginate } from '../../utils/sdk';
import { setOpts } from '../../utils/typebox';

export default async function (
Expand Down Expand Up @@ -408,17 +408,15 @@ const getVaultInvestors = async (
): Promise<VaultInvestors> => {
const res = await Promise.all(
getSdksForChain(chain).map(async sdk =>
paginateSdkCalls(
sdk,
(sdk, skip, first) =>
paginate({
fetchPage: ({ skip, first }) =>
sdk.VaultInvestors({
clmAddress: vault_address,
skip,
first,
}),
res => res.data.clmPositions.length,
{ pageSize: 1000, fetchAtMost: 100_000 }
)
count: res => res.data.clmPositions.length,
})
)
);

Expand Down
12 changes: 5 additions & 7 deletions src/routes/v1/vaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { getAsyncCache } from '../../utils/async-lock';
import { fromUnixTime, getUnixTime } from '../../utils/date';
import { interpretAsDecimal } from '../../utils/decimal';
import type { Address, Hex } from '../../utils/scalar-types';
import { getSdksForChain, paginateSdkCalls } from '../../utils/sdk';
import { getSdksForChain, paginate } from '../../utils/sdk';
import { setOpts } from '../../utils/typebox';
import { prepareVaultHarvests, vaultHarvestSchema } from './vault';

Expand Down Expand Up @@ -159,17 +159,15 @@ const getVaults = async (chain: ChainId, period: Period): Promise<Vaults> => {

const res = await Promise.all(
getSdksForChain(chain).map(sdk =>
paginateSdkCalls(
sdk,
(sdk, skip, first) =>
paginate({
fetchPage: ({ skip, first }) =>
sdk.Vaults({
since,
skip,
first,
}),
res => max(res.data.clms.map(vault => vault.collectedFees.length)) || 0,
{ pageSize: 1000, fetchAtMost: 100_000 }
)
count: res => max(res.data.clms.map(vault => vault.collectedFees.length)) || 0,
})
)
);

Expand Down
4 changes: 4 additions & 0 deletions src/utils/async.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export const withTimeout = async <T>(prom: Promise<T>, time: number): Promise<T> => {
const res = await Promise.race([prom, new Promise((_, reject) => setTimeout(reject, time))]);
return res as T;
};
60 changes: 50 additions & 10 deletions src/utils/sdk.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { type Static, Type } from '@sinclair/typebox';
import { GraphQLClient } from 'graphql-request';
import { isArray } from 'lodash';
import { type ChainId, allChainIds, chainIdSchema } from '../config/chains';
import { type Sdk, getSdk } from '../queries/codegen/sdk';
import { withTimeout } from './async';
import { GraphQueryError } from './error';
import { createCachedFactoryByChainId } from './factory';
import { getLoggerFor } from './log';
Expand All @@ -23,7 +25,7 @@ type EndpointSdk = {
) => Promise<Awaited<ReturnType<Sdk[key]>> & SdkContext>;
};

export const getAllSdks = () => allChainIds.flatMap(chain => getSdksForChain(chain));
const getAllSdks = () => allChainIds.flatMap(chain => getSdksForChain(chain));

export const getSdksForChain = createCachedFactoryByChainId((chain: ChainId): EndpointSdk[] => {
const configs = [getSubgraphConfig(chain, SUBGRAPH_TAG)];
Expand Down Expand Up @@ -69,21 +71,59 @@ function getSubgraphUrl(name: string, tag: string): string {
return `https://api.goldsky.com/api/public/project_clu2walwem1qm01w40v3yhw1f/subgraphs/${name}/${tag}/gn`;
}

export async function paginateSdkCalls<R>(
sdk: EndpointSdk,
fn: (sdk: Sdk, skip: number, first: number) => Promise<R>,
count: (res: R) => number,
options: { pageSize: number; fetchAtMost: number }
): Promise<R[]> {
const { pageSize, fetchAtMost } = options;
type AllSdkRes<T> = {
errors: unknown[];
results: Array<T>;
};

export async function executeOnAllSdks<T>(
fn: (sdk: EndpointSdk) => Promise<T>,
options: { timeout: number } = { timeout: 30000 }
): Promise<AllSdkRes<T>> {
const sdks = getAllSdks();
try {
const promises = sdks.map(sdk => withTimeout(fn(sdk), options.timeout));
const settled = await withTimeout(Promise.allSettled(promises), options.timeout);
return settled.reduce(
(acc, res) => {
if (res.status === 'fulfilled') {
acc.results.push(res.value);
} else {
acc.errors.push(res.reason);
}
return acc;
},
{ errors: [], results: [] } as AllSdkRes<T>
);
} catch (e) {
logger.error(`Failed to execute on all sdks: ${e}`);
return { errors: [e], results: [] };
}
}

export async function paginate<R>({
fetchPage,
count,
pageSize = 1000,
fetchAtMost = 10000,
}: {
fetchPage: (params: { skip: number; first: number }) => Promise<R>;
count: (res: R) => number | number[];
pageSize?: number;
fetchAtMost?: number;
}): Promise<R[]> {
const results: R[] = [];
let skip = 0;
let fetched = 0;

while (fetched < fetchAtMost) {
const res = await fn(sdk, skip, pageSize);
const res = await fetchPage({ skip, first: pageSize });
results.push(res);
const resCount = count(res);
const resCountOrCounts = count(res);
const resCount = isArray(resCountOrCounts)
? Math.max(...resCountOrCounts) || 0
: resCountOrCounts;

logger.debug(`Fetched ${resCount} results, total fetched: ${fetched}`);
if (resCount < pageSize) {
break;
Expand Down
36 changes: 0 additions & 36 deletions src/utils/timeline-types.ts

This file was deleted.

Loading

0 comments on commit b44d117

Please sign in to comment.