Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
justlorain committed Mar 28, 2024
1 parent d3c3ba3 commit fc62623
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 16 deletions.
61 changes: 47 additions & 14 deletions cron/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,36 @@ import (

// TODO: record execute time of each task
// TODO: add progress bar
// TODO: optimize error handling
// TODO: optimize error handling add retry strategy
// TODO: data cleaning e.g. ByteDance, bytedance, Bytedance => bytedance
// TODO: use transaction

// TODO: 定时任务如果失败(有一个 error 就判定为失败),回退事务,整体重试,通过 chan 来传递和监听是否有 err

func Start(ctx context.Context) {
errC := make(chan error)

InitTask(ctx)
c := cron.New()
if _, err := c.AddFunc(config.GlobalConfig.Backend.Cron, func() {
UpdateTask(ctx)
}); err != nil {
slog.Error("error doing cron", "err", err)
errC <- err
}
c.Start()
defer c.Stop()

if err := util.WaitSignal(errC); err != nil {
slog.Error("receive close signal error", "signal", err.Error())
return
}
slog.Info("openalysis service stopped")
}

func Restart(ctx context.Context) {
errC := make(chan error)

c := cron.New()
if _, err := c.AddFunc(config.GlobalConfig.Backend.Cron, func() {
UpdateTask(ctx)
Expand All @@ -48,7 +72,6 @@ type Count struct {
PullRequestCount int
StarCount int
ForkCount int
ContributorCount int
}

func InitTask(ctx context.Context) {
Expand All @@ -75,7 +98,6 @@ func InitTask(ctx context.Context) {
cache[login] = repos

// handle repos in org
// TODO: use errgroup to optimize performance
for _, nameWithOwner := range repos {
owner, name := util.SplitNameWithOwner(nameWithOwner)
rd := &RepoData{
Expand All @@ -96,18 +118,22 @@ func InitTask(ctx context.Context) {
orgCount.PullRequestCount += rd.Repo.PullRequests.TotalCount
orgCount.StarCount += rd.Repo.Stargazers.TotalCount
orgCount.ForkCount += rd.Repo.Forks.TotalCount
orgCount.ContributorCount += rd.ContributorCount
}
}
// TODO: both success or failed
contributorCount, err := storage.QueryContributorCountByOrg(ctx, org.ID)
if err != nil {
slog.Error("error query contributor count by org", "err", err.Error())
continue
}
if err := storage.CreateOrganization(ctx, &model.Organization{
Login: org.Login,
NodeID: org.ID,
IssueCount: orgCount.IssueCount,
PullRequestCount: orgCount.PullRequestCount,
StarCount: orgCount.StarCount,
ForkCount: orgCount.ForkCount,
ContributorCount: orgCount.ContributorCount,
ContributorCount: contributorCount,
}); err != nil {
slog.Error("error create org", "err", err.Error())
continue
Expand All @@ -124,7 +150,6 @@ func InitTask(ctx context.Context) {
groupCount.PullRequestCount += orgCount.PullRequestCount
groupCount.StarCount += orgCount.StarCount
groupCount.ForkCount += orgCount.ForkCount
groupCount.ContributorCount += orgCount.ContributorCount
}
}
// handle repos in group
Expand Down Expand Up @@ -156,17 +181,20 @@ func InitTask(ctx context.Context) {
groupCount.PullRequestCount += rd.Repo.PullRequests.TotalCount
groupCount.StarCount += rd.Repo.Stargazers.TotalCount
groupCount.ForkCount += rd.Repo.Forks.TotalCount
groupCount.ContributorCount += rd.ContributorCount
}
}
// TODO: insert groups first, then update counts
contributorCount, err := storage.QueryContributorCountByGroup(ctx, group.Name)
if err != nil {
slog.Error("error query contributor count by group", "err", err.Error())
}
if err := storage.CreateGroup(ctx, &model.Group{
Name: group.Name,
IssueCount: groupCount.IssueCount,
PullRequestCount: groupCount.PullRequestCount,
StarCount: groupCount.StarCount,
ForkCount: groupCount.ForkCount,
ContributorCount: groupCount.ContributorCount,
ContributorCount: contributorCount,
}); err != nil {
slog.Error("error create group", "err", err.Error())
continue
Expand Down Expand Up @@ -225,16 +253,20 @@ func UpdateTask(ctx context.Context) {
orgCount.PullRequestCount += rd.Repo.PullRequests.TotalCount
orgCount.StarCount += rd.Repo.Stargazers.TotalCount
orgCount.ForkCount += rd.Repo.Forks.TotalCount
orgCount.ContributorCount += rd.ContributorCount
}
}
contributorCount, err := storage.QueryContributorCountByOrg(ctx, org.ID)
if err != nil {
slog.Error("error query contributor count by org", "err", err.Error())
continue
}
if err := storage.UpdateOrganization(ctx, &model.Organization{
NodeID: org.ID,
IssueCount: orgCount.IssueCount,
PullRequestCount: orgCount.PullRequestCount,
StarCount: orgCount.StarCount,
ForkCount: orgCount.ForkCount,
ContributorCount: orgCount.ContributorCount,
ContributorCount: contributorCount,
}); err != nil {
slog.Error("error update org", "err", err.Error())
continue
Expand All @@ -244,7 +276,6 @@ func UpdateTask(ctx context.Context) {
groupCount.PullRequestCount += orgCount.PullRequestCount
groupCount.StarCount += orgCount.StarCount
groupCount.ForkCount += orgCount.ForkCount
groupCount.ContributorCount += orgCount.ContributorCount
}
}
for _, nameWithOwner := range group.Repos {
Expand Down Expand Up @@ -272,16 +303,19 @@ func UpdateTask(ctx context.Context) {
groupCount.PullRequestCount += rd.Repo.PullRequests.TotalCount
groupCount.StarCount += rd.Repo.Stargazers.TotalCount
groupCount.ForkCount += rd.Repo.Forks.TotalCount
groupCount.ContributorCount += rd.ContributorCount
}
}
contributorCount, err := storage.QueryContributorCountByGroup(ctx, group.Name)
if err != nil {
slog.Error("error query contributor count by group", "err", err.Error())
}
if err := storage.UpdateGroup(ctx, &model.Group{
Name: group.Name,
IssueCount: groupCount.IssueCount,
PullRequestCount: groupCount.PullRequestCount,
StarCount: groupCount.StarCount,
ForkCount: groupCount.ForkCount,
ContributorCount: groupCount.ContributorCount,
ContributorCount: contributorCount,
}); err != nil {
slog.Error("error update group", "err", err.Error())
continue
Expand Down Expand Up @@ -658,7 +692,6 @@ func UpdateRepoData(ctx context.Context, rd *RepoData) error {
return err
}
}
// TODO: 这里更新时应该根据 repo_node_id 和 login 来查找不然可能会更新到其他 repo 的同一个 contributor
if err := storage.UpdateOrCreateContributors(ctx, rd.Contributors); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cron/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestInitTask(t *testing.T) {
storage.Init()
graphql.Init()
rest.Init()
InitTask(context.Background()) // 7min 78s (467.58s)
InitTask(context.Background()) // over 8 min
}

func TestUpdateTask(t *testing.T) {
Expand Down
53 changes: 52 additions & 1 deletion storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,61 @@ func CreateContributors(ctx context.Context, cs []*model.Contributor) error {
return DB.WithContext(ctx).Create(cs).Error
}

func QueryContributorCountByOrg(ctx context.Context, orgNodeID string) (int, error) {
var contributorCount int
if err := DB.WithContext(ctx).
Table("contributors").
Select("COUNT(DISTINCT contributors.node_id) AS contributor_count").
Joins("JOIN repositories ON contributors.repo_node_id = repositories.node_id").
Joins("JOIN organizations ON repositories.owner_node_id = organizations.node_id").
Where("organizations.node_id = ?", orgNodeID).
Scan(&contributorCount).Error; err != nil {
return 0, err
}
return contributorCount, nil
}

func QueryContributorCountByGroup(ctx context.Context, groupName string) (int, error) {
var count int64

var repos1 []string
sq1 := DB.WithContext(ctx).
Table("groups_repositories").
Select("groups_repositories.repo_node_id").
Joins("INNER JOIN repositories ON groups_repositories.repo_node_id = repositories.node_id").
Where("groups_repositories.group_name = ?", groupName)
if err := sq1.Find(&repos1).Error; err != nil {
return 0, err
}

var repos2 []string
sq2 := DB.WithContext(ctx).
Table("repositories").
Select("repositories.node_id").
Joins("INNER JOIN groups_organizations ON repositories.owner_node_id = groups_organizations.org_node_id").
Where("groups_organizations.group_name = ?", groupName)
if err := sq2.Find(&repos2).Error; err != nil {
return 0, err
}

repoNodeIDs := append(repos1, repos2...)

if err := DB.WithContext(ctx).
Table("contributors").
Select("contributors.node_id").
Where("contributors.repo_node_id IN ?", repoNodeIDs).
Distinct().
Count(&count).Error; err != nil {
return 0, err
}
return int(count), nil
}

func UpdateOrCreateContributors(ctx context.Context, cs []*model.Contributor) error {
for _, contributor := range cs {
if err := DB.WithContext(ctx).Where(model.Contributor{
NodeID: contributor.NodeID,
NodeID: contributor.NodeID,
RepoNodeID: contributor.RepoNodeID,
}).Assign(contributor).FirstOrCreate(contributor).Error; err != nil {
return err
}
Expand Down
22 changes: 22 additions & 0 deletions storage/storage_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package storage

import (
"context"
"fmt"
"github.com/B1NARY-GR0UP/openalysis/config"
"github.com/B1NARY-GR0UP/openalysis/model"
"gorm.io/driver/mysql"
"gorm.io/gorm"
Expand Down Expand Up @@ -77,3 +79,23 @@ func TestCreateRepo(t *testing.T) {
//}
//fmt.Println(rows)
}

func TestQueryContributorCountByOrg(t *testing.T) {
config.Init("../default.yaml")
Init()
count, err := QueryContributorCountByOrg(context.Background(), "MDEyOk9yZ2FuaXphdGlvbjc5MjM2NDUz")
if err != nil {
panic(err.Error())
}
fmt.Println(count)
}

func TestQueryContributorCountByGroup(t *testing.T) {
config.Init("../default.yaml")
Init()
count, err := QueryContributorCountByGroup(context.Background(), "cloudwego")
if err != nil {
panic(err.Error())
}
fmt.Println(count)
}

0 comments on commit fc62623

Please sign in to comment.