Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sourcegraph: multi-tenant Zoekt #859

Merged
merged 13 commits into from
Nov 19, 2024
8 changes: 8 additions & 0 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,9 @@ func (r RepositoryBranch) String() string {

// Repository holds repository metadata.
type Repository struct {
// Sourcegraph's tenant ID
TenantID int

// Sourcegraph's repository ID
ID uint32

Expand Down Expand Up @@ -635,6 +638,11 @@ func (r *Repository) UnmarshalJSON(data []byte) error {
r.ID = uint32(id)
}

if v, ok := repo.RawConfig["tenantID"]; ok {
id, _ := strconv.ParseInt(v, 10, 64)
r.TenantID = int(id)
}

// Sourcegraph indexserver doesn't set repo.Rank, so we set it here. Setting it
// on read instead of during indexing allows us to avoid a complete reindex.
//
Expand Down
2 changes: 2 additions & 0 deletions api_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ func RepositoryFromProto(p *proto.Repository) Repository {
}

return Repository{
TenantID: int(p.GetTenantId()),
ID: p.GetId(),
Name: p.GetName(),
URL: p.GetUrl(),
Expand Down Expand Up @@ -474,6 +475,7 @@ func (r *Repository) ToProto() *proto.Repository {
}

return &proto.Repository{
TenantId: int64(r.TenantID),
Id: r.ID,
Name: r.Name,
Url: r.URL,
Expand Down
20 changes: 15 additions & 5 deletions build/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package build

import (
"cmp"
"crypto/sha1"
"flag"
"fmt"
Expand Down Expand Up @@ -117,6 +118,9 @@ type Options struct {
//
// Note: heap checking is "best effort", and it's possible for the process to OOM without triggering the heap profile.
HeapProfileTriggerBytes uint64

// ShardPrefix is the prefix of the shard. It defaults to the repository name.
ShardPrefix string
}

// HashOptions contains only the options in Options that upon modification leads to IndexState of IndexStateMismatch during the next index building.
Expand Down Expand Up @@ -181,6 +185,7 @@ func (o *Options) Flags(fs *flag.FlagSet) {
fs.StringVar(&o.IndexDir, "index", x.IndexDir, "directory for search indices")
fs.BoolVar(&o.CTagsMustSucceed, "require_ctags", x.CTagsMustSucceed, "If set, ctags calls must succeed.")
fs.Var(largeFilesFlag{o}, "large_file", "A glob pattern where matching files are to be index regardless of their size. You can add multiple patterns by setting this more than once.")
fs.StringVar(&o.ShardPrefix, "shard_prefix", x.ShardPrefix, "the prefix of the shard. Defaults to repository name")

// Sourcegraph specific
fs.BoolVar(&o.DisableCTags, "disable_ctags", x.DisableCTags, "If set, ctags will not be called.")
Expand Down Expand Up @@ -228,6 +233,10 @@ func (o *Options) Args() []string {
args = append(args, "-shard_merging")
}

if o.ShardPrefix != "" {
args = append(args, "-shard_prefix", o.ShardPrefix)
}

return args
}

Expand Down Expand Up @@ -337,12 +346,13 @@ func (o *Options) shardName(n int) string {
}

func (o *Options) shardNameVersion(version, n int) string {
abs := url.QueryEscape(o.RepositoryDescription.Name)
if len(abs) > 200 {
abs = abs[:200] + hashString(abs)[:8]
prefix := url.QueryEscape(cmp.Or(o.ShardPrefix, o.RepositoryDescription.Name))
if len(prefix) > 200 {
prefix = prefix[:200] + hashString(prefix)[:8]
}
return filepath.Join(o.IndexDir,
fmt.Sprintf("%s_v%d.%05d.zoekt", abs, version, n))
shardName := filepath.Join(o.IndexDir,
fmt.Sprintf("%s_v%d.%05d.zoekt", prefix, version, n))
return shardName
}

type IndexState string
Expand Down
137 changes: 137 additions & 0 deletions build/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,19 @@ import (
"reflect"
"runtime"
"sort"
"strconv"
"strings"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/grafana/regexp"
"github.com/stretchr/testify/require"

"github.com/sourcegraph/zoekt"
"github.com/sourcegraph/zoekt/internal/tenant"
"github.com/sourcegraph/zoekt/internal/tenant/tenanttest"
"github.com/sourcegraph/zoekt/query"
"github.com/sourcegraph/zoekt/shards"
)
Expand Down Expand Up @@ -153,6 +158,138 @@ func TestBasic(t *testing.T) {
})
}

func TestSearchTenant(t *testing.T) {
tenanttest.MockEnforce(t)

dir := t.TempDir()

ctx1 := tenanttest.NewTestContext()
tnt1, err := tenant.FromContext(ctx1)
require.NoError(t, err)

opts := Options{
IndexDir: dir,
ShardMax: 1024,
RepositoryDescription: zoekt.Repository{
Name: "repo",
RawConfig: map[string]string{"tenantID": strconv.Itoa(tnt1.ID())},
},
Parallelism: 2,
SizeMax: 1 << 20,
}

b, err := NewBuilder(opts)
if err != nil {
t.Fatalf("NewBuilder: %v", err)
}

for i := 0; i < 4; i++ {
s := fmt.Sprintf("%d", i)
if err := b.AddFile("F"+s, []byte(strings.Repeat(s, 1000))); err != nil {
t.Fatal(err)
}
}

if err := b.Finish(); err != nil {
t.Errorf("Finish: %v", err)
}

fs, _ := filepath.Glob(dir + "/*.zoekt")
if len(fs) <= 1 {
t.Fatalf("want multiple shards, got %v", fs)
}

_, md0, err := zoekt.ReadMetadataPath(fs[0])
if err != nil {
t.Fatal(err)
}
for _, f := range fs[1:] {
_, md, err := zoekt.ReadMetadataPath(f)
if err != nil {
t.Fatal(err)
}
if md.IndexTime != md0.IndexTime {
t.Fatalf("wanted identical time stamps but got %v!=%v", md.IndexTime, md0.IndexTime)
}
if md.ID != md0.ID {
t.Fatalf("wanted identical IDs but got %s!=%s", md.ID, md0.ID)
}
}

ss, err := shards.NewDirectorySearcher(dir)
if err != nil {
t.Fatalf("NewDirectorySearcher(%s): %v", dir, err)
}
defer ss.Close()

q, err := query.Parse("111")
if err != nil {
t.Fatalf("Parse(111): %v", err)
}

var sOpts zoekt.SearchOptions
stefanhengl marked this conversation as resolved.
Show resolved Hide resolved

// Tenant 1 has access to the repo
result, err := ss.Search(ctx1, q, &sOpts)
require.NoError(t, err)
require.Len(t, result.Files, 1)

// Tenant 2 does not have access to the repo
ctx2 := tenanttest.NewTestContext()
result, err = ss.Search(ctx2, q, &sOpts)
require.NoError(t, err)
require.Len(t, result.Files, 0)
}

// TestListTenant builds a single (empty) shard for a repository tagged with
// one tenant's ID, then checks that List using that tenant's context returns
// the repository while List using a second test context returns none.
func TestListTenant(t *testing.T) {
	tenanttest.MockEnforce(t)

	dir := t.TempDir()

	ctx1 := tenanttest.NewTestContext()
	tnt1, err := tenant.FromContext(ctx1)
	require.NoError(t, err)

	opts := Options{
		IndexDir: dir,
		RepositoryDescription: zoekt.Repository{
			Name:      "repo",
			RawConfig: map[string]string{"tenantID": strconv.Itoa(tnt1.ID())},
		},
	}
	opts.SetDefaults()

	b, err := NewBuilder(opts)
	if err != nil {
		t.Fatalf("NewBuilder: %v", err)
	}
	// Nothing below is meaningful without a finished index, so stop here on
	// failure.
	if err := b.Finish(); err != nil {
		t.Fatalf("Finish: %v", err)
	}

	fs, err := filepath.Glob(filepath.Join(dir, "*.zoekt"))
	require.NoError(t, err)
	if len(fs) != 1 {
		t.Fatalf("want a shard, got %v", fs)
	}

	ss, err := shards.NewDirectorySearcher(dir)
	if err != nil {
		t.Fatalf("NewDirectorySearcher(%s): %v", dir, err)
	}
	defer ss.Close()

	// Tenant 1 has access to the repo
	result, err := ss.List(ctx1, &query.Const{Value: true}, nil)
	require.NoError(t, err)
	require.Len(t, result.Repos, 1)

	// Tenant 2 does not have access to the repo
	ctx2 := tenanttest.NewTestContext()
	result, err = ss.List(ctx2, &query.Const{Value: true}, nil)
	require.NoError(t, err)
	require.Len(t, result.Repos, 0)
}

// retryTest will retry f until min(t.Deadline(), time.Minute). It returns
// once f doesn't call fatalf.
func retryTest(t *testing.T, f func(fatalf func(format string, args ...interface{}))) {
Expand Down
18 changes: 16 additions & 2 deletions cmd/zoekt-sourcegraph-indexserver/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ import (
"strings"
"time"

sglog "github.com/sourcegraph/log"

"github.com/sourcegraph/zoekt"
"github.com/sourcegraph/zoekt/build"
"github.com/sourcegraph/zoekt/ctags"

sglog "github.com/sourcegraph/log"
"github.com/sourcegraph/zoekt/internal/tenant"
)

const defaultIndexingTimeout = 1*time.Hour + 30*time.Minute
Expand Down Expand Up @@ -67,6 +68,9 @@ type IndexOptions struct {
// The number of threads to use for indexing shards. Defaults to the number of available
// CPUs. If the server flag -cpu_fraction is set, then this value overrides it.
ShardConcurrency int32

// TenantID is the tenant ID for the repository.
TenantID int
}

// indexArgs represents the arguments we pass to zoekt-git-index
Expand Down Expand Up @@ -100,12 +104,18 @@ type indexArgs struct {
// BuildOptions returns a build.Options represented by indexArgs. Note: it
// doesn't set fields like repository/branch.
func (o *indexArgs) BuildOptions() *build.Options {
shardPrefix := ""
if tenant.EnforceTenant() {
shardPrefix = tenant.SrcPrefix(o.TenantID, o.RepoID)
}

return &build.Options{
// It is important that this RepositoryDescription exactly matches what
// the indexer we call will produce. This is to ensure that
// IncrementalSkipIndexing and IndexState can correctly calculate if
// nothing needs to be done.
RepositoryDescription: zoekt.Repository{
TenantID: o.TenantID,
ID: uint32(o.IndexOptions.RepoID),
Name: o.Name,
Branches: o.Branches,
Expand All @@ -117,6 +127,7 @@ func (o *indexArgs) BuildOptions() *build.Options {
"archived": marshalBool(o.Archived),
// Calculate repo rank based on the latest commit date.
"latestCommitDate": "1",
"tenantID": strconv.Itoa(o.TenantID),
},
},
IndexDir: o.IndexDir,
Expand All @@ -130,6 +141,8 @@ func (o *indexArgs) BuildOptions() *build.Options {
LanguageMap: o.LanguageMap,

ShardMerging: o.ShardMerging,

ShardPrefix: shardPrefix,
}
}

Expand Down Expand Up @@ -245,6 +258,7 @@ func fetchRepo(ctx context.Context, gitDir string, o *indexArgs, c gitIndexConfi
"-C", gitDir,
"-c", "protocol.version=2",
"-c", "http.extraHeader=X-Sourcegraph-Actor-UID: internal",
"-c", "http.extraHeader=X-Sourcegraph-Tenant-ID: " + strconv.Itoa(o.TenantID),
"fetch", "--depth=1", "--no-tags",
}

Expand Down
Loading
Loading