sync2: ATX integration #6448

Open · wants to merge 7 commits into base branch sync2/fix-multipeer
config/mainnet.go (14 additions, 0 deletions)

@@ -24,6 +24,7 @@ import (
"github.com/spacemeshos/go-spacemesh/hare4"
"github.com/spacemeshos/go-spacemesh/miner"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/sync2"
"github.com/spacemeshos/go-spacemesh/syncer"
"github.com/spacemeshos/go-spacemesh/syncer/atxsync"
"github.com/spacemeshos/go-spacemesh/syncer/malsync"
@@ -77,6 +78,14 @@ func MainnetConfig() Config {

hare4conf := hare4.DefaultConfig()
hare4conf.Enable = false

oldAtxSyncCfg := sync2.DefaultConfig()
oldAtxSyncCfg.MultiPeerReconcilerConfig.SyncInterval = time.Hour
oldAtxSyncCfg.MaxDepth = 16
newAtxSyncCfg := sync2.DefaultConfig()
newAtxSyncCfg.MaxDepth = 21
newAtxSyncCfg.MultiPeerReconcilerConfig.SyncInterval = 5 * time.Minute
fasmat (Member) commented on lines +82 to +87 (Nov 11, 2024):

I don't like how this config is structured; what is the meaning of OldAtxSyncCfg and NewAtxSyncCfg? Is there maybe a better name for them?

Why are they part of "main" -> "Syncer" -> "V2"? Shouldn't they be part of the "main" -> "Syncer" config object?

Also if these are the defaults anyway, no need to overwrite them with the same values here again 🙂

Member: EDIT: fixed spelling

ivan4th (Contributor, Author):

This might cause confusion as there's currently V1 sync for ATXs, too, which is also configured in the syncer config.
When V2 is used in server-only mode, both V1 and V2 syncs are used at the same time.

ivan4th (Contributor, Author) commented (Nov 11, 2024):

The idea is to use reasonable defaults so that most users will not have to adjust sync settings.
Then at some time in the future we might want to restructure the config to remove all the v1 stuff and keep v2 sync only, moving it out of this v2 field.

ivan4th (Contributor, Author):

old vs new in this context means old epochs vs the current epoch

Member:

Ah so it is actually PrevEpochSyncConfig and CurrentEpochSyncConfig? 🙂

Member:

> The idea is to use reasonable defaults so that most users will not have to adjust sync settings. Then at some time in the future we might want to restructure the config to remove all the v1 stuff and keep v2 sync only, moving it out of this v2 field.

The reasonable defaults are already specified as part of the syncer.DefaultConfig; my point was that we don't need to overwrite these values again with the same values in config.MainnetConfig 🙂

Member:

> This might cause confusion as there's currently V1 sync for ATXs, too, which is also configured in the syncer config. When V2 is used in server-only mode, both V1 and V2 syncs are used at the same time.

But this is a technical detail that I think should not be exposed via the config, or at least in a different way. Maybe instead of V2 we can call it something like "reconciliationSync" or similar. Especially since "V2" has negative connotations: https://en.wikipedia.org/wiki/V-2_rocket

ivan4th (Contributor, Author) commented (Nov 11, 2024):

Actually there's a bit of confusion re current vs prev, and I was imprecise in my above comment.
When a new epoch starts, activations don't start to be published in that epoch immediately, and consequently, the more efficient (but more memory and CPU hungry per missing/extra set element) sync is to be applied to the new epoch some time later (EpochEndFraction in the existing sync config). That's why the "new" epoch is not necessarily the current one, and the "old" epoch is not necessarily the previous. We could probably come up with better naming but I'm unsure current / prev is the right choice here.

Otherwise agree, will fix

ivan4th (Contributor, Author):

Renamed the v2 part to reconc-sync (ReconcSync).

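The epoch timing ivan4th describes above (the "new" epoch only becomes the current one after EpochEndFraction of it has elapsed) can be sketched as a toy helper. Everything here is illustrative: `newSyncEpoch` and its fraction-based arguments are hypothetical names, not the PR's actual code.

```go
package main

import "fmt"

// newSyncEpoch picks the epoch that gets the "new" (frequent, deeper-trie)
// sync settings. Hypothetical helper: before epochEndFraction of the
// current epoch has elapsed, ATXs are still mostly being published for
// the previous target epoch, so that epoch remains the "new" one.
func newSyncEpoch(current uint32, elapsed, epochEndFraction float64) uint32 {
	if elapsed < epochEndFraction && current > 0 {
		return current - 1
	}
	return current
}

func main() {
	// Early in epoch 10 the previous epoch is still synced aggressively;
	// later in the epoch, epoch 10 itself takes over.
	fmt.Println(newSyncEpoch(10, 0.2, 0.8)) // 9
	fmt.Println(newSyncEpoch(10, 0.9, 0.8)) // 10
}
```

This is why "old"/"new" does not map cleanly onto "previous"/"current", which is what the naming discussion above settles on.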

return Config{
BaseConfig: BaseConfig{
DataDirParent: defaultDataDir,
@@ -212,6 +221,11 @@ func MainnetConfig() Config {
DisableMeshAgreement: true,
AtxSync: atxsync.DefaultConfig(),
MalSync: malsync.DefaultConfig(),
ReconcSync: syncer.ReconcSyncConfig{
OldAtxSyncCfg: oldAtxSyncCfg,
NewAtxSyncCfg: newAtxSyncCfg,
ParallelLoadLimit: 10,
},
},
Recovery: checkpoint.DefaultConfig(),
Cache: datastore.DefaultConfig(),
config/presets/testnet.go (13 additions, 0 deletions)

@@ -25,6 +25,7 @@ import (
"github.com/spacemeshos/go-spacemesh/hare4"
"github.com/spacemeshos/go-spacemesh/miner"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/sync2"
"github.com/spacemeshos/go-spacemesh/syncer"
"github.com/spacemeshos/go-spacemesh/syncer/atxsync"
"github.com/spacemeshos/go-spacemesh/syncer/malsync"
@@ -65,6 +66,13 @@ func testnet() config.Config {
hare4conf := hare4.DefaultConfig()
hare4conf.Enable = false
defaultdir := filepath.Join(home, "spacemesh-testnet", "/")

oldAtxSyncCfg := sync2.DefaultConfig()
oldAtxSyncCfg.MaxDepth = 16
newAtxSyncCfg := sync2.DefaultConfig()
newAtxSyncCfg.MaxDepth = 21
newAtxSyncCfg.MultiPeerReconcilerConfig.SyncInterval = 5 * time.Minute

return config.Config{
Preset: "testnet",
BaseConfig: config.BaseConfig{
@@ -163,6 +171,11 @@ func testnet() config.Config {
OutOfSyncThresholdLayers: 10,
AtxSync: atxsync.DefaultConfig(),
MalSync: malsync.DefaultConfig(),
ReconcSync: syncer.ReconcSyncConfig{
OldAtxSyncCfg: oldAtxSyncCfg,
NewAtxSyncCfg: newAtxSyncCfg,
ParallelLoadLimit: 10,
},
},
Recovery: checkpoint.DefaultConfig(),
Cache: datastore.DefaultConfig(),
fetch/fetch.go (11 additions, 2 deletions)

@@ -11,6 +11,7 @@
"sync"
"time"

corehost "github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
@@ -115,7 +116,7 @@
Interval time.Duration `mapstructure:"interval"`
}

func (s ServerConfig) toOpts() []server.Opt {
func (s ServerConfig) ToOpts() []server.Opt {
opts := []server.Opt{}
if s.Queue != 0 {
opts = append(opts, server.WithQueueSize(s.Queue))
@@ -365,7 +366,7 @@
if f.cfg.EnableServerMetrics {
opts = append(opts, server.WithMetrics())
}
opts = append(opts, f.cfg.getServerConfig(protocol).toOpts()...)
opts = append(opts, f.cfg.getServerConfig(protocol).ToOpts()...)
f.servers[protocol] = server.New(host, protocol, handler, opts...)
}

@@ -1013,3 +1014,11 @@
})
return peers
}

func (f *Fetch) Host() corehost.Host {
return f.host.(corehost.Host)

// [Codecov] added lines #L1018-L1019 not covered by tests
}

func (f *Fetch) Peers() *peers.Peers {
return f.peers

// [Codecov] added lines #L1022-L1023 not covered by tests
}
fetch/limiter.go (4 additions, 1 deletion)

@@ -2,6 +2,8 @@ package fetch

import (
"context"

"github.com/spacemeshos/go-spacemesh/common/types"
)

type limiter interface {
@@ -10,7 +12,8 @@ type limiter interface {
}

type getHashesOpts struct {
limiter limiter
limiter limiter
callback func(types.Hash32, error)
}

type noLimit struct{}
fetch/mesh_data.go (30 additions, 6 deletions)

@@ -30,7 +30,7 @@
return nil
}

options := system.GetAtxOpts{}
var options system.GetAtxOpts
for _, opt := range opts {
opt(&options)
}
@@ -41,10 +41,17 @@
zap.Bool("limiting", !options.LimitingOff),
)
hashes := types.ATXIDsToHashes(ids)
if options.LimitingOff {
return f.getHashes(ctx, hashes, datastore.ATXDB, f.validators.atx.HandleMessage)
handler := f.validators.atx.HandleMessage
var ghOpts []getHashesOpt
if !options.LimitingOff {
ghOpts = append(ghOpts, withLimiter(f.getAtxsLimiter))
}
return f.getHashes(ctx, hashes, datastore.ATXDB, f.validators.atx.HandleMessage, withLimiter(f.getAtxsLimiter))
if options.Callback != nil {
ghOpts = append(ghOpts, withHashCallback(func(hash types.Hash32, err error) {
options.Callback(types.ATXID(hash), err)
}))
}
return f.getHashes(ctx, hashes, datastore.ATXDB, handler, ghOpts...)
}

type dataReceiver func(context.Context, types.Hash32, p2p.Peer, []byte) error
@@ -57,6 +64,12 @@
}
}

func withHashCallback(callback func(types.Hash32, error)) getHashesOpt {
return func(o *getHashesOpts) {
o.callback = callback
}
}

func (f *Fetch) getHashes(
ctx context.Context,
hashes []types.Hash32,
@@ -65,7 +78,8 @@
opts ...getHashesOpt,
) error {
options := getHashesOpts{
limiter: noLimit{},
limiter: noLimit{},
callback: func(types.Hash32, error) {},
}
for _, opt := range opts {
opt(&options)
@@ -82,18 +96,26 @@
for i, hash := range hashes {
if err := options.limiter.Acquire(ctx, 1); err != nil {
pendingMetric.Add(float64(i - len(hashes)))
return fmt.Errorf("acquiring slot to get hash: %w", err)
err = fmt.Errorf("acquiring slot to get hash: %w", err)
for _, h := range hashes[i:] {
options.callback(h, err)
}
return err

// [Codecov] added lines #L99-L103 not covered by tests
}
p, err := f.getHash(ctx, hash, hint, receiver)
if err != nil {
options.limiter.Release(1)
pendingMetric.Add(float64(i - len(hashes)))
for _, h := range hashes[i:] {
options.callback(h, err)
}

// [Codecov] added lines #L109-L111 not covered by tests
return err
}
if p == nil {
// data is available locally
options.limiter.Release(1)
pendingMetric.Add(-1)
options.callback(hash, nil)

// [Codecov] added line #L118 not covered by tests
continue
}

Expand All @@ -102,6 +124,7 @@
case <-ctx.Done():
options.limiter.Release(1)
pendingMetric.Add(-1)
options.callback(hash, ctx.Err())

// [Codecov] added line #L127 not covered by tests
return ctx.Err()
case <-p.completed:
options.limiter.Release(1)
Expand All @@ -117,6 +140,7 @@
bfailure.Add(hash, p.err)
mu.Unlock()
}
options.callback(hash, p.err)
return nil
}
})
fetch/mesh_data_test.go (17 additions, 4 deletions)

@@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"testing"

p2phost "github.com/libp2p/go-libp2p/core/host"
@@ -86,7 +87,7 @@ func startTestLoop(tb testing.TB, f *Fetch, eg *errgroup.Group, stop chan struct
default:
f.mu.Lock()
for h, req := range f.unprocessed {
require.NoError(tb, req.validator(req.ctx, types.Hash32{}, p2p.NoPeer, []byte{}))
require.NoError(tb, req.validator(req.ctx, h, p2p.NoPeer, []byte{}))
close(req.promise.completed)
delete(f.unprocessed, h)
}
@@ -591,7 +592,7 @@ func genATXs(tb testing.TB, num uint32) []*types.ActivationTx {
}

func TestGetATXs(t *testing.T) {
atxs := genATXs(t, 2)
atxs := genATXs(t, 4)
f := createFetch(t)
f.mAtxH.EXPECT().
HandleMessage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
@@ -602,10 +603,22 @@ func TestGetATXs(t *testing.T) {
var eg errgroup.Group
startTestLoop(t, f.Fetch, &eg, stop)

atxIDs := types.ToATXIDs(atxs)
require.NoError(t, f.GetAtxs(context.Background(), atxIDs))
atxIDs1 := types.ToATXIDs(atxs[:2])
require.NoError(t, f.GetAtxs(context.Background(), atxIDs1))

atxIDs2 := types.ToATXIDs(atxs[2:])
var recvIDs []types.ATXID
var mtx sync.Mutex
require.NoError(t, f.GetAtxs(context.Background(), atxIDs2,
system.WithATXCallback(func(id types.ATXID, err error) {
mtx.Lock()
defer mtx.Unlock()
require.NoError(t, err)
recvIDs = append(recvIDs, id)
})))
close(stop)
require.NoError(t, eg.Wait())
require.ElementsMatch(t, atxIDs2, recvIDs)
}

func TestGetActiveSet(t *testing.T) {