Skip to content

Commit

Permalink
supervisor: L1 Processor (ethereum-optimism#13206)
Browse files Browse the repository at this point in the history
* supervisor: L1 Processor

* comments ; test fixes

* Make L1 source separate from RPC Addr

* fix test

* Add atomic bool for singleton processor routine
  • Loading branch information
axelKingsley authored Dec 9, 2024
1 parent 53034d2 commit b613161
Show file tree
Hide file tree
Showing 11 changed files with 373 additions and 5 deletions.
6 changes: 4 additions & 2 deletions op-e2e/interop/supersystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ func (s *interopE2ESystem) prepareSupervisor() *supervisor.SupervisorService {
EnableAdmin: true,
},
L2RPCs: []string{},
L1RPC: s.l1.UserRPC().RPC(),
Datadir: path.Join(s.t.TempDir(), "supervisor"),
}
depSet := make(map[supervisortypes.ChainID]*depset.StaticConfigDependency)
Expand Down Expand Up @@ -536,10 +537,11 @@ func (s *interopE2ESystem) prepare(t *testing.T, w worldResourcePaths) {
s.hdWallet = s.prepareHDWallet()
s.worldDeployment, s.worldOutput = s.prepareWorld(w)

// the supervisor and client are created first so that the L2s can use the supervisor
// L1 first so that the Supervisor and L2s can connect to it
s.beacon, s.l1 = s.prepareL1()

s.supervisor = s.prepareSupervisor()

s.beacon, s.l1 = s.prepareL1()
s.l2s = s.prepareL2s()

s.prepareContracts()
Expand Down
4 changes: 3 additions & 1 deletion op-supervisor/cmd/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
)

var (
ValidL1RPC = "http://localhost:8545"
ValidL2RPCs = []string{"http;//localhost:8545"}
ValidDatadir = "./supervisor_test_datadir"
)
Expand All @@ -38,7 +39,7 @@ func TestLogLevel(t *testing.T) {
func TestDefaultCLIOptionsMatchDefaultConfig(t *testing.T) {
cfg := configForArgs(t, addRequiredArgs())
depSet := &depset.JsonDependencySetLoader{Path: "test"}
defaultCfgTempl := config.NewConfig(ValidL2RPCs, depSet, ValidDatadir)
defaultCfgTempl := config.NewConfig(ValidL1RPC, ValidL2RPCs, depSet, ValidDatadir)
defaultCfg := *defaultCfgTempl
defaultCfg.Version = Version
require.Equal(t, defaultCfg, *cfg)
Expand Down Expand Up @@ -125,6 +126,7 @@ func toArgList(req map[string]string) []string {

func requiredArgs() map[string]string {
args := map[string]string{
"--l1-rpc": ValidL1RPC,
"--l2-rpcs": ValidL2RPCs[0],
"--dependency-set": "test",
"--datadir": ValidDatadir,
Expand Down
5 changes: 4 additions & 1 deletion op-supervisor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type Config struct {
// requiring manual triggers for the backend to process anything.
SynchronousProcessors bool

L1RPC string

L2RPCs []string
Datadir string
}
Expand All @@ -56,14 +58,15 @@ func (c *Config) Check() error {

// NewConfig creates a new config using default values whenever possible.
// Required options with no suitable default are passed as parameters.
func NewConfig(l2RPCs []string, depSet depset.DependencySetSource, datadir string) *Config {
func NewConfig(l1RPC string, l2RPCs []string, depSet depset.DependencySetSource, datadir string) *Config {
return &Config{
LogConfig: oplog.DefaultCLIConfig(),
MetricsConfig: opmetrics.DefaultCLIConfig(),
PprofConfig: oppprof.DefaultCLIConfig(),
RPC: oprpc.DefaultCLIConfig(),
DependencySetSource: depSet,
MockRun: false,
L1RPC: l1RPC,
L2RPCs: l2RPCs,
Datadir: datadir,
}
Expand Down
2 changes: 1 addition & 1 deletion op-supervisor/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,5 @@ func validConfig() *Config {
panic(err)
}
// Should be valid using only the required arguments passed in via the constructor.
return NewConfig([]string{"http://localhost:8545"}, depSet, "./supervisor_testdir")
return NewConfig("http://localhost:8545", []string{"http://localhost:8545"}, depSet, "./supervisor_testdir")
}
7 changes: 7 additions & 0 deletions op-supervisor/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ func prefixEnvVars(name string) []string {
}

var (
L1RPCFlag = &cli.StringFlag{
Name: "l1-rpc",
Usage: "L1 RPC source.",
EnvVars: prefixEnvVars("L1_RPC"),
}
L2RPCsFlag = &cli.StringSliceFlag{
Name: "l2-rpcs",
Usage: "L2 RPC sources.",
Expand All @@ -46,6 +51,7 @@ var (
)

var requiredFlags = []cli.Flag{
L1RPCFlag,
L2RPCsFlag,
DataDirFlag,
DependencySetFlag,
Expand Down Expand Up @@ -86,6 +92,7 @@ func ConfigFromCLI(ctx *cli.Context, version string) *config.Config {
RPC: oprpc.ReadCLIConfig(ctx),
DependencySetSource: &depset.JsonDependencySetLoader{Path: ctx.Path(DependencySetFlag.Name)},
MockRun: ctx.Bool(MockRunFlag.Name),
L1RPC: ctx.String(L1RPCFlag.Name),
L2RPCs: ctx.StringSlice(L2RPCsFlag.Name),
Datadir: ctx.Path(DataDirFlag.Name),
}
Expand Down
54 changes: 54 additions & 0 deletions op-supervisor/supervisor/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ type SupervisorBackend struct {
// chainDBs holds on to the DB indices for each chain
chainDBs *db.ChainsDB

// l1Processor watches for new L1 blocks, updates the local-safe DB, and kicks off derivation orchestration
l1Processor *processors.L1Processor

// chainProcessors are notified of new unsafe blocks, and add the unsafe log events data into the events DB
chainProcessors locks.RWMap[types.ChainID, *processors.ChainProcessor]

Expand Down Expand Up @@ -125,6 +128,14 @@ func (su *SupervisorBackend) initResources(ctx context.Context, cfg *config.Conf
su.chainProcessors.Set(chainID, chainProcessor)
}

if cfg.L1RPC != "" {
if err := su.attachL1RPC(ctx, cfg.L1RPC); err != nil {
return fmt.Errorf("failed to create L1 processor: %w", err)
}
} else {
su.logger.Warn("No L1 RPC configured, L1 processor will not be started")
}

// the config has some RPC connections to attach to the chain-processors
for _, rpc := range cfg.L2RPCs {
err := su.attachRPC(ctx, rpc)
Expand Down Expand Up @@ -230,6 +241,38 @@ func (su *SupervisorBackend) AttachProcessorSource(chainID types.ChainID, src pr
return nil
}

func (su *SupervisorBackend) attachL1RPC(ctx context.Context, l1RPCAddr string) error {
su.logger.Info("attaching L1 RPC to L1 processor", "rpc", l1RPCAddr)

logger := su.logger.New("l1-rpc", l1RPCAddr)
l1RPC, err := client.NewRPC(ctx, logger, l1RPCAddr)
if err != nil {
return fmt.Errorf("failed to setup L1 RPC: %w", err)
}
l1Client, err := sources.NewL1Client(
l1RPC,
su.logger,
nil,
// placeholder config for the L1
sources.L1ClientSimpleConfig(true, sources.RPCKindBasic, 100))
if err != nil {
return fmt.Errorf("failed to setup L1 Client: %w", err)
}
su.AttachL1Source(l1Client)
return nil
}

// attachL1Source attaches an L1 source to the L1 processor.
// If the L1 processor does not exist, it is created and started.
func (su *SupervisorBackend) AttachL1Source(source processors.L1Source) {
if su.l1Processor == nil {
su.l1Processor = processors.NewL1Processor(su.logger, su.chainDBs, source)
su.l1Processor.Start()
} else {
su.l1Processor.AttachClient(source)
}
}

func clientForL2(ctx context.Context, logger log.Logger, rpc string) (client.RPC, types.ChainID, error) {
ethClient, err := dial.DialEthClientWithTimeout(ctx, 10*time.Second, logger, rpc)
if err != nil {
Expand All @@ -254,6 +297,11 @@ func (su *SupervisorBackend) Start(ctx context.Context) error {
return fmt.Errorf("failed to resume chains db: %w", err)
}

// start the L1 processor if it exists
if su.l1Processor != nil {
su.l1Processor.Start()
}

if !su.synchronousProcessors {
// Make all the chain-processors run automatic background processing
su.chainProcessors.Range(func(_ types.ChainID, processor *processors.ChainProcessor) bool {
Expand All @@ -278,6 +326,12 @@ func (su *SupervisorBackend) Stop(ctx context.Context) error {
return errAlreadyStopped
}
su.logger.Info("Closing supervisor backend")

// stop the L1 processor
if su.l1Processor != nil {
su.l1Processor.Stop()
}

// close all processors
su.chainProcessors.Range(func(id types.ChainID, processor *processors.ChainProcessor) bool {
su.logger.Info("stopping chain processor", "chainID", id)
Expand Down
2 changes: 2 additions & 0 deletions op-supervisor/supervisor/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func TestBackendLifetime(t *testing.T) {
require.NoError(t, err)
t.Log("initialized!")

l1Src := &testutils.MockL1Source{}
src := &testutils.MockL1Source{}

blockX := eth.BlockRef{
Expand All @@ -77,6 +78,7 @@ func TestBackendLifetime(t *testing.T) {
Time: blockX.Time + 2,
}

b.AttachL1Source(l1Src)
require.NoError(t, b.AttachProcessorSource(chainA, src))

require.FileExists(t, filepath.Join(cfg.Datadir, "900", "log.db"), "must have logs DB 900")
Expand Down
26 changes: 26 additions & 0 deletions op-supervisor/supervisor/backend/db/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,32 @@ func (db *ChainsDB) LatestBlockNum(chain types.ChainID) (num uint64, ok bool) {
return logDB.LatestSealedBlockNum()
}

// LastCommonL1 returns the latest common L1 block between all chains in the database.
// it only considers block numbers, not hash. That's because the L1 source is the same for all chains
// this data can be used to determine the starting point for L1 processing
func (db *ChainsDB) LastCommonL1() (types.BlockSeal, error) {
common := types.BlockSeal{}
for _, chain := range db.depSet.Chains() {
ldb, ok := db.localDBs.Get(chain)
if !ok {
return types.BlockSeal{}, types.ErrUnknownChain
}
_, derivedFrom, err := ldb.Latest()
if err != nil {
return types.BlockSeal{}, fmt.Errorf("failed to determine Last Common L1: %w", err)
}
common = derivedFrom
// if the common block isn't yet set,
// or if the new common block is older than the current common block
// set the common block
if common == (types.BlockSeal{}) ||
derivedFrom.Number < common.Number {
common = derivedFrom
}
}
return common, nil
}

func (db *ChainsDB) IsCrossUnsafe(chainID types.ChainID, block eth.BlockID) error {
v, ok := db.crossUnsafe.Get(chainID)
if !ok {
Expand Down
38 changes: 38 additions & 0 deletions op-supervisor/supervisor/backend/db/update.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package db

import (
"errors"
"fmt"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -86,3 +87,40 @@ func (db *ChainsDB) UpdateFinalizedL1(finalized eth.BlockRef) error {
db.logger.Info("Updated finalized L1", "finalizedL1", finalized)
return nil
}

// RecordNewL1 records a new L1 block in the database.
// it uses the latest derived L2 block as the derived block for the new L1 block.
func (db *ChainsDB) RecordNewL1(ref eth.BlockRef) error {
for _, chain := range db.depSet.Chains() {
// get local derivation database
ldb, ok := db.localDBs.Get(chain)
if !ok {
return fmt.Errorf("cannot RecordNewL1 to chain %s: %w", chain, types.ErrUnknownChain)
}
// get the latest derived and derivedFrom blocks
derivedFrom, derived, err := ldb.Latest()
if err != nil {
return fmt.Errorf("failed to get latest derivedFrom for chain %s: %w", chain, err)
}
// make a ref from the latest derived block
derivedParent, err := ldb.PreviousDerived(derived.ID())
if errors.Is(err, types.ErrFuture) {
db.logger.Warn("Empty DB, Recording first L1 block", "chain", chain, "err", err)
} else if err != nil {
db.logger.Warn("Failed to get latest derivedfrom to insert new L1 block", "chain", chain, "err", err)
return err
}
derivedRef := derived.MustWithParent(derivedParent.ID())
// don't push the new L1 block if it's not newer than the latest derived block
if derivedFrom.Number >= ref.Number {
db.logger.Warn("L1 block has already been processed for this height", "chain", chain, "block", ref, "latest", derivedFrom)
continue
}
// the database is extended with the new L1 and the existing L2
if err = db.UpdateLocalSafe(chain, ref, derivedRef); err != nil {
db.logger.Error("Failed to update local safe", "chain", chain, "block", ref, "derived", derived, "err", err)
return err
}
}
return nil
}
Loading

0 comments on commit b613161

Please sign in to comment.