Skip to content

Commit

Permalink
CCIP-4403 skeleton: LBTC attestation (#1554)
Browse files Browse the repository at this point in the history
https://smartcontract-it.atlassian.net/browse/CCIP-3488
## Motivation
Basic logic needed for LBTC attestation

---------

Co-authored-by: Nour Elrashidy <[email protected]>
  • Loading branch information
bukata-sa and NourElRashidy authored Dec 2, 2024
1 parent eb419a0 commit 8c94ed4
Show file tree
Hide file tree
Showing 8 changed files with 397 additions and 61 deletions.
14 changes: 14 additions & 0 deletions core/services/ocr2/plugins/ccip/ccipexec/initializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,20 @@ func NewExecServices(ctx context.Context, lggr logger.Logger, jb job.Job, srcPro
}
tokenDataProviders[cciptypes.Address(pluginConfig.USDCConfig.SourceTokenAddress.String())] = usdcReader
}
// init lbtc token data provider
if pluginConfig.LBTCConfig.AttestationAPI != "" {
lggr.Infof("LBTC token data provider enabled")
err2 := pluginConfig.LBTCConfig.ValidateLBTCConfig()
if err2 != nil {
return nil, err2
}

lbtcReader, err2 := srcProvider.NewTokenDataReader(ctx, ccip.EvmAddrToGeneric(pluginConfig.LBTCConfig.SourceTokenAddress))
if err2 != nil {
return nil, fmt.Errorf("new usdc reader: %w", err2)
}
tokenDataProviders[cciptypes.Address(pluginConfig.LBTCConfig.SourceTokenAddress.String())] = lbtcReader
}

// Prom wrappers
onRampReader = observability.NewObservedOnRampReader(onRampReader, srcChainID, ccip.ExecPluginLabel)
Expand Down
32 changes: 28 additions & 4 deletions core/services/ocr2/plugins/ccip/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func (c *DynamicPriceGetterConfig) Validate() error {
type ExecPluginJobSpecConfig struct {
SourceStartBlock, DestStartBlock uint64 // Only for first time job add.
USDCConfig USDCConfig
LBTCConfig LBTCConfig
}

type USDCConfig struct {
Expand All @@ -119,10 +120,20 @@ type USDCConfig struct {
AttestationAPIIntervalMilliseconds int
}

type LBTCConfig struct {
SourceTokenAddress common.Address
SourceMessageTransmitterAddress common.Address
AttestationAPI string
AttestationAPITimeoutSeconds uint
// AttestationAPIIntervalMilliseconds can be set to -1 to disable or 0 to use a default interval.
AttestationAPIIntervalMilliseconds int
}

type ExecPluginConfig struct {
SourceStartBlock, DestStartBlock uint64 // Only for first time job add.
IsSourceProvider bool
USDCConfig USDCConfig
LBTCConfig LBTCConfig
JobID string
}

Expand All @@ -136,17 +147,30 @@ func (e ExecPluginConfig) Encode() ([]byte, error) {

func (uc *USDCConfig) ValidateUSDCConfig() error {
if uc.AttestationAPI == "" {
return errors.New("AttestationAPI is required")
return errors.New("USDCConfig: AttestationAPI is required")
}
if uc.AttestationAPIIntervalMilliseconds < -1 {
return errors.New("AttestationAPIIntervalMilliseconds must be -1 to disable, 0 for default or greater to define the exact interval")
return errors.New("USDCConfig: AttestationAPIIntervalMilliseconds must be -1 to disable, 0 for default or greater to define the exact interval")
}
if uc.SourceTokenAddress == utils.ZeroAddress {
return errors.New("SourceTokenAddress is required")
return errors.New("USDCConfig: SourceTokenAddress is required")
}
if uc.SourceMessageTransmitterAddress == utils.ZeroAddress {
return errors.New("SourceMessageTransmitterAddress is required")
return errors.New("USDCConfig: SourceMessageTransmitterAddress is required")
}

return nil
}

func (lc *LBTCConfig) ValidateLBTCConfig() error {
if lc.AttestationAPI == "" {
return errors.New("LBTCConfig: AttestationAPI is required")
}
if lc.AttestationAPIIntervalMilliseconds < -1 {
return errors.New("LBTCConfig: AttestationAPIIntervalMilliseconds must be -1 to disable, 0 for default or greater to define the exact interval")
}
if lc.SourceTokenAddress == utils.ZeroAddress {
return errors.New("LBTCConfig: SourceTokenAddress is required")
}
return nil
}
9 changes: 9 additions & 0 deletions core/services/ocr2/plugins/ccip/exportinternal.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,16 @@ func CloseUSDCReader(lggr logger.Logger, jobID string, transmitter common.Addres
return ccipdata.CloseUSDCReader(lggr, jobID, transmitter, lp)
}

func NewLBTCReader(lggr logger.Logger, jobID string, transmitter common.Address, lp logpoller.LogPoller, registerFilters bool) (*ccipdata.LBTCReaderImpl, error) {
return ccipdata.NewLBTCReader(lggr, jobID, transmitter, lp, registerFilters)
}

func CloseLBTCReader(lggr logger.Logger, jobID string, transmitter common.Address, lp logpoller.LogPoller) error {
return ccipdata.CloseLBTCReader(lggr, jobID, transmitter, lp)
}

type USDCReaderImpl = ccipdata.USDCReaderImpl
type LBTCReaderImpl = ccipdata.LBTCReaderImpl

var DefaultRpcBatchSizeLimit = rpclib.DefaultRpcBatchSizeLimit
var DefaultRpcBatchBackOffMultiplier = rpclib.DefaultRpcBatchBackOffMultiplier
Expand Down
23 changes: 23 additions & 0 deletions core/services/ocr2/plugins/ccip/internal/ccipdata/lbtc_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package ccipdata

import (
"github.com/ethereum/go-ethereum/common"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

// TODO: Implement lbtc token reader
type LBTCReader interface {
}

type LBTCReaderImpl struct {
}

func NewLBTCReader(lggr logger.Logger, jobID string, transmitter common.Address, lp logpoller.LogPoller, registerFilters bool) (*LBTCReaderImpl, error) {
return &LBTCReaderImpl{}, nil
}

func CloseLBTCReader(lggr logger.Logger, jobID string, transmitter common.Address, lp logpoller.LogPoller) error {
return nil
}
10 changes: 9 additions & 1 deletion core/services/ocr2/plugins/ccip/tokendata/http/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ import (
)

type IHttpClient interface {
// Get issue a GET request to the given url and return the response body and status code.
// Get issues a GET request to the given url and returns the response body and status code.
Get(ctx context.Context, url string, timeout time.Duration) ([]byte, int, http.Header, error)

// Post issues a POST request to the given url with the given request data and returns the response body and status code.
Post(ctx context.Context, url string, requestData io.Reader, timeout time.Duration) ([]byte, int, http.Header, error)
}

type HttpClient struct {
Expand Down Expand Up @@ -46,3 +49,8 @@ func (s *HttpClient) Get(ctx context.Context, url string, timeout time.Duration)
body, err := io.ReadAll(res.Body)
return body, res.StatusCode, res.Header, err
}

func (s *HttpClient) Post(ctx context.Context, url string, requestData io.Reader, timeout time.Duration) ([]byte, int, http.Header, error) {
// TODO: Implement
return nil, 0, nil, nil
}
234 changes: 234 additions & 0 deletions core/services/ocr2/plugins/ccip/tokendata/lbtc/lbtc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
package lbtc

import (
"context"
"crypto/sha256"
"fmt"
"net/url"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/pkg/errors"
"golang.org/x/time/rate"

cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata/http"
)

// TODO: double check the validty of default values for lombard's API after checking docs
const (
apiVersion = "v1"
attestationPath = "deposits/getByHash"
defaultAttestationTimeout = 5 * time.Second

// defaultCoolDownDurationSec defines the default time to wait after getting rate limited.
// this value is only used if the 429 response does not contain the Retry-After header
defaultCoolDownDuration = 5 * time.Minute

// maxCoolDownDuration defines the maximum duration we can wait till firing the next request
maxCoolDownDuration = 10 * time.Minute

// defaultRequestInterval defines the rate in requests per second that the attestation API can be called.
// this is set according to the APIs documentated 10 requests per second rate limit.
defaultRequestInterval = 100 * time.Millisecond

// APIIntervalRateLimitDisabled is a special value to disable the rate limiting.
APIIntervalRateLimitDisabled = -1
// APIIntervalRateLimitDefault is a special value to select the default rate limit interval.
APIIntervalRateLimitDefault = 0
)

type attestationStatus string

const (
attestationStatusUnspecified attestationStatus = "NOTARIZATION_STATUS_UNSPECIFIED"
attestationStatusPending attestationStatus = "NOTARIZATION_STATUS_PENDING"
attestationStatusSubmitted attestationStatus = "NOTARIZATION_STATUS_SUBMITTED"
attestationStatusSessionApproved attestationStatus = "NOTARIZATION_STATUS_SESSION_APPROVED"
attestationStatusFailed attestationStatus = "NOTARIZATION_STATUS_FAILED"
)

var (
ErrUnknownResponse = errors.New("unexpected response from attestation API")
)

type TokenDataReader struct {
lggr logger.Logger
httpClient http.IHttpClient
attestationApi *url.URL
attestationApiTimeout time.Duration
lbtcTokenAddress common.Address
rate *rate.Limiter

// coolDownUntil defines whether requests are blocked or not.
coolDownUntil time.Time
coolDownMu *sync.RWMutex
}

type messageAttestationResponse struct {
MessageHash string `json:"message_hash"`
Status attestationStatus `json:"status"`
Attestation string `json:"attestation"`
}

// TODO: Adjust after checking API docs
type attestationResponse struct {
Attestations []messageAttestationResponse `json:"attestations"`
}

// TODO: Implement encoding/decoding

var _ tokendata.Reader = &TokenDataReader{}

func NewLBTCTokenDataReader(
lggr logger.Logger,
lbtcAttestationApi *url.URL,
lbtcAttestationApiTimeoutSeconds int,
lbtcTokenAddress common.Address,
requestInterval time.Duration,
) *TokenDataReader {
timeout := time.Duration(lbtcAttestationApiTimeoutSeconds) * time.Second
if lbtcAttestationApiTimeoutSeconds == 0 {
timeout = defaultAttestationTimeout
}

if requestInterval == APIIntervalRateLimitDisabled {
requestInterval = 0
} else if requestInterval == APIIntervalRateLimitDefault {
requestInterval = defaultRequestInterval
}

return &TokenDataReader{
lggr: lggr,
httpClient: http.NewObservedIHttpClient(&http.HttpClient{}),
attestationApi: lbtcAttestationApi,
attestationApiTimeout: timeout,
lbtcTokenAddress: lbtcTokenAddress,
coolDownMu: &sync.RWMutex{},
rate: rate.NewLimiter(rate.Every(requestInterval), 1),
}
}

func NewLBTCTokenDataReaderWithHttpClient(
origin TokenDataReader,
httpClient http.IHttpClient,
lbtcTokenAddress common.Address,
requestInterval time.Duration,
) *TokenDataReader {
return &TokenDataReader{
lggr: origin.lggr,
httpClient: httpClient,
attestationApi: origin.attestationApi,
attestationApiTimeout: origin.attestationApiTimeout,
coolDownMu: origin.coolDownMu,
lbtcTokenAddress: lbtcTokenAddress,
rate: rate.NewLimiter(rate.Every(requestInterval), 1),
}
}

// ReadTokenData queries the LBTC attestation API.
func (s *TokenDataReader) ReadTokenData(ctx context.Context, msg cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta, tokenIndex int) ([]byte, error) {
if tokenIndex < 0 || tokenIndex >= len(msg.TokenAmounts) {
return nil, fmt.Errorf("token index out of bounds")
}

if s.inCoolDownPeriod() {
// rate limiting cool-down period, we prevent new requests from being sent
return nil, tokendata.ErrRequestsBlocked
}

if s.rate != nil {
// Wait blocks until it the attestation API can be called or the
// context is Done.
if waitErr := s.rate.Wait(ctx); waitErr != nil {
return nil, fmt.Errorf("lbtc rate limiting error: %w", waitErr)
}
}

messageBody, err := s.getLBTCMessageBody(ctx, msg, tokenIndex)
if err != nil {
return []byte{}, errors.Wrap(err, "failed getting the LBTC message body")
}

msgID := hexutil.Encode(msg.MessageID[:])
messageBodyHash := sha256.Sum256(messageBody)
messageBodyHashHex := hexutil.Encode(messageBodyHash[:])
s.lggr.Infow("Calling attestation API", "messageBodyHash", messageBodyHashHex, "messageID", msgID)

attestationResp, err := s.callAttestationApi(ctx, messageBodyHash)
if err != nil {
return nil, err
}
if attestationResp.Attestations == nil || len(attestationResp.Attestations) == 0 {
return nil, errors.New("attestation response is empty")
}
if len(attestationResp.Attestations) > 1 {
s.lggr.Warnw("Multiple attestations received, expected one", "attestations", attestationResp.Attestations)
}
var attestation messageAttestationResponse
for _, attestationCandidate := range attestationResp.Attestations {
if attestationCandidate.MessageHash == messageBodyHashHex {
attestation = attestationCandidate
}
}
s.lggr.Infow("Got response from attestation API", "messageID", msgID,
"attestationStatus", attestation.Status, "attestation", attestation)
switch attestation.Status {
case attestationStatusSessionApproved:
messageAndAttestation, err := encodeMessageAndAttestation(messageBody, attestation.Attestation)
if err != nil {
return nil, fmt.Errorf("failed to encode messageAndAttestation : %w", err)
}
return messageAndAttestation, nil
case attestationStatusPending:
return nil, tokendata.ErrNotReady
case attestationStatusSubmitted:
return nil, tokendata.ErrNotReady
default:
s.lggr.Errorw("Unexpected response from attestation API", "attestation", attestation)
return nil, ErrUnknownResponse
}
}

func (s *TokenDataReader) getLBTCMessageBody(ctx context.Context, msg cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta, tokenIndex int) ([]byte, error) {
return nil, nil
}

func (s *TokenDataReader) callAttestationApi(ctx context.Context, lbtcMessageHash [32]byte) (attestationResponse, error) {
_, _, _, err := s.httpClient.Get(ctx, "", s.attestationApiTimeout)
switch {
case errors.Is(err, tokendata.ErrRateLimit):
s.setCoolDownPeriod(defaultCoolDownDuration)
return attestationResponse{}, tokendata.ErrRateLimit
case err != nil:
return attestationResponse{}, err
}
return attestationResponse{}, nil
}

func encodeMessageAndAttestation(messageBody []byte, attestation string) ([]byte, error) {
return nil, nil
}

func (s *TokenDataReader) setCoolDownPeriod(d time.Duration) {
s.coolDownMu.Lock()
if d > maxCoolDownDuration {
d = maxCoolDownDuration
}
s.coolDownUntil = time.Now().Add(d)
s.coolDownMu.Unlock()
}

func (s *TokenDataReader) inCoolDownPeriod() bool {
s.coolDownMu.RLock()
defer s.coolDownMu.RUnlock()
return time.Now().Before(s.coolDownUntil)
}

func (s *TokenDataReader) Close() error {
return nil
}
Loading

0 comments on commit 8c94ed4

Please sign in to comment.