Skip to content

Commit

Permalink
CCIP-4403 lbtc onchain reader
Browse files Browse the repository at this point in the history
  • Loading branch information
bukata-sa committed Dec 2, 2024
1 parent 8c94ed4 commit 9ae8e83
Show file tree
Hide file tree
Showing 5 changed files with 399 additions and 22 deletions.
4 changes: 0 additions & 4 deletions core/services/ocr2/plugins/ccip/exportinternal.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,6 @@ func NewLBTCReader(lggr logger.Logger, jobID string, transmitter common.Address,
return ccipdata.NewLBTCReader(lggr, jobID, transmitter, lp, registerFilters)
}

func CloseLBTCReader(lggr logger.Logger, jobID string, transmitter common.Address, lp logpoller.LogPoller) error {
return ccipdata.CloseLBTCReader(lggr, jobID, transmitter, lp)
}

type USDCReaderImpl = ccipdata.USDCReaderImpl
type LBTCReaderImpl = ccipdata.LBTCReaderImpl

Expand Down
122 changes: 118 additions & 4 deletions core/services/ocr2/plugins/ccip/internal/ccipdata/lbtc_reader.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,137 @@
package ccipdata

import (
"bytes"
"context"
"fmt"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/patrickmn/go-cache"
"github.com/pkg/errors"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/abihelpers"
)

var (
_ LBTCReader = &LBTCReaderImpl{}
)

// TODO: Implement lbtc token reader
const (
LBTC_DEPOSIT_FILTER_NAME = "LBTC deposited"
LBTC_PAYLOAD_ABI = `[{"type": "bytes"}]`
)

type lbtcPayload []byte

func (d lbtcPayload) AbiString() string {
return LBTC_PAYLOAD_ABI
}

func (d lbtcPayload) Validate() error {
if len(d) == 0 {
return errors.New("must be non-empty")
}
return nil
}

type LBTCReader interface {
GetLBTCMessageInTx(ctx context.Context, payloadHash []byte, txHash string) ([]byte, error)
Close() error
}

type LBTCReaderImpl struct {
eventID common.Hash
lp logpoller.LogPoller
filter logpoller.Filter
lggr logger.Logger
transmitterAddress common.Address

// shortLivedInMemLogs is a short-lived cache (items expire every few seconds)
// used to prevent frequent log fetching from the log poller
shortLivedInMemLogs *cache.Cache
}

func NewLBTCReader(lggr logger.Logger, jobID string, transmitter common.Address, lp logpoller.LogPoller, registerFilters bool) (*LBTCReaderImpl, error) {
return &LBTCReaderImpl{}, nil
return NewLBTCReaderWithCache(lggr, jobID, transmitter, lp, cache.New(shortLivedInMemLogsCacheExpiration, 2*shortLivedInMemLogsCacheExpiration), registerFilters)
}

func CloseLBTCReader(lggr logger.Logger, jobID string, transmitter common.Address, lp logpoller.LogPoller) error {
return nil
func NewLBTCReaderWithCache(lggr logger.Logger, jobID string, transmitter common.Address, lp logpoller.LogPoller, cache *cache.Cache, registerFilters bool) (*LBTCReaderImpl, error) {
eventSig := utils.Keccak256Fixed([]byte("DepositToBridge(address,bytes32,bytes32,bytes)"))
r := &LBTCReaderImpl{
lggr: lggr,
lp: lp,
eventID: eventSig,
filter: logpoller.Filter{
Name: logpoller.FilterName(LBTC_DEPOSIT_FILTER_NAME, jobID, transmitter.Hex()),
EventSigs: []common.Hash{eventSig},
Addresses: []common.Address{transmitter},
Retention: CommitExecLogsRetention,
},
transmitterAddress: transmitter,
shortLivedInMemLogs: cache,
}

if registerFilters {
if err := r.RegisterFilters(); err != nil {
return nil, fmt.Errorf("register filters: %w", err)
}
}
return r, nil
}

func (r *LBTCReaderImpl) GetLBTCMessageInTx(ctx context.Context, payloadHash []byte, txHash string) ([]byte, error) {
var lpLogs []logpoller.Log

// fetch all the lbtc logs for the provided tx hash
key := fmt.Sprintf("lbtc-%s", txHash)
if rawLogs, foundInMem := r.shortLivedInMemLogs.Get(key); foundInMem {
inMemLogs, ok := rawLogs.([]logpoller.Log)
if !ok {
return nil, errors.Errorf("unexpected in-mem logs type %T", rawLogs)
}
r.lggr.Debugw("found logs in memory", "key", key, "len", len(inMemLogs))
lpLogs = inMemLogs
}
if len(lpLogs) == 0 {
r.lggr.Debugw("fetching logs from lp")
var err error
lpLogs, err = r.lp.IndexedLogsByTxHash(
ctx,
r.eventID,
r.transmitterAddress,
common.HexToHash(txHash),
)
if err != nil {
return nil, err
}
r.shortLivedInMemLogs.Set(key, lpLogs, cache.DefaultExpiration)
r.lggr.Debugw("fetched logs from lp", "logs", len(lpLogs))
}
for _, log := range lpLogs {
topics := log.GetTopics()
if currentPayloadHash := topics[3]; bytes.Equal(currentPayloadHash[:], payloadHash) {
return parseLBTCDeositPayload(log.Data)
}
}
return nil, fmt.Errorf("payload with hash=%s not found in logs", hexutil.Encode(payloadHash))
}

func parseLBTCDeositPayload(logData []byte) ([]byte, error) {
decodeAbiStruct, err := abihelpers.DecodeAbiStruct[lbtcPayload](logData)
if err != nil {
return nil, err
}
return decodeAbiStruct, nil
}

func (r *LBTCReaderImpl) RegisterFilters() error {
return r.lp.RegisterFilter(context.Background(), r.filter)
}

func (r *LBTCReaderImpl) Close() error {
return r.lp.UnregisterFilter(context.Background(), r.filter.Name)
}
198 changes: 198 additions & 0 deletions core/services/ocr2/plugins/ccip/internal/ccipdata/lbtc_reader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
package ccipdata

import (
"context"
"crypto/sha256"
"math/big"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/patrickmn/go-cache"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

evmclimocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
lpmocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller/mocks"
types2 "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils"
ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/abihelpers"
)

func TestLBTCParse(t *testing.T) {
encodedPayload, err := hexutil.Decode("0x000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000e45c70a5050000000000000000000000000000000000000000000000000000000000aa36a7000000000000000000000000845f8e3c214d8d0e4d83fc094f302aa26a12a0bc0000000000000000000000000000000000000000000000000000000000014a34000000000000000000000000845f8e3c214d8d0e4d83fc094f302aa26a12a0bc00000000000000000000000062f10ce5b727edf787ea45776bd050308a61150800000000000000000000000000000000000000000000000000000000000003e6000000000000000000000000000000000000000000000000000000000000000600000000000000000000000000000000000000000000000000000000")
require.NoError(t, err)
payload, err := parseLBTCDeositPayload(encodedPayload)
require.NoError(t, err)
expected := "0x5c70a5050000000000000000000000000000000000000000000000000000000000aa36a7000000000000000000000000845f8e3c214d8d0e4d83fc094f302aa26a12a0bc0000000000000000000000000000000000000000000000000000000000014a34000000000000000000000000845f8e3c214d8d0e4d83fc094f302aa26a12a0bc00000000000000000000000062f10ce5b727edf787ea45776bd050308a61150800000000000000000000000000000000000000000000000000000000000003e60000000000000000000000000000000000000000000000000000000000000006"
assert.Equal(t, expected, hexutil.Encode(payload))
}

func Test_MockLogPoller(t *testing.T) {
lggr := logger.TestLogger(t)
payload := []byte("0x1111")
payloadHash := sha256.Sum256(payload)
t.Run("found one", func(t *testing.T) {
lp := lpmocks.NewLogPoller(t)
reader, err := NewLBTCReader(lggr, "job_1", utils.RandomAddress(), lp, false)
require.NoError(t, err)
lp.On("IndexedLogsByTxHash", mock.Anything, reader.eventID, reader.transmitterAddress, mock.Anything).
Return([]logpoller.Log{
LogWithPayload(t, 20, payload),
}, nil)

data, err := reader.GetLBTCMessageInTx(context.Background(), payloadHash[:], "0x0001")
assert.NoError(t, err)
assert.Equal(t, payload, data)
})

t.Run("found multiple", func(t *testing.T) {
lp := lpmocks.NewLogPoller(t)
reader, err := NewLBTCReader(lggr, "job_1", utils.RandomAddress(), lp, false)
require.NoError(t, err)
lp.On("IndexedLogsByTxHash", mock.Anything, reader.eventID, reader.transmitterAddress, mock.Anything).
Return([]logpoller.Log{
LogWithPayload(t, 10, []byte("0x1110")),
LogWithPayload(t, 20, payload),
LogWithPayload(t, 30, []byte("0x2222")),
}, nil)

data, err := reader.GetLBTCMessageInTx(context.Background(), payloadHash[:], "0x0001")
assert.NoError(t, err)
assert.Equal(t, payload, data)
})

t.Run("found multiple none match", func(t *testing.T) {
lp := lpmocks.NewLogPoller(t)
reader, err := NewLBTCReader(lggr, "job_1", utils.RandomAddress(), lp, false)
require.NoError(t, err)
lp.On("IndexedLogsByTxHash", mock.Anything, reader.eventID, reader.transmitterAddress, mock.Anything).
Return([]logpoller.Log{
LogWithPayload(t, 10, []byte("0x1110")),
LogWithPayload(t, 30, []byte("0x2222")),
}, nil)

data, err := reader.GetLBTCMessageInTx(context.Background(), payloadHash[:], "0x0001")
assert.Nil(t, data)
assert.Errorf(t, err, "payload with hash=%s not found in logs", payloadHash)
})

t.Run("no logs found", func(t *testing.T) {
lp := lpmocks.NewLogPoller(t)
reader, err := NewLBTCReader(lggr, "job_1", utils.RandomAddress(), lp, false)
require.NoError(t, err)
lp.On("IndexedLogsByTxHash", mock.Anything, reader.eventID, reader.transmitterAddress, mock.Anything).
Return([]logpoller.Log{}, nil)

data, err := reader.GetLBTCMessageInTx(context.Background(), payloadHash[:], "0x0001")
assert.Nil(t, data)
assert.Errorf(t, err, "payload with hash=%s not found in logs", payloadHash)
})

t.Run("cache hit", func(t *testing.T) {
rCache := cache.New(cache.NoExpiration, cache.NoExpiration)
err := rCache.Add("lbtc-0x0001", []logpoller.Log{LogWithPayload(t, 20, payload)}, cache.NoExpiration)
require.NoError(t, err)
r, err := NewLBTCReaderWithCache(lggr, "job_1", utils.RandomAddress(), nil, rCache, false)
require.NoError(t, err)
data, err := r.GetLBTCMessageInTx(context.Background(), payloadHash[:], "0x0001")
assert.NoError(t, err)
assert.Equal(t, payload, data)
})
}

func Test_SimulatedLogPoller_FoundMultiple(t *testing.T) {
lggr := logger.TestLogger(t)
chainID := testutils.NewRandomEVMChainID()
db := pgtest.NewSqlxDB(t)
o := logpoller.NewORM(chainID, db, lggr)

transmitter := utils.RandomAddress()
payload := []byte("0x1111")
payloadHash := sha256.Sum256(payload)
logs := []types.Log{
EthLogWithPayload(t, 10, transmitter, []byte("0x2222")),
EthLogWithPayload(t, 20, utils.RandomAddress(), payload),
EthLogWithPayload(t, 30, transmitter, payload),
}

ec := evmclimocks.NewClient(t)
head := types2.NewHead(big.NewInt(1), common.Hash{}, common.Hash{}, 0, ubig.New(chainID))
ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(&head, nil)
ec.On("FilterLogs", mock.Anything, mock.Anything).Return(logs, nil)
ec.On("ConfiguredChainID").Return(chainID, nil)

lpOpts := logpoller.Opts{
PollPeriod: time.Hour,
FinalityDepth: 1,
BackfillBatchSize: 1,
RpcBatchSize: 1,
KeepFinalizedBlocksDepth: 100,
}
headTracker := headtracker.NewSimulatedHeadTracker(ec, lpOpts.UseFinalityTag, lpOpts.FinalityDepth)
lp := logpoller.NewLogPoller(o, ec, lggr, headTracker, lpOpts)
lp.PollAndSaveLogs(context.Background(), 1)

reader, err := NewLBTCReader(lggr, "job_1", transmitter, lp, true)
require.NoError(t, err)

data, err := reader.GetLBTCMessageInTx(context.Background(), payloadHash[:], common.Hash{}.Hex())
assert.NoError(t, err)
assert.Equal(t, payload, data)
}

func EthLogWithPayload(t *testing.T, logIndex uint, transmitter common.Address, payload []byte) types.Log {
encodedPayload, err := abihelpers.ABIEncode(LBTC_PAYLOAD_ABI, payload)
require.NoError(t, err)
payloadHash := sha256.Sum256(payload)
topics := make([]common.Hash, 4, 4)

Check failure on line 158 in core/services/ocr2/plugins/ccip/internal/ccipdata/lbtc_reader_test.go

View workflow job for this annotation

GitHub Actions / lint

S1019: should use make([]common.Hash, 4) instead (gosimple)
topics[0] = crypto.Keccak256Hash([]byte("DepositToBridge(address,bytes32,bytes32,bytes)"))
topics[3] = common.BytesToHash(payloadHash[:])
return types.Log{
Address: transmitter,
Topics: topics,
Data: encodedPayload,
BlockNumber: 1,
TxHash: common.Hash{},
TxIndex: 1,
BlockHash: common.Hash{},
Index: logIndex,
Removed: false,
}
}

func LogWithPayload(t *testing.T, index int64, payload []byte) logpoller.Log {
payloadHash := sha256.Sum256(payload)
topics := make([][]byte, 4, 4)

Check failure on line 176 in core/services/ocr2/plugins/ccip/internal/ccipdata/lbtc_reader_test.go

View workflow job for this annotation

GitHub Actions / lint

S1019: should use make([][]byte, 4) instead (gosimple)
topics[3] = payloadHash[:]
logData, err := abihelpers.ABIEncode(LBTC_PAYLOAD_ABI, payload)
require.NoError(t, err)
return logpoller.Log{
LogIndex: index,
Topics: topics,
Data: logData,
}
}

//func EthLogWithPayload(t *testing.T, index int64, payload []byte) logpoller.Log {
// payloadHash := sha256.Sum256(payload)
// topics := make([][]byte, 4, 4)
// topics[3] = payloadHash[:]
// logData, err := abihelpers.ABIEncode(LBTC_PAYLOAD_ABI, payload)
// require.NoError(t, err)
// return types.Log{
// LogIndex: index,
// Topics: topics,
// Data: logData,
// }
//}
Loading

0 comments on commit 9ae8e83

Please sign in to comment.