Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Local disk cache #529

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/pkg/client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go_library(
"//go/pkg/command",
"//go/pkg/contextmd",
"//go/pkg/digest",
"//go/pkg/diskcache",
"//go/pkg/filemetadata",
"//go/pkg/retry",
"//go/pkg/uploadinfo",
Expand Down
16 changes: 16 additions & 0 deletions go/pkg/client/cas_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,17 @@
symlinks = append(symlinks, out)
continue
}
if c.diskCache != nil {
absPath := out.Path
if !filepath.IsAbs(absPath) {
absPath = filepath.Join(outDir, absPath)
}
if c.diskCache.LoadCas(out.Digest, absPath) {
fullStats.Requested += out.Digest.Size
fullStats.Cached += out.Digest.Size
continue
}
}
if _, ok := downloads[out.Digest]; ok {
copies = append(copies, out)
// All copies are effectivelly cached
Expand Down Expand Up @@ -129,6 +140,11 @@
if err := cache.Update(absPath, md); err != nil {
return fullStats, err
}
if c.diskCache != nil {
if err := c.diskCache.StoreCas(output.Digest, absPath); err != nil {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it could be done as part of DownloadFiles() ? Otherwise this adds a serial latency.

return fullStats, err

Check failure on line 145 in go/pkg/client/cas_download.go

View workflow job for this annotation

GitHub Actions / lint

error returned from external package is unwrapped: sig: func (*github.com/bazelbuild/remote-apis-sdks/go/pkg/diskcache.DiskCache).StoreCas(dg github.com/bazelbuild/remote-apis-sdks/go/pkg/digest.Digest, path string) error (wrapcheck)
}
}
}
for _, out := range copies {
perm := c.RegularMode
Expand Down
16 changes: 16 additions & 0 deletions go/pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ import (
"time"

"errors"

"github.com/bazelbuild/remote-apis-sdks/go/pkg/actas"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/balancer"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/chunker"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/diskcache"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/retry"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/uploadinfo"
"golang.org/x/oauth2"
Expand Down Expand Up @@ -194,6 +196,7 @@ type Client struct {
uploadOnce sync.Once
downloadOnce sync.Once
useBatchCompression UseBatchCompression
diskCache *diskcache.DiskCache
}

const (
Expand Down Expand Up @@ -242,6 +245,10 @@ func (c *Client) Close() error {
if c.casConnection != c.connection {
return c.casConnection.Close()
}
if c.diskCache != nil {
// Waits for local disk GC to complete.
c.diskCache.Shutdown()
}
return nil
}

Expand Down Expand Up @@ -351,6 +358,15 @@ func (o *TreeSymlinkOpts) Apply(c *Client) {
c.TreeSymlinkOpts = o
}

type DiskCacheOpts struct {
DiskCache *diskcache.DiskCache
}

// Apply sets the client's TreeSymlinkOpts.
func (o *DiskCacheOpts) Apply(c *Client) {
c.diskCache = o.DiskCache
}

// MaxBatchDigests is maximum amount of digests to batch in upload and download operations.
type MaxBatchDigests int

Expand Down
19 changes: 15 additions & 4 deletions go/pkg/client/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,23 @@ func (c *Client) ExecuteAction(ctx context.Context, ac *Action) (*repb.ActionRes
}

// CheckActionCache queries remote action cache, returning an ActionResult or nil if it doesn't exist.
func (c *Client) CheckActionCache(ctx context.Context, acDg *repb.Digest) (*repb.ActionResult, error) {
func (c *Client) CheckActionCache(ctx context.Context, dg digest.Digest) (*repb.ActionResult, error) {
if c.diskCache != nil {
if res, loaded := c.diskCache.LoadActionCache(dg); loaded {
return res, nil
}
}
res, err := c.GetActionResult(ctx, &repb.GetActionResultRequest{
InstanceName: c.InstanceName,
ActionDigest: acDg,
ActionDigest: dg.ToProto(),
})
switch st, _ := status.FromError(err); st.Code() {
case codes.OK:
if c.diskCache != nil {
if err := c.diskCache.StoreActionCache(dg, res); err != nil {
log.Errorf("error storing ActionResult of %s to disk cache: %v", dg, err)
}
}
return res, nil
case codes.NotFound:
return nil, nil
Expand Down Expand Up @@ -166,12 +176,13 @@ func (c *Client) PrepAction(ctx context.Context, ac *Action) (*repb.Digest, *rep
if err != nil {
return nil, nil, fmt.Errorf("marshalling Action proto: %w", err)
}
acDg := digest.NewFromBlob(acBlob).ToProto()
dg := digest.NewFromBlob(acBlob)
acDg := dg.ToProto()

// If the result is cacheable, check if it's already in the cache.
if !ac.DoNotCache || !ac.SkipCache {
log.V(1).Info("Checking cache")
res, err := c.CheckActionCache(ctx, acDg)
res, err := c.CheckActionCache(ctx, dg)
if err != nil {
return nil, nil, err
}
Expand Down
49 changes: 49 additions & 0 deletions go/pkg/diskcache/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "diskcache",
srcs = [
"diskcache.go",
"sys_darwin.go",
"sys_linux.go",
"sys_windows.go",
],
importpath = "github.com/bazelbuild/remote-apis-sdks/go/pkg/diskcache",
visibility = ["//visibility:public"],
deps = [
"//go/pkg/digest",
"@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
"@com_github_golang_glog//:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
"@org_golang_x_sync//errgroup:go_default_library",
],
)

go_test(
name = "diskcache_test",
srcs = ["diskcache_test.go"],
embed = [":diskcache"],
deps = [
"//go/pkg/digest",
"//go/pkg/testutil",
"@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
"@com_github_google_go_cmp//cmp:go_default_library",
"@com_github_google_uuid//:uuid",
"@org_golang_x_sync//errgroup:go_default_library",
],
)

go_test(
name = "diskcache_benchmark_test",
srcs = ["diskcache_benchmark_test.go"],
embed = [":diskcache"],
deps = [
"//go/pkg/digest",
"//go/pkg/testutil",
"@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
"@com_github_google_go_cmp//cmp:go_default_library",
"@com_github_google_uuid//:uuid",
"@org_golang_x_sync//errgroup:go_default_library",
],
tags = ["manual"],
)
Loading
Loading