Skip to content

Commit

Permalink
add allow-index-write-on-load to open options
Browse files Browse the repository at this point in the history
Signed-off-by: Lyndon-Li <[email protected]>
  • Loading branch information
Lyndon-Li committed Mar 25, 2024
1 parent 4889017 commit 71a2ec3
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 13 deletions.
16 changes: 9 additions & 7 deletions internal/epoch/epoch_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1130,26 +1130,28 @@ func rangeCheckpointBlobPrefix(epoch1, epoch2 int) blob.ID {
return blob.ID(fmt.Sprintf("%v%v_%v_", RangeCheckpointIndexBlobPrefix, epoch1, epoch2))
}

func allowWritesOnIndexLoad() bool {
v := strings.ToLower(os.Getenv("KOPIA_ALLOW_WRITE_ON_INDEX_LOAD"))
func allowWritesOnIndexLoad(fromParam bool) bool {
if fromParam {
return true
}

if v == "" {
// temporary default to be changed once index cleanup is performed on maintenance
v := strings.ToLower(os.Getenv("KOPIA_ALLOW_WRITE_ON_INDEX_LOAD"))
if v == "true" || v == "1" {
return true
}

return v == "true" || v == "1"
return false
}

// NewManager creates new epoch manager.
func NewManager(st blob.Storage, paramProvider ParametersProvider, compactor CompactionFunc, log logging.Logger, timeNow func() time.Time) *Manager {
func NewManager(st blob.Storage, paramProvider ParametersProvider, compactor CompactionFunc, log logging.Logger, timeNow func() time.Time, optAllowWriteOnIndexLoad bool) *Manager {
return &Manager{
st: st,
log: log,
compact: compactor,
timeFunc: timeNow,
paramProvider: paramProvider,
allowCleanupWritesOnIndexLoad: allowWritesOnIndexLoad(),
allowCleanupWritesOnIndexLoad: allowWritesOnIndexLoad(optAllowWriteOnIndexLoad),
getCompleteIndexSetTooSlow: new(int32),
committedStateRefreshTooSlow: new(int32),
writeIndexTooSlow: new(int32),
Expand Down
6 changes: 3 additions & 3 deletions internal/epoch/epoch_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func newTestEnv(t *testing.T) *epochManagerTestEnv {
EpochAdvanceOnCountThreshold: 15,
EpochAdvanceOnTotalSizeBytesThreshold: 20 << 20,
DeleteParallelism: 1,
}}, te.compact, testlogging.NewTestLogger(t), te.ft.NowFunc())
}}, te.compact, testlogging.NewTestLogger(t), te.ft.NowFunc(), true)
te.mgr = m
te.faultyStorage = fs
te.data = data
Expand All @@ -121,7 +121,7 @@ func (te *epochManagerTestEnv) another() *epochManagerTestEnv {
faultyStorage: te.faultyStorage,
}

te2.mgr = NewManager(te2.st, te.mgr.paramProvider, te2.compact, te.mgr.log, te.mgr.timeFunc)
te2.mgr = NewManager(te2.st, te.mgr.paramProvider, te2.compact, te.mgr.log, te.mgr.timeFunc, true)

return te2
}
Expand Down Expand Up @@ -386,7 +386,7 @@ func TestIndexEpochManager_NoCompactionInReadOnly(t *testing.T) {
}

// Set new epoch manager to read-only to ensure we don't get stuck.
te2.mgr = NewManager(te2.st, te.mgr.paramProvider, te2.compact, te.mgr.log, te.mgr.timeFunc)
te2.mgr = NewManager(te2.st, te.mgr.paramProvider, te2.compact, te.mgr.log, te.mgr.timeFunc, true)

// Use assert.Eventually here so we'll exit the test early instead of getting
// stuck until the timeout.
Expand Down
7 changes: 4 additions & 3 deletions repo/content/committed_read_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ func indexBlobCacheSweepSettings(caching *CachingOptions) cache.SweepSettings {
}
}

func (sm *SharedManager) setupCachesAndIndexManagers(ctx context.Context, caching *CachingOptions, mr *metrics.Registry) error {
func (sm *SharedManager) setupCachesAndIndexManagers(ctx context.Context, caching *CachingOptions, mr *metrics.Registry, allowWriteOnIndexLoad bool) error {
dataCache, err := cache.NewContentCache(ctx, sm.st, cache.Options{
BaseCacheDirectory: caching.CacheDirectory,
CacheSubDir: "contents",
Expand Down Expand Up @@ -514,7 +514,8 @@ func (sm *SharedManager) setupCachesAndIndexManagers(ctx context.Context, cachin
return errors.Wrap(sm.indexBlobManagerV1.CompactEpoch(ctx, blobIDs, outputPrefix), "CompactEpoch")
},
sm.namedLogger("epoch-manager"),
sm.timeNow),
sm.timeNow,
allowWriteOnIndexLoad),
sm.timeNow,
sm.format,
sm.namedLogger("index-blob-manager"),
Expand Down Expand Up @@ -636,7 +637,7 @@ func NewSharedManager(ctx context.Context, st blob.Storage, prov format.Provider

caching = caching.CloneOrDefault()

if err := sm.setupCachesAndIndexManagers(ctx, caching, mr); err != nil {
if err := sm.setupCachesAndIndexManagers(ctx, caching, mr, opts.AllowWriteOnIndexLoad); err != nil {
return nil, errors.Wrap(err, "error setting up read manager caches")
}

Expand Down
1 change: 1 addition & 0 deletions repo/content/content_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,7 @@ type ManagerOptions struct {
TimeNow func() time.Time // Time provider
DisableInternalLog bool
PermissiveCacheLoading bool
AllowWriteOnIndexLoad bool
}

// CloneOrDefault returns a clone of provided ManagerOptions or default empty struct if nil.
Expand Down
3 changes: 3 additions & 0 deletions repo/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ type Options struct {

// test-only flags
TestOnlyIgnoreMissingRequiredFeatures bool // ignore missing features

AllowWriteOnIndexLoad bool
}

// ErrInvalidPassword is returned when repository password is invalid.
Expand Down Expand Up @@ -241,6 +243,7 @@ func openWithConfig(ctx context.Context, st blob.Storage, cliOpts ClientOptions,
TimeNow: defaultTime(options.TimeNowFunc),
DisableInternalLog: options.DisableInternalLog,
PermissiveCacheLoading: cliOpts.PermissiveCacheLoading,
AllowWriteOnIndexLoad: options.AllowWriteOnIndexLoad,
}

mr := metrics.NewRegistry()
Expand Down

0 comments on commit 71a2ec3

Please sign in to comment.