diff --git a/go/pkg/client/client.go b/go/pkg/client/client.go
index bdb2f2fb5..2bb9093fb 100644
--- a/go/pkg/client/client.go
+++ b/go/pkg/client/client.go
@@ -15,6 +15,7 @@ import (
 	"time"
 
 	"errors"
 
+	"github.com/bazelbuild/remote-apis-sdks/go/pkg/actas"
 	"github.com/bazelbuild/remote-apis-sdks/go/pkg/balancer"
 	"github.com/bazelbuild/remote-apis-sdks/go/pkg/chunker"
@@ -244,6 +245,10 @@ func (c *Client) Close() error {
 	if c.casConnection != c.connection {
 		return c.casConnection.Close()
 	}
+	if c.diskCache != nil {
+		// Waits for local disk GC to complete.
+		c.diskCache.Shutdown()
+	}
 	return nil
 }
 
@@ -354,21 +359,12 @@ func (o *TreeSymlinkOpts) Apply(c *Client) {
 }
 
 type DiskCacheOpts struct {
-	Context       context.Context
-	Path          string
-	MaxCapacityGb float64
+	DiskCache *diskcache.DiskCache
 }
 
 // Apply sets the client's TreeSymlinkOpts.
 func (o *DiskCacheOpts) Apply(c *Client) {
-	if o.Path != "" {
-		capBytes := uint64(o.MaxCapacityGb * 1024 * 1024 * 1024)
-		var err error
-		// TODO(ola): propagate errors from Apply.
-		if c.diskCache, err = diskcache.New(o.Context, o.Path, capBytes); err != nil {
-			log.Errorf("Error initializing disk cache on %s: %v", o.Path, err)
-		}
-	}
+	c.diskCache = o.DiskCache
 }
 
 // MaxBatchDigests is maximum amount of digests to batch in upload and download operations.
diff --git a/go/pkg/diskcache/BUILD.bazel b/go/pkg/diskcache/BUILD.bazel
index dc07fc4e6..5cd18a9c2 100644
--- a/go/pkg/diskcache/BUILD.bazel
+++ b/go/pkg/diskcache/BUILD.bazel
@@ -28,7 +28,22 @@ go_test(
        "//go/pkg/testutil",
        "@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
        "@com_github_google_go_cmp//cmp:go_default_library",
-       "@com_github_pborman_uuid//:go_default_library",
+       "@com_github_google_uuid//:uuid",
        "@org_golang_x_sync//errgroup:go_default_library",
     ],
 )
+
+go_test(
+    name = "diskcache_benchmark_test",
+    srcs = ["diskcache_benchmark_test.go"],
+    embed = [":diskcache"],
+    deps = [
+        "//go/pkg/digest",
+        "//go/pkg/testutil",
+        "@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
+        "@com_github_google_go_cmp//cmp:go_default_library",
+        "@com_github_google_uuid//:uuid",
+        "@org_golang_x_sync//errgroup:go_default_library",
+    ],
+    tags = ["manual"],
+)
diff --git a/go/pkg/diskcache/diskcache.go b/go/pkg/diskcache/diskcache.go
index bad4e37d0..7eb558612 100644
--- a/go/pkg/diskcache/diskcache.go
+++ b/go/pkg/diskcache/diskcache.go
@@ -88,8 +88,6 @@ func (q *priorityQueue) Bump(item *qitem) {
 	heap.Fix(q, item.index)
 }
 
-const maxConcurrentRequests = 1000
-
 // DiskCache is a local disk LRU CAS and Action Cache cache.
 type DiskCache struct {
 	root             string // path to the root directory of the disk cache.
@@ -97,12 +95,28 @@ type DiskCache struct {
 	mu               sync.Mutex     // protects the queue.
 	store            sync.Map       // map of keys to qitems.
 	queue            *priorityQueue // keys by last accessed time.
-	sizeBytes        int64          // total size.
 	ctx              context.Context
 	shutdown         chan bool
-	gcTick           uint64
-	gcReq            chan uint64
-	testGcTicks      chan uint64
+	shutdownOnce     sync.Once
+	gcReq            chan bool
+	gcDone           chan bool
+	statMu           sync.Mutex
+	stats            *DiskCacheStats
+}
+
+type DiskCacheStats struct {
+	TotalSizeBytes         int64
+	TotalNumFiles          int64
+	NumFilesStored         int64
+	TotalStoredBytes       int64
+	NumFilesGCed           int64
+	TotalGCedSizeBytes     int64
+	NumCacheHits           int64
+	NumCacheMisses         int64
+	TotalCacheHitSizeBytes int64
+	InitTime               time.Duration
+	TotalGCTime            time.Duration
+	TotalGCDiskOpsTime     time.Duration
 }
 
 func New(ctx context.Context, root string, maxCapacityBytes uint64) (*DiskCache, error) {
@@ -113,9 +127,13 @@ func New(ctx context.Context, root string, maxCapacityBytes uint64) (*DiskCache,
 		queue: &priorityQueue{
 			items: make([]*qitem, 1000),
 		},
-		gcReq:    make(chan uint64, maxConcurrentRequests),
+		gcReq:    make(chan bool),
 		shutdown: make(chan bool),
+		gcDone:   make(chan bool),
+		stats:    &DiskCacheStats{},
 	}
+	start := time.Now()
+	defer func() { atomic.AddInt64((*int64)(&res.stats.InitTime), int64(time.Since(start))) }()
 	heap.Init(res.queue)
 	if err := os.MkdirAll(root, os.ModePerm); err != nil {
 		return nil, err
 	}
@@ -141,24 +159,25 @@ func New(ctx context.Context, root string, maxCapacityBytes uint64) (*DiskCache,
 			return nil
 		}
 		subdir := filepath.Base(filepath.Dir(path))
-		k, err := res.getKeyFromFileName(subdir + d.Name())
+		k, err := getKeyFromFileName(subdir + d.Name())
 		if err != nil {
 			return fmt.Errorf("error parsing cached file name %s: %v", path, err)
 		}
-		atime, err := getLastAccessTime(path)
+		info, err := d.Info()
 		if err != nil {
-			return fmt.Errorf("error getting last accessed time of %s: %v", path, err)
+			return fmt.Errorf("error getting file info of %s: %v", path, err)
 		}
 		it := &qitem{
 			key: k,
-			lat: atime,
+			lat: fileInfoToAccessTime(info),
 		}
 		size, err := res.getItemSize(k)
 		if err != nil {
 			return fmt.Errorf("error getting file size of %s: %v", path, err)
 		}
 		res.store.Store(k, it)
-		atomic.AddInt64(&res.sizeBytes, size)
+		atomic.AddInt64(&res.stats.TotalSizeBytes, size)
+		atomic.AddInt64(&res.stats.TotalNumFiles, 1)
 		res.mu.Lock()
 		heap.Push(res.queue, it)
 		res.mu.Unlock()
@@ -180,39 +199,47 @@ func (d *DiskCache) getItemSize(k key) (int64, error) {
 	fname := d.getPath(k)
 	info, err := os.Stat(fname)
 	if err != nil {
-		return 0, fmt.Errorf("Error getting info for %s: %v", fname, err)
+		return 0, fmt.Errorf("error getting info for %s: %v", fname, err)
 	}
 	return info.Size(), nil
 }
 
-// Releases resources and terminates the GC daemon. Should be the last call to the DiskCache.
+// Terminates the GC daemon, waiting for it to complete. No further Store* calls to the DiskCache should be made.
 func (d *DiskCache) Shutdown() {
-	d.shutdown <- true
+	d.shutdownOnce.Do(func() {
+		d.shutdown <- true
+		<-d.gcDone
+		log.Infof("DiskCacheStats: %+v", d.stats)
+	})
 }
 
-func (d *DiskCache) TotalSizeBytes() uint64 {
-	return uint64(atomic.LoadInt64(&d.sizeBytes))
-}
-
-// This function is defined in https://pkg.go.dev/strings#CutSuffix
-// It is copy/pasted here as a hack, because I failed to upgrade the *Reclient* repo to the latest Go 1.20.7.
-func CutSuffix(s, suffix string) (before string, found bool) {
-	if !strings.HasSuffix(s, suffix) {
-		return s, false
+func (d *DiskCache) GetStats() *DiskCacheStats {
+	// Return a copy for safety.
+	return &DiskCacheStats{
+		TotalSizeBytes:         atomic.LoadInt64(&d.stats.TotalSizeBytes),
+		TotalNumFiles:          atomic.LoadInt64(&d.stats.TotalNumFiles),
+		NumFilesStored:         atomic.LoadInt64(&d.stats.NumFilesStored),
+		TotalStoredBytes:       atomic.LoadInt64(&d.stats.TotalStoredBytes),
+		NumFilesGCed:           atomic.LoadInt64(&d.stats.NumFilesGCed),
+		TotalGCedSizeBytes:     atomic.LoadInt64(&d.stats.TotalGCedSizeBytes),
+		NumCacheHits:           atomic.LoadInt64(&d.stats.NumCacheHits),
+		NumCacheMisses:         atomic.LoadInt64(&d.stats.NumCacheMisses),
+		TotalCacheHitSizeBytes: atomic.LoadInt64(&d.stats.TotalCacheHitSizeBytes),
+		InitTime:               time.Duration(atomic.LoadInt64((*int64)(&d.stats.InitTime))),
+		TotalGCTime:            time.Duration(atomic.LoadInt64((*int64)(&d.stats.TotalGCTime))),
 	}
-	return s[:len(s)-len(suffix)], true
 }
 
-func (d *DiskCache) getKeyFromFileName(fname string) (key, error) {
+func getKeyFromFileName(fname string) (key, error) {
 	pair := strings.Split(fname, ".")
 	if len(pair) != 2 {
-		return key{}, fmt.Errorf("expected file name in the form [ac_]hash/size, got %s", fname)
+		return key{}, fmt.Errorf("expected file name in the form hash[_ac].size, got %s", fname)
 	}
 	size, err := strconv.ParseInt(pair[1], 10, 64)
 	if err != nil {
 		return key{}, fmt.Errorf("invalid size in digest %s: %s", fname, err)
 	}
-	hash, isAc := CutSuffix(pair[0], "ac_")
+	hash, isAc := strings.CutSuffix(pair[0], "_ac")
 	dg, err := digest.New(hash, size)
 	if err != nil {
 		return key{}, fmt.Errorf("invalid digest from file name %s: %v", fname, err)
 	}
@@ -248,10 +275,16 @@ func (d *DiskCache) StoreCas(dg digest.Digest, path string) error {
 	if err := copyFile(path, d.getPath(it.key), dg.Size); err != nil {
 		return err
 	}
-	newSize := uint64(atomic.AddInt64(&d.sizeBytes, dg.Size))
+	d.statMu.Lock()
+	d.stats.TotalSizeBytes += dg.Size
+	d.stats.TotalStoredBytes += dg.Size
+	newSize := uint64(d.stats.TotalSizeBytes)
+	d.stats.TotalNumFiles++
+	d.stats.NumFilesStored++
+	d.statMu.Unlock()
 	if newSize > d.maxCapacityBytes {
 		select {
-		case d.gcReq <- atomic.AddUint64(&d.gcTick, 1):
+		case d.gcReq <- true:
 		default:
 		}
 	}
@@ -281,10 +314,16 @@ func (d *DiskCache) StoreActionCache(dg digest.Digest, ar *repb.ActionResult) er
 	if err := os.WriteFile(d.getPath(it.key), bytes, 0644); err != nil {
 		return err
 	}
-	newSize := uint64(atomic.AddInt64(&d.sizeBytes, int64(size)))
+	d.statMu.Lock()
+	d.stats.TotalSizeBytes += int64(size)
+	d.stats.TotalStoredBytes += int64(size)
+	newSize := uint64(d.stats.TotalSizeBytes)
+	d.stats.TotalNumFiles++
+	d.stats.NumFilesStored++
+	d.statMu.Unlock()
 	if newSize > d.maxCapacityBytes {
 		select {
-		case d.gcReq <- atomic.AddUint64(&d.gcTick, 1):
+		case d.gcReq <- true:
 		default:
 		}
 	}
@@ -292,15 +331,19 @@ func (d *DiskCache) StoreActionCache(dg digest.Digest, ar *repb.ActionResult) er
 }
 
 func (d *DiskCache) gc() {
+	defer func() { d.gcDone <- true }()
 	for {
 		select {
 		case <-d.shutdown:
 			return
 		case <-d.ctx.Done():
 			return
-		case t := <-d.gcReq:
+		case <-d.gcReq:
+			start := time.Now()
 			// Evict old entries until total size is below cap.
-			for uint64(atomic.LoadInt64(&d.sizeBytes)) > d.maxCapacityBytes {
+			var numFilesGCed, totalGCedSizeBytes int64
+			var totalGCDiskOpsTime time.Duration
+			for uint64(atomic.LoadInt64(&d.stats.TotalSizeBytes)) > d.maxCapacityBytes {
 				d.mu.Lock()
 				it := heap.Pop(d.queue).(*qitem)
 				d.mu.Unlock()
@@ -309,21 +352,26 @@ func (d *DiskCache) gc() {
 					log.Errorf("error getting item size for %v: %v", it.key, err)
 					size = 0
 				}
-				atomic.AddInt64(&d.sizeBytes, -size)
+				atomic.AddInt64(&d.stats.TotalSizeBytes, -size)
+				numFilesGCed++
+				totalGCedSizeBytes += size
 				it.mu.Lock()
+				diskOpsStart := time.Now()
 				// We only delete the files, and not the prefix directories, because the prefixes are not worth worrying about.
 				if err := os.Remove(d.getPath(it.key)); err != nil {
 					log.Errorf("Error removing file: %v", err)
 				}
+				totalGCDiskOpsTime += time.Since(diskOpsStart)
 				d.store.Delete(it.key)
 				it.mu.Unlock()
 			}
-			if d.testGcTicks != nil {
-				select {
-				case d.testGcTicks <- t:
-				default:
-				}
-			}
+			d.statMu.Lock()
+			d.stats.NumFilesGCed += numFilesGCed
+			d.stats.TotalNumFiles -= numFilesGCed
+			d.stats.TotalGCedSizeBytes += totalGCedSizeBytes
+			d.stats.TotalGCDiskOpsTime += time.Duration(totalGCDiskOpsTime)
+			d.stats.TotalGCTime += time.Since(start)
+			d.statMu.Unlock()
 		}
 	}
 }
@@ -363,6 +411,7 @@ func (d *DiskCache) LoadCas(dg digest.Digest, path string) bool {
 	k := key{digest: dg, isCas: true}
 	iUntyped, loaded := d.store.Load(k)
 	if !loaded {
+		atomic.AddInt64(&d.stats.NumCacheMisses, 1)
 		return false
 	}
 	it := iUntyped.(*qitem)
@@ -371,12 +420,17 @@
 	it.mu.RUnlock()
 	if err != nil {
 		// It is not possible to prevent a race with GC; hence, we return false on copy errors.
+		atomic.AddInt64(&d.stats.NumCacheMisses, 1)
 		return false
 	}
 	d.mu.Lock()
 	d.queue.Bump(it)
 	d.mu.Unlock()
+	d.statMu.Lock()
+	d.stats.NumCacheHits++
+	d.stats.TotalCacheHitSizeBytes += dg.Size
+	d.statMu.Unlock()
 	return true
 }
 
@@ -384,14 +438,17 @@ func (d *DiskCache) LoadActionCache(dg digest.Digest) (ar *repb.ActionResult, lo
 	k := key{digest: dg, isCas: false}
 	iUntyped, loaded := d.store.Load(k)
 	if !loaded {
+		atomic.AddInt64(&d.stats.NumCacheMisses, 1)
 		return nil, false
 	}
 	it := iUntyped.(*qitem)
 	it.mu.RLock()
 	ar = &repb.ActionResult{}
-	if err := d.loadActionResult(k, ar); err != nil {
+	size, err := d.loadActionResult(k, ar)
+	if err != nil {
 		// It is not possible to prevent a race with GC; hence, we return false on load errors.
 		it.mu.RUnlock()
+		atomic.AddInt64(&d.stats.NumCacheMisses, 1)
 		return nil, false
 	}
 	it.mu.RUnlock()
@@ -399,30 +456,27 @@ func (d *DiskCache) LoadActionCache(dg digest.Digest) (ar *repb.ActionResult, lo
 	d.mu.Lock()
 	d.queue.Bump(it)
 	d.mu.Unlock()
+	d.statMu.Lock()
+	d.stats.NumCacheHits++
+	d.stats.TotalCacheHitSizeBytes += int64(size)
+	d.statMu.Unlock()
 	return ar, true
 }
 
-func (d *DiskCache) loadActionResult(k key, ar *repb.ActionResult) error {
+func (d *DiskCache) loadActionResult(k key, ar *repb.ActionResult) (int, error) {
 	bytes, err := os.ReadFile(d.getPath(k))
 	if err != nil {
-		return err
+		return 0, err
 	}
+	n := len(bytes)
 	// Required sanity check: sometimes the read pretends to succeed, but doesn't, if
 	// the file is being concurrently deleted. Empty ActionResult is advised against in
 	// the RE-API: https://github.com/bazelbuild/remote-apis/blob/main/build/bazel/remote/execution/v2/remote_execution.proto#L1052
-	if len(bytes) == 0 {
-		return fmt.Errorf("read empty ActionResult for %v", k.digest)
+	if n == 0 {
+		return n, fmt.Errorf("read empty ActionResult for %v", k.digest)
 	}
 	if err := proto.Unmarshal(bytes, ar); err != nil {
-		return fmt.Errorf("error unmarshalling %v as ActionResult: %v", bytes, err)
-	}
-	return nil
-}
-
-func getLastAccessTime(path string) (time.Time, error) {
-	info, err := os.Stat(path)
-	if err != nil {
-		return time.Time{}, err
+		return n, fmt.Errorf("error unmarshalling %v as ActionResult: %v", bytes, err)
 	}
-	return FileInfoToAccessTime(info), nil
+	return n, nil
 }
diff --git a/go/pkg/diskcache/diskcache_benchmark_test.go b/go/pkg/diskcache/diskcache_benchmark_test.go
new file mode 100644
index 000000000..ff64f4f95
--- /dev/null
+++ b/go/pkg/diskcache/diskcache_benchmark_test.go
@@ -0,0 +1,174 @@
+package diskcache
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"path/filepath"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
+	"golang.org/x/sync/errgroup"
+)
+
+type BenchmarkParams struct {
+	Name             string
+	MaxConcurrency   int           // Number of concurrent threads for requests.
+	CacheMissCost    time.Duration // Simulates a remote execution / fetch.
+	CacheSizeBytes   uint64
+	FileSizeBytes    int // All files have this size.
+	NumRequests      int // Total number of cache load/store requests.
+	TotalNumFiles    int // NumRequests will repeat over this set.
+	NumExistingFiles int // Affects initialization time.
+}
+
+var kBenchmarks = []BenchmarkParams{
+	{
+		Name:             "AllGC_Small",
+		MaxConcurrency:   100,
+		CacheMissCost:    100 * time.Millisecond,
+		CacheSizeBytes:   20000,
+		FileSizeBytes:    100,
+		NumRequests:      10000,
+		TotalNumFiles:    500,
+		NumExistingFiles: 0,
+	},
+	{
+		Name:             "AllGC_Medium",
+		MaxConcurrency:   100,
+		CacheMissCost:    100 * time.Millisecond,
+		CacheSizeBytes:   2000000,
+		FileSizeBytes:    10000,
+		NumRequests:      10000,
+		TotalNumFiles:    500,
+		NumExistingFiles: 0,
+	},
+	{
+		Name:             "AllGC_Large",
+		MaxConcurrency:   100,
+		CacheMissCost:    100 * time.Millisecond,
+		CacheSizeBytes:   2000000000,
+		FileSizeBytes:    10000000,
+		NumRequests:      2000,
+		TotalNumFiles:    500,
+		NumExistingFiles: 0,
+	},
+	{
+		Name:             "AllCacheHits_Small",
+		MaxConcurrency:   100,
+		CacheMissCost:    100 * time.Millisecond,
+		CacheSizeBytes:   50000,
+		FileSizeBytes:    100,
+		NumRequests:      20000,
+		TotalNumFiles:    500,
+		NumExistingFiles: 500,
+	},
+	{
+		Name:             "AllCacheHits_Medium",
+		MaxConcurrency:   100,
+		CacheMissCost:    100 * time.Millisecond,
+		CacheSizeBytes:   50000000,
+		FileSizeBytes:    100000,
+		NumRequests:      20000,
+		TotalNumFiles:    500,
+		NumExistingFiles: 500,
+	},
+	{
+		Name:             "AllCacheHits_Large",
+		MaxConcurrency:   100,
+		CacheMissCost:    100 * time.Millisecond,
+		CacheSizeBytes:   5000000000,
+		FileSizeBytes:    10000000,
+		NumRequests:      2000,
+		TotalNumFiles:    500,
+		NumExistingFiles: 500,
+	},
+}
+
+func getFilename(i int) string {
+	return fmt.Sprintf("f_%05d", i)
+}
+
+func TestRunAllBenchmarks(t *testing.T) {
+	for _, b := range kBenchmarks {
+		t.Run(b.Name, func(t *testing.T) {
+			root := t.TempDir()
+			fmt.Printf("Initializing source files for benchmark %s...\n", b.Name)
+			start := time.Now()
+			source := filepath.Join(root, "source")
+			if err := os.MkdirAll(source, 0777); err != nil {
+				t.Fatalf("%v", err)
+			}
+			dgs := make([]digest.Digest, b.TotalNumFiles)
+			for i := 0; i < b.TotalNumFiles; i++ {
+				filename := filepath.Join(source, getFilename(i))
+				var s strings.Builder
+				fmt.Fprintf(&s, "%d\n", i)
+				for k := s.Len(); k < b.FileSizeBytes; k++ {
+					s.WriteByte(0)
+				}
+				blob := []byte(s.String())
+				dgs[i] = digest.NewFromBlob(blob)
+				if err := os.WriteFile(filename, blob, 00666); err != nil {
+					t.Fatalf("os.WriteFile(%s): %v", filename, err)
+				}
+			}
+
+			cacheDir := filepath.Join(root, "cache")
+			d, err := New(context.Background(), cacheDir, b.CacheSizeBytes)
+			if err != nil {
+				t.Fatalf("New: %v", err)
+			}
+			// Pre-populate the cache if requested:
+			if b.NumExistingFiles > 0 {
+				fmt.Printf("Pre-warming cache for benchmark %s...\n", b.Name)
+				for i := 0; i < b.NumExistingFiles; i++ {
+					fname := filepath.Join(source, getFilename(i))
+					if err := d.StoreCas(dgs[i], fname); err != nil {
+						t.Fatalf("StoreCas(%s, %s) failed: %v", dgs[i], fname, err)
+					}
+				}
+				d.Shutdown()
+				d, err = New(context.Background(), cacheDir, b.CacheSizeBytes)
+				if err != nil {
+					t.Fatalf("New: %v", err)
+				}
+			}
+			// Run the simulation: store on every cache miss.
+			new := filepath.Join(root, "new")
+			if err := os.MkdirAll(new, 0777); err != nil {
+				t.Fatalf("%v", err)
+			}
+			fmt.Printf("Finished initialization for benchmark %s, duration %v\n", b.Name, time.Since(start))
+			eg := errgroup.Group{}
+			eg.SetLimit(b.MaxConcurrency)
+			fmt.Printf("Starting benchmark %s...\n", b.Name)
+			start = time.Now()
+			for k := 0; k < b.NumRequests; k++ {
+				k := k
+				eg.Go(func() error {
+					i := k % b.TotalNumFiles
+					newName := filepath.Join(new, getFilename(k))
+					if d.LoadCas(dgs[i], newName) {
+						if dg, err := digest.NewFromFile(newName); dg != dgs[i] || err != nil {
+							return fmt.Errorf("%d: err %v or digest mismatch %v vs %v", k, err, dg, dgs[i])
+						}
+					} else {
+						time.Sleep(b.CacheMissCost)
+						if err := d.StoreCas(dgs[i], filepath.Join(source, getFilename(i))); err != nil {
+							return fmt.Errorf("StoreCas: %v", err)
+						}
+					}
+					return nil
+				})
+			}
+			if err := eg.Wait(); err != nil {
+				t.Fatalf("%v", err)
+			}
+			d.Shutdown()
+			fmt.Printf("Finished benchmark %s, total duration %v, stats:\n%+v\n", b.Name, time.Since(start), d.GetStats())
+		})
+	}
+}
diff --git a/go/pkg/diskcache/diskcache_test.go b/go/pkg/diskcache/diskcache_test.go
index 4d91ab747..9c6a6b8f6 100644
--- a/go/pkg/diskcache/diskcache_test.go
+++ b/go/pkg/diskcache/diskcache_test.go
@@ -8,26 +8,18 @@ import (
 	"path/filepath"
 	"sync/atomic"
 	"testing"
+	"time"
 
 	"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
 	"github.com/bazelbuild/remote-apis-sdks/go/pkg/testutil"
 	"github.com/google/go-cmp/cmp"
-	"github.com/pborman/uuid"
+	"github.com/google/uuid"
 	"golang.org/x/sync/errgroup"
 	"google.golang.org/protobuf/proto"
 
 	repb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
 )
 
-// Test utility only. Assumes all modifications are done, and at least one GC is expected.
-func waitForGc(d *DiskCache) {
-	for t := range d.testGcTicks {
-		if t == d.gcTick {
-			return
-		}
-	}
-}
-
 func TestStoreLoadCasPerm(t *testing.T) {
 	tests := []struct {
 		name       string
@@ -49,7 +41,6 @@ func TestStoreLoadCasPerm(t *testing.T) {
 			if err != nil {
 				t.Errorf("New: %v", err)
 			}
-			defer d.Shutdown()
 			fname, _ := testutil.CreateFile(t, tc.executable, "12345")
 			srcInfo, err := os.Stat(fname)
 			if err != nil {
@@ -80,6 +71,41 @@ func TestStoreLoadCasPerm(t *testing.T) {
 			if string(contents) != "12345" {
 				t.Errorf("Cached result did not match: want %q, got %q", "12345", string(contents))
 			}
+			d.Shutdown()
+			stats := d.GetStats()
+			if stats.TotalNumFiles != 1 {
+				t.Errorf("expected TotalNumFiles to be 1, got %d", stats.TotalNumFiles)
+			}
+			if stats.NumFilesStored != 1 {
+				t.Errorf("expected NumFilesStored to be 1, got %d", stats.NumFilesStored)
+			}
+			if stats.TotalStoredBytes != 5 {
+				t.Errorf("expected TotalStoredBytes to be 5, got %d", stats.TotalStoredBytes)
+			}
+			if stats.NumCacheHits != 1 {
+				t.Errorf("expected NumCacheHits to be 1, got %d", stats.NumCacheHits)
+			}
+			if stats.TotalCacheHitSizeBytes != 5 {
+				t.Errorf("expected TotalCacheHitSizeBytes to be 5, got %d", stats.TotalCacheHitSizeBytes)
+			}
+			if stats.NumCacheMisses != 0 {
+				t.Errorf("expected NumCacheMisses to be 0, got %d", stats.NumCacheMisses)
+			}
+			if stats.TotalSizeBytes != 5 {
+				t.Errorf("expected TotalSizeBytes to be 5, got %d", stats.TotalSizeBytes)
+			}
+			if stats.NumFilesGCed != 0 {
+				t.Errorf("expected NumFilesGCed to be 0, got %d", stats.NumFilesGCed)
+			}
+			if stats.TotalGCedSizeBytes != 0 {
+				t.Errorf("expected TotalGCedSizeBytes to be 0, got %d", stats.TotalGCedSizeBytes)
+			}
+			if stats.InitTime == 0 {
+				t.Errorf("expected InitTime to be > 0")
+			}
+			if stats.TotalGCTime != 0 {
+				t.Errorf("expected TotalGCTime to be 0")
+			}
 		})
 	}
 }
@@ -90,12 +116,46 @@ func TestLoadCasNotFound(t *testing.T) {
 	if err != nil {
 		t.Errorf("New: %v", err)
 	}
-	defer d.Shutdown()
 	newName := filepath.Join(root, "new")
 	dg := digest.NewFromBlob([]byte("bla"))
 	if d.LoadCas(dg, newName) {
 		t.Errorf("expected to not load %s from the cache to %s", dg, newName)
 	}
+	d.Shutdown()
+	stats := d.GetStats()
+	if stats.TotalNumFiles != 0 {
+		t.Errorf("expected TotalNumFiles to be 0, got %d", stats.TotalNumFiles)
+	}
+	if stats.TotalSizeBytes != 0 {
+		t.Errorf("expected TotalSizeBytes to be 0, got %d", stats.TotalSizeBytes)
+	}
+	if stats.NumFilesStored != 0 {
+		t.Errorf("expected NumFilesStored to be 0, got %d", stats.NumFilesStored)
+	}
+	if stats.TotalStoredBytes != 0 {
+		t.Errorf("expected TotalStoredBytes to be 0, got %d", stats.TotalStoredBytes)
+	}
+	if stats.NumCacheHits != 0 {
+		t.Errorf("expected NumCacheHits to be 0, got %d", stats.NumCacheHits)
+	}
+	if stats.TotalCacheHitSizeBytes != 0 {
+		t.Errorf("expected TotalCacheHitSizeBytes to be 0, got %d", stats.TotalCacheHitSizeBytes)
+	}
+	if stats.NumCacheMisses != 1 {
+		t.Errorf("expected NumCacheMisses to be 1, got %d", stats.NumCacheMisses)
+	}
+	if stats.NumFilesGCed != 0 {
+		t.Errorf("expected NumFilesGCed to be 0, got %d", stats.NumFilesGCed)
+	}
+	if stats.TotalGCedSizeBytes != 0 {
+		t.Errorf("expected TotalGCedSizeBytes to be 0, got %d", stats.TotalGCedSizeBytes)
+	}
+	if stats.InitTime == 0 {
+		t.Errorf("expected InitTime to be > 0")
+	}
+	if stats.TotalGCTime != 0 {
+		t.Errorf("expected TotalGCTime to be 0")
+	}
 }
 
 func TestStoreLoadActionCache(t *testing.T) {
@@ -104,10 +164,9 @@ func TestStoreLoadActionCache(t *testing.T) {
 	if err != nil {
 		t.Errorf("New: %v", err)
 	}
-	defer d.Shutdown()
 	ar := &repb.ActionResult{
 		OutputFiles: []*repb.OutputFile{
-			&repb.OutputFile{Path: "bla", Digest: digest.Empty.ToProto()},
+			{Path: "bla", Digest: digest.Empty.ToProto()},
 		},
 	}
 	dg := digest.NewFromBlob([]byte("foo"))
@@ -121,6 +180,46 @@ func TestStoreLoadActionCache(t *testing.T) {
 	if diff := cmp.Diff(ar, got, cmp.Comparer(proto.Equal)); diff != "" {
 		t.Errorf("LoadActionCache(...) gave diff on action result (-want +got):\n%s", diff)
 	}
+	d.Shutdown()
+	stats := d.GetStats()
+	if stats.TotalNumFiles != 1 {
+		t.Errorf("expected TotalNumFiles to be 1, got %d", stats.TotalNumFiles)
+	}
+	bytes, err := proto.Marshal(ar)
+	if err != nil {
+		t.Fatalf("error marshalling proto: %v", err)
+	}
+	size := int64(len(bytes))
+	if stats.TotalSizeBytes != size {
+		t.Errorf("expected TotalSizeBytes to be %d, got %d", size, stats.TotalSizeBytes)
+	}
+	if stats.NumFilesStored != 1 {
+		t.Errorf("expected NumFilesStored to be 1, got %d", stats.NumFilesStored)
+	}
+	if stats.TotalStoredBytes != size {
+		t.Errorf("expected TotalStoredBytes to be %d, got %d", size, stats.TotalStoredBytes)
+	}
+	if stats.NumCacheHits != 1 {
+		t.Errorf("expected NumCacheHits to be 1, got %d", stats.NumCacheHits)
+	}
+	if stats.TotalCacheHitSizeBytes != size {
+		t.Errorf("expected TotalCacheHitSizeBytes to be %d, got %d", size, stats.TotalCacheHitSizeBytes)
+	}
+	if stats.NumCacheMisses != 0 {
+		t.Errorf("expected NumCacheMisses to be 0, got %d", stats.NumCacheMisses)
+	}
+	if stats.NumFilesGCed != 0 {
+		t.Errorf("expected NumFilesGCed to be 0, got %d", stats.NumFilesGCed)
+	}
+	if stats.TotalGCedSizeBytes != 0 {
+		t.Errorf("expected TotalGCedSizeBytes to be 0, got %d", stats.TotalGCedSizeBytes)
+	}
+	if stats.InitTime == 0 {
+		t.Errorf("expected InitTime to be > 0")
+	}
+	if stats.TotalGCTime != 0 {
+		t.Errorf("expected TotalGCTime to be 0")
+	}
 }
 
 func TestGcOldestCas(t *testing.T) {
@@ -129,8 +228,6 @@ func TestGcOldestCas(t *testing.T) {
 	if err != nil {
 		t.Errorf("New: %v", err)
 	}
-	defer d.Shutdown()
-	d.testGcTicks = make(chan uint64, 1)
 	for i := 0; i < 5; i++ {
 		fname, _ := testutil.CreateFile(t, false, fmt.Sprintf("aaa %d", i))
 		dg, err := digest.NewFromFile(fname)
@@ -141,10 +238,7 @@ func TestGcOldestCas(t *testing.T) {
 			t.Errorf("StoreCas(%s, %s) failed: %v", dg, fname, err)
 		}
 	}
-	waitForGc(d)
-	if d.TotalSizeBytes() != d.maxCapacityBytes {
-		t.Errorf("expected total size bytes to be %d, got %d", d.maxCapacityBytes, d.TotalSizeBytes())
-	}
+	d.Shutdown()
 	newName := filepath.Join(root, "new")
 	for i := 0; i < 5; i++ {
 		dg := digest.NewFromBlob([]byte(fmt.Sprintf("aaa %d", i)))
@@ -152,12 +246,46 @@ func TestGcOldestCas(t *testing.T) {
 			t.Errorf("expected loaded to be %v for %s from the cache to %s", i > 0, dg, newName)
 		}
 	}
+	stats := d.GetStats()
+	if stats.TotalNumFiles != 4 {
+		t.Errorf("expected TotalNumFiles to be 4, got %d", stats.TotalNumFiles)
+	}
+	if stats.NumFilesStored != 5 {
+		t.Errorf("expected NumFilesStored to be 5, got %d", stats.NumFilesStored)
+	}
+	if stats.TotalStoredBytes != 25 {
+		t.Errorf("expected TotalStoredBytes to be 25, got %d", stats.TotalStoredBytes)
+	}
+	if stats.NumCacheHits != 4 {
+		t.Errorf("expected NumCacheHits to be 4, got %d", stats.NumCacheHits)
+	}
+	if stats.TotalCacheHitSizeBytes != 20 {
+		t.Errorf("expected TotalCacheHitSizeBytes to be 20, got %d", stats.TotalCacheHitSizeBytes)
+	}
+	if stats.NumCacheMisses != 1 {
+		t.Errorf("expected NumCacheMisses to be 1, got %d", stats.NumCacheMisses)
+	}
+	if stats.NumFilesGCed != 1 {
+		t.Errorf("expected NumFilesGCed to be 1, got %d", stats.NumFilesGCed)
+	}
+	if stats.TotalGCedSizeBytes != 5 {
+		t.Errorf("expected TotalGCedSizeBytes to be 5, got %d", stats.TotalGCedSizeBytes)
+	}
+	if uint64(stats.TotalSizeBytes) != d.maxCapacityBytes {
+		t.Errorf("expected total size bytes to be %d, got %d", d.maxCapacityBytes, stats.TotalSizeBytes)
+	}
+	if stats.InitTime <= 0 {
+		t.Errorf("expected InitTime to be > 0")
+	}
+	if stats.TotalGCTime <= 0 {
+		t.Errorf("expected TotalGCTime to be > 0")
+	}
 }
 
 func TestGcOldestActionCache(t *testing.T) {
 	ar := &repb.ActionResult{
 		OutputFiles: []*repb.OutputFile{
-			&repb.OutputFile{Path: "12345", Digest: digest.Empty.ToProto()},
+			{Path: "12345", Digest: digest.Empty.ToProto()},
 		},
 	}
 	bytes, err := proto.Marshal(ar)
@@ -170,8 +298,6 @@ func TestGcOldestActionCache(t *testing.T) {
 	if err != nil {
 		t.Errorf("New: %v", err)
 	}
-	defer d.Shutdown()
-	d.testGcTicks = make(chan uint64, 1)
 	for i := 0; i < 5; i++ {
 		si := fmt.Sprintf("aaa %d", i)
 		dg := digest.NewFromBlob([]byte(si))
@@ -180,10 +306,7 @@ func TestGcOldestActionCache(t *testing.T) {
 			t.Errorf("StoreActionCache(%s) failed: %v", dg, err)
 		}
 	}
-	waitForGc(d)
-	if d.TotalSizeBytes() != d.maxCapacityBytes {
-		t.Errorf("expected total size bytes to be %d, got %d", d.maxCapacityBytes, d.TotalSizeBytes())
-	}
+	d.Shutdown()
 	for i := 0; i < 5; i++ {
 		si := fmt.Sprintf("aaa %d", i)
 		dg := digest.NewFromBlob([]byte(si))
@@ -198,6 +321,48 @@ func TestGcOldestActionCache(t *testing.T) {
 			t.Errorf("expected loaded to be %v for %s from the cache", i > 0, dg)
 		}
 	}
+	stats := d.GetStats()
+	if stats.TotalNumFiles != 4 {
+		t.Errorf("expected TotalNumFiles to be 4, got %d", stats.TotalNumFiles)
+	}
+	if stats.NumFilesStored != 5 {
+		t.Errorf("expected NumFilesStored to be 5, got %d", stats.NumFilesStored)
+	}
+	if stats.TotalStoredBytes != int64(size)*5 {
+		t.Errorf("expected TotalStoredBytes to be %d, got %d", size*5, stats.TotalStoredBytes)
+	}
+	if stats.NumCacheHits != 4 {
+		t.Errorf("expected NumCacheHits to be 4, got %d", stats.NumCacheHits)
+	}
+	if stats.TotalCacheHitSizeBytes != int64(size)*4 {
+		t.Errorf("expected TotalCacheHitSizeBytes to be %d, got %d", size*4, stats.TotalCacheHitSizeBytes)
+	}
+	if stats.NumCacheMisses != 1 {
+		t.Errorf("expected NumCacheMisses to be 1, got %d", stats.NumCacheMisses)
+	}
+	if stats.NumFilesGCed != 1 {
+		t.Errorf("expected NumFilesGCed to be 1, got %d", stats.NumFilesGCed)
+	}
+	if stats.TotalGCedSizeBytes != int64(size) {
+		t.Errorf("expected TotalGCedSizeBytes to be %d, got %d", size, stats.TotalGCedSizeBytes)
+	}
+	if uint64(stats.TotalSizeBytes) != d.maxCapacityBytes {
+		t.Errorf("expected total size bytes to be %d, got %d", d.maxCapacityBytes, stats.TotalSizeBytes)
+	}
+	if stats.InitTime <= 0 {
+		t.Errorf("expected InitTime to be > 0")
+	}
+	if stats.TotalGCTime <= 0 {
+		t.Errorf("expected TotalGCTime to be > 0")
+	}
+}
+
+func getLastAccessTime(path string) (time.Time, error) {
+	info, err := os.Stat(path)
+	if err != nil {
+		return time.Time{}, err
+	}
+	return fileInfoToAccessTime(info), nil
 }
 
 // We say that Last Access Time is behaving accurately on a system if reading from the file
@@ -250,8 +415,6 @@ func TestInitFromExistingCas(t *testing.T) {
 	if err != nil {
 		t.Errorf("New: %v", err)
 	}
-	defer d.Shutdown()
-	d.testGcTicks = make(chan uint64, 1)
 
 	// Check old files are cached:
 	dg = digest.NewFromBlob([]byte("aaa 1"))
@@ -263,18 +426,49 @@ func TestInitFromExistingCas(t *testing.T) {
 	if err != nil {
 		t.Fatalf("digest.NewFromFile failed: %v", err)
 	}
-	if d.TotalSizeBytes() != d.maxCapacityBytes {
-		t.Errorf("expected total size bytes to be %d, got %d", d.maxCapacityBytes, d.TotalSizeBytes())
-	}
 	// Trigger a GC by adding a new file.
 	if err := d.StoreCas(dg, fname); err != nil {
 		t.Errorf("StoreCas(%s, %s) failed: %v", dg, fname, err)
 	}
-	waitForGc(d)
+	d.Shutdown()
 	dg = digest.NewFromBlob([]byte("aaa 2"))
 	if d.LoadCas(dg, newName) {
 		t.Errorf("expected to not load %s from the cache to %s", dg, newName)
 	}
+	stats := d.GetStats()
+	if stats.TotalNumFiles != 4 {
+		t.Errorf("expected TotalNumFiles to be 4, got %d", stats.TotalNumFiles)
+	}
+	if stats.NumFilesStored != 1 {
+		t.Errorf("expected NumFilesStored to be 1, got %d", stats.NumFilesStored)
+	}
+	if stats.TotalStoredBytes != 5 {
+		t.Errorf("expected TotalStoredBytes to be 5, got %d", stats.TotalStoredBytes)
+	}
+	if stats.NumCacheHits != 1 {
+		t.Errorf("expected NumCacheHits to be 1, got %d", stats.NumCacheHits)
+	}
+	if stats.TotalCacheHitSizeBytes != 5 {
+		t.Errorf("expected TotalCacheHitSizeBytes to be 5, got %d", stats.TotalCacheHitSizeBytes)
+	}
+	if stats.NumCacheMisses != 1 {
+		t.Errorf("expected NumCacheMisses to be 1, got %d", stats.NumCacheMisses)
+	}
+	if stats.NumFilesGCed != 1 {
+		t.Errorf("expected NumFilesGCed to be 1, got %d", stats.NumFilesGCed)
+	}
+	if stats.TotalGCedSizeBytes != 5 {
+		t.Errorf("expected TotalGCedSizeBytes to be 5, got %d", stats.TotalGCedSizeBytes)
+	}
+	if uint64(stats.TotalSizeBytes) != d.maxCapacityBytes {
+		t.Errorf("expected total size bytes to be %d, got %d", d.maxCapacityBytes, stats.TotalSizeBytes)
+	}
+	if stats.InitTime <= 0 {
+		t.Errorf("expected InitTime to be > 0")
+	}
+	if stats.TotalGCTime <= 0 {
+		t.Errorf("expected TotalGCTime to be > 0")
+	}
 }
 
 func TestThreadSafetyCas(t *testing.T) {
@@ -292,8 +486,6 @@ func TestThreadSafetyCas(t *testing.T) {
 	if err != nil {
 		t.Errorf("New: %v", err)
 	}
-	d.testGcTicks = make(chan uint64, attempts)
-	defer d.Shutdown()
 	var files []string
 	var dgs []digest.Digest
 	for i := 0; i < nFiles; i++ {
@@ -312,14 +504,14 @@ func TestThreadSafetyCas(t *testing.T) {
 		}
 	}
 	// Randomly access and store files from different threads.
-	eg, _ := errgroup.WithContext(context.Background())
+	eg := errgroup.Group{}
 	var hits uint64
 	var runs []int
 	for k := 0; k < attempts; k++ {
 		eg.Go(func() error {
 			i := rand.Intn(nFiles)
 			runs = append(runs, i)
-			newName := filepath.Join(root, "new", uuid.New())
+			newName := filepath.Join(root, "new", uuid.New().String())
 			if d.LoadCas(dgs[i], newName) {
 				atomic.AddUint64(&hits, 1)
 				contents, err := os.ReadFile(newName)
@@ -339,11 +531,43 @@ func TestThreadSafetyCas(t *testing.T) {
 	if err := eg.Wait(); err != nil {
 		t.Error(err)
 	}
-	waitForGc(d)
-	if d.TotalSizeBytes() != d.maxCapacityBytes {
-		t.Errorf("expected total size bytes to be %d, got %d", d.maxCapacityBytes, d.TotalSizeBytes())
-	}
+	d.Shutdown()
 	if int(hits) < attempts/2 {
 		t.Errorf("Unexpectedly low cache hits %d out of %d attempts", hits, attempts)
 	}
+	stats := d.GetStats()
+	if stats.TotalNumFiles != 5 {
+		t.Errorf("expected TotalNumFiles to be 5, got %d", stats.TotalNumFiles)
+	}
+	if uint64(stats.NumCacheHits) != hits {
+		t.Errorf("expected NumCacheHits to be %d, got %d", hits, stats.NumCacheHits)
+	}
+	if uint64(stats.TotalCacheHitSizeBytes) != hits*5 {
+		t.Errorf("expected TotalCacheHitSizeBytes to be %d, got %d", hits*5, stats.TotalCacheHitSizeBytes)
+	}
+	if stats.NumCacheMisses+stats.NumCacheHits != int64(attempts) {
+		t.Errorf("expected NumCacheHits+NumCacheMisses to be %d, got %d", attempts, stats.NumCacheMisses+stats.NumCacheHits)
+	}
+	// This is less or equal because of multiple concurrent Stores.
+	if stats.NumFilesStored > int64(nFiles)+stats.NumCacheMisses {
+		t.Errorf("expected NumFilesStored to be <= %d, got %d", int64(nFiles)+stats.NumCacheMisses, stats.NumFilesStored)
+	}
+	if stats.TotalStoredBytes != 5*stats.NumFilesStored {
+		t.Errorf("expected TotalStoredBytes to be %d, got %d", 5*stats.NumFilesStored, stats.TotalStoredBytes)
+	}
+	if stats.NumFilesGCed <= 0 {
+		t.Errorf("expected NumFilesGCed to be > 0")
+	}
+	if stats.TotalGCedSizeBytes <= 0 {
+		t.Errorf("expected TotalGCedSizeBytes to be > 0")
+	}
+	if uint64(stats.TotalSizeBytes) != d.maxCapacityBytes {
+		t.Errorf("expected total size bytes to be %d, got %d", d.maxCapacityBytes, stats.TotalSizeBytes)
+	}
+	if stats.InitTime <= 0 {
+		t.Errorf("expected InitTime to be > 0")
+	}
+	if stats.TotalGCTime <= 0 {
+		t.Errorf("expected TotalGCTime to be > 0")
+	}
 }
diff --git a/go/pkg/diskcache/sys_darwin.go b/go/pkg/diskcache/sys_darwin.go
index ff836daa3..0c98dceb5 100644
--- a/go/pkg/diskcache/sys_darwin.go
+++ b/go/pkg/diskcache/sys_darwin.go
@@ -8,6 +8,6 @@ import (
 	"time"
 )
 
-func FileInfoToAccessTime(info fs.FileInfo) time.Time {
+func fileInfoToAccessTime(info fs.FileInfo) time.Time {
 	return time.Unix(info.Sys().(*syscall.Stat_t).Atimespec.Unix())
 }
diff --git a/go/pkg/diskcache/sys_linux.go b/go/pkg/diskcache/sys_linux.go
index 7414d4b51..27f33d588 100644
--- a/go/pkg/diskcache/sys_linux.go
+++ b/go/pkg/diskcache/sys_linux.go
@@ -7,6 +7,6 @@ import (
 	"time"
 )
 
-func FileInfoToAccessTime(info fs.FileInfo) time.Time {
+func fileInfoToAccessTime(info fs.FileInfo) time.Time {
 	return time.Unix(info.Sys().(*syscall.Stat_t).Atim.Unix())
 }
diff --git a/go/pkg/diskcache/sys_windows.go b/go/pkg/diskcache/sys_windows.go
index 319a7988e..b0621f64a 100644
--- a/go/pkg/diskcache/sys_windows.go
+++ b/go/pkg/diskcache/sys_windows.go
@@ -9,6 +9,6 @@ import (
 
 // This will return correct values only if `fsutil behavior set disablelastaccess 0` is set.
 // Tracking of last access time is disabled by default on Windows.
-func FileInfoToAccessTime(info fs.FileInfo) time.Time {
+func fileInfoToAccessTime(info fs.FileInfo) time.Time {
 	return time.Unix(0, info.Sys().(*syscall.Win32FileAttributeData).LastAccessTime.Nanoseconds())
 }
diff --git a/go/pkg/flags/BUILD.bazel b/go/pkg/flags/BUILD.bazel
index 0572a941a..92bb117b4 100644
--- a/go/pkg/flags/BUILD.bazel
+++ b/go/pkg/flags/BUILD.bazel
@@ -8,6 +8,7 @@ go_library(
     deps = [
         "//go/pkg/balancer",
         "//go/pkg/client",
+        "//go/pkg/diskcache",
         "//go/pkg/moreflag",
         "@com_github_golang_glog//:go_default_library",
         "@org_golang_google_grpc//:go_default_library",
diff --git a/go/pkg/flags/flags.go b/go/pkg/flags/flags.go
index 4a54debe7..a6b2377b9 100644
--- a/go/pkg/flags/flags.go
+++ b/go/pkg/flags/flags.go
@@ -8,6 +8,7 @@ import (
 
 	"github.com/bazelbuild/remote-apis-sdks/go/pkg/balancer"
 	"github.com/bazelbuild/remote-apis-sdks/go/pkg/client"
+	"github.com/bazelbuild/remote-apis-sdks/go/pkg/diskcache"
 	"github.com/bazelbuild/remote-apis-sdks/go/pkg/moreflag"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/keepalive"
@@ -98,15 +99,7 @@ func init() {
 
 // NewClientFromFlags connects to a remote execution service and returns a client suitable for higher-level
 // functionality. It uses the flags from above to configure the connection to remote execution.
 func NewClientFromFlags(ctx context.Context, opts ...client.Opt) (*client.Client, error) {
-	opts = append(opts, []client.Opt{
-		client.CASConcurrency(*CASConcurrency),
-		client.StartupCapabilities(*StartupCapabilities),
-		&client.DiskCacheOpts{
-			Context:       ctx,
-			Path:          *DiskCachePath,
-			MaxCapacityGb: *DiskCacheCapacityGb,
-		},
-	}...)
+	opts = append(opts, []client.Opt{client.CASConcurrency(*CASConcurrency), client.StartupCapabilities(*StartupCapabilities)}...)
 	if len(RPCTimeouts) > 0 {
 		timeouts := make(map[string]time.Duration)
 		for rpc, d := range client.DefaultRPCTimeouts {
@@ -144,6 +137,14 @@ func NewClientFromFlags(ctx context.Context, opts ...client.Opt) (*client.Client
 		log.V(1).Infof("KeepAlive params = %v", params)
 		dialOpts = append(dialOpts, grpc.WithKeepaliveParams(params))
 	}
+	if *DiskCachePath != "" {
+		capBytes := uint64(*DiskCacheCapacityGb * 1024 * 1024 * 1024)
+		diskCache, err := diskcache.New(ctx, *DiskCachePath, capBytes)
+		if err != nil {
+			return nil, err
+		}
+		opts = append(opts, &client.DiskCacheOpts{DiskCache: diskCache})
+	}
 	return client.NewClient(ctx, *Instance, client.DialParams{
 		Service:    *Service,
 		NoSecurity: *ServiceNoSecurity,
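
A minimal usage sketch (not part of the patch) of the refactored API introduced above: callers now construct the diskcache.DiskCache themselves, so initialization errors can be handled, and pass it to the client via DiskCacheOpts. The instance name, service address, cache path, and capacity below are illustrative assumptions only.

package main

import (
	"context"
	"log"

	"github.com/bazelbuild/remote-apis-sdks/go/pkg/client"
	"github.com/bazelbuild/remote-apis-sdks/go/pkg/diskcache"
)

func main() {
	ctx := context.Background()
	// Construct the disk cache explicitly; errors surface here instead of being
	// swallowed inside DiskCacheOpts.Apply as before this change.
	dc, err := diskcache.New(ctx, "/tmp/remote-cache", 10*1024*1024*1024) // 10 GiB, illustrative
	if err != nil {
		log.Fatalf("initializing disk cache: %v", err)
	}
	c, err := client.NewClient(ctx, "my-instance", client.DialParams{Service: "remote.example.com:443"},
		&client.DiskCacheOpts{DiskCache: dc})
	if err != nil {
		log.Fatalf("creating client: %v", err)
	}

	// ... run remote cache / execution calls through c ...

	// Per this change, Close shuts the disk cache down and waits for in-flight GC.
	if err := c.Close(); err != nil {
		log.Printf("closing client: %v", err)
	}
	log.Printf("disk cache stats: %+v", dc.GetStats())
}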