From 5c895924ea4badd6805afb3b1e7b1a1d61a28fde Mon Sep 17 00:00:00 2001 From: harsh-98 Date: Fri, 16 Feb 2024 05:48:59 +0400 Subject: [PATCH] feat: handle reserve price oracles --- README.md | 5 + db_scripts/local_testing/anvil_test.sh | 54 ++++++ db_scripts/local_testing/local_test.sh | 59 +------ debts/calc_details.go | 2 +- debts/engine.go | 76 ++++----- debts/index.go | 16 +- debts/pnl.go | 12 +- debts/tf_index.go | 5 +- debts/token_threshold.go | 13 +- ds/adapter_name.go | 1 + ds/repo.go | 34 ++-- ds/repo_dummy.go | 2 +- go.mod | 4 +- go.sum | 2 + migrations/000039_pf_version.up.sql | 37 +++++ models/aggregated_block_feed/dependency.go | 9 +- models/aggregated_block_feed/model.go | 69 ++++---- models/aggregated_block_feed/query.go | 24 ++- .../query_price_feed/token_ds.go | 61 +++++++ .../yearn_pf_internal.go | 12 +- .../yearn_pf_internal_test.go | 4 +- .../query_price_feed/yearn_price_feed.go | 138 ++++++++++++++++ .../aggregated_block_feed/yearn_price_feed.go | 156 ------------------ models/chainlink_price_feed/model.go | 7 +- models/chainlink_price_feed/on_log.go | 50 +++++- models/composite_chainlink/model.go | 10 +- models/composite_chainlink/on_log.go | 49 +++++- models/contract_register/on_log.go | 14 +- models/credit_manager/cm_v3/v3operation.go | 8 +- models/pool/pool_v3/get_details.go | 3 +- models/pool_lmrewards/v3/log.go | 4 +- models/price_oracle/on_log.go | 24 ++- repository/handlers/blocks.go | 17 +- repository/handlers/prev_price.go | 93 +++++------ repository/handlers/sync_adapters.go | 4 +- repository/handlers/token_oracle.go | 80 +++++---- repository/save.go | 1 + tests/aqf_test.go | 9 +- 38 files changed, 685 insertions(+), 483 deletions(-) create mode 100755 db_scripts/local_testing/anvil_test.sh mode change 100644 => 100755 db_scripts/local_testing/local_test.sh create mode 100644 migrations/000039_pf_version.up.sql create mode 100644 models/aggregated_block_feed/query_price_feed/token_ds.go rename models/aggregated_block_feed/{ => query_price_feed}/yearn_pf_internal.go (88%) rename models/aggregated_block_feed/{ => query_price_feed}/yearn_pf_internal_test.go (91%) create mode 100644 models/aggregated_block_feed/query_price_feed/yearn_price_feed.go delete mode 100644 models/aggregated_block_feed/yearn_price_feed.go diff --git a/README.md b/README.md index e436af7c..233c3908 100644 --- a/README.md +++ b/README.md @@ -52,3 +52,8 @@ modifying, or distributing this software product. ### Important information for contributors As a contributor to the Gearbox Protocol GitHub repository, your pull requests indicate acceptance of our Gearbox Contribution Agreement. This agreement outlines that you assign the Intellectual Property Rights of your contributions to the Gearbox Foundation. This helps safeguard the Gearbox protocol and ensure the accumulation of its intellectual property. Contributions become part of the repository and may be used for various purposes, including commercial. As recognition for your expertise and work, you receive the opportunity to participate in the protocol's development and the potential to see your work integrated within it. The full Gearbox Contribution Agreement is accessible within the [repository](/ContributionAgreement) for comprehensive understanding. [Let's innovate together!] + + +# TODO +# (p *PriceFeed) Clone() *PriceFeed { +# PFVersion: p.PFVersion, \ No newline at end of file diff --git a/db_scripts/local_testing/anvil_test.sh b/db_scripts/local_testing/anvil_test.sh new file mode 100755 index 00000000..473484a3 --- /dev/null +++ b/db_scripts/local_testing/anvil_test.sh @@ -0,0 +1,54 @@ +set -e + +PARENT_DIR=$(dirname $0) +REMOTE_DB=$1 +SUPERUSER=$2 + +FORK_BLOCK=`jq .forkBlock.number < <(curl https://anvil.gearbox.foundation/api/forks/432945bc-3620-11ee-be56-0242ac120002 )` + +# if [ "$PROXY_IP" = '' ]; then +# ssh -t debian@$MAINNET_IP "bash /home/debian/db_copy.sh" +# scp debian@$MAINNET_IP:/tmp/db.sql /tmp/db.sql +# else +# ssh -t root@$PROXY_IP 'ssh -t debian@'$MAINNET_IP' "bash /home/debian/db_copy.sh"' +# ssh -t root@$PROXY_IP 'scp debian@'$MAINNET_IP':/tmp/db.sql /tmp/db.sql' +# scp root@$PROXY_IP:/tmp/db.sql /tmp/db.sql +# fi + +# if [ "$SUPERUSER" = "postgres" ]; then +# sudo su postgres +# fi + + +export SAMPLE_DB="postgres://$SUPERUSER@localhost:5432/sample?sslmode=disable" + +set +e +psql -U $SUPERUSER -d postgres -c 'drop database sample' +psql -U $SUPERUSER -d postgres -c 'create database sample' +pg_dump "$REMOTE_DB" | psql -U $SUPERUSER -d sample +set -e + +# psql -U $SUPERUSER -d sample < db_scripts/local_testing/missing_table_from_download_db.sql +psql -U $SUPERUSER -d sample < $PARENT_DIR/../../migrations/000016_rankings.up.sql +migrate -path $PARENT_DIR/../../migrations/ -database "$SAMPLE_DB" up + + +psql -U $SUPERUSER -d sample < <(cat $PARENT_DIR/reset_to_blocknum.sql | sed "s/18246321/$FORK_BLOCK/" ) +set +e +psql -U $SUPERUSER -d postgres -c 'drop database tmp_sample' +set -e +createdb -O $SUPERUSER -T sample tmp_sample + +# create user sample with encrypted password '123Sample'; +# GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO sample; +# ALTER DATABASE sample OWNER TO sample; +# update schema_migrations set version=27, dirty='f'; +# +# SELECT format( +# 'ALTER TABLE public.%I OWNER TO sample', +# table_name +# ) +# FROM information_schema.tables +# WHERE table_schema = 'public'; + +# SELECT * FROM information_schema.tables WHERE table_schema = 'public'; \ No newline at end of file diff --git a/db_scripts/local_testing/local_test.sh b/db_scripts/local_testing/local_test.sh old mode 100644 new mode 100755 index 473484a3..0855cf4b --- a/db_scripts/local_testing/local_test.sh +++ b/db_scripts/local_testing/local_test.sh @@ -1,54 +1,7 @@ -set -e - -PARENT_DIR=$(dirname $0) -REMOTE_DB=$1 -SUPERUSER=$2 - -FORK_BLOCK=`jq .forkBlock.number < <(curl https://anvil.gearbox.foundation/api/forks/432945bc-3620-11ee-be56-0242ac120002 )` - -# if [ "$PROXY_IP" = '' ]; then -# ssh -t debian@$MAINNET_IP "bash /home/debian/db_copy.sh" -# scp debian@$MAINNET_IP:/tmp/db.sql /tmp/db.sql -# else -# ssh -t root@$PROXY_IP 'ssh -t debian@'$MAINNET_IP' "bash /home/debian/db_copy.sh"' -# ssh -t root@$PROXY_IP 'scp debian@'$MAINNET_IP':/tmp/db.sql /tmp/db.sql' -# scp root@$PROXY_IP:/tmp/db.sql /tmp/db.sql -# fi - -# if [ "$SUPERUSER" = "postgres" ]; then -# sudo su postgres -# fi - - -export SAMPLE_DB="postgres://$SUPERUSER@localhost:5432/sample?sslmode=disable" - -set +e +SUPERUSER="$1" psql -U $SUPERUSER -d postgres -c 'drop database sample' -psql -U $SUPERUSER -d postgres -c 'create database sample' -pg_dump "$REMOTE_DB" | psql -U $SUPERUSER -d sample -set -e - -# psql -U $SUPERUSER -d sample < db_scripts/local_testing/missing_table_from_download_db.sql -psql -U $SUPERUSER -d sample < $PARENT_DIR/../../migrations/000016_rankings.up.sql -migrate -path $PARENT_DIR/../../migrations/ -database "$SAMPLE_DB" up - - -psql -U $SUPERUSER -d sample < <(cat $PARENT_DIR/reset_to_blocknum.sql | sed "s/18246321/$FORK_BLOCK/" ) -set +e -psql -U $SUPERUSER -d postgres -c 'drop database tmp_sample' -set -e -createdb -O $SUPERUSER -T sample tmp_sample - -# create user sample with encrypted password '123Sample'; -# GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO sample; -# ALTER DATABASE sample OWNER TO sample; -# update schema_migrations set version=27, dirty='f'; -# -# SELECT format( -# 'ALTER TABLE public.%I OWNER TO sample', -# table_name -# ) -# FROM information_schema.tables -# WHERE table_schema = 'public'; - -# SELECT * FROM information_schema.tables WHERE table_schema = 'public'; \ No newline at end of file +psql -U $SUPERUSER -d postgres -c "create database sample with owner $SUPERUSER" +pg_dump "$DCDB" | psql -U $SUPERUSER -d sample +psql -U $SUPERUSER -d sample < migrations/000016_rankings.up.sql +FORK_BLOCK=`jq .forkBlock.number < <(curl https://anvil.gearbox.foundation/api/forks/$2 )` +psql -U $SUPERUSER -d sample < <(cat db_scripts/local_testing/reset_to_blocknum.sql | sed "s/18246321/$FORK_BLOCK/" ) \ No newline at end of file diff --git a/debts/calc_details.go b/debts/calc_details.go index 258a63b0..3c8e399f 100644 --- a/debts/calc_details.go +++ b/debts/calc_details.go @@ -54,7 +54,7 @@ type storeForCalc struct { func (s storeForCalc) GetToken(token string) *schemas.Token { return s.inner.repo.GetToken(token) } -func (s storeForCalc) GetPrices(token string, version core.VersionType, blockNums ...int64) *big.Int { +func (s storeForCalc) GetPrices(token string, version schemas.PFVersion, blockNums ...int64) *big.Int { return s.inner.GetTokenLastPrice(token, version) } diff --git a/debts/engine.go b/debts/engine.go index ee8ab480..04b0c3d8 100644 --- a/debts/engine.go +++ b/debts/engine.go @@ -138,7 +138,7 @@ func (eng *DebtEngine) CalculateDebt() { caTotalValueInUSD += utils.GetFloat64Decimal( eng.GetAmountInUSD( cmToPoolDetails[session.CreditManager].Token, - sessionSnapshot.TotalValueBI.Convert(), session.Version, + sessionSnapshot.TotalValueBI.Convert(), schemas.VersionToPFVersion(session.Version, false), ), 8) for token, tokenBalance := range *sessionSnapshot.Balances { @@ -192,9 +192,12 @@ func (eng *DebtEngine) createTvlSnapshots(blockNum int64, caTotalValueInUSD floa // underlyingToken := state.(*schemas.PoolState).UnderlyingToken // - version := core.NewVersion(1) - if eng.tokenLastPriceV2[underlyingToken] != nil { - version = core.NewVersion(2) + version := schemas.V1PF + if eng.tokenLastPrice[schemas.V2PF] != nil { + version = schemas.V2PF + } + if eng.tokenLastPrice[schemas.V3PF_MAIN] != nil { + version = schemas.V3PF_MAIN } // totalAvailableLiquidityInUSD += utils.GetFloat64Decimal( @@ -281,12 +284,8 @@ func (eng *DebtEngine) GetCumulativeIndexAndDecimalForCMs(blockNum int64, ts uin return cmToCumIndex } -func (eng *DebtEngine) getTokenPriceFeed(token string, version core.VersionType) *schemas.PriceFeed { - if version.IsGBv1() { - return eng.tokenLastPrice[token] - } else { // v2 and above - return eng.tokenLastPriceV2[token] - } +func (eng *DebtEngine) getTokenPriceFeed(token string, pfVersion schemas.PFVersion) *schemas.PriceFeed { + return eng.tokenLastPrice[pfVersion][token] } func (eng *DebtEngine) SessionDebtHandler(blockNum int64, session *schemas.CreditSession, cumIndexAndUToken *ds.CumIndexAndUToken) { @@ -301,12 +300,12 @@ func (eng *DebtEngine) SessionDebtHandler(blockNum int64, session *schemas.Credi yearnFeeds := eng.repo.GetYearnFeedAddrs() for tokenAddr, details := range *sessionSnapshot.Balances { if details.IsEnabled && details.HasBalanceMoreThanOne() { - lastPriceEvent := eng.getTokenPriceFeed(tokenAddr, session.Version) + lastPriceEvent := eng.getTokenPriceFeed(tokenAddr, schemas.VersionToPFVersion(session.Version, false)) // don't use reserve // if tokenAddr != eng.repo.GetWETHAddr() && lastPriceEvent.BlockNumber != blockNum { feed := lastPriceEvent.Feed if utils.Contains(yearnFeeds, feed) { - eng.requestPriceFeed(blockNum, feed, tokenAddr, lastPriceEvent.IsPriceInUSD) + eng.requestPriceFeed(blockNum, feed, tokenAddr, lastPriceEvent.MergedPFVersion) } } } @@ -380,7 +379,7 @@ func (eng *DebtEngine) CalculateSessionDebt(blockNum int64, session *schemas.Cre for tokenAddr, details := range *sessionSnapshot.Balances { if details.IsEnabled { profile.Tokens[tokenAddr] = ds.TokenDetails{ - Price: eng.GetTokenLastPrice(tokenAddr, session.Version), + Price: eng.GetTokenLastPrice(tokenAddr, schemas.VersionToPFVersion(session.Version, false)), // don't use reserve Decimals: eng.repo.GetToken(tokenAddr).Decimals, TokenLiqThreshold: eng.allowedTokensThreshold[session.CreditManager][tokenAddr], } @@ -428,14 +427,15 @@ func (eng *DebtEngine) CalculateSessionDebt(blockNum int64, session *schemas.Cre } // helper methods -func (eng *DebtEngine) GetAmountInUSD(tokenAddr string, amount *big.Int, version core.VersionType) *big.Int { - usdcAddr := eng.repo.GetUSDCAddr() - tokenPrice := eng.GetTokenLastPrice(tokenAddr, version) +func (eng *DebtEngine) GetAmountInUSD(tokenAddr string, amount *big.Int, pfVersion schemas.PFVersion) *big.Int { + tokenPrice := eng.GetTokenLastPrice(tokenAddr, pfVersion) tokenDecimals := eng.repo.GetToken(tokenAddr).Decimals - if version.Eq(2) { + if pfVersion.Decimals() == 8 { return utils.GetInt64(new(big.Int).Mul(tokenPrice, amount), tokenDecimals) } - usdcPrice := eng.GetTokenLastPrice(usdcAddr, version) + // for v1 + usdcAddr := eng.repo.GetUSDCAddr() + usdcPrice := eng.GetTokenLastPrice(usdcAddr, pfVersion) usdcDecimals := eng.repo.GetToken(usdcAddr).Decimals value := new(big.Int).Mul(amount, tokenPrice) @@ -444,22 +444,18 @@ func (eng *DebtEngine) GetAmountInUSD(tokenAddr string, amount *big.Int, version return new(big.Int).Mul(value, big.NewInt(100)) } -func (eng *DebtEngine) GetTokenLastPrice(addr string, version core.VersionType, dontFail ...bool) *big.Int { - if version.Eq(1) { - if eng.tokenLastPrice[addr] != nil { - return eng.tokenLastPrice[addr].PriceBI.Convert() - } else if eng.repo.GetWETHAddr() == addr { - return core.WETHPrice - } - } else if version.Eq(2) || version.Eq(300) { - if eng.tokenLastPriceV2[addr] != nil { - return eng.tokenLastPriceV2[addr].PriceBI.Convert() - } +func (eng *DebtEngine) GetTokenLastPrice(addr string, pfVersion schemas.PFVersion, dontFail ...bool) *big.Int { + if pfVersion.ToVersion().Eq(1) && eng.repo.GetWETHAddr() == addr { + return core.WETHPrice + } + if eng.tokenLastPrice[pfVersion][addr] != nil { + return eng.tokenLastPrice[pfVersion][addr].PriceBI.Convert() } + // if len(dontFail) > 0 && dontFail[0] { return nil } - log.Fatal(fmt.Sprintf("Price not found for %s version: %d", addr, version)) + log.Fatal(fmt.Sprintf("Price not found for %s pfversion: %d", addr, pfVersion)) return nil } @@ -483,7 +479,7 @@ func (eng *DebtEngine) SessionDataFromDC(version core.VersionType, blockNum int6 return data } -func (eng *DebtEngine) requestPriceFeed(blockNum int64, feed, token string, isPriceInUSD bool) { +func (eng *DebtEngine) requestPriceFeed(blockNum int64, feed, token string, pfVersion schemas.MergedPFVersion) { // PFFIX yearnPFContract, err := yearnPriceFeed.NewYearnPriceFeed(common.HexToAddress(feed), eng.client) log.CheckFatal(err) @@ -494,17 +490,13 @@ func (eng *DebtEngine) requestPriceFeed(blockNum int64, feed, token string, isPr if err != nil { log.Fatal(err) } - var decimals int8 = 18 // for eth - if isPriceInUSD { - decimals = 8 // for usd - } eng.AddTokenLastPrice(&schemas.PriceFeed{ - BlockNumber: blockNum, - Token: token, - Feed: feed, - RoundId: roundData.RoundId.Int64(), - PriceBI: (*core.BigInt)(roundData.Answer), - Price: utils.GetFloat64Decimal(roundData.Answer, decimals), - IsPriceInUSD: isPriceInUSD, + BlockNumber: blockNum, + Token: token, + Feed: feed, + RoundId: roundData.RoundId.Int64(), + PriceBI: (*core.BigInt)(roundData.Answer), + Price: utils.GetFloat64Decimal(roundData.Answer, pfVersion.Decimals()), + MergedPFVersion: pfVersion, }) } diff --git a/debts/index.go b/debts/index.go index 133e141b..b977abc0 100644 --- a/debts/index.go +++ b/debts/index.go @@ -14,13 +14,12 @@ import ( ) type DebtEngine struct { - repo ds.RepositoryI - db *gorm.DB - client core.ClientI - config *config.Config - lastCSS map[string]*schemas.CreditSessionSnapshot - tokenLastPrice map[string]*schemas.PriceFeed - tokenLastPriceV2 map[string]*schemas.PriceFeed + repo ds.RepositoryI + db *gorm.DB + client core.ClientI + config *config.Config + lastCSS map[string]*schemas.CreditSessionSnapshot + tokenLastPrice map[schemas.PFVersion]map[string]*schemas.PriceFeed //// credit_manager -> token -> liquidity threshold allowedTokensThreshold map[string]map[string]*core.BigInt poolLastInterestData map[string]*schemas.PoolInterestData @@ -48,8 +47,7 @@ func GetDebtEngine(db *gorm.DB, client core.ClientI, config *config.Config, repo client: client, config: config, lastCSS: make(map[string]*schemas.CreditSessionSnapshot), - tokenLastPrice: make(map[string]*schemas.PriceFeed), - tokenLastPriceV2: make(map[string]*schemas.PriceFeed), + tokenLastPrice: make(map[schemas.PFVersion]map[string]*schemas.PriceFeed), allowedTokensThreshold: make(map[string]map[string]*core.BigInt), poolLastInterestData: make(map[string]*schemas.PoolInterestData), lastDebts: make(map[string]*schemas.Debt), diff --git a/debts/pnl.go b/debts/pnl.go index d0372a88..9ac16e6a 100644 --- a/debts/pnl.go +++ b/debts/pnl.go @@ -65,14 +65,14 @@ func (eng *DebtEngine) calAmountToPoolAndProfit(debt *schemas.Debt, session *sch debt.RepayAmountBI = (*core.BigInt)(repayAmount) } - remainingFundsInUSD := eng.GetAmountInUSD(cumIndexAndUToken.Token, remainingFunds, session.Version) + remainingFundsInUSD := eng.GetAmountInUSD(cumIndexAndUToken.Token, remainingFunds, schemas.VersionToPFVersion(session.Version, false)) debt.ProfitInUnderlying = utils.GetFloat64Decimal(remainingFunds, cumIndexAndUToken.Decimals) - debt.CollateralInUnderlying // debt.CollateralInUnderlying = sessionSnapshot.CollateralInUnderlying // fields in USD debt.CollateralInUSD = sessionSnapshot.CollateralInUSD debt.ProfitInUSD = utils.GetFloat64Decimal(remainingFundsInUSD, 8) - sessionSnapshot.CollateralInUSD debt.TotalValueInUSD = utils.GetFloat64Decimal( - eng.GetAmountInUSD(cumIndexAndUToken.Token, debt.CalTotalValueBI.Convert(), session.Version), 8) + eng.GetAmountInUSD(cumIndexAndUToken.Token, debt.CalTotalValueBI.Convert(), schemas.VersionToPFVersion(session.Version, false)), 8) } func (eng *DebtEngine) remainingFundsv3(session *schemas.CreditSession, debt *schemas.Debt, cumIndexAndUToken *ds.CumIndexAndUToken, @@ -81,7 +81,7 @@ func (eng *DebtEngine) remainingFundsv3(session *schemas.CreditSession, debt *sc if session.Status == schemas.Closed && session.ClosedAt == debt.BlockNumber+1 { prices := core.JsonFloatMap{} for token, transferAmt := range *session.CloseTransfers { - tokenPrice := eng.GetTokenLastPrice(token, session.Version) + tokenPrice := eng.GetTokenLastPrice(token, schemas.VersionToPFVersion(session.Version, false)) price := utils.GetFloat64Decimal(tokenPrice, 8) prices[token] = price if transferAmt < 0 { @@ -95,7 +95,7 @@ func (eng *DebtEngine) remainingFundsv3(session *schemas.CreditSession, debt *sc // remainingFunds calculation // set price for underlying token prices[cumIndexAndUToken.Token] = utils.GetFloat64Decimal( - eng.GetTokenLastPrice(cumIndexAndUToken.Token, session.Version), 8) + eng.GetTokenLastPrice(cumIndexAndUToken.Token, schemas.VersionToPFVersion(session.Version, false)), 8) remainingFunds = session.CloseTransfers.ValueInUnderlying(cumIndexAndUToken.Token, cumIndexAndUToken.Decimals, prices) } else if secStatus := session.StatusAt(debt.BlockNumber + 1); schemas.IsStatusLiquidated(secStatus) { // @@ -129,7 +129,7 @@ func (eng *DebtEngine) remainingFundsv2(session *schemas.CreditSession, debt *sc if session.Status == schemas.Closed && session.ClosedAt == debt.BlockNumber+1 { prices := core.JsonFloatMap{} for token, transferAmt := range *session.CloseTransfers { - tokenPrice := eng.GetTokenLastPrice(token, session.Version) + tokenPrice := eng.GetTokenLastPrice(token, schemas.VersionToPFVersion(session.Version, false)) price := utils.GetFloat64Decimal(tokenPrice, 8) prices[token] = price if transferAmt < 0 { @@ -143,7 +143,7 @@ func (eng *DebtEngine) remainingFundsv2(session *schemas.CreditSession, debt *sc // remainingFunds calculation // set price for underlying token prices[cumIndexAndUToken.Token] = utils.GetFloat64Decimal( - eng.GetTokenLastPrice(cumIndexAndUToken.Token, session.Version), 8) + eng.GetTokenLastPrice(cumIndexAndUToken.Token, schemas.VersionToPFVersion(session.Version, false)), 8) remainingFunds = session.CloseTransfers.ValueInUnderlying(cumIndexAndUToken.Token, cumIndexAndUToken.Decimals, prices) } else if session.ClosedAt == debt.BlockNumber+1 && schemas.IsStatusLiquidated(session.Status) { remainingFunds = session.RemainingFunds.Convert() diff --git a/debts/tf_index.go b/debts/tf_index.go index 642dc2f9..17cda24a 100644 --- a/debts/tf_index.go +++ b/debts/tf_index.go @@ -36,6 +36,7 @@ func (calc FarmingCalculator) addFarmingVal(debt *schemas.Debt, session *schemas if calc.testing || session.Status != schemas.Active { return } + pfVersion := schemas.VersionToPFVersion(session.Version, false) var farmingVal float64 = 0 for token, balance := range *css.Balances { if balance.IsEnabled && balance.HasBalanceMoreThanOne() && !calc.tradingTokensMap[token] { @@ -43,11 +44,11 @@ func (calc FarmingCalculator) addFarmingVal(debt *schemas.Debt, session *schemas if session.Version.Eq(1) { priceDecimals = 18 } - farmingVal += balance.F * utils.GetFloat64Decimal(priceStore.GetPrices(token, session.Version), priceDecimals) + farmingVal += balance.F * utils.GetFloat64Decimal(priceStore.GetPrices(token, pfVersion), priceDecimals) } } if session.Version.Eq(1) { - farmingVal = farmingVal / utils.GetFloat64Decimal(priceStore.GetPrices(calc.usdc, session.Version), 18) // convert to usd + farmingVal = farmingVal / utils.GetFloat64Decimal(priceStore.GetPrices(calc.usdc, pfVersion), 18) // convert to usd // by dividing by usdc price in eth } // farming val is zero for closed accounts diff --git a/debts/token_threshold.go b/debts/token_threshold.go index 7e8893d3..237c1a50 100644 --- a/debts/token_threshold.go +++ b/debts/token_threshold.go @@ -61,9 +61,7 @@ func (eng *DebtEngine) AddTokenLTRamp(atoken *schemas_v3.TokenLTRamp) { func (eng *DebtEngine) loadTokenLastPrice(lastDebtSync int64) { defer utils.Elapsed("Debt(loadTokenLastPrice)")() data := []*schemas.PriceFeed{} - query := `SELECT pf.* FROM price_feeds pf - JOIN (SELECT max(block_num) b, token, price_in_usd FROM price_feeds WHERE block_num <= ? GROUP BY token, price_in_usd) AS max_pf - ON max_pf.b = pf.block_num and pf.token=max_pf.token and max_pf.price_in_usd=pf.price_in_usd` + query := `SELECT distinct on (token, merged_pf_version) * FROM price_feeds WHERE block_num <= ? ORDER BY token, merged_pf_version, block_num DESC;` err := eng.db.Raw(query, lastDebtSync, lastDebtSync).Find(&data).Error if err != nil { log.Fatal(err) @@ -74,9 +72,10 @@ func (eng *DebtEngine) loadTokenLastPrice(lastDebtSync int64) { } func (eng *DebtEngine) AddTokenLastPrice(pf *schemas.PriceFeed) { - if !pf.IsPriceInUSD { - eng.tokenLastPrice[pf.Token] = pf - } else { - eng.tokenLastPriceV2[pf.Token] = pf + for _, pfVersion := range pf.MergedPFVersion.MergedPFVersionToList() { + if eng.tokenLastPrice[pfVersion] == nil { + eng.tokenLastPrice[pfVersion] = make(map[string]*schemas.PriceFeed) + } + eng.tokenLastPrice[pfVersion][pf.Token] = pf } } diff --git a/ds/adapter_name.go b/ds/adapter_name.go index 367e1d0b..fca87198 100644 --- a/ds/adapter_name.go +++ b/ds/adapter_name.go @@ -47,6 +47,7 @@ const ( YearnPF = "YearnPF" SingleAssetPF = "SingleAssetPF" CurvePF = "CurvePF" + RedStonePF = "RedStonePF" ZeroPF = "ZeroPF" CompositeChainlinkPF = "CompositeChainlinkPF" AlmostZeroPF = "AlmostZeroPF" diff --git a/ds/repo.go b/ds/repo.go index d0fac3dc..a147ce7b 100644 --- a/ds/repo.go +++ b/ds/repo.go @@ -1,9 +1,7 @@ package ds import ( - "fmt" "math/big" - "strings" "github.com/Gearbox-protocol/sdk-go/core" "github.com/Gearbox-protocol/sdk-go/core/schemas" @@ -12,25 +10,25 @@ import ( "github.com/Gearbox-protocol/third-eye/ds/dc_wrapper" ) -type PriceInUSDType bool +// type PriceInUSDType bool -func (z *PriceInUSDType) MarshalJSON() ([]byte, error) { - return []byte(fmt.Sprintf("%v", *z)), nil -} +// func (z *PriceInUSDType) MarshalJSON() ([]byte, error) { +// return []byte(fmt.Sprintf("%v", *z)), nil +// } -func (z *PriceInUSDType) UnmarshalJSON(b []byte) error { - str := strings.Trim(string(b), "\"") - *z = (str == "true") - return nil +// func (z *PriceInUSDType) UnmarshalJSON(b []byte) error { +// str := strings.Trim(string(b), "\"") +// *z = (str == "true") +// return nil -} -func (z PriceInUSDType) MarshalText() (text []byte, err error) { - return z.MarshalJSON() -} +// } +// func (z PriceInUSDType) MarshalText() (text []byte, err error) { +// return z.MarshalJSON() +// } -func (s *PriceInUSDType) UnmarshalText(text []byte) error { - return s.UnmarshalJSON(text) -} +// func (s *PriceInUSDType) UnmarshalText(text []byte) error { +// return s.UnmarshalJSON(text) +// } type EngineI interface { SyncHandler() @@ -58,7 +56,7 @@ type RepositoryI interface { // for getting executeparser GetExecuteParser() ExecuteParserI // price feed/oracle funcs - GetTokenOracles() map[PriceInUSDType]map[string]*schemas.TokenOracle + GetTokenOracles() map[schemas.PFVersion]map[string]*schemas.TokenOracle DirectlyAddTokenOracle(tokenOracle *schemas.TokenOracle) AddNewPriceOracleEvent(tokenOracle *schemas.TokenOracle, bounded bool) // diff --git a/ds/repo_dummy.go b/ds/repo_dummy.go index afd2c55c..ce794ae6 100644 --- a/ds/repo_dummy.go +++ b/ds/repo_dummy.go @@ -47,7 +47,7 @@ func (DummyRepo) SetAndGetBlock(blockNum int64) *schemas.Block { func (DummyRepo) GetBlocks() map[int64]*schemas.Block { return nil } -func (DummyRepo) GetTokenOracles() map[PriceInUSDType]map[string]*schemas.TokenOracle { +func (DummyRepo) GetTokenOracles() map[schemas.PFVersion]map[string]*schemas.TokenOracle { return nil } func (DummyRepo) GetDisabledTokens() []*schemas.AllowedToken { diff --git a/go.mod b/go.mod index 89b2a134..01538ee5 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/Gearbox-protocol/third-eye go 1.19 require ( - github.com/Gearbox-protocol/sdk-go v0.0.0-20240121203826-89ee59ad653a + github.com/Gearbox-protocol/sdk-go v0.0.0-20240216014533-b42549c9dbfa github.com/ethereum/go-ethereum v1.10.17 github.com/go-playground/validator/v10 v10.4.1 github.com/google/go-cmp v0.5.8 @@ -73,4 +73,4 @@ require ( replace github.com/btcsuite/btcd => github.com/btcsuite/btcd v0.22.1 -// replace github.com/Gearbox-protocol/sdk-go v0.0.0-20240112091010-e6a3b573c3cb => ../sdk-go +// replace github.com/Gearbox-protocol/sdk-go v0.0.0-20240121203826-89ee59ad653a => ../sdk-go diff --git a/go.sum b/go.sum index bc8f2dc2..48f75814 100644 --- a/go.sum +++ b/go.sum @@ -45,6 +45,8 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= github.com/Gearbox-protocol/sdk-go v0.0.0-20240121203826-89ee59ad653a h1:MdCm3RFrp9AKemFa3phsptON+wL4acwl9aeskl8gfdc= github.com/Gearbox-protocol/sdk-go v0.0.0-20240121203826-89ee59ad653a/go.mod h1:GniLx/DU7tCT+QSlKt9REqUaF748X8rbDNR4vAd1m+Y= +github.com/Gearbox-protocol/sdk-go v0.0.0-20240216014533-b42549c9dbfa h1:vxCl/4yH2wjUuLL+JljamikMIzH9NSgIm290q/eKJIk= +github.com/Gearbox-protocol/sdk-go v0.0.0-20240216014533-b42549c9dbfa/go.mod h1:GniLx/DU7tCT+QSlKt9REqUaF748X8rbDNR4vAd1m+Y= github.com/Masterminds/semver/v3 v3.1.1 h1:hLg3sBzpNErnxhQtUy/mmLR2I9foDujNK030IGemrRc= github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= diff --git a/migrations/000039_pf_version.up.sql b/migrations/000039_pf_version.up.sql new file mode 100644 index 00000000..467f1ff2 --- /dev/null +++ b/migrations/000039_pf_version.up.sql @@ -0,0 +1,37 @@ +-- +alter table token_oracle add reserve boolean; +update token_oracle set reserve= 'f'; + + +-- +alter table price_feeds add merged_pf_version integer; +update price_feeds set merged_pf_version=1 where price_in_usd='f'; +update price_feeds set merged_pf_version= 2 where price_in_usd='t' and block_num < 18798084 ; +update price_feeds set merged_pf_version= 2 where price_in_usd='t' and block_num >= 18798084 and feed='0x4A7b3F6c4aaB7Bc5617D6c30C3f22bAeBbc34F43'; +update price_feeds set merged_pf_version=4 where price_in_usd='t' and feed in (select feed from token_oracle where version=300); -- v3PF_main + +update price_feeds set merged_pf_version=6 where block_num >= 18798084 and feed in (select distinct feed from (select distinct on (token) * from token_oracle where version=2 order by token, block_num desc) t where token not in (select token from token_oracle where version=300)); + + +-- +update sync_adapters set details=( details || jsonb_build_object('mergedPFVersion',6)) where address in (select distinct feed from (select distinct on (token) * from token_oracle where version=2 order by token, block_num desc) t where token not in (select token from token_oracle where version=300)); + +update sync_adapters set details=( details || jsonb_build_object('mergedPFVersion',4)) where version=300 and type in ('ChainlinkPriceFeed', 'CompositeChainlinkPF', 'QueryPriceFeed'); + +update sync_adapters set details=( details || jsonb_build_object('mergedPFVersion',2)) where version =2 and address not in (select distinct feed from (select distinct on (token) * from token_oracle where version=2 order by token, block_num desc) t where token not in (select token from token_oracle where version=300)) and address!='0x6385892aCB085eaa24b745a712C9e682d80FF681'; + +update sync_adapters set details=( details || jsonb_build_object('mergedPFVersion',1)) where version=1 and type in ('ChainlinkPriceFeed', 'CompositeChainlinkPF', 'QueryPriceFeed') and details is not null; +-- + +create table t as (select * from (select distinct on (token) * from token_oracle where version=2 order by token, block_num desc) t where token not in (select token from token_oracle where version=300)); +update t set block_num=18798084 , version=300 ; +insert into token_oracle select * from t; +drop table t; + +alter table price_feeds drop column price_in_usd; + +alter table token_oracle drop constraint token_oracle_pkey; +alter table token_oracle add PRIMARY KEY (block_num, token, version, reserve); +alter table price_feeds drop constraint price_feeds_pkey; +delete from price_feeds where feed = '0x91401cedCBFd9680cE193A5F54E716504233e998'; +alter table price_feeds add PRIMARY KEY (block_num, token, merged_pf_version); \ No newline at end of file diff --git a/models/aggregated_block_feed/dependency.go b/models/aggregated_block_feed/dependency.go index 045f3b9e..1203aaf0 100644 --- a/models/aggregated_block_feed/dependency.go +++ b/models/aggregated_block_feed/dependency.go @@ -9,6 +9,7 @@ import ( "github.com/Gearbox-protocol/sdk-go/core/schemas" "github.com/Gearbox-protocol/sdk-go/log" "github.com/Gearbox-protocol/third-eye/ds" + "github.com/Gearbox-protocol/third-eye/models/aggregated_block_feed/query_price_feed" "github.com/ethereum/go-ethereum/common" ) @@ -203,8 +204,7 @@ func getInvertDependencyGraph(depGraph map[string][]string) map[string][]string func (q *QueryPFDependencies) checkInDepGraph(token, oracle string, blockNum int64) { depQueryPFSym := q.getTokenSym(token) if q.depGraph[depQueryPFSym] == nil { - // log.Info(depQueryPFSym, blockNum) - log.Fatalf("Dep for query based price feed(%s) not found for token(%s) at %d", oracle, depQueryPFSym, blockNum) + log.Infof("Warn: Dep for query based price feed(%s) not found for token(%s) at %d", oracle, depQueryPFSym, blockNum) } } @@ -266,11 +266,11 @@ func (q *QueryPFDependencies) fetchRoundData(blockNum int64, tokens map[string]b } else { // if failed check and pfType of the queryPrice is YearnPF adapterI := q.repo.GetAdapter(details.Feed) - if adapter, ok := adapterI.(*QueryPriceFeed); !ok { + if adapter, ok := adapterI.(*query_price_feed.QueryPriceFeed); !ok { log.Fatal("Conversion of adapter to queryPriceFeed failed ", details.Feed) } else if adapter.GetDetailsByKey("pfType") == ds.YearnPF { // if underlying price feed address is null, then don't set price - if _newPrice, err := adapter.calculateYearnPFInternally(blockNum); err == nil { + if _newPrice, err := adapter.CalculateYearnPFInternally(blockNum); err == nil { newPrice = _newPrice } } @@ -280,6 +280,7 @@ func (q *QueryPFDependencies) fetchRoundData(blockNum int64, tokens map[string]b newPrice.Token = details.Token newPrice.Feed = details.Feed newPrice.BlockNumber = blockNum + newPrice.MergedPFVersion = details.MergedPFVersion newPrices = append(newPrices, newPrice) } } diff --git a/models/aggregated_block_feed/model.go b/models/aggregated_block_feed/model.go index 91f5bfea..335846ec 100644 --- a/models/aggregated_block_feed/model.go +++ b/models/aggregated_block_feed/model.go @@ -11,6 +11,7 @@ import ( "github.com/Gearbox-protocol/sdk-go/log" "github.com/Gearbox-protocol/sdk-go/utils" "github.com/Gearbox-protocol/third-eye/ds" + "github.com/Gearbox-protocol/third-eye/models/aggregated_block_feed/query_price_feed" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -20,7 +21,7 @@ type AQFWrapper struct { *ds.SyncAdapter mu *sync.Mutex // yearn feed - QueryFeeds map[string]*QueryPriceFeed + QueryFeeds map[string]*query_price_feed.QueryPriceFeed // for dependency based fetching price queryPFdeps *QueryPFDependencies @@ -52,7 +53,7 @@ func NewAQFWrapper(client core.ClientI, repo ds.RepositoryI, interval int64) *AQ SyncAdapter: syncAdapter, Interval: interval, mu: &sync.Mutex{}, - QueryFeeds: map[string]*QueryPriceFeed{}, + QueryFeeds: map[string]*query_price_feed.QueryPriceFeed{}, queryPFdeps: NewQueryPFDepenencies(repo, client), } wrapper.queryPFdeps.aqf = wrapper @@ -61,7 +62,7 @@ func NewAQFWrapper(client core.ClientI, repo ds.RepositoryI, interval int64) *AQ // only called by priceoracle func (mdl *AQFWrapper) AddYearnFeed(adapter ds.SyncAdapterI) { - yearnFeed, ok := adapter.(*QueryPriceFeed) + yearnFeed, ok := adapter.(*query_price_feed.QueryPriceFeed) if !ok { log.Fatal("Failed in parsing yearn feed for aggregated yearn feed") } @@ -69,27 +70,27 @@ func (mdl *AQFWrapper) AddYearnFeed(adapter ds.SyncAdapterI) { mdl.QueryFeeds[adapter.GetAddress()] = yearnFeed } -func (mdl *AQFWrapper) GetQueryFeeds() []*QueryPriceFeed { - feeds := make([]*QueryPriceFeed, 0, len(mdl.QueryFeeds)) +func (mdl *AQFWrapper) GetQueryFeeds() []*query_price_feed.QueryPriceFeed { + feeds := make([]*query_price_feed.QueryPriceFeed, 0, len(mdl.QueryFeeds)) for _, feed := range mdl.QueryFeeds { feeds = append(feeds, feed) } return feeds } -func (mdl *AQFWrapper) AddFeedOrToken(token, oracle string, pfType string, discoveredAt int64, version core.VersionType) { - log.Infof("Add new %s for token(%s): %s discovered at %d", pfType, token, oracle, discoveredAt) +func (mdl *AQFWrapper) AddFeedOrToken(token, oracle string, pfType string, discoveredAt int64, pfVersion schemas.PFVersion) { + log.Infof("Add new %s:pfversion(%d) for token(%s): %s discovered at %d", pfType, pfVersion, token, oracle, discoveredAt) // MAINNET: yearn yvUSDC has changed over time, previous token was 0x5f18C75AbDAe578b483E5F43f12a39cF75b973a9(only added in gearbox v1 priceOracle) and 0xa354F35829Ae975e850e23e9615b11Da1B3dC4DE, so we can ignore 0xc1 yvUSDC token dependency if token != "0x5f18C75AbDAe578b483E5F43f12a39cF75b973a9" { mdl.queryPFdeps.checkInDepGraph(token, oracle, discoveredAt) } if mdl.QueryFeeds[oracle] != nil { - mdl.QueryFeeds[oracle].AddToken(token, discoveredAt) + mdl.QueryFeeds[oracle].AddToken(token, discoveredAt, pfVersion) } else { - mdl.AddYearnFeed(NewQueryPriceFeed(token, oracle, pfType, discoveredAt, mdl.Client, mdl.Repo, version)) + mdl.AddYearnFeed(query_price_feed.NewQueryPriceFeed(token, oracle, pfType, discoveredAt, mdl.Client, mdl.Repo, pfVersion)) // MAINNET: old yvUSDC added on gearbox v1 if token == "0x5f18C75AbDAe578b483E5F43f12a39cF75b973a9" { - mdl.QueryFeeds[oracle].DisableToken(token, 13856183) // new yvUSDC added on gearbox v1 + mdl.QueryFeeds[oracle].DisableToken(token, 13856183, pfVersion) // new yvUSDC added on gearbox v1 } } // when token is added to the queryPricefeed, add price object at discoveredAt @@ -97,24 +98,36 @@ func (mdl *AQFWrapper) AddFeedOrToken(token, oracle string, pfType string, disco mdl.updateQueryPrices(createPriceFeedOnInit(mdl.QueryFeeds[oracle], token, discoveredAt)) } -func createPriceFeedOnInit(qpf *QueryPriceFeed, token string, discoveredAt int64) []*schemas.PriceFeed { +func mergePFVersionAt(blockNum int64, details map[schemas.PFVersion][]int64) schemas.MergedPFVersion { + var pfVersion schemas.MergedPFVersion = 0 + for version, blockNums := range details { + // log.Info(version, blockNums, blockNum) + if blockNums[0] <= blockNum && (len(blockNums) == 1 || blockNum < blockNums[1]) { // 1 is added as price is already added at blockNum + pfVersion = pfVersion | schemas.MergedPFVersion(version) + } + } + return pfVersion +} +func createPriceFeedOnInit(qpf *query_price_feed.QueryPriceFeed, token string, discoveredAt int64) []*schemas.PriceFeed { mainPFContract, err := priceFeed.NewPriceFeed(common.HexToAddress(qpf.Address), qpf.Client) log.CheckFatal(err) data, err := mainPFContract.LatestRoundData(&bind.CallOpts{BlockNumber: big.NewInt(discoveredAt)}) log.CheckFatal(err) + // + pfVersion := mergePFVersionAt(discoveredAt, qpf.DetailsDS.Tokens[token]) return []*schemas.PriceFeed{{ - BlockNumber: discoveredAt, - Feed: qpf.Address, - Token: token, - RoundId: data.RoundId.Int64(), - IsPriceInUSD: qpf.GetVersion().IsPriceInUSD(), // for version more than 1 - PriceBI: (*core.BigInt)(data.Answer), - Price: utils.GetFloat64Decimal(data.Answer, qpf.GetVersion().Decimals()), + BlockNumber: discoveredAt, + Feed: qpf.Address, + Token: token, + RoundId: data.RoundId.Int64(), + MergedPFVersion: pfVersion, + PriceBI: (*core.BigInt)(data.Answer), + Price: utils.GetFloat64Decimal(data.Answer, pfVersion.Decimals()), }} } -func (mdl *AQFWrapper) DisableYearnFeed(token, oracle string, disabledAt int64) { - mdl.QueryFeeds[oracle].DisableToken(token, disabledAt) +func (mdl *AQFWrapper) DisableYearnFeed(token, oracle string, disabledAt int64, pfVersion schemas.PFVersion) { + mdl.QueryFeeds[oracle].DisableToken(token, disabledAt, pfVersion) } func (mdl AQFWrapper) GetDepFetcher() *QueryPFDependencies { @@ -124,25 +137,17 @@ func (mdl AQFWrapper) GetDepFetcher() *QueryPFDependencies { func (mdl *AQFWrapper) OnLog(txLog types.Log) { } -type local struct { - Feed string - Token string -} - // no need to check version of feed, as while adding from chainlink we make sure that the version is more than 1 // and we can't have version 2 and 3 feed active at the same time. -func (mdl AQFWrapper) getFeeds(blockNum int64, neededTokens map[string]bool) (result []local) { +func (mdl AQFWrapper) getFeeds(blockNum int64, neededTokens map[string]bool) (result []schemas.TokenAndMergedPFVersion) { for _, adapter := range mdl.QueryFeeds { if !adapter.GetVersion().MoreThan(core.NewVersion(1)) { continue } tokensForAdapter := adapter.TokensValidAtBlock(blockNum) - for _, token := range tokensForAdapter { - if neededTokens[token] { - result = append(result, local{ - Feed: adapter.GetAddress(), - Token: token, - }) + for _, entry := range tokensForAdapter { + if neededTokens[entry.Token] { + result = append(result, entry) } } } diff --git a/models/aggregated_block_feed/query.go b/models/aggregated_block_feed/query.go index 04356dbe..9714414d 100644 --- a/models/aggregated_block_feed/query.go +++ b/models/aggregated_block_feed/query.go @@ -13,6 +13,7 @@ import ( "github.com/Gearbox-protocol/sdk-go/log" "github.com/Gearbox-protocol/sdk-go/utils" "github.com/Gearbox-protocol/third-eye/ds" + "github.com/Gearbox-protocol/third-eye/models/aggregated_block_feed/query_price_feed" "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" // "fmt" @@ -98,6 +99,11 @@ func (mdl *AQFWrapper) queryAsync(blockNum int64, ch chan int, wg *sync.WaitGrou func (mdl *AQFWrapper) updateQueryPrices(pfs []*schemas.PriceFeed) { mdl.mu.Lock() defer mdl.mu.Unlock() + for _, pf := range pfs { + if pf.MergedPFVersion == 0 { + log.Fatalf("MergedPFVersion is 0 for %s", pf) + } + } mdl.queryFeedPrices = append(mdl.queryFeedPrices, pfs...) } @@ -115,7 +121,7 @@ func (mdl *AQFWrapper) QueryData(blockNum int64) []*schemas.PriceFeed { return queryFeedPrices } -func (mdl *AQFWrapper) getRoundDataCalls(blockNum int64) (calls []multicall.Multicall2Call, queryAbleAdapters []*QueryPriceFeed) { +func (mdl *AQFWrapper) getRoundDataCalls(blockNum int64) (calls []multicall.Multicall2Call, queryAbleAdapters []*query_price_feed.QueryPriceFeed) { priceFeedABI := core.GetAbi("PriceFeed") // for _, adapter := range mdl.QueryFeeds { @@ -136,7 +142,7 @@ func (mdl *AQFWrapper) getRoundDataCalls(blockNum int64) (calls []multicall.Mult var curvePFLatestRoundDataTimer = map[string]log.TimerFn{} -func (mdl *AQFWrapper) processRoundData(blockNum int64, adapter *QueryPriceFeed, entry multicall.Multicall2Result) []*schemas.PriceFeed { +func (mdl *AQFWrapper) processRoundData(blockNum int64, adapter *query_price_feed.QueryPriceFeed, entry multicall.Multicall2Result) []*schemas.PriceFeed { var priceData *schemas.PriceFeed if entry.Success { @@ -153,7 +159,7 @@ func (mdl *AQFWrapper) processRoundData(blockNum int64, adapter *QueryPriceFeed, switch adapter.GetDetailsByKey("pfType") { case ds.YearnPF: // fail on err, since we only sync for block_num which is more than discovered_at, we can assume that underlying price feed will be set for given block_num - _priceData, err := adapter.calculateYearnPFInternally(blockNum) + _priceData, err := adapter.CalculateYearnPFInternally(blockNum) if err != nil { log.Fatal(fmt.Errorf("At %d can't calculate yearnfeed(%s)'s price internally: %s", blockNum, @@ -199,11 +205,12 @@ func (mdl *AQFWrapper) processRoundData(blockNum int64, adapter *QueryPriceFeed, } } priceFeeds := []*schemas.PriceFeed{} - for _, token := range adapter.TokensValidAtBlock(blockNum) { + for _, entry := range adapter.TokensValidAtBlock(blockNum) { priceDataCopy := priceData.Clone() // priceDataCopy.BlockNumber = blockNum - priceDataCopy.Token = token + priceDataCopy.Token = entry.Token + priceDataCopy.MergedPFVersion = entry.MergedPFVersion priceDataCopy.Feed = adapter.GetAddress() // priceFeeds = append(priceFeeds, priceDataCopy) @@ -228,9 +235,8 @@ func parseRoundData(returnData []byte, isPriceInUSD bool, feed string) *schemas. decimals = 8 // for usd } return &schemas.PriceFeed{ - RoundId: roundData.RoundId.Int64(), - PriceBI: (*core.BigInt)(roundData.Answer), - Price: utils.GetFloat64Decimal(roundData.Answer, decimals), - IsPriceInUSD: isPriceInUSD, // for 2 and above the prices are in usd + RoundId: roundData.RoundId.Int64(), + PriceBI: (*core.BigInt)(roundData.Answer), + Price: utils.GetFloat64Decimal(roundData.Answer, decimals), } } diff --git a/models/aggregated_block_feed/query_price_feed/token_ds.go b/models/aggregated_block_feed/query_price_feed/token_ds.go new file mode 100644 index 00000000..f3eb48a3 --- /dev/null +++ b/models/aggregated_block_feed/query_price_feed/token_ds.go @@ -0,0 +1,61 @@ +package query_price_feed + +import ( + "bytes" + "encoding/json" + + "github.com/Gearbox-protocol/sdk-go/core" + "github.com/Gearbox-protocol/sdk-go/core/schemas" + "github.com/Gearbox-protocol/sdk-go/log" + "github.com/Gearbox-protocol/sdk-go/utils" +) + +type DetailsDS struct { + PFType string `json:"pfType"` + Reduntant map[string][]int64 `json:"token,omitempty"` // token enabled and disabled at block numbers + Tokens map[string]map[schemas.PFVersion][]int64 `json:"tokens"` // token enabled and disabled at block numbers + Logs [][]interface{} `json:"logs"` + MergedPFVersion *schemas.MergedPFVersion `json:"mergedPFVersion,omitempty"` +} + +// func NewDetailsDS(pfType string, token string, discoveredAt int64, pfVersion schemas.PFVersion) *DetailsDS { +// return &DetailsDS{ +// PFType: pfType, +// Tokens: map[string][]int64{token: {discoveredAt}}, +// PFVersion: map[string]schemas.PFVersion{token: pfVersion}, +// } +// } +func (obj *DetailsDS) Load(in core.Json, version core.VersionType) { + data, err := json.Marshal(in) + log.CheckFatal(err) + err = utils.ReadJsonReaderAndSetInterface(bytes.NewBuffer(data), obj) + log.CheckFatal(err) + + if obj.Tokens == nil { + obj.Tokens = map[string]map[schemas.PFVersion][]int64{} + } + for token, blockNums := range obj.Reduntant { + if obj.Tokens[token] == nil { + obj.Tokens[token] = map[schemas.PFVersion][]int64{} + } + if version == core.NewVersion(1) { + obj.Tokens[token][schemas.V1PF] = blockNums + } else if version == core.NewVersion(300) { + obj.Tokens[token][schemas.V3PF_MAIN] = blockNums + } else { + for _, pf := range obj.MergedPFVersion.MergedPFVersionToList() { + obj.Tokens[token][pf] = blockNums + } + } + } + obj.Reduntant = nil + obj.MergedPFVersion = nil + // log.Info(utils.ToJson(obj)) +} +func (obj *DetailsDS) Save() core.Json { + data, err := json.Marshal(obj) + log.CheckFatal(err) + out, err := utils.ReadJsonReader(bytes.NewBuffer(data)) + log.CheckFatal(err) + return core.Json(out) +} diff --git a/models/aggregated_block_feed/yearn_pf_internal.go b/models/aggregated_block_feed/query_price_feed/yearn_pf_internal.go similarity index 88% rename from models/aggregated_block_feed/yearn_pf_internal.go rename to models/aggregated_block_feed/query_price_feed/yearn_pf_internal.go index ac3ca05c..79766349 100644 --- a/models/aggregated_block_feed/yearn_pf_internal.go +++ b/models/aggregated_block_feed/query_price_feed/yearn_pf_internal.go @@ -1,4 +1,4 @@ -package aggregated_block_feed +package query_price_feed import ( "fmt" @@ -21,6 +21,7 @@ type yearnPFInternal struct { version core.VersionType } +// only used in v1 and v2 for calculating price if latestRoudnData execution is Reverted func (mdl *yearnPFInternal) calculatePrice(blockNum int64, client core.ClientI, version core.VersionType) (*schemas.PriceFeed, error) { if mdl.underlyingPFContract == nil { if err := mdl.setContracts(blockNum, client); err != nil { @@ -47,11 +48,12 @@ func (mdl *yearnPFInternal) calculatePrice(blockNum int64, client core.ClientI, new(big.Int).Mul(pricePerShare, roundData.Answer), mdl.decimalDivider, ) + pfVersion := schemas.VersionToPFVersion(version, false) return &schemas.PriceFeed{ - RoundId: roundData.RoundId.Int64(), - PriceBI: (*core.BigInt)(newAnswer), - Price: utils.GetFloat64Decimal(newAnswer, version.Decimals()), - IsPriceInUSD: version.IsPriceInUSD(), + RoundId: roundData.RoundId.Int64(), + PriceBI: (*core.BigInt)(newAnswer), + Price: utils.GetFloat64Decimal(newAnswer, pfVersion.Decimals()), + MergedPFVersion: schemas.MergedPFVersion(pfVersion), // only used for v1,v2 so can convert from pfVersion to MergedPFVersion }, nil } diff --git a/models/aggregated_block_feed/yearn_pf_internal_test.go b/models/aggregated_block_feed/query_price_feed/yearn_pf_internal_test.go similarity index 91% rename from models/aggregated_block_feed/yearn_pf_internal_test.go rename to models/aggregated_block_feed/query_price_feed/yearn_pf_internal_test.go index abe15e9a..ce69161c 100644 --- a/models/aggregated_block_feed/yearn_pf_internal_test.go +++ b/models/aggregated_block_feed/query_price_feed/yearn_pf_internal_test.go @@ -1,4 +1,4 @@ -package aggregated_block_feed +package query_price_feed import ( "sync" @@ -33,7 +33,7 @@ func TestInternalyearnPrice(t *testing.T) { version: core.NewVersion(300), }, // main price feed } - pf, err := obj.calculateYearnPFInternally(18631514) + pf, err := obj.CalculateYearnPFInternally(18631514) log.CheckFatal(err) log.Info(utils.ToJson(pf)) } diff --git a/models/aggregated_block_feed/query_price_feed/yearn_price_feed.go b/models/aggregated_block_feed/query_price_feed/yearn_price_feed.go new file mode 100644 index 00000000..337aa5b3 --- /dev/null +++ b/models/aggregated_block_feed/query_price_feed/yearn_price_feed.go @@ -0,0 +1,138 @@ +package query_price_feed + +import ( + "sync" + + "github.com/Gearbox-protocol/sdk-go/core" + "github.com/Gearbox-protocol/sdk-go/core/schemas" + "github.com/Gearbox-protocol/sdk-go/log" + "github.com/Gearbox-protocol/third-eye/ds" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +type QueryPriceFeed struct { + *ds.SyncAdapter + mu *sync.Mutex + yearnPFInternal + DetailsDS DetailsDS +} + +// single querypricefeed can be valid for multiple tokens so we have to maintain tokens within the details +// details->token is token map to start and end block +func NewQueryPriceFeed(token, oracle string, pfType string, discoveredAt int64, client core.ClientI, repo ds.RepositoryI, pfVersion schemas.PFVersion) *QueryPriceFeed { + syncAdapter := &ds.SyncAdapter{ + SyncAdapterSchema: &schemas.SyncAdapterSchema{ + Contract: &schemas.Contract{ + Address: oracle, + DiscoveredAt: discoveredAt, + FirstLogAt: discoveredAt, + ContractName: ds.QueryPriceFeed, + Client: client, + }, + Details: map[string]interface{}{ + "tokens": map[string]map[schemas.PFVersion][]int64{token: {pfVersion: {discoveredAt}}}, + "pfType": pfType, + "pfVersion": map[string]int64{"token": int64(pfVersion)}, + }, + LastSync: discoveredAt, + V: pfVersion.ToVersion(), + }, + Repo: repo, + } + + mdl := NewQueryPriceFeedFromAdapter( + syncAdapter, + ) + return mdl +} + +func NewQueryPriceFeedFromAdapter(adapter *ds.SyncAdapter) *QueryPriceFeed { + obj := &QueryPriceFeed{ + SyncAdapter: adapter, + mu: &sync.Mutex{}, + yearnPFInternal: yearnPFInternal{ + mainPFAddress: common.HexToAddress(adapter.Address), + version: adapter.GetVersion(), + }, // main price feed + } + obj.DataProcessType = ds.ViaQuery + obj.DetailsDS.Load(obj.Details, obj.GetVersion()) + return obj +} + +func (mdl *QueryPriceFeed) OnLog(txLog types.Log) { + +} + +// only used in v1,v2 +func (mdl *QueryPriceFeed) CalculateYearnPFInternally(blockNum int64) (*schemas.PriceFeed, error) { + return mdl.yearnPFInternal.calculatePrice(blockNum, mdl.Client, mdl.GetVersion()) +} + +func (mdl *QueryPriceFeed) GetTokenAddr() string { + mdl.mu.Lock() + defer mdl.mu.Unlock() + tokenAddr, ok := mdl.Details["token"].(string) + if !ok { + log.Fatalf("Failing in asserting to string: %s", mdl.Details["token"]) + } + return tokenAddr +} + +/////////////////////// +// details for token +/////////////////////// + +func (mdl *QueryPriceFeed) AddToken(token string, discoveredAt int64, pfVersion schemas.PFVersion) { + tokenDetails := mdl.DetailsDS.Tokens[token] + if tokenDetails == nil { + mdl.DetailsDS.Tokens[token] = map[schemas.PFVersion][]int64{pfVersion: {discoveredAt}} + tokenDetails = mdl.DetailsDS.Tokens[token] + } + blockNums := tokenDetails[pfVersion] + if len(blockNums) == 1 { + log.Debugf("Token/Feed(%s/%s) previously added at %d, again added at %d", token, mdl.Address, blockNums[0], discoveredAt) + return + } else if len(blockNums) == 2 { + mdl.DetailsDS.Logs = append(mdl.DetailsDS.Logs, []interface{}{token, blockNums, pfVersion}) // token blockNums pfVersion + } + tokenDetails[pfVersion] = []int64{discoveredAt} +} + +// sync till < endBlock number +func (mdl *QueryPriceFeed) DisableToken(token string, disabledAt int64, pfVersion schemas.PFVersion) { + tokenDetails := mdl.DetailsDS.Tokens[token] + if tokenDetails == nil || len(tokenDetails[pfVersion]) != 1 { + log.Fatalf("%s's enable block number for pricefeed is malformed: %v", token, tokenDetails) + } + tokenDetails[pfVersion] = append(tokenDetails[pfVersion], disabledAt) +} + +// read method +func (mdl *QueryPriceFeed) TokensValidAtBlock(blockNum int64) (ans []schemas.TokenAndMergedPFVersion) { + for token, details := range mdl.DetailsDS.Tokens { + mpfVersion := mergePFVersionAt(blockNum, details) + if mpfVersion != 0 { + ans = append(ans, schemas.TokenAndMergedPFVersion{Token: token, MergedPFVersion: mpfVersion, Feed: mdl.GetAddress()}) + } + } + return ans + // +} + +func mergePFVersionAt(blockNum int64, details map[schemas.PFVersion][]int64) schemas.MergedPFVersion { + var pfVersion schemas.MergedPFVersion = 0 + for version, blockNums := range details { + // log.Info(version, blockNums, blockNum) + if blockNums[0]+1 <= blockNum && (len(blockNums) == 1 || blockNum < blockNums[1]) { // 1 is added as price is already added at blockNum + pfVersion = pfVersion | schemas.MergedPFVersion(version) + } + } + return pfVersion +} + +func (mdl *QueryPriceFeed) AfterSyncHook(b int64) { + mdl.DetailsDS.Save() + mdl.SyncAdapter.AfterSyncHook(b) +} diff --git a/models/aggregated_block_feed/yearn_price_feed.go b/models/aggregated_block_feed/yearn_price_feed.go deleted file mode 100644 index d6244640..00000000 --- a/models/aggregated_block_feed/yearn_price_feed.go +++ /dev/null @@ -1,156 +0,0 @@ -package aggregated_block_feed - -import ( - "sync" - - "github.com/Gearbox-protocol/sdk-go/core" - "github.com/Gearbox-protocol/sdk-go/core/schemas" - "github.com/Gearbox-protocol/sdk-go/log" - "github.com/Gearbox-protocol/sdk-go/utils" - "github.com/Gearbox-protocol/third-eye/ds" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" -) - -type QueryPriceFeed struct { - *ds.SyncAdapter - mu *sync.Mutex - yearnPFInternal -} - -// single querypricefeed can be valid for multiple tokens so we have to maintain tokens within the details -// details->token is token map to start and end block -func NewQueryPriceFeed(token, oracle string, pfType string, discoveredAt int64, client core.ClientI, repo ds.RepositoryI, version core.VersionType) *QueryPriceFeed { - syncAdapter := &ds.SyncAdapter{ - SyncAdapterSchema: &schemas.SyncAdapterSchema{ - Contract: &schemas.Contract{ - Address: oracle, - DiscoveredAt: discoveredAt, - FirstLogAt: discoveredAt, - ContractName: ds.QueryPriceFeed, - Client: client, - }, - Details: map[string]interface{}{"token": map[string]interface{}{token: []int64{discoveredAt}}, "pfType": pfType}, - LastSync: discoveredAt, - V: version, - }, - Repo: repo, - } - mdl := NewQueryPriceFeedFromAdapter( - syncAdapter, - ) - return mdl -} - -func NewQueryPriceFeedFromAdapter(adapter *ds.SyncAdapter) *QueryPriceFeed { - obj := &QueryPriceFeed{ - SyncAdapter: adapter, - mu: &sync.Mutex{}, - yearnPFInternal: yearnPFInternal{ - mainPFAddress: common.HexToAddress(adapter.Address), - version: adapter.GetVersion(), - }, // main price feed - } - obj.DataProcessType = ds.ViaQuery - return obj -} - -func (mdl *QueryPriceFeed) OnLog(txLog types.Log) { - -} -func (mdl *QueryPriceFeed) calculateYearnPFInternally(blockNum int64) (*schemas.PriceFeed, error) { - return mdl.yearnPFInternal.calculatePrice(blockNum, mdl.Client, mdl.GetVersion()) -} - -func (mdl *QueryPriceFeed) GetTokenAddr() string { - mdl.mu.Lock() - defer mdl.mu.Unlock() - tokenAddr, ok := mdl.Details["token"].(string) - if !ok { - log.Fatalf("Failing in asserting to string: %s", mdl.Details["token"]) - } - return tokenAddr -} - -/////////////////////// -// details for token -/////////////////////// - -func (mdl *QueryPriceFeed) AddToken(token string, discoveredAt int64) { - if mdl.Details == nil { - mdl.Details = core.Json{} - } - if mdl.Details["token"] != nil { - obj := map[string]interface{}{} - switch mdl.Details["token"].(type) { - case map[string]interface{}: - obj, _ = mdl.Details["token"].(map[string]interface{}) - ints := utils.ConvertToListOfInt64(obj[token]) - // token is already in enabled state, we are trying to add again - if obj[token] != nil && len(ints) == 1 { - log.Debugf("Token/Feed(%s/%s) previously added at %d, again added at %d", token, mdl.Address, ints[0], discoveredAt) - return - // token is disabled so reenable and add to logs - } else if len(ints) == 2 { - mdl.Details["logs"] = append(parseLogArray(mdl.Details["logs"]), []interface{}{token, ints}) - } - } - obj[token] = []int64{discoveredAt} - mdl.Details["token"] = obj - } else { - log.Fatal("Can't reach this part in the yearn price feed") - } -} - -func parseLogArray(logs interface{}) (parsedLogs [][]interface{}) { - if logs != nil { - l, ok := logs.([]interface{}) - if !ok { - log.Fatal("failed in converting to log array", logs) - } - for _, ele := range l { - obj, ok := ele.([]interface{}) - if !ok { - log.Fatal("failed in converting to log array element", ele) - } - parsedEle := []interface{}{obj[0].(string), utils.ConvertToListOfInt64(obj[1])} - parsedLogs = append(parsedLogs, parsedEle) - } - } - return -} - -// sync till < endBlock number -func (mdl *QueryPriceFeed) DisableToken(token string, disabledAt int64) { - obj := map[string]interface{}{} - switch mdl.Details["token"].(type) { - case map[string]interface{}: - obj = mdl.Details["token"].(map[string]interface{}) - ints := utils.ConvertToListOfInt64(obj[token]) - if len(ints) != 1 { - log.Fatalf("%s's enable block number for pricefeed is malformed: %v", token, ints) - } - ints = append(ints, disabledAt) - obj[token] = ints - } - mdl.Details["token"] = obj -} - -// read method -func (mdl *QueryPriceFeed) TokensValidAtBlock(blockNum int64) []string { - switch mdl.Details["token"].(type) { - case map[string]interface{}: - tokens := []string{} - obj := mdl.Details["token"].(map[string]interface{}) - for token, info := range obj { - ints := utils.ConvertToListOfInt64(info) - // when token is added to the feed, price at discoveredAt is added for that token - // so we can ignore that token is valid at discoveredAt, hence added one to discoveredAt - if ints[0]+1 <= blockNum && (len(ints) == 1 || blockNum < ints[1]) { - tokens = append(tokens, token) - } - } - return tokens - } - return nil -} diff --git a/models/chainlink_price_feed/model.go b/models/chainlink_price_feed/model.go index 7195a475..25d62c04 100644 --- a/models/chainlink_price_feed/model.go +++ b/models/chainlink_price_feed/model.go @@ -20,7 +20,7 @@ type ChainlinkPriceFeed struct { // if oracle and address are same then the normal chainlink interface is not working for this price feed // it maybe custom price feed of gearbox . so we will disable on 'vm execution error' or 'execution reverted'. // if oracle and adress are same we try to get the pricefeed. -func NewChainlinkPriceFeed(token, oracle string, discoveredAt int64, client core.ClientI, repo ds.RepositoryI, version core.VersionType, bounded bool) *ChainlinkPriceFeed { +func NewChainlinkPriceFeed(client core.ClientI, repo ds.RepositoryI, token, oracle string, discoveredAt int64, mergedPFVersion schemas.MergedPFVersion, bounded bool) *ChainlinkPriceFeed { var upperLimit string if bounded { returnData, err := core.CallFuncWithExtraBytes(client, "b09ad8a0", common.HexToAddress(oracle), discoveredAt, nil) // upperBound @@ -36,9 +36,9 @@ func NewChainlinkPriceFeed(token, oracle string, discoveredAt int64, client core ContractName: ds.ChainlinkPriceFeed, Client: client, }, - Details: map[string]interface{}{"oracle": oracle, "token": token}, + Details: map[string]interface{}{"oracle": oracle, "token": token, "mergedPFVersion": mergedPFVersion}, LastSync: discoveredAt - 1, - V: version, + V: mergedPFVersion.MergedPFVersionToList()[0].ToVersion(), }, Repo: repo, } @@ -111,6 +111,7 @@ func (mdl *ChainlinkPriceFeed) AfterSyncHook(syncedTill int64) { Feed: mdl.MainAgg.Addr.Hex(), // feed is same as oracle BlockNumber: discoveredAt, Version: mdl.GetVersion(), + Reserve: schemas.GetReservefromDetails(mdl.Details), FeedType: ds.ChainlinkPriceFeed, }, mdl.upperLimit().Cmp(new(big.Int)) != 0) // if upperLImit is not zero, then the price is bounded by upperLimit } diff --git a/models/chainlink_price_feed/on_log.go b/models/chainlink_price_feed/on_log.go index ca7dcc75..c7100bec 100644 --- a/models/chainlink_price_feed/on_log.go +++ b/models/chainlink_price_feed/on_log.go @@ -46,14 +46,15 @@ func (mdl *ChainlinkPriceFeed) OnLogs(txLogs []types.Log) { answerBI = upperLimit } // new(big.Int).SetString(txLog.Data[2:], 16) + pfVersion := schemas.VersionToPFVersion(mdl.GetVersion(), schemas.GetReservefromDetails(mdl.Details)) priceFeed = &schemas.PriceFeed{ - BlockNumber: blockNum, - Token: mdl.Token, - Feed: mdl.Address, - RoundId: roundId, - PriceBI: (*core.BigInt)(answerBI), - Price: utils.GetFloat64Decimal(answerBI, mdl.GetVersion().Decimals()), - IsPriceInUSD: mdl.GetVersion().IsPriceInUSD(), + BlockNumber: blockNum, + Token: mdl.Token, + Feed: mdl.Address, + RoundId: roundId, + PriceBI: (*core.BigInt)(answerBI), + Price: utils.GetFloat64Decimal(answerBI, pfVersion.Decimals()), + MergedPFVersion: mdl.GetMergedPFVersion(), } mdl.Repo.AddPriceFeed(priceFeed) blockNums = append(blockNums, blockNum) @@ -65,3 +66,38 @@ func (mdl *ChainlinkPriceFeed) OnLogs(txLogs []types.Log) { } } + +func (mdl ChainlinkPriceFeed) GetMergedPFVersion() schemas.MergedPFVersion { + if mdl.Details["mergedPFVersion"] != nil { + if v, ok := mdl.Details["mergedPFVersion"].(int8); ok { + return schemas.MergedPFVersion(v) + } + if v, ok := mdl.Details["mergedPFVersion"].(float64); ok { + return schemas.MergedPFVersion(v) + } + return schemas.MergedPFVersion(mdl.Details["mergedPFVersion"].(schemas.MergedPFVersion)) + } + log.Fatal("Can't get mergedPFVersion", utils.ToJson(mdl.Details)) + return schemas.MergedPFVersion(0) +} +func (mdl ChainlinkPriceFeed) AddToken(token string, pfVersion schemas.PFVersion) { + if mdl.Details["token"] != nil { + if mdl.Details["token"].(string) != token { + log.Fatal("stored token for chainlink is different from new added token", mdl.Details["token"].(string), token) + } + } + mdl.Details["mergedPFVersion"] = mdl.GetMergedPFVersion() | schemas.MergedPFVersion(pfVersion) +} + +func (mdl ChainlinkPriceFeed) DisableToken(token string, blockNum int64, pfVersion schemas.PFVersion) { + if mdl.Details["token"] != nil { + if mdl.Details["token"].(string) != token { + log.Fatal("stored token for chainlink is different from new added token", mdl.Details["token"].(string), token) + } + } + final := mdl.GetMergedPFVersion() ^ schemas.MergedPFVersion(pfVersion) + mdl.Details["mergedPFVersion"] = final + if final == 0 { + mdl.SetBlockToDisableOn(blockNum) + } +} diff --git a/models/composite_chainlink/model.go b/models/composite_chainlink/model.go index 402ecd91..70b645b9 100644 --- a/models/composite_chainlink/model.go +++ b/models/composite_chainlink/model.go @@ -26,7 +26,7 @@ type CompositeChainlinkPF struct { // compositeChainlink price feed has token base oracle and base usd oracle for calculating the price of token in usd. // address is set as identifier(random), as same oracle can be added for different tokens. -func NewCompositeChainlinkPF(token, oracle string, discoveredAt int64, client core.ClientI, repo ds.RepositoryI, version core.VersionType) *CompositeChainlinkPF { +func NewCompositeChainlinkPF(token, oracle string, discoveredAt int64, client core.ClientI, repo ds.RepositoryI, version core.VersionType, reserve bool) *CompositeChainlinkPF { oracleAddr := common.HexToAddress(oracle) tokenETHPF := getAddrFromRPC(client, "targetETH", oracleAddr, discoveredAt) // get decimals @@ -52,15 +52,17 @@ func NewCompositeChainlinkPF(token, oracle string, discoveredAt int64, client co Client: client, }, Details: map[string]interface{}{ - "oracle": oracle, - "token": token, - "decimals": decimalsToBasePF, + "oracle": oracle, + "token": token, + "decimals": decimalsToBasePF, + "mergedPFVersion": schemas.MergedPFVersion(schemas.VersionToPFVersion(version, reserve)), "secAddrs": map[string]interface{}{ "target": tokenETHPF.Hex(), "base": ethUSDPF.Hex(), "targetPhase": mainPhaseAgg.Hex(), "basePhase": basePhaseAgg.Hex(), }}, + // since last_sync is set to discoveredAt not discoveredAt-1, setPrice will get tokenBase and baseUSD price at discoveredAt // so the db entry that is added at addPriceToDB will have the correct price while creating new compositeChainlinkPF LastSync: discoveredAt, diff --git a/models/composite_chainlink/on_log.go b/models/composite_chainlink/on_log.go index 1d8a9795..de58a2e9 100644 --- a/models/composite_chainlink/on_log.go +++ b/models/composite_chainlink/on_log.go @@ -107,13 +107,13 @@ func (mdl *CompositeChainlinkPF) addPriceToDB(blockNum int64) { ) // only usd price feed priceFeed := &schemas.PriceFeed{ - BlockNumber: blockNum, - Token: mdl.Token, - Feed: mdl.GetDetailsByKey("oracle"), - RoundId: 0, - PriceBI: (*core.BigInt)(answerBI), - Price: utils.GetFloat64Decimal(answerBI, 8), - IsPriceInUSD: true, + BlockNumber: blockNum, + Token: mdl.Token, + Feed: mdl.GetDetailsByKey("oracle"), + RoundId: 0, + PriceBI: (*core.BigInt)(answerBI), + Price: utils.GetFloat64Decimal(answerBI, 8), + MergedPFVersion: mdl.GetMergedPFVersion(), } mdl.Repo.AddPriceFeed(priceFeed) } @@ -121,3 +121,38 @@ func (mdl *CompositeChainlinkPF) addPriceToDB(blockNum int64) { func (mdl *CompositeChainlinkPF) OnLog(types.Log) { } + +func (mdl CompositeChainlinkPF) GetMergedPFVersion() schemas.MergedPFVersion { + if mdl.Details["mergedPFVersion"] != nil { + if v, ok := mdl.Details["mergedPFVersion"].(int8); ok { + return schemas.MergedPFVersion(v) + } + if v, ok := mdl.Details["mergedPFVersion"].(float64); ok { + return schemas.MergedPFVersion(v) + } + return schemas.MergedPFVersion(mdl.Details["mergedPFVersion"].(schemas.MergedPFVersion)) + } + log.Fatal(utils.ToJson(mdl.Details)) + return schemas.MergedPFVersion(0) +} +func (mdl CompositeChainlinkPF) AddToken(token string, pfVersion schemas.PFVersion) { + if mdl.Details["token"] != nil { + if mdl.Details["token"].(string) != token { + log.Fatal("stored token for chainlink is different from new added token", mdl.Details["token"].(string), token) + } + } + mdl.Details["mergedPFVersion"] = mdl.GetMergedPFVersion() | schemas.MergedPFVersion(pfVersion) +} + +func (mdl CompositeChainlinkPF) DisableToken(token string, blockNum int64, pfVersion schemas.PFVersion) { + if mdl.Details["token"] != nil { + if mdl.Details["token"].(string) != token { + log.Fatal("stored token for chainlink is different from new added token", mdl.Details["token"].(string), token) + } + } + final := mdl.GetMergedPFVersion() ^ schemas.MergedPFVersion(pfVersion) + mdl.Details["mergedPFVersion"] = final + if final == 0 { + mdl.SetBlockToDisableOn(blockNum) + } +} diff --git a/models/contract_register/on_log.go b/models/contract_register/on_log.go index 3a273544..503a564a 100644 --- a/models/contract_register/on_log.go +++ b/models/contract_register/on_log.go @@ -37,11 +37,12 @@ func NewCM(addr string, client core.ClientI, repo ds.RepositoryI, blockNum int64 return cm_v1.NewCMv1(addr, client, repo, blockNum) case core.NewVersion(2): return cm_v2.NewCMv2(addr, client, repo, blockNum) - case core.NewVersion(300): - return cm_v3.NewCMv3(addr, client, repo, blockNum) default: - log.Fatalf("Version(%d) of cm can't be created.", version) + if version.MoreThanEq(core.NewVersion(300)) { + return cm_v3.NewCMv3(addr, client, repo, blockNum) + } } + log.Fatalf("Version(%d) of cm can't be created.", version) return nil } @@ -50,10 +51,11 @@ func NewPool(addr string, client core.ClientI, repo ds.RepositoryI, blockNum int switch version { case core.NewVersion(1), core.NewVersion(2): return pool_v2.NewPool(addr, client, repo, blockNum) - case core.NewVersion(300): - return pool_v3.NewPool(addr, client, repo, blockNum) default: - log.Fatalf("Version(%d) of pool can't be created.", version) + if version.MoreThanEq(core.NewVersion(300)) { + return pool_v3.NewPool(addr, client, repo, blockNum) + } } + log.Fatalf("Version(%d) of pool can't be created.", version) return nil } diff --git a/models/credit_manager/cm_v3/v3operation.go b/models/credit_manager/cm_v3/v3operation.go index e819f5a0..e3314fa2 100644 --- a/models/credit_manager/cm_v3/v3operation.go +++ b/models/credit_manager/cm_v3/v3operation.go @@ -66,7 +66,7 @@ func (mdl *CMv3) onCloseCreditAccountV3(txLog *types.Log, creditAccount, to stri mdl.State.TotalClosedAccounts++ // update totalclosedStats sessionId, owner := mdl.GetSessionIdAndBorrower(creditAccount) cfAddr := txLog.Address.Hex() - blockNum := int64(txLog.BlockNumber) + closedAt := int64(txLog.BlockNumber) ////////// // get token transfer when account was closed @@ -82,7 +82,7 @@ func (mdl *CMv3) onCloseCreditAccountV3(txLog *types.Log, creditAccount, to stri tokens = append(tokens, token) } tokens = append(tokens, mdl.GetUnderlyingToken()) - prices := mdl.Repo.GetPricesInUSD(blockNum, tokens) + prices := mdl.Repo.GetPricesInUSD(closedAt, tokens) remainingFunds := (userTransfers.ValueInUnderlying( mdl.GetUnderlyingToken(), mdl.GetUnderlyingDecimal(), prices)) ////////// @@ -91,7 +91,7 @@ func (mdl *CMv3) onCloseCreditAccountV3(txLog *types.Log, creditAccount, to stri (*args)["remainingFunds"] = (*core.BigInt)(remainingFunds) accountOperation := &schemas.AccountOperation{ // add account operation TxHash: txLog.TxHash.Hex(), - BlockNumber: blockNum, + BlockNumber: closedAt, LogId: txLog.Index, Borrower: owner, SessionId: sessionId, @@ -121,7 +121,7 @@ func (mdl *CMv3) onCloseCreditAccountV3(txLog *types.Log, creditAccount, to stri }) mdl.RemoveCreditAccount(creditAccount) // remove session to manager object - mdl.CloseAccount(sessionId, blockNum, txLog.TxHash.Hex(), txLog.Index) + mdl.CloseAccount(sessionId, closedAt, txLog.TxHash.Hex(), txLog.Index) } func (mdl *CMv3) onLiquidateCreditAccountV3(txLog *types.Log, creditAccount, liquidator string, remainingUnderlyingSentTo string, remainingFunds *big.Int) { diff --git a/models/pool/pool_v3/get_details.go b/models/pool/pool_v3/get_details.go index aac6f4e9..2be6c1b9 100644 --- a/models/pool/pool_v3/get_details.go +++ b/models/pool/pool_v3/get_details.go @@ -75,6 +75,7 @@ func (mdl *Poolv3) setZapper() { // 2. dUSDC-farmedUSDCv3 // 3. ETH-farmedETHv3 syms := core.GetSymToAddrByChainId(core.GetChainId(mdl.Client)) + farmingPools := core.GetFarmingPoolsToSymbolByChainId(core.GetChainId(mdl.Client)) var ETHAddr common.Address if poolToCheck.Underlying == syms.Tokens["WETH"] { ETHAddr = syms.Tokens["ETH"] @@ -82,7 +83,7 @@ func (mdl *Poolv3) setZapper() { // out = farmedUSDCv3, dUSDCv3 for _, zapper := range poolToCheck.Zappers { - if zapper.TokenIn == poolToCheck.Underlying && zapper.TokenOut != poolToCheck.DieselToken { // tokenIn = USDC, tokenOut != dUSDCv3 + if zapper.TokenIn == poolToCheck.Underlying && farmingPools[zapper.TokenOut] != "" { // tokenIn = USDC, tokenOut is farming Pool(!= dUSDCv3) mdl.setDetailsByKey("USDC-farmedUSDCv3", zapper.Zapper.Hex()) mdl.setDetailsByKey("farmedUSDCv3", zapper.TokenOut.Hex()) } diff --git a/models/pool_lmrewards/v3/log.go b/models/pool_lmrewards/v3/log.go index be4a7b48..eae5b138 100644 --- a/models/pool_lmrewards/v3/log.go +++ b/models/pool_lmrewards/v3/log.go @@ -44,12 +44,12 @@ func (mdl *LMRewardsv3) getFarmsv3() { } pools, found := mdl.Repo.GetDCWrapper().GetPoolListv3() if found && len(mdl.farms) == 0 { - addrToSym := core.GetAddrToSymbolByChainId(core.GetChainId(mdl.Client)) + farmingPools := core.GetFarmingPoolsToSymbolByChainId(core.GetChainId(mdl.Client)) poolAndFarms := []*Farmv3{} for _, pool := range pools { for _, zapper := range pool.Zappers { // can be diselToken zapperOut -- https://etherscan.io/address/0xcaa199f91294e6ee95f9ea90fe716cbd2f9f2900#code - if _, ok := addrToSym[zapper.TokenOut]; !ok && zapper.TokenIn == pool.Underlying && zapper.TokenOut != pool.DieselToken { + if _, ok := farmingPools[zapper.TokenOut]; ok && zapper.TokenIn == pool.Underlying && zapper.TokenOut != pool.DieselToken { poolAndFarms = append(poolAndFarms, &Farmv3{ Farm: zapper.TokenOut.Hex(), Pool: pool.Addr.Hex(), diff --git a/models/price_oracle/on_log.go b/models/price_oracle/on_log.go index 2d123c44..f2bbed28 100644 --- a/models/price_oracle/on_log.go +++ b/models/price_oracle/on_log.go @@ -17,13 +17,22 @@ import ( "gorm.io/gorm/utils" ) +// QueryPriceFeed stores in details reserve status via PFVersion in details.Tokens.pfversion +// chainlinkPriceFeed and compositeChainlinkPriceFeed stores pfversion as reserve status in details + func (mdl *PriceOracle) OnLog(txLog types.Log) { blockNum := int64(txLog.BlockNumber) switch txLog.Topics[0] { - case core.Topic("NewPriceFeed(address,address)"), core.Topic("SetPriceFeed(address,address,uint32,bool,bool)"): + case core.Topic("NewPriceFeed(address,address)"), + core.Topic("SetPriceFeed(address,address,uint32,bool,bool)"), + core.Topic("SetReservePriceFeed(address,address,uint32,bool)"): + // token := common.BytesToAddress(txLog.Topics[1].Bytes()).Hex() // token oracle := common.BytesToAddress(txLog.Topics[2].Bytes()).Hex() // priceFeed - + isReverse := core.Topic("SetReservePriceFeed(address,address,uint32,bool)") == txLog.Topics[0] + // if isReverse { + // log.Fatal("token", token, "oracle", oracle) + // } // mdl.Repo.AddDAOOperation(&schemas.DAOOperation{ BlockNumber: blockNum, @@ -34,17 +43,17 @@ func (mdl *PriceOracle) OnLog(txLog types.Log) { Args: &core.Json{ "priceFeed": oracle, "token": token, + "reverse": isReverse, }, }) - version := mdl.GetVersion() priceFeedType, bounded, err := mdl.checkPriceFeedContract(blockNum, oracle, token) if err != nil { log.Fatalf("Oracle %s, err: %s, blockNum %d", oracle, err, blockNum) } switch priceFeedType { // almost zero price feed is for blocker token on credit account - case ds.YearnPF, ds.SingleAssetPF, ds.CurvePF, ds.ChainlinkPriceFeed, ds.ZeroPF, ds.AlmostZeroPF, ds.CompositeChainlinkPF: + case ds.YearnPF, ds.SingleAssetPF, ds.CurvePF, ds.ChainlinkPriceFeed, ds.ZeroPF, ds.AlmostZeroPF, ds.CompositeChainlinkPF, ds.RedStonePF: // four types of oracles // - Zero or almost zero price feed: constant price value // - Chainlink price feed: market based price value @@ -56,7 +65,8 @@ func (mdl *PriceOracle) OnLog(txLog types.Log) { Oracle: oracle, Feed: oracle, // feed is same as oracle BlockNumber: blockNum, - Version: version, + Version: mdl.GetVersion(), + Reserve: isReverse, FeedType: priceFeedType, }, bounded) default: @@ -124,12 +134,14 @@ func (mdl *PriceOracle) v3PriceFeedType(opts *bind.CallOpts, oracle, token strin core.V3_COMPOUND_V2_ORACLE, // compounder core.V3_ERC4626_VAULT_ORACLE: // erc4626 return ds.SingleAssetPF, false, nil + case core.V3_REDSTONE_ORACLE: + return ds.RedStonePF, false, nil default: yearnContract, err := yearnPriceFeed.NewYearnPriceFeed(common.HexToAddress(oracle), mdl.Client) log.CheckFatal(err) description, err := yearnContract.Description(opts) log.CheckFatal(err) - return ds.UnknownPF, false, fmt.Errorf("Unknown v3 pfType %v, oracle: %s token: %s, description: %s", pfType, oracle, token, description) + return ds.UnknownPF, false, fmt.Errorf("unknown v3 pfType %v, oracle: %s token: %s, description: %s", pfType, oracle, token, description) } } diff --git a/repository/handlers/blocks.go b/repository/handlers/blocks.go index 92972cab..a688be2e 100644 --- a/repository/handlers/blocks.go +++ b/repository/handlers/blocks.go @@ -58,7 +58,9 @@ func (repo *BlocksRepo) LoadBlocks(from, to int64) { func (repo *BlocksRepo) Save(tx *gorm.DB, blockNum int64) { defer utils.Elapsed("blocks sql statements")() blocksToSync := make([]*schemas.Block, 0, len(repo.GetBlocks())) + a := []*schemas.PriceFeed{} for _, block := range repo.GetBlocks() { + a = append(a, block.PriceFeeds...) blocksToSync = append(blocksToSync, block) } // clauses not needed here @@ -71,8 +73,14 @@ func (repo *BlocksRepo) Save(tx *gorm.DB, blockNum int64) { // external funcs func (repo *BlocksRepo) GetPrice(token string) *big.Int { - if details := repo.prevStore.currentPrices[token]; details != nil { - return details.PriceBI.Convert() + store := repo.prevStore.prevPriceFeeds[schemas.V3PF_MAIN] + if store != nil { + return store[token].PriceBI.Convert() + } + + store = repo.prevStore.prevPriceFeeds[schemas.V2PF] + if store != nil { + return store[token].PriceBI.Convert() } return nil } @@ -163,7 +171,10 @@ func (repo *BlocksRepo) Clear() { // setter func (repo *BlocksRepo) AddPriceFeed(pf *schemas.PriceFeed) { - if repo.prevStore.canAddPF(pf) { + if pf.MergedPFVersion == 0 { + log.Fatal(utils.ToJson(pf)) + } + if repo.prevStore.isPFAdded(pf, true) { repo.SetAndGetBlock(pf.BlockNumber).AddPriceFeed(pf) } } diff --git a/repository/handlers/prev_price.go b/repository/handlers/prev_price.go index ae4634e4..c8f4a3c1 100644 --- a/repository/handlers/prev_price.go +++ b/repository/handlers/prev_price.go @@ -17,8 +17,8 @@ import ( type PrevPriceStore struct { // for prevently duplicate query price feed already with same price for a token // token to feed - prevPriceFeeds map[bool]map[string]map[string]*schemas.PriceFeed - currentPrices map[string]*schemas.TokenCurrentPrice + // pfversion -> token -> price feed object + prevPriceFeeds map[schemas.PFVersion]map[string]*schemas.PriceFeed spotOracle *priceFetcher.OneInchOracle mu *sync.Mutex } @@ -28,8 +28,7 @@ func NewPrevPriceStore(client core.ClientI, tokensRepo *TokensRepo) *PrevPriceSt log.CheckFatal(err) store := &PrevPriceStore{ - prevPriceFeeds: map[bool]map[string]map[string]*schemas.PriceFeed{}, - currentPrices: map[string]*schemas.TokenCurrentPrice{}, + prevPriceFeeds: map[schemas.PFVersion]map[string]*schemas.PriceFeed{}, mu: &sync.Mutex{}, } if chainId.Int64() == 1 || chainId.Int64() == 7878 { @@ -41,73 +40,59 @@ func NewPrevPriceStore(client core.ClientI, tokensRepo *TokensRepo) *PrevPriceSt func (repo *PrevPriceStore) loadPrevPriceFeed(db *gorm.DB) { defer utils.Elapsed("loadPrevPriceFeed")() data := []*schemas.PriceFeed{} - err := db.Raw("SELECT distinct on(token, price_in_usd)* FROM price_feeds ORDER BY token, price_in_usd, block_num DESC").Find(&data).Error + err := db.Raw("SELECT distinct on(token, merged_pf_version) * FROM price_feeds ORDER BY token, merged_pf_version, block_num DESC").Find(&data).Error log.CheckFatal(err) for _, pf := range data { - repo.addPrevPriceFeed(pf) - repo.addCurrentPrice(pf, false) + repo.isPFAdded(pf, false) } } // isUSD -> token -> feed -> price feed object -func (repo *PrevPriceStore) addPrevPriceFeed(pf *schemas.PriceFeed) { - if repo.prevPriceFeeds[pf.IsPriceInUSD] == nil { - repo.prevPriceFeeds[pf.IsPriceInUSD] = map[string]map[string]*schemas.PriceFeed{} - } - if repo.prevPriceFeeds[pf.IsPriceInUSD][pf.Token] == nil { - repo.prevPriceFeeds[pf.IsPriceInUSD][pf.Token] = map[string]*schemas.PriceFeed{} - } - oldPF := repo.prevPriceFeeds[pf.IsPriceInUSD][pf.Token][pf.Feed] - price := pf.PriceBI.Convert().Int64() - if oldPF != nil && oldPF.BlockNumber >= pf.BlockNumber && !(price == 0 || price == 100) { - log.Fatalf("oldPF %s.\n NewPF %s.", oldPF, pf) - } - // - repo.prevPriceFeeds[pf.IsPriceInUSD][pf.Token][pf.Feed] = pf -} - -func (repo *PrevPriceStore) canAddPF(pf *schemas.PriceFeed) bool { - repo.mu.Lock() - defer repo.mu.Unlock() - if repo.prevPriceFeeds[pf.IsPriceInUSD] != nil && - repo.prevPriceFeeds[pf.IsPriceInUSD][pf.Token] != nil && - repo.prevPriceFeeds[pf.IsPriceInUSD][pf.Token][pf.Feed] != nil { - prevPF := repo.prevPriceFeeds[pf.IsPriceInUSD][pf.Token][pf.Feed] - if prevPF.BlockNumber >= pf.BlockNumber { - log.Fatalf("oldPF %s.\n NewPF %s.", prevPF, pf) +func (repo *PrevPriceStore) isPFAdded(pf *schemas.PriceFeed, save bool) bool { + for _, pfVersion := range pf.MergedPFVersion.MergedPFVersionToList() { + if repo.prevPriceFeeds[pfVersion] == nil { + repo.prevPriceFeeds[pfVersion] = map[string]*schemas.PriceFeed{} } - if prevPF.PriceBI.Cmp(pf.PriceBI) == 0 { - repo.addPrevPriceFeed(pf) - return false + oldPF := repo.prevPriceFeeds[pfVersion][pf.Token] + // + price := pf.PriceBI.Convert().Int64() + if oldPF != nil { + // if the blocknum of new price is less than previous seenly price , ignore + if oldPF.BlockNumber >= pf.BlockNumber && !(price == 0 || price == 100) { + log.Fatalf("oldPF %s.\n NewPF %s.", oldPF, pf) + } + // same price then don't add + if oldPF.PriceBI.Cmp(pf.PriceBI) == 0 { + return false + } } + repo.prevPriceFeeds[pfVersion][pf.Token] = pf } - repo.addPrevPriceFeed(pf) - repo.addCurrentPrice(pf, true) return true } -func (repo PrevPriceStore) addCurrentPrice(pf *schemas.PriceFeed, save bool) { - if !pf.IsPriceInUSD { - return +func getPrice(pfs map[string]*schemas.PriceFeed) (ans []*schemas.TokenCurrentPrice) { + for _, pf := range pfs { + ans = append(ans, &schemas.TokenCurrentPrice{ + PriceBI: pf.PriceBI, + Price: pf.Price, + BlockNum: pf.BlockNumber, + Token: pf.Token, + PriceSrc: string(core.SOURCE_CHAINLINK), + }) } - repo.currentPrices[pf.Token] = &schemas.TokenCurrentPrice{ - Save: save, - PriceBI: pf.PriceBI, - Price: pf.Price, - BlockNum: pf.BlockNumber, - Token: pf.Token, - PriceSrc: string(core.SOURCE_CHAINLINK), + return ans +} + +func (repo *PrevPriceStore) getCurrentPrice() (ans []*schemas.TokenCurrentPrice) { + if repo.prevPriceFeeds[schemas.V3PF_MAIN] != nil { + return getPrice(repo.prevPriceFeeds[schemas.V3PF_MAIN]) } + return getPrice(repo.prevPriceFeeds[schemas.V2PF]) } func (repo *PrevPriceStore) saveCurrentPrices(client core.ClientI, tx *gorm.DB, blockNum int64) { // chainlink current prices to updated - var currentPricesToSync []*schemas.TokenCurrentPrice - for _, currentPrice := range repo.currentPrices { - if currentPrice.Save { - currentPrice.Save = false - currentPricesToSync = append(currentPricesToSync, currentPrice) - } - } + currentPricesToSync := repo.getCurrentPrice() if len(currentPricesToSync) == 0 { // usd prices are set? only valid from v2 // so if it's empty, we don't need to store currentPrice and nor fetch 1inch prices in usdc return diff --git a/repository/handlers/sync_adapters.go b/repository/handlers/sync_adapters.go index 661e076f..c2a7cebf 100644 --- a/repository/handlers/sync_adapters.go +++ b/repository/handlers/sync_adapters.go @@ -13,7 +13,7 @@ import ( "github.com/Gearbox-protocol/third-eye/models/account_manager" "github.com/Gearbox-protocol/third-eye/models/acl" "github.com/Gearbox-protocol/third-eye/models/address_provider" - "github.com/Gearbox-protocol/third-eye/models/aggregated_block_feed" + "github.com/Gearbox-protocol/third-eye/models/aggregated_block_feed/query_price_feed" "github.com/Gearbox-protocol/third-eye/models/chainlink_price_feed" "github.com/Gearbox-protocol/third-eye/models/composite_chainlink" "github.com/Gearbox-protocol/third-eye/models/configurators" @@ -141,7 +141,7 @@ func (repo *SyncAdaptersRepo) PrepareSyncAdapter(adapter *ds.SyncAdapter) ds.Syn case ds.CompositeChainlinkPF: return composite_chainlink.NewCompositeChainlinkPFFromAdapter(adapter) case ds.QueryPriceFeed: - return aggregated_block_feed.NewQueryPriceFeedFromAdapter(adapter) + return query_price_feed.NewQueryPriceFeedFromAdapter(adapter) case ds.ContractRegister: return contract_register.NewContractRegisterFromAdapter(adapter) case ds.GearToken: diff --git a/repository/handlers/token_oracle.go b/repository/handlers/token_oracle.go index 0d0a0351..a8baed1e 100644 --- a/repository/handlers/token_oracle.go +++ b/repository/handlers/token_oracle.go @@ -16,7 +16,7 @@ import ( type TokenOracleRepo struct { // version to token to oracle - tokensCurrentOracle map[ds.PriceInUSDType]map[string]*schemas.TokenOracle // done + tokensCurrentOracle map[schemas.PFVersion]map[string]*schemas.TokenOracle // done mu *sync.Mutex adapters *SyncAdaptersRepo blocks *BlocksRepo @@ -27,7 +27,7 @@ type TokenOracleRepo struct { func NewTokenOracleRepo(adapters *SyncAdaptersRepo, blocks *BlocksRepo, repo ds.RepositoryI, client core.ClientI) *TokenOracleRepo { return &TokenOracleRepo{ - tokensCurrentOracle: make(map[ds.PriceInUSDType]map[string]*schemas.TokenOracle), + tokensCurrentOracle: make(map[schemas.PFVersion]map[string]*schemas.TokenOracle), mu: &sync.Mutex{}, adapters: adapters, blocks: blocks, @@ -41,9 +41,7 @@ func NewTokenOracleRepo(adapters *SyncAdaptersRepo, blocks *BlocksRepo, repo ds. func (repo *TokenOracleRepo) LoadCurrentTokenOracle(db *gorm.DB) { defer utils.Elapsed("loadCurrentTokenOracle")() data := []*schemas.TokenOracle{} - query := `SELECT token_oracle.* FROM token_oracle - JOIN (SELECT max(block_num) AS bn, token FROM token_oracle GROUP BY token, version) AS max_to - ON max_to.bn = token_oracle.block_num AND max_to.token = token_oracle.token order by block_num` + query := `SELECT distinct on (token, version, reserve) * FROM token_oracle order by token, version, reserve, block_num desc;` err := db.Raw(query).Find(&data).Error if err != nil { log.Fatal(err) @@ -66,20 +64,21 @@ func (repo *TokenOracleRepo) loadZeroPFs(db *gorm.DB) { } func (repo *TokenOracleRepo) addTokenCurrentOracle(oracle *schemas.TokenOracle) { - priceInUSD := ds.PriceInUSDType(oracle.Version.IsPriceInUSD()) - if repo.tokensCurrentOracle[priceInUSD] == nil { - repo.tokensCurrentOracle[priceInUSD] = map[string]*schemas.TokenOracle{} + pfVersion := schemas.VersionToPFVersion(oracle.Version, oracle.Reserve) + if repo.tokensCurrentOracle[pfVersion] == nil { + repo.tokensCurrentOracle[pfVersion] = map[string]*schemas.TokenOracle{} } - repo.tokensCurrentOracle[priceInUSD][oracle.Token] = oracle + repo.tokensCurrentOracle[pfVersion][oracle.Token] = oracle } // if same feed is active for current token and version func (repo *TokenOracleRepo) alreadyActiveFeedForToken(newTokenOracle *schemas.TokenOracle) bool { feedType := newTokenOracle.FeedType - newPriceInUSD := ds.PriceInUSDType(newTokenOracle.Version.IsPriceInUSD()) - if repo.tokensCurrentOracle[newPriceInUSD] != nil && - repo.tokensCurrentOracle[newPriceInUSD][newTokenOracle.Token] != nil { - oldTokenOracle := repo.tokensCurrentOracle[newPriceInUSD][newTokenOracle.Token] + pfVersion := schemas.VersionToPFVersion(newTokenOracle.Version, newTokenOracle.Reserve) + // + if repo.tokensCurrentOracle[pfVersion] != nil && + repo.tokensCurrentOracle[pfVersion][newTokenOracle.Token] != nil { + oldTokenOracle := repo.tokensCurrentOracle[pfVersion][newTokenOracle.Token] if oldTokenOracle.Feed == newTokenOracle.Feed { log.Debugf("Same %s(%s) added for token(%s)", feedType, newTokenOracle.Feed, newTokenOracle.Token) @@ -90,10 +89,11 @@ func (repo *TokenOracleRepo) alreadyActiveFeedForToken(newTokenOracle *schemas.T } func (repo *TokenOracleRepo) disablePrevAdapterAndAddNewTokenOracle(newTokenOracle *schemas.TokenOracle) { - newPriceInUSD := ds.PriceInUSDType(newTokenOracle.Version.IsPriceInUSD()) - if repo.tokensCurrentOracle[newPriceInUSD] != nil && - repo.tokensCurrentOracle[newPriceInUSD][newTokenOracle.Token] != nil { - oldTokenOracle := repo.tokensCurrentOracle[newPriceInUSD][newTokenOracle.Token] + pfVersion := schemas.VersionToPFVersion(newTokenOracle.Version, newTokenOracle.Reserve) + // + if repo.tokensCurrentOracle[pfVersion] != nil && + repo.tokensCurrentOracle[pfVersion][newTokenOracle.Token] != nil { + oldTokenOracle := repo.tokensCurrentOracle[pfVersion][newTokenOracle.Token] oldFeed := oldTokenOracle.Feed adapter := repo.adapters.GetAdapter(oldFeed) @@ -104,9 +104,15 @@ func (repo *TokenOracleRepo) disablePrevAdapterAndAddNewTokenOracle(newTokenOrac } else if adapter == nil { log.Error("Adapter not found for", oldFeed, utils.ToJson(oldTokenOracle)) } else if adapter.GetName() != ds.QueryPriceFeed { - adapter.SetBlockToDisableOn(newTokenOracle.BlockNumber) + if mdl, ok := adapter.(*chainlink_price_feed.ChainlinkPriceFeed); ok { + mdl.DisableToken(oldTokenOracle.Token, oldTokenOracle.BlockNumber, pfVersion) + } + if mdl, ok := adapter.(*composite_chainlink.CompositeChainlinkPF); ok { + mdl.DisableToken(oldTokenOracle.Token, oldTokenOracle.BlockNumber, pfVersion) + } } else { - repo.adapters.GetAggregatedFeed().DisableYearnFeed(newTokenOracle.Token, oldFeed, newTokenOracle.BlockNumber) + repo.adapters.GetAggregatedFeed().DisableYearnFeed( + newTokenOracle.Token, oldFeed, newTokenOracle.BlockNumber, pfVersion) } } // set current state of oracle for token. @@ -135,8 +141,9 @@ func (repo *TokenOracleRepo) AddNewPriceOracleEvent(newTokenOracle *schemas.Toke if newTokenOracle.Feed == "0xBc1c306920309F795fB5A740083eCBf5057349e9" && newTokenOracle.BlockNumber == 15371802 { return } + pfVersion := schemas.VersionToPFVersion(newTokenOracle.Version, newTokenOracle.Reserve) switch newTokenOracle.FeedType { - case ds.CurvePF, ds.SingleAssetPF, ds.YearnPF, ds.ZeroPF, ds.AlmostZeroPF: + case ds.CurvePF, ds.SingleAssetPF, ds.YearnPF, ds.ZeroPF, ds.AlmostZeroPF, ds.RedStonePF: if repo.alreadyActiveFeedForToken(newTokenOracle) { return } @@ -148,13 +155,13 @@ func (repo *TokenOracleRepo) AddNewPriceOracleEvent(newTokenOracle *schemas.Toke priceBI = big.NewInt(100) } repo.blocks.AddPriceFeed(&schemas.PriceFeed{ - BlockNumber: newTokenOracle.BlockNumber, - Token: newTokenOracle.Token, - Feed: newTokenOracle.Oracle, - RoundId: 0, - PriceBI: (*core.BigInt)(priceBI), - Price: utils.GetFloat64Decimal(priceBI, newTokenOracle.Version.Decimals()), - IsPriceInUSD: newTokenOracle.Version.IsPriceInUSD(), + BlockNumber: newTokenOracle.BlockNumber, + Token: newTokenOracle.Token, + Feed: newTokenOracle.Oracle, + RoundId: 0, + PriceBI: (*core.BigInt)(priceBI), + Price: utils.GetFloat64Decimal(priceBI, pfVersion.Decimals()), + MergedPFVersion: schemas.MergedPFVersion(pfVersion), // for 0 and almost zero pf }) repo.zeroPFs[newTokenOracle.Oracle] = true // oracle and feed are same for non-chainlink price feed } else { @@ -163,16 +170,16 @@ func (repo *TokenOracleRepo) AddNewPriceOracleEvent(newTokenOracle *schemas.Toke newTokenOracle.Oracle, newTokenOracle.FeedType, newTokenOracle.BlockNumber, - newTokenOracle.Version, + pfVersion, ) } case ds.ChainlinkPriceFeed: obj := chainlink_price_feed.NewChainlinkPriceFeed( + repo.client, repo.repo, newTokenOracle.Token, newTokenOracle.Oracle, newTokenOracle.BlockNumber, - repo.client, repo.repo, - newTokenOracle.Version, + schemas.MergedPFVersion(pfVersion), bounded, ) newTokenOracle.Feed = obj.Address @@ -181,6 +188,11 @@ func (repo *TokenOracleRepo) AddNewPriceOracleEvent(newTokenOracle *schemas.Toke return } repo.disablePrevAdapterAndAddNewTokenOracle(newTokenOracle) + // + if adapter := repo.adapters.GetAdapter(obj.Address); adapter != nil { + adapter.(*chainlink_price_feed.ChainlinkPriceFeed).AddToken(newTokenOracle.Token, pfVersion) + return + } // SPECIAL CASE // on goerli, there are two v2 priceoracles added // on first priceoracle cvx token is 0x9683a59Ad8D7B5ac3eD01e4cff1D1A2a51A8f1c0 @@ -198,19 +210,25 @@ func (repo *TokenOracleRepo) AddNewPriceOracleEvent(newTokenOracle *schemas.Toke newTokenOracle.BlockNumber, repo.client, repo.repo, newTokenOracle.Version, + newTokenOracle.Reserve, ) // if repo.alreadyActiveFeedForToken(newTokenOracle) { return } repo.disablePrevAdapterAndAddNewTokenOracle(newTokenOracle) + + if adapter := repo.adapters.GetAdapter(obj.Address); adapter != nil { + adapter.(*composite_chainlink.CompositeChainlinkPF).AddToken(newTokenOracle.Token, pfVersion) + return + } repo.adapters.AddSyncAdapter(obj) default: log.Fatal(newTokenOracle.FeedType, "not handled") } } -func (repo *TokenOracleRepo) GetTokenOracles() map[ds.PriceInUSDType]map[string]*schemas.TokenOracle { +func (repo *TokenOracleRepo) GetTokenOracles() map[schemas.PFVersion]map[string]*schemas.TokenOracle { return repo.tokensCurrentOracle } diff --git a/repository/save.go b/repository/save.go index 38805c03..b56c82cb 100644 --- a/repository/save.go +++ b/repository/save.go @@ -6,6 +6,7 @@ import ( ) func (repo *Repository) Flush(syncTill int64) error { + // log.Fatal("") repo.mu.Lock() defer repo.mu.Unlock() // preferred order (adapter | token) => pools => cm => credit session => blocks => allowedTokens diff --git a/tests/aqf_test.go b/tests/aqf_test.go index e51a0be5..201d60e0 100644 --- a/tests/aqf_test.go +++ b/tests/aqf_test.go @@ -4,11 +4,12 @@ import ( "sort" "testing" - "github.com/Gearbox-protocol/sdk-go/core" + "github.com/Gearbox-protocol/sdk-go/core/schemas" "github.com/Gearbox-protocol/sdk-go/log" "github.com/Gearbox-protocol/sdk-go/test" "github.com/Gearbox-protocol/third-eye/ds" "github.com/Gearbox-protocol/third-eye/models/aggregated_block_feed" + "github.com/Gearbox-protocol/third-eye/models/aggregated_block_feed/query_price_feed" "github.com/Gearbox-protocol/third-eye/tests/framework" "github.com/ethereum/go-ethereum/common" ) @@ -43,7 +44,7 @@ func updateAQF(t *testing.T, aqf *aggregated_block_feed.AQFWrapper, addressMap m // set feed to token if syncAdapterObj != nil { for _, adapter := range syncAdapterObj.Adapters { - aqf.AddYearnFeed(aggregated_block_feed.NewQueryPriceFeedFromAdapter(adapter)) + aqf.AddYearnFeed(query_price_feed.NewQueryPriceFeedFromAdapter(adapter)) } } log.Info(addressMap) @@ -61,8 +62,8 @@ func updateAQF(t *testing.T, aqf *aggregated_block_feed.AQFWrapper, addressMap m aqf.GetDepFetcher().TokenSymMap = tokenSymMap aqf.ChainlinkPriceUpdatedAt(addressMap["Token_1"], []int64{4, 11, 26, 51, 53, 58}) // - aqf.DisableYearnFeed(addressMap["Token_4"], addressMap["YearnFeed_3"], 56) - aqf.AddFeedOrToken(addressMap["Token_4"], addressMap["YearnFeed_4"], ds.YearnPF, 56, core.NewVersion(2)) + aqf.DisableYearnFeed(addressMap["Token_4"], addressMap["YearnFeed_3"], 56, schemas.V2PF) + aqf.AddFeedOrToken(addressMap["Token_4"], addressMap["YearnFeed_4"], ds.YearnPF, 56, schemas.V2PF) } func reverseMap(in map[string]string) (r map[string]string) {