Skip to content

Commit

Permalink
fix: add new dataCompressor
Browse files Browse the repository at this point in the history
  • Loading branch information
harsh-98 committed Feb 16, 2024
1 parent 5c89592 commit 4992a11
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 29 deletions.
31 changes: 16 additions & 15 deletions ds/dc_wrapper/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ type DataCompressorWrapper struct {
BlockNumToName map[int64]string
discoveredAtToAddr map[int64]common.Address
// for v3 version to address
versionToAddress map[core.VersionType]addressAndBlock
byVersion bool
versionToAddress map[core.VersionType][]addressAndBlock
// for v1
creditManagerToFilter map[common.Address]common.Address
//
Expand All @@ -41,10 +40,6 @@ type DataCompressorWrapper struct {
client core.ClientI
}

func (dcw *DataCompressorWrapper) DCAreByVersion() {
dcw.byVersion = true
}

var DCV1 = "DCV1"
var DCV2 = "DCV2"
var TESTING = "TESTING"
Expand All @@ -64,7 +59,7 @@ func NewDataCompressorWrapper(client core.ClientI) *DataCompressorWrapper {
calls: map[int64]*dc.DCCalls{},
client: client,
},
versionToAddress: map[core.VersionType]addressAndBlock{},
versionToAddress: map[core.VersionType][]addressAndBlock{},
}
}

Expand Down Expand Up @@ -130,10 +125,13 @@ func (dcw *DataCompressorWrapper) AddDataCompressorByVersion(version core.Versio
}

func (dcw *DataCompressorWrapper) addDataCompressorByVersion(version core.VersionType, addr string, discoveredAt int64) {
dcw.versionToAddress[version] = addressAndBlock{
dcw.versionToAddress[version] = append(dcw.versionToAddress[version], addressAndBlock{
address: common.HexToAddress(addr),
block: discoveredAt,
}
})
sort.Slice(dcw.versionToAddress[version], func(i, j int) bool {
return dcw.versionToAddress[version][i].block < dcw.versionToAddress[version][j].block
})
}

func (dcw *DataCompressorWrapper) LoadMultipleDC(multiDCs interface{}) {
Expand Down Expand Up @@ -170,21 +168,24 @@ func (dcw *DataCompressorWrapper) LoadMultipleDC(multiDCs interface{}) {

func (dcw *DataCompressorWrapper) GetKeyAndAddress(version core.VersionType, blockNum int64) (string, common.Address) {
if version.MoreThanEq(core.NewVersion(300)) {
if blockNum < dcw.versionToAddress[version].block {
return NODC, core.NULL_ADDR
for _, entry := range dcw.versionToAddress[version] {
if entry.block <= blockNum {
return DCV3, entry.address
}
}
return DCV3, dcw.versionToAddress[version].address
return NODC, core.NULL_ADDR
}
key, discoveredAt := dcw.getDataCompressorIndex(blockNum)
return key, dcw.getDCAddr(discoveredAt)
}
func (dcw *DataCompressorWrapper) GetLatestv3DC() (common.Address, bool) {
version := core.NewVersion(300)
dc, ok := dcw.versionToAddress[version]
if !ok {
if len(dcw.versionToAddress[version]) == 0 {
return core.NULL_ADDR, false
}
return dc.address, true
//
dcs := dcw.versionToAddress[version]
return dcs[len(dcs)-1].address, true
}

func (dcw *DataCompressorWrapper) GetCreditAccountData(version core.VersionType, blockNum int64, creditManager common.Address, borrower common.Address, account common.Address) (
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,4 @@ require (

replace github.com/btcsuite/btcd => github.com/btcsuite/btcd v0.22.1

// replace github.com/Gearbox-protocol/sdk-go v0.0.0-20240121203826-89ee59ad653a => ../sdk-go
replace github.com/Gearbox-protocol/sdk-go v0.0.0-20240216014533-b42549c9dbfa => ../sdk-go
12 changes: 10 additions & 2 deletions migrations/000039_pf_version.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,27 @@ alter table price_feeds add merged_pf_version integer;
update price_feeds set merged_pf_version=1 where price_in_usd='f';
update price_feeds set merged_pf_version= 2 where price_in_usd='t' and block_num < 18798084 ;
update price_feeds set merged_pf_version= 2 where price_in_usd='t' and block_num >= 18798084 and feed='0x4A7b3F6c4aaB7Bc5617D6c30C3f22bAeBbc34F43';
update price_feeds set merged_pf_version=4 where price_in_usd='t' and feed in (select feed from token_oracle where version=300); -- v3PF_main
update price_feeds set merged_pf_version=4 where price_in_usd='t' and block_num >= 18798084 and feed in (select feed from token_oracle where version=300); -- v3PF_main

update price_feeds set merged_pf_version=6 where block_num >= 18798084 and feed in (select distinct feed from (select distinct on (token) * from token_oracle where version=2 order by token, block_num desc) t where token not in (select token from token_oracle where version=300));
update price_feeds set merged_pf_version=6 where block_num >= 18798084 and feed in (select distinct feed from (select distinct on (token) * from token_oracle where version=2 order by token, block_num desc) t where token not in (select token from token_oracle where version=300)); -- tokens that don't have entyr for v3 not are present in v2.

update price_feeds set merged_pf_version=6 where price_in_usd='t' and block_num >= 18798084 and feed in (select feed from (select * from token_oracle where version=300 union select distinct on (token) * from token_oracle where version=2 order by token, block_num desc) t group by feed,token having count(*)>1); -- feed that have both active v2 and v3 price oracle


delete from price_feeds where feed='0x37bC7498f4FF12C19678ee8fE19d713b87F6a9e6' and block_num > 17217055; -- // as there is already another chainlink activated 0xE62B71cf983019BFf55bC83B48601ce8419650CC

--
update sync_adapters set details=( details || jsonb_build_object('mergedPFVersion',6)) where address in (select distinct feed from (select distinct on (token) * from token_oracle where version=2 order by token, block_num desc) t where token not in (select token from token_oracle where version=300));

update sync_adapters set details=( details || jsonb_build_object('mergedPFVersion',4)) where version=300 and type in ('ChainlinkPriceFeed', 'CompositeChainlinkPF', 'QueryPriceFeed');

update sync_adapters set details=( details || jsonb_build_object('mergedPFVersion',2)) where version =2 and address not in (select distinct feed from (select distinct on (token) * from token_oracle where version=2 order by token, block_num desc) t where token not in (select token from token_oracle where version=300)) and address!='0x6385892aCB085eaa24b745a712C9e682d80FF681';
-- below query overrides the above query when a feed is active in both 2 and 6 .
update sync_adapters set details=( details || jsonb_build_object('mergedPFVersion',6)) where address in (select feed from (select * from token_oracle where version=300 union select distinct on (token) * from token_oracle where version=2 order by token, block_num desc) t group by feed,token having count(*)>1);


update sync_adapters set details=( details || jsonb_build_object('mergedPFVersion',1)) where version=1 and type in ('ChainlinkPriceFeed', 'CompositeChainlinkPF', 'QueryPriceFeed') and details is not null;

--

create table t as (select * from (select distinct on (token) * from token_oracle where version=2 order by token, block_num desc) t where token not in (select token from token_oracle where version=300));
Expand Down
11 changes: 6 additions & 5 deletions models/aggregated_block_feed/query_price_feed/token_ds.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ import (
)

type DetailsDS struct {
PFType string `json:"pfType"`
Reduntant map[string][]int64 `json:"token,omitempty"` // token enabled and disabled at block numbers
Tokens map[string]map[schemas.PFVersion][]int64 `json:"tokens"` // token enabled and disabled at block numbers
Logs [][]interface{} `json:"logs"`
MergedPFVersion *schemas.MergedPFVersion `json:"mergedPFVersion,omitempty"`
PFType string `json:"pfType"`
Tokens map[string]map[schemas.PFVersion][]int64 `json:"tokens"` // token enabled and disabled at block numbers
Logs [][]interface{} `json:"logs"`
// redunantt
Reduntant map[string][]int64 `json:"token,omitempty"` // token enabled and disabled at block numbers
MergedPFVersion *schemas.MergedPFVersion `json:"mergedPFVersion,omitempty"`
}

// func NewDetailsDS(pfType string, token string, discoveredAt int64, pfVersion schemas.PFVersion) *DetailsDS {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@ func NewQueryPriceFeed(token, oracle string, pfType string, discoveredAt int64,
Client: client,
},
Details: map[string]interface{}{
"tokens": map[string]map[schemas.PFVersion][]int64{token: {pfVersion: {discoveredAt}}},
"pfType": pfType,
"pfVersion": map[string]int64{"token": int64(pfVersion)},
"tokens": map[string]map[schemas.PFVersion][]int64{token: {pfVersion: {discoveredAt}}},
"pfType": pfType,
},
LastSync: discoveredAt,
V: pfVersion.ToVersion(),
Expand Down Expand Up @@ -133,6 +132,6 @@ func mergePFVersionAt(blockNum int64, details map[schemas.PFVersion][]int64) sch
}

func (mdl *QueryPriceFeed) AfterSyncHook(b int64) {
mdl.DetailsDS.Save()
mdl.Details = mdl.DetailsDS.Save()
mdl.SyncAdapter.AfterSyncHook(b)
}
4 changes: 4 additions & 0 deletions models/chainlink_price_feed/on_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ func (mdl *ChainlinkPriceFeed) OnLogs(txLogs []types.Log) {
if txLogInd+1 < len(txLogs) && int64(txLogs[txLogInd+1].BlockNumber) == blockNum {
continue
}
if mdl.Token == "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2" &&
mdl.Address == "0x37bC7498f4FF12C19678ee8fE19d713b87F6a9e6" && blockNum > 17217055 { // as there is already another chainlink activated 0xE62B71cf983019BFf55bC83B48601ce8419650CC
continue
}
//
roundId, err := strconv.ParseInt(txLog.Topics[2].Hex()[50:], 16, 64)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion models/credit_manager/cm_common/credit_session_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ func (mdl *CommonCMAdapter) closeSessionCallAndResultFn(closedAt int64, sessionI
}
return call, func(result multicall.Multicall2Result) {
if !result.Success {
log.Fatalf("Failing GetAccount for CM:%s Borrower:%s at %d: %v", mdl.GetAddress(), session.Borrower, closedAt-1, result.ReturnData)
key, dc := mdl.Repo.GetDCWrapper().GetKeyAndAddress(session.Version, closedAt-1)
log.Fatalf("Failing GetAccount for CM:%s Borrower:%s at %d: %v, dc: %s(%s)", mdl.GetAddress(), session.Borrower, closedAt-1, result.ReturnData, key, dc)
}
dcAccountData, err := resultFn(result.ReturnData)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions repository/handlers/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func (repo *BlocksRepo) Clear() {

// setter
func (repo *BlocksRepo) AddPriceFeed(pf *schemas.PriceFeed) {

if pf.MergedPFVersion == 0 {
log.Fatal(utils.ToJson(pf))
}
Expand Down
3 changes: 2 additions & 1 deletion repository/handlers/prev_price.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ func NewPrevPriceStore(client core.ClientI, tokensRepo *TokensRepo) *PrevPriceSt
func (repo *PrevPriceStore) loadPrevPriceFeed(db *gorm.DB) {
defer utils.Elapsed("loadPrevPriceFeed")()
data := []*schemas.PriceFeed{}
err := db.Raw("SELECT distinct on(token, merged_pf_version) * FROM price_feeds ORDER BY token, merged_pf_version, block_num DESC").Find(&data).Error
err := db.Raw(`SELECT * FROM
(SELECT distinct on(token, merged_pf_version) * FROM price_feeds ORDER BY token, merged_pf_version, block_num DESC) t ORDER BY block_num`).Find(&data).Error
log.CheckFatal(err)
for _, pf := range data {
repo.isPFAdded(pf, false)
Expand Down

0 comments on commit 4992a11

Please sign in to comment.