From 31c22913fab27d074fe8a54abfc94554c5839011 Mon Sep 17 00:00:00 2001 From: Lorain Date: Fri, 29 Mar 2024 01:41:02 +0800 Subject: [PATCH] use transaction --- cron/cron.go | 205 +++++++++++++++++++++++++-------------------- cron/cron_test.go | 12 ++- storage/storage.go | 162 +++++++++++++++++------------------ 3 files changed, 204 insertions(+), 175 deletions(-) diff --git a/cron/cron.go b/cron/cron.go index d7e7dfc..c561b40 100644 --- a/cron/cron.go +++ b/cron/cron.go @@ -11,26 +11,35 @@ import ( "github.com/robfig/cron/v3" "github.com/shurcooL/githubv4" "golang.org/x/sync/errgroup" + "gorm.io/gorm" "log/slog" "time" ) // TODO: record execute time of each task // TODO: add progress bar -// TODO: optimize error handling add retry strategy // TODO: data cleaning e.g. ByteDance, bytedance, Bytedance => bytedance -// TODO: use transaction - -// TODO: 定时任务如果失败(有一个 error 就判定为失败),回退事务,整体重试,通过 chan 来传递和监听是否有 err -// TODO: 使用事务可能需要把 global DB 变为参数传递 func Start(ctx context.Context) { + slog.Info("openalysis service started") + errC := make(chan error) + // if init failed, stop service + errC <- InitTask(ctx, storage.DB) - InitTask(ctx) c := cron.New() if _, err := c.AddFunc(config.GlobalConfig.Backend.Cron, func() { - UpdateTask(ctx) + for { + tx := storage.DB.Begin() + err := UpdateTask(ctx, tx) + if err == nil { + tx.Commit() + break + } + slog.Error("error doing update task", "err", err.Error()) + tx.Rollback() + slog.Info("transaction rollback and retry") + } }); err != nil { slog.Error("error doing cron", "err", err) errC <- err @@ -42,15 +51,28 @@ func Start(ctx context.Context) { slog.Error("receive close signal error", "signal", err.Error()) return } + slog.Info("openalysis service stopped") } func Restart(ctx context.Context) { + slog.Info("openalysis service restarted") + errC := make(chan error) c := cron.New() if _, err := c.AddFunc(config.GlobalConfig.Backend.Cron, func() { - UpdateTask(ctx) + for { + tx := storage.DB.Begin() + err := UpdateTask(ctx, tx) + if err == nil { + tx.Commit() + break + } + slog.Error("error doing update task", "err", err.Error()) + tx.Rollback() + slog.Info("transaction rollback and retry") + } }); err != nil { slog.Error("error doing cron", "err", err) errC <- err @@ -62,6 +84,7 @@ func Restart(ctx context.Context) { slog.Error("receive close signal error", "signal", err.Error()) return } + slog.Info("openalysis service stopped") } @@ -75,7 +98,7 @@ type Count struct { ForkCount int } -func InitTask(ctx context.Context) { +func InitTask(ctx context.Context, db *gorm.DB) error { // init cache cache = make(map[string][]string) // handle groups @@ -88,12 +111,12 @@ func InitTask(ctx context.Context) { org, err := graphql.QueryOrgInfo(ctx, login) if err != nil { slog.Error("error query org info", "err", err.Error()) - continue + return err } repos, err := graphql.QueryRepoNameByOrg(ctx, login) if err != nil { slog.Error("error query repo name by org", "err", err.Error()) - continue + return err } cache[login] = repos @@ -108,11 +131,11 @@ func InitTask(ctx context.Context) { } if err := FetchRepoData(ctx, rd, time.Time{}, ""); err != nil { slog.Error("error fetch repo data", "err", err.Error()) - continue + return err } - if err := CreateRepoData(ctx, rd); err != nil { + if err := CreateRepoData(ctx, db, rd); err != nil { slog.Error("error create repo data", "err", err.Error()) - continue + return err } { orgCount.IssueCount += rd.Repo.Issues.TotalCount @@ -121,13 +144,12 @@ func InitTask(ctx context.Context) { orgCount.ForkCount += rd.Repo.Forks.TotalCount } } - // TODO: both success or failed - contributorCount, err := storage.QueryContributorCountByOrg(ctx, org.ID) + contributorCount, err := storage.QueryContributorCountByOrg(ctx, db, org.ID) if err != nil { slog.Error("error query contributor count by org", "err", err.Error()) - continue + return err } - if err := storage.CreateOrganization(ctx, &model.Organization{ + if err := storage.CreateOrganization(ctx, db, &model.Organization{ Login: org.Login, NodeID: org.ID, IssueCount: orgCount.IssueCount, @@ -137,14 +159,14 @@ func InitTask(ctx context.Context) { ContributorCount: contributorCount, }); err != nil { slog.Error("error create org", "err", err.Error()) - continue + return err } - if err := storage.CreateGroupsOrganizations(ctx, &model.GroupsOrganizations{ + if err := storage.CreateGroupsOrganizations(ctx, db, &model.GroupsOrganizations{ GroupName: group.Name, OrgNodeID: org.ID, }); err != nil { slog.Error("error create group org join", "err", err.Error()) - continue + return err } { groupCount.IssueCount += orgCount.IssueCount @@ -163,19 +185,18 @@ func InitTask(ctx context.Context) { } if err := FetchRepoData(ctx, rd, time.Time{}, ""); err != nil { slog.Error("error fetch repo data", "err", err.Error()) - continue + return err } - // TODO: both success or failed - if err := CreateRepoData(ctx, rd); err != nil { + if err := CreateRepoData(ctx, db, rd); err != nil { slog.Error("error create repo data", "err", err.Error()) - continue + return err } - if err := storage.CreateGroupsRepositories(ctx, &model.GroupsRepositories{ + if err := storage.CreateGroupsRepositories(ctx, db, &model.GroupsRepositories{ GroupName: group.Name, RepoNodeID: rd.Repo.ID, }); err != nil { slog.Error("error create group repo join", "err", err.Error()) - continue + return err } { groupCount.IssueCount += rd.Repo.Issues.TotalCount @@ -184,12 +205,12 @@ func InitTask(ctx context.Context) { groupCount.ForkCount += rd.Repo.Forks.TotalCount } } - // TODO: insert groups first, then update counts - contributorCount, err := storage.QueryContributorCountByGroup(ctx, group.Name) + contributorCount, err := storage.QueryContributorCountByGroup(ctx, db, group.Name) if err != nil { slog.Error("error query contributor count by group", "err", err.Error()) + return err } - if err := storage.CreateGroup(ctx, &model.Group{ + if err := storage.CreateGroup(ctx, db, &model.Group{ Name: group.Name, IssueCount: groupCount.IssueCount, PullRequestCount: groupCount.PullRequestCount, @@ -198,13 +219,13 @@ func InitTask(ctx context.Context) { ContributorCount: contributorCount, }); err != nil { slog.Error("error create group", "err", err.Error()) - continue + return err } } + return nil } -// UpdateTask TODO: fix bug -func UpdateTask(ctx context.Context) { +func UpdateTask(ctx context.Context, db *gorm.DB) error { for _, group := range config.GlobalConfig.Groups { var groupCount Count for _, login := range group.Orgs { @@ -212,18 +233,18 @@ func UpdateTask(ctx context.Context) { org, err := graphql.QueryOrgInfo(ctx, login) if err != nil { slog.Error("error query org info", "err", err.Error()) - continue + return err } repos, err := graphql.QueryRepoNameByOrg(ctx, login) if err != nil { slog.Error("error query repo name by org", "err", err.Error()) - continue + return err } _, deleteNeeded := util.CompareSlices(cache[login], repos) - if err := DeleteRepos(ctx, deleteNeeded); err != nil { + if err := DeleteRepos(ctx, db, deleteNeeded); err != nil { slog.Error("error delete repos", "err", err.Error()) - continue + return err } // update cache @@ -236,18 +257,18 @@ func UpdateTask(ctx context.Context) { Name: name, NameWithOwner: nameWithOwner, } - cursor, err := storage.QueryCursor(ctx, nameWithOwner) + cursor, err := storage.QueryCursor(ctx, db, nameWithOwner) if err != nil { slog.Error("error query cursor", "err", err.Error()) - continue + return err } if err := FetchRepoData(ctx, rd, cursor.LastUpdate, cursor.EndCursor); err != nil { slog.Error("error fetch repo data", "err", err.Error()) - continue + return err } - if err := UpdateRepoData(ctx, rd); err != nil { + if err := UpdateRepoData(ctx, db, rd); err != nil { slog.Error("error update repo data", "err", err.Error()) - continue + return err } { orgCount.IssueCount += rd.Repo.Issues.TotalCount @@ -256,12 +277,12 @@ func UpdateTask(ctx context.Context) { orgCount.ForkCount += rd.Repo.Forks.TotalCount } } - contributorCount, err := storage.QueryContributorCountByOrg(ctx, org.ID) + contributorCount, err := storage.QueryContributorCountByOrg(ctx, db, org.ID) if err != nil { slog.Error("error query contributor count by org", "err", err.Error()) - continue + return err } - if err := storage.UpdateOrganization(ctx, &model.Organization{ + if err := storage.UpdateOrganization(ctx, db, &model.Organization{ NodeID: org.ID, IssueCount: orgCount.IssueCount, PullRequestCount: orgCount.PullRequestCount, @@ -270,7 +291,7 @@ func UpdateTask(ctx context.Context) { ContributorCount: contributorCount, }); err != nil { slog.Error("error update org", "err", err.Error()) - continue + return err } { groupCount.IssueCount += orgCount.IssueCount @@ -286,18 +307,18 @@ func UpdateTask(ctx context.Context) { Name: name, NameWithOwner: nameWithOwner, } - cursor, err := storage.QueryCursor(ctx, nameWithOwner) + cursor, err := storage.QueryCursor(ctx, db, nameWithOwner) if err != nil { slog.Error("error query cursor", "err", err.Error()) - continue + return err } if err := FetchRepoData(ctx, rd, cursor.LastUpdate, cursor.EndCursor); err != nil { slog.Error("error fetch repo data", "err", err.Error()) - continue + return err } - if err := UpdateRepoData(ctx, rd); err != nil { + if err := UpdateRepoData(ctx, db, rd); err != nil { slog.Error("error update repo data", "err", err.Error()) - continue + return err } { groupCount.IssueCount += rd.Repo.Issues.TotalCount @@ -306,11 +327,12 @@ func UpdateTask(ctx context.Context) { groupCount.ForkCount += rd.Repo.Forks.TotalCount } } - contributorCount, err := storage.QueryContributorCountByGroup(ctx, group.Name) + contributorCount, err := storage.QueryContributorCountByGroup(ctx, db, group.Name) if err != nil { slog.Error("error query contributor count by group", "err", err.Error()) + return err } - if err := storage.UpdateGroup(ctx, &model.Group{ + if err := storage.UpdateGroup(ctx, db, &model.Group{ Name: group.Name, IssueCount: groupCount.IssueCount, PullRequestCount: groupCount.PullRequestCount, @@ -319,9 +341,10 @@ func UpdateTask(ctx context.Context) { ContributorCount: contributorCount, }); err != nil { slog.Error("error update group", "err", err.Error()) - continue + return err } } + return nil } type RepoData struct { @@ -382,8 +405,8 @@ func FetchRepoData(ctx context.Context, rd *RepoData, lu time.Time, ec string) e return nil } -func CreateRepoData(ctx context.Context, rd *RepoData) error { - if err := storage.CreateRepository(ctx, &model.Repository{ +func CreateRepoData(ctx context.Context, db *gorm.DB, rd *RepoData) error { + if err := storage.CreateRepository(ctx, db, &model.Repository{ Owner: rd.Owner, Name: rd.Name, NodeID: rd.Repo.ID, @@ -422,10 +445,10 @@ func CreateRepoData(ctx context.Context, rd *RepoData) error { } } } - if err := storage.CreateIssues(ctx, issueData); err != nil { + if err := storage.CreateIssues(ctx, db, issueData); err != nil { return err } - if err := storage.CreateIssueAssignees(ctx, issueAssignees); err != nil { + if err := storage.CreateIssueAssignees(ctx, db, issueAssignees); err != nil { return err } var prData []*model.PullRequest @@ -455,13 +478,13 @@ func CreateRepoData(ctx context.Context, rd *RepoData) error { } } } - if err := storage.CreatePullRequests(ctx, prData); err != nil { + if err := storage.CreatePullRequests(ctx, db, prData); err != nil { return err } - if err := storage.CreatePullRequestAssignees(ctx, prAssignees); err != nil { + if err := storage.CreatePullRequestAssignees(ctx, db, prAssignees); err != nil { return err } - if err := storage.CreateCursor(context.Background(), &model.Cursor{ + if err := storage.CreateCursor(ctx, db, &model.Cursor{ RepoNodeID: rd.Repo.ID, RepoNameWithOwner: rd.NameWithOwner, LastUpdate: rd.LastUpdate, @@ -469,15 +492,15 @@ func CreateRepoData(ctx context.Context, rd *RepoData) error { }); err != nil { return err } - if err := storage.CreateContributors(ctx, rd.Contributors); err != nil { + if err := storage.CreateContributors(ctx, db, rd.Contributors); err != nil { return err } return nil } -func UpdateRepoData(ctx context.Context, rd *RepoData) error { +func UpdateRepoData(ctx context.Context, db *gorm.DB, rd *RepoData) error { // create repo in each update task due to time series graph - if err := storage.CreateRepository(ctx, &model.Repository{ + if err := storage.CreateRepository(ctx, db, &model.Repository{ Owner: rd.Owner, Name: rd.Name, NodeID: rd.Repo.ID, @@ -493,14 +516,14 @@ func UpdateRepoData(ctx context.Context, rd *RepoData) error { // handle issue for _, issue := range rd.Issues { // handle update in issues table - exist, err := storage.IssueExist(ctx, issue.ID) + exist, err := storage.IssueExist(ctx, db, issue.ID) if err != nil { return err } switch exist { case true: // overlay update issues in db - if err := storage.UpdateIssue(ctx, &model.Issue{ + if err := storage.UpdateIssue(ctx, db, &model.Issue{ NodeID: issue.ID, State: issue.State, IssueClosedAt: issue.ClosedAt, @@ -509,7 +532,7 @@ func UpdateRepoData(ctx context.Context, rd *RepoData) error { } case false: // add new issues to db - if err := storage.CreateIssues(ctx, []*model.Issue{ + if err := storage.CreateIssues(ctx, db, []*model.Issue{ { NodeID: issue.ID, Author: issue.Author.Login, @@ -537,7 +560,7 @@ func UpdateRepoData(ctx context.Context, rd *RepoData) error { }) } // handle update in issue_assignees table - exist, err = storage.IssueAssigneesExist(ctx, issue.ID) + exist, err = storage.IssueAssigneesExist(ctx, db, issue.ID) if err != nil { return err } @@ -549,19 +572,19 @@ func UpdateRepoData(ctx context.Context, rd *RepoData) error { case githubv4.IssueStateOpen: if util.IsEmptySlice(assignees) { // remove from issue_assignees because no assignees - if err := storage.DeleteIssueAssigneesByIssue(ctx, issue.ID); err != nil { + if err := storage.DeleteIssueAssigneesByIssue(ctx, db, issue.ID); err != nil { return err } } else { // update db if the assignees are changed - if err := storage.UpdateIssueAssignees(ctx, issue.ID, assignees); err != nil { + if err := storage.UpdateIssueAssignees(ctx, db, issue.ID, assignees); err != nil { return err } } // after update the issue is closed case githubv4.IssueStateClosed: // remove from issue_assignees because of closed issue - if err := storage.DeleteIssueAssigneesByIssue(ctx, issue.ID); err != nil { + if err := storage.DeleteIssueAssigneesByIssue(ctx, db, issue.ID); err != nil { return err } } @@ -570,7 +593,7 @@ func UpdateRepoData(ctx context.Context, rd *RepoData) error { // judge if issue has assignees if !util.IsEmptySlice(issue.Assignees.Nodes) && githubv4.IssueState(issue.State) == githubv4.IssueStateOpen { // insert into issue_assignees - if err := storage.CreateIssueAssignees(ctx, assignees); err != nil { + if err := storage.CreateIssueAssignees(ctx, db, assignees); err != nil { return err } } @@ -579,7 +602,7 @@ func UpdateRepoData(ctx context.Context, rd *RepoData) error { // handle pr // update old pull requests in db // only open pr need to update - openPRs, err := storage.QueryOPENPullRequests(ctx, rd.Repo.ID) + openPRs, err := storage.QueryOPENPullRequests(ctx, db, rd.Repo.ID) if err != nil { return err } @@ -590,7 +613,7 @@ func UpdateRepoData(ctx context.Context, rd *RepoData) error { return err } // overlay update old open prs - if err := storage.UpdatePullRequest(ctx, &model.PullRequest{ + if err := storage.UpdatePullRequest(ctx, db, &model.PullRequest{ NodeID: pr.ID, State: pr.State, PRMergedAt: pr.MergedAt, @@ -612,7 +635,7 @@ func UpdateRepoData(ctx context.Context, rd *RepoData) error { } // judge if old pr has assignees // NOTE: openPR.NodeID == pr.ID - exist, err := storage.PullRequestAssigneesExist(ctx, pr.ID) + exist, err := storage.PullRequestAssigneesExist(ctx, db, pr.ID) if err != nil { return err } @@ -624,18 +647,18 @@ func UpdateRepoData(ctx context.Context, rd *RepoData) error { case githubv4.PullRequestStateOpen: if !util.IsEmptySlice(assignees) { // if latest pr still have assignees then overlay update - if err := storage.UpdatePullRequestAssignees(ctx, pr.ID, assignees); err != nil { + if err := storage.UpdatePullRequestAssignees(ctx, db, pr.ID, assignees); err != nil { return err } } else { // if latest pr does not have any assignees then remove from pull_request_assignees - if err := storage.DeletePullRequestAssigneesByPR(ctx, pr.ID); err != nil { + if err := storage.DeletePullRequestAssigneesByPR(ctx, db, pr.ID); err != nil { return err } } // old open pr is closed or merged case githubv4.PullRequestStateMerged, githubv4.PullRequestStateClosed: - if err := storage.DeletePullRequestAssigneesByPR(ctx, pr.ID); err != nil { + if err := storage.DeletePullRequestAssigneesByPR(ctx, db, pr.ID); err != nil { return err } } @@ -643,7 +666,7 @@ func UpdateRepoData(ctx context.Context, rd *RepoData) error { case false: if !util.IsEmptySlice(assignees) && githubv4.PullRequestState(pr.State) == githubv4.PullRequestStateOpen { // latest open pr has assignees then insert into db - if err := storage.CreatePullRequestAssignees(ctx, assignees); err != nil { + if err := storage.CreatePullRequestAssignees(ctx, db, assignees); err != nil { return err } } @@ -679,14 +702,14 @@ func UpdateRepoData(ctx context.Context, rd *RepoData) error { } } // handle update in pull_requests table - if err := storage.CreatePullRequests(ctx, prs); err != nil { + if err := storage.CreatePullRequests(ctx, db, prs); err != nil { return err } - if err := storage.CreatePullRequestAssignees(ctx, prAssignees); err != nil { + if err := storage.CreatePullRequestAssignees(ctx, db, prAssignees); err != nil { return err } if !rd.LastUpdate.IsZero() || rd.EndCursor != "" { - if err := storage.UpdateCursor(ctx, &model.Cursor{ + if err := storage.UpdateCursor(ctx, db, &model.Cursor{ RepoNodeID: rd.Repo.ID, LastUpdate: rd.LastUpdate, EndCursor: rd.EndCursor, @@ -694,38 +717,38 @@ func UpdateRepoData(ctx context.Context, rd *RepoData) error { return err } } - if err := storage.UpdateOrCreateContributors(ctx, rd.Contributors); err != nil { + if err := storage.UpdateOrCreateContributors(ctx, db, rd.Contributors); err != nil { return err } return nil } -func DeleteRepos(ctx context.Context, repos []string) error { +func DeleteRepos(ctx context.Context, db *gorm.DB, repos []string) error { if util.IsEmptySlice(repos) { return nil } for _, repo := range repos { owner, name := util.SplitNameWithOwner(repo) - id, err := storage.QueryRepositoryNodeID(ctx, owner, name) + id, err := storage.QueryRepositoryNodeID(ctx, db, owner, name) if err != nil { return err } - if err := storage.DeleteRepository(ctx, id); err != nil { + if err := storage.DeleteRepository(ctx, db, id); err != nil { return err } - if err := storage.DeleteIssues(ctx, id); err != nil { + if err := storage.DeleteIssues(ctx, db, id); err != nil { return err } - if err := storage.DeleteIssueAssigneesByRepo(ctx, repo); err != nil { + if err := storage.DeleteIssueAssigneesByRepo(ctx, db, repo); err != nil { return err } - if err := storage.DeletePullRequests(ctx, id); err != nil { + if err := storage.DeletePullRequests(ctx, db, id); err != nil { return err } - if err := storage.DeletePullRequestAssigneesByRepo(ctx, repo); err != nil { + if err := storage.DeletePullRequestAssigneesByRepo(ctx, db, repo); err != nil { return err } - if err := storage.DeleteCursor(ctx, id); err != nil { + if err := storage.DeleteCursor(ctx, db, id); err != nil { return err } } diff --git a/cron/cron_test.go b/cron/cron_test.go index 6419f93..13d6e2a 100644 --- a/cron/cron_test.go +++ b/cron/cron_test.go @@ -41,7 +41,10 @@ func TestInitTask(t *testing.T) { storage.Init() graphql.Init() rest.Init() - InitTask(context.Background()) // around 9 min for cloudwego init + err := InitTask(context.Background(), storage.DB) // around 9 min for cloudwego init + if err != nil { + t.Fatal(err) + } } func TestUpdateTask(t *testing.T) { @@ -54,13 +57,16 @@ func TestUpdateTask(t *testing.T) { for _, login := range group.Orgs { repos, err := graphql.QueryRepoNameByOrg(context.Background(), login) if err != nil { - panic("test panic") + t.Fatal(err) } cache[login] = repos } } - UpdateTask(context.Background()) + err := UpdateTask(context.Background(), storage.DB) + if err != nil { + t.Fatal(err) + } } func TestProgressBar(t *testing.T) { diff --git a/storage/storage.go b/storage/storage.go index 9d3c4a0..8a82385 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -47,13 +47,13 @@ func Init() { } } -func CreateGroup(ctx context.Context, group *model.Group) error { - return DB.WithContext(ctx).Create(group).Error +func CreateGroup(ctx context.Context, db *gorm.DB, group *model.Group) error { + return db.WithContext(ctx).Create(group).Error } -func UpdateGroup(ctx context.Context, group *model.Group) error { +func UpdateGroup(ctx context.Context, db *gorm.DB, group *model.Group) error { var currentGroup model.Group - if err := DB.WithContext(ctx).Where("name = ?", group.Name).First(¤tGroup).Error; err != nil { + if err := db.WithContext(ctx).Where("name = ?", group.Name).First(¤tGroup).Error; err != nil { return err } currentGroup.IssueCount = group.IssueCount @@ -61,19 +61,19 @@ func UpdateGroup(ctx context.Context, group *model.Group) error { currentGroup.StarCount = group.StarCount currentGroup.ForkCount = group.ForkCount currentGroup.ContributorCount = group.ContributorCount - if err := DB.WithContext(ctx).Save(¤tGroup).Error; err != nil { + if err := db.WithContext(ctx).Save(¤tGroup).Error; err != nil { return err } return nil } -func CreateOrganization(ctx context.Context, org *model.Organization) error { - return DB.WithContext(ctx).Create(org).Error +func CreateOrganization(ctx context.Context, db *gorm.DB, org *model.Organization) error { + return db.WithContext(ctx).Create(org).Error } -func UpdateOrganization(ctx context.Context, org *model.Organization) error { +func UpdateOrganization(ctx context.Context, db *gorm.DB, org *model.Organization) error { var currentOrg model.Organization - if err := DB.WithContext(ctx).Where("node_id = ?", org.ID).First(¤tOrg).Error; err != nil { + if err := db.WithContext(ctx).Where("node_id = ?", org.ID).First(¤tOrg).Error; err != nil { return err } currentOrg.IssueCount = org.IssueCount @@ -81,47 +81,47 @@ func UpdateOrganization(ctx context.Context, org *model.Organization) error { currentOrg.StarCount = org.StarCount currentOrg.ForkCount = org.ForkCount currentOrg.ContributorCount = org.ContributorCount - if err := DB.WithContext(ctx).Save(¤tOrg).Error; err != nil { + if err := db.WithContext(ctx).Save(¤tOrg).Error; err != nil { return err } return nil } -func CreateRepository(ctx context.Context, repo *model.Repository) error { - return DB.WithContext(ctx).Create(repo).Error +func CreateRepository(ctx context.Context, db *gorm.DB, repo *model.Repository) error { + return db.WithContext(ctx).Create(repo).Error } -func QueryRepositoryNodeID(ctx context.Context, owner, name string) (string, error) { +func QueryRepositoryNodeID(ctx context.Context, db *gorm.DB, owner, name string) (string, error) { var repo model.Repository - err := DB.WithContext(ctx).Where(model.Repository{ + err := db.WithContext(ctx).Where(model.Repository{ Owner: owner, Name: name, }).First(&repo).Error return repo.NodeID, err } -func DeleteRepository(ctx context.Context, nodeID string) error { - return DB.WithContext(ctx).Where("node_id = ?", nodeID).Delete(&model.Repository{}).Error +func DeleteRepository(ctx context.Context, db *gorm.DB, nodeID string) error { + return db.WithContext(ctx).Where("node_id = ?", nodeID).Delete(&model.Repository{}).Error } -func CreateGroupsOrganizations(ctx context.Context, join *model.GroupsOrganizations) error { - return DB.WithContext(ctx).Create(join).Error +func CreateGroupsOrganizations(ctx context.Context, db *gorm.DB, join *model.GroupsOrganizations) error { + return db.WithContext(ctx).Create(join).Error } -func CreateGroupsRepositories(ctx context.Context, join *model.GroupsRepositories) error { - return DB.WithContext(ctx).Create(join).Error +func CreateGroupsRepositories(ctx context.Context, db *gorm.DB, join *model.GroupsRepositories) error { + return db.WithContext(ctx).Create(join).Error } -func CreateIssues(ctx context.Context, issues []*model.Issue) error { +func CreateIssues(ctx context.Context, db *gorm.DB, issues []*model.Issue) error { if util.IsEmptySlice(issues) { return nil } - return DB.WithContext(ctx).Create(issues).Error + return db.WithContext(ctx).Create(issues).Error } -func IssueExist(ctx context.Context, nodeID string) (bool, error) { +func IssueExist(ctx context.Context, db *gorm.DB, nodeID string) (bool, error) { var issue model.Issue - if err := DB.WithContext(ctx).Where("node_id = ?", nodeID).First(&issue).Error; err != nil { + if err := db.WithContext(ctx).Where("node_id = ?", nodeID).First(&issue).Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { return false, nil } @@ -130,64 +130,64 @@ func IssueExist(ctx context.Context, nodeID string) (bool, error) { return true, nil } -func UpdateIssue(ctx context.Context, issue *model.Issue) error { +func UpdateIssue(ctx context.Context, db *gorm.DB, issue *model.Issue) error { var currentIssue model.Issue - if err := DB.WithContext(ctx).Where("node_id = ?", issue.NodeID).First(¤tIssue).Error; err != nil { + if err := db.WithContext(ctx).Where("node_id = ?", issue.NodeID).First(¤tIssue).Error; err != nil { return err } currentIssue.State = issue.State currentIssue.IssueClosedAt = issue.IssueClosedAt - if err := DB.WithContext(ctx).Save(¤tIssue).Error; err != nil { + if err := db.WithContext(ctx).Save(¤tIssue).Error; err != nil { return err } return nil } -func DeleteIssues(ctx context.Context, repoNodeID string) error { - return DB.WithContext(ctx).Where("repo_node_id = ?", repoNodeID).Delete(&model.Issue{}).Error +func DeleteIssues(ctx context.Context, db *gorm.DB, repoNodeID string) error { + return db.WithContext(ctx).Where("repo_node_id = ?", repoNodeID).Delete(&model.Issue{}).Error } -func CreatePullRequests(ctx context.Context, prs []*model.PullRequest) error { +func CreatePullRequests(ctx context.Context, db *gorm.DB, prs []*model.PullRequest) error { if util.IsEmptySlice(prs) { return nil } - return DB.WithContext(ctx).Create(prs).Error + return db.WithContext(ctx).Create(prs).Error } -func UpdatePullRequest(ctx context.Context, pr *model.PullRequest) error { +func UpdatePullRequest(ctx context.Context, db *gorm.DB, pr *model.PullRequest) error { var currentPR model.PullRequest - if err := DB.WithContext(ctx).Where("node_id = ?", pr.NodeID).First(¤tPR).Error; err != nil { + if err := db.WithContext(ctx).Where("node_id = ?", pr.NodeID).First(¤tPR).Error; err != nil { return err } currentPR.State = pr.State currentPR.PRMergedAt = pr.PRMergedAt currentPR.PRClosedAt = pr.PRClosedAt - if err := DB.WithContext(ctx).Save(¤tPR).Error; err != nil { + if err := db.WithContext(ctx).Save(¤tPR).Error; err != nil { return err } return nil } -func DeletePullRequests(ctx context.Context, repoNodeID string) error { - return DB.WithContext(ctx).Where("repo_node_id = ?", repoNodeID).Delete(&model.PullRequest{}).Error +func DeletePullRequests(ctx context.Context, db *gorm.DB, repoNodeID string) error { + return db.WithContext(ctx).Where("repo_node_id = ?", repoNodeID).Delete(&model.PullRequest{}).Error } -func QueryOPENPullRequests(ctx context.Context, repoNodeID string) ([]model.PullRequest, error) { +func QueryOPENPullRequests(ctx context.Context, db *gorm.DB, repoNodeID string) ([]model.PullRequest, error) { var prs []model.PullRequest - err := DB.WithContext(ctx).Where("state = ? AND repo_node_id = ?", "OPEN", repoNodeID).Find(&prs).Error + err := db.WithContext(ctx).Where("state = ? AND repo_node_id = ?", "OPEN", repoNodeID).Find(&prs).Error return prs, err } -func CreateIssueAssignees(ctx context.Context, assignees []*model.IssueAssignees) error { +func CreateIssueAssignees(ctx context.Context, db *gorm.DB, assignees []*model.IssueAssignees) error { if util.IsEmptySlice(assignees) { return nil } - return DB.WithContext(ctx).Create(assignees).Error + return db.WithContext(ctx).Create(assignees).Error } -func IssueAssigneesExist(ctx context.Context, nodeID string) (bool, error) { +func IssueAssigneesExist(ctx context.Context, db *gorm.DB, nodeID string) (bool, error) { var assignees model.IssueAssignees - if err := DB.WithContext(ctx).Where("issue_node_id = ?", nodeID).First(&assignees).Error; err != nil { + if err := db.WithContext(ctx).Where("issue_node_id = ?", nodeID).First(&assignees).Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { return false, nil } @@ -196,37 +196,37 @@ func IssueAssigneesExist(ctx context.Context, nodeID string) (bool, error) { return true, nil } -func UpdateIssueAssignees(ctx context.Context, issueNodeID string, assignees []*model.IssueAssignees) error { +func UpdateIssueAssignees(ctx context.Context, db *gorm.DB, issueNodeID string, assignees []*model.IssueAssignees) error { if util.IsEmptySlice(assignees) { return nil } var currentAssignees []*model.IssueAssignees - if err := DB.WithContext(ctx).Where("issue_node_id = ?", issueNodeID).Find(¤tAssignees).Error; err != nil { + if err := db.WithContext(ctx).Where("issue_node_id = ?", issueNodeID).Find(¤tAssignees).Error; err != nil { return err } more, less := util.CompareSlices(currentAssignees, assignees) - if err := DB.WithContext(ctx).Create(more).Error; err != nil { + if err := db.WithContext(ctx).Create(more).Error; err != nil { return err } for _, e := range less { - if err := DB.WithContext(ctx).Where("id = ?", e.ID).Delete(&model.IssueAssignees{}).Error; err != nil { + if err := db.WithContext(ctx).Where("id = ?", e.ID).Delete(&model.IssueAssignees{}).Error; err != nil { return err } } return nil } -func DeleteIssueAssigneesByIssue(ctx context.Context, issueNodeID string) error { - return DB.WithContext(ctx).Where("issue_node_id = ?", issueNodeID).Delete(&model.IssueAssignees{}).Error +func DeleteIssueAssigneesByIssue(ctx context.Context, db *gorm.DB, issueNodeID string) error { + return db.WithContext(ctx).Where("issue_node_id = ?", issueNodeID).Delete(&model.IssueAssignees{}).Error } -func DeleteIssueAssigneesByRepo(ctx context.Context, nameWithOwner string) error { - return DB.WithContext(ctx).Where("issue_repo_name = ?", nameWithOwner).Delete(&model.IssueAssignees{}).Error +func DeleteIssueAssigneesByRepo(ctx context.Context, db *gorm.DB, nameWithOwner string) error { + return db.WithContext(ctx).Where("issue_repo_name = ?", nameWithOwner).Delete(&model.IssueAssignees{}).Error } -func PullRequestAssigneesExist(ctx context.Context, nodeID string) (bool, error) { +func PullRequestAssigneesExist(ctx context.Context, db *gorm.DB, nodeID string) (bool, error) { var assignees model.PullRequestAssignees - if err := DB.WithContext(ctx).Where("pull_request_node_id = ?", nodeID).First(&assignees).Error; err != nil { + if err := db.WithContext(ctx).Where("pull_request_node_id = ?", nodeID).First(&assignees).Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { return false, nil } @@ -235,51 +235,51 @@ func PullRequestAssigneesExist(ctx context.Context, nodeID string) (bool, error) return true, nil } -func CreatePullRequestAssignees(ctx context.Context, assignees []*model.PullRequestAssignees) error { +func CreatePullRequestAssignees(ctx context.Context, db *gorm.DB, assignees []*model.PullRequestAssignees) error { if util.IsEmptySlice(assignees) { return nil } - return DB.WithContext(ctx).Create(assignees).Error + return db.WithContext(ctx).Create(assignees).Error } -func UpdatePullRequestAssignees(ctx context.Context, prNodeID string, assignees []*model.PullRequestAssignees) error { +func UpdatePullRequestAssignees(ctx context.Context, db *gorm.DB, prNodeID string, assignees []*model.PullRequestAssignees) error { if util.IsEmptySlice(assignees) { return nil } var currentAssignees []*model.PullRequestAssignees - if err := DB.WithContext(ctx).Where("pull_request_node_id = ?", prNodeID).Find(¤tAssignees).Error; err != nil { + if err := db.WithContext(ctx).Where("pull_request_node_id = ?", prNodeID).Find(¤tAssignees).Error; err != nil { return err } more, less := util.CompareSlices(currentAssignees, assignees) - if err := DB.WithContext(ctx).Create(more).Error; err != nil { + if err := db.WithContext(ctx).Create(more).Error; err != nil { return err } for _, e := range less { - if err := DB.WithContext(ctx).Where("id = ?", e.ID).Delete(&model.PullRequestAssignees{}).Error; err != nil { + if err := db.WithContext(ctx).Where("id = ?", e.ID).Delete(&model.PullRequestAssignees{}).Error; err != nil { return err } } return nil } -func DeletePullRequestAssigneesByPR(ctx context.Context, prNodeID string) error { - return DB.WithContext(ctx).Where("pull_request_node_id = ?", prNodeID).Delete(&model.PullRequestAssignees{}).Error +func DeletePullRequestAssigneesByPR(ctx context.Context, db *gorm.DB, prNodeID string) error { + return db.WithContext(ctx).Where("pull_request_node_id = ?", prNodeID).Delete(&model.PullRequestAssignees{}).Error } -func DeletePullRequestAssigneesByRepo(ctx context.Context, nameWithOwner string) error { - return DB.WithContext(ctx).Where("pull_request_repo_name = ?", nameWithOwner).Delete(&model.PullRequestAssignees{}).Error +func DeletePullRequestAssigneesByRepo(ctx context.Context, db *gorm.DB, nameWithOwner string) error { + return db.WithContext(ctx).Where("pull_request_repo_name = ?", nameWithOwner).Delete(&model.PullRequestAssignees{}).Error } -func CreateContributors(ctx context.Context, cs []*model.Contributor) error { +func CreateContributors(ctx context.Context, db *gorm.DB, cs []*model.Contributor) error { if util.IsEmptySlice(cs) { return nil } - return DB.WithContext(ctx).Create(cs).Error + return db.WithContext(ctx).Create(cs).Error } -func QueryContributorCountByOrg(ctx context.Context, orgNodeID string) (int, error) { +func QueryContributorCountByOrg(ctx context.Context, db *gorm.DB, orgNodeID string) (int, error) { var contributorCount int - if err := DB.WithContext(ctx). + if err := db.WithContext(ctx). Table("contributors"). Select("COUNT(DISTINCT contributors.node_id) AS contributor_count"). Joins("INNER JOIN repositories ON contributors.repo_node_id = repositories.node_id"). @@ -290,11 +290,11 @@ func QueryContributorCountByOrg(ctx context.Context, orgNodeID string) (int, err return contributorCount, nil } -func QueryContributorCountByGroup(ctx context.Context, groupName string) (int, error) { +func QueryContributorCountByGroup(ctx context.Context, db *gorm.DB, groupName string) (int, error) { var count int64 var repos1 []string - sq1 := DB.WithContext(ctx). + sq1 := db.WithContext(ctx). Table("groups_repositories"). Select("groups_repositories.repo_node_id"). Joins("INNER JOIN repositories ON groups_repositories.repo_node_id = repositories.node_id"). @@ -304,7 +304,7 @@ func QueryContributorCountByGroup(ctx context.Context, groupName string) (int, e } var repos2 []string - sq2 := DB.WithContext(ctx). + sq2 := db.WithContext(ctx). Table("repositories"). Select("repositories.node_id"). Joins("INNER JOIN groups_organizations ON repositories.owner_node_id = groups_organizations.org_node_id"). @@ -315,7 +315,7 @@ func QueryContributorCountByGroup(ctx context.Context, groupName string) (int, e repoNodeIDs := append(repos1, repos2...) - if err := DB.WithContext(ctx). + if err := db.WithContext(ctx). Table("contributors"). Select("contributors.node_id"). Where("contributors.repo_node_id IN ?", repoNodeIDs). @@ -326,9 +326,9 @@ func QueryContributorCountByGroup(ctx context.Context, groupName string) (int, e return int(count), nil } -func UpdateOrCreateContributors(ctx context.Context, cs []*model.Contributor) error { +func UpdateOrCreateContributors(ctx context.Context, db *gorm.DB, cs []*model.Contributor) error { for _, contributor := range cs { - if err := DB.WithContext(ctx).Where(model.Contributor{ + if err := db.WithContext(ctx).Where(model.Contributor{ NodeID: contributor.NodeID, RepoNodeID: contributor.RepoNodeID, }).Assign(contributor).FirstOrCreate(contributor).Error; err != nil { @@ -338,13 +338,13 @@ func UpdateOrCreateContributors(ctx context.Context, cs []*model.Contributor) er return nil } -func CreateCursor(ctx context.Context, cursor *model.Cursor) error { - return DB.WithContext(ctx).Create(cursor).Error +func CreateCursor(ctx context.Context, db *gorm.DB, cursor *model.Cursor) error { + return db.WithContext(ctx).Create(cursor).Error } -func QueryCursor(ctx context.Context, repo string) (*model.Cursor, error) { +func QueryCursor(ctx context.Context, db *gorm.DB, repo string) (*model.Cursor, error) { cursor := &model.Cursor{} - err := DB.WithContext(ctx).Where("repo_name_with_owner = ?", repo).First(cursor).Error + err := db.WithContext(ctx).Where("repo_name_with_owner = ?", repo).First(cursor).Error // for organization's new repository case if errors.Is(err, gorm.ErrRecordNotFound) { return cursor, nil @@ -352,19 +352,19 @@ func QueryCursor(ctx context.Context, repo string) (*model.Cursor, error) { return cursor, err } -func UpdateCursor(ctx context.Context, cursor *model.Cursor) error { +func UpdateCursor(ctx context.Context, db *gorm.DB, cursor *model.Cursor) error { var currentCursor model.Cursor - if err := DB.WithContext(ctx).Where("repo_node_id = ?", cursor.RepoNodeID).First(¤tCursor).Error; err != nil { + if err := db.WithContext(ctx).Where("repo_node_id = ?", cursor.RepoNodeID).First(¤tCursor).Error; err != nil { return err } currentCursor.LastUpdate = cursor.LastUpdate currentCursor.EndCursor = cursor.EndCursor - if err := DB.WithContext(ctx).Save(¤tCursor).Error; err != nil { + if err := db.WithContext(ctx).Save(¤tCursor).Error; err != nil { return err } return nil } -func DeleteCursor(ctx context.Context, repoNodeID string) error { - return DB.WithContext(ctx).Where("repo_node_id = ?", repoNodeID).Delete(&model.Cursor{}).Error +func DeleteCursor(ctx context.Context, db *gorm.DB, repoNodeID string) error { + return db.WithContext(ctx).Where("repo_node_id = ?", repoNodeID).Delete(&model.Cursor{}).Error }