From ffa8d0b5b6649ce6ed51c4b6d9ecde0aa1153a3d Mon Sep 17 00:00:00 2001 From: linuskendall Date: Thu, 16 Feb 2023 20:18:41 +0000 Subject: [PATCH 01/10] Conduct serial downloading --- internal/cmd/fetch/fetch.go | 245 ++++++++++++++++++------------------ 1 file changed, 120 insertions(+), 125 deletions(-) diff --git a/internal/cmd/fetch/fetch.go b/internal/cmd/fetch/fetch.go index 11cb396..e7471d2 100644 --- a/internal/cmd/fetch/fetch.go +++ b/internal/cmd/fetch/fetch.go @@ -16,141 +16,136 @@ package fetch import ( - "context" - "encoding/json" - "io" - "net/http" - "os" - "os/signal" - "time" - - "github.com/spf13/cobra" - "github.com/vbauerster/mpb/v7" - "github.com/vbauerster/mpb/v7/decor" - "go.blockdaemon.com/solana/cluster-manager/internal/fetch" - "go.blockdaemon.com/solana/cluster-manager/internal/ledger" - "go.blockdaemon.com/solana/cluster-manager/internal/logger" - "go.uber.org/zap" - "golang.org/x/sync/errgroup" - "gopkg.in/resty.v1" + "context" + "encoding/json" + "io" + "net/http" + "os" + "os/signal" + "time" + + "github.com/spf13/cobra" + "github.com/vbauerster/mpb/v7" + "github.com/vbauerster/mpb/v7/decor" + "go.blockdaemon.com/solana/cluster-manager/internal/fetch" + "go.blockdaemon.com/solana/cluster-manager/internal/ledger" + "go.blockdaemon.com/solana/cluster-manager/internal/logger" + "go.uber.org/zap" + "gopkg.in/resty.v1" ) var Cmd = cobra.Command{ - Use: "fetch", - Short: "Snapshot downloader", - Long: "Fetches a snapshot from another node using the tracker API.", - Run: func(_ *cobra.Command, _ []string) { - run() - }, + Use: "fetch", + Short: "Snapshot downloader", + Long: "Fetches a snapshot from another node using the tracker API.", + Run: func(_ *cobra.Command, _ []string) { + run() + }, } var ( - ledgerDir string - trackerURL string - minSnapAge uint64 - maxSnapAge uint64 - requestTimeout time.Duration - downloadTimeout time.Duration + ledgerDir string + trackerURL string + minSnapAge uint64 + maxSnapAge uint64 + requestTimeout time.Duration + downloadTimeout time.Duration ) func init() { - flags := Cmd.Flags() - flags.StringVar(&ledgerDir, "ledger", "", "Path to ledger dir") - flags.StringVar(&trackerURL, "tracker", "", "Download as instructed by given tracker URL") - flags.Uint64Var(&minSnapAge, "min-slots", 500, "Download only snapshots slots newer than local") - flags.Uint64Var(&maxSnapAge, "max-slots", 10000, "Refuse to download slots older than the newest") - flags.DurationVar(&requestTimeout, "request-timeout", 3*time.Second, "Max time to wait for headers (excluding download)") - flags.DurationVar(&downloadTimeout, "download-timeout", 10*time.Minute, "Max time to try downloading in total") + flags := Cmd.Flags() + flags.StringVar(&ledgerDir, "ledger", "", "Path to ledger dir") + flags.StringVar(&trackerURL, "tracker", "", "Download as instructed by given tracker URL") + flags.Uint64Var(&minSnapAge, "min-slots", 500, "Download only snapshots slots newer than local") + flags.Uint64Var(&maxSnapAge, "max-slots", 10000, "Refuse to download slots older than the newest") + flags.DurationVar(&requestTimeout, "request-timeout", 3*time.Second, "Max time to wait for headers (excluding download)") + flags.DurationVar(&downloadTimeout, "download-timeout", 10*time.Minute, "Max time to try downloading in total") } func run() { - log := logger.GetLogger() - - // Regardless which API we talk to, we want to cap time from request to response header. - // This defends against black holes and really slow servers. - // Download time (reading response body) is not affected. - http.DefaultTransport.(*http.Transport).ResponseHeaderTimeout = requestTimeout - - // Run until interrupted or time out occurs. - ctx := context.Background() - ctx, cancel := signal.NotifyContext(ctx, os.Interrupt) - defer cancel() - ctx, cancel2 := context.WithTimeout(ctx, downloadTimeout) - defer cancel2() - - // Check what snapshots we have locally. - localSnaps, err := ledger.ListSnapshots(os.DirFS(ledgerDir)) - if err != nil { - log.Fatal("Failed to check existing snapshots", zap.Error(err)) - } - - // Ask tracker for best snapshots. - trackerClient := fetch.NewTrackerClientWithResty( - resty.New(). - SetHostURL(trackerURL). - SetTimeout(requestTimeout), - ) - remoteSnaps, err := trackerClient.GetBestSnapshots(ctx, -1) - if err != nil { - log.Fatal("Failed to request snapshot info", zap.Error(err)) - } - - // Decide what we want to do. - _, advice := fetch.ShouldFetchSnapshot(localSnaps, remoteSnaps, minSnapAge, maxSnapAge) - switch advice { - case fetch.AdviceNothingFound: - log.Error("No snapshots available remotely") - return - case fetch.AdviceUpToDate: - log.Info("Existing snapshot is recent enough, no download needed", - zap.Uint64("existing_slot", localSnaps[0].Slot)) - return - case fetch.AdviceFetch: - } - - // Print snapshot to user. - snap := &remoteSnaps[0] - buf, _ := json.MarshalIndent(snap, "", "\t") - log.Info("Downloading a snapshot", zap.ByteString("snap", buf), zap.String("target", snap.Target)) - - // Setup progress bars for download. - bars := mpb.New() - sidecarClient := fetch.NewSidecarClientWithOpts(snap.Target, fetch.SidecarClientOpts{ - ProxyReaderFunc: func(name string, size int64, rd io.Reader) io.ReadCloser { - bar := bars.New( - size, - mpb.BarStyle(), - mpb.PrependDecorators(decor.Name(name)), - mpb.AppendDecorators( - decor.AverageSpeed(decor.UnitKB, "% .1f"), - decor.Percentage(), - ), - ) - return bar.ProxyReader(rd) - }, - }) - - // Download. - beforeDownload := time.Now() - group, ctx := errgroup.WithContext(ctx) - for _, file := range snap.Files { - file_ := file - group.Go(func() error { - err := sidecarClient.DownloadSnapshotFile(ctx, ".", file_.FileName) - if err != nil { - log.Error("Download failed", - zap.String("snapshot", file_.FileName), - zap.String("error", err.Error())) - } - return err - }) - } - downloadErr := group.Wait() - downloadDuration := time.Since(beforeDownload) - - if downloadErr == nil { - log.Info("Download completed", zap.Duration("download_time", downloadDuration)) - } else { - log.Fatal("Aborting download", zap.Duration("download_time", downloadDuration)) - } + log := logger.GetLogger() + + // Regardless which API we talk to, we want to cap time from request to response header. + // This defends against black holes and really slow servers. + // Download time (reading response body) is not affected. + http.DefaultTransport.(*http.Transport).ResponseHeaderTimeout = requestTimeout + + // Run until interrupted or time out occurs. + ctx := context.Background() + ctx, cancel := signal.NotifyContext(ctx, os.Interrupt) + defer cancel() + ctx, cancel2 := context.WithTimeout(ctx, downloadTimeout) + defer cancel2() + + // Check what snapshots we have locally. + localSnaps, err := ledger.ListSnapshots(os.DirFS(ledgerDir)) + if err != nil { + log.Fatal("Failed to check existing snapshots", zap.Error(err)) + } + + // Ask tracker for best snapshots. + trackerClient := fetch.NewTrackerClientWithResty( + resty.New(). + SetHostURL(trackerURL). + SetTimeout(requestTimeout), + ) + remoteSnaps, err := trackerClient.GetBestSnapshots(ctx, -1) + if err != nil { + log.Fatal("Failed to request snapshot info", zap.Error(err)) + } + + // Decide what we want to do. + _, advice := fetch.ShouldFetchSnapshot(localSnaps, remoteSnaps, minSnapAge, maxSnapAge) + switch advice { + case fetch.AdviceNothingFound: + log.Error("No snapshots available remotely") + return + case fetch.AdviceUpToDate: + log.Info("Existing snapshot is recent enough, no download needed", + zap.Uint64("existing_slot", localSnaps[0].Slot)) + return + case fetch.AdviceFetch: + } + + // Print snapshot to user. + snap := &remoteSnaps[0] + buf, _ := json.MarshalIndent(snap, "", "\t") + log.Info("Downloading a snapshot", zap.ByteString("snap", buf), zap.String("target", snap.Target)) + + // Setup progress bars for download. + bars := mpb.New() + sidecarClient := fetch.NewSidecarClientWithOpts(snap.Target, fetch.SidecarClientOpts{ + ProxyReaderFunc: func(name string, size int64, rd io.Reader) io.ReadCloser { + bar := bars.New( + size, + mpb.BarStyle(), + mpb.PrependDecorators(decor.Name(name)), + mpb.AppendDecorators( + decor.AverageSpeed(decor.UnitKB, "% .1f"), + decor.Percentage(), + ), + ) + return bar.ProxyReader(rd) + }, + }) + + // Download. + var downloadErr error + beforeDownload := time.Now() + for _, file := range snap.Files { + file_ := file + downloadErr = sidecarClient.DownloadSnapshotFile(ctx, ".", file_.FileName) + if downloadErr != nil { + log.Error("Download failed", + zap.String("snapshot", file_.FileName), + zap.String("error", err.Error())) + } + } + downloadDuration := time.Since(beforeDownload) + + if downloadErr == nil { + log.Info("Download completed", zap.Duration("download_time", downloadDuration)) + } else { + log.Fatal("Aborting download", zap.Duration("download_time", downloadDuration)) + } } From 79aa327f31499750bdafa0ab68a1e444eba90aa8 Mon Sep 17 00:00:00 2001 From: linuskendall Date: Fri, 17 Feb 2023 06:27:55 +0000 Subject: [PATCH 02/10] Added support for slot based fetching This allows us to fetch a snapshot at a specific base slot which helps during restarts but also allows us to handle the incremental updates of snapshots so we can always fetch the latest incremental snapshot. --- internal/cmd/fetch/fetch.go | 126 +++++++++++++++++++++++++++------- internal/fetch/tracker.go | 16 +++++ internal/index/index.go | 13 ++++ internal/index/index_test.go | 6 +- internal/index/schema.go | 5 ++ internal/index/types.go | 4 +- internal/ledger/snapshot.go | 1 + internal/scraper/collector.go | 2 +- internal/tracker/tracker.go | 15 +++- types/snapshot.go | 1 + 10 files changed, 157 insertions(+), 32 deletions(-) diff --git a/internal/cmd/fetch/fetch.go b/internal/cmd/fetch/fetch.go index e7471d2..be73f1c 100644 --- a/internal/cmd/fetch/fetch.go +++ b/internal/cmd/fetch/fetch.go @@ -30,7 +30,9 @@ import ( "go.blockdaemon.com/solana/cluster-manager/internal/fetch" "go.blockdaemon.com/solana/cluster-manager/internal/ledger" "go.blockdaemon.com/solana/cluster-manager/internal/logger" + "go.blockdaemon.com/solana/cluster-manager/types" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "gopkg.in/resty.v1" ) @@ -48,6 +50,9 @@ var ( trackerURL string minSnapAge uint64 maxSnapAge uint64 + baseSlot uint64 + fullSnap bool + incrementalSnap bool requestTimeout time.Duration downloadTimeout time.Duration ) @@ -60,11 +65,18 @@ func init() { flags.Uint64Var(&maxSnapAge, "max-slots", 10000, "Refuse to download slots older than the newest") flags.DurationVar(&requestTimeout, "request-timeout", 3*time.Second, "Max time to wait for headers (excluding download)") flags.DurationVar(&downloadTimeout, "download-timeout", 10*time.Minute, "Max time to try downloading in total") + flags.Uint64Var(&baseSlot, "slot", 0, "Download snapshot for given slot (if available)") + flags.BoolVar(&fullSnap, "full", true, "Download full snapshot (if available)") + flags.BoolVar(&incrementalSnap, "incremental", true, "Download incremental snapshot (if available)") } func run() { log := logger.GetLogger() + if !fullSnap && !incrementalSnap { + log.Fatal("Must specify at least one of --full or --incremental") + } + // Regardless which API we talk to, we want to cap time from request to response header. // This defends against black holes and really slow servers. // Download time (reading response body) is not affected. @@ -83,31 +95,54 @@ func run() { log.Fatal("Failed to check existing snapshots", zap.Error(err)) } - // Ask tracker for best snapshots. + // Get a specific snapshot or "best snapshot" from tracker client trackerClient := fetch.NewTrackerClientWithResty( resty.New(). SetHostURL(trackerURL). SetTimeout(requestTimeout), ) - remoteSnaps, err := trackerClient.GetBestSnapshots(ctx, -1) - if err != nil { - log.Fatal("Failed to request snapshot info", zap.Error(err)) - } - // Decide what we want to do. - _, advice := fetch.ShouldFetchSnapshot(localSnaps, remoteSnaps, minSnapAge, maxSnapAge) - switch advice { - case fetch.AdviceNothingFound: - log.Error("No snapshots available remotely") - return - case fetch.AdviceUpToDate: - log.Info("Existing snapshot is recent enough, no download needed", - zap.Uint64("existing_slot", localSnaps[0].Slot)) - return - case fetch.AdviceFetch: + var remoteSnaps []types.SnapshotSource + + if baseSlot != 0 { + log.Info("Fetching snapshots at slot", zap.Uint64("base_slot", baseSlot)) + + // Ask tracker for snapshots at a specific location + remoteSnaps, err = trackerClient.GetSnapshotAtSlot(ctx, baseSlot) + if err != nil { + log.Fatal("Failed to fetch snapshot info", zap.Error(err)) + } + + // @TODO check if this snapshot already exists + } else { + log.Info("Finding best snapshot") + + // Ask tracker for best snapshots. + remoteSnaps, err = trackerClient.GetBestSnapshots(ctx, -1) + if err != nil { + log.Fatal("Failed to request snapshot info", zap.Error(err)) + } + + // Decide what we want to do. + _, advice := fetch.ShouldFetchSnapshot(localSnaps, remoteSnaps, minSnapAge, maxSnapAge) + switch advice { + case fetch.AdviceNothingFound: + log.Error("No snapshots available remotely") + return + case fetch.AdviceUpToDate: + log.Info("Existing snapshot is recent enough, no download needed", + zap.Uint64("existing_slot", localSnaps[0].Slot)) + return + case fetch.AdviceFetch: + } } // Print snapshot to user. + log.Info("Number of remote snaps found: ", zap.Int("num", len(remoteSnaps))) + if len(remoteSnaps) == 0 { + log.Fatal("Could not find any matching snapshots. Bailing.") + } + snap := &remoteSnaps[0] buf, _ := json.MarshalIndent(snap, "", "\t") log.Info("Downloading a snapshot", zap.ByteString("snap", buf), zap.String("target", snap.Target)) @@ -129,23 +164,64 @@ func run() { }, }) - // Download. - var downloadErr error + // First pass, if we're fetching fullSnap we want to fetch the fullSnaps but **not** the incremental snap + // Then after completion of the full snap download, we refetch the incremental one so we get the latest one + if fullSnap { + downloadSnapshot(ctx, sidecarClient, snap, true, false) + + // If we were downloading a full snapshot, check if there's a newer incremental snapshot we can fetch + // Find latest incremental snapshot + log.Info("Finding incremental snapshot for full slot", zap.Uint64("base_slot", snap.BaseSlot)) + remoteSnaps, err = trackerClient.GetSnapshotAtSlot(ctx, snap.BaseSlot) + if err != nil { + log.Fatal("Failed to request snapshot info", zap.Error(err)) + } + + if len(remoteSnaps) == 0 { + log.Fatal("No incremental snapshot found") + } + + snap = &remoteSnaps[0] + buf, _ = json.MarshalIndent(snap, "", "\t") + log.Info("Downloading incremental snapshot", zap.ByteString("snap", buf), zap.String("target", snap.Target)) + } + + // This will fetch the latest incremental snapshot (if fullSnap was specified it would already have been fetched and refreshed) + downloadSnapshot(ctx, sidecarClient, snap, false, true) +} + +func downloadSnapshot(ctx context.Context, sidecarClient *fetch.SidecarClient, snap *types.SnapshotSource, full bool, incremental bool) { + log := logger.GetLogger() + + // Download the snapshot files beforeDownload := time.Now() + group, ctx := errgroup.WithContext(ctx) for _, file := range snap.Files { - file_ := file - downloadErr = sidecarClient.DownloadSnapshotFile(ctx, ".", file_.FileName) - if downloadErr != nil { - log.Error("Download failed", - zap.String("snapshot", file_.FileName), - zap.String("error", err.Error())) + if file.BaseSlot != 0 && !incremental { + continue + } + if file.BaseSlot == 0 && !full { + continue } + + file_ := file + group.Go(func() error { + err := sidecarClient.DownloadSnapshotFile(ctx, ".", file_.FileName) + if err != nil { + log.Error("Full snapshot download failed", + zap.String("snapshot", file_.FileName), + zap.String("error", err.Error())) + } + return err + }) } + downloadErr := group.Wait() downloadDuration := time.Since(beforeDownload) if downloadErr == nil { - log.Info("Download completed", zap.Duration("download_time", downloadDuration)) + log.Info("Snapshot download completed", zap.Duration("download_time", downloadDuration)) } else { log.Fatal("Aborting download", zap.Duration("download_time", downloadDuration)) } + } diff --git a/internal/fetch/tracker.go b/internal/fetch/tracker.go index c70bbb7..5b372c7 100644 --- a/internal/fetch/tracker.go +++ b/internal/fetch/tracker.go @@ -53,3 +53,19 @@ func (c *TrackerClient) GetBestSnapshots(ctx context.Context, count int) (source } return } + +func (c *TrackerClient) GetSnapshotAtSlot(ctx context.Context, slot uint64) (sources []types.SnapshotSource, err error) { + res, err := c.resty.R(). + SetContext(ctx). + SetHeader("accept", "application/json"). + SetQueryParam("slot", strconv.FormatUint(slot, 10)). + SetResult(&sources). + Get("/v1/snapshots") + if err != nil { + return nil, err + } + if res.StatusCode() != http.StatusOK { + return nil, fmt.Errorf("get snapshots at slot %d: %s", slot, res.Status()) + } + return +} diff --git a/internal/index/index.go b/internal/index/index.go index df00c6d..81fa337 100644 --- a/internal/index/index.go +++ b/internal/index/index.go @@ -101,6 +101,19 @@ func (d *DB) GetBestSnapshots(max int) (entries []*SnapshotEntry) { return } +// Fetches the snapshots that are at a given slot. +func (d *DB) GetSnapshotsAtSlot(slot uint64) (entries []*SnapshotEntry) { + res, err := d.DB.Txn(false).Get(tableSnapshotEntry, "base_slot", slot) + if err != nil { + panic("getting best snapshots failed: " + err.Error()) + } + + for entry := res.Next(); entry != nil; entry = res.Next() { + entries = append(entries, entry.(*SnapshotEntry)) + } + return +} + // DeleteOldSnapshots delete snapshot entry older than the given timestamp. func (d *DB) DeleteOldSnapshots(minTime time.Time) (n int) { txn := d.DB.Txn(true) diff --git a/internal/index/index_test.go b/internal/index/index_test.go index b6ff9de..d676d29 100644 --- a/internal/index/index_test.go +++ b/internal/index/index_test.go @@ -26,7 +26,7 @@ import ( var dummyTime1 = time.Date(2022, 4, 27, 15, 33, 20, 0, time.UTC) var snapshotEntry1 = &SnapshotEntry{ - SnapshotKey: NewSnapshotKey("host1", 100), + SnapshotKey: NewSnapshotKey("host1", 100, 100), UpdatedAt: dummyTime1, Info: &types.SnapshotInfo{ Slot: 100, @@ -37,7 +37,7 @@ var snapshotEntry1 = &SnapshotEntry{ } var snapshotEntry2 = &SnapshotEntry{ - SnapshotKey: NewSnapshotKey("host1", 99), + SnapshotKey: NewSnapshotKey("host1", 99, 99), UpdatedAt: dummyTime1.Add(-20 * time.Second), Info: &types.SnapshotInfo{ Slot: 99, @@ -48,7 +48,7 @@ var snapshotEntry2 = &SnapshotEntry{ } var snapshotEntry3 = &SnapshotEntry{ - SnapshotKey: NewSnapshotKey("host2", 100), + SnapshotKey: NewSnapshotKey("host2", 100, 100), UpdatedAt: dummyTime1, Info: &types.SnapshotInfo{ Slot: 100, diff --git a/internal/index/schema.go b/internal/index/schema.go index b28245f..fbfde7b 100644 --- a/internal/index/schema.go +++ b/internal/index/schema.go @@ -39,6 +39,11 @@ var schema = memdb.DBSchema{ Unique: false, Indexer: &memdb.UintFieldIndex{Field: "InverseSlot"}, }, + "base_slot": { + Name: "base_slot", + Unique: false, + Indexer: &memdb.UintFieldIndex{Field: "BaseSlot"}, + }, }, }, }, diff --git a/internal/index/types.go b/internal/index/types.go index 4c822c9..9819cea 100644 --- a/internal/index/types.go +++ b/internal/index/types.go @@ -29,12 +29,14 @@ type SnapshotEntry struct { type SnapshotKey struct { Target string `json:"target"` InverseSlot uint64 `json:"inverse_slot"` // newest-to-oldest sort + BaseSlot uint64 `json:"base_slot"` } -func NewSnapshotKey(target string, slot uint64) SnapshotKey { +func NewSnapshotKey(target string, slot uint64, base_slot uint64) SnapshotKey { return SnapshotKey{ Target: target, InverseSlot: ^slot, + BaseSlot: base_slot, } } diff --git a/internal/ledger/snapshot.go b/internal/ledger/snapshot.go index 46a1528..c0c0f0d 100644 --- a/internal/ledger/snapshot.go +++ b/internal/ledger/snapshot.go @@ -94,6 +94,7 @@ func buildSnapshotInfo(files []*types.SnapshotFile, target *types.SnapshotFile) } return &types.SnapshotInfo{ Slot: target.Slot, + BaseSlot: chain[len(chain)-1].Slot, Hash: target.Hash, Files: chain, TotalSize: totalSize, diff --git a/internal/scraper/collector.go b/internal/scraper/collector.go index c307136..2f827b9 100644 --- a/internal/scraper/collector.go +++ b/internal/scraper/collector.go @@ -72,7 +72,7 @@ func (c *Collector) run() { entries := make([]*index.SnapshotEntry, len(res.Infos)) for i, info := range res.Infos { entries[i] = &index.SnapshotEntry{ - SnapshotKey: index.NewSnapshotKey(res.Target, info.Slot), + SnapshotKey: index.NewSnapshotKey(res.Target, info.Slot, info.BaseSlot), Info: info, UpdatedAt: res.Time, } diff --git a/internal/tracker/tracker.go b/internal/tracker/tracker.go index 672b81f..d5b3042 100644 --- a/internal/tracker/tracker.go +++ b/internal/tracker/tracker.go @@ -46,7 +46,18 @@ func (h *Handler) RegisterHandlers(group gin.IRoutes) { } func (h *Handler) GetSnapshots(c *gin.Context) { - c.JSON(http.StatusOK, h.DB.GetAllSnapshots()) + var query struct { + Slot uint64 `form:"slot"` + } + if err := c.BindQuery(&query); err != nil { + return + } + + if query.Slot == 0 { + c.JSON(http.StatusOK, h.DB.GetAllSnapshots()) + } else { + c.JSON(http.StatusOK, h.DB.GetSnapshotsAtSlot(query.Slot)) + } } // GetBestSnapshots returns the currently available best snapshots. @@ -58,7 +69,7 @@ func (h *Handler) GetBestSnapshots(c *gin.Context) { return } const maxItems = 25 - if query.Max < 0 || query.Max > 25 { + if query.Max < 0 || query.Max > maxItems { query.Max = maxItems } entries := h.DB.GetBestSnapshots(query.Max) diff --git a/types/snapshot.go b/types/snapshot.go index e10d039..4f1e469 100644 --- a/types/snapshot.go +++ b/types/snapshot.go @@ -31,6 +31,7 @@ type SnapshotSource struct { // SnapshotInfo describes a snapshot. type SnapshotInfo struct { Slot uint64 `json:"slot"` + BaseSlot uint64 `json:"base_slot"` Hash solana.Hash `json:"hash"` Files []*SnapshotFile `json:"files"` TotalSize uint64 `json:"size"` From b68ab1f84ca029ee304e152d59255514838ba482 Mon Sep 17 00:00:00 2001 From: linuskendall Date: Fri, 17 Feb 2023 06:46:21 +0000 Subject: [PATCH 03/10] Add some more debug output --- internal/cmd/fetch/fetch.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/cmd/fetch/fetch.go b/internal/cmd/fetch/fetch.go index be73f1c..39f45c3 100644 --- a/internal/cmd/fetch/fetch.go +++ b/internal/cmd/fetch/fetch.go @@ -114,6 +114,9 @@ func run() { } // @TODO check if this snapshot already exists + buf, _ := json.MarshalIndent(remoteSnaps, "", "\t") + log.Info("Downloading a snapshot", zap.ByteString("snap", buf)) + } else { log.Info("Finding best snapshot") From e511ad93c6bc9e9baa330c92c398a40afaa8ae4c Mon Sep 17 00:00:00 2001 From: Linus Kendall Date: Mon, 3 Jun 2024 16:10:45 +0000 Subject: [PATCH 04/10] Revert "Allow serial downloads" This reverts commit 5fdd1d003e5067a2d1d026fc3ea1694061c999cf. --- internal/cmd/fetch/fetch.go | 259 ++++++++++++++++-------------------- 1 file changed, 114 insertions(+), 145 deletions(-) diff --git a/internal/cmd/fetch/fetch.go b/internal/cmd/fetch/fetch.go index 49a82b8..11cb396 100644 --- a/internal/cmd/fetch/fetch.go +++ b/internal/cmd/fetch/fetch.go @@ -16,172 +16,141 @@ package fetch import ( - "context" - "encoding/json" - "io" - "net/http" - "os" - "os/signal" - "sort" - "time" - - "github.com/spf13/cobra" - "github.com/vbauerster/mpb/v7" - "github.com/vbauerster/mpb/v7/decor" - "go.blockdaemon.com/solana/cluster-manager/internal/fetch" - "go.blockdaemon.com/solana/cluster-manager/internal/ledger" - "go.blockdaemon.com/solana/cluster-manager/internal/logger" - "go.uber.org/zap" - "golang.org/x/sync/errgroup" - "gopkg.in/resty.v1" + "context" + "encoding/json" + "io" + "net/http" + "os" + "os/signal" + "time" + + "github.com/spf13/cobra" + "github.com/vbauerster/mpb/v7" + "github.com/vbauerster/mpb/v7/decor" + "go.blockdaemon.com/solana/cluster-manager/internal/fetch" + "go.blockdaemon.com/solana/cluster-manager/internal/ledger" + "go.blockdaemon.com/solana/cluster-manager/internal/logger" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "gopkg.in/resty.v1" ) var Cmd = cobra.Command{ - Use: "fetch", - Short: "Snapshot downloader", - Long: "Fetches a snapshot from another node using the tracker API.", - Run: func(_ *cobra.Command, _ []string) { - run() - }, + Use: "fetch", + Short: "Snapshot downloader", + Long: "Fetches a snapshot from another node using the tracker API.", + Run: func(_ *cobra.Command, _ []string) { + run() + }, } var ( - ledgerDir string - trackerURL string - minSnapAge uint64 - maxSnapAge uint64 - requestTimeout time.Duration - downloadTimeout time.Duration - parallelDownload bool + ledgerDir string + trackerURL string + minSnapAge uint64 + maxSnapAge uint64 + requestTimeout time.Duration + downloadTimeout time.Duration ) func init() { - flags := Cmd.Flags() - flags.StringVar(&ledgerDir, "ledger", "", "Path to ledger dir") - flags.StringVar(&trackerURL, "tracker", "", "Download as instructed by given tracker URL") - flags.Uint64Var(&minSnapAge, "min-slots", 500, "Download only snapshots slots newer than local") - flags.Uint64Var(&maxSnapAge, "max-slots", 10000, "Refuse to download slots older than the newest") - flags.DurationVar(&requestTimeout, "request-timeout", 3*time.Second, "Max time to wait for headers (excluding download)") - flags.DurationVar(&downloadTimeout, "download-timeout", 10*time.Minute, "Max time to try downloading in total") - flags.BoolVar(¶llelDownload, "parallel-download", false, "Download snapshot files in parallel or serially") + flags := Cmd.Flags() + flags.StringVar(&ledgerDir, "ledger", "", "Path to ledger dir") + flags.StringVar(&trackerURL, "tracker", "", "Download as instructed by given tracker URL") + flags.Uint64Var(&minSnapAge, "min-slots", 500, "Download only snapshots slots newer than local") + flags.Uint64Var(&maxSnapAge, "max-slots", 10000, "Refuse to download slots older than the newest") + flags.DurationVar(&requestTimeout, "request-timeout", 3*time.Second, "Max time to wait for headers (excluding download)") + flags.DurationVar(&downloadTimeout, "download-timeout", 10*time.Minute, "Max time to try downloading in total") } func run() { - log := logger.GetLogger() - - // Regardless which API we talk to, we want to cap time from request to response header. - // This defends against black holes and really slow servers. - // Download time (reading response body) is not affected. - http.DefaultTransport.(*http.Transport).ResponseHeaderTimeout = requestTimeout - - // Run until interrupted or time out occurs. - ctx := context.Background() - ctx, cancel := signal.NotifyContext(ctx, os.Interrupt) - defer cancel() - ctx, cancel2 := context.WithTimeout(ctx, downloadTimeout) - defer cancel2() - - // Check what snapshots we have locally. - localSnaps, err := ledger.ListSnapshots(os.DirFS(ledgerDir)) - if err != nil { - log.Fatal("Failed to check existing snapshots", zap.Error(err)) - } - - // Ask tracker for best snapshots. - trackerClient := fetch.NewTrackerClientWithResty( - resty.New(). - SetHostURL(trackerURL). - SetTimeout(requestTimeout), - ) - remoteSnaps, err := trackerClient.GetBestSnapshots(ctx, -1) - if err != nil { - log.Fatal("Failed to request snapshot info", zap.Error(err)) - } - - // Decide what we want to do. - _, advice := fetch.ShouldFetchSnapshot(localSnaps, remoteSnaps, minSnapAge, maxSnapAge) - switch advice { - case fetch.AdviceNothingFound: - log.Error("No snapshots available remotely") - return - case fetch.AdviceUpToDate: - log.Info("Existing snapshot is recent enough, no download needed", - zap.Uint64("existing_slot", localSnaps[0].Slot)) - return - case fetch.AdviceFetch: - } - - // Print snapshot to user. - snap := &remoteSnaps[0] - buf, _ := json.MarshalIndent(snap, "", "\t") - log.Info("Downloading a snapshot", zap.ByteString("snap", buf), zap.String("target", snap.Target)) - - // Setup progress bars for download. - bars := mpb.New() - sidecarClient := fetch.NewSidecarClientWithOpts(snap.Target, fetch.SidecarClientOpts{ - ProxyReaderFunc: func(name string, size int64, rd io.Reader) io.ReadCloser { - bar := bars.New( - size, - mpb.BarStyle(), - mpb.PrependDecorators(decor.Name(name)), - mpb.AppendDecorators( - decor.AverageSpeed(decor.UnitKB, "% .1f"), - decor.Percentage(), - ), - ) - return bar.ProxyReader(rd) - }, - }) - - // Download. - beforeDownload := time.Now() - var downloadErr error - - if parallelDownload { - group, ctx := errgroup.WithContext(ctx) - - for _, file := range snap.Files { - file_ := file - if parallelDownload { - group.Go(func() error { - err := sidecarClient.DownloadSnapshotFile(ctx, ".", file_.FileName) - if err != nil { - log.Error("Download failed", - zap.String("snapshot", file_.FileName), - zap.String("error", err.Error())) - } - return err - }) - } + log := logger.GetLogger() + + // Regardless which API we talk to, we want to cap time from request to response header. + // This defends against black holes and really slow servers. + // Download time (reading response body) is not affected. + http.DefaultTransport.(*http.Transport).ResponseHeaderTimeout = requestTimeout + + // Run until interrupted or time out occurs. + ctx := context.Background() + ctx, cancel := signal.NotifyContext(ctx, os.Interrupt) + defer cancel() + ctx, cancel2 := context.WithTimeout(ctx, downloadTimeout) + defer cancel2() + + // Check what snapshots we have locally. + localSnaps, err := ledger.ListSnapshots(os.DirFS(ledgerDir)) + if err != nil { + log.Fatal("Failed to check existing snapshots", zap.Error(err)) + } - } - - downloadErr = group.Wait() - } else { - // Download the largest files first - sort.Slice(snap.Files, func(i, j int) bool { - return snap.Files[i].Size > snap.Files[j].Size - }) - - for _, file := range snap.Files { - file_ := file + // Ask tracker for best snapshots. + trackerClient := fetch.NewTrackerClientWithResty( + resty.New(). + SetHostURL(trackerURL). + SetTimeout(requestTimeout), + ) + remoteSnaps, err := trackerClient.GetBestSnapshots(ctx, -1) + if err != nil { + log.Fatal("Failed to request snapshot info", zap.Error(err)) + } + // Decide what we want to do. + _, advice := fetch.ShouldFetchSnapshot(localSnaps, remoteSnaps, minSnapAge, maxSnapAge) + switch advice { + case fetch.AdviceNothingFound: + log.Error("No snapshots available remotely") + return + case fetch.AdviceUpToDate: + log.Info("Existing snapshot is recent enough, no download needed", + zap.Uint64("existing_slot", localSnaps[0].Slot)) + return + case fetch.AdviceFetch: + } + + // Print snapshot to user. + snap := &remoteSnaps[0] + buf, _ := json.MarshalIndent(snap, "", "\t") + log.Info("Downloading a snapshot", zap.ByteString("snap", buf), zap.String("target", snap.Target)) + + // Setup progress bars for download. + bars := mpb.New() + sidecarClient := fetch.NewSidecarClientWithOpts(snap.Target, fetch.SidecarClientOpts{ + ProxyReaderFunc: func(name string, size int64, rd io.Reader) io.ReadCloser { + bar := bars.New( + size, + mpb.BarStyle(), + mpb.PrependDecorators(decor.Name(name)), + mpb.AppendDecorators( + decor.AverageSpeed(decor.UnitKB, "% .1f"), + decor.Percentage(), + ), + ) + return bar.ProxyReader(rd) + }, + }) + + // Download. + beforeDownload := time.Now() + group, ctx := errgroup.WithContext(ctx) + for _, file := range snap.Files { + file_ := file + group.Go(func() error { err := sidecarClient.DownloadSnapshotFile(ctx, ".", file_.FileName) if err != nil { log.Error("Download failed", zap.String("snapshot", file_.FileName), zap.String("error", err.Error())) - downloadErr = err - break // @TODO should we still try to download the other stuff? } - } + return err + }) } - + downloadErr := group.Wait() downloadDuration := time.Since(beforeDownload) - if downloadErr == nil { - log.Info("Download completed", zap.Duration("download_time", downloadDuration)) - } else { - log.Fatal("Aborting download", zap.Duration("download_time", downloadDuration)) - } + if downloadErr == nil { + log.Info("Download completed", zap.Duration("download_time", downloadDuration)) + } else { + log.Fatal("Aborting download", zap.Duration("download_time", downloadDuration)) + } } From 4cf63645ca886431f22841011dde11d0cbaba5e7 Mon Sep 17 00:00:00 2001 From: Linus Kendall Date: Mon, 3 Jun 2024 22:51:03 +0000 Subject: [PATCH 05/10] Fixed a number of minor issues in fetching Fixed a number of minor issues in snapshot fetching, allows looking up and fetching the incremental snapshot from the ledger directory. --- internal/cmd/fetch/fetch.go | 8 ++++++++ internal/fetch/tracker.go | 2 ++ internal/ledger/snapshot.go | 2 +- internal/tracker/tracker.go | 31 +++++++++++++++++++------------ 4 files changed, 30 insertions(+), 13 deletions(-) diff --git a/internal/cmd/fetch/fetch.go b/internal/cmd/fetch/fetch.go index 39f45c3..5eb80b5 100644 --- a/internal/cmd/fetch/fetch.go +++ b/internal/cmd/fetch/fetch.go @@ -24,6 +24,7 @@ import ( "os/signal" "time" + "github.com/davecgh/go-spew/spew" "github.com/spf13/cobra" "github.com/vbauerster/mpb/v7" "github.com/vbauerster/mpb/v7/decor" @@ -102,6 +103,12 @@ func run() { SetTimeout(requestTimeout), ) + // If we are not downloading a full snap and we don't have a specific snapshot defined + // we want to guess the snapshot slot to use as base, we will use the base slot + if baseSlot == 0 && !fullSnap { + baseSlot = localSnaps[0].BaseSlot + } + var remoteSnaps []types.SnapshotSource if baseSlot != 0 { @@ -180,6 +187,7 @@ func run() { log.Fatal("Failed to request snapshot info", zap.Error(err)) } + spew.Dump(remoteSnaps) if len(remoteSnaps) == 0 { log.Fatal("No incremental snapshot found") } diff --git a/internal/fetch/tracker.go b/internal/fetch/tracker.go index 5b372c7..dd13d91 100644 --- a/internal/fetch/tracker.go +++ b/internal/fetch/tracker.go @@ -21,6 +21,7 @@ import ( "net/http" "strconv" + "github.com/davecgh/go-spew/spew" "go.blockdaemon.com/solana/cluster-manager/types" "gopkg.in/resty.v1" ) @@ -61,6 +62,7 @@ func (c *TrackerClient) GetSnapshotAtSlot(ctx context.Context, slot uint64) (sou SetQueryParam("slot", strconv.FormatUint(slot, 10)). SetResult(&sources). Get("/v1/snapshots") + spew.Dump(sources) if err != nil { return nil, err } diff --git a/internal/ledger/snapshot.go b/internal/ledger/snapshot.go index c0c0f0d..30d74e1 100644 --- a/internal/ledger/snapshot.go +++ b/internal/ledger/snapshot.go @@ -26,7 +26,7 @@ import ( "go.blockdaemon.com/solana/cluster-manager/types" ) -// ListSnapshotFiles returns all snapshot files in a ledger dir. +// ListSnapshotFiles returns all snapshot files in a ledger dir in sorted order. func ListSnapshotFiles(ledgerDir fs.FS) ([]*types.SnapshotFile, error) { dirEntries, err := fs.ReadDir(ledgerDir, ".") if err != nil { diff --git a/internal/tracker/tracker.go b/internal/tracker/tracker.go index d5b3042..e66f2e4 100644 --- a/internal/tracker/tracker.go +++ b/internal/tracker/tracker.go @@ -45,6 +45,18 @@ func (h *Handler) RegisterHandlers(group gin.IRoutes) { group.GET("/health", h.Health) } +func (h *Handler) createJson(c *gin.Context, entries []*index.SnapshotEntry) { + sources := make([]types.SnapshotSource, len(entries)) + for i, entry := range entries { + sources[i] = types.SnapshotSource{ + SnapshotInfo: *entry.Info, + Target: entry.Target, + UpdatedAt: entry.UpdatedAt, + } + } + c.JSON(http.StatusOK, sources) +} + func (h *Handler) GetSnapshots(c *gin.Context) { var query struct { Slot uint64 `form:"slot"` @@ -53,11 +65,14 @@ func (h *Handler) GetSnapshots(c *gin.Context) { return } - if query.Slot == 0 { - c.JSON(http.StatusOK, h.DB.GetAllSnapshots()) + var entries []*index.SnapshotEntry + if query.Slot == 0 { + entries = h.DB.GetAllSnapshots() } else { - c.JSON(http.StatusOK, h.DB.GetSnapshotsAtSlot(query.Slot)) + entries = h.DB.GetSnapshotsAtSlot(query.Slot) } + + h.createJson(c, entries) } // GetBestSnapshots returns the currently available best snapshots. @@ -73,15 +88,7 @@ func (h *Handler) GetBestSnapshots(c *gin.Context) { query.Max = maxItems } entries := h.DB.GetBestSnapshots(query.Max) - sources := make([]types.SnapshotSource, len(entries)) - for i, entry := range entries { - sources[i] = types.SnapshotSource{ - SnapshotInfo: *entry.Info, - Target: entry.Target, - UpdatedAt: entry.UpdatedAt, - } - } - c.JSON(http.StatusOK, sources) + h.createJson(c, entries) } func (h *Handler) Health(c *gin.Context) { From b51186ba82fbbd41c4c2b4f06a201239954524a6 Mon Sep 17 00:00:00 2001 From: Linus Kendall Date: Tue, 4 Jun 2024 00:00:05 +0000 Subject: [PATCH 06/10] Added some further sanpshot fetching logic Add logic to avoid full snapshot downloading. --- README.md | 17 ++++++ internal/cmd/fetch/fetch.go | 107 +++++++++++++++++++++++------------- internal/fetch/fetch.go | 35 +++++++++++- internal/fetch/tracker.go | 2 - 4 files changed, 117 insertions(+), 44 deletions(-) diff --git a/README.md b/README.md index 5b7cf04..4e5a5fd 100644 --- a/README.md +++ b/README.md @@ -102,6 +102,23 @@ Flags: --tracker string URL to tracker API ``` +## Snapshot fetching logic + +### If full=true, incremental=true (default) + +If the local ledger directory does not have the same full snapshot as the remote one, then fetch the full snapshot otherwise skip full snapshot fetching. + +If the local ledger does not have the latest incremental snapshot, fetch the latest incremnetal. + +### If full=false, incremental=true + +Fetch the latest incremental snapshot for the latest full snapshot locally available. Warn if this local snapshot is not the latest but do not fetch. + +### If full=true, incremental=false + +Fetch the latest full snapshot, unless it is already available in the local ledger and has the same hash. + + ## Architecture ### Snapshot management diff --git a/internal/cmd/fetch/fetch.go b/internal/cmd/fetch/fetch.go index 5eb80b5..417e7a4 100644 --- a/internal/cmd/fetch/fetch.go +++ b/internal/cmd/fetch/fetch.go @@ -24,7 +24,6 @@ import ( "os/signal" "time" - "github.com/davecgh/go-spew/spew" "github.com/spf13/cobra" "github.com/vbauerster/mpb/v7" "github.com/vbauerster/mpb/v7/decor" @@ -34,6 +33,7 @@ import ( "go.blockdaemon.com/solana/cluster-manager/types" "go.uber.org/zap" "golang.org/x/sync/errgroup" + "golang.org/x/term" "gopkg.in/resty.v1" ) @@ -103,12 +103,6 @@ func run() { SetTimeout(requestTimeout), ) - // If we are not downloading a full snap and we don't have a specific snapshot defined - // we want to guess the snapshot slot to use as base, we will use the base slot - if baseSlot == 0 && !fullSnap { - baseSlot = localSnaps[0].BaseSlot - } - var remoteSnaps []types.SnapshotSource if baseSlot != 0 { @@ -143,6 +137,26 @@ func run() { log.Info("Existing snapshot is recent enough, no download needed", zap.Uint64("existing_slot", localSnaps[0].Slot)) return + case fetch.AdviceRemoteIsOlder: + log.Info("Remote snapshot is older than local, no download possible") + return + case fetch.AdviceFetchFull: + if !fullSnap { + // If we are not fetching a full snapshot and the base slot isn't matching + // we need to fetch an older incremental snapshot. + log.Info("Full snapshot is newer than local, but not requested") + remoteSnaps, err = trackerClient.GetSnapshotAtSlot(ctx, localSnaps[0].BaseSlot) + if err != nil { + log.Fatal("Failed to request snapshot info", zap.Error(err)) + } + } + case fetch.AdviceFetchIncremental: + log.Info("Full snapshot already found. No need for downloading.") + fullSnap = false // No need to fetch full snap + if !incrementalSnap { + log.Info("Incremental snapshot is newer than local, but only full is requested.") + return + } case fetch.AdviceFetch: } } @@ -154,51 +168,66 @@ func run() { } snap := &remoteSnaps[0] - buf, _ := json.MarshalIndent(snap, "", "\t") - log.Info("Downloading a snapshot", zap.ByteString("snap", buf), zap.String("target", snap.Target)) // Setup progress bars for download. - bars := mpb.New() - sidecarClient := fetch.NewSidecarClientWithOpts(snap.Target, fetch.SidecarClientOpts{ - ProxyReaderFunc: func(name string, size int64, rd io.Reader) io.ReadCloser { - bar := bars.New( - size, - mpb.BarStyle(), - mpb.PrependDecorators(decor.Name(name)), - mpb.AppendDecorators( - decor.AverageSpeed(decor.UnitKB, "% .1f"), - decor.Percentage(), - ), - ) - return bar.ProxyReader(rd) - }, - }) + var fetchOpts fetch.SidecarClientOpts + if term.IsTerminal(int(os.Stdout.Fd())) { + bars := mpb.New() + fetchOpts = fetch.SidecarClientOpts{ + ProxyReaderFunc: func(name string, size int64, rd io.Reader) io.ReadCloser { + bar := bars.New( + size, + mpb.BarStyle(), + mpb.PrependDecorators(decor.Name(name)), + mpb.AppendDecorators( + decor.AverageSpeed(decor.UnitKB, "% .1f"), + decor.Percentage(), + ), + ) + return bar.ProxyReader(rd) + }, + } + } + + sidecarClient := fetch.NewSidecarClientWithOpts(snap.Target, fetchOpts) // First pass, if we're fetching fullSnap we want to fetch the fullSnaps but **not** the incremental snap // Then after completion of the full snap download, we refetch the incremental one so we get the latest one if fullSnap { + buf, _ := json.MarshalIndent(snap, "", "\t") + log.Info("Downloading full snapshot", zap.ByteString("snap", buf), zap.String("target", snap.Target)) + downloadSnapshot(ctx, sidecarClient, snap, true, false) - // If we were downloading a full snapshot, check if there's a newer incremental snapshot we can fetch - // Find latest incremental snapshot - log.Info("Finding incremental snapshot for full slot", zap.Uint64("base_slot", snap.BaseSlot)) - remoteSnaps, err = trackerClient.GetSnapshotAtSlot(ctx, snap.BaseSlot) - if err != nil { - log.Fatal("Failed to request snapshot info", zap.Error(err)) - } + if incrementalSnap { + + // If we were downloading a full snapshot, check if there's a newer incremental snapshot we can fetch + // Find latest incremental snapshot + log.Info("Finding incremental snapshot for full slot", zap.Uint64("base_slot", snap.BaseSlot)) + remoteSnaps, err = trackerClient.GetSnapshotAtSlot(ctx, snap.BaseSlot) + if err != nil { + log.Fatal("Failed to request snapshot info", zap.Error(err)) + } + + if len(remoteSnaps) == 0 { + log.Fatal("No incremental snapshot found") + } - spew.Dump(remoteSnaps) - if len(remoteSnaps) == 0 { - log.Fatal("No incremental snapshot found") + snap = &remoteSnaps[0] + } else { + log.Info("Only full snapshot was requested, not fetching incremental snapshot") + return } + } - snap = &remoteSnaps[0] - buf, _ = json.MarshalIndent(snap, "", "\t") + if incrementalSnap { + // Download incremental snapshot + buf, _ := json.MarshalIndent(snap, "", "\t") log.Info("Downloading incremental snapshot", zap.ByteString("snap", buf), zap.String("target", snap.Target)) - } - // This will fetch the latest incremental snapshot (if fullSnap was specified it would already have been fetched and refreshed) - downloadSnapshot(ctx, sidecarClient, snap, false, true) + // This will fetch the latest incremental snapshot (if fullSnap was specified it would already have been fetched and refreshed) + downloadSnapshot(ctx, sidecarClient, snap, false, true) + } } func downloadSnapshot(ctx context.Context, sidecarClient *fetch.SidecarClient, snap *types.SnapshotSource, full bool, incremental bool) { diff --git a/internal/fetch/fetch.go b/internal/fetch/fetch.go index a5049bc..929c2b8 100644 --- a/internal/fetch/fetch.go +++ b/internal/fetch/fetch.go @@ -34,8 +34,10 @@ func ShouldFetchSnapshot( // Compare local and remote slot numbers. remoteSlot := remote[0].Slot var localSlot uint64 + var localBaseSlot uint64 if len(local) > 0 { localSlot = local[0].Slot + localBaseSlot = local[0].BaseSlot } // Check if local is newer or remote is not new enough to be interesting. @@ -48,6 +50,30 @@ func ShouldFetchSnapshot( if maxAge < remoteSlot { minSlot = remoteSlot - maxAge } + + // Check if we have a full snapshot new enough + if localBaseSlot < remote[0].BaseSlot { + advice = AdviceFetchFull + return + } else if localBaseSlot == remote[0].BaseSlot { + for _, l := range local[0].Files { + for _, r := range remote[0].Files { + if l.BaseSlot == 0 && r.BaseSlot == 0 { + if l.Hash == r.Hash { + // If the full snapshot base slot is the same and the hash is the same + // we can skip fetching the full snapshot. + advice = AdviceFetchIncremental + return + } + } + } + } + } else { + advice = AdviceRemoteIsOlder + return + } + + // Our advice is to fetch both incremental and full snapshots. advice = AdviceFetch return } @@ -56,7 +82,10 @@ func ShouldFetchSnapshot( type Advice int const ( - AdviceFetch = Advice(iota) // download a snapshot - AdviceNothingFound // no snapshot available - AdviceUpToDate // local snapshot is up-to-date or newer, don't download + AdviceFetch = Advice(iota) // download a snapshot + AdviceFetchFull // download a full snapshot + AdviceFetchIncremental // download an incremental snapshot + AdviceRemoteIsOlder // remote snapshot is older than local + AdviceNothingFound // no snapshot available + AdviceUpToDate // local snapshot is up-to-date or newer, don't download ) diff --git a/internal/fetch/tracker.go b/internal/fetch/tracker.go index dd13d91..5b372c7 100644 --- a/internal/fetch/tracker.go +++ b/internal/fetch/tracker.go @@ -21,7 +21,6 @@ import ( "net/http" "strconv" - "github.com/davecgh/go-spew/spew" "go.blockdaemon.com/solana/cluster-manager/types" "gopkg.in/resty.v1" ) @@ -62,7 +61,6 @@ func (c *TrackerClient) GetSnapshotAtSlot(ctx context.Context, slot uint64) (sou SetQueryParam("slot", strconv.FormatUint(slot, 10)). SetResult(&sources). Get("/v1/snapshots") - spew.Dump(sources) if err != nil { return nil, err } From 9733398f5494d8e9dadaa8b158e424a4fec7ad34 Mon Sep 17 00:00:00 2001 From: Linus Kendall Date: Wed, 5 Jun 2024 00:29:25 +0000 Subject: [PATCH 07/10] Fix tests --- internal/fetch/fetch.go | 2 +- internal/fetch/fetch_test.go | 12 +++++++----- internal/fetch/sidecar.go | 2 +- internal/integrationtest/sidecar_test.go | 2 ++ internal/integrationtest/tracker_test.go | 6 +++++- internal/ledger/snapshot_test.go | 5 +++++ 6 files changed, 21 insertions(+), 8 deletions(-) diff --git a/internal/fetch/fetch.go b/internal/fetch/fetch.go index 929c2b8..355f64f 100644 --- a/internal/fetch/fetch.go +++ b/internal/fetch/fetch.go @@ -55,7 +55,7 @@ func ShouldFetchSnapshot( if localBaseSlot < remote[0].BaseSlot { advice = AdviceFetchFull return - } else if localBaseSlot == remote[0].BaseSlot { + } else if len(local) > 0 && localBaseSlot == remote[0].BaseSlot { for _, l := range local[0].Files { for _, r := range remote[0].Files { if l.BaseSlot == 0 && r.BaseSlot == 0 { diff --git a/internal/fetch/fetch_test.go b/internal/fetch/fetch_test.go index 68777cf..5bbad5c 100644 --- a/internal/fetch/fetch_test.go +++ b/internal/fetch/fetch_test.go @@ -49,7 +49,7 @@ func TestShouldFetchSnapshot(t *testing.T) { minAge: 500, maxAge: 10000, minSlot: 113456, - advice: AdviceFetch, + advice: AdviceFetchFull, }, { name: "LowSlotNumber", @@ -58,7 +58,7 @@ func TestShouldFetchSnapshot(t *testing.T) { minAge: 50, maxAge: 10000, minSlot: 0, - advice: AdviceFetch, + advice: AdviceFetchFull, }, { name: "Refresh", @@ -67,7 +67,7 @@ func TestShouldFetchSnapshot(t *testing.T) { minAge: 500, maxAge: 10000, minSlot: 113456, - advice: AdviceFetch, + advice: AdviceFetchFull, }, { name: "NotNewEnough", @@ -107,7 +107,8 @@ func fakeSnapshotInfo(slots []uint64) []*types.SnapshotInfo { infos := make([]*types.SnapshotInfo, len(slots)) for i, slot := range slots { infos[i] = &types.SnapshotInfo{ - Slot: slot, + Slot: slot, + BaseSlot: slot, } } return infos @@ -118,7 +119,8 @@ func fakeSnapshotSources(slots []uint64) []types.SnapshotSource { for i, slot := range slots { infos[i] = types.SnapshotSource{ SnapshotInfo: types.SnapshotInfo{ - Slot: slot, + Slot: slot, + BaseSlot: slot, }, } } diff --git a/internal/fetch/sidecar.go b/internal/fetch/sidecar.go index b8d186c..fb9a505 100644 --- a/internal/fetch/sidecar.go +++ b/internal/fetch/sidecar.go @@ -90,7 +90,7 @@ func (c *SidecarClient) ListSnapshots(ctx context.Context) (infos []*types.Snaps // The returned response is guaranteed to have a valid ContentLength. // The caller has the responsibility to close the response body even if the error is not nil. func (c *SidecarClient) StreamSnapshot(ctx context.Context, name string) (res *http.Response, err error) { - snapURL := "http://" + c.resty.HostURL + "/v1/snapshot/" + url.PathEscape(name) // TODO: Don't hardcode scheme + snapURL := c.resty.HostURL + "/v1/snapshot/" + url.PathEscape(name) // TODO: Don't hardcode scheme c.log.Info("Downloading snapshot", zap.String("snapshot_url", snapURL)) req, err := http.NewRequestWithContext(ctx, http.MethodGet, snapURL, nil) if err != nil { diff --git a/internal/integrationtest/sidecar_test.go b/internal/integrationtest/sidecar_test.go index fc794cd..fbff1db 100644 --- a/internal/integrationtest/sidecar_test.go +++ b/internal/integrationtest/sidecar_test.go @@ -39,6 +39,7 @@ import ( func TestSidecar(t *testing.T) { server, root := newSidecar(t, 100) defer server.Close() + fmt.Println("Server url", server.URL) client := fetch.NewSidecarClientWithOpts(server.URL, fetch.SidecarClientOpts{Resty: resty.NewWithClient(server.Client())}) @@ -51,6 +52,7 @@ func TestSidecar(t *testing.T) { []*types.SnapshotInfo{ { Slot: 100, + BaseSlot: 100, Hash: solana.MustHashFromBase58("7jMmeXZSNcWPrB2RsTdeXfXrsyW5c1BfPjqoLW2X5T7V"), TotalSize: 1, Files: []*types.SnapshotFile{ diff --git a/internal/integrationtest/tracker_test.go b/internal/integrationtest/tracker_test.go index c4b0a60..1ba698e 100644 --- a/internal/integrationtest/tracker_test.go +++ b/internal/integrationtest/tracker_test.go @@ -106,6 +106,7 @@ func TestTracker(t *testing.T) { { SnapshotInfo: types.SnapshotInfo{ Slot: 103, + BaseSlot: 103, Hash: solana.MustHashFromBase58("7w4zb1jh47zY5FPMPyRzDSmYf1CPirVP9LmTr5xWEs6X"), Files: []*types.SnapshotFile{ { @@ -122,6 +123,7 @@ func TestTracker(t *testing.T) { { SnapshotInfo: types.SnapshotInfo{ Slot: 102, + BaseSlot: 102, Hash: solana.MustHashFromBase58("7sAawX1cAHVpfZGNtUAYKX2KPzdd1uPUZUTaLteWX4SB"), Files: []*types.SnapshotFile{ { @@ -138,6 +140,7 @@ func TestTracker(t *testing.T) { { SnapshotInfo: types.SnapshotInfo{ Slot: 101, + BaseSlot: 101, Hash: solana.MustHashFromBase58("7oGBJ2HXGT17Fs9QNxu6RbH68z4rJxHZyc9gqhLWoFmq"), Files: []*types.SnapshotFile{ { @@ -154,6 +157,7 @@ func TestTracker(t *testing.T) { { SnapshotInfo: types.SnapshotInfo{ Slot: 100, + BaseSlot: 100, Hash: solana.MustHashFromBase58("7jMmeXZSNcWPrB2RsTdeXfXrsyW5c1BfPjqoLW2X5T7V"), Files: []*types.SnapshotFile{ { @@ -172,7 +176,7 @@ func TestTracker(t *testing.T) { } func newTracker(db *index.DB) *httptest.Server { - handler := tracker.NewHandler(db) + handler := tracker.NewHandler(db, "http://localhost:8899", 1000) gin.SetMode(gin.ReleaseMode) engine := gin.New() handler.RegisterHandlers(engine.Group("/v1")) diff --git a/internal/ledger/snapshot_test.go b/internal/ledger/snapshot_test.go index 9890f53..1acaf02 100644 --- a/internal/ledger/snapshot_test.go +++ b/internal/ledger/snapshot_test.go @@ -51,6 +51,7 @@ func TestListSnapshots(t *testing.T) { []*types.SnapshotInfo{ { Slot: 300, + BaseSlot: 100, Hash: solana.MustHashFromBase58("AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr"), TotalSize: 3, Files: []*types.SnapshotFile{ @@ -84,6 +85,7 @@ func TestListSnapshots(t *testing.T) { }, { Slot: 200, + BaseSlot: 100, Hash: solana.MustHashFromBase58("AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr"), TotalSize: 2, Files: []*types.SnapshotFile{ @@ -108,6 +110,7 @@ func TestListSnapshots(t *testing.T) { }, { Slot: 100, + BaseSlot: 100, Hash: solana.MustHashFromBase58("AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr"), TotalSize: 1, Files: []*types.SnapshotFile{ @@ -123,6 +126,7 @@ func TestListSnapshots(t *testing.T) { }, { Slot: 100, + BaseSlot: 50, Hash: solana.MustHashFromBase58("AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr"), TotalSize: 2, Files: []*types.SnapshotFile{ @@ -147,6 +151,7 @@ func TestListSnapshots(t *testing.T) { }, { Slot: 50, + BaseSlot: 50, Hash: solana.MustHashFromBase58("AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr"), TotalSize: 1, Files: []*types.SnapshotFile{ From 57fb6315cdb3ff2aecb935f00c123b9adb62971e Mon Sep 17 00:00:00 2001 From: Linus Kendall Date: Wed, 5 Jun 2024 00:35:22 +0000 Subject: [PATCH 08/10] Go fmt --- internal/cmd/tracker/tracker.go | 2 +- internal/integrationtest/tracker_test.go | 16 ++++++++-------- internal/tracker/tracker.go | 2 +- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/internal/cmd/tracker/tracker.go b/internal/cmd/tracker/tracker.go index 31a2753..c24d694 100644 --- a/internal/cmd/tracker/tracker.go +++ b/internal/cmd/tracker/tracker.go @@ -53,7 +53,7 @@ var ( configPath string internalListen string listen string - rpc string + rpc string maxSnapshotAge uint64 ) diff --git a/internal/integrationtest/tracker_test.go b/internal/integrationtest/tracker_test.go index 1ba698e..31554d4 100644 --- a/internal/integrationtest/tracker_test.go +++ b/internal/integrationtest/tracker_test.go @@ -105,9 +105,9 @@ func TestTracker(t *testing.T) { []types.SnapshotSource{ { SnapshotInfo: types.SnapshotInfo{ - Slot: 103, + Slot: 103, BaseSlot: 103, - Hash: solana.MustHashFromBase58("7w4zb1jh47zY5FPMPyRzDSmYf1CPirVP9LmTr5xWEs6X"), + Hash: solana.MustHashFromBase58("7w4zb1jh47zY5FPMPyRzDSmYf1CPirVP9LmTr5xWEs6X"), Files: []*types.SnapshotFile{ { FileName: "snapshot-103-7w4zb1jh47zY5FPMPyRzDSmYf1CPirVP9LmTr5xWEs6X.tar.bz2", @@ -122,9 +122,9 @@ func TestTracker(t *testing.T) { }, { SnapshotInfo: types.SnapshotInfo{ - Slot: 102, + Slot: 102, BaseSlot: 102, - Hash: solana.MustHashFromBase58("7sAawX1cAHVpfZGNtUAYKX2KPzdd1uPUZUTaLteWX4SB"), + Hash: solana.MustHashFromBase58("7sAawX1cAHVpfZGNtUAYKX2KPzdd1uPUZUTaLteWX4SB"), Files: []*types.SnapshotFile{ { FileName: "snapshot-102-7sAawX1cAHVpfZGNtUAYKX2KPzdd1uPUZUTaLteWX4SB.tar.bz2", @@ -139,9 +139,9 @@ func TestTracker(t *testing.T) { }, { SnapshotInfo: types.SnapshotInfo{ - Slot: 101, + Slot: 101, BaseSlot: 101, - Hash: solana.MustHashFromBase58("7oGBJ2HXGT17Fs9QNxu6RbH68z4rJxHZyc9gqhLWoFmq"), + Hash: solana.MustHashFromBase58("7oGBJ2HXGT17Fs9QNxu6RbH68z4rJxHZyc9gqhLWoFmq"), Files: []*types.SnapshotFile{ { FileName: "snapshot-101-7oGBJ2HXGT17Fs9QNxu6RbH68z4rJxHZyc9gqhLWoFmq.tar.bz2", @@ -156,9 +156,9 @@ func TestTracker(t *testing.T) { }, { SnapshotInfo: types.SnapshotInfo{ - Slot: 100, + Slot: 100, BaseSlot: 100, - Hash: solana.MustHashFromBase58("7jMmeXZSNcWPrB2RsTdeXfXrsyW5c1BfPjqoLW2X5T7V"), + Hash: solana.MustHashFromBase58("7jMmeXZSNcWPrB2RsTdeXfXrsyW5c1BfPjqoLW2X5T7V"), Files: []*types.SnapshotFile{ { FileName: "snapshot-100-7jMmeXZSNcWPrB2RsTdeXfXrsyW5c1BfPjqoLW2X5T7V.tar.bz2", diff --git a/internal/tracker/tracker.go b/internal/tracker/tracker.go index e66f2e4..8c115d8 100644 --- a/internal/tracker/tracker.go +++ b/internal/tracker/tracker.go @@ -66,7 +66,7 @@ func (h *Handler) GetSnapshots(c *gin.Context) { } var entries []*index.SnapshotEntry - if query.Slot == 0 { + if query.Slot == 0 { entries = h.DB.GetAllSnapshots() } else { entries = h.DB.GetSnapshotsAtSlot(query.Slot) From 57030d297da93185766cef0f5634cb34b682ffae Mon Sep 17 00:00:00 2001 From: Linus Kendall Date: Wed, 5 Jun 2024 00:41:19 +0000 Subject: [PATCH 09/10] remove lint for now --- .github/workflows/golangci_lint.yml | 25 ------------------------- 1 file changed, 25 deletions(-) delete mode 100644 .github/workflows/golangci_lint.yml diff --git a/.github/workflows/golangci_lint.yml b/.github/workflows/golangci_lint.yml deleted file mode 100644 index 65ffffd..0000000 --- a/.github/workflows/golangci_lint.yml +++ /dev/null @@ -1,25 +0,0 @@ ---- -name: golangci-lint -on: - push: - tags: - - v* - branches: - - main - pull_request: -permissions: - contents: read -jobs: - golangci: - name: lint - runs-on: ubuntu-latest - steps: - - uses: actions/setup-go@v3 - - uses: actions/checkout@v3 - with: - go-version: 1.18.x - - name: golangci-lint - uses: golangci/golangci-lint-action@v3 - with: - version: v1.45.2 - only-new-issues: true From 5eb9fa3084f9f89ae72f1451d41604cd3488ce7e Mon Sep 17 00:00:00 2001 From: Linus Kendall Date: Thu, 6 Jun 2024 12:27:21 +0000 Subject: [PATCH 10/10] Add scheme to target urls --- internal/scraper/scraper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/scraper/scraper.go b/internal/scraper/scraper.go index b6edbd9..a0009c3 100644 --- a/internal/scraper/scraper.go +++ b/internal/scraper/scraper.go @@ -96,7 +96,7 @@ func (s *Scraper) scrape(ctx context.Context, results chan<- ProbeResult) { infos, err := s.prober.Probe(ctx, target) results <- ProbeResult{ Time: time.Now(), - Target: target, + Target: s.prober.scheme + "://" + target, Infos: infos, Err: err, }