Skip to content

Commit

Permalink
PublishBlock before storing "payload delivery" in data API (#79)
Browse files Browse the repository at this point in the history
* publihBlock before storing payload delivery in data API

* add metrics to PublishBlock beacon call

* use context.Background() in PutDelivered

* rename beacon metric enpoints

* add method to beacon metrics

* avoid variables in metrics
  • Loading branch information
aratz-lasa authored Jan 27, 2023
1 parent 91e5c06 commit 82fa0a0
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 8 deletions.
22 changes: 18 additions & 4 deletions pkg/beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@ func (b *beaconClient) GetProposerDuties(epoch structs.Epoch) (*RegisteredPropos
// https://ethereum.github.io/beacon-APIs/#/Validator/getProposerDuties
u.Path = fmt.Sprintf("/eth/v1/validator/duties/proposer/%d", epoch)
resp := new(RegisteredProposersResponse)

t := prometheus.NewTimer(b.m.Timing.WithLabelValues("/eth/v1/validator/duties/proposer", "GET"))
defer t.ObserveDuration()

err := b.queryBeacon(&u, "GET", resp)
return resp, err
}
Expand All @@ -285,6 +289,10 @@ func (b *beaconClient) SyncStatus() (*SyncStatusPayloadData, error) {
// https://ethereum.github.io/beacon-APIs/#/ValidatorRequiredApi/getSyncingStatus
u.Path = "/eth/v1/node/syncing"
resp := new(SyncStatusPayload)

t := prometheus.NewTimer(b.m.Timing.WithLabelValues("/eth/v1/node/syncing", "GET"))
defer t.ObserveDuration()

err := b.queryBeacon(&u, "GET", resp)
if err != nil {
return nil, err
Expand All @@ -299,6 +307,9 @@ func (b *beaconClient) KnownValidators(headSlot structs.Slot) (AllValidatorsResp
q.Add("status", "active,pending")
u.RawQuery = q.Encode()

t := prometheus.NewTimer(b.m.Timing.WithLabelValues("/eth/v1/beacon/states/validators", "GET"))
defer t.ObserveDuration()

var vd AllValidatorsResponse
err := b.queryBeacon(&u, "GET", &vd)

Expand All @@ -309,6 +320,9 @@ func (b *beaconClient) Genesis() (structs.GenesisInfo, error) {
resp := new(GenesisResponse)
u := *b.beaconEndpoint
// https://ethereum.github.io/beacon-APIs/#/ValidatorRequiredApi/getSyncingStatus
t := prometheus.NewTimer(b.m.Timing.WithLabelValues("/eth/v1/beacon/genesis", "GET"))
defer t.ObserveDuration()

u.Path = "/eth/v1/beacon/genesis"
err := b.queryBeacon(&u, "GET", &resp)
return resp.Data, err
Expand All @@ -320,6 +334,9 @@ func (b *beaconClient) PublishBlock(block *types.SignedBeaconBlock) error {
return fmt.Errorf("fail to marshal block: %w", err)
}

t := prometheus.NewTimer(b.m.Timing.WithLabelValues("/eth/v1/beacon/blocks", "POST"))
defer t.ObserveDuration()

resp, err := http.Post(b.beaconEndpoint.String()+"/eth/v1/beacon/blocks", "application/json", bytes.NewBuffer(bb))
if err != nil {
return fmt.Errorf("fail to publish block: %w", err)
Expand Down Expand Up @@ -356,7 +373,7 @@ func (b *beaconClient) initMetrics() {
Subsystem: "beacon",
Name: "timing",
Help: "Duration of requests per endpoint",
}, []string{"endpoint"})
}, []string{"endpoint", "method"})
}

func (b *beaconClient) AttachMetrics(m *metrics.Metrics) {
Expand All @@ -370,9 +387,6 @@ func (b *beaconClient) queryBeacon(u *url.URL, method string, dst any) error {
}
req.Header.Set("accept", "application/json")

t := prometheus.NewTimer(b.m.Timing.WithLabelValues(u.String()))
defer t.ObserveDuration()

resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("client refused for %s: %w", u, err)
Expand Down
8 changes: 4 additions & 4 deletions pkg/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,6 @@ func (rs *Relay) GetPayload(ctx context.Context, m *structs.MetricGroup, payload

// defer put delivered datastore write
go func(rs *Relay, slot structs.Slot, trace structs.DeliveredTrace) {
if err := rs.d.PutDelivered(ctx, slot, trace, rs.config.TTL); err != nil {
logger.WithError(err).Warn("failed to set payload after delivery")
}

if rs.config.PublishBlock {
beaconBlock := structs.SignedBlindedBeaconBlockToBeaconBlock(payloadRequest, payload.Payload.Data)
if err := rs.beacon.PublishBlock(beaconBlock); err != nil {
Expand All @@ -286,6 +282,10 @@ func (rs *Relay) GetPayload(ctx context.Context, m *structs.MetricGroup, payload
logger.Info("published block to beacon node")
}
}

if err := rs.d.PutDelivered(context.Background(), slot, trace, rs.config.TTL); err != nil {
logger.WithError(err).Warn("failed to set payload after delivery")
}
}(rs, structs.Slot(payloadRequest.Message.Slot), trace)

logger.With(log.F{
Expand Down

0 comments on commit 82fa0a0

Please sign in to comment.