Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prepare judge stream business #32

Merged
merged 2 commits into from
Jan 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Launch Server",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "application/server/main.go"
}
]
}
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ run-rpc-server: build check

.PHONY: run-background
run-background: build check
make -j run-task-worker run-schedule
make -j run-schedule

.PHONY: run-all
run-all: build check
make -j run-task-worker run-server run-schedule
make -j run-server run-schedule

.PHONY: help
help:
Expand Down
12 changes: 1 addition & 11 deletions application/schedule/main.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,5 @@
package main

import (
asynqAgent "github.com/OJ-lab/oj-lab-services/core/agent/asynq"
"github.com/OJ-lab/oj-lab-services/service/business"
)

func main() {
asynqAgent.RunSecheduler(
asynqAgent.ScheduleTask{
Cronspec: "@every 1s",
Task: business.NewTaskJudgerTrackAllState(),
},
)

}
47 changes: 47 additions & 0 deletions application/server/handler/judge.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ func SetupJudgeRoute(baseRoute *gin.RouterGroup) {
g := baseRoute.Group("/judge")
{
g.POST("/add-judger", postJudger)
g.POST("/task/pick", postPickJudgeTask)
g.POST("/task/report", postReportJudgeTaskResult)
}
}

Expand All @@ -29,3 +31,48 @@ func postJudger(ginCtx *gin.Context) {
"message": "success",
})
}

type PickJudgeTaskBody struct {
Consumer string `json:"consumer"`
}

func postPickJudgeTask(ginCtx *gin.Context) {
body := PickJudgeTaskBody{}
if err := ginCtx.ShouldBindJSON(&body); err != nil {
ginCtx.Error(err)
return
}

task, err := service.PickJudgeTask(ginCtx, body.Consumer)
if err != nil {
ginCtx.Error(err)
return
}

ginCtx.JSON(200, gin.H{
"task": task,
})
}

type ReportJudgeTaskResultBody struct {
Consumer string `json:"consumer"`
StreamID string `json:"stream_id"`
VerdictJson string `json:"verdict_json"`
}

func postReportJudgeTaskResult(ginCtx *gin.Context) {
body := ReportJudgeTaskResultBody{}
if err := ginCtx.ShouldBindJSON(&body); err != nil {
ginCtx.Error(err)
return
}

if err := service.ReportJudgeTaskResult(ginCtx, body.Consumer, body.StreamID, body.VerdictJson); err != nil {
ginCtx.Error(err)
return
}

ginCtx.JSON(200, gin.H{
"message": "success",
})
}
9 changes: 5 additions & 4 deletions application/server/handler/problem.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,12 @@ func postSubmission(ginCtx *gin.Context) {
return
}

submission, err := service.PostSubmission(ginCtx, slug, body.Code, body.Language)
if err != nil {
ginCtx.Error(err)
submission := model.NewSubmission("", slug, body.Code, body.Language)
result, svcErr := service.CreateJudgeTaskSubmission(ginCtx, submission)
if svcErr != nil {
svcErr.AppendToGin(ginCtx)
return
}

ginCtx.JSON(200, submission)
ginCtx.JSON(200, result)
}
1 change: 1 addition & 0 deletions application/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func main() {
handler.SetupProblemRoute(apiRouter)
handler.SetupEventRouter(apiRouter)
handler.SetupSubmissionRouter(apiRouter)
handler.SetupJudgeRoute(apiRouter)

err := r.Run(servicePort)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.21

require (
github.com/google/uuid v1.3.1
github.com/redis/go-redis/v9 v9.1.0
github.com/sirupsen/logrus v1.9.3
github.com/spf13/viper v1.16.0
github.com/swaggo/swag v1.16.2
Expand Down Expand Up @@ -78,7 +79,6 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.0.9 // indirect
github.com/redis/go-redis/v9 v9.1.0
github.com/swaggo/files v1.0.1
github.com/swaggo/gin-swagger v1.6.0
github.com/ugorji/go/codec v1.2.11 // indirect
Expand Down
78 changes: 78 additions & 0 deletions service/business/judge_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package business

import (
"context"

redisAgent "github.com/OJ-lab/oj-lab-services/core/agent/redis"
"github.com/OJ-lab/oj-lab-services/service/model"
"github.com/redis/go-redis/v9"
)

const (
streamName = "oj_lab_judge_stream"
consumerGroupName = "oj_lab_judge_stream_consumer_group"
defaultConsumerName = "oj_lab_judge_stream_consumer_default"
)

func init() {
redisAgent := redisAgent.GetDefaultRedisClient()
_, err := redisAgent.XGroupCreateMkStream(context.Background(), streamName, consumerGroupName, "0").Result()
if err != nil && err != redis.Nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
panic(err)
}
}

func AddTaskToStream(ctx context.Context, task *model.JudgeTask) (*string, error) {
redisAgent := redisAgent.GetDefaultRedisClient()
id, err := redisAgent.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
Values: task.ToStringMap(),
}).Result()
if err != nil {
return nil, err
}

return &id, err
}

func GetTaskFromStream(ctx context.Context, consumer string) (*model.JudgeTask, error) {
redisAgent := redisAgent.GetDefaultRedisClient()
if consumer == "" {
consumer = defaultConsumerName
}
result, err := redisAgent.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: consumerGroupName,
Consumer: consumer,
Streams: []string{streamName, ">"},
Count: 1,
Block: -1,
}).Result()

if err != nil {
return nil, err
}
if len(result) == 0 {
return nil, nil
}

task := model.JudgeTask{}
for _, message := range result[0].Messages {
task = *model.JudgeTaskFromMap(message.Values)
task.RedisStreamID = &message.ID
}

return &task, nil
}

func AckTaskFromStream(ctx context.Context, consumer string, streamID string) error {
redisAgent := redisAgent.GetDefaultRedisClient()
if consumer == "" {
consumer = defaultConsumerName
}
_, err := redisAgent.XAck(ctx, streamName, consumerGroupName, streamID).Result()
if err != nil {
return err
}

return nil
}
2 changes: 2 additions & 0 deletions service/business/judger_task.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// ! Deprected

package business

import (
Expand Down
31 changes: 31 additions & 0 deletions service/mapper/submission.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package mapper

import (
"fmt"

"github.com/OJ-lab/oj-lab-services/service/model"
"github.com/google/uuid"
"gorm.io/gorm"
Expand Down Expand Up @@ -77,3 +79,32 @@ func GetSubmissionListByOptions(tx *gorm.DB, options GetSubmissionOptions) ([]*m

return submissions, count, nil
}

func UpdateSubmission(tx *gorm.DB, submission model.JudgeTaskSubmission) error {
updatingSubmission := model.JudgeTaskSubmission{}
if submission.UID != uuid.Nil {
err := tx.Where("uid = ?", submission.UID).First(&updatingSubmission).Error
if err != nil {
return err
}
} else if submission.RedisStreamID != "" {
err := tx.Where("redis_stream_id = ?", submission.RedisStreamID).First(&updatingSubmission).Error
if err != nil {
return err
}
} else {
return fmt.Errorf("submission uid and redis stream id are both empty")
}

if submission.Status != "" {
updatingSubmission.Status = submission.Status
}
if submission.VerdictJson != "" {
updatingSubmission.VerdictJson = submission.VerdictJson
}
if submission.RedisStreamID != "" {
updatingSubmission.RedisStreamID = submission.RedisStreamID
}

return tx.Model(&updatingSubmission).Updates(updatingSubmission).Error
}
31 changes: 19 additions & 12 deletions service/model/judge.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package model

import (
"strings"

"github.com/google/uuid"
)

type JudgerState string
Expand All @@ -21,19 +19,28 @@ type Judger struct {
}

type JudgeTask struct {
UID uuid.UUID `json:"uid"`
ProblemSlug string `json:"problemSlug"`
Code string `json:"code"`
Language string `json:"language"`
Judger Judger `json:"judger"`
SubmissionUID string `json:"submissionUID"`
ProblemSlug string `json:"problemSlug"`
Code string `json:"code"`
Language string `json:"language"`
RedisStreamID *string `json:"redisStreamID"`
}

func (jt *JudgeTask) ToStringMap() map[string]interface{} {
return map[string]interface{}{
"submission_uid": jt.SubmissionUID,
"problem_slug": jt.ProblemSlug,
"code": jt.Code,
"language": jt.Language,
}
}

func NewJudgeTask(problemSlug, code, language string) *JudgeTask {
func JudgeTaskFromMap(m map[string]interface{}) *JudgeTask {
return &JudgeTask{
UID: uuid.New(),
ProblemSlug: problemSlug,
Code: code,
Language: language,
SubmissionUID: m["submission_uid"].(string),
ProblemSlug: m["problem_slug"].(string),
Code: m["code"].(string),
Language: m["language"].(string),
}
}

Expand Down
36 changes: 26 additions & 10 deletions service/model/submission.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package model

import "github.com/google/uuid"
import (
"github.com/google/uuid"
)

type SubmissionStatus string

Expand All @@ -13,6 +15,10 @@ const (

type SubmissionLanguage string

func (sl SubmissionLanguage) String() string {
return string(sl)
}

const (
SubmissionLanguageCpp SubmissionLanguage = "Cpp"
SubmissionLanguageRust SubmissionLanguage = "Rust"
Expand All @@ -22,15 +28,16 @@ const (
// Using relationship according to https://gorm.io/docs/belongs_to.html
type JudgeTaskSubmission struct {
MetaFields
UID uuid.UUID `gorm:"primaryKey" json:"uid"`
UserAccount string `gorm:"not null" json:"userAccount"`
User User `json:"user"`
ProblemSlug string `gorm:"not null" json:"problemSlug"`
Problem Problem `json:"problem"`
Code string `gorm:"not null" json:"code"`
Language SubmissionLanguage `gorm:"not null" json:"language"`
Status SubmissionStatus `gorm:"default:pending" json:"status"`
VerdictJson string `json:"verdictJson"`
UID uuid.UUID `gorm:"primaryKey" json:"UID"`
RedisStreamID string `json:"redisStreamID"`
UserAccount string `gorm:"not null" json:"userAccount"`
User User `json:"user"`
ProblemSlug string `gorm:"not null" json:"problemSlug"`
Problem Problem `json:"problem"`
Code string `gorm:"not null" json:"code"`
Language SubmissionLanguage `gorm:"not null" json:"language"`
Status SubmissionStatus `gorm:"default:pending" json:"status"`
VerdictJson string `json:"verdictJson"`
}

type JudgeTaskSubmissionSortByColumn string
Expand All @@ -54,3 +61,12 @@ func NewSubmission(
Status: SubmissionStatusPending,
}
}

func (s *JudgeTaskSubmission) ToJudgeTask() JudgeTask {
return JudgeTask{
SubmissionUID: s.UID.String(),
ProblemSlug: s.ProblemSlug,
Code: s.Code,
Language: s.Language.String(),
}
}
Loading
Loading