Skip to content

Commit

Permalink
Cherrypicking Marc-Antoine's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ola-rozenfeld authored and engflow-github-automation committed May 19, 2024
1 parent debf142 commit d50ee21
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 66 deletions.
6 changes: 5 additions & 1 deletion go/pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,11 @@ type DiskCacheOpts struct {
// Apply installs a disk cache on the client when a cache path is
// configured. Because Apply has no error return, initialization
// failures are logged and the client is left with a nil disk cache.
func (o *DiskCacheOpts) Apply(c *Client) {
	if o.Path == "" {
		return
	}
	capBytes := uint64(o.MaxCapacityGb * 1024 * 1024 * 1024)
	// TODO(ola): propagate errors from Apply.
	dc, err := diskcache.New(o.Context, o.Path, capBytes)
	c.diskCache = dc
	if err != nil {
		log.Errorf("Error initializing disk cache on %s: %v", o.Path, err)
	}
}

Expand Down
1 change: 1 addition & 0 deletions go/pkg/diskcache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
"@com_github_golang_glog//:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
"@org_golang_x_sync//errgroup:go_default_library",
],
)

Expand Down
74 changes: 41 additions & 33 deletions go/pkg/diskcache/diskcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ import (
"time"

"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"

repb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
log "github.com/golang/glog"
"google.golang.org/protobuf/proto"
)

type key struct {
Expand Down Expand Up @@ -103,7 +105,7 @@ type DiskCache struct {
testGcTicks chan uint64
}

func New(ctx context.Context, root string, maxCapacityBytes uint64) *DiskCache {
func New(ctx context.Context, root string, maxCapacityBytes uint64) (*DiskCache, error) {
res := &DiskCache{
root: root,
maxCapacityBytes: maxCapacityBytes,
Expand All @@ -115,44 +117,45 @@ func New(ctx context.Context, root string, maxCapacityBytes uint64) *DiskCache {
shutdown: make(chan bool),
}
heap.Init(res.queue)
_ = os.MkdirAll(root, os.ModePerm)
if err := os.MkdirAll(root, os.ModePerm); err != nil {
return nil, err
}
// We use Git's directory/file naming structure as inspiration:
// https://git-scm.com/book/en/v2/Git-Internals-Git-Objects#:~:text=The%20subdirectory%20is%20named%20with%20the%20first%202%20characters%20of%20the%20SHA%2D1%2C%20and%20the%20filename%20is%20the%20remaining%2038%20characters.
var wg sync.WaitGroup
wg.Add(256)
eg, eCtx := errgroup.WithContext(ctx)
for i := 0; i < 256; i++ {
prefixDir := filepath.Join(root, fmt.Sprintf("%02x", i))
go func() {
defer wg.Done()
_ = os.MkdirAll(prefixDir, os.ModePerm)
_ = filepath.WalkDir(prefixDir, func(path string, d fs.DirEntry, err error) error {
eg.Go(func() error {
if eCtx.Err() != nil {
return eCtx.Err()
}
if err := os.MkdirAll(prefixDir, os.ModePerm); err != nil {
return err
}
return filepath.WalkDir(prefixDir, func(path string, d fs.DirEntry, err error) error {
// Any error during the walk aborts initialization and is returned to the caller of New.
if err != nil {
log.Errorf("Error reading cache directory: %v", err)
return nil
return fmt.Errorf("error reading cache directory: %v", err)
}
if d.IsDir() {
return nil
}
subdir := filepath.Base(filepath.Dir(path))
k, err := res.getKeyFromFileName(subdir + d.Name())
if err != nil {
log.Errorf("Error parsing cached file name %s: %v", path, err)
return nil
return fmt.Errorf("error parsing cached file name %s: %v", path, err)
}
atime, err := GetLastAccessTime(path)
atime, err := getLastAccessTime(path)
if err != nil {
log.Errorf("Error getting last accessed time of %s: %v", path, err)
return nil
return fmt.Errorf("error getting last accessed time of %s: %v", path, err)
}
it := &qitem{
key: k,
lat: atime,
}
size, err := res.getItemSize(k)
if err != nil {
log.Errorf("Error getting file size of %s: %v", path, err)
return nil
return fmt.Errorf("error getting file size of %s: %v", path, err)
}
res.store.Store(k, it)
atomic.AddInt64(&res.sizeBytes, size)
Expand All @@ -161,11 +164,13 @@ func New(ctx context.Context, root string, maxCapacityBytes uint64) *DiskCache {
res.mu.Unlock()
return nil
})
}()
})
}
if err := eg.Wait(); err != nil {
return nil, err
}
wg.Wait()
go res.gc()
return res
return res, nil
}

func (d *DiskCache) getItemSize(k key) (int64, error) {
Expand Down Expand Up @@ -342,18 +347,13 @@ func copyFile(src, dst string, size int64) error {
return err
}
defer out.Close()
_, err = io.Copy(out, in)
n, err := io.Copy(out, in)
if err != nil {
return err
}
// Required sanity check: sometimes the copy pretends to succeed, but doesn't, if
// the file is being concurrently deleted.
dstInfo, err := os.Stat(dst)
if err != nil {
return err
}
if dstInfo.Size() != size {
return fmt.Errorf("copy of %s to %s failed: src/dst size mismatch: wanted %d, got %d", src, dst, size, dstInfo.Size())
// Required sanity check: if the file is being concurrently deleted, we may not always copy everything.
if n != size {
return fmt.Errorf("copy of %s to %s failed: src/dst size mismatch: wanted %d, got %d", src, dst, size, n)
}
return nil
}
Expand All @@ -367,12 +367,12 @@ func (d *DiskCache) LoadCas(dg digest.Digest, path string) bool {
}
it := iUntyped.(*qitem)
it.mu.RLock()
if err := copyFile(d.getPath(k), path, dg.Size); err != nil {
err := copyFile(d.getPath(k), path, dg.Size)
it.mu.RUnlock()
if err != nil {
// It is not possible to prevent a race with GC; hence, we return false on copy errors.
it.mu.RUnlock()
return false
}
it.mu.RUnlock()

d.mu.Lock()
d.queue.Bump(it)
Expand Down Expand Up @@ -418,3 +418,11 @@ func (d *DiskCache) loadActionResult(k key, ar *repb.ActionResult) error {
}
return nil
}

// getLastAccessTime returns the last access time of the file at path,
// converting the OS-specific stat data via FileInfoToAccessTime.
func getLastAccessTime(path string) (time.Time, error) {
	info, statErr := os.Stat(path)
	if statErr != nil {
		return time.Time{}, statErr
	}
	return FileInfoToAccessTime(info), nil
}
46 changes: 35 additions & 11 deletions go/pkg/diskcache/diskcache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ func TestStoreLoadCasPerm(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
root := t.TempDir()
d := New(context.Background(), filepath.Join(root, "cache"), 20)
d, err := New(context.Background(), filepath.Join(root, "cache"), 20)
if err != nil {
t.Errorf("New: %v", err)
}
defer d.Shutdown()
fname, _ := testutil.CreateFile(t, tc.executable, "12345")
srcInfo, err := os.Stat(fname)
Expand Down Expand Up @@ -83,7 +86,10 @@ func TestStoreLoadCasPerm(t *testing.T) {

func TestLoadCasNotFound(t *testing.T) {
root := t.TempDir()
d := New(context.Background(), filepath.Join(root, "cache"), 20)
d, err := New(context.Background(), filepath.Join(root, "cache"), 20)
if err != nil {
t.Errorf("New: %v", err)
}
defer d.Shutdown()
newName := filepath.Join(root, "new")
dg := digest.NewFromBlob([]byte("bla"))
Expand All @@ -94,7 +100,10 @@ func TestLoadCasNotFound(t *testing.T) {

func TestStoreLoadActionCache(t *testing.T) {
root := t.TempDir()
d := New(context.Background(), filepath.Join(root, "cache"), 100)
d, err := New(context.Background(), filepath.Join(root, "cache"), 100)
if err != nil {
t.Errorf("New: %v", err)
}
defer d.Shutdown()
ar := &repb.ActionResult{
OutputFiles: []*repb.OutputFile{
Expand All @@ -116,7 +125,10 @@ func TestStoreLoadActionCache(t *testing.T) {

func TestGcOldestCas(t *testing.T) {
root := t.TempDir()
d := New(context.Background(), filepath.Join(root, "cache"), 20)
d, err := New(context.Background(), filepath.Join(root, "cache"), 20)
if err != nil {
t.Errorf("New: %v", err)
}
defer d.Shutdown()
d.testGcTicks = make(chan uint64, 1)
for i := 0; i < 5; i++ {
Expand Down Expand Up @@ -154,7 +166,10 @@ func TestGcOldestActionCache(t *testing.T) {
}
size := len(bytes)
root := t.TempDir()
d := New(context.Background(), filepath.Join(root, "cache"), uint64(size)*4)
d, err := New(context.Background(), filepath.Join(root, "cache"), uint64(size)*4)
if err != nil {
t.Errorf("New: %v", err)
}
defer d.Shutdown()
d.testGcTicks = make(chan uint64, 1)
for i := 0; i < 5; i++ {
Expand Down Expand Up @@ -192,11 +207,11 @@ func TestGcOldestActionCache(t *testing.T) {
func isSystemLastAccessTimeAccurate(t *testing.T) bool {
t.Helper()
fname, _ := testutil.CreateFile(t, false, "foo")
lat, _ := GetLastAccessTime(fname)
lat, _ := getLastAccessTime(fname)
if _, err := os.ReadFile(fname); err != nil {
t.Fatalf("%v", err)
}
newLat, _ := GetLastAccessTime(fname)
newLat, _ := getLastAccessTime(fname)
return lat.Before(newLat)
}

Expand All @@ -209,7 +224,10 @@ func TestInitFromExistingCas(t *testing.T) {
return
}
root := t.TempDir()
d := New(context.Background(), filepath.Join(root, "cache"), 20)
d, err := New(context.Background(), filepath.Join(root, "cache"), 20)
if err != nil {
t.Errorf("New: %v", err)
}
for i := 0; i < 4; i++ {
fname, _ := testutil.CreateFile(t, false, fmt.Sprintf("aaa %d", i))
dg, err := digest.NewFromFile(fname)
Expand All @@ -228,7 +246,10 @@ func TestInitFromExistingCas(t *testing.T) {
d.Shutdown()

// Re-initialize from existing files.
d = New(context.Background(), filepath.Join(root, "cache"), 20)
d, err = New(context.Background(), filepath.Join(root, "cache"), 20)
if err != nil {
t.Errorf("New: %v", err)
}
defer d.Shutdown()
d.testGcTicks = make(chan uint64, 1)

Expand All @@ -238,7 +259,7 @@ func TestInitFromExistingCas(t *testing.T) {
t.Errorf("expected %s to be cached", dg)
}
fname, _ := testutil.CreateFile(t, false, "aaa 4")
dg, err := digest.NewFromFile(fname)
dg, err = digest.NewFromFile(fname)
if err != nil {
t.Fatalf("digest.NewFromFile failed: %v", err)
}
Expand Down Expand Up @@ -267,7 +288,10 @@ func TestThreadSafetyCas(t *testing.T) {
nFiles := 10
attempts := 5000
// All blobs are size 5 exactly. We will have half the byte capacity we need.
d := New(context.Background(), filepath.Join(root, "cache"), uint64(nFiles*5)/2)
d, err := New(context.Background(), filepath.Join(root, "cache"), uint64(nFiles*5)/2)
if err != nil {
t.Errorf("New: %v", err)
}
d.testGcTicks = make(chan uint64, attempts)
defer d.Shutdown()
var files []string
Expand Down
10 changes: 3 additions & 7 deletions go/pkg/diskcache/sys_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,11 @@
package diskcache

import (
"os"
"io/fs"
"syscall"
"time"
)

func GetLastAccessTime(path string) (time.Time, error) {
info, err := os.Stat(path)
if err != nil {
return time.Time{}, err
}
return time.Unix(info.Sys().(*syscall.Stat_t).Atimespec.Unix()), nil
func FileInfoToAccessTime(info fs.FileInfo) time.Time {
return time.Unix(info.Sys().(*syscall.Stat_t).Atimespec.Unix())
}
10 changes: 3 additions & 7 deletions go/pkg/diskcache/sys_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,11 @@
package diskcache

import (
"os"
"io/fs"
"syscall"
"time"
)

func GetLastAccessTime(path string) (time.Time, error) {
info, err := os.Stat(path)
if err != nil {
return time.Time{}, err
}
return time.Unix(info.Sys().(*syscall.Stat_t).Atim.Unix()), nil
// FileInfoToAccessTime extracts the last access time from the
// linux syscall.Stat_t backing the given FileInfo.
func FileInfoToAccessTime(info fs.FileInfo) time.Time {
	stat := info.Sys().(*syscall.Stat_t)
	return time.Unix(stat.Atim.Unix())
}
10 changes: 3 additions & 7 deletions go/pkg/diskcache/sys_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,13 @@
package diskcache

import (
"os"
"io/fs"
"syscall"
"time"
)

// This will return correct values only if `fsutil behavior set disablelastaccess 0` is set.
// Tracking of last access time is disabled by default on Windows.
func GetLastAccessTime(path string) (time.Time, error) {
info, err := os.Stat(path)
if err != nil {
return time.Time{}, err
}
return time.Unix(0, info.Sys().(*syscall.Win32FileAttributeData).LastAccessTime.Nanoseconds()), nil
// FileInfoToAccessTime extracts the last access time from the
// Windows Win32FileAttributeData backing the given FileInfo.
// Accurate only when last-access tracking is enabled (see file comment).
func FileInfoToAccessTime(info fs.FileInfo) time.Time {
	attrs := info.Sys().(*syscall.Win32FileAttributeData)
	return time.Unix(0, attrs.LastAccessTime.Nanoseconds())
}

0 comments on commit d50ee21

Please sign in to comment.