From b1d5b3b8e38f2a38dbec79c45b17e8d30774bb23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julio=20L=C3=B3pez?= <1953782+julio-lopez@users.noreply.github.com> Date: Wed, 27 Mar 2024 09:41:24 -0700 Subject: [PATCH 1/2] refactor(general): perform index compaction during repo maintenance (#3651) Perform index epoch compaction and cleanup during repository maintenance * refactor: rename maintenance task for deleting superseded indexes * maintenance task to cleanup epoch markers * maintenance task to advance write index epoch * maintenance task to compact epoch ranges * quick maintenance task to compact a single (index) epoch * full maintenance task to compact a single (index) epoch Ref: - #3638 - #3639 Followup to: - #3603 - #3645 Related helpers and changes: - #3721 - #3735 - #3709 - #3728 - #3727 - #3726 --- repo/maintenance/maintenance_run.go | 96 ++++++++++++++++++++++++----- 1 file changed, 82 insertions(+), 14 deletions(-) diff --git a/repo/maintenance/maintenance_run.go b/repo/maintenance/maintenance_run.go index c92545f0074..6a0e632ca14 100644 --- a/repo/maintenance/maintenance_run.go +++ b/repo/maintenance/maintenance_run.go @@ -10,6 +10,7 @@ import ( "github.com/pkg/errors" "github.com/kopia/kopia/internal/clock" + "github.com/kopia/kopia/internal/epoch" "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/content" "github.com/kopia/kopia/repo/content/index" @@ -36,16 +37,20 @@ type TaskType string // Task IDs. const ( - TaskSnapshotGarbageCollection = "snapshot-gc" - TaskDeleteOrphanedBlobsQuick = "quick-delete-blobs" - TaskDeleteOrphanedBlobsFull = "full-delete-blobs" - TaskRewriteContentsQuick = "quick-rewrite-contents" - TaskRewriteContentsFull = "full-rewrite-contents" - TaskDropDeletedContentsFull = "full-drop-deleted-content" - TaskIndexCompaction = "index-compaction" - TaskExtendBlobRetentionTimeFull = "extend-blob-retention-time" - TaskCleanupLogs = "cleanup-logs" - TaskCleanupEpochManager = "cleanup-epoch-manager" + TaskSnapshotGarbageCollection = "snapshot-gc" + TaskDeleteOrphanedBlobsQuick = "quick-delete-blobs" + TaskDeleteOrphanedBlobsFull = "full-delete-blobs" + TaskRewriteContentsQuick = "quick-rewrite-contents" + TaskRewriteContentsFull = "full-rewrite-contents" + TaskDropDeletedContentsFull = "full-drop-deleted-content" + TaskIndexCompaction = "index-compaction" + TaskExtendBlobRetentionTimeFull = "extend-blob-retention-time" + TaskCleanupLogs = "cleanup-logs" + TaskEpochAdvance = "advance-epoch" + TaskEpochDeleteSupersededIndexes = "delete-superseded-epoch-indexes" + TaskEpochCleanupMarkers = "cleanup-epoch-markers" + TaskEpochGenerateRange = "generate-epoch-range-index" + TaskEpochCompactSingle = "compact-single-epoch" ) // shouldRun returns Mode if repository is due for periodic maintenance. @@ -294,6 +299,10 @@ func runQuickMaintenance(ctx context.Context, runParams RunParameters, safety Sa notDeletingOrphanedBlobs(ctx, s, safety) } + if err := runTaskEpochMaintenanceQuick(ctx, runParams, s); err != nil { + return errors.Wrap(err, "error running quick epoch maintenance tasks") + } + // consolidate many smaller indexes into fewer larger ones. if err := runTaskIndexCompactionQuick(ctx, runParams, s, safety); err != nil { return errors.Wrap(err, "error performing index compaction") @@ -327,7 +336,14 @@ func runTaskCleanupLogs(ctx context.Context, runParams RunParameters, s *Schedul }) } -func runTaskCleanupEpochManager(ctx context.Context, runParams RunParameters, s *Schedule) error { +func runTaskEpochAdvance(ctx context.Context, em *epoch.Manager, runParams RunParameters, s *Schedule) error { + return ReportRun(ctx, runParams.rep, TaskEpochAdvance, s, func() error { + log(ctx).Infof("Cleaning up no-longer-needed epoch markers...") + return errors.Wrap(em.MaybeAdvanceWriteEpoch(ctx), "error advancing epoch marker") + }) +} + +func runTaskEpochMaintenanceQuick(ctx context.Context, runParams RunParameters, s *Schedule) error { em, hasEpochManager, emerr := runParams.rep.ContentManager().EpochManager(ctx) if emerr != nil { return errors.Wrap(emerr, "epoch manager") @@ -337,9 +353,61 @@ func runTaskCleanupEpochManager(ctx context.Context, runParams RunParameters, s return nil } - return ReportRun(ctx, runParams.rep, TaskCleanupEpochManager, s, func() error { + err := ReportRun(ctx, runParams.rep, TaskEpochCompactSingle, s, func() error { + log(ctx).Infof("Compacting an eligible uncompacted epoch...") + return errors.Wrap(em.MaybeCompactSingleEpoch(ctx), "error compacting single epoch") + }) + if err != nil { + return err + } + + return runTaskEpochAdvance(ctx, em, runParams, s) +} + +func runTaskEpochMaintenanceFull(ctx context.Context, runParams RunParameters, s *Schedule) error { + em, hasEpochManager, emerr := runParams.rep.ContentManager().EpochManager(ctx) + if emerr != nil { + return errors.Wrap(emerr, "epoch manager") + } + + if !hasEpochManager { + return nil + } + + // compact a single epoch + if err := ReportRun(ctx, runParams.rep, TaskEpochCompactSingle, s, func() error { + log(ctx).Infof("Compacting an eligible uncompacted epoch...") + return errors.Wrap(em.MaybeCompactSingleEpoch(ctx), "error compacting single epoch") + }); err != nil { + return err + } + + if err := runTaskEpochAdvance(ctx, em, runParams, s); err != nil { + return err + } + + // compact range + if err := ReportRun(ctx, runParams.rep, TaskEpochGenerateRange, s, func() error { + log(ctx).Infof("Attempting to compact a range of epoch indexes ...") + + return errors.Wrap(em.MaybeGenerateRangeCheckpoint(ctx), "error creating epoch range indexes") + }); err != nil { + return err + } + + // clean up epoch markers + err := ReportRun(ctx, runParams.rep, TaskEpochCleanupMarkers, s, func() error { + log(ctx).Infof("Cleaning up unneeded epoch markers...") + + return errors.Wrap(em.CleanupMarkers(ctx), "error removing epoch markers") + }) + if err != nil { + return err + } + + return ReportRun(ctx, runParams.rep, TaskEpochDeleteSupersededIndexes, s, func() error { log(ctx).Infof("Cleaning up old index blobs which have already been compacted...") - return errors.Wrap(em.CleanupSupersededIndexes(ctx), "error cleaning up superseded index blobs") + return errors.Wrap(em.CleanupSupersededIndexes(ctx), "error removing superseded epoch index blobs") }) } @@ -451,7 +519,7 @@ func runFullMaintenance(ctx context.Context, runParams RunParameters, safety Saf log(ctx).Debug("Extending object lock retention-period is disabled.") } - if err := runTaskCleanupEpochManager(ctx, runParams, s); err != nil { + if err := runTaskEpochMaintenanceFull(ctx, runParams, s); err != nil { return errors.Wrap(err, "error cleaning up epoch manager") } From ddd31505bc488d9e4fce5889d947258512429f8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julio=20L=C3=B3pez?= <1953782+julio-lopez@users.noreply.github.com> Date: Wed, 27 Mar 2024 12:49:53 -0700 Subject: [PATCH 2/2] refactor(test): explicitly advance epoch in TestMaybeCompactSingleEpoch_CompactionError (#3755) Ref: - #3638 --- internal/epoch/epoch_manager_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/epoch/epoch_manager_test.go b/internal/epoch/epoch_manager_test.go index e101b9005a9..adafbe1b17e 100644 --- a/internal/epoch/epoch_manager_test.go +++ b/internal/epoch/epoch_manager_test.go @@ -838,7 +838,7 @@ func TestMaybeCompactSingleEpoch_CompactionError(t *testing.T) { idxCount := p.GetEpochAdvanceOnCountThreshold() // Create sufficient indexes blobs and move clock forward to advance epoch. - for j := 0; j < 3; j++ { + for j := 0; j < 4; j++ { for i := 0; i < idxCount; i++ { if i == idxCount-1 { // Advance the time so that the difference in times for writes will force @@ -848,6 +848,8 @@ func TestMaybeCompactSingleEpoch_CompactionError(t *testing.T) { te.mustWriteIndexFiles(ctx, t, newFakeIndexWithEntries(i)) } + + require.NoError(t, te.mgr.MaybeAdvanceWriteEpoch(ctx)) } compactionError := errors.New("test compaction error")