From 4992a119b2881c547f7c8eeb0ef89308172b02b5 Mon Sep 17 00:00:00 2001 From: harsh-98 Date: Fri, 16 Feb 2024 17:34:09 +0400 Subject: [PATCH] fix: add new dataCompressor --- ds/dc_wrapper/wrapper.go | 31 ++++++++++--------- go.mod | 2 +- migrations/000039_pf_version.up.sql | 12 +++++-- .../query_price_feed/token_ds.go | 11 ++++--- .../query_price_feed/yearn_price_feed.go | 7 ++--- models/chainlink_price_feed/on_log.go | 4 +++ .../cm_common/credit_session_state.go | 3 +- repository/handlers/blocks.go | 1 + repository/handlers/prev_price.go | 3 +- 9 files changed, 45 insertions(+), 29 deletions(-) diff --git a/ds/dc_wrapper/wrapper.go b/ds/dc_wrapper/wrapper.go index 1d4fefd9..b17727e0 100644 --- a/ds/dc_wrapper/wrapper.go +++ b/ds/dc_wrapper/wrapper.go @@ -31,8 +31,7 @@ type DataCompressorWrapper struct { BlockNumToName map[int64]string discoveredAtToAddr map[int64]common.Address // for v3 version to address - versionToAddress map[core.VersionType]addressAndBlock - byVersion bool + versionToAddress map[core.VersionType][]addressAndBlock // for v1 creditManagerToFilter map[common.Address]common.Address // @@ -41,10 +40,6 @@ type DataCompressorWrapper struct { client core.ClientI } -func (dcw *DataCompressorWrapper) DCAreByVersion() { - dcw.byVersion = true -} - var DCV1 = "DCV1" var DCV2 = "DCV2" var TESTING = "TESTING" @@ -64,7 +59,7 @@ func NewDataCompressorWrapper(client core.ClientI) *DataCompressorWrapper { calls: map[int64]*dc.DCCalls{}, client: client, }, - versionToAddress: map[core.VersionType]addressAndBlock{}, + versionToAddress: map[core.VersionType][]addressAndBlock{}, } } @@ -130,10 +125,13 @@ func (dcw *DataCompressorWrapper) AddDataCompressorByVersion(version core.Versio } func (dcw *DataCompressorWrapper) addDataCompressorByVersion(version core.VersionType, addr string, discoveredAt int64) { - dcw.versionToAddress[version] = addressAndBlock{ + dcw.versionToAddress[version] = append(dcw.versionToAddress[version], addressAndBlock{ address: common.HexToAddress(addr), block: discoveredAt, - } + }) + sort.Slice(dcw.versionToAddress[version], func(i, j int) bool { + return dcw.versionToAddress[version][i].block < dcw.versionToAddress[version][j].block + }) } func (dcw *DataCompressorWrapper) LoadMultipleDC(multiDCs interface{}) { @@ -170,21 +168,24 @@ func (dcw *DataCompressorWrapper) LoadMultipleDC(multiDCs interface{}) { func (dcw *DataCompressorWrapper) GetKeyAndAddress(version core.VersionType, blockNum int64) (string, common.Address) { if version.MoreThanEq(core.NewVersion(300)) { - if blockNum < dcw.versionToAddress[version].block { - return NODC, core.NULL_ADDR + for _, entry := range dcw.versionToAddress[version] { + if entry.block <= blockNum { + return DCV3, entry.address + } } - return DCV3, dcw.versionToAddress[version].address + return NODC, core.NULL_ADDR } key, discoveredAt := dcw.getDataCompressorIndex(blockNum) return key, dcw.getDCAddr(discoveredAt) } func (dcw *DataCompressorWrapper) GetLatestv3DC() (common.Address, bool) { version := core.NewVersion(300) - dc, ok := dcw.versionToAddress[version] - if !ok { + if len(dcw.versionToAddress[version]) == 0 { return core.NULL_ADDR, false } - return dc.address, true + // + dcs := dcw.versionToAddress[version] + return dcs[len(dcs)-1].address, true } func (dcw *DataCompressorWrapper) GetCreditAccountData(version core.VersionType, blockNum int64, creditManager common.Address, borrower common.Address, account common.Address) ( diff --git a/go.mod b/go.mod index 01538ee5..d9d1ed14 100644 --- a/go.mod +++ b/go.mod @@ -73,4 +73,4 @@ require ( replace github.com/btcsuite/btcd => github.com/btcsuite/btcd v0.22.1 -// replace github.com/Gearbox-protocol/sdk-go v0.0.0-20240121203826-89ee59ad653a => ../sdk-go +replace github.com/Gearbox-protocol/sdk-go v0.0.0-20240216014533-b42549c9dbfa => ../sdk-go diff --git a/migrations/000039_pf_version.up.sql b/migrations/000039_pf_version.up.sql index 467f1ff2..4d73756d 100644 --- a/migrations/000039_pf_version.up.sql +++ b/migrations/000039_pf_version.up.sql @@ -8,10 +8,14 @@ alter table price_feeds add merged_pf_version integer; update price_feeds set merged_pf_version=1 where price_in_usd='f'; update price_feeds set merged_pf_version= 2 where price_in_usd='t' and block_num < 18798084 ; update price_feeds set merged_pf_version= 2 where price_in_usd='t' and block_num >= 18798084 and feed='0x4A7b3F6c4aaB7Bc5617D6c30C3f22bAeBbc34F43'; -update price_feeds set merged_pf_version=4 where price_in_usd='t' and feed in (select feed from token_oracle where version=300); -- v3PF_main +update price_feeds set merged_pf_version=4 where price_in_usd='t' and block_num >= 18798084 and feed in (select feed from token_oracle where version=300); -- v3PF_main -update price_feeds set merged_pf_version=6 where block_num >= 18798084 and feed in (select distinct feed from (select distinct on (token) * from token_oracle where version=2 order by token, block_num desc) t where token not in (select token from token_oracle where version=300)); +update price_feeds set merged_pf_version=6 where block_num >= 18798084 and feed in (select distinct feed from (select distinct on (token) * from token_oracle where version=2 order by token, block_num desc) t where token not in (select token from token_oracle where version=300)); -- tokens that don't have entyr for v3 not are present in v2. +update price_feeds set merged_pf_version=6 where price_in_usd='t' and block_num >= 18798084 and feed in (select feed from (select * from token_oracle where version=300 union select distinct on (token) * from token_oracle where version=2 order by token, block_num desc) t group by feed,token having count(*)>1); -- feed that have both active v2 and v3 price oracle + + +delete from price_feeds where feed='0x37bC7498f4FF12C19678ee8fE19d713b87F6a9e6' and block_num > 17217055; -- // as there is already another chainlink activated 0xE62B71cf983019BFf55bC83B48601ce8419650CC -- update sync_adapters set details=( details || jsonb_build_object('mergedPFVersion',6)) where address in (select distinct feed from (select distinct on (token) * from token_oracle where version=2 order by token, block_num desc) t where token not in (select token from token_oracle where version=300)); @@ -19,8 +23,12 @@ update sync_adapters set details=( details || jsonb_build_object('mergedPFVersio update sync_adapters set details=( details || jsonb_build_object('mergedPFVersion',4)) where version=300 and type in ('ChainlinkPriceFeed', 'CompositeChainlinkPF', 'QueryPriceFeed'); update sync_adapters set details=( details || jsonb_build_object('mergedPFVersion',2)) where version =2 and address not in (select distinct feed from (select distinct on (token) * from token_oracle where version=2 order by token, block_num desc) t where token not in (select token from token_oracle where version=300)) and address!='0x6385892aCB085eaa24b745a712C9e682d80FF681'; +-- below query overrides the above query when a feed is active in both 2 and 6 . +update sync_adapters set details=( details || jsonb_build_object('mergedPFVersion',6)) where address in (select feed from (select * from token_oracle where version=300 union select distinct on (token) * from token_oracle where version=2 order by token, block_num desc) t group by feed,token having count(*)>1); + update sync_adapters set details=( details || jsonb_build_object('mergedPFVersion',1)) where version=1 and type in ('ChainlinkPriceFeed', 'CompositeChainlinkPF', 'QueryPriceFeed') and details is not null; + -- create table t as (select * from (select distinct on (token) * from token_oracle where version=2 order by token, block_num desc) t where token not in (select token from token_oracle where version=300)); diff --git a/models/aggregated_block_feed/query_price_feed/token_ds.go b/models/aggregated_block_feed/query_price_feed/token_ds.go index f3eb48a3..419b5e03 100644 --- a/models/aggregated_block_feed/query_price_feed/token_ds.go +++ b/models/aggregated_block_feed/query_price_feed/token_ds.go @@ -11,11 +11,12 @@ import ( ) type DetailsDS struct { - PFType string `json:"pfType"` - Reduntant map[string][]int64 `json:"token,omitempty"` // token enabled and disabled at block numbers - Tokens map[string]map[schemas.PFVersion][]int64 `json:"tokens"` // token enabled and disabled at block numbers - Logs [][]interface{} `json:"logs"` - MergedPFVersion *schemas.MergedPFVersion `json:"mergedPFVersion,omitempty"` + PFType string `json:"pfType"` + Tokens map[string]map[schemas.PFVersion][]int64 `json:"tokens"` // token enabled and disabled at block numbers + Logs [][]interface{} `json:"logs"` + // redunantt + Reduntant map[string][]int64 `json:"token,omitempty"` // token enabled and disabled at block numbers + MergedPFVersion *schemas.MergedPFVersion `json:"mergedPFVersion,omitempty"` } // func NewDetailsDS(pfType string, token string, discoveredAt int64, pfVersion schemas.PFVersion) *DetailsDS { diff --git a/models/aggregated_block_feed/query_price_feed/yearn_price_feed.go b/models/aggregated_block_feed/query_price_feed/yearn_price_feed.go index 337aa5b3..7eb6917b 100644 --- a/models/aggregated_block_feed/query_price_feed/yearn_price_feed.go +++ b/models/aggregated_block_feed/query_price_feed/yearn_price_feed.go @@ -31,9 +31,8 @@ func NewQueryPriceFeed(token, oracle string, pfType string, discoveredAt int64, Client: client, }, Details: map[string]interface{}{ - "tokens": map[string]map[schemas.PFVersion][]int64{token: {pfVersion: {discoveredAt}}}, - "pfType": pfType, - "pfVersion": map[string]int64{"token": int64(pfVersion)}, + "tokens": map[string]map[schemas.PFVersion][]int64{token: {pfVersion: {discoveredAt}}}, + "pfType": pfType, }, LastSync: discoveredAt, V: pfVersion.ToVersion(), @@ -133,6 +132,6 @@ func mergePFVersionAt(blockNum int64, details map[schemas.PFVersion][]int64) sch } func (mdl *QueryPriceFeed) AfterSyncHook(b int64) { - mdl.DetailsDS.Save() + mdl.Details = mdl.DetailsDS.Save() mdl.SyncAdapter.AfterSyncHook(b) } diff --git a/models/chainlink_price_feed/on_log.go b/models/chainlink_price_feed/on_log.go index c7100bec..39e05939 100644 --- a/models/chainlink_price_feed/on_log.go +++ b/models/chainlink_price_feed/on_log.go @@ -31,6 +31,10 @@ func (mdl *ChainlinkPriceFeed) OnLogs(txLogs []types.Log) { if txLogInd+1 < len(txLogs) && int64(txLogs[txLogInd+1].BlockNumber) == blockNum { continue } + if mdl.Token == "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2" && + mdl.Address == "0x37bC7498f4FF12C19678ee8fE19d713b87F6a9e6" && blockNum > 17217055 { // as there is already another chainlink activated 0xE62B71cf983019BFf55bC83B48601ce8419650CC + continue + } // roundId, err := strconv.ParseInt(txLog.Topics[2].Hex()[50:], 16, 64) if err != nil { diff --git a/models/credit_manager/cm_common/credit_session_state.go b/models/credit_manager/cm_common/credit_session_state.go index 9958f4a8..82e20e5b 100644 --- a/models/credit_manager/cm_common/credit_session_state.go +++ b/models/credit_manager/cm_common/credit_session_state.go @@ -141,7 +141,8 @@ func (mdl *CommonCMAdapter) closeSessionCallAndResultFn(closedAt int64, sessionI } return call, func(result multicall.Multicall2Result) { if !result.Success { - log.Fatalf("Failing GetAccount for CM:%s Borrower:%s at %d: %v", mdl.GetAddress(), session.Borrower, closedAt-1, result.ReturnData) + key, dc := mdl.Repo.GetDCWrapper().GetKeyAndAddress(session.Version, closedAt-1) + log.Fatalf("Failing GetAccount for CM:%s Borrower:%s at %d: %v, dc: %s(%s)", mdl.GetAddress(), session.Borrower, closedAt-1, result.ReturnData, key, dc) } dcAccountData, err := resultFn(result.ReturnData) if err != nil { diff --git a/repository/handlers/blocks.go b/repository/handlers/blocks.go index a688be2e..ac77708b 100644 --- a/repository/handlers/blocks.go +++ b/repository/handlers/blocks.go @@ -171,6 +171,7 @@ func (repo *BlocksRepo) Clear() { // setter func (repo *BlocksRepo) AddPriceFeed(pf *schemas.PriceFeed) { + if pf.MergedPFVersion == 0 { log.Fatal(utils.ToJson(pf)) } diff --git a/repository/handlers/prev_price.go b/repository/handlers/prev_price.go index c8f4a3c1..d55a11c9 100644 --- a/repository/handlers/prev_price.go +++ b/repository/handlers/prev_price.go @@ -40,7 +40,8 @@ func NewPrevPriceStore(client core.ClientI, tokensRepo *TokensRepo) *PrevPriceSt func (repo *PrevPriceStore) loadPrevPriceFeed(db *gorm.DB) { defer utils.Elapsed("loadPrevPriceFeed")() data := []*schemas.PriceFeed{} - err := db.Raw("SELECT distinct on(token, merged_pf_version) * FROM price_feeds ORDER BY token, merged_pf_version, block_num DESC").Find(&data).Error + err := db.Raw(`SELECT * FROM + (SELECT distinct on(token, merged_pf_version) * FROM price_feeds ORDER BY token, merged_pf_version, block_num DESC) t ORDER BY block_num`).Find(&data).Error log.CheckFatal(err) for _, pf := range data { repo.isPFAdded(pf, false)