diff --git a/relayer/relays/beacon/header/header.go b/relayer/relays/beacon/header/header.go index e7dca2c828..81735284e2 100644 --- a/relayer/relays/beacon/header/header.go +++ b/relayer/relays/beacon/header/header.go @@ -510,6 +510,35 @@ func (h *Header) FetchExecutionProof(blockRoot common.Hash, instantVerification } +func (h *Header) CheckHeaderFinalized(blockRoot common.Hash, instantVerification bool) error { + header, err := h.syncer.Client.GetHeaderByBlockRoot(blockRoot) + if err != nil { + return fmt.Errorf("get beacon header by blockRoot: %w", err) + } + lastFinalizedHeaderState, err := h.writer.GetLastFinalizedHeaderState() + if err != nil { + return fmt.Errorf("fetch last finalized header state: %w", err) + } + + // The latest finalized header on-chain is older than the header containing the message, so we need to sync the + // finalized header with the message. + finalizedHeader, err := h.syncer.GetFinalizedHeader() + if err != nil { + return err + } + + // If the header is not finalized yet, we can't do anything further. + if header.Slot > uint64(finalizedHeader.Slot) { + return fmt.Errorf("chain not finalized yet: %w", ErrBeaconHeaderNotFinalized) + } + + if header.Slot > lastFinalizedHeaderState.BeaconSlot && !instantVerification { + return fmt.Errorf("on-chain header not recent enough and instantVerification is off: %w", ErrBeaconHeaderNotFinalized) + } + + return nil +} + func (h *Header) isInitialSyncPeriod() bool { initialPeriod := h.protocol.ComputeSyncPeriodAtSlot(h.cache.InitialCheckpointSlot) lastFinalizedPeriod := h.protocol.ComputeSyncPeriodAtSlot(h.cache.Finalized.LastSyncedSlot) diff --git a/relayer/relays/execution/main.go b/relayer/relays/execution/main.go index ac45d8f8d8..b2cbc74f43 100644 --- a/relayer/relays/execution/main.go +++ b/relayer/relays/execution/main.go @@ -141,21 +141,20 @@ func (r *Relay) Start(ctx context.Context, eg *errgroup.Group) error { blockNumber, err := ethconn.Client().BlockNumber(ctx) if err != nil { - return fmt.Errorf("geOkayt last block number: %w", err) + return fmt.Errorf("get last block number: %w", err) } - events, err := r.findEvents(ctx, blockNumber, paraNonce+1) - if err != nil { - return fmt.Errorf("find events: %w", err) + events, err := r.findFinalizedEvents(ctx, blockNumber, paraNonce+1) + if errors.Is(err, header.ErrBeaconHeaderNotFinalized) { + log.WithField("nonce", events[0].Nonce).Info("beacon header not finalized yet") + continue + } else if err != nil { + return fmt.Errorf("find finalized events: %w", err) } for _, ev := range events { - err = r.waitAndSend(ctx, ev) - switch { - case errors.Is(err, header.ErrBeaconHeaderNotFinalized): - log.WithField("nonce", ev.Nonce).Info("beacon header not finalized yet") - break - case err != nil: + err := r.waitAndSend(ctx, ev) + if err != nil { return fmt.Errorf("submit message: %w", err) } } @@ -237,6 +236,19 @@ func (r *Relay) fetchEthereumNonce(ctx context.Context) (uint64, error) { const BlocksPerQuery = 4096 +func (r *Relay) findFinalizedEvents( + ctx context.Context, + blockNumber uint64, + start uint64, +) ([]*contracts.GatewayOutboundMessageAccepted, error) { + events, err := r.findEvents(ctx, blockNumber, start) + if err != nil { + return []*contracts.GatewayOutboundMessageAccepted{}, fmt.Errorf("find events: %w", err) + } + + return events, r.isInFinalizedBlock(ctx, events) +} + func (r *Relay) findEvents( ctx context.Context, latestFinalizedBlockNumber uint64, @@ -359,37 +371,31 @@ func (r *Relay) makeInboundMessage( } func (r *Relay) waitAndSend(ctx context.Context, ev *contracts.GatewayOutboundMessageAccepted) (err error) { - var paraNonce uint64 ethNonce := ev.Nonce waitingPeriod := (ethNonce + r.config.Schedule.TotalRelayerCount - r.config.Schedule.ID) % r.config.Schedule.TotalRelayerCount log.WithFields(logrus.Fields{ - "ethNonce": ethNonce, - "relayerCount": r.config.Schedule.TotalRelayerCount, - "relayerID": r.config.Schedule.ID, "waitingPeriod": waitingPeriod, - }).Info("relayer decentralization details") + }).Info("relayer waiting period") var cnt uint64 for { - log.Info("checking if message should be processed") - paraNonce, err = r.fetchLatestParachainNonce() + // Check the nonce again in case another relayer processed the message while this relayer downloading beacon state + isProcessed, err := r.isMessageProcessed(ev.Nonce) if err != nil { - return fmt.Errorf("fetch latest parachain nonce: %w", err) + return fmt.Errorf("is message procssed: %w", err) } - if ethNonce <= paraNonce { - log.WithField("nonce", paraNonce).Info("message picked up by another relayer, skipped") + // If the message is already processed we shouldn't try to submit it again + if isProcessed { return nil } if cnt == waitingPeriod { - log.WithField("cnt", cnt).Info("waiting period done") break } - log.Info("sleeping...") + log.Info(fmt.Sprintf("sleeping for %d seconds.", time.Duration(r.config.Schedule.SleepInterval))) + time.Sleep(time.Duration(r.config.Schedule.SleepInterval) * time.Second) - log.Info("done sleeping...") cnt++ } - err = r.doSubmit(ctx, ev) if err != nil { return fmt.Errorf("submit inbound message: %w", err) @@ -399,7 +405,6 @@ func (r *Relay) waitAndSend(ctx context.Context, ev *contracts.GatewayOutboundMe } func (r *Relay) doSubmit(ctx context.Context, ev *contracts.GatewayOutboundMessageAccepted) error { - log.Info("getting messages") inboundMsg, err := r.makeInboundMessage(ctx, r.headerCache, ev) if err != nil { return fmt.Errorf("make outgoing message: %w", err) @@ -424,7 +429,6 @@ func (r *Relay) doSubmit(ctx context.Context, ev *contracts.GatewayOutboundMessa } // ParentBeaconRoot in https://eips.ethereum.org/EIPS/eip-4788 from Deneb onward - log.Info("getting execution proof") proof, err := r.beaconHeader.FetchExecutionProof(*blockHeader.ParentBeaconRoot, r.config.InstantVerification) if errors.Is(err, header.ErrBeaconHeaderNotFinalized) { return err @@ -433,13 +437,13 @@ func (r *Relay) doSubmit(ctx context.Context, ev *contracts.GatewayOutboundMessa return fmt.Errorf("fetch execution header proof: %w", err) } - paraNonce, err := r.fetchLatestParachainNonce() + // Check the nonce again in case another relayer processed the message while this relayer downloading beacon state + isProcessed, err := r.isMessageProcessed(ev.Nonce) if err != nil { - return fmt.Errorf("fetch latest parachain nonce: %w", err) + return fmt.Errorf("is message procssed: %w", err) } - // Check the nonce again in case another relayer processed the message while this relayer downloading beacon state - if ev.Nonce <= paraNonce { - log.WithField("nonce", paraNonce).Info("message picked up by another relayer, skipped") + // If the message is already processed we shouldn't try to submit it again + if isProcessed { return nil } @@ -448,7 +452,7 @@ func (r *Relay) doSubmit(ctx context.Context, ev *contracts.GatewayOutboundMessa return fmt.Errorf("write to parachain: %w", err) } - paraNonce, err = r.fetchLatestParachainNonce() + paraNonce, err := r.fetchLatestParachainNonce() if err != nil { return fmt.Errorf("fetch latest parachain nonce: %w", err) } @@ -459,3 +463,33 @@ func (r *Relay) doSubmit(ctx context.Context, ev *contracts.GatewayOutboundMessa return nil } + +func (r *Relay) isMessageProcessed(eventNonce uint64) (bool, error) { + paraNonce, err := r.fetchLatestParachainNonce() + if err != nil { + return false, fmt.Errorf("fetch latest parachain nonce: %w", err) + } + // Check the nonce again in case another relayer processed the message while this relayer downloading beacon state + if eventNonce <= paraNonce { + log.WithField("nonce", paraNonce).Info("message picked up by another relayer, skipped") + return true, nil + } + + return false, nil +} + +func (r *Relay) isInFinalizedBlock(ctx context.Context, events []*contracts.GatewayOutboundMessageAccepted) error { + if len(events) > 0 { + return nil + } + firstEvent := events[0] + + nextBlockNumber := new(big.Int).SetUint64(firstEvent.Raw.BlockNumber + 1) + + blockHeader, err := r.ethconn.Client().HeaderByNumber(ctx, nextBlockNumber) + if err != nil { + return fmt.Errorf("get block header: %w", err) + } + + return r.beaconHeader.CheckHeaderFinalized(*blockHeader.ParentBeaconRoot, r.config.InstantVerification) +}