Skip to content

Commit

Permalink
Finalized HTTP API for requesting snapshots by group
Browse files Browse the repository at this point in the history
  • Loading branch information
linuskendall authored Dec 19, 2024
1 parent 68bc95f commit 6c419cb
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 19 deletions.
10 changes: 6 additions & 4 deletions internal/cmd/fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ var Cmd = cobra.Command{
var (
ledgerDir string
trackerURL string
group string
minSnapAge uint64
maxSnapAge uint64
baseSlot uint64
Expand All @@ -62,6 +63,7 @@ func init() {
flags := Cmd.Flags()
flags.StringVar(&ledgerDir, "ledger", "", "Path to ledger dir")
flags.StringVar(&trackerURL, "tracker", "", "Download as instructed by given tracker URL")
flags.StringVar(&group, "group", "", "Download from specified group")
flags.Uint64Var(&minSnapAge, "min-slots", 500, "Download only snapshots <n> slots newer than local")
flags.Uint64Var(&maxSnapAge, "max-slots", 10000, "Refuse to download <n> slots older than the newest")
flags.DurationVar(&requestTimeout, "request-timeout", 3*time.Second, "Max time to wait for headers (excluding download)")
Expand Down Expand Up @@ -109,7 +111,7 @@ func run() {
log.Info("Fetching snapshots at slot", zap.Uint64("base_slot", baseSlot))

// Ask tracker for snapshots at a specific location
remoteSnaps, err = trackerClient.GetSnapshotAtSlot(ctx, baseSlot)
remoteSnaps, err = trackerClient.GetSnapshotAtSlot(ctx, group, baseSlot)
if err != nil {
log.Fatal("Failed to fetch snapshot info", zap.Error(err))
}
Expand All @@ -122,7 +124,7 @@ func run() {
log.Info("Finding best snapshot")

// Ask tracker for best snapshots.
remoteSnaps, err = trackerClient.GetBestSnapshots(ctx, -1)
remoteSnaps, err = trackerClient.GetBestSnapshots(ctx, group, -1)
if err != nil {
log.Fatal("Failed to request snapshot info", zap.Error(err))
}
Expand All @@ -145,7 +147,7 @@ func run() {
// If we are not fetching a full snapshot and the base slot isn't matching
// we need to fetch an older incremental snapshot.
log.Info("Full snapshot is newer than local, but not requested")
remoteSnaps, err = trackerClient.GetSnapshotAtSlot(ctx, localSnaps[0].BaseSlot)
remoteSnaps, err = trackerClient.GetSnapshotAtSlot(ctx, group, localSnaps[0].BaseSlot)
if err != nil {
log.Fatal("Failed to request snapshot info", zap.Error(err))
}
Expand Down Expand Up @@ -204,7 +206,7 @@ func run() {
// If we were downloading a full snapshot, check if there's a newer incremental snapshot we can fetch
// Find latest incremental snapshot
log.Info("Finding incremental snapshot for full slot", zap.Uint64("base_slot", snap.BaseSlot))
remoteSnaps, err = trackerClient.GetSnapshotAtSlot(ctx, snap.BaseSlot)
remoteSnaps, err = trackerClient.GetSnapshotAtSlot(ctx, group, snap.BaseSlot)
if err != nil {
log.Fatal("Failed to request snapshot info", zap.Error(err))
}
Expand Down
6 changes: 4 additions & 2 deletions internal/fetch/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ func NewTrackerClientWithResty(client *resty.Client) *TrackerClient {
return &TrackerClient{resty: client}
}

func (c *TrackerClient) GetBestSnapshots(ctx context.Context, count int) (sources []types.SnapshotSource, err error) {
func (c *TrackerClient) GetBestSnapshots(ctx context.Context, group string, count int) (sources []types.SnapshotSource, err error) {
res, err := c.resty.R().
SetContext(ctx).
SetHeader("accept", "application/json").
SetQueryParam("max", strconv.Itoa(count)).
SetQueryParam("group", group).
SetResult(&sources).
Get("/v1/best_snapshots")
if err != nil {
Expand All @@ -54,11 +55,12 @@ func (c *TrackerClient) GetBestSnapshots(ctx context.Context, count int) (source
return
}

func (c *TrackerClient) GetSnapshotAtSlot(ctx context.Context, slot uint64) (sources []types.SnapshotSource, err error) {
func (c *TrackerClient) GetSnapshotAtSlot(ctx context.Context, group string, slot uint64) (sources []types.SnapshotSource, err error) {
res, err := c.resty.R().
SetContext(ctx).
SetHeader("accept", "application/json").
SetQueryParam("slot", strconv.FormatUint(slot, 10)).
SetQueryParam("group", group).
SetResult(&sources).
Get("/v1/snapshots")
if err != nil {
Expand Down
19 changes: 17 additions & 2 deletions internal/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,25 @@ func (d *DB) GetAllSnapshots() (entries []*SnapshotEntry) {
return
}

func (d *DB) GetAllSnapshotsByGroup(group string) (entries []*SnapshotEntry) {
iter, err := d.DB.Txn(false).LowerBound(tableSnapshotEntry, "id_prefix", group, "", uint64(0))
if err != nil {
panic("getting best snapshots failed: " + err.Error())
}
for {
el := iter.Next()
if el == nil {
break
}
entries = append(entries, el.(*SnapshotEntry))
}
return
}

// GetBestSnapshots returns newest-to-oldest snapshots.
// The `max` argument controls the max number of snapshots to return.
// If max is negative, it returns all snapshots.
func (d *DB) GetBestSnapshotsByGroup(max int, group string) (entries []*SnapshotEntry) {
func (d *DB) GetBestSnapshotsByGroup(group string, max int) (entries []*SnapshotEntry) {
var res memdb.ResultIterator
var err error
res, err = d.DB.Txn(false).Get(tableSnapshotEntry, "slot")
Expand Down Expand Up @@ -130,7 +145,7 @@ func (d *DB) GetSnapshotsAtSlotByGroup(group string, slot uint64) (entries []*Sn

// Fetches the best snapshots
func (d *DB) GetBestSnapshots(max int) (entries []*SnapshotEntry) {
return d.GetBestSnapshotsByGroup(max, "")
return d.GetBestSnapshotsByGroup("", max)
}

// Fetches the snapshots that are at a given slot.
Expand Down
4 changes: 2 additions & 2 deletions internal/index/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,12 @@ func TestDB(t *testing.T) {
[]*SnapshotEntry{
snapshotEntry4,
},
db.GetBestSnapshotsByGroup(-1, "devnet"))
db.GetBestSnapshotsByGroup("devnet", -1))

assert.Equal(t,
[]*SnapshotEntry{
snapshotEntry1,
snapshotEntry3,
},
db.GetBestSnapshotsByGroup(-1, "mainnet"))
db.GetBestSnapshotsByGroup("mainnet", -1))
}
2 changes: 1 addition & 1 deletion internal/integrationtest/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestTracker(t *testing.T) {

// Create tracker client.
client := fetch.NewTrackerClientWithResty(resty.NewWithClient(server.Client()).SetHostURL(server.URL))
snaps, err := client.GetBestSnapshots(context.TODO(), -1)
snaps, err := client.GetBestSnapshots(context.TODO(), "", -1)
require.NoError(t, err)
// Remove timestamps and port numbers.
for i := range snaps {
Expand Down
2 changes: 1 addition & 1 deletion internal/mirror/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (w *Worker) Run(ctx context.Context) {

func (w *Worker) tick(ctx context.Context) {
w.Log.Debug("Tick")
sources, err := w.Tracker.GetBestSnapshots(ctx, w.SyncCount)
sources, err := w.Tracker.GetBestSnapshots(ctx, "", w.SyncCount)
if err != nil {
w.Log.Error("Failed to find new snapshots", zap.Error(err))
return
Expand Down
25 changes: 18 additions & 7 deletions internal/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,26 @@ func (h *Handler) createJson(c *gin.Context, entries []*index.SnapshotEntry) {

func (h *Handler) GetSnapshots(c *gin.Context) {
var query struct {
Slot uint64 `form:"slot"`
Slot uint64 `form:"slot"`
Group string `form:"group"`
}
if err := c.BindQuery(&query); err != nil {
return
}

var entries []*index.SnapshotEntry
if query.Slot == 0 {
entries = h.DB.GetAllSnapshots()
if query.Group == "" {
if query.Slot == 0 {
entries = h.DB.GetAllSnapshots()
} else {
entries = h.DB.GetSnapshotsAtSlot(query.Slot)
}
} else {
entries = h.DB.GetSnapshotsAtSlot(query.Slot)
if query.Slot == 0 {
entries = h.DB.GetAllSnapshotsByGroup(query.Group)
} else {
entries = h.DB.GetSnapshotsAtSlotByGroup(query.Group, query.Slot)
}
}

h.createJson(c, entries)
Expand All @@ -78,7 +87,8 @@ func (h *Handler) GetSnapshots(c *gin.Context) {
// GetBestSnapshots returns the currently available best snapshots.
func (h *Handler) GetBestSnapshots(c *gin.Context) {
var query struct {
Max int `form:"max"`
Max int `form:"max"`
Group string `form:"group"`
}
if err := c.BindQuery(&query); err != nil {
return
Expand All @@ -87,13 +97,14 @@ func (h *Handler) GetBestSnapshots(c *gin.Context) {
if query.Max < 0 || query.Max > maxItems {
query.Max = maxItems
}
entries := h.DB.GetBestSnapshots(query.Max)
entries := h.DB.GetBestSnapshotsByGroup(query.Group, query.Max)
h.createJson(c, entries)
}

func (h *Handler) Health(c *gin.Context) {
var query struct {
Max int `form:"max"`
Max int `form:"max"`
Group string `form:"group"`
}
if err := c.BindQuery(&query); err != nil {
return
Expand Down

0 comments on commit 6c419cb

Please sign in to comment.