Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync from rpc #23

Closed
wants to merge 11 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ dist

test/gethData
test/coverage.out
test/data
7 changes: 7 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ func main() {
Action: start,
Flags: []cli.Flag{&configFileFlag},
},
{
Name: "rpc",
Aliases: []string{},
Usage: fmt.Sprintf("Sync data form RPC %v", appName),
Action: startSyncRpc,
Flags: []cli.Flag{&configFileFlag},
},
}

err := app.Run(os.Args)
Expand Down
57 changes: 57 additions & 0 deletions cmd/main_xlayer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package main

import (
"context"
"github.com/0xPolygon/cdk-data-availability/config"
"github.com/0xPolygon/cdk-data-availability/db"
"github.com/0xPolygon/cdk-data-availability/log"
"github.com/0xPolygon/cdk-data-availability/rpc"
"github.com/0xPolygon/cdk-data-availability/rpcsyncer"
"github.com/0xPolygon/cdk-data-availability/services/sync"
"github.com/urfave/cli/v2"
)

// startSyncRpc wires up and runs the "sync from RPC" mode: it loads the
// configuration, initializes the database (running migrations), launches the
// RPC syncer in the background and exposes the sync JSON-RPC service until a
// termination signal arrives.
func startSyncRpc(cliCtx *cli.Context) error {
	// Load config; the logger is not set up yet, so a load failure panics.
	cfg, err := config.Load(cliCtx)
	if err != nil {
		panic(err)
	}
	setupLog(cfg.Log)

	// Prepare DB connection and bring the schema up to date.
	database, err := db.InitContext(cliCtx.Context, cfg.DB)
	if err != nil {
		log.Fatal(err)
	}
	if err = db.RunMigrationsUp(database); err != nil {
		log.Fatal(err)
	}
	storage := db.New(database)

	// Launch the background syncer and keep its cancel func for shutdown.
	syncer := rpcsyncer.NewRPCSyncer(cfg.L2RpcURL, cfg.MaxBatchSize, cfg.IntervalTime.Duration, storage)
	go syncer.Start(cliCtx.Context)
	cancelFuncs := []context.CancelFunc{syncer.Stop}

	// Register the sync endpoints and start serving JSON-RPC.
	server := rpc.NewServer(
		cfg.RPC,
		[]rpc.Service{
			{
				Name:    sync.APISYNC,
				Service: sync.NewSyncEndpoints(storage),
			},
		},
	)
	if err := server.Start(); err != nil {
		log.Fatal(err)
	}

	// Block until an OS signal triggers the registered cancel funcs.
	waitSignal(cancelFuncs)
	return nil
}
13 changes: 8 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@ const (

// Config represents the full configuration of the data node
type Config struct {
PrivateKey types.KeystoreFileConfig
DB db.Config
Log log.Config
RPC rpc.Config
L1 L1Config
L2RpcURL string `mapstructure:"L2RpcURL"`
MaxBatchSize uint64 `mapstructure:"MaxBatchSize"`
IntervalTime types.Duration `mapstructure:"IntervalTime"`
PrivateKey types.KeystoreFileConfig
DB db.Config
Log log.Config
RPC rpc.Config
L1 L1Config

PermitApiAddress common.Address `mapstructure:"PermitApiAddress"`
}
Expand Down
72 changes: 72 additions & 0 deletions rpcsyncer/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package rpcsyncer

import (
"encoding/json"
"fmt"

"github.com/0xPolygon/cdk-data-availability/rpc"
"github.com/0xPolygon/cdk-data-availability/types"
)

// BatchData is an abbreviated structure that only contains the batch number
// and the raw L2 batch data. Empty presumably marks batches that carry no
// L2 data — inferred from the field name; confirm against the RPC producer.
type BatchData struct {
	Number      types.ArgUint64 `json:"number"`
	BatchL2Data types.ArgBytes  `json:"batchL2Data,omitempty"`
	Empty       bool            `json:"empty"`
}

// BatchDataResult is the response payload decoded from
// zkevm_getBatchDataByNumbers: one BatchData entry per requested number.
type BatchDataResult struct {
	Data []*BatchData `json:"data"`
}

func BatchesByNumbers(url string, from, to uint64) ([]*BatchData, error) {

Check failure on line 23 in rpcsyncer/batch.go

View workflow job for this annotation

GitHub Actions / lint

exported: exported function BatchesByNumbers should have comment or be unexported (revive)
var batchNumbers []string
for i := from; i <= to; i++ {
batchNumbers = append(batchNumbers, fmt.Sprintf("%v", i))
}

foo := make(map[string][]string, 0)
foo["numbers"] = batchNumbers // nolint: gosec
response, err := rpc.JSONRPCCall(url, "zkevm_getBatchDataByNumbers", foo)
if err != nil {
return nil, err
}

if response.Error != nil {

return nil, fmt.Errorf("%d - %s", response.Error.Code, response.Error.Message)
}

var result *BatchDataResult
err = json.Unmarshal(response.Result, &result)
if err != nil {
return nil, err
}

return result.Data, nil
}

func BatchNumber(url string) (uint64, error) {

Check failure on line 50 in rpcsyncer/batch.go

View workflow job for this annotation

GitHub Actions / lint

exported: exported function BatchNumber should have comment or be unexported (revive)
response, err := rpc.JSONRPCCall(url, "zkevm_batchNumber")
if err != nil {
return 0, err
}

if response.Error != nil {

return 0, fmt.Errorf("%d - %s", response.Error.Code, response.Error.Message)
}

var result string
err = json.Unmarshal(response.Result, &result)
if err != nil {
return 0, err
}

bigBatchNumber := DecodeBig(result)

batchNumber := bigBatchNumber.Uint64()

return batchNumber, nil
}
121 changes: 121 additions & 0 deletions rpcsyncer/hex.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package rpcsyncer

import (
"encoding/hex"
"fmt"
"math/big"
"strconv"
"strings"
)

const (
	// Base is the numeric base used for all hexadecimal parsing and
	// formatting in this package (16).
	Base = 16

	// BitSize64 is 64, the bit width of a uint64/int64. Not referenced in
	// this file; presumably used by callers with strconv-style parsing.
	BitSize64 = 64
)

// DecError reports a failure while decoding a hex value.
type DecError struct{ msg string }

// Error returns the message describing the decoding failure.
func (e DecError) Error() string {
	return e.msg
}

// EncodeToHex generates a lowercase hex string for str, prefixed with "0x".
// An empty or nil slice encodes as just "0x".
func EncodeToHex(str []byte) string {
	return fmt.Sprintf("0x%x", str)
}

// EncodeToString returns the plain (unprefixed) lowercase hexadecimal
// encoding of str, equivalent to hex.EncodeToString.
func EncodeToString(str []byte) string {
	return fmt.Sprintf("%x", str)
}

// DecodeString returns the byte representation of the hexadecimal string.
// Unlike DecodeHex it accepts no "0x" prefix and requires an even length.
func DecodeString(str string) ([]byte, error) {
	decoded, err := hex.DecodeString(str)
	if err != nil {
		return nil, err
	}
	return decoded, nil
}

// DecodeHex converts a hex string, with or without a "0x" prefix, to a byte
// array. An odd-length input is zero-padded on the left before decoding.
func DecodeHex(str string) ([]byte, error) {
	s := strings.TrimPrefix(str, "0x")
	if len(s)%2 == 1 {
		s = "0" + s
	}
	return hex.DecodeString(s)
}

// MustDecodeHex converts a hex string (with or without "0x" prefix, odd
// lengths zero-padded) to a byte array, panicking on invalid input.
func MustDecodeHex(str string) []byte {
	s := strings.TrimPrefix(str, "0x")
	if len(s)%2 != 0 {
		s = "0" + s
	}
	decoded, err := hex.DecodeString(s)
	if err != nil {
		panic(fmt.Errorf("could not decode hex: %v", err))
	}
	return decoded
}

// DecodeUint64 converts a hex string (with or without "0x" prefix) to a
// uint64. For values wider than 64 bits the result follows big.Int.Uint64
// semantics (undefined).
func DecodeUint64(str string) uint64 {
	return DecodeBig(str).Uint64()
}

// EncodeUint64 encodes a number as a lowercase hex string with 0x prefix.
func EncodeUint64(i uint64) string {
	return "0x" + strconv.FormatUint(i, Base)
}

// BadNibble is the sentinel returned by DecodeNibble for a byte that is not
// a valid hexadecimal digit.
const BadNibble = ^uint64(0)

// DecodeNibble decodes a single hex-digit byte ('0'-'9', 'a'-'f', 'A'-'F')
// into its numeric value, returning BadNibble for anything else.
func DecodeNibble(in byte) uint64 {
	if in >= '0' && in <= '9' {
		return uint64(in - '0')
	}
	if in >= 'A' && in <= 'F' {
		return uint64(in-'A') + 10 //nolint:gomnd
	}
	if in >= 'a' && in <= 'f' {
		return uint64(in-'a') + 10 //nolint:gomnd
	}
	return BadNibble
}

// EncodeBig encodes bigint as a hex string with 0x prefix.
// The sign of the integer is ignored.
func EncodeBig(bigint *big.Int) string {
numBits := bigint.BitLen()
if numBits == 0 {
return "0x0"
}

return fmt.Sprintf("%#x", bigint)
}

// DecodeBig converts a hex number, with or without a "0x" prefix, to a
// big.Int value. Invalid input yields a zero-valued big.Int: per the
// math/big documentation, when SetString fails the receiver's value is
// undefined, so the previous code could return garbage for bad input.
func DecodeBig(hexNum string) *big.Int {
	str := strings.TrimPrefix(hexNum, "0x")
	createdNum, ok := new(big.Int).SetString(str, Base)
	if !ok {
		// Parsing failed: return a well-defined zero instead of the
		// undefined receiver state SetString leaves behind.
		return new(big.Int)
	}

	return createdNum
}

// IsValid checks whether s, after stripping an optional "0x" prefix,
// consists solely of hexadecimal digit bytes. The empty string (and a bare
// "0x") is considered valid. Iteration is byte-wise, so any multi-byte
// character makes the string invalid.
func IsValid(s string) bool {
	str := strings.TrimPrefix(s, "0x")
	for i := 0; i < len(str); i++ {
		switch c := str[i]; {
		case c >= '0' && c <= '9', c >= 'a' && c <= 'f', c >= 'A' && c <= 'F':
			// valid nibble; keep scanning
		default:
			return false
		}
	}
	return true
}
96 changes: 96 additions & 0 deletions rpcsyncer/rpc_syncer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package rpcsyncer

import (
"context"
"github.com/ethereum/go-ethereum/crypto"
"time"

"github.com/0xPolygon/cdk-data-availability/db"
"github.com/0xPolygon/cdk-data-availability/log"
"github.com/0xPolygon/cdk-data-availability/types"
)

type RPCSyncer struct {

Check failure on line 13 in rpcsyncer/rpc_syncer.go

View workflow job for this annotation

GitHub Actions / lint

exported: exported type RPCSyncer should have comment or be unexported (revive)
l2RPCUrl string
maxBatchSize uint64
intervalTime time.Duration
db db.DB

stop chan struct{}
}

func NewRPCSyncer(l2RPCUrl string, maxBatchSize uint64, intervalTime time.Duration, db db.DB) *RPCSyncer {

Check failure on line 22 in rpcsyncer/rpc_syncer.go

View workflow job for this annotation

GitHub Actions / lint

exported: exported function NewRPCSyncer should have comment or be unexported (revive)
rpcSyncer := &RPCSyncer{
l2RPCUrl: l2RPCUrl,
maxBatchSize: maxBatchSize,
intervalTime: intervalTime,
db: db,
stop: make(chan struct{}),
}

return rpcSyncer
}

// Start starts the SequencerTracker
func (syncer *RPCSyncer) Start(ctx context.Context) {
log.Infof("starting rpc syncer")

start, _ := getStartBlock(syncer.db)

for {
select {
case <-ctx.Done():
if ctx.Err() != nil && ctx.Err() != context.DeadlineExceeded {
log.Warnf("context cancelled: %v", ctx.Err())
}
default:
time.Sleep(syncer.intervalTime)
l2MaxBatch, err := BatchNumber(syncer.l2RPCUrl)
if err != nil {
log.Fatal("error getting max batch: %v", err)
}
log.Infof("Starting from batch %v, max batch %v", start, l2MaxBatch)
if start > l2MaxBatch {
log.Infof("Sleep 30 seconds")
time.Sleep(30 * time.Second)

Check failure on line 55 in rpcsyncer/rpc_syncer.go

View workflow job for this annotation

GitHub Actions / lint

mnd: Magic number: 30, in <argument> detected (gomnd)
}

to := start + syncer.maxBatchSize - 1
if to > l2MaxBatch && to > syncer.maxBatchSize {
to = l2MaxBatch
start = to - syncer.maxBatchSize + 1
}
log.Infof("Calling ..")
seqBatches, err := BatchesByNumbers(syncer.l2RPCUrl, start, to)
if err != nil {
log.Fatal("error getting batch data: %v", err)
}

log.Infof("Hashing ..")
offChainData := []types.OffChainData{}
for _, seqBatch := range seqBatches {
key := crypto.Keccak256Hash(seqBatch.BatchL2Data)

offChainData = append(offChainData, types.OffChainData{
Key: key,
Value: seqBatch.BatchL2Data,
})
}
log.Infof("Storing ..")
err = setStoreOffChainData(syncer.db, offChainData)
if err != nil {
log.Fatal("error storing off chain data: %v", err)
}
log.Infof("Stored data for batchs [%v,%v], size:%v", start, to, len(offChainData))
if setStartBlock(syncer.db, to) != nil {
log.Fatal("error setting start block: %v", err)
}
start = to + 1
}
}
}

// Stop stops the RPCSyncer by closing its stop channel. (The previous
// comment said "SequencerTracker" — a copy-paste error.) It must be called
// at most once; a second call would panic on the already-closed channel.
func (st *RPCSyncer) Stop() {
	close(st.stop)
}
Loading
Loading