Skip to content

Commit

Permalink
livestream fixes (#233)
Browse files Browse the repository at this point in the history
  • Loading branch information
hamdiallam authored Oct 28, 2024
1 parent 5c7d64f commit e67759e
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 28 deletions.
2 changes: 1 addition & 1 deletion anvil/anvil.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func (a *Anvil) SimulatedLogs(ctx context.Context, tx *types.Transaction) ([]typ

txArgs := txArgs{From: from, To: tx.To(), Gas: hexutil.Uint64(tx.Gas()), GasPrice: (*hexutil.Big)(tx.GasPrice()), Data: tx.Data(), Value: (*hexutil.Big)(tx.Value())}
result := callFrame{}
if err = a.rpcClient.CallContext(ctx, &result, "debug_traceCall", txArgs, "latest", logTracerParams); err != nil {
if err := a.rpcClient.CallContext(ctx, &result, "debug_traceCall", txArgs, "latest", logTracerParams); err != nil {
return nil, err
}

Expand Down
24 changes: 10 additions & 14 deletions interop/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ func (i *L2ToL2MessageIndexer) processEventLog(ctx context.Context, backend ethe
sentMessageEventId := bindings.L2ToL2CrossDomainMessengerParsedABI.Events["SentMessage"].ID
failedRelayedMessageEventId := bindings.L2ToL2CrossDomainMessengerParsedABI.Events["FailedRelayedMessage"].ID

if log.Topics[0] == sentMessageEventId {
switch log.Topics[0] {
case sentMessageEventId:
identifier, err := getIdentifier(ctx, backend, chainID, log)
if err != nil {
return fmt.Errorf("failed to get log identifier: %w", err)
Expand All @@ -128,40 +129,35 @@ func (i *L2ToL2MessageIndexer) processEventLog(ctx context.Context, backend ethe
return fmt.Errorf("failed to handle SentMessage event: %w", err)
}

i.logMessageEvent("SentMessage", entry)

i.logMessageEvent("SentMessage", entry, log)
i.eb.Publish(sentMessageFromSourceKey(entry.message.Source), entry)
i.eb.Publish(sentMessageToDestinationKey(entry.message.Destination), entry)

} else if log.Topics[0] == relayedMessageEventId {
case relayedMessageEventId:
entry, err := i.storeManager.HandleRelayedEvent(log)
if err != nil {
return fmt.Errorf("failed to handle RelayedMessage event: %w", err)
}

i.logMessageEvent("RelayedMessage", entry)

i.logMessageEvent("RelayedMessage", entry, log)
i.eb.Publish(relayedMessageToDestinationKey(entry.message.Destination), entry)
} else if log.Topics[0] == failedRelayedMessageEventId {
case failedRelayedMessageEventId:
entry, err := i.storeManager.HandleFailedRelayedEvent(log)
if err != nil {
return fmt.Errorf("failed to handle FailedRelayedMessage event: %w", err)
}

i.logMessageEvent("FailedRelayedMessage", entry)

i.logMessageEvent("FailedRelayedMessage", entry, log)
i.eb.Publish(failedRelayedMessageToDestinationKey(entry.message.Destination), entry)

} else {
default:
return fmt.Errorf("unexpected event type: %x", log.Topics[0])
}

return nil
}

func (i *L2ToL2MessageIndexer) logMessageEvent(eventName string, entry *L2ToL2MessageStoreEntry) {
func (i *L2ToL2MessageIndexer) logMessageEvent(eventName string, entry *L2ToL2MessageStoreEntry, log *types.Log) {
msg := entry.Message()
i.log.Info(fmt.Sprintf("L2ToL2CrossChainMessenger#%s", eventName), "sourceChainID", msg.Source, "destinationChainID", msg.Destination, "nonce", msg.Nonce, "sender", msg.Sender, "target", msg.Target)
i.log.Info(fmt.Sprintf("L2ToL2CrossChainMessenger#%s", eventName), "sourceChainID", msg.Source, "destinationChainID", msg.Destination, "nonce", msg.Nonce, "sender", msg.Sender, "target", msg.Target, "txHash", log.TxHash.String())
}

func (i *L2ToL2MessageIndexer) createSubscription(key string, messageChan chan<- *L2ToL2MessageStoreEntry) (func(), error) {
Expand Down
4 changes: 2 additions & 2 deletions interop/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ func (r *L2ToL2MessageRelayer) Start(indexer *L2ToL2MessageIndexer, clients map[
unsubscribe, err := r.l2ToL2MessageIndexer.SubscribeSentMessageToDestination(destinationChainID, sentMessageCh)

if err != nil {
r.logger.Debug("failed to create transactor", "err", err)
r.logger.Debug("failed to create transactor", "err", err)
return fmt.Errorf("failed to subscribe to sent message events: %w", err)
}

transactor, err := bind.NewKeyedTransactorWithChainID(privateKey, big.NewInt(int64(destinationChainID)))
if err != nil {
r.logger.Debug("failed to create transactor", "err", err)
r.logger.Debug("failed to create transactor", "err", err)
return fmt.Errorf("failed to create transactor: %w", err)
}

Expand Down
28 changes: 17 additions & 11 deletions opsimulator/opsimulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func New(log log.Logger, closeApp context.CancelCauseFunc, port uint64, l1Chain,
return &OpSimulator{
Chain: l2Chain,

log: log,
log: log.New("chain.id", l2Chain.Config().ChainID),
port: port,
l1Chain: l1Chain,
crossL2Inbox: crossL2Inbox,
Expand Down Expand Up @@ -157,7 +157,8 @@ func (opSim *OpSimulator) startBackgroundTasks() {
if err := clnt.SendTransaction(opSim.bgTasksCtx, depTx); err != nil {
opSim.log.Error("failed to submit deposit tx to chain: %w", "chain.id", chainId, "err", err)
}
opSim.log.Debug("submitted deposit tx to chain", "chain.id", chainId, "hash", depTx.Hash().String())

opSim.log.Info("OptimismPortal#depositTransaction", "l2TxHash", depTx.Hash().String())

case <-opSim.bgTasksCtx.Done():
sub.Unsubscribe()
Expand Down Expand Up @@ -269,6 +270,7 @@ func (opSim *OpSimulator) startBackgroundTasks() {

func (opSim *OpSimulator) handler(ctx context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {

// setup an intermediate buffer so the request body is inspectable
var buf bytes.Buffer
body := io.TeeReader(r.Body, &buf)
Expand Down Expand Up @@ -303,10 +305,19 @@ func (opSim *OpSimulator) handler(ctx context.Context) http.HandlerFunc {
batchRes[i] = msg.errorResponse(err)
continue
}
if err := opSim.checkInteropInvariants(ctx, tx); err != nil {
opSim.log.Error("interop invariants not met", "err", err)
batchRes[i] = msg.errorResponse(&jsonError{Code: InvalidParams, Message: err.Error()})
txHash := tx.Hash()

// Simulate the tx. If this fails, we let it pass through with a warning
logs, err := opSim.SimulatedLogs(ctx, tx)
if err != nil {
opSim.log.Warn("failed to simulate transaction!!! filtering tx...", "err", err, "hash", txHash)
continue
} else {
if err := opSim.checkInteropInvariants(ctx, logs); err != nil {
opSim.log.Error("unable to statisfy interop invariants within transaction", "err", err, "hash", txHash)
batchRes[i] = msg.errorResponse(&jsonError{Code: InvalidParams, Message: err.Error()})
continue
}
}
}

Expand Down Expand Up @@ -363,12 +374,7 @@ func forwardRPCRequest(ctx context.Context, rpcClient *rpc.Client, req *jsonRpcM
return &jsonRpcMessage{Version: vsn, Result: result, ID: req.ID}, nil
}

func (opSim *OpSimulator) checkInteropInvariants(ctx context.Context, tx *types.Transaction) error {
logs, err := opSim.SimulatedLogs(ctx, tx)
if err != nil {
return fmt.Errorf("failed to simulate transaction: %w", err)
}

func (opSim *OpSimulator) checkInteropInvariants(ctx context.Context, logs []types.Log) error {
var executingMessages []*bindings.CrossL2InboxExecutingMessage
for _, log := range logs {
if !interop.IsExecutingMessageLog(&log) {
Expand Down

0 comments on commit e67759e

Please sign in to comment.