Skip to content

Commit

Permalink
Make LFS http_client parallel within a batch. (#32369)
Browse files Browse the repository at this point in the history
Signed-off-by: Royce Remer <[email protected]>
Co-authored-by: wxiaoguang <[email protected]>
  • Loading branch information
rremer and wxiaoguang authored Nov 4, 2024
1 parent f2a6df0 commit 54146e6
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 140 deletions.
8 changes: 7 additions & 1 deletion custom/conf/app.example.ini
Original file line number Diff line number Diff line change
Expand Up @@ -2642,9 +2642,15 @@ LEVEL = Info
;; override the azure blob base path if storage type is azureblob
;AZURE_BLOB_BASE_PATH = lfs/

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; settings for Gitea's LFS client (eg: mirroring an upstream lfs endpoint)
;;
;[lfs_client]
;; When mirroring an upstream lfs endpoint, limit the number of pointers in each batch request to this number
;; Limit the number of pointers in each batch request to this number
;BATCH_SIZE = 20
;; Limit the number of concurrent upload/download operations within a batch
;BATCH_OPERATION_CONCURRENCY = 3

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ require (
golang.org/x/image v0.21.0
golang.org/x/net v0.30.0
golang.org/x/oauth2 v0.23.0
golang.org/x/sync v0.8.0
golang.org/x/sys v0.26.0
golang.org/x/text v0.19.0
golang.org/x/tools v0.26.0
Expand Down Expand Up @@ -316,7 +317,6 @@ require (
go.uber.org/zap v1.27.0 // indirect
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/time v0.7.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38 // indirect
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
Expand Down
119 changes: 69 additions & 50 deletions modules/lfs/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/proxy"
"code.gitea.io/gitea/modules/setting"

"golang.org/x/sync/errgroup"
)

// HTTPClient is used to communicate with the LFS server
Expand Down Expand Up @@ -113,6 +115,7 @@ func (c *HTTPClient) Upload(ctx context.Context, objects []Pointer, callback Upl
return c.performOperation(ctx, objects, nil, callback)
}

// performOperation takes a slice of LFS object pointers, batches them, and performs the upload/download operations concurrently in each batch
func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc DownloadCallback, uc UploadCallback) error {
if len(objects) == 0 {
return nil
Expand All @@ -133,71 +136,87 @@ func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc
return fmt.Errorf("TransferAdapter not found: %s", result.Transfer)
}

errGroup, groupCtx := errgroup.WithContext(ctx)
errGroup.SetLimit(setting.LFSClient.BatchOperationConcurrency)
for _, object := range result.Objects {
if object.Error != nil {
log.Trace("Error on object %v: %v", object.Pointer, object.Error)
if uc != nil {
if _, err := uc(object.Pointer, object.Error); err != nil {
return err
}
} else {
if err := dc(object.Pointer, nil, object.Error); err != nil {
return err
}
}
continue
}
errGroup.Go(func() error {
return performSingleOperation(groupCtx, object, dc, uc, transferAdapter)
})
}

if uc != nil {
if len(object.Actions) == 0 {
log.Trace("%v already present on server", object.Pointer)
continue
}
// only the first error is returned, preserving legacy behavior before concurrency
return errGroup.Wait()
}

link, ok := object.Actions["upload"]
if !ok {
log.Debug("%+v", object)
return errors.New("missing action 'upload'")
}
// performSingleOperation performs an LFS upload or download operation on a single object
func performSingleOperation(ctx context.Context, object *ObjectResponse, dc DownloadCallback, uc UploadCallback, transferAdapter TransferAdapter) error {
// the response from a lfs batch api request for this specific object id contained an error
if object.Error != nil {
log.Trace("Error on object %v: %v", object.Pointer, object.Error)

content, err := uc(object.Pointer, nil)
if err != nil {
// this was an 'upload' request inside the batch request
if uc != nil {
if _, err := uc(object.Pointer, object.Error); err != nil {
return err
}

err = transferAdapter.Upload(ctx, link, object.Pointer, content)
if err != nil {
} else {
// this was NOT an 'upload' request inside the batch request, meaning it must be a 'download' request
if err := dc(object.Pointer, nil, object.Error); err != nil {
return err
}
}
// if the callback returns no err, then the error could be ignored, and the operations should continue
return nil
}

link, ok = object.Actions["verify"]
if ok {
if err := transferAdapter.Verify(ctx, link, object.Pointer); err != nil {
return err
}
}
} else {
link, ok := object.Actions["download"]
if !ok {
// no actions block in response, try legacy response schema
link, ok = object.Links["download"]
}
if !ok {
log.Debug("%+v", object)
return errors.New("missing action 'download'")
}
// the response from an lfs batch api request contained necessary upload/download fields to act upon
if uc != nil {
if len(object.Actions) == 0 {
log.Trace("%v already present on server", object.Pointer)
return nil
}

content, err := transferAdapter.Download(ctx, link)
if err != nil {
return err
}
link, ok := object.Actions["upload"]
if !ok {
return errors.New("missing action 'upload'")
}

content, err := uc(object.Pointer, nil)
if err != nil {
return err
}

if err := dc(object.Pointer, content, nil); err != nil {
err = transferAdapter.Upload(ctx, link, object.Pointer, content)
if err != nil {
return err
}

link, ok = object.Actions["verify"]
if ok {
if err := transferAdapter.Verify(ctx, link, object.Pointer); err != nil {
return err
}
}
}
} else {
link, ok := object.Actions["download"]
if !ok {
// no actions block in response, try legacy response schema
link, ok = object.Links["download"]
}
if !ok {
log.Debug("%+v", object)
return errors.New("missing action 'download'")
}

content, err := transferAdapter.Download(ctx, link)
if err != nil {
return err
}

if err := dc(object.Pointer, content, nil); err != nil {
return err
}
}
return nil
}

Expand Down
Loading

0 comments on commit 54146e6

Please sign in to comment.