Skip to content

Commit

Permalink
sourcegraph: multi-tenant Zoekt (#859)
Browse files Browse the repository at this point in the history
This updates webserver and sourcegraph-indexserver to support multi-tenancy.

The change is behind an ENV feature-flag.

Key changes:
- The tenant ID is now part of the index (repo metadata)
- gRPC: IndexOption and Repository have a new field TenantId
- If multi-tenancy is enabled, webserver checks whether the tenant in the context matches the tenant ID in the shard
- zoekt-git-index has a new parameter "-shard_prefix". If set, its value is used instead of the repository name as the prefix of the shard name. For Sourcegraph, we use "<tenant id>_<repository id>" as the prefix if multi-tenancy is enabled

Assumption:
All calls to Sourcegraph are privileged

Test plan:
- New tests
- Ran this together with Sourcegraph (with and without MT enabled)
  • Loading branch information
stefanhengl authored Nov 19, 2024
1 parent 4f038fd commit c52a9cd
Show file tree
Hide file tree
Showing 30 changed files with 1,121 additions and 373 deletions.
8 changes: 8 additions & 0 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,9 @@ func (r RepositoryBranch) String() string {

// Repository holds repository metadata.
type Repository struct {
// Sourcegraph's tenant ID
TenantID int

// Sourcegraph's repository ID
ID uint32

Expand Down Expand Up @@ -635,6 +638,11 @@ func (r *Repository) UnmarshalJSON(data []byte) error {
r.ID = uint32(id)
}

if v, ok := repo.RawConfig["tenantID"]; ok {
id, _ := strconv.ParseInt(v, 10, 64)
r.TenantID = int(id)
}

// Sourcegraph indexserver doesn't set repo.Rank, so we set it here. Setting it
// on read instead of during indexing allows us to avoid a complete reindex.
//
Expand Down
2 changes: 2 additions & 0 deletions api_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ func RepositoryFromProto(p *proto.Repository) Repository {
}

return Repository{
TenantID: int(p.GetTenantId()),
ID: p.GetId(),
Name: p.GetName(),
URL: p.GetUrl(),
Expand Down Expand Up @@ -474,6 +475,7 @@ func (r *Repository) ToProto() *proto.Repository {
}

return &proto.Repository{
TenantId: int64(r.TenantID),
Id: r.ID,
Name: r.Name,
Url: r.URL,
Expand Down
20 changes: 15 additions & 5 deletions build/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package build

import (
"cmp"
"crypto/sha1"
"flag"
"fmt"
Expand Down Expand Up @@ -117,6 +118,9 @@ type Options struct {
//
// Note: heap checking is "best effort", and it's possible for the process to OOM without triggering the heap profile.
HeapProfileTriggerBytes uint64

// ShardPrefix is the prefix of the shard. It defaults to the repository name.
ShardPrefix string
}

// HashOptions contains only the options in Options that upon modification leads to IndexState of IndexStateMismatch during the next index building.
Expand Down Expand Up @@ -181,6 +185,7 @@ func (o *Options) Flags(fs *flag.FlagSet) {
fs.StringVar(&o.IndexDir, "index", x.IndexDir, "directory for search indices")
fs.BoolVar(&o.CTagsMustSucceed, "require_ctags", x.CTagsMustSucceed, "If set, ctags calls must succeed.")
fs.Var(largeFilesFlag{o}, "large_file", "A glob pattern where matching files are to be index regardless of their size. You can add multiple patterns by setting this more than once.")
fs.StringVar(&o.ShardPrefix, "shard_prefix", x.ShardPrefix, "the prefix of the shard. Defaults to repository name")

// Sourcegraph specific
fs.BoolVar(&o.DisableCTags, "disable_ctags", x.DisableCTags, "If set, ctags will not be called.")
Expand Down Expand Up @@ -228,6 +233,10 @@ func (o *Options) Args() []string {
args = append(args, "-shard_merging")
}

if o.ShardPrefix != "" {
args = append(args, "-shard_prefix", o.ShardPrefix)
}

return args
}

Expand Down Expand Up @@ -337,12 +346,13 @@ func (o *Options) shardName(n int) string {
}

func (o *Options) shardNameVersion(version, n int) string {
abs := url.QueryEscape(o.RepositoryDescription.Name)
if len(abs) > 200 {
abs = abs[:200] + hashString(abs)[:8]
prefix := url.QueryEscape(cmp.Or(o.ShardPrefix, o.RepositoryDescription.Name))
if len(prefix) > 200 {
prefix = prefix[:200] + hashString(prefix)[:8]
}
return filepath.Join(o.IndexDir,
fmt.Sprintf("%s_v%d.%05d.zoekt", abs, version, n))
shardName := filepath.Join(o.IndexDir,
fmt.Sprintf("%s_v%d.%05d.zoekt", prefix, version, n))
return shardName
}

type IndexState string
Expand Down
137 changes: 137 additions & 0 deletions build/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,19 @@ import (
"reflect"
"runtime"
"sort"
"strconv"
"strings"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/grafana/regexp"
"github.com/stretchr/testify/require"

"github.com/sourcegraph/zoekt"
"github.com/sourcegraph/zoekt/internal/tenant"
"github.com/sourcegraph/zoekt/internal/tenant/tenanttest"
"github.com/sourcegraph/zoekt/query"
"github.com/sourcegraph/zoekt/shards"
)
Expand Down Expand Up @@ -153,6 +158,138 @@ func TestBasic(t *testing.T) {
})
}

// TestSearchTenant builds a multi-shard index whose repository metadata
// carries the tenant ID taken from ctx1, then checks that a search issued
// with ctx1 finds the matching file while a search issued with a second test
// context finds nothing.
func TestSearchTenant(t *testing.T) {
	// NOTE(review): presumably switches tenant enforcement on for the
	// duration of this test — confirm against tenanttest.
	tenanttest.MockEnforce(t)

	dir := t.TempDir()

	ctx1 := tenanttest.NewTestContext()
	tnt1, err := tenant.FromContext(ctx1)
	require.NoError(t, err)

	opts := Options{
		IndexDir: dir,
		// Small enough that the four 1000-byte files added below end up in
		// more than one shard (asserted after Finish).
		ShardMax: 1024,
		RepositoryDescription: zoekt.Repository{
			Name:      "repo",
			RawConfig: map[string]string{"tenantID": strconv.Itoa(tnt1.ID())},
		},
		Parallelism: 2,
		SizeMax:     1 << 20,
	}

	b, err := NewBuilder(opts)
	if err != nil {
		t.Fatalf("NewBuilder: %v", err)
	}

	// Files F0..F3, each consisting of its own digit repeated 1000 times.
	for i := 0; i < 4; i++ {
		s := fmt.Sprintf("%d", i)
		if err := b.AddFile("F"+s, []byte(strings.Repeat(s, 1000))); err != nil {
			t.Fatal(err)
		}
	}

	// Stop right away on failure: every check below needs a complete index.
	if err := b.Finish(); err != nil {
		t.Fatalf("Finish: %v", err)
	}

	fs, err := filepath.Glob(filepath.Join(dir, "*.zoekt"))
	if err != nil {
		t.Fatalf("Glob: %v", err)
	}
	if len(fs) <= 1 {
		t.Fatalf("want multiple shards, got %v", fs)
	}

	// All shards written by one build must share the same index time and ID.
	_, md0, err := zoekt.ReadMetadataPath(fs[0])
	if err != nil {
		t.Fatal(err)
	}
	for _, f := range fs[1:] {
		_, md, err := zoekt.ReadMetadataPath(f)
		if err != nil {
			t.Fatal(err)
		}
		if md.IndexTime != md0.IndexTime {
			t.Fatalf("wanted identical time stamps but got %v!=%v", md.IndexTime, md0.IndexTime)
		}
		if md.ID != md0.ID {
			t.Fatalf("wanted identical IDs but got %s!=%s", md.ID, md0.ID)
		}
	}

	ss, err := shards.NewDirectorySearcher(dir)
	if err != nil {
		t.Fatalf("NewDirectorySearcher(%s): %v", dir, err)
	}
	defer ss.Close()

	// "111" only occurs in F1.
	q, err := query.Parse("111")
	if err != nil {
		t.Fatalf("Parse(111): %v", err)
	}

	var sOpts zoekt.SearchOptions

	// Tenant 1 has access to the repo
	result, err := ss.Search(ctx1, q, &sOpts)
	require.NoError(t, err)
	require.Len(t, result.Files, 1)

	// Tenant 2 does not have access to the repo
	ctx2 := tenanttest.NewTestContext()
	result, err = ss.Search(ctx2, q, &sOpts)
	require.NoError(t, err)
	require.Len(t, result.Files, 0)
}

// TestListTenant builds a single empty shard whose repository metadata
// carries the tenant ID taken from ctx1, then checks that List returns the
// repository for ctx1 and returns no repositories for a second test context.
func TestListTenant(t *testing.T) {
	// NOTE(review): presumably switches tenant enforcement on for the
	// duration of this test — confirm against tenanttest.
	tenanttest.MockEnforce(t)

	dir := t.TempDir()

	ctx1 := tenanttest.NewTestContext()
	tnt1, err := tenant.FromContext(ctx1)
	require.NoError(t, err)

	opts := Options{
		IndexDir: dir,
		RepositoryDescription: zoekt.Repository{
			Name:      "repo",
			RawConfig: map[string]string{"tenantID": strconv.Itoa(tnt1.ID())},
		},
	}
	opts.SetDefaults()

	b, err := NewBuilder(opts)
	if err != nil {
		t.Fatalf("NewBuilder: %v", err)
	}
	// Stop right away on failure: every check below needs a complete index.
	if err := b.Finish(); err != nil {
		t.Fatalf("Finish: %v", err)
	}

	fs, err := filepath.Glob(filepath.Join(dir, "*.zoekt"))
	if err != nil {
		t.Fatalf("Glob: %v", err)
	}
	if len(fs) != 1 {
		t.Fatalf("want a shard, got %v", fs)
	}

	ss, err := shards.NewDirectorySearcher(dir)
	if err != nil {
		t.Fatalf("NewDirectorySearcher(%s): %v", dir, err)
	}
	defer ss.Close()

	// Tenant 1 has access to the repo
	result, err := ss.List(ctx1, &query.Const{Value: true}, nil)
	require.NoError(t, err)
	require.Len(t, result.Repos, 1)

	// Tenant 2 does not have access to the repo
	ctx2 := tenanttest.NewTestContext()
	result, err = ss.List(ctx2, &query.Const{Value: true}, nil)
	require.NoError(t, err)
	require.Len(t, result.Repos, 0)
}

// retryTest will retry f until min(t.Deadline(), time.Minute). It returns
// once f doesn't call fatalf.
func retryTest(t *testing.T, f func(fatalf func(format string, args ...interface{}))) {
Expand Down
18 changes: 16 additions & 2 deletions cmd/zoekt-sourcegraph-indexserver/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ import (
"strings"
"time"

sglog "github.com/sourcegraph/log"

"github.com/sourcegraph/zoekt"
"github.com/sourcegraph/zoekt/build"
"github.com/sourcegraph/zoekt/ctags"

sglog "github.com/sourcegraph/log"
"github.com/sourcegraph/zoekt/internal/tenant"
)

const defaultIndexingTimeout = 1*time.Hour + 30*time.Minute
Expand Down Expand Up @@ -67,6 +68,9 @@ type IndexOptions struct {
// The number of threads to use for indexing shards. Defaults to the number of available
// CPUs. If the server flag -cpu_fraction is set, then this value overrides it.
ShardConcurrency int32

// TenantID is the tenant ID for the repository.
TenantID int
}

// indexArgs represents the arguments we pass to zoekt-git-index
Expand Down Expand Up @@ -100,12 +104,18 @@ type indexArgs struct {
// BuildOptions returns a build.Options represented by indexArgs. Note: it
// doesn't set fields like repository/branch.
func (o *indexArgs) BuildOptions() *build.Options {
shardPrefix := ""
if tenant.EnforceTenant() {
shardPrefix = tenant.SrcPrefix(o.TenantID, o.RepoID)
}

return &build.Options{
// It is important that this RepositoryDescription exactly matches what
// the indexer we call will produce. This is to ensure that
// IncrementalSkipIndexing and IndexState can correctly calculate if
// nothing needs to be done.
RepositoryDescription: zoekt.Repository{
TenantID: o.TenantID,
ID: uint32(o.IndexOptions.RepoID),
Name: o.Name,
Branches: o.Branches,
Expand All @@ -117,6 +127,7 @@ func (o *indexArgs) BuildOptions() *build.Options {
"archived": marshalBool(o.Archived),
// Calculate repo rank based on the latest commit date.
"latestCommitDate": "1",
"tenantID": strconv.Itoa(o.TenantID),
},
},
IndexDir: o.IndexDir,
Expand All @@ -130,6 +141,8 @@ func (o *indexArgs) BuildOptions() *build.Options {
LanguageMap: o.LanguageMap,

ShardMerging: o.ShardMerging,

ShardPrefix: shardPrefix,
}
}

Expand Down Expand Up @@ -245,6 +258,7 @@ func fetchRepo(ctx context.Context, gitDir string, o *indexArgs, c gitIndexConfi
"-C", gitDir,
"-c", "protocol.version=2",
"-c", "http.extraHeader=X-Sourcegraph-Actor-UID: internal",
"-c", "http.extraHeader=X-Sourcegraph-Tenant-ID: " + strconv.Itoa(o.TenantID),
"fetch", "--depth=1", "--no-tags",
}

Expand Down
Loading

0 comments on commit c52a9cd

Please sign in to comment.