Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sourcegraph: multi-tenant Zoekt #859

Merged
merged 13 commits into from
Nov 19, 2024
8 changes: 8 additions & 0 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,9 @@ func (r RepositoryBranch) String() string {

// Repository holds repository metadata.
type Repository struct {
// Sourcegraph's tenant ID
TenantID int

// Sourcegraph's repository ID
ID uint32

Expand Down Expand Up @@ -635,6 +638,11 @@ func (r *Repository) UnmarshalJSON(data []byte) error {
r.ID = uint32(id)
}

if v, ok := repo.RawConfig["tenantid"]; ok {
id, _ := strconv.ParseInt(v, 10, 64)
r.TenantID = int(id)
}

// Sourcegraph indexserver doesn't set repo.Rank, so we set it here. Setting it
// on read instead of during indexing allows us to avoid a complete reindex.
//
Expand Down
19 changes: 14 additions & 5 deletions build/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ type Options struct {
//
// Note: heap checking is "best effort", and it's possible for the process to OOM without triggering the heap profile.
HeapProfileTriggerBytes uint64

// UseSourcegraphIDForName is true if we want to use the Sourcegraph ID as prefix for the shard name.
UseSourcegraphIDForName bool
stefanhengl marked this conversation as resolved.
Show resolved Hide resolved
}

// HashOptions contains only the options in Options that upon modification leads to IndexState of IndexStateMismatch during the next index building.
Expand Down Expand Up @@ -337,12 +340,18 @@ func (o *Options) shardName(n int) string {
}

func (o *Options) shardNameVersion(version, n int) string {
abs := url.QueryEscape(o.RepositoryDescription.Name)
if len(abs) > 200 {
abs = abs[:200] + hashString(abs)[:8]
var prefix string
if o.UseSourcegraphIDForName {
prefix = fmt.Sprintf("%d", o.RepositoryDescription.ID)
} else {
prefix = url.QueryEscape(o.RepositoryDescription.Name)
}
if len(prefix) > 200 {
prefix = prefix[:200] + hashString(prefix)[:8]
}
return filepath.Join(o.IndexDir,
fmt.Sprintf("%s_v%d.%05d.zoekt", abs, version, n))
shardName := filepath.Join(o.IndexDir,
fmt.Sprintf("%s_v%d.%05d.zoekt", prefix, version, n))
return shardName
}

type IndexState string
Expand Down
3 changes: 3 additions & 0 deletions cmd/zoekt-git-index/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ func run() int {

cpuProfile := flag.String("cpuprofile", "", "write cpu profile to `file`")

useSourcegraphIDForName := flag.Bool("use_sourcegraph_id_for_name", false, "use the Sourcegraph ID for the shard name")

flag.Parse()

// Tune GOMAXPROCS to match Linux container CPU quota.
Expand Down Expand Up @@ -75,6 +77,7 @@ func run() int {

opts := cmd.OptionsFromFlags()
opts.IsDelta = *isDelta
opts.UseSourcegraphIDForName = *useSourcegraphIDForName

var branches []string
if *branchesStr != "" {
Expand Down
23 changes: 23 additions & 0 deletions cmd/zoekt-sourcegraph-indexserver/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/sourcegraph/zoekt"
"github.com/sourcegraph/zoekt/build"
"github.com/sourcegraph/zoekt/ctags"
"github.com/sourcegraph/zoekt/internal/tenant"

sglog "github.com/sourcegraph/log"
)
Expand Down Expand Up @@ -67,6 +68,9 @@ type IndexOptions struct {
// The number of threads to use for indexing shards. Defaults to the number of available
// CPUs. If the server flag -cpu_fraction is set, then this value overrides it.
ShardConcurrency int32

// TenantID is the tenant ID for the repository.
TenantID int
}

// indexArgs represents the arguments we pass to zoekt-git-index
Expand Down Expand Up @@ -95,17 +99,28 @@ type indexArgs struct {

// ShardMerging is true if we want zoekt-git-index to respect compound shards.
ShardMerging bool

// UseSourcegraphIDForName is true if we want to use the Sourcegraph ID as prefix for the shard name.
UseSourcegraphIDForName bool
}

// BuildOptions returns a build.Options represented by indexArgs. Note: it
// doesn't set fields like repository/branch.
func (o *indexArgs) BuildOptions() *build.Options {

// Default to tenant 1 if no tenant is set.
tenantID := o.TenantID
if o.TenantID < 1 {
tenantID = 1
}
stefanhengl marked this conversation as resolved.
Show resolved Hide resolved

return &build.Options{
// It is important that this RepositoryDescription exactly matches what
// the indexer we call will produce. This is to ensure that
// IncrementalSkipIndexing and IndexState can correctly calculate if
// nothing needs to be done.
RepositoryDescription: zoekt.Repository{
TenantID: o.TenantID,
ID: uint32(o.IndexOptions.RepoID),
Name: o.Name,
Branches: o.Branches,
Expand All @@ -117,6 +132,7 @@ func (o *indexArgs) BuildOptions() *build.Options {
"archived": marshalBool(o.Archived),
// Calculate repo rank based on the latest commit date.
"latestCommitDate": "1",
"tenantid": strconv.Itoa(tenantID),
stefanhengl marked this conversation as resolved.
Show resolved Hide resolved
},
},
IndexDir: o.IndexDir,
Expand All @@ -130,6 +146,8 @@ func (o *indexArgs) BuildOptions() *build.Options {
LanguageMap: o.LanguageMap,

ShardMerging: o.ShardMerging,

UseSourcegraphIDForName: o.UseSourcegraphIDForName,
}
}

Expand Down Expand Up @@ -245,6 +263,7 @@ func fetchRepo(ctx context.Context, gitDir string, o *indexArgs, c gitIndexConfi
"-C", gitDir,
"-c", "protocol.version=2",
"-c", "http.extraHeader=X-Sourcegraph-Actor-UID: internal",
"-c", "http.extraHeader=" + tenant.HttpExtraHeader(o.TenantID),
"fetch", "--depth=1", "--no-tags",
}

Expand Down Expand Up @@ -391,6 +410,10 @@ func indexRepo(ctx context.Context, gitDir string, sourcegraph Sourcegraph, o *i
args = append(args, "-delta_threshold", strconv.FormatUint(o.DeltaShardNumberFallbackThreshold, 10))
}

if o.UseSourcegraphIDForName {
args = append(args, "-use_sourcegraph_id_for_name")
}
stefanhengl marked this conversation as resolved.
Show resolved Hide resolved

if len(o.LanguageMap) > 0 {
var languageMap []string
for language, parser := range o.LanguageMap {
Expand Down
12 changes: 8 additions & 4 deletions cmd/zoekt-sourcegraph-indexserver/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ func TestIndex(t *testing.T) {
},
want: []string{
"git -c init.defaultBranch=nonExistentBranchBB0FOFCH32 init --bare $TMPDIR/test%2Frepo.git",
"git -C $TMPDIR/test%2Frepo.git -c protocol.version=2 -c http.extraHeader=X-Sourcegraph-Actor-UID: internal fetch --depth=1 --no-tags --filter=blob:limit=1m http://api.test/.internal/git/test/repo deadbeef",
"git -C $TMPDIR/test%2Frepo.git -c protocol.version=2 -c http.extraHeader=X-Sourcegraph-Actor-UID: internal -c http.extraHeader=X-Sourcegraph-Tenant-ID: 1 fetch --depth=1 --no-tags --filter=blob:limit=1m http://api.test/.internal/git/test/repo deadbeef",
"git -C $TMPDIR/test%2Frepo.git update-ref HEAD deadbeef",
"git -C $TMPDIR/test%2Frepo.git config zoekt.archived 0",
"git -C $TMPDIR/test%2Frepo.git config zoekt.fork 0",
Expand All @@ -500,6 +500,7 @@ func TestIndex(t *testing.T) {
"git -C $TMPDIR/test%2Frepo.git config zoekt.priority 0",
"git -C $TMPDIR/test%2Frepo.git config zoekt.public 0",
"git -C $TMPDIR/test%2Frepo.git config zoekt.repoid 0",
"git -C $TMPDIR/test%2Frepo.git config zoekt.tenantid 1",
"zoekt-git-index -submodules=false -branches HEAD -disable_ctags $TMPDIR/test%2Frepo.git",
},
}, {
Expand All @@ -514,7 +515,7 @@ func TestIndex(t *testing.T) {
},
want: []string{
"git -c init.defaultBranch=nonExistentBranchBB0FOFCH32 init --bare $TMPDIR/test%2Frepo.git",
"git -C $TMPDIR/test%2Frepo.git -c protocol.version=2 -c http.extraHeader=X-Sourcegraph-Actor-UID: internal fetch --depth=1 --no-tags --filter=blob:limit=1m http://api.test/.internal/git/test/repo deadbeef",
"git -C $TMPDIR/test%2Frepo.git -c protocol.version=2 -c http.extraHeader=X-Sourcegraph-Actor-UID: internal -c http.extraHeader=X-Sourcegraph-Tenant-ID: 1 fetch --depth=1 --no-tags --filter=blob:limit=1m http://api.test/.internal/git/test/repo deadbeef",
"git -C $TMPDIR/test%2Frepo.git update-ref HEAD deadbeef",
"git -C $TMPDIR/test%2Frepo.git config zoekt.archived 0",
"git -C $TMPDIR/test%2Frepo.git config zoekt.fork 0",
Expand All @@ -523,6 +524,7 @@ func TestIndex(t *testing.T) {
"git -C $TMPDIR/test%2Frepo.git config zoekt.priority 0",
"git -C $TMPDIR/test%2Frepo.git config zoekt.public 0",
"git -C $TMPDIR/test%2Frepo.git config zoekt.repoid 123",
"git -C $TMPDIR/test%2Frepo.git config zoekt.tenantid 1",
"zoekt-git-index -submodules=false -branches HEAD -disable_ctags $TMPDIR/test%2Frepo.git",
},
}, {
Expand All @@ -545,7 +547,7 @@ func TestIndex(t *testing.T) {
},
want: []string{
"git -c init.defaultBranch=nonExistentBranchBB0FOFCH32 init --bare $TMPDIR/test%2Frepo.git",
"git -C $TMPDIR/test%2Frepo.git -c protocol.version=2 -c http.extraHeader=X-Sourcegraph-Actor-UID: internal fetch --depth=1 --no-tags http://api.test/.internal/git/test/repo deadbeef feebdaed",
"git -C $TMPDIR/test%2Frepo.git -c protocol.version=2 -c http.extraHeader=X-Sourcegraph-Actor-UID: internal -c http.extraHeader=X-Sourcegraph-Tenant-ID: 1 fetch --depth=1 --no-tags http://api.test/.internal/git/test/repo deadbeef feebdaed",
"git -C $TMPDIR/test%2Frepo.git update-ref HEAD deadbeef",
"git -C $TMPDIR/test%2Frepo.git update-ref refs/heads/dev feebdaed",
"git -C $TMPDIR/test%2Frepo.git config zoekt.archived 0",
Expand All @@ -555,6 +557,7 @@ func TestIndex(t *testing.T) {
"git -C $TMPDIR/test%2Frepo.git config zoekt.priority 0",
"git -C $TMPDIR/test%2Frepo.git config zoekt.public 0",
"git -C $TMPDIR/test%2Frepo.git config zoekt.repoid 0",
"git -C $TMPDIR/test%2Frepo.git config zoekt.tenantid 1",
"zoekt-git-index -submodules=false -incremental -branches HEAD,dev " +
"-file_limit 123 -parallelism 4 -index /data/index -require_ctags -large_file foo -large_file bar " +
"$TMPDIR/test%2Frepo.git",
Expand Down Expand Up @@ -592,7 +595,7 @@ func TestIndex(t *testing.T) {
},
want: []string{
"git -c init.defaultBranch=nonExistentBranchBB0FOFCH32 init --bare $TMPDIR/test%2Frepo.git",
"git -C $TMPDIR/test%2Frepo.git -c protocol.version=2 -c http.extraHeader=X-Sourcegraph-Actor-UID: internal fetch --depth=1 --no-tags http://api.test/.internal/git/test/repo deadbeef feebdaed 12345678 oldhead olddev oldrelease",
"git -C $TMPDIR/test%2Frepo.git -c protocol.version=2 -c http.extraHeader=X-Sourcegraph-Actor-UID: internal -c http.extraHeader=X-Sourcegraph-Tenant-ID: 1 fetch --depth=1 --no-tags http://api.test/.internal/git/test/repo deadbeef feebdaed 12345678 oldhead olddev oldrelease",
"git -C $TMPDIR/test%2Frepo.git update-ref HEAD deadbeef",
"git -C $TMPDIR/test%2Frepo.git update-ref refs/heads/dev feebdaed",
"git -C $TMPDIR/test%2Frepo.git update-ref refs/heads/release 12345678",
Expand All @@ -603,6 +606,7 @@ func TestIndex(t *testing.T) {
"git -C $TMPDIR/test%2Frepo.git config zoekt.priority 0",
"git -C $TMPDIR/test%2Frepo.git config zoekt.public 0",
"git -C $TMPDIR/test%2Frepo.git config zoekt.repoid 0",
"git -C $TMPDIR/test%2Frepo.git config zoekt.tenantid 1",
"zoekt-git-index -submodules=false -incremental -branches HEAD,dev,release " +
"-delta -delta_threshold 22 -file_limit 123 -parallelism 4 -index /data/index -require_ctags -large_file foo -large_file bar " +
"$TMPDIR/test%2Frepo.git",
Expand Down
21 changes: 15 additions & 6 deletions cmd/zoekt-sourcegraph-indexserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ type Server struct {

// timeout defines how long the index server waits before killing an indexing job.
timeout time.Duration

useSourcegraphIDForName bool
}

var debug = log.New(io.Discard, "", log.LstdFlags)
Expand Down Expand Up @@ -459,6 +461,7 @@ func (s *Server) processQueue() {
branches = append(branches, fmt.Sprintf("%s=%s", b.Name, b.Version))
}
s.logger.Info("updated index",
sglog.Int("tenant", args.TenantID),
sglog.String("repo", args.Name),
sglog.Uint32("id", args.RepoID),
sglog.Strings("branches", branches),
Expand Down Expand Up @@ -592,6 +595,7 @@ func (s *Server) Index(args *indexArgs) (state indexState, err error) {
case build.IndexStateMeta:
log.Printf("updating index.meta %s", args.String())

// TODO(stefan) handle mergeMeta for tenant id.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:O also need exploding then to understand the new naming scheme?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This here is a bit different. We should probably trigger a reindex if the tenant id changes, but I didn't want to include it now because that would trigger a reindex for everyone.

For explode, I updated the naming gated by tenant enforcement. I will file a follow-up to make that nicer. It's obviously too brittle.

if err := mergeMeta(bo); err != nil {
log.Printf("falling back to full update: failed to update index.meta %s: %s", args.String(), err)
} else {
Expand Down Expand Up @@ -669,12 +673,13 @@ func sglogBranches(key string, branches []zoekt.RepositoryBranch) sglog.Field {
func (s *Server) indexArgs(opts IndexOptions) *indexArgs {
parallelism := s.parallelism(opts, runtime.GOMAXPROCS(0))
return &indexArgs{
IndexOptions: opts,
IndexDir: s.IndexDir,
Parallelism: parallelism,
Incremental: true,
FileLimit: MaxFileSize,
ShardMerging: s.shardMerging,
IndexOptions: opts,
IndexDir: s.IndexDir,
Parallelism: parallelism,
Incremental: true,
FileLimit: MaxFileSize,
ShardMerging: s.shardMerging,
UseSourcegraphIDForName: s.useSourcegraphIDForName,
}
}

Expand Down Expand Up @@ -1247,6 +1252,8 @@ type rootConfig struct {
// config values related to backoff indexing repos with one or more consecutive failures
backoffDuration time.Duration
maxBackoffDuration time.Duration

useSourcegraphIDForName bool
}

func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
Expand All @@ -1259,6 +1266,7 @@ func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.")
fs.DurationVar(&rc.backoffDuration, "backoff_duration", getEnvWithDefaultDuration("BACKOFF_DURATION", 10*time.Minute), "for the given duration we backoff from enqueue operations for a repository that's failed its previous indexing attempt. Consecutive failures increase the duration of the delay linearly up to the maxBackoffDuration. A negative value disables indexing backoff.")
fs.DurationVar(&rc.maxBackoffDuration, "max_backoff_duration", getEnvWithDefaultDuration("MAX_BACKOFF_DURATION", 120*time.Minute), "the maximum duration to backoff from enqueueing a repo for indexing. A negative value disables indexing backoff.")
fs.BoolVar(&rc.useSourcegraphIDForName, "use_sourcegraph_id_for_name", getEnvWithDefaultBool("SRC_USE_SOURCEGRAPH_ID_FOR_NAME", false), "use the Sourcegraph ID as the name for index shards.")

// flags related to shard merging
fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging")
Expand Down Expand Up @@ -1465,6 +1473,7 @@ func newServer(conf rootConfig) (*Server, error) {
CPUCount: cpuCount,
queue: *q,
shardMerging: !conf.disableShardMerging,
useSourcegraphIDForName: conf.useSourcegraphIDForName,
deltaBuildRepositoriesAllowList: deltaBuildRepositoriesAllowList,
deltaShardNumberFallbackThreshold: deltaShardNumberFallbackThreshold,
repositoriesSkipSymbolsCalculationAllowList: reposShouldSkipSymbolsCalculation,
Expand Down
Loading
Loading