diff --git a/.github/workflows/golangci_lint.yml b/.github/workflows/golangci_lint.yml deleted file mode 100644 index 65ffffd..0000000 --- a/.github/workflows/golangci_lint.yml +++ /dev/null @@ -1,25 +0,0 @@ ---- -name: golangci-lint -on: - push: - tags: - - v* - branches: - - main - pull_request: -permissions: - contents: read -jobs: - golangci: - name: lint - runs-on: ubuntu-latest - steps: - - uses: actions/setup-go@v3 - - uses: actions/checkout@v3 - with: - go-version: 1.18.x - - name: golangci-lint - uses: golangci/golangci-lint-action@v3 - with: - version: v1.45.2 - only-new-issues: true diff --git a/README.md b/README.md index 5b7cf04..4e5a5fd 100644 --- a/README.md +++ b/README.md @@ -102,6 +102,23 @@ Flags: --tracker string URL to tracker API ``` +## Snapshot fetching logic + +### If full=true, incremental=true (default) + +If the local ledger directory does not have the same full snapshot as the remote one, then fetch the full snapshot otherwise skip full snapshot fetching. + +If the local ledger does not have the latest incremental snapshot, fetch the latest incremnetal. + +### If full=false, incremental=true + +Fetch the latest incremental snapshot for the latest full snapshot locally available. Warn if this local snapshot is not the latest but do not fetch. + +### If full=true, incremental=false + +Fetch the latest full snapshot, unless it is already available in the local ledger and has the same hash. + + ## Architecture ### Snapshot management diff --git a/internal/cmd/fetch/fetch.go b/internal/cmd/fetch/fetch.go index 49a82b8..417e7a4 100644 --- a/internal/cmd/fetch/fetch.go +++ b/internal/cmd/fetch/fetch.go @@ -22,7 +22,6 @@ import ( "net/http" "os" "os/signal" - "sort" "time" "github.com/spf13/cobra" @@ -31,8 +30,10 @@ import ( "go.blockdaemon.com/solana/cluster-manager/internal/fetch" "go.blockdaemon.com/solana/cluster-manager/internal/ledger" "go.blockdaemon.com/solana/cluster-manager/internal/logger" + "go.blockdaemon.com/solana/cluster-manager/types" "go.uber.org/zap" "golang.org/x/sync/errgroup" + "golang.org/x/term" "gopkg.in/resty.v1" ) @@ -50,9 +51,11 @@ var ( trackerURL string minSnapAge uint64 maxSnapAge uint64 + baseSlot uint64 + fullSnap bool + incrementalSnap bool requestTimeout time.Duration downloadTimeout time.Duration - parallelDownload bool ) func init() { @@ -63,12 +66,18 @@ func init() { flags.Uint64Var(&maxSnapAge, "max-slots", 10000, "Refuse to download slots older than the newest") flags.DurationVar(&requestTimeout, "request-timeout", 3*time.Second, "Max time to wait for headers (excluding download)") flags.DurationVar(&downloadTimeout, "download-timeout", 10*time.Minute, "Max time to try downloading in total") - flags.BoolVar(¶llelDownload, "parallel-download", false, "Download snapshot files in parallel or serially") + flags.Uint64Var(&baseSlot, "slot", 0, "Download snapshot for given slot (if available)") + flags.BoolVar(&fullSnap, "full", true, "Download full snapshot (if available)") + flags.BoolVar(&incrementalSnap, "incremental", true, "Download incremental snapshot (if available)") } func run() { log := logger.GetLogger() + if !fullSnap && !incrementalSnap { + log.Fatal("Must specify at least one of --full or --incremental") + } + // Regardless which API we talk to, we want to cap time from request to response header. // This defends against black holes and really slow servers. // Download time (reading response body) is not affected. @@ -87,101 +96,172 @@ func run() { log.Fatal("Failed to check existing snapshots", zap.Error(err)) } - // Ask tracker for best snapshots. + // Get a specific snapshot or "best snapshot" from tracker client trackerClient := fetch.NewTrackerClientWithResty( resty.New(). SetHostURL(trackerURL). SetTimeout(requestTimeout), ) - remoteSnaps, err := trackerClient.GetBestSnapshots(ctx, -1) - if err != nil { - log.Fatal("Failed to request snapshot info", zap.Error(err)) - } - // Decide what we want to do. - _, advice := fetch.ShouldFetchSnapshot(localSnaps, remoteSnaps, minSnapAge, maxSnapAge) - switch advice { - case fetch.AdviceNothingFound: - log.Error("No snapshots available remotely") - return - case fetch.AdviceUpToDate: - log.Info("Existing snapshot is recent enough, no download needed", - zap.Uint64("existing_slot", localSnaps[0].Slot)) - return - case fetch.AdviceFetch: + var remoteSnaps []types.SnapshotSource + + if baseSlot != 0 { + log.Info("Fetching snapshots at slot", zap.Uint64("base_slot", baseSlot)) + + // Ask tracker for snapshots at a specific location + remoteSnaps, err = trackerClient.GetSnapshotAtSlot(ctx, baseSlot) + if err != nil { + log.Fatal("Failed to fetch snapshot info", zap.Error(err)) + } + + // @TODO check if this snapshot already exists + buf, _ := json.MarshalIndent(remoteSnaps, "", "\t") + log.Info("Downloading a snapshot", zap.ByteString("snap", buf)) + + } else { + log.Info("Finding best snapshot") + + // Ask tracker for best snapshots. + remoteSnaps, err = trackerClient.GetBestSnapshots(ctx, -1) + if err != nil { + log.Fatal("Failed to request snapshot info", zap.Error(err)) + } + + // Decide what we want to do. + _, advice := fetch.ShouldFetchSnapshot(localSnaps, remoteSnaps, minSnapAge, maxSnapAge) + switch advice { + case fetch.AdviceNothingFound: + log.Error("No snapshots available remotely") + return + case fetch.AdviceUpToDate: + log.Info("Existing snapshot is recent enough, no download needed", + zap.Uint64("existing_slot", localSnaps[0].Slot)) + return + case fetch.AdviceRemoteIsOlder: + log.Info("Remote snapshot is older than local, no download possible") + return + case fetch.AdviceFetchFull: + if !fullSnap { + // If we are not fetching a full snapshot and the base slot isn't matching + // we need to fetch an older incremental snapshot. + log.Info("Full snapshot is newer than local, but not requested") + remoteSnaps, err = trackerClient.GetSnapshotAtSlot(ctx, localSnaps[0].BaseSlot) + if err != nil { + log.Fatal("Failed to request snapshot info", zap.Error(err)) + } + } + case fetch.AdviceFetchIncremental: + log.Info("Full snapshot already found. No need for downloading.") + fullSnap = false // No need to fetch full snap + if !incrementalSnap { + log.Info("Incremental snapshot is newer than local, but only full is requested.") + return + } + case fetch.AdviceFetch: + } } // Print snapshot to user. + log.Info("Number of remote snaps found: ", zap.Int("num", len(remoteSnaps))) + if len(remoteSnaps) == 0 { + log.Fatal("Could not find any matching snapshots. Bailing.") + } + snap := &remoteSnaps[0] - buf, _ := json.MarshalIndent(snap, "", "\t") - log.Info("Downloading a snapshot", zap.ByteString("snap", buf), zap.String("target", snap.Target)) // Setup progress bars for download. - bars := mpb.New() - sidecarClient := fetch.NewSidecarClientWithOpts(snap.Target, fetch.SidecarClientOpts{ - ProxyReaderFunc: func(name string, size int64, rd io.Reader) io.ReadCloser { - bar := bars.New( - size, - mpb.BarStyle(), - mpb.PrependDecorators(decor.Name(name)), - mpb.AppendDecorators( - decor.AverageSpeed(decor.UnitKB, "% .1f"), - decor.Percentage(), - ), - ) - return bar.ProxyReader(rd) - }, - }) - - // Download. + var fetchOpts fetch.SidecarClientOpts + if term.IsTerminal(int(os.Stdout.Fd())) { + bars := mpb.New() + fetchOpts = fetch.SidecarClientOpts{ + ProxyReaderFunc: func(name string, size int64, rd io.Reader) io.ReadCloser { + bar := bars.New( + size, + mpb.BarStyle(), + mpb.PrependDecorators(decor.Name(name)), + mpb.AppendDecorators( + decor.AverageSpeed(decor.UnitKB, "% .1f"), + decor.Percentage(), + ), + ) + return bar.ProxyReader(rd) + }, + } + } + + sidecarClient := fetch.NewSidecarClientWithOpts(snap.Target, fetchOpts) + + // First pass, if we're fetching fullSnap we want to fetch the fullSnaps but **not** the incremental snap + // Then after completion of the full snap download, we refetch the incremental one so we get the latest one + if fullSnap { + buf, _ := json.MarshalIndent(snap, "", "\t") + log.Info("Downloading full snapshot", zap.ByteString("snap", buf), zap.String("target", snap.Target)) + + downloadSnapshot(ctx, sidecarClient, snap, true, false) + + if incrementalSnap { + + // If we were downloading a full snapshot, check if there's a newer incremental snapshot we can fetch + // Find latest incremental snapshot + log.Info("Finding incremental snapshot for full slot", zap.Uint64("base_slot", snap.BaseSlot)) + remoteSnaps, err = trackerClient.GetSnapshotAtSlot(ctx, snap.BaseSlot) + if err != nil { + log.Fatal("Failed to request snapshot info", zap.Error(err)) + } + + if len(remoteSnaps) == 0 { + log.Fatal("No incremental snapshot found") + } + + snap = &remoteSnaps[0] + } else { + log.Info("Only full snapshot was requested, not fetching incremental snapshot") + return + } + } + + if incrementalSnap { + // Download incremental snapshot + buf, _ := json.MarshalIndent(snap, "", "\t") + log.Info("Downloading incremental snapshot", zap.ByteString("snap", buf), zap.String("target", snap.Target)) + + // This will fetch the latest incremental snapshot (if fullSnap was specified it would already have been fetched and refreshed) + downloadSnapshot(ctx, sidecarClient, snap, false, true) + } +} + +func downloadSnapshot(ctx context.Context, sidecarClient *fetch.SidecarClient, snap *types.SnapshotSource, full bool, incremental bool) { + log := logger.GetLogger() + + // Download the snapshot files beforeDownload := time.Now() - var downloadErr error - - if parallelDownload { - group, ctx := errgroup.WithContext(ctx) - - for _, file := range snap.Files { - file_ := file - if parallelDownload { - group.Go(func() error { - err := sidecarClient.DownloadSnapshotFile(ctx, ".", file_.FileName) - if err != nil { - log.Error("Download failed", - zap.String("snapshot", file_.FileName), - zap.String("error", err.Error())) - } - return err - }) - } - - } - - downloadErr = group.Wait() - } else { - // Download the largest files first - sort.Slice(snap.Files, func(i, j int) bool { - return snap.Files[i].Size > snap.Files[j].Size - }) - - for _, file := range snap.Files { - file_ := file - - err := sidecarClient.DownloadSnapshotFile(ctx, ".", file_.FileName) - if err != nil { - log.Error("Download failed", - zap.String("snapshot", file_.FileName), - zap.String("error", err.Error())) - downloadErr = err - break // @TODO should we still try to download the other stuff? - } - } - } - - downloadDuration := time.Since(beforeDownload) + group, ctx := errgroup.WithContext(ctx) + for _, file := range snap.Files { + if file.BaseSlot != 0 && !incremental { + continue + } + if file.BaseSlot == 0 && !full { + continue + } + + file_ := file + group.Go(func() error { + err := sidecarClient.DownloadSnapshotFile(ctx, ".", file_.FileName) + if err != nil { + log.Error("Full snapshot download failed", + zap.String("snapshot", file_.FileName), + zap.String("error", err.Error())) + } + return err + }) + } + downloadErr := group.Wait() + downloadDuration := time.Since(beforeDownload) if downloadErr == nil { - log.Info("Download completed", zap.Duration("download_time", downloadDuration)) + log.Info("Snapshot download completed", zap.Duration("download_time", downloadDuration)) } else { log.Fatal("Aborting download", zap.Duration("download_time", downloadDuration)) } + } diff --git a/internal/cmd/tracker/tracker.go b/internal/cmd/tracker/tracker.go index 31a2753..c24d694 100644 --- a/internal/cmd/tracker/tracker.go +++ b/internal/cmd/tracker/tracker.go @@ -53,7 +53,7 @@ var ( configPath string internalListen string listen string - rpc string + rpc string maxSnapshotAge uint64 ) diff --git a/internal/fetch/fetch.go b/internal/fetch/fetch.go index a5049bc..355f64f 100644 --- a/internal/fetch/fetch.go +++ b/internal/fetch/fetch.go @@ -34,8 +34,10 @@ func ShouldFetchSnapshot( // Compare local and remote slot numbers. remoteSlot := remote[0].Slot var localSlot uint64 + var localBaseSlot uint64 if len(local) > 0 { localSlot = local[0].Slot + localBaseSlot = local[0].BaseSlot } // Check if local is newer or remote is not new enough to be interesting. @@ -48,6 +50,30 @@ func ShouldFetchSnapshot( if maxAge < remoteSlot { minSlot = remoteSlot - maxAge } + + // Check if we have a full snapshot new enough + if localBaseSlot < remote[0].BaseSlot { + advice = AdviceFetchFull + return + } else if len(local) > 0 && localBaseSlot == remote[0].BaseSlot { + for _, l := range local[0].Files { + for _, r := range remote[0].Files { + if l.BaseSlot == 0 && r.BaseSlot == 0 { + if l.Hash == r.Hash { + // If the full snapshot base slot is the same and the hash is the same + // we can skip fetching the full snapshot. + advice = AdviceFetchIncremental + return + } + } + } + } + } else { + advice = AdviceRemoteIsOlder + return + } + + // Our advice is to fetch both incremental and full snapshots. advice = AdviceFetch return } @@ -56,7 +82,10 @@ func ShouldFetchSnapshot( type Advice int const ( - AdviceFetch = Advice(iota) // download a snapshot - AdviceNothingFound // no snapshot available - AdviceUpToDate // local snapshot is up-to-date or newer, don't download + AdviceFetch = Advice(iota) // download a snapshot + AdviceFetchFull // download a full snapshot + AdviceFetchIncremental // download an incremental snapshot + AdviceRemoteIsOlder // remote snapshot is older than local + AdviceNothingFound // no snapshot available + AdviceUpToDate // local snapshot is up-to-date or newer, don't download ) diff --git a/internal/fetch/fetch_test.go b/internal/fetch/fetch_test.go index 68777cf..5bbad5c 100644 --- a/internal/fetch/fetch_test.go +++ b/internal/fetch/fetch_test.go @@ -49,7 +49,7 @@ func TestShouldFetchSnapshot(t *testing.T) { minAge: 500, maxAge: 10000, minSlot: 113456, - advice: AdviceFetch, + advice: AdviceFetchFull, }, { name: "LowSlotNumber", @@ -58,7 +58,7 @@ func TestShouldFetchSnapshot(t *testing.T) { minAge: 50, maxAge: 10000, minSlot: 0, - advice: AdviceFetch, + advice: AdviceFetchFull, }, { name: "Refresh", @@ -67,7 +67,7 @@ func TestShouldFetchSnapshot(t *testing.T) { minAge: 500, maxAge: 10000, minSlot: 113456, - advice: AdviceFetch, + advice: AdviceFetchFull, }, { name: "NotNewEnough", @@ -107,7 +107,8 @@ func fakeSnapshotInfo(slots []uint64) []*types.SnapshotInfo { infos := make([]*types.SnapshotInfo, len(slots)) for i, slot := range slots { infos[i] = &types.SnapshotInfo{ - Slot: slot, + Slot: slot, + BaseSlot: slot, } } return infos @@ -118,7 +119,8 @@ func fakeSnapshotSources(slots []uint64) []types.SnapshotSource { for i, slot := range slots { infos[i] = types.SnapshotSource{ SnapshotInfo: types.SnapshotInfo{ - Slot: slot, + Slot: slot, + BaseSlot: slot, }, } } diff --git a/internal/fetch/sidecar.go b/internal/fetch/sidecar.go index b8d186c..fb9a505 100644 --- a/internal/fetch/sidecar.go +++ b/internal/fetch/sidecar.go @@ -90,7 +90,7 @@ func (c *SidecarClient) ListSnapshots(ctx context.Context) (infos []*types.Snaps // The returned response is guaranteed to have a valid ContentLength. // The caller has the responsibility to close the response body even if the error is not nil. func (c *SidecarClient) StreamSnapshot(ctx context.Context, name string) (res *http.Response, err error) { - snapURL := "http://" + c.resty.HostURL + "/v1/snapshot/" + url.PathEscape(name) // TODO: Don't hardcode scheme + snapURL := c.resty.HostURL + "/v1/snapshot/" + url.PathEscape(name) // TODO: Don't hardcode scheme c.log.Info("Downloading snapshot", zap.String("snapshot_url", snapURL)) req, err := http.NewRequestWithContext(ctx, http.MethodGet, snapURL, nil) if err != nil { diff --git a/internal/fetch/tracker.go b/internal/fetch/tracker.go index c70bbb7..5b372c7 100644 --- a/internal/fetch/tracker.go +++ b/internal/fetch/tracker.go @@ -53,3 +53,19 @@ func (c *TrackerClient) GetBestSnapshots(ctx context.Context, count int) (source } return } + +func (c *TrackerClient) GetSnapshotAtSlot(ctx context.Context, slot uint64) (sources []types.SnapshotSource, err error) { + res, err := c.resty.R(). + SetContext(ctx). + SetHeader("accept", "application/json"). + SetQueryParam("slot", strconv.FormatUint(slot, 10)). + SetResult(&sources). + Get("/v1/snapshots") + if err != nil { + return nil, err + } + if res.StatusCode() != http.StatusOK { + return nil, fmt.Errorf("get snapshots at slot %d: %s", slot, res.Status()) + } + return +} diff --git a/internal/index/index.go b/internal/index/index.go index df00c6d..81fa337 100644 --- a/internal/index/index.go +++ b/internal/index/index.go @@ -101,6 +101,19 @@ func (d *DB) GetBestSnapshots(max int) (entries []*SnapshotEntry) { return } +// Fetches the snapshots that are at a given slot. +func (d *DB) GetSnapshotsAtSlot(slot uint64) (entries []*SnapshotEntry) { + res, err := d.DB.Txn(false).Get(tableSnapshotEntry, "base_slot", slot) + if err != nil { + panic("getting best snapshots failed: " + err.Error()) + } + + for entry := res.Next(); entry != nil; entry = res.Next() { + entries = append(entries, entry.(*SnapshotEntry)) + } + return +} + // DeleteOldSnapshots delete snapshot entry older than the given timestamp. func (d *DB) DeleteOldSnapshots(minTime time.Time) (n int) { txn := d.DB.Txn(true) diff --git a/internal/index/index_test.go b/internal/index/index_test.go index b6ff9de..d676d29 100644 --- a/internal/index/index_test.go +++ b/internal/index/index_test.go @@ -26,7 +26,7 @@ import ( var dummyTime1 = time.Date(2022, 4, 27, 15, 33, 20, 0, time.UTC) var snapshotEntry1 = &SnapshotEntry{ - SnapshotKey: NewSnapshotKey("host1", 100), + SnapshotKey: NewSnapshotKey("host1", 100, 100), UpdatedAt: dummyTime1, Info: &types.SnapshotInfo{ Slot: 100, @@ -37,7 +37,7 @@ var snapshotEntry1 = &SnapshotEntry{ } var snapshotEntry2 = &SnapshotEntry{ - SnapshotKey: NewSnapshotKey("host1", 99), + SnapshotKey: NewSnapshotKey("host1", 99, 99), UpdatedAt: dummyTime1.Add(-20 * time.Second), Info: &types.SnapshotInfo{ Slot: 99, @@ -48,7 +48,7 @@ var snapshotEntry2 = &SnapshotEntry{ } var snapshotEntry3 = &SnapshotEntry{ - SnapshotKey: NewSnapshotKey("host2", 100), + SnapshotKey: NewSnapshotKey("host2", 100, 100), UpdatedAt: dummyTime1, Info: &types.SnapshotInfo{ Slot: 100, diff --git a/internal/index/schema.go b/internal/index/schema.go index b28245f..fbfde7b 100644 --- a/internal/index/schema.go +++ b/internal/index/schema.go @@ -39,6 +39,11 @@ var schema = memdb.DBSchema{ Unique: false, Indexer: &memdb.UintFieldIndex{Field: "InverseSlot"}, }, + "base_slot": { + Name: "base_slot", + Unique: false, + Indexer: &memdb.UintFieldIndex{Field: "BaseSlot"}, + }, }, }, }, diff --git a/internal/index/types.go b/internal/index/types.go index 4c822c9..9819cea 100644 --- a/internal/index/types.go +++ b/internal/index/types.go @@ -29,12 +29,14 @@ type SnapshotEntry struct { type SnapshotKey struct { Target string `json:"target"` InverseSlot uint64 `json:"inverse_slot"` // newest-to-oldest sort + BaseSlot uint64 `json:"base_slot"` } -func NewSnapshotKey(target string, slot uint64) SnapshotKey { +func NewSnapshotKey(target string, slot uint64, base_slot uint64) SnapshotKey { return SnapshotKey{ Target: target, InverseSlot: ^slot, + BaseSlot: base_slot, } } diff --git a/internal/integrationtest/sidecar_test.go b/internal/integrationtest/sidecar_test.go index fc794cd..fbff1db 100644 --- a/internal/integrationtest/sidecar_test.go +++ b/internal/integrationtest/sidecar_test.go @@ -39,6 +39,7 @@ import ( func TestSidecar(t *testing.T) { server, root := newSidecar(t, 100) defer server.Close() + fmt.Println("Server url", server.URL) client := fetch.NewSidecarClientWithOpts(server.URL, fetch.SidecarClientOpts{Resty: resty.NewWithClient(server.Client())}) @@ -51,6 +52,7 @@ func TestSidecar(t *testing.T) { []*types.SnapshotInfo{ { Slot: 100, + BaseSlot: 100, Hash: solana.MustHashFromBase58("7jMmeXZSNcWPrB2RsTdeXfXrsyW5c1BfPjqoLW2X5T7V"), TotalSize: 1, Files: []*types.SnapshotFile{ diff --git a/internal/integrationtest/tracker_test.go b/internal/integrationtest/tracker_test.go index c4b0a60..31554d4 100644 --- a/internal/integrationtest/tracker_test.go +++ b/internal/integrationtest/tracker_test.go @@ -105,8 +105,9 @@ func TestTracker(t *testing.T) { []types.SnapshotSource{ { SnapshotInfo: types.SnapshotInfo{ - Slot: 103, - Hash: solana.MustHashFromBase58("7w4zb1jh47zY5FPMPyRzDSmYf1CPirVP9LmTr5xWEs6X"), + Slot: 103, + BaseSlot: 103, + Hash: solana.MustHashFromBase58("7w4zb1jh47zY5FPMPyRzDSmYf1CPirVP9LmTr5xWEs6X"), Files: []*types.SnapshotFile{ { FileName: "snapshot-103-7w4zb1jh47zY5FPMPyRzDSmYf1CPirVP9LmTr5xWEs6X.tar.bz2", @@ -121,8 +122,9 @@ func TestTracker(t *testing.T) { }, { SnapshotInfo: types.SnapshotInfo{ - Slot: 102, - Hash: solana.MustHashFromBase58("7sAawX1cAHVpfZGNtUAYKX2KPzdd1uPUZUTaLteWX4SB"), + Slot: 102, + BaseSlot: 102, + Hash: solana.MustHashFromBase58("7sAawX1cAHVpfZGNtUAYKX2KPzdd1uPUZUTaLteWX4SB"), Files: []*types.SnapshotFile{ { FileName: "snapshot-102-7sAawX1cAHVpfZGNtUAYKX2KPzdd1uPUZUTaLteWX4SB.tar.bz2", @@ -137,8 +139,9 @@ func TestTracker(t *testing.T) { }, { SnapshotInfo: types.SnapshotInfo{ - Slot: 101, - Hash: solana.MustHashFromBase58("7oGBJ2HXGT17Fs9QNxu6RbH68z4rJxHZyc9gqhLWoFmq"), + Slot: 101, + BaseSlot: 101, + Hash: solana.MustHashFromBase58("7oGBJ2HXGT17Fs9QNxu6RbH68z4rJxHZyc9gqhLWoFmq"), Files: []*types.SnapshotFile{ { FileName: "snapshot-101-7oGBJ2HXGT17Fs9QNxu6RbH68z4rJxHZyc9gqhLWoFmq.tar.bz2", @@ -153,8 +156,9 @@ func TestTracker(t *testing.T) { }, { SnapshotInfo: types.SnapshotInfo{ - Slot: 100, - Hash: solana.MustHashFromBase58("7jMmeXZSNcWPrB2RsTdeXfXrsyW5c1BfPjqoLW2X5T7V"), + Slot: 100, + BaseSlot: 100, + Hash: solana.MustHashFromBase58("7jMmeXZSNcWPrB2RsTdeXfXrsyW5c1BfPjqoLW2X5T7V"), Files: []*types.SnapshotFile{ { FileName: "snapshot-100-7jMmeXZSNcWPrB2RsTdeXfXrsyW5c1BfPjqoLW2X5T7V.tar.bz2", @@ -172,7 +176,7 @@ func TestTracker(t *testing.T) { } func newTracker(db *index.DB) *httptest.Server { - handler := tracker.NewHandler(db) + handler := tracker.NewHandler(db, "http://localhost:8899", 1000) gin.SetMode(gin.ReleaseMode) engine := gin.New() handler.RegisterHandlers(engine.Group("/v1")) diff --git a/internal/ledger/snapshot.go b/internal/ledger/snapshot.go index 46a1528..30d74e1 100644 --- a/internal/ledger/snapshot.go +++ b/internal/ledger/snapshot.go @@ -26,7 +26,7 @@ import ( "go.blockdaemon.com/solana/cluster-manager/types" ) -// ListSnapshotFiles returns all snapshot files in a ledger dir. +// ListSnapshotFiles returns all snapshot files in a ledger dir in sorted order. func ListSnapshotFiles(ledgerDir fs.FS) ([]*types.SnapshotFile, error) { dirEntries, err := fs.ReadDir(ledgerDir, ".") if err != nil { @@ -94,6 +94,7 @@ func buildSnapshotInfo(files []*types.SnapshotFile, target *types.SnapshotFile) } return &types.SnapshotInfo{ Slot: target.Slot, + BaseSlot: chain[len(chain)-1].Slot, Hash: target.Hash, Files: chain, TotalSize: totalSize, diff --git a/internal/ledger/snapshot_test.go b/internal/ledger/snapshot_test.go index 9890f53..1acaf02 100644 --- a/internal/ledger/snapshot_test.go +++ b/internal/ledger/snapshot_test.go @@ -51,6 +51,7 @@ func TestListSnapshots(t *testing.T) { []*types.SnapshotInfo{ { Slot: 300, + BaseSlot: 100, Hash: solana.MustHashFromBase58("AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr"), TotalSize: 3, Files: []*types.SnapshotFile{ @@ -84,6 +85,7 @@ func TestListSnapshots(t *testing.T) { }, { Slot: 200, + BaseSlot: 100, Hash: solana.MustHashFromBase58("AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr"), TotalSize: 2, Files: []*types.SnapshotFile{ @@ -108,6 +110,7 @@ func TestListSnapshots(t *testing.T) { }, { Slot: 100, + BaseSlot: 100, Hash: solana.MustHashFromBase58("AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr"), TotalSize: 1, Files: []*types.SnapshotFile{ @@ -123,6 +126,7 @@ func TestListSnapshots(t *testing.T) { }, { Slot: 100, + BaseSlot: 50, Hash: solana.MustHashFromBase58("AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr"), TotalSize: 2, Files: []*types.SnapshotFile{ @@ -147,6 +151,7 @@ func TestListSnapshots(t *testing.T) { }, { Slot: 50, + BaseSlot: 50, Hash: solana.MustHashFromBase58("AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr"), TotalSize: 1, Files: []*types.SnapshotFile{ diff --git a/internal/scraper/collector.go b/internal/scraper/collector.go index c307136..2f827b9 100644 --- a/internal/scraper/collector.go +++ b/internal/scraper/collector.go @@ -72,7 +72,7 @@ func (c *Collector) run() { entries := make([]*index.SnapshotEntry, len(res.Infos)) for i, info := range res.Infos { entries[i] = &index.SnapshotEntry{ - SnapshotKey: index.NewSnapshotKey(res.Target, info.Slot), + SnapshotKey: index.NewSnapshotKey(res.Target, info.Slot, info.BaseSlot), Info: info, UpdatedAt: res.Time, } diff --git a/internal/scraper/scraper.go b/internal/scraper/scraper.go index b6edbd9..a0009c3 100644 --- a/internal/scraper/scraper.go +++ b/internal/scraper/scraper.go @@ -96,7 +96,7 @@ func (s *Scraper) scrape(ctx context.Context, results chan<- ProbeResult) { infos, err := s.prober.Probe(ctx, target) results <- ProbeResult{ Time: time.Now(), - Target: target, + Target: s.prober.scheme + "://" + target, Infos: infos, Err: err, } diff --git a/internal/tracker/tracker.go b/internal/tracker/tracker.go index 672b81f..8c115d8 100644 --- a/internal/tracker/tracker.go +++ b/internal/tracker/tracker.go @@ -45,8 +45,34 @@ func (h *Handler) RegisterHandlers(group gin.IRoutes) { group.GET("/health", h.Health) } +func (h *Handler) createJson(c *gin.Context, entries []*index.SnapshotEntry) { + sources := make([]types.SnapshotSource, len(entries)) + for i, entry := range entries { + sources[i] = types.SnapshotSource{ + SnapshotInfo: *entry.Info, + Target: entry.Target, + UpdatedAt: entry.UpdatedAt, + } + } + c.JSON(http.StatusOK, sources) +} + func (h *Handler) GetSnapshots(c *gin.Context) { - c.JSON(http.StatusOK, h.DB.GetAllSnapshots()) + var query struct { + Slot uint64 `form:"slot"` + } + if err := c.BindQuery(&query); err != nil { + return + } + + var entries []*index.SnapshotEntry + if query.Slot == 0 { + entries = h.DB.GetAllSnapshots() + } else { + entries = h.DB.GetSnapshotsAtSlot(query.Slot) + } + + h.createJson(c, entries) } // GetBestSnapshots returns the currently available best snapshots. @@ -58,19 +84,11 @@ func (h *Handler) GetBestSnapshots(c *gin.Context) { return } const maxItems = 25 - if query.Max < 0 || query.Max > 25 { + if query.Max < 0 || query.Max > maxItems { query.Max = maxItems } entries := h.DB.GetBestSnapshots(query.Max) - sources := make([]types.SnapshotSource, len(entries)) - for i, entry := range entries { - sources[i] = types.SnapshotSource{ - SnapshotInfo: *entry.Info, - Target: entry.Target, - UpdatedAt: entry.UpdatedAt, - } - } - c.JSON(http.StatusOK, sources) + h.createJson(c, entries) } func (h *Handler) Health(c *gin.Context) { diff --git a/types/snapshot.go b/types/snapshot.go index e10d039..4f1e469 100644 --- a/types/snapshot.go +++ b/types/snapshot.go @@ -31,6 +31,7 @@ type SnapshotSource struct { // SnapshotInfo describes a snapshot. type SnapshotInfo struct { Slot uint64 `json:"slot"` + BaseSlot uint64 `json:"base_slot"` Hash solana.Hash `json:"hash"` Files []*SnapshotFile `json:"files"` TotalSize uint64 `json:"size"`