Skip to content

Commit

Permalink
Merge pull request #13 from Lyndon-Li/v0.16.0-velero-patch
Browse files Browse the repository at this point in the history
refactor(general): perform index compaction during repo maintenance
  • Loading branch information
Lyndon-Li authored Mar 29, 2024
2 parents b901e3b + ddd3150 commit f17bf70
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 15 deletions.
4 changes: 3 additions & 1 deletion internal/epoch/epoch_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,7 @@ func TestMaybeCompactSingleEpoch_CompactionError(t *testing.T) {

idxCount := p.GetEpochAdvanceOnCountThreshold()
// Create sufficient indexes blobs and move clock forward to advance epoch.
for j := 0; j < 3; j++ {
for j := 0; j < 4; j++ {
for i := 0; i < idxCount; i++ {
if i == idxCount-1 {
// Advance the time so that the difference in times for writes will force
Expand All @@ -848,6 +848,8 @@ func TestMaybeCompactSingleEpoch_CompactionError(t *testing.T) {

te.mustWriteIndexFiles(ctx, t, newFakeIndexWithEntries(i))
}

require.NoError(t, te.mgr.MaybeAdvanceWriteEpoch(ctx))
}

compactionError := errors.New("test compaction error")
Expand Down
96 changes: 82 additions & 14 deletions repo/maintenance/maintenance_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/pkg/errors"

"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/epoch"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/content/index"
Expand All @@ -36,16 +37,20 @@ type TaskType string

// Task IDs.
const (
TaskSnapshotGarbageCollection = "snapshot-gc"
TaskDeleteOrphanedBlobsQuick = "quick-delete-blobs"
TaskDeleteOrphanedBlobsFull = "full-delete-blobs"
TaskRewriteContentsQuick = "quick-rewrite-contents"
TaskRewriteContentsFull = "full-rewrite-contents"
TaskDropDeletedContentsFull = "full-drop-deleted-content"
TaskIndexCompaction = "index-compaction"
TaskExtendBlobRetentionTimeFull = "extend-blob-retention-time"
TaskCleanupLogs = "cleanup-logs"
TaskCleanupEpochManager = "cleanup-epoch-manager"
TaskSnapshotGarbageCollection = "snapshot-gc"
TaskDeleteOrphanedBlobsQuick = "quick-delete-blobs"
TaskDeleteOrphanedBlobsFull = "full-delete-blobs"
TaskRewriteContentsQuick = "quick-rewrite-contents"
TaskRewriteContentsFull = "full-rewrite-contents"
TaskDropDeletedContentsFull = "full-drop-deleted-content"
TaskIndexCompaction = "index-compaction"
TaskExtendBlobRetentionTimeFull = "extend-blob-retention-time"
TaskCleanupLogs = "cleanup-logs"
TaskEpochAdvance = "advance-epoch"
TaskEpochDeleteSupersededIndexes = "delete-superseded-epoch-indexes"
TaskEpochCleanupMarkers = "cleanup-epoch-markers"
TaskEpochGenerateRange = "generate-epoch-range-index"
TaskEpochCompactSingle = "compact-single-epoch"
)

// shouldRun returns Mode if repository is due for periodic maintenance.
Expand Down Expand Up @@ -294,6 +299,10 @@ func runQuickMaintenance(ctx context.Context, runParams RunParameters, safety Sa
notDeletingOrphanedBlobs(ctx, s, safety)
}

if err := runTaskEpochMaintenanceQuick(ctx, runParams, s); err != nil {
return errors.Wrap(err, "error running quick epoch maintenance tasks")
}

// consolidate many smaller indexes into fewer larger ones.
if err := runTaskIndexCompactionQuick(ctx, runParams, s, safety); err != nil {
return errors.Wrap(err, "error performing index compaction")
Expand Down Expand Up @@ -327,7 +336,14 @@ func runTaskCleanupLogs(ctx context.Context, runParams RunParameters, s *Schedul
})
}

func runTaskCleanupEpochManager(ctx context.Context, runParams RunParameters, s *Schedule) error {
func runTaskEpochAdvance(ctx context.Context, em *epoch.Manager, runParams RunParameters, s *Schedule) error {
return ReportRun(ctx, runParams.rep, TaskEpochAdvance, s, func() error {
log(ctx).Infof("Cleaning up no-longer-needed epoch markers...")
return errors.Wrap(em.MaybeAdvanceWriteEpoch(ctx), "error advancing epoch marker")
})
}

func runTaskEpochMaintenanceQuick(ctx context.Context, runParams RunParameters, s *Schedule) error {
em, hasEpochManager, emerr := runParams.rep.ContentManager().EpochManager(ctx)
if emerr != nil {
return errors.Wrap(emerr, "epoch manager")
Expand All @@ -337,9 +353,61 @@ func runTaskCleanupEpochManager(ctx context.Context, runParams RunParameters, s
return nil
}

return ReportRun(ctx, runParams.rep, TaskCleanupEpochManager, s, func() error {
err := ReportRun(ctx, runParams.rep, TaskEpochCompactSingle, s, func() error {
log(ctx).Infof("Compacting an eligible uncompacted epoch...")
return errors.Wrap(em.MaybeCompactSingleEpoch(ctx), "error compacting single epoch")
})
if err != nil {
return err
}

return runTaskEpochAdvance(ctx, em, runParams, s)
}

func runTaskEpochMaintenanceFull(ctx context.Context, runParams RunParameters, s *Schedule) error {
em, hasEpochManager, emerr := runParams.rep.ContentManager().EpochManager(ctx)
if emerr != nil {
return errors.Wrap(emerr, "epoch manager")
}

if !hasEpochManager {
return nil
}

// compact a single epoch
if err := ReportRun(ctx, runParams.rep, TaskEpochCompactSingle, s, func() error {
log(ctx).Infof("Compacting an eligible uncompacted epoch...")
return errors.Wrap(em.MaybeCompactSingleEpoch(ctx), "error compacting single epoch")
}); err != nil {
return err
}

if err := runTaskEpochAdvance(ctx, em, runParams, s); err != nil {
return err
}

// compact range
if err := ReportRun(ctx, runParams.rep, TaskEpochGenerateRange, s, func() error {
log(ctx).Infof("Attempting to compact a range of epoch indexes ...")

return errors.Wrap(em.MaybeGenerateRangeCheckpoint(ctx), "error creating epoch range indexes")
}); err != nil {
return err
}

// clean up epoch markers
err := ReportRun(ctx, runParams.rep, TaskEpochCleanupMarkers, s, func() error {
log(ctx).Infof("Cleaning up unneeded epoch markers...")

return errors.Wrap(em.CleanupMarkers(ctx), "error removing epoch markers")
})
if err != nil {
return err
}

return ReportRun(ctx, runParams.rep, TaskEpochDeleteSupersededIndexes, s, func() error {
log(ctx).Infof("Cleaning up old index blobs which have already been compacted...")
return errors.Wrap(em.CleanupSupersededIndexes(ctx), "error cleaning up superseded index blobs")
return errors.Wrap(em.CleanupSupersededIndexes(ctx), "error removing superseded epoch index blobs")
})
}

Expand Down Expand Up @@ -451,7 +519,7 @@ func runFullMaintenance(ctx context.Context, runParams RunParameters, safety Saf
log(ctx).Debug("Extending object lock retention-period is disabled.")
}

if err := runTaskCleanupEpochManager(ctx, runParams, s); err != nil {
if err := runTaskEpochMaintenanceFull(ctx, runParams, s); err != nil {
return errors.Wrap(err, "error cleaning up epoch manager")
}

Expand Down

0 comments on commit f17bf70

Please sign in to comment.