Skip to content

Commit

Permalink
Refactor push mirror find and add check for updating push mirror (#32539
Browse files Browse the repository at this point in the history
)

Co-authored-by: wxiaoguang <[email protected]>
  • Loading branch information
lunny and wxiaoguang authored Nov 18, 2024
1 parent 8a20fba commit 696fbe6
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 108 deletions.
3 changes: 2 additions & 1 deletion models/db/collation.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ func CheckCollations(x *xorm.Engine) (*CheckCollationsResult, error) {

var candidateCollations []string
if x.Dialect().URI().DBType == schemas.MYSQL {
if _, err = x.SQL("SELECT @@collation_database").Get(&res.DatabaseCollation); err != nil {
_, err = x.SQL("SELECT DEFAULT_COLLATION_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = ?", setting.Database.Name).Get(&res.DatabaseCollation)
if err != nil {
return nil, err
}
res.IsCollationCaseSensitive = func(s string) bool {
Expand Down
50 changes: 34 additions & 16 deletions models/repo/pushmirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@ import (

"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/optional"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/modules/util"

"xorm.io/builder"
)

// ErrPushMirrorNotExist mirror does not exist error
var ErrPushMirrorNotExist = util.NewNotExistErrorf("PushMirror does not exist")

// PushMirror represents mirror information of a repository.
type PushMirror struct {
ID int64 `xorm:"pk autoincr"`
Expand Down Expand Up @@ -96,26 +94,46 @@ func DeletePushMirrors(ctx context.Context, opts PushMirrorOptions) error {
return util.NewInvalidArgumentErrorf("repoID required and must be set")
}

type findPushMirrorOptions struct {
db.ListOptions
RepoID int64
SyncOnCommit optional.Option[bool]
}

func (opts findPushMirrorOptions) ToConds() builder.Cond {
cond := builder.NewCond()
if opts.RepoID > 0 {
cond = cond.And(builder.Eq{"repo_id": opts.RepoID})
}
if opts.SyncOnCommit.Has() {
cond = cond.And(builder.Eq{"sync_on_commit": opts.SyncOnCommit.Value()})
}
return cond
}

// GetPushMirrorsByRepoID returns push-mirror information of a repository.
func GetPushMirrorsByRepoID(ctx context.Context, repoID int64, listOptions db.ListOptions) ([]*PushMirror, int64, error) {
sess := db.GetEngine(ctx).Where("repo_id = ?", repoID)
if listOptions.Page != 0 {
sess = db.SetSessionPagination(sess, &listOptions)
mirrors := make([]*PushMirror, 0, listOptions.PageSize)
count, err := sess.FindAndCount(&mirrors)
return mirrors, count, err
return db.FindAndCount[PushMirror](ctx, findPushMirrorOptions{
ListOptions: listOptions,
RepoID: repoID,
})
}

func GetPushMirrorByIDAndRepoID(ctx context.Context, id, repoID int64) (*PushMirror, bool, error) {
var pushMirror PushMirror
has, err := db.GetEngine(ctx).Where("id = ?", id).And("repo_id = ?", repoID).Get(&pushMirror)
if !has || err != nil {
return nil, has, err
}
mirrors := make([]*PushMirror, 0, 10)
count, err := sess.FindAndCount(&mirrors)
return mirrors, count, err
return &pushMirror, true, nil
}

// GetPushMirrorsSyncedOnCommit returns push-mirrors for this repo that should be updated by new commits
func GetPushMirrorsSyncedOnCommit(ctx context.Context, repoID int64) ([]*PushMirror, error) {
mirrors := make([]*PushMirror, 0, 10)
return mirrors, db.GetEngine(ctx).
Where("repo_id = ? AND sync_on_commit = ?", repoID, true).
Find(&mirrors)
return db.Find[PushMirror](ctx, findPushMirrorOptions{
RepoID: repoID,
SyncOnCommit: optional.Some(true),
})
}

// PushMirrorsIterate iterates all push-mirror repositories.
Expand Down
51 changes: 15 additions & 36 deletions routers/web/repo/setting/setting.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"errors"
"fmt"
"net/http"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -290,8 +289,8 @@ func SettingsPost(ctx *context.Context) {
return
}

m, err := selectPushMirrorByForm(ctx, form, repo)
if err != nil {
m, _, _ := repo_model.GetPushMirrorByIDAndRepoID(ctx, form.PushMirrorID, repo.ID)
if m == nil {
ctx.NotFound("", nil)
return
}
Expand All @@ -317,15 +316,13 @@ func SettingsPost(ctx *context.Context) {
return
}

id, err := strconv.ParseInt(form.PushMirrorID, 10, 64)
if err != nil {
ctx.ServerError("UpdatePushMirrorIntervalPushMirrorID", err)
m, _, _ := repo_model.GetPushMirrorByIDAndRepoID(ctx, form.PushMirrorID, repo.ID)
if m == nil {
ctx.NotFound("", nil)
return
}
m := &repo_model.PushMirror{
ID: id,
Interval: interval,
}

m.Interval = interval
if err := repo_model.UpdatePushMirrorInterval(ctx, m); err != nil {
ctx.ServerError("UpdatePushMirrorInterval", err)
return
Expand All @@ -334,7 +331,10 @@ func SettingsPost(ctx *context.Context) {
// If we observed its implementation in the context of `push-mirror-sync` where it
// is evident that pushing to the queue is necessary for updates.
// So, there are updates within the given interval, it is necessary to update the queue accordingly.
mirror_service.AddPushMirrorToQueue(m.ID)
if !ctx.FormBool("push_mirror_defer_sync") {
// push_mirror_defer_sync is mainly for testing purpose, we do not really want to sync the push mirror immediately
mirror_service.AddPushMirrorToQueue(m.ID)
}
ctx.Flash.Success(ctx.Tr("repo.settings.update_settings_success"))
ctx.Redirect(repo.Link() + "/settings")

Expand All @@ -348,18 +348,18 @@ func SettingsPost(ctx *context.Context) {
// as an error on the UI for this action
ctx.Data["Err_RepoName"] = nil

m, err := selectPushMirrorByForm(ctx, form, repo)
if err != nil {
m, _, _ := repo_model.GetPushMirrorByIDAndRepoID(ctx, form.PushMirrorID, repo.ID)
if m == nil {
ctx.NotFound("", nil)
return
}

if err = mirror_service.RemovePushMirrorRemote(ctx, m); err != nil {
if err := mirror_service.RemovePushMirrorRemote(ctx, m); err != nil {
ctx.ServerError("RemovePushMirrorRemote", err)
return
}

if err = repo_model.DeletePushMirrors(ctx, repo_model.PushMirrorOptions{ID: m.ID, RepoID: m.RepoID}); err != nil {
if err := repo_model.DeletePushMirrors(ctx, repo_model.PushMirrorOptions{ID: m.ID, RepoID: m.RepoID}); err != nil {
ctx.ServerError("DeletePushMirrorByID", err)
return
}
Expand Down Expand Up @@ -995,24 +995,3 @@ func handleSettingRemoteAddrError(ctx *context.Context, err error, form *forms.R
}
ctx.RenderWithErr(ctx.Tr("repo.mirror_address_url_invalid"), tplSettingsOptions, form)
}

func selectPushMirrorByForm(ctx *context.Context, form *forms.RepoSettingForm, repo *repo_model.Repository) (*repo_model.PushMirror, error) {
id, err := strconv.ParseInt(form.PushMirrorID, 10, 64)
if err != nil {
return nil, err
}

pushMirrors, _, err := repo_model.GetPushMirrorsByRepoID(ctx, repo.ID, db.ListOptions{})
if err != nil {
return nil, err
}

for _, m := range pushMirrors {
if m.ID == id {
m.Repo = repo
return m, nil
}
}

return nil, fmt.Errorf("PushMirror[%v] not associated to repository %v", id, repo)
}
2 changes: 1 addition & 1 deletion services/forms/repo_form.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ type RepoSettingForm struct {
MirrorPassword string
LFS bool `form:"mirror_lfs"`
LFSEndpoint string `form:"mirror_lfs_endpoint"`
PushMirrorID string
PushMirrorID int64
PushMirrorAddress string
PushMirrorUsername string
PushMirrorPassword string
Expand Down
10 changes: 1 addition & 9 deletions services/mirror/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"

repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting"
Expand Down Expand Up @@ -119,14 +118,7 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error {
return nil
}

func queueHandler(items ...*SyncRequest) []*SyncRequest {
for _, req := range items {
doMirrorSync(graceful.GetManager().ShutdownContext(), req)
}
return nil
}

// InitSyncMirrors initializes a go routine to sync the mirrors
func InitSyncMirrors() {
StartSyncMirrors(queueHandler)
StartSyncMirrors()
}
11 changes: 9 additions & 2 deletions services/mirror/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,19 @@ type SyncRequest struct {
ReferenceID int64 // RepoID for pull mirror, MirrorID for push mirror
}

func queueHandler(items ...*SyncRequest) []*SyncRequest {
for _, req := range items {
doMirrorSync(graceful.GetManager().ShutdownContext(), req)
}
return nil
}

// StartSyncMirrors starts a go routine to sync the mirrors
func StartSyncMirrors(queueHandle func(data ...*SyncRequest) []*SyncRequest) {
func StartSyncMirrors() {
if !setting.Mirror.Enabled {
return
}
mirrorQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "mirror", queueHandle)
mirrorQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "mirror", queueHandler)
if mirrorQueue == nil {
log.Fatal("Unable to create mirror queue")
}
Expand Down
5 changes: 4 additions & 1 deletion tests/integration/db_collation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,12 @@ func TestDatabaseCollation(t *testing.T) {

t.Run("Convert tables to utf8mb4_bin", func(t *testing.T) {
defer test.MockVariableValue(&setting.Database.CharsetCollation, "utf8mb4_bin")()
assert.NoError(t, db.ConvertDatabaseTable())
r, err := db.CheckCollations(x)
assert.NoError(t, err)
assert.EqualValues(t, "utf8mb4_bin", r.ExpectedCollation)
assert.NoError(t, db.ConvertDatabaseTable())
r, err = db.CheckCollations(x)
assert.NoError(t, err)
assert.Equal(t, "utf8mb4_bin", r.DatabaseCollation)
assert.True(t, r.CollationEquals(r.ExpectedCollation, r.DatabaseCollation))
assert.Empty(t, r.InconsistentCollationColumns)
Expand Down
Loading

0 comments on commit 696fbe6

Please sign in to comment.