diff --git a/custom/conf/app.example.ini b/custom/conf/app.example.ini index 69b57a8c01257..e080b0be72733 100644 --- a/custom/conf/app.example.ini +++ b/custom/conf/app.example.ini @@ -2642,9 +2642,15 @@ LEVEL = Info ;; override the azure blob base path if storage type is azureblob ;AZURE_BLOB_BASE_PATH = lfs/ +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; settings for Gitea's LFS client (eg: mirroring an upstream lfs endpoint) +;; ;[lfs_client] -;; When mirroring an upstream lfs endpoint, limit the number of pointers in each batch request to this number +;; Limit the number of pointers in each batch request to this number ;BATCH_SIZE = 20 +;; Limit the number of concurrent upload/download operations within a batch +;BATCH_OPERATION_CONCURRENCY = 3 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; diff --git a/go.mod b/go.mod index c98ef9a61bf4a..ff0d612133e36 100644 --- a/go.mod +++ b/go.mod @@ -124,6 +124,7 @@ require ( golang.org/x/image v0.21.0 golang.org/x/net v0.30.0 golang.org/x/oauth2 v0.23.0 + golang.org/x/sync v0.8.0 golang.org/x/sys v0.26.0 golang.org/x/text v0.19.0 golang.org/x/tools v0.26.0 @@ -316,7 +317,6 @@ require ( go.uber.org/zap v1.27.0 // indirect golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c // indirect golang.org/x/mod v0.21.0 // indirect - golang.org/x/sync v0.8.0 // indirect golang.org/x/time v0.7.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38 // indirect gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect diff --git a/modules/lfs/http_client.go b/modules/lfs/http_client.go index aa9e744d72b8f..411c4248c4aa9 100644 --- a/modules/lfs/http_client.go +++ b/modules/lfs/http_client.go @@ -17,6 +17,8 @@ import ( "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/proxy" "code.gitea.io/gitea/modules/setting" + + "golang.org/x/sync/errgroup" ) // HTTPClient is used to communicate with the LFS server @@ -113,6 +115,7 @@ func (c *HTTPClient) Upload(ctx context.Context, objects []Pointer, callback Upl return c.performOperation(ctx, objects, nil, callback) } +// performOperation takes a slice of LFS object pointers, batches them, and performs the upload/download operations concurrently in each batch func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc DownloadCallback, uc UploadCallback) error { if len(objects) == 0 { return nil @@ -133,71 +136,87 @@ func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc return fmt.Errorf("TransferAdapter not found: %s", result.Transfer) } + errGroup, groupCtx := errgroup.WithContext(ctx) + errGroup.SetLimit(setting.LFSClient.BatchOperationConcurrency) for _, object := range result.Objects { - if object.Error != nil { - log.Trace("Error on object %v: %v", object.Pointer, object.Error) - if uc != nil { - if _, err := uc(object.Pointer, object.Error); err != nil { - return err - } - } else { - if err := dc(object.Pointer, nil, object.Error); err != nil { - return err - } - } - continue - } + errGroup.Go(func() error { + return performSingleOperation(groupCtx, object, dc, uc, transferAdapter) + }) + } - if uc != nil { - if len(object.Actions) == 0 { - log.Trace("%v already present on server", object.Pointer) - continue - } + // only the first error is returned, preserving legacy behavior before concurrency + return errGroup.Wait() +} - link, ok := object.Actions["upload"] - if !ok { - log.Debug("%+v", object) - return errors.New("missing action 'upload'") - } +// performSingleOperation performs an LFS upload or download operation on a single object +func performSingleOperation(ctx context.Context, object *ObjectResponse, dc DownloadCallback, uc UploadCallback, transferAdapter TransferAdapter) error { + // the response from a lfs batch api request for this specific object id contained an error + if object.Error != nil { + log.Trace("Error on object %v: %v", object.Pointer, object.Error) - content, err := uc(object.Pointer, nil) - if err != nil { + // this was an 'upload' request inside the batch request + if uc != nil { + if _, err := uc(object.Pointer, object.Error); err != nil { return err } - - err = transferAdapter.Upload(ctx, link, object.Pointer, content) - if err != nil { + } else { + // this was NOT an 'upload' request inside the batch request, meaning it must be a 'download' request + if err := dc(object.Pointer, nil, object.Error); err != nil { return err } + } + // if the callback returns no err, then the error could be ignored, and the operations should continue + return nil + } - link, ok = object.Actions["verify"] - if ok { - if err := transferAdapter.Verify(ctx, link, object.Pointer); err != nil { - return err - } - } - } else { - link, ok := object.Actions["download"] - if !ok { - // no actions block in response, try legacy response schema - link, ok = object.Links["download"] - } - if !ok { - log.Debug("%+v", object) - return errors.New("missing action 'download'") - } + // the response from an lfs batch api request contained necessary upload/download fields to act upon + if uc != nil { + if len(object.Actions) == 0 { + log.Trace("%v already present on server", object.Pointer) + return nil + } - content, err := transferAdapter.Download(ctx, link) - if err != nil { - return err - } + link, ok := object.Actions["upload"] + if !ok { + return errors.New("missing action 'upload'") + } + + content, err := uc(object.Pointer, nil) + if err != nil { + return err + } - if err := dc(object.Pointer, content, nil); err != nil { + err = transferAdapter.Upload(ctx, link, object.Pointer, content) + if err != nil { + return err + } + + link, ok = object.Actions["verify"] + if ok { + if err := transferAdapter.Verify(ctx, link, object.Pointer); err != nil { return err } } - } + } else { + link, ok := object.Actions["download"] + if !ok { + // no actions block in response, try legacy response schema + link, ok = object.Links["download"] + } + if !ok { + log.Debug("%+v", object) + return errors.New("missing action 'download'") + } + content, err := transferAdapter.Download(ctx, link) + if err != nil { + return err + } + + if err := dc(object.Pointer, content, nil); err != nil { + return err + } + } return nil } diff --git a/modules/lfs/http_client_test.go b/modules/lfs/http_client_test.go index ec90f5375d1b9..d22735147a556 100644 --- a/modules/lfs/http_client_test.go +++ b/modules/lfs/http_client_test.go @@ -12,6 +12,8 @@ import ( "testing" "code.gitea.io/gitea/modules/json" + "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/test" "github.com/stretchr/testify/assert" ) @@ -183,93 +185,84 @@ func TestHTTPClientDownload(t *testing.T) { cases := []struct { endpoint string - expectederror string + expectedError string }{ - // case 0 { endpoint: "https://status-not-ok.io", - expectederror: io.ErrUnexpectedEOF.Error(), + expectedError: io.ErrUnexpectedEOF.Error(), }, - // case 1 { endpoint: "https://invalid-json-response.io", - expectederror: "invalid json", + expectedError: "invalid json", }, - // case 2 { endpoint: "https://valid-batch-request-download.io", - expectederror: "", + expectedError: "", }, - // case 3 { endpoint: "https://response-no-objects.io", - expectederror: "", + expectedError: "", }, - // case 4 { endpoint: "https://unknown-transfer-adapter.io", - expectederror: "TransferAdapter not found: ", + expectedError: "TransferAdapter not found: ", }, - // case 5 { endpoint: "https://error-in-response-objects.io", - expectederror: "Object not found", + expectedError: "Object not found", }, - // case 6 { endpoint: "https://empty-actions-map.io", - expectederror: "missing action 'download'", + expectedError: "missing action 'download'", }, - // case 7 { endpoint: "https://download-actions-map.io", - expectederror: "", + expectedError: "", }, - // case 8 { endpoint: "https://upload-actions-map.io", - expectederror: "missing action 'download'", + expectedError: "missing action 'download'", }, - // case 9 { endpoint: "https://verify-actions-map.io", - expectederror: "missing action 'download'", + expectedError: "missing action 'download'", }, - // case 10 { endpoint: "https://unknown-actions-map.io", - expectederror: "missing action 'download'", + expectedError: "missing action 'download'", }, - // case 11 { endpoint: "https://legacy-batch-request-download.io", - expectederror: "", + expectedError: "", }, } - for n, c := range cases { - client := &HTTPClient{ - client: hc, - endpoint: c.endpoint, - transfers: map[string]TransferAdapter{ - "dummy": dummy, - }, - } + defer test.MockVariableValue(&setting.LFSClient.BatchOperationConcurrency, 3)() + for _, c := range cases { + t.Run(c.endpoint, func(t *testing.T) { + client := &HTTPClient{ + client: hc, + endpoint: c.endpoint, + transfers: map[string]TransferAdapter{ + "dummy": dummy, + }, + } - err := client.Download(context.Background(), []Pointer{p}, func(p Pointer, content io.ReadCloser, objectError error) error { - if objectError != nil { - return objectError + err := client.Download(context.Background(), []Pointer{p}, func(p Pointer, content io.ReadCloser, objectError error) error { + if objectError != nil { + return objectError + } + b, err := io.ReadAll(content) + assert.NoError(t, err) + assert.Equal(t, []byte("dummy"), b) + return nil + }) + if c.expectedError != "" { + assert.ErrorContains(t, err, c.expectedError) + } else { + assert.NoError(t, err) } - b, err := io.ReadAll(content) - assert.NoError(t, err) - assert.Equal(t, []byte("dummy"), b) - return nil }) - if len(c.expectederror) > 0 { - assert.True(t, strings.Contains(err.Error(), c.expectederror), "case %d: '%s' should contain '%s'", n, err.Error(), c.expectederror) - } else { - assert.NoError(t, err, "case %d", n) - } } } @@ -296,81 +289,73 @@ func TestHTTPClientUpload(t *testing.T) { cases := []struct { endpoint string - expectederror string + expectedError string }{ - // case 0 { endpoint: "https://status-not-ok.io", - expectederror: io.ErrUnexpectedEOF.Error(), + expectedError: io.ErrUnexpectedEOF.Error(), }, - // case 1 { endpoint: "https://invalid-json-response.io", - expectederror: "invalid json", + expectedError: "invalid json", }, - // case 2 { endpoint: "https://valid-batch-request-upload.io", - expectederror: "", + expectedError: "", }, - // case 3 { endpoint: "https://response-no-objects.io", - expectederror: "", + expectedError: "", }, - // case 4 { endpoint: "https://unknown-transfer-adapter.io", - expectederror: "TransferAdapter not found: ", + expectedError: "TransferAdapter not found: ", }, - // case 5 { endpoint: "https://error-in-response-objects.io", - expectederror: "Object not found", + expectedError: "Object not found", }, - // case 6 { endpoint: "https://empty-actions-map.io", - expectederror: "", + expectedError: "", }, - // case 7 { endpoint: "https://download-actions-map.io", - expectederror: "missing action 'upload'", + expectedError: "missing action 'upload'", }, - // case 8 { endpoint: "https://upload-actions-map.io", - expectederror: "", + expectedError: "", }, - // case 9 { endpoint: "https://verify-actions-map.io", - expectederror: "missing action 'upload'", + expectedError: "missing action 'upload'", }, - // case 10 { endpoint: "https://unknown-actions-map.io", - expectederror: "missing action 'upload'", + expectedError: "missing action 'upload'", }, } - for n, c := range cases { - client := &HTTPClient{ - client: hc, - endpoint: c.endpoint, - transfers: map[string]TransferAdapter{ - "dummy": dummy, - }, - } + defer test.MockVariableValue(&setting.LFSClient.BatchOperationConcurrency, 3)() + for _, c := range cases { + t.Run(c.endpoint, func(t *testing.T) { + client := &HTTPClient{ + client: hc, + endpoint: c.endpoint, + transfers: map[string]TransferAdapter{ + "dummy": dummy, + }, + } - err := client.Upload(context.Background(), []Pointer{p}, func(p Pointer, objectError error) (io.ReadCloser, error) { - return io.NopCloser(new(bytes.Buffer)), objectError + err := client.Upload(context.Background(), []Pointer{p}, func(p Pointer, objectError error) (io.ReadCloser, error) { + return io.NopCloser(new(bytes.Buffer)), objectError + }) + if c.expectedError != "" { + assert.ErrorContains(t, err, c.expectedError) + } else { + assert.NoError(t, err) + } }) - if len(c.expectederror) > 0 { - assert.True(t, strings.Contains(err.Error(), c.expectederror), "case %d: '%s' should contain '%s'", n, err.Error(), c.expectederror) - } else { - assert.NoError(t, err, "case %d", n) - } } } diff --git a/modules/repository/repo.go b/modules/repository/repo.go index def2220b17d12..97b0343381327 100644 --- a/modules/repository/repo.go +++ b/modules/repository/repo.go @@ -181,11 +181,12 @@ func StoreMissingLfsObjectsInRepository(ctx context.Context, repo *repo_model.Re downloadObjects := func(pointers []lfs.Pointer) error { err := lfsClient.Download(ctx, pointers, func(p lfs.Pointer, content io.ReadCloser, objectError error) error { + if errors.Is(objectError, lfs.ErrObjectNotExist) { + log.Warn("Ignoring missing upstream LFS object %-v: %v", p, objectError) + return nil + } + if objectError != nil { - if errors.Is(objectError, lfs.ErrObjectNotExist) { - log.Warn("Repo[%-v]: Ignore missing LFS object %-v: %v", repo, p, objectError) - return nil - } return objectError } diff --git a/modules/setting/lfs.go b/modules/setting/lfs.go index 24c49cabee93a..6b54ac0a60d86 100644 --- a/modules/setting/lfs.go +++ b/modules/setting/lfs.go @@ -28,7 +28,8 @@ var LFS = struct { // LFSClient represents configuration for Gitea's LFS clients, for example: mirroring upstream Git LFS var LFSClient = struct { - BatchSize int `ini:"BATCH_SIZE"` + BatchSize int `ini:"BATCH_SIZE"` + BatchOperationConcurrency int `ini:"BATCH_OPERATION_CONCURRENCY"` }{} func loadLFSFrom(rootCfg ConfigProvider) error { @@ -66,6 +67,11 @@ func loadLFSFrom(rootCfg ConfigProvider) error { LFSClient.BatchSize = 20 } + if LFSClient.BatchOperationConcurrency < 1 { + // match the default git-lfs's `lfs.concurrenttransfers` + LFSClient.BatchOperationConcurrency = 3 + } + LFS.HTTPAuthExpiry = sec.Key("LFS_HTTP_AUTH_EXPIRY").MustDuration(24 * time.Hour) if !LFS.StartServer || !InstallLock { diff --git a/modules/setting/lfs_test.go b/modules/setting/lfs_test.go index f7beaaa9c797e..471fa8bff3c68 100644 --- a/modules/setting/lfs_test.go +++ b/modules/setting/lfs_test.go @@ -114,4 +114,17 @@ BATCH_SIZE = 0 assert.NoError(t, loadLFSFrom(cfg)) assert.EqualValues(t, 100, LFS.MaxBatchSize) assert.EqualValues(t, 20, LFSClient.BatchSize) + assert.EqualValues(t, 3, LFSClient.BatchOperationConcurrency) + + iniStr = ` +[lfs_client] +BATCH_SIZE = 50 +BATCH_OPERATION_CONCURRENCY = 10 +` + cfg, err = NewConfigProviderFromData(iniStr) + assert.NoError(t, err) + + assert.NoError(t, loadLFSFrom(cfg)) + assert.EqualValues(t, 50, LFSClient.BatchSize) + assert.EqualValues(t, 10, LFSClient.BatchOperationConcurrency) }