Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add consensus data to block events #118

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
2 changes: 2 additions & 0 deletions domain/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/goccy/go-json"
"github.com/golang/protobuf/jsonpb"
"github.com/prysmaticlabs/prysm/v3/consensus-types/interfaces"
log "github.com/sirupsen/logrus"

"github.com/forta-network/forta-core-go/protocol"
Expand Down Expand Up @@ -72,6 +73,7 @@ type BlockEvent struct {
Logs []LogEntry
Traces []Trace
Timestamps *TrackingTimestamps
BeaconData interfaces.SignedBeaconBlock
}

func str(val *string) string {
Expand Down
4 changes: 2 additions & 2 deletions ens/ens.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ func (ensstore *ENSStore) Resolve(input string) (common.Address, error) {
// Resolve the domain
address, err := resolver.Address()
if err != nil {
return ens.UnknownAddress, err
return common.Address(ens.UnknownAddress), err
}
if bytes.Equal(address.Bytes(), ens.UnknownAddress.Bytes()) {
return ens.UnknownAddress, errors.New("no address")
return common.Address(ens.UnknownAddress), errors.New("no address")
}
return address, nil
}
Expand Down
67 changes: 67 additions & 0 deletions ethereum/beacon.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package ethereum

import (
"context"

"github.com/prysmaticlabs/prysm/v3/api/client/beacon"
"github.com/prysmaticlabs/prysm/v3/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v3/encoding/ssz/detect"
)

// BeaconClient is an interface encompassing all ethereum consensus actions
type BeaconClient interface {
GetBlock(ctx context.Context, slot beacon.StateOrBlockId) (interfaces.SignedBeaconBlock, error)
}

// beaconClient wraps an ethereum client purpose-built for communicating with consensus layer.
type beaconClient struct {
apiName string
url string

client *beacon.Client
}

func (c *beaconClient) GetBlock(ctx context.Context, slot beacon.StateOrBlockId) (interfaces.SignedBeaconBlock, error) {
// get beacon block data
bb, err := c.client.GetBlock(ctx, beacon.StateOrBlockId(slot))
if err != nil {
return nil, err
}

// unmarshal data with appropriate unmarshaler
// this depends on the slot. Pre-merge and Post-merge blocks are incompatible to use the same unmarshaler.
unmarshaler, err := c.getVersionedUnmarshaler(ctx, slot)
if err != nil {
return nil, err
}

b, err := unmarshaler.UnmarshalBeaconBlock(bb)
if err != nil {
return nil, err
}

return b, nil
}

func (c *beaconClient) getVersionedUnmarshaler(ctx context.Context, state beacon.StateOrBlockId) (*detect.VersionedUnmarshaler, error) {
f, err := c.client.GetFork(ctx, state)
if err != nil {
return nil, err
}

var version [4]byte
copy(version[:], f.CurrentVersion[:4])

return detect.FromForkVersion(version)
}

// NewBeaconClient creates a new ethereum client
func NewBeaconClient(apiName, url string) (BeaconClient, error) {
bClient, err := beacon.NewClient(url)
if err != nil {
return nil, err
}

return &beaconClient{apiName: apiName, url: url, client: bClient}, nil
}

78 changes: 47 additions & 31 deletions feeds/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (

eth "github.com/ethereum/go-ethereum"
"github.com/goccy/go-json"
"github.com/prysmaticlabs/prysm/v3/api/client/beacon"
"github.com/prysmaticlabs/prysm/v3/consensus-types/interfaces"
log "github.com/sirupsen/logrus"

"github.com/forta-network/forta-core-go/clients/health"
Expand All @@ -26,23 +28,25 @@ type bfHandler struct {
}

type blockFeed struct {
start *big.Int
end *big.Int
offset int
ctx context.Context
client ethereum.Client
traceClient ethereum.Client
cache utils.Cache
chainID *big.Int
tracing bool
started bool
rateLimit *time.Ticker
maxBlockAge *time.Duration
start *big.Int
end *big.Int
offset int
ctx context.Context
client ethereum.Client
traceClient ethereum.Client
beaconClient ethereum.BeaconClient
cache utils.Cache
chainID *big.Int
tracing bool
started bool
rateLimit *time.Ticker
maxBlockAge *time.Duration

lastBlock health.MessageTracker

handlers []bfHandler
handlersMu sync.RWMutex
handlers []bfHandler
handlersMu sync.RWMutex
beaconSupport bool
}

type BlockFeedConfig struct {
Expand All @@ -52,6 +56,7 @@ type BlockFeedConfig struct {
ChainID *big.Int
RateLimit *time.Ticker
Tracing bool
BeaconSupport bool
SkipBlocksOlderThan *time.Duration
}

Expand Down Expand Up @@ -230,6 +235,14 @@ func (bf *blockFeed) forEachBlock() error {
}
}

var beaconData interfaces.SignedBeaconBlock
if bf.beaconSupport {
beaconData, err = bf.beaconClient.GetBlock(bf.ctx, beacon.StateOrBlockId(blockNumToAnalyze.String()))
if err != nil {
logger.WithError(err).Error("error getting beacon data")
}
}

if len(traces) > 0 && block.Hash != utils.String(traces[0].BlockHash) {
logger.WithFields(log.Fields{
"traceBlockHash": utils.String(traces[0].BlockHash),
Expand All @@ -250,11 +263,12 @@ func (bf *blockFeed) forEachBlock() error {
}

evt := &domain.BlockEvent{
EventType: domain.EventTypeBlock,
Block: block,
ChainID: bf.chainID,
Traces: traces,
Logs: logs,
EventType: domain.EventTypeBlock,
Block: block,
ChainID: bf.chainID,
Traces: traces,
Logs: logs,
BeaconData: beaconData,
Timestamps: &domain.TrackingTimestamps{
Block: *blockTs,
Feed: time.Now().UTC(),
Expand Down Expand Up @@ -300,22 +314,24 @@ func (bf *blockFeed) Health() health.Reports {
}
}

func NewBlockFeed(ctx context.Context, client ethereum.Client, traceClient ethereum.Client, cfg BlockFeedConfig) (*blockFeed, error) {
func NewBlockFeed(ctx context.Context, client ethereum.Client, traceClient ethereum.Client, beaconClient ethereum.BeaconClient, cfg BlockFeedConfig) (*blockFeed, error) {
if cfg.Offset < 0 {
return nil, fmt.Errorf("offset cannot be below zero: offset=%d", cfg.Offset)
}
bf := &blockFeed{
start: cfg.Start,
end: cfg.End,
offset: cfg.Offset,
ctx: ctx,
client: client,
traceClient: traceClient,
cache: utils.NewCache(10000),
chainID: cfg.ChainID,
tracing: cfg.Tracing,
rateLimit: cfg.RateLimit,
maxBlockAge: cfg.SkipBlocksOlderThan,
start: cfg.Start,
end: cfg.End,
offset: cfg.Offset,
ctx: ctx,
client: client,
traceClient: traceClient,
beaconClient: beaconClient,
cache: utils.NewCache(10000),
chainID: cfg.ChainID,
tracing: cfg.Tracing,
beaconSupport: cfg.BeaconSupport,
rateLimit: cfg.RateLimit,
maxBlockAge: cfg.SkipBlocksOlderThan,
}
return bf, nil
}
Loading