diff --git a/blizzard.go b/blizzard.go index edb2475..8a580fb 100644 --- a/blizzard.go +++ b/blizzard.go @@ -7,9 +7,9 @@ import ( ) func Init(ctx context.Context) { - judge.ResponseWorker = judge.NewWorker(ctx) + judge.Worker = judge.NewWorker(ctx) cron.Start(ctx) - go judge.ResponseWorker.Work() + go judge.Worker.Work() } func Destroy() { diff --git a/cache/store.go b/cache/store.go index 3e6a53f..58ccf05 100644 --- a/cache/store.go +++ b/cache/store.go @@ -1,13 +1,10 @@ package cache import ( - "context" "fmt" "github.com/ArcticOJ/blizzard/v0/config" - "github.com/ArcticOJ/blizzard/v0/logger" "github.com/redis/go-redis/v9" "net" - "time" ) type ( @@ -15,8 +12,7 @@ type ( ) const ( - Result DB = iota + 1 - User + User DB = iota + 1 Bucket Submission Judge @@ -24,8 +20,6 @@ const ( ) func CreateClient(db DB, name string) (c redis.UniversalClient) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() c = redis.NewClient(&redis.Options{ Addr: net.JoinHostPort(config.Config.Dragonfly.Host, fmt.Sprint(config.Config.Dragonfly.Port)), DB: int(db), @@ -33,6 +27,5 @@ func CreateClient(db DB, name string) (c redis.UniversalClient) { if config.Config.Debug { c.AddHook(DebugHook{Name: name}) } - logger.Panic(c.Ping(ctx).Err(), "failed to initialize redis client for %s cache", name) return } diff --git a/cache/stores/buckets.go b/cache/stores/buckets.go index cc0137c..7a75ba4 100644 --- a/cache/stores/buckets.go +++ b/cache/stores/buckets.go @@ -10,19 +10,19 @@ import ( "math" ) -var RateLimiter *RateLimitStore +var RateLimiter *rateLimitStore const defaultBucketKey = "blizzard::bucket[%s]" -type RateLimitStore struct { +type rateLimitStore struct { c redis.UniversalClient } func init() { - RateLimiter = &RateLimitStore{cache.CreateClient(cache.Bucket, "buckets")} + RateLimiter = &rateLimitStore{cache.CreateClient(cache.Bucket, "buckets")} } -func (s *RateLimitStore) Limit(ctx context.Context, ip string) (allowed bool, totalLimit, remaining, retryAfter, nextReset int64) { +func (s *rateLimitStore) Limit(ctx context.Context, ip string) (allowed bool, totalLimit, remaining, retryAfter, nextReset int64) { v, e := s.c.Do(ctx, "CL.THROTTLE", fmt.Sprintf(defaultBucketKey, ip), uint16(math.Max(math.Ceil(float64(config.Config.RateLimit)/2), 1)), config.Config.RateLimit, 30, 1).Int64Slice() if e != nil || len(v) != 5 { logger.Blizzard.Err(e).Msgf("failed to process rate limit for '%s'", ip) diff --git a/cache/stores/judge.go b/cache/stores/judge.go index 6f995fb..e0d9545 100644 --- a/cache/stores/judge.go +++ b/cache/stores/judge.go @@ -7,7 +7,7 @@ import ( "time" ) -var Judge *JudgeStore +var Judge *judgeStatusStore type ( JudgeStatus struct { @@ -19,9 +19,6 @@ type ( Runtimes []JudgeRuntime `json:"runtimes"` } - JudgeInfo struct { - } - JudgeRuntime struct { ID string `json:"id"` Compiler string `json:"compiler"` @@ -29,7 +26,7 @@ type ( Version string `json:"version"` } - JudgeStore struct { + judgeStatusStore struct { c redis.UniversalClient } ) @@ -41,10 +38,10 @@ const ( ) func init() { - Judge = &JudgeStore{c: cache.CreateClient(cache.Judge, "judge")} + Judge = &judgeStatusStore{c: cache.CreateClient(cache.Judge, "judge")} } -func (s *JudgeStore) UpdateJudgeStatus(ctx context.Context, judgeList []interface{}, status string, allowedRuntimes []interface{}) { +func (s *judgeStatusStore) UpdateJudgeStatus(ctx context.Context, judgeList []interface{}, status string, allowedRuntimes []interface{}) { s.c.TxPipelined(ctx, func(p redis.Pipeliner) error { s.c.Set(ctx, defaultJudgeStatusKey, status, time.Hour*24) p.Del(ctx, defaultAllowedRuntimesKey).Err() @@ -54,12 +51,12 @@ func (s *JudgeStore) UpdateJudgeStatus(ctx context.Context, judgeList []interfac }) } -func (s *JudgeStore) IsRuntimeAllowed(ctx context.Context, runtime string) bool { +func (s *judgeStatusStore) IsRuntimeAllowed(ctx context.Context, runtime string) bool { v, e := s.c.SIsMember(ctx, defaultAllowedRuntimesKey, runtime).Result() return v && e == nil } -func (s *JudgeStore) GetJudgeList(ctx context.Context) []string { +func (s *judgeStatusStore) GetJudgeList(ctx context.Context) []string { v, e := s.c.SMembers(ctx, defaultJudgeListKey).Result() if e != nil { return nil @@ -67,7 +64,7 @@ func (s *JudgeStore) GetJudgeList(ctx context.Context) []string { return v } -func (s *JudgeStore) GetJudgeStatus(ctx context.Context) []byte { +func (s *judgeStatusStore) GetJudgeStatus(ctx context.Context) []byte { status, e := s.c.Get(ctx, defaultJudgeStatusKey).Result() if e != nil { return []byte("null") diff --git a/cache/stores/pending.go b/cache/stores/pending.go deleted file mode 100644 index 3af8e65..0000000 --- a/cache/stores/pending.go +++ /dev/null @@ -1,52 +0,0 @@ -package stores - -import ( - "context" - "fmt" - "github.com/ArcticOJ/blizzard/v0/cache" - "github.com/redis/go-redis/v9" - "sync" - "time" -) - -var Pending *PendingStore - -type PendingStore struct { - c redis.UniversalClient - l sync.RWMutex - tl sync.RWMutex -} - -const ( - defaultPendingSubmissionKey = "blizzard::pending_submission[%d]" - // 30 minutes - defaultExtraTtl = 30 * 30 -) - -func init() { - Pending = &PendingStore{c: cache.CreateClient(cache.Result, "results")} -} - -func (s *PendingStore) Set(ctx context.Context, id uint32, tag uint64, count, ttl uint16) error { - s.l.Lock() - defer s.l.Unlock() - return s.c.SetEx(ctx, fmt.Sprintf(defaultPendingSubmissionKey, id), tag, time.Duration(defaultExtraTtl+count*(ttl+2))*time.Second).Err() -} - -func (s *PendingStore) IsPending(ctx context.Context, id uint32) bool { - s.l.RLock() - defer s.l.RUnlock() - ok, e := s.c.Exists(ctx, fmt.Sprintf(defaultPendingSubmissionKey, id)).Result() - return ok == 1 && e == nil -} - -func (s *PendingStore) Get(ctx context.Context, id uint32) (uint64, bool) { - tag, e := s.c.Get(ctx, fmt.Sprintf(defaultPendingSubmissionKey, id)).Uint64() - return tag, e == nil -} - -func (s *PendingStore) Delete(ctx context.Context, id uint32) { - s.l.Lock() - defer s.l.Unlock() - s.c.Del(ctx, fmt.Sprintf(defaultPendingSubmissionKey, id)) -} diff --git a/cache/stores/problems.go b/cache/stores/problems.go index 1a95684..7101a47 100644 --- a/cache/stores/problems.go +++ b/cache/stores/problems.go @@ -1,13 +1,18 @@ package stores import ( + "context" + "fmt" "github.com/ArcticOJ/blizzard/v0/cache" + "github.com/ArcticOJ/blizzard/v0/db" + "github.com/ArcticOJ/blizzard/v0/db/models/contest" "github.com/ArcticOJ/blizzard/v0/rejson" + "time" ) -var Problems *ProblemStore +var Problems *problemStore -type ProblemStore struct { +type problemStore struct { j *rejson.ReJSON } @@ -16,9 +21,29 @@ const ( ) func init() { - Problems = &ProblemStore{j: &rejson.ReJSON{Client: cache.CreateClient(cache.Problem, "problems")}} + Problems = &problemStore{j: &rejson.ReJSON{Client: cache.CreateClient(cache.Problem, "problems")}} } -func (*ProblemStore) Get() { +func (s *problemStore) fallback(ctx context.Context, id string) *contest.Problem { + prob := new(contest.Problem) + if db.Database.NewSelect().Model(prob).Where("id = ?", id).Scan(ctx) != nil { + return nil + } + // TODO: use global context to avoid interrupted operations + s.j.JTxPipelined(ctx, func(r *rejson.ReJSON) error { + k := fmt.Sprintf(defaultProblemKey, id) + if e := r.JSONSet(ctx, k, "$", prob); e != nil { + return e + } + return r.Expire(ctx, k, time.Hour*12).Err() + }) + return prob +} +func (s *problemStore) Get(ctx context.Context, id string) *contest.Problem { + p := s.j.JSONGet(ctx, fmt.Sprintf(defaultProblemKey, id)) + //if p == nil { + // return s.fallback(ctx, id) + //} + return rejson.Unmarshal[contest.Problem](p) } diff --git a/cache/stores/submissions.go b/cache/stores/submissions.go index 8209f73..64ae48d 100644 --- a/cache/stores/submissions.go +++ b/cache/stores/submissions.go @@ -1 +1,77 @@ package stores + +import ( + "context" + "fmt" + "github.com/ArcticOJ/blizzard/v0/cache" + "github.com/ArcticOJ/blizzard/v0/db/models/contest" + "github.com/ArcticOJ/blizzard/v0/rejson" + "github.com/ArcticOJ/blizzard/v0/utils" + "sync" + "time" +) + +var Submissions *submissionStore + +type submissionStore struct { + j *rejson.ReJSON + l sync.RWMutex +} + +const ( + defaultPendingSubmissionKey = "blizzard::pending_submission[%d]" + // 30 minutes + defaultExtraTtl = 30 * 60 +) + +func init() { + Submissions = &submissionStore{j: &rejson.ReJSON{Client: cache.CreateClient(cache.Submission, "submissions")}} +} + +func (s *submissionStore) SetPending(ctx context.Context, id uint32, tag uint64, count, ttl uint16) error { + s.l.Lock() + defer s.l.Unlock() + return s.j.JTxPipelined(ctx, func(r *rejson.ReJSON) error { + k := fmt.Sprintf(defaultPendingSubmissionKey, id) + if e := r.JSONSet(ctx, k, "$", map[string]interface{}{ + "tag": tag, + "cases": utils.ArrayFill[interface{}](nil, int(count)), + }); e != nil { + return e + } + return r.Expire(ctx, k, time.Duration(defaultExtraTtl+count*(ttl+2))*time.Second).Err() + }) +} + +func (s *submissionStore) UpdatePending(ctx context.Context, id uint32, result contest.CaseResult) error { + return s.j.JSONSet(ctx, fmt.Sprintf(defaultPendingSubmissionKey, id), fmt.Sprintf("$..cases[%d]", result.ID-1), result) +} + +func (s *submissionStore) IsPending(ctx context.Context, id uint32) bool { + s.l.RLock() + defer s.l.RUnlock() + ok, e := s.j.Exists(ctx, fmt.Sprintf(defaultPendingSubmissionKey, id)).Result() + return ok == 1 && e == nil +} + +func (s *submissionStore) GetPendingResults(ctx context.Context, id uint32) []contest.CaseResult { + r := s.j.JSONGet(ctx, fmt.Sprintf(defaultPendingSubmissionKey, id), "$.cases") + if _r := rejson.Unmarshal[[][]contest.CaseResult](r); _r != nil && len(*_r) > 0 { + return (*_r)[0] + } + return nil +} + +func (s *submissionStore) GetPendingTag(ctx context.Context, id uint32) (uint64, bool) { + r := s.j.JSONGet(ctx, fmt.Sprintf(defaultPendingSubmissionKey, id), "$") + if t := rejson.Unmarshal[[]uint64](r); t != nil && len(*t) > 0 { + return (*t)[0], true + } + return 0, false +} + +func (s *submissionStore) DeletePending(ctx context.Context, id uint32) { + s.l.Lock() + defer s.l.Unlock() + s.j.Del(ctx, fmt.Sprintf(defaultPendingSubmissionKey, id)) +} diff --git a/cache/stores/users.go b/cache/stores/users.go index 932dd2a..29ae46f 100644 --- a/cache/stores/users.go +++ b/cache/stores/users.go @@ -3,54 +3,72 @@ package stores import ( "context" "crypto/md5" + "errors" "fmt" "github.com/ArcticOJ/blizzard/v0/cache" "github.com/ArcticOJ/blizzard/v0/db" "github.com/ArcticOJ/blizzard/v0/db/models/user" + "github.com/ArcticOJ/blizzard/v0/logger" "github.com/ArcticOJ/blizzard/v0/rejson" + "github.com/ArcticOJ/blizzard/v0/utils/numeric" "github.com/google/uuid" "github.com/redis/go-redis/v9" "github.com/tmthrgd/go-hex" "github.com/uptrace/bun" + "golang.org/x/sync/singleflight" "strings" "time" ) -var Users *UserStore +var Users *userStore -type UserStore struct { +type userStore struct { j *rejson.ReJSON + s singleflight.Group } const ( defaultUserKey = "blizzard::user[%s]" defaultHandleToIdResolver = "blizzard::user_id[%s]" + defaultUserListKey = "blizzard::user_list" + defaultUserTtl = time.Hour * 48 + + DefaultUserPageSize = 25 ) func init() { - //ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) - //defer cancel() - Users = &UserStore{j: &rejson.ReJSON{Client: cache.CreateClient(cache.User, "users")}} - //var ids []string - //logger.Panic(db.Database.NewSelect().Model((*user.User)(nil)).Column("id").Scan(ctx, &ids), "failed to to query for users") - //var m []redis.Z - //for _, id := range ids { - // m = append(m, redis.Z{ - // Score: 0, - // Member: id, - // }) - //} - //_, e := Users.j.TxPipelined(ctx, func(p redis.Pipeliner) error { - // p.Del(ctx, defaultUserListKey) - // p.ZAdd(ctx, defaultUserListKey, m...) - // return nil - //}) - //logger.Panic(e, "failed to populate user cache") + Users = &userStore{j: &rejson.ReJSON{Client: cache.CreateClient(cache.User, "users")}} + Users.populateUserList(context.Background()) } -func (s *UserStore) load(id uuid.UUID, handle string) (u *user.User) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) - defer cancel() +func (s *userStore) populateUserList(c context.Context) { + _, e, _ := s.s.Do("populate", func() (interface{}, error) { + if Users.j.Exists(c, defaultUserListKey).Val() == 0 { + var ( + ids []string + ratings []uint16 + ) + // TODO: use app's context for this + if _e := db.Database.NewSelect().Model((*user.User)(nil)).Column("id", "rating").Scan(context.Background(), &ids, &ratings); _e != nil { + return nil, _e + } + var m []redis.Z + for i := range ids { + m = append(m, redis.Z{ + Score: float64(numeric.CompressUint16(ratings[i], 0)), + Member: ids[i], + }) + } + if _e := Users.j.ZAdd(context.Background(), defaultUserListKey, m...).Err(); _e != nil { + return nil, _e + } + } + return nil, nil + }) + logger.Panic(e, "failed to populate user cache") +} + +func (s *userStore) loadOne(ctx context.Context, id uuid.UUID, handle string) (u *user.User) { u = new(user.User) q := db.Database.NewSelect().Model(u).ExcludeColumn("password", "api_key") if id == uuid.Nil { @@ -69,56 +87,106 @@ func (s *UserStore) load(id uuid.UUID, handle string) (u *user.User) { h := md5.Sum([]byte(u.Email)) u.Avatar = hex.EncodeToString(h[:]) } + if len(u.Roles) > 0 { + u.TopRole = &u.Roles[0] + } return } -//func (s *UserStore) Exists(ctx context.Context, id uuid.UUID) bool { -// return s.j.ZScore(ctx, defaultUserListKey, id.String()).Err() == nil -//} +func (s *userStore) loadMulti(ctx context.Context, users []user.User) (u []user.User) { + if db.Database.NewSelect(). + // users are not selected by order of ids but by order in the database, so I have to use this hack to ensure stable order + // might not be performance-wise but at least it does the trick lol + With("inp", db.Database.NewValues(&users).Column("id").WithOrder()). + Model(&u).ExcludeColumn("password", "api_key"). + Table("inp"). + Where("\"user\".id = inp.id"). + OrderExpr("inp._order"). + Relation("Connections", func(query *bun.SelectQuery) *bun.SelectQuery { + return query.Where("show_in_profile = true") + }). + Relation("Roles", func(query *bun.SelectQuery) *bun.SelectQuery { + return query.Order("priority ASC").Column("name", "icon", "color") + }).Scan(ctx) != nil { + return nil + } + for i := range u { + if strings.TrimSpace(u[i].Email) != "" { + h := md5.Sum([]byte(u[i].Email)) + u[i].Avatar = hex.EncodeToString(h[:]) + } + if len(u[i].Roles) > 0 { + u[i].TopRole = &u[i].Roles[0] + } + } + return +} -func (s *UserStore) fallback(ctx context.Context, id uuid.UUID, handle string) *user.User { - u := s.load(id, handle) +func (s *userStore) UserExists(ctx context.Context, id uuid.UUID) bool { + s.populateUserList(ctx) + return s.j.ZScore(ctx, defaultUserListKey, id.String()).Err() == nil +} + +func (s *userStore) fallbackOne(ctx context.Context, id uuid.UUID, handle string) *user.User { + u := s.loadOne(ctx, id, handle) if u != nil && u.ID != uuid.Nil { handle = u.Handle - s.j.TxPipelined(ctx, func(pipeliner redis.Pipeliner) error { - p := &rejson.ReJSON{Client: pipeliner} - if e := p.Set(ctx, fmt.Sprintf(defaultHandleToIdResolver, handle), u.ID.String(), time.Hour*12).Err(); e != nil { + s.j.JTxPipelined(ctx, func(r *rejson.ReJSON) error { + if e := r.Set(ctx, fmt.Sprintf(defaultHandleToIdResolver, handle), u.ID.String(), defaultUserTtl).Err(); e != nil { return e } k := fmt.Sprintf(defaultUserKey, u.ID) - if e := p.JSONSet(ctx, k, "$", u); e != nil { + if e := r.JSONSet(ctx, k, "$", u); e != nil { return e } - return s.j.Expire(ctx, k, time.Hour*12).Err() + return r.Expire(ctx, k, defaultUserTtl).Err() }) return u } return nil } -func (s *UserStore) Get(ctx context.Context, id uuid.UUID, handle string) *user.User { +func (s *userStore) fallbackMulti(ctx context.Context, users []user.User) (u []user.User) { + u = s.loadMulti(ctx, users) + s.j.JTxPipelined(ctx, func(r *rejson.ReJSON) error { + for _, _u := range u { + if _u.ID != uuid.Nil { + k := fmt.Sprintf(defaultUserKey, _u.ID) + if r.Set(ctx, fmt.Sprintf(defaultHandleToIdResolver, _u.Handle), _u.ID.String(), defaultUserTtl).Err() == nil && + r.JSONSet(ctx, k, "$", _u) == nil { + r.Expire(ctx, k, defaultUserTtl) + } + } + } + return nil + }) + return +} + +func (s *userStore) Get(ctx context.Context, id uuid.UUID, handle string) *user.User { if handle == "" && id == uuid.Nil { return nil } if id == uuid.Nil { if _id, e := s.j.Get(ctx, fmt.Sprintf(defaultHandleToIdResolver, handle)).Result(); e == nil && _id != "" && _id != uuid.Nil.String() { id, e = uuid.Parse(_id) - if e == nil { - return s.fallback(ctx, id, "") + if e != nil { + return s.fallbackOne(ctx, id, "") } + goto getFromCache } - return s.fallback(ctx, uuid.Nil, handle) + return s.fallbackOne(ctx, uuid.Nil, handle) } - //if !s.Exists(ctx, id) { - // return nil - //} - r := s.j.JSONGet(ctx, fmt.Sprintf(defaultUserKey, id), "$") - if _u := rejson.Unmarshal[user.User](r); _u == nil { - return s.fallback(ctx, id, "") - } else if len(_u) > 0 { - return &(_u)[0] + if !s.UserExists(ctx, id) { + return nil } - return nil +getFromCache: + r := s.j.JSONGet(ctx, fmt.Sprintf(defaultUserKey, id)) + _u := rejson.Unmarshal[user.User](r) + if _u == nil { + return s.fallbackOne(ctx, id, "") + } + return _u } func userToMinimalUser(u *user.User) *user.MinimalUser { @@ -140,29 +208,97 @@ func userToMinimalUser(u *user.User) *user.MinimalUser { } } -func (s *UserStore) GetMinimal(ctx context.Context, id uuid.UUID) *user.MinimalUser { +func (s *userStore) GetMinimal(ctx context.Context, id uuid.UUID) *user.MinimalUser { if id == uuid.Nil { return nil } - //if !s.Exists(ctx, id) { - // return nil - //} - r := s.j.JSONGet(ctx, fmt.Sprintf(defaultUserKey, id), "$['id','displayName','handle','organization','avatar','roles','rating']") - res := rejson.Unmarshal[interface{}](r) - if res == nil || len(res) != 7 { - return userToMinimalUser(s.fallback(ctx, id, "")) + if !s.UserExists(ctx, id) { + return nil } - var topRole interface{} = nil - if _r, ok := res[5].([]interface{}); ok && len(_r) > 0 { - topRole = _r[0] + r := s.j.JSONGet(ctx, fmt.Sprintf(defaultUserKey, id), "") + res := rejson.Unmarshal[[]interface{}](r) + if res == nil || len(*res) != 7 { + return userToMinimalUser(s.fallbackOne(ctx, id, "")) } + _res := *res return &user.MinimalUser{ - ID: res[0].(string), - DisplayName: res[1].(string), - Handle: res[2].(string), - Organization: res[3].(string), - Avatar: res[4].(string), - TopRole: topRole, - Rating: uint16(res[6].(float64)), + ID: _res[0].(string), + DisplayName: _res[1].(string), + Handle: _res[2].(string), + Organization: _res[3].(string), + Avatar: _res[4].(string), + TopRole: _res[5], + Rating: uint16(_res[6].(float64)), + } +} + +func (s *userStore) GetPage(ctx context.Context, page uint16, rev bool) (res []user.MinimalUser) { + order := "desc" + if rev { + order = "asc" + } + r, e, _ := s.s.Do(fmt.Sprintf("page-%d-%s", page, order), func() (interface{}, error) { + s.populateUserList(ctx) + u := s.j.ZRangeArgs(context.Background(), redis.ZRangeArgs{ + Key: defaultUserListKey, + Start: "-inf", + Stop: "+inf", + ByScore: true, + // the leaderboard should be descending by default, so rev should make it ascending instead of descending + Rev: !rev, + Offset: int64(page * DefaultUserPageSize), + Count: DefaultUserPageSize, + }).Val() + if len(u) == 0 { + return nil, errors.New("no users") + } + var toGet []interface{} + for _, z := range u { + toGet = append(toGet, fmt.Sprintf(defaultUserKey, z)) + } + if r := s.j.JSONMGet(ctx, "$..['id','displayName','handle','organization','avatar','topRole','rating']", toGet...); len(r) > 0 { + res = make([]user.MinimalUser, len(r)) + var ( + toLoad []user.User + indices []int + ) + for i := range r { + _u := rejson.Unmarshal[[]interface{}](r[i]) + if _u == nil || len(*_u) != 7 { + toLoad = append(toLoad, user.User{ + ID: uuid.MustParse(u[i]), + }) + indices = append(indices, i) + continue + } + usr := *_u + res[i] = user.MinimalUser{ + ID: usr[0].(string), + DisplayName: usr[1].(string), + Handle: usr[2].(string), + Organization: usr[3].(string), + Avatar: usr[4].(string), + TopRole: usr[5], + Rating: uint16(usr[6].(float64)), + } + } + if len(toLoad) > 0 { + ul := s.fallbackMulti(ctx, toLoad) + for i, c := range ul { + if _u := userToMinimalUser(&c); _u != nil { + res[indices[i]] = *_u + } + } + } + return res, nil + } + return nil, nil + }) + if e != nil { + return } + if _r, ok := r.([]user.MinimalUser); ok { + r = _r + } + return } diff --git a/core/runtimes.go b/core/runtimes.go deleted file mode 100644 index 8f18da3..0000000 --- a/core/runtimes.go +++ /dev/null @@ -1,33 +0,0 @@ -package core - -type Runtime struct { - Name string - Extension string -} - -var LanguageMatrix = map[string]*Runtime{ - "gnuc++11": { - Name: "GNU C++ 11", - Extension: "cpp", - }, - "gnuc++14": { - Name: "GNU C++ 14", - Extension: "cpp", - }, - "gnuc++17": { - Name: "GNU C++ 17", - Extension: "cpp", - }, - "gnuc++20": { - Name: "GNU C++ 20", - Extension: "cpp", - }, - "python3": { - Name: "Python 3", - Extension: "py", - }, - "go": { - Name: "Go", - Extension: "go", - }, -} diff --git a/cron/jobs/purge_submissions.go b/cron/jobs/purge_submissions.go index b3c8183..a6bbc64 100644 --- a/cron/jobs/purge_submissions.go +++ b/cron/jobs/purge_submissions.go @@ -11,18 +11,18 @@ import ( func PurgeSubmissions(ctx context.Context) { var sub []contest.Submission if e := db.Database.NewSelect().Model(&sub).Column("id").Where("result IS ? AND submitted_at < NOW() - INTERVAL '30 MINUTE'", nil).Scan(ctx); e != nil { - logger.Blizzard.Error().Err(e).Msg("could not query for staled submissions") + logger.Blizzard.Error().Err(e).Msg("could not query for stale submissions") return } var toPurge []contest.Submission for i := range sub { - if !stores.Pending.IsPending(ctx, sub[i].ID) { + if !stores.Submissions.IsPending(ctx, sub[i].ID) { toPurge = append(toPurge, sub[i]) } } if len(toPurge) > 0 { if _, e := db.Database.NewDelete().Model(&toPurge).WherePK().Returning("NULL").Exec(ctx); e != nil { - logger.Blizzard.Error().Err(e).Msg("could not purge staled submissions") + logger.Blizzard.Error().Err(e).Msg("could not purge stale submissions") return } } diff --git a/db/models/contest/problem.go b/db/models/contest/problem.go index 440492c..0cbae5c 100644 --- a/db/models/contest/problem.go +++ b/db/models/contest/problem.go @@ -15,7 +15,8 @@ type ( *ProblemContent `bun:"embed:"` Constraints *Constraints `bun:"embed:" json:"constraints,omitempty"` TestCount uint16 `bun:",notnull" json:"testCount,omitempty"` - PointPerTest uint16 `bun:",default:1" json:"pointPerTest,omitempty"` + PointsPerTest float32 `bun:",default:1" json:"pointPerTest,omitempty"` + Submissions []Submission `bun:"rel:has-many,join:id=problem_id" json:"submissions,omitempty"` } ProblemContent struct { diff --git a/db/models/contest/submission.go b/db/models/contest/submission.go index 887967c..68e487a 100644 --- a/db/models/contest/submission.go +++ b/db/models/contest/submission.go @@ -11,16 +11,18 @@ type ( ID uint32 `bun:",pk,autoincrement" json:"id" json:"-"` AuthorID uuid.UUID `bun:",type:uuid" json:"-"` // file extension of the source code, we're using extension instead of full path because source code file name pattern is static except for the extension - Extension string `bun:",notnull"` + Extension string `bun:",notnull" json:"extension"` ProblemID string `json:"-"` + Problem *Problem `bun:"rel:belongs-to,join:problem_id=id" json:"problem,omitempty"` Language string `json:"language"` SubmittedAt *time.Time `bun:",nullzero,notnull,default:'now()'" json:"submittedAt"` Result *Result `json:"result"` - Author *user.User `json:"author,omitempty"` + Author *user.User `bun:"rel:belongs-to,join:author_id=id" json:"author,omitempty"` } Verdict string CaseResult struct { + ID uint16 `json:"id"` Message string `json:"message,omitempty"` Verdict Verdict `json:"verdict"` Memory uint32 `json:"memory"` @@ -29,7 +31,8 @@ type ( FinalResult struct { CompilerOutput string `json:"compilerOutput"` Verdict Verdict `json:"verdict"` - Point uint32 `json:"point"` + Points float32 `json:"points"` + MaxPoints float32 `json:"maxPoints"` } Result struct { Cases []CaseResult `json:"cases,omitempty"` diff --git a/db/models/user/user.go b/db/models/user/user.go index b2b2707..9d57cc6 100644 --- a/db/models/user/user.go +++ b/db/models/user/user.go @@ -16,21 +16,21 @@ type ( Rating uint16 `json:"rating"` } User struct { - ID uuid.UUID `bun:",pk,unique,type:uuid,default:gen_random_uuid()" json:"id,omitempty"` - DisplayName string `json:"displayName,omitempty"` - Handle string `bun:",notnull,unique" json:"handle,omitempty"` - Email string `bun:",notnull,unique" json:"-"` - EmailVerified bool `bun:",default:false" json:"emailVerified,omitempty"` - Avatar string `bun:"-" json:"avatar,omitempty"` - Password string `bun:",notnull" json:"-"` - Organization string `json:"organization,omitempty"` - RegisteredAt *time.Time `bun:",nullzero,notnull,default:'now()'" json:"registeredAt,omitempty"` - ApiKey string `json:"-"` - Connections []OAuthConnection `bun:"rel:has-many,join:id=user_id" json:"connections,omitempty"` - Roles []Role `bun:"m2m:user_to_roles,join:User=Role" json:"roles,omitempty"` - DeletedAt *time.Time `bun:",soft_delete,nullzero" json:"deletedAt,omitempty"` - Rating uint16 `bun:",default:0" json:"rating"` - LastUsernameChange *time.Time `bun:",nullzero" json:"lastUsernameChange,omitempty"` + ID uuid.UUID `bun:",pk,unique,type:uuid,default:gen_random_uuid()" json:"id"` + DisplayName string `json:"displayName"` + Handle string `bun:",notnull,unique" json:"handle"` + Email string `bun:",notnull,unique" json:"-"` + EmailVerified bool `bun:",default:false" json:"emailVerified,omitempty"` + Avatar string `bun:"-" json:"avatar"` + Password string `bun:",notnull" json:"-"` + Organization string `json:"organization"` + RegisteredAt *time.Time `bun:",nullzero,notnull,default:'now()'" json:"registeredAt,omitempty"` + ApiKey string `json:"-"` + Connections []OAuthConnection `bun:"rel:has-many,join:id=user_id" json:"connections"` + Roles []Role `bun:"m2m:user_to_roles,join:User=Role" json:"roles"` + TopRole *Role `bun:"-" json:"topRole"` + DeletedAt *time.Time `bun:",soft_delete,nullzero" json:"deletedAt,omitempty"` + Rating uint16 `bun:",default:0" json:"rating"` } UserToRole struct { diff --git a/go.mod b/go.mod index edd895b..17d5a95 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.21 require ( aidanwoods.dev/go-paseto v1.5.0 + github.com/Jeffail/tunny v0.1.4 github.com/go-co-op/gocron v1.35.0 github.com/go-playground/validator/v10 v10.15.4 github.com/google/uuid v1.3.1 @@ -11,6 +12,7 @@ require ( github.com/labstack/echo/v4 v4.11.1 github.com/matthewhartstonge/argon2 v0.3.4 github.com/mitchellh/mapstructure v1.5.0 + github.com/orcaman/concurrent-map/v2 v2.0.1 github.com/rabbitmq/amqp091-go v1.8.1 github.com/rabbitmq/rabbitmq-stream-go-client v1.2.1 github.com/ravener/discord-oauth2 v0.0.0-20230514095040-ae65713199b3 @@ -29,7 +31,6 @@ require ( require ( aidanwoods.dev/go-result v0.1.0 // indirect - github.com/Jeffail/tunny v0.1.4 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect diff --git a/go.sum b/go.sum index 6338fe1..4abd4ed 100644 --- a/go.sum +++ b/go.sum @@ -87,6 +87,7 @@ github.com/onsi/ginkgo/v2 v2.11.0 h1:WgqUCUt/lT6yXoQ8Wef0fsNn5cAuMK7+KT9UFRz2tcU github.com/onsi/ginkgo/v2 v2.11.0/go.mod h1:ZhrRA5XmEE3x3rhlzamx/JJvujdZoJ2uvgI7kR0iZvM= github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI= github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M= +github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c= github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= diff --git a/judge/judge.go b/judge/judge.go index 2e0e040..8147b1b 100644 --- a/judge/judge.go +++ b/judge/judge.go @@ -6,26 +6,34 @@ import ( type ( Submission struct { - ID uint32 - SourcePath string - Language string - ProblemID string - TestCount uint16 - Constraints *contest.Constraints + ID uint32 + SourcePath string + Language string + ProblemID string + TestCount uint16 + PointsPerTest float32 + Constraints contest.Constraints } ) func resolveFinalResult(f FinalResult) *contest.FinalResult { - fres := &contest.FinalResult{ + fr := &contest.FinalResult{ CompilerOutput: f.CompilerOutput, Verdict: contest.None, + Points: f.Points, + MaxPoints: f.MaxPoints, } if f.Verdict == ShortCircuit || f.Verdict == Normal { var v contest.Verdict = contest.Accepted if f.LastNonACVerdict != contest.None { v = f.LastNonACVerdict } - fres.Verdict = v + if f.Points > 0 && v != contest.Accepted { + v = contest.PartiallyAccepted + } else if v == contest.Accepted { + fr.Points = f.MaxPoints + } + fr.Verdict = v } else { v := contest.None switch f.Verdict { @@ -38,7 +46,7 @@ func resolveFinalResult(f FinalResult) *contest.FinalResult { case CompileError: v = contest.CompilerError } - fres.Verdict = v + fr.Verdict = v } - return fres + return fr } diff --git a/judge/result.go b/judge/result.go index ce6f1c2..ca901f4 100644 --- a/judge/result.go +++ b/judge/result.go @@ -5,7 +5,11 @@ import "github.com/ArcticOJ/blizzard/v0/db/models/contest" type ( CaseVerdict int8 FinalVerdict int8 - CaseResult struct { + Announcement struct { + Type string `json:"type"` + ID uint16 `json:"id,omitempty"` + } + CaseResult struct { Duration float32 Memory uint32 Message string @@ -14,6 +18,8 @@ type ( FinalResult struct { CompilerOutput string Verdict FinalVerdict + Points float32 + MaxPoints float32 LastNonACVerdict contest.Verdict } ) diff --git a/judge/worker.go b/judge/worker.go index dab00c7..e62a4b5 100644 --- a/judge/worker.go +++ b/judge/worker.go @@ -13,6 +13,7 @@ import ( "github.com/ArcticOJ/blizzard/v0/utils" "github.com/Jeffail/tunny" "github.com/mitchellh/mapstructure" + cmap "github.com/orcaman/concurrent-map/v2" amqp "github.com/rabbitmq/amqp091-go" amqp2 "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" @@ -28,23 +29,16 @@ import ( "time" ) -var ResponseWorker *Worker +var Worker *worker type ( - Worker struct { + worker struct { c *http.Client s *semaphore.Weighted ctx context.Context // subscribers - sm map[uint32][]chan<- interface{} - // sub channel - sc chan Subscription - // unsub channel - usc chan Subscription - // pub channel - pc chan result - // destroy channel - dc chan uint32 + // use nested hashmap for result chan for easier removal + sm cmap.ConcurrentMap[uint32, []chan interface{}] // result message queue errChan <-chan *amqp.Error returnChan <-chan amqp.Return @@ -55,17 +49,6 @@ type ( pool *tunny.Pool } - Subscription struct { - id uint32 - c chan<- interface{} - } - - result struct { - id uint32 - ttl uint16 - data interface{} - } - rmqApiResponse struct { Destination string `json:"destination"` Arguments map[string]interface{} `json:"arguments"` @@ -81,20 +64,23 @@ type ( id uint32 name string } + + result struct { + contest.FinalResult + Cases []contest.CaseResult `json:"cases"` + } ) -func NewWorker(ctx context.Context) (w *Worker) { - w = &Worker{ +func NewWorker(ctx context.Context) (w *worker) { + w = &worker{ c: &http.Client{ Timeout: time.Second, }, - s: semaphore.NewWeighted(8), - ctx: ctx, - sm: make(map[uint32][]chan<- interface{}), - sc: make(chan Subscription, 1), - pc: make(chan result, 1), - usc: make(chan Subscription, 1), - dc: make(chan uint32, 1), + s: semaphore.NewWeighted(int64(runtime.NumCPU())), + ctx: ctx, + sm: cmap.NewWithCustomShardingFunction[uint32, []chan interface{}](func(key uint32) uint32 { + return key + }), errChan: make(<-chan *amqp.Error, 1), returnChan: make(<-chan amqp.Return, 1), } @@ -107,7 +93,7 @@ func NewWorker(ctx context.Context) (w *Worker) { return } -func (w *Worker) Connect() { +func (w *worker) Connect() { var e error if w.mqConn != nil { w.mqConn.Close() @@ -126,23 +112,24 @@ func (w *Worker) Connect() { }), "failed to declare exchange for submissions") logger.Panic(w.mqChan.Qos(100, 0, false), "failed to set qos") logger.Panic(w.mqChan.ExchangeDeclare("results", "direct", true, false, false, false, nil), "could not declare exchange for results") + //logger.Panic(w.mqChan.Confirm(false), "failed to put channel to confirm mode") w.errChan = w.mqConn.NotifyClose(make(chan *amqp.Error, 1)) w.returnChan = w.mqChan.NotifyReturn(make(chan amqp.Return, 1)) w.RecoverResults() } -func (w *Worker) RecoverResults() { +func (w *worker) RecoverResults() { conf := config.Config.RabbitMQ req, _ := http.NewRequest("GET", fmt.Sprintf("http://%s:%d/api/exchanges/%s/results/bindings/source", conf.Host, conf.ManagerPort, url.PathEscape(conf.VHost)), nil) req.SetBasicAuth(conf.Username, conf.Password) - if res, e := w.c.Do(req); e == nil { - var r []rmqApiResponse - if json.NewDecoder(res.Body).Decode(&r) != nil { + if r, e := w.c.Do(req); e == nil { + var mqr []rmqApiResponse + if json.NewDecoder(r.Body).Decode(&mqr) != nil { return } - for _, _r := range r { + for _, _r := range mqr { _id, ok := _r.Arguments["x-id"] - if !ok || !stores.Pending.IsPending(w.ctx, uint32(_id.(float64))) { + if !ok || !stores.Submissions.IsPending(w.ctx, uint32(_id.(float64))) { w.mqChan.QueueDelete(_r.Destination, true, false, true) } else { if _e := w.Consume(uint32(_id.(float64)), _r.Destination); e != nil { @@ -158,7 +145,7 @@ func (w *Worker) RecoverResults() { // TODO: implement auto reconnect -func (w *Worker) CreateStream() { +func (w *worker) CreateStream() { var e error conf := config.Config.RabbitMQ w.env, e = stream.NewEnvironment( @@ -173,16 +160,16 @@ func (w *Worker) CreateStream() { } } -func (w *Worker) commitToDb(id uint32, res *contest.FinalResult) { - db.Database.RunInTx(w.ctx, nil, func(ctx context.Context, tx bun.Tx) error { +func (w *worker) commitToDb(id uint32, res *result) { + logger.Panic(db.Database.RunInTx(w.ctx, nil, func(ctx context.Context, tx bun.Tx) error { if _, e := tx.NewUpdate().Model((*contest.Submission)(nil)).Where("id = ?", id).Set("result = ?", res).Returning("NULL").Exec(w.ctx); e != nil { return e } return nil - }) + }), "tx") } -func (w *Worker) Enqueue(sub *Submission, t time.Time) error { +func (w *worker) Enqueue(sub *Submission, t time.Time) error { var e error b, e := msgpack.Marshal(sub) if e != nil { @@ -218,23 +205,23 @@ func (w *Worker) Enqueue(sub *Submission, t time.Time) error { amqp.Publishing{ Timestamp: t, ReplyTo: name, - DeliveryMode: amqp.Persistent, + DeliveryMode: amqp.Transient, CorrelationId: strconv.FormatUint(uint64(sub.ID), 10), ContentType: "application/msgpack", Body: b, }) if e != nil { w.env.DeleteStream(name) + return e } - return stores.Pending.Set(w.ctx, sub.ID, 0, sub.TestCount, uint16(math.Ceil(float64(sub.Constraints.TimeLimit)))) - + return stores.Submissions.SetPending(w.ctx, sub.ID, 0, sub.TestCount, uint16(math.Ceil(float64(sub.Constraints.TimeLimit)))) } -func (w *Worker) Cancel(ctx context.Context, id uint32) error { - if !stores.Pending.IsPending(ctx, id) { +func (w *worker) Cancel(ctx context.Context, id uint32) error { + if !stores.Submissions.IsPending(ctx, id) { return errors.New("no submission with matching ID") } - if tag, ok := stores.Pending.Get(ctx, id); ok { + if tag, ok := stores.Submissions.GetPendingTag(ctx, id); ok { return w.mqChan.Reject(tag, false) } return errors.New("could not cancel specified submission") @@ -242,25 +229,37 @@ func (w *Worker) Cancel(ctx context.Context, id uint32) error { // TODO: figure out a way to avoid race condition as two judges may judge a submission concurrently, wasting resources. -func (w *Worker) consume(id uint32, name string) bool { +func (w *worker) consume(id uint32, name string) bool { lastNonAcVerdict := contest.None _, e := w.env.NewConsumer(name, func(ctx stream.ConsumerContext, msg *amqp2.Message) { var r res if msgpack.Unmarshal(msg.Data[0], &r) != nil { return } - fmt.Println(r.Headers["from"]) - if r.Headers["type"] == "final" { + switch r.Headers["type"] { + case "final": ctx.Consumer.Close() var _r FinalResult if mapstructure.Decode(r.Body, &_r) != nil { return } _r.LastNonACVerdict = lastNonAcVerdict - w.publish(id, _r, 0) + w.publish(id, math.MaxUint16, _r) w.env.DeleteStream(name) - stores.Pending.Delete(w.ctx, id) - } else { + break + case "announcement": + if cid, ok := r.Body.(uint16); ok { + a := Announcement{ + Type: "compile", + ID: cid, + } + if cid > 0 { + a.Type = "case" + } + w.publish(id, cid, a) + } + break + default: var _r CaseResult if mapstructure.Decode(r.Body, &_r) != nil { return @@ -268,17 +267,19 @@ func (w *Worker) consume(id uint32, name string) bool { if _r.Verdict != Accepted { lastNonAcVerdict = resolveVerdict(_r.Verdict) } - var ttl uint16 = 0 - if _ttl, _ok := r.Headers["ttl"].(int32); _ok && _ttl > 0 { - ttl = uint16(_ttl) + var cid uint16 = math.MaxUint16 + if _cid, _ok := r.Headers["case-id"].(int32); _ok { + cid = uint16(_cid) } - w.publish(id, _r, ttl) + w.publish(id, cid, _r) + break } }, stream.NewConsumerOptions().SetOffset(stream.OffsetSpecification{}.First()).SetCRCCheck(false)) + logger.Panic(e, "test") return e == nil } -func (w *Worker) Consume(id uint32, name string) error { +func (w *worker) Consume(id uint32, name string) error { if e, ok := w.pool.Process(consumeParams{ id: id, name: name, @@ -288,7 +289,7 @@ func (w *Worker) Consume(id uint32, name string) error { return nil } -func (w *Worker) Reconnect() { +func (w *worker) Reconnect() { for { select { case <-w.errChan: @@ -298,97 +299,72 @@ func (w *Worker) Reconnect() { } } -func (w *Worker) Work() { +func (w *worker) Work() { w.CreateStream() w.Connect() - go w.Reconnect() - for { - select { - // on destroy - case id := <-w.dc: - stores.Pending.Delete(w.ctx, id) - a, ok := w.sm[id] - if !ok { - continue - } - for i := range a { - close(a[i]) - a[i] = nil - } - w.sm[id] = nil - delete(w.sm, id) - // on sub - case s := <-w.sc: - w.sm[s.id] = append(w.sm[s.id], s.c) - // on unsub - case u := <-w.usc: - m, ok := w.sm[u.id] - if !ok { - continue - } - m = utils.ArrayRemove(m, func(r chan<- interface{}) bool { - if r == u.c { - close(u.c) - u.c = nil - return true - } - return false - }) - w.sm[u.id] = m - // on pub - case msg := <-w.pc: - a, ok := w.sm[msg.id] - isFinal := false - if !ok { - continue - } - var d interface{} = nil - switch r := msg.data.(type) { - case CaseResult: - cr := contest.CaseResult{ - Message: r.Message, - Verdict: resolveVerdict(r.Verdict), - Memory: r.Memory, - Duration: r.Duration, - } - d = cr - case FinalResult: - fr := resolveFinalResult(r) - d = fr - w.commitToDb(msg.id, fr) - isFinal = true - } - if d != nil { - for i := range a { - select { - case a[i] <- d: - } - } - } - if isFinal { - w.DestroyObserver(msg.id) + w.Reconnect() +} + +func (w *worker) publish(id uint32, cid uint16, data interface{}) { + subscribers, _ := w.sm.Get(id) + var d interface{} = nil + switch r := data.(type) { + case CaseResult: + cr := contest.CaseResult{ + ID: cid, + Message: r.Message, + Verdict: resolveVerdict(r.Verdict), + Memory: r.Memory, + Duration: r.Duration, + } + stores.Submissions.UpdatePending(w.ctx, id, cr) + d = cr + break + case FinalResult: + fr := resolveFinalResult(r) + d = fr + w.commitToDb(id, &result{FinalResult: *fr, Cases: stores.Submissions.GetPendingResults(w.ctx, id)}) + defer w.DestroySubscribers(id) + break + default: + d = data + break + } + if d != nil && len(subscribers) > 0 { + for i := range subscribers { + select { + case subscribers[i] <- d: } } } } -func (w *Worker) publish(id uint32, data interface{}, ttl uint16) { - w.pc <- result{id: id, data: data, ttl: ttl} +func (w *worker) DestroySubscribers(id uint32) { + stores.Submissions.DeletePending(w.ctx, id) + a, ok := w.sm.Pop(id) + if !ok { + return + } + for i := range a { + close(a[i]) + a[i] = nil + } } -func (w *Worker) DestroyObserver(id uint32) { - w.dc <- id +func (w *worker) Subscribe(id uint32) chan interface{} { + c := make(chan interface{}, 1) + subscribers, _ := w.sm.Get(id) + w.sm.Set(id, append(subscribers, c)) + return c } -func (w *Worker) Observe(id uint32, c chan<- interface{}) (s Subscription) { - s = Subscription{ - id: id, - c: c, +func (w *worker) Unsubscribe(id uint32, c chan interface{}) { + subscribers, ok := w.sm.Get(id) + if !ok { + return } - w.sc <- s - return -} - -func (w *Worker) StopObserving(s Subscription) { - w.usc <- s + close(c) + w.sm.Set(id, utils.ArrayRemove(subscribers, func(rc chan interface{}) bool { + return rc == c + })) } diff --git a/rejson/constants.go b/rejson/constants.go index 26d8b12..b39a6c5 100644 --- a/rejson/constants.go +++ b/rejson/constants.go @@ -1,6 +1,6 @@ package rejson -type Command string +type Command = string const ( SET Command = "JSON.SET" diff --git a/rejson/handler.go b/rejson/handler.go index 57a6cd3..21d9f03 100644 --- a/rejson/handler.go +++ b/rejson/handler.go @@ -19,35 +19,35 @@ type ( } ) -func args(cmd Command, key string, v ...interface{}) []interface{} { - return append([]interface{}{string(cmd), key}, v...) +func args(cmd Command, key string, paths []interface{}, v ...interface{}) []interface{} { + return append(append([]interface{}{cmd, key}, paths...), v...) } -func (r *JsonResult) String() string { +func (r JsonResult) String() string { return string(r.data) } -func (r *JsonResult) Raw() json.RawMessage { +func (r JsonResult) Raw() json.RawMessage { return r.data } -func Unmarshal[T any](r *JsonResult) []T { - if r == nil { +func Unmarshal[T any](r JsonResult) *T { + if len(r.data) == 0 { return nil } - var obj []T + var obj T if json.Unmarshal(r.data, &obj) != nil { return nil } - return obj + return &obj } -func (r *ReJSON) JSONGet(ctx context.Context, key string, paths ...interface{}) *JsonResult { - s, e := r.Do(ctx, args(GET, key, paths...)...).Result() +func (r *ReJSON) JSONGet(ctx context.Context, key string, paths ...interface{}) JsonResult { + s, e := r.Do(ctx, args(GET, key, paths)...).Result() if e != nil || len(s.(string)) == 0 { - return nil + return JsonResult{} } - return &JsonResult{ + return JsonResult{ data: []byte(s.(string)), } } @@ -57,5 +57,34 @@ func (r *ReJSON) JSONSet(ctx context.Context, key string, path string, data inte if e != nil { return e } - return r.Do(ctx, args(SET, key, path, string(b))...).Err() + return r.Do(ctx, args(SET, key, []interface{}{path}, string(b))...).Err() +} + +func (r *ReJSON) JSONMGet(ctx context.Context, path string, keys ...interface{}) (res []JsonResult) { + s, e := r.Do(ctx, append([]interface{}{MGET}, append(keys, path)...)...).Result() + if e != nil { + return + } + _r, ok := s.([]interface{}) + if ok && len(_r) > 0 { + for _, _s := range _r { + if _s == nil { + res = append(res, JsonResult{}) + } else { + res = append(res, JsonResult{data: []byte(_s.(string))}) + } + } + } + return +} + +func (r *ReJSON) JTxPipelined(ctx context.Context, fn func(r *ReJSON) error) error { + _, e := r.TxPipelined(ctx, func(pipeliner redis.Pipeliner) error { + return fn(&ReJSON{Client: pipeliner}) + }) + return e +} + +func (r *ReJSON) JTxPipeline(ctx context.Context) *ReJSON { + return &ReJSON{Client: r.TxPipeline()} } diff --git a/routes/auth/logout.go b/routes/auth/logout.go index 72306a8..295c6c9 100644 --- a/routes/auth/logout.go +++ b/routes/auth/logout.go @@ -5,7 +5,7 @@ import ( ) func Logout(ctx *http.Context) http.Response { - ctx.Set("user", nil) + ctx.Set("id", nil) ctx.DeleteCookie("session") return ctx.Success() } diff --git a/routes/problems/problem.dynamic.go b/routes/problems/problem.dynamic.go index f896374..8938c75 100644 --- a/routes/problems/problem.dynamic.go +++ b/routes/problems/problem.dynamic.go @@ -1,16 +1,14 @@ package problems import ( - "github.com/ArcticOJ/blizzard/v0/db" - "github.com/ArcticOJ/blizzard/v0/db/models/contest" + "github.com/ArcticOJ/blizzard/v0/cache/stores" "github.com/ArcticOJ/blizzard/v0/server/http" ) func Problem(ctx *http.Context) http.Response { id := ctx.Param("problem") - var prob contest.Problem - if db.Database.NewSelect().Model(&prob).Where("problem.id = ?", id).Scan(ctx.Request().Context()) != nil { - return ctx.NotFound("Problem not found.") + if p := stores.Problems.Get(ctx.Request().Context(), id); p != nil { + return ctx.Respond(p) } - return ctx.Respond(prob) + return ctx.NotFound("Problem not found.") } diff --git a/routes/problems/submit.go b/routes/problems/submit.go index 79f9eaf..4925ec7 100644 --- a/routes/problems/submit.go +++ b/routes/problems/submit.go @@ -4,16 +4,13 @@ package problems import ( "context" - "errors" "github.com/ArcticOJ/blizzard/v0/cache/stores" - "github.com/ArcticOJ/blizzard/v0/core" - "github.com/ArcticOJ/blizzard/v0/core/errs" "github.com/ArcticOJ/blizzard/v0/db" "github.com/ArcticOJ/blizzard/v0/db/models/contest" "github.com/ArcticOJ/blizzard/v0/judge" - "github.com/ArcticOJ/blizzard/v0/logger" "github.com/ArcticOJ/blizzard/v0/server/http" "github.com/ArcticOJ/blizzard/v0/storage" + "github.com/ArcticOJ/blizzard/v0/utils" "github.com/google/uuid" "path" "strings" @@ -21,29 +18,18 @@ import ( func prepare(id uint32, _path, language string, problem *contest.Problem) *judge.Submission { return &judge.Submission{ - ID: id, - Language: language, - SourcePath: path.Base(_path), - ProblemID: problem.ID, - TestCount: problem.TestCount, - Constraints: problem.Constraints, + ID: id, + Language: language, + SourcePath: path.Base(_path), + ProblemID: problem.ID, + TestCount: problem.TestCount, + PointsPerTest: problem.PointsPerTest, + Constraints: *problem.Constraints, } } -func getExt(language, fileName string) string { - ext := strings.ToLower(strings.TrimLeft(path.Ext(fileName), ".")) - if ext == "" { - if l, ok := core.LanguageMatrix[language]; ok { - return l.Extension - } - return "" - } - for _, l := range core.LanguageMatrix { - if l.Extension == ext { - return ext - } - } - return "" +func getExt(fileName string) string { + return strings.ToLower(strings.TrimLeft(path.Ext(fileName), ".")) } func createSubmission(ctx context.Context, userId uuid.UUID, problem, language string, ext string) (*contest.Submission, func() error, func() error) { @@ -64,24 +50,19 @@ func createSubmission(ctx context.Context, userId uuid.UUID, problem, language s return sub, tx.Rollback, tx.Commit } -// TODO: check availability of judges before judging - func Submit(ctx *http.Context) http.Response { if ctx.RequireAuth() { return nil } code, e := ctx.FormFile("code") + shouldStream := ctx.FormValue("stream") == "true" if e != nil { return ctx.Bad("No code.") } lang := ctx.FormValue("language") - logger.Blizzard.Debug().Str("lang", lang).Send() id := ctx.Param("problem") var problem contest.Problem - if _, ok := core.LanguageMatrix[lang]; !ok { - return ctx.Bad("Unsupported language!") - } - ext := getExt(lang, code.Filename) + ext := getExt(code.Filename) f, e := code.Open() if e != nil { return ctx.Bad("Could not open uploaded code!") @@ -92,7 +73,11 @@ func Submit(ctx *http.Context) http.Response { if db.Database.NewSelect().Model(&problem).Where("id = ?", id).Scan(ctx.Request().Context()) != nil { return ctx.NotFound("Problem not found.") } + if len(problem.Constraints.AllowedLanguages) > 0 && !utils.ArrayIncludes(problem.Constraints.AllowedLanguages, lang) { + return ctx.Bad("This language is not allowed by current problem.") + } // might not be accurate + // TODO: group judges by supported runtimes and do icmp pings on demand if !stores.Judge.IsRuntimeAllowed(ctx.Request().Context(), lang) { return ctx.InternalServerError("No judge server is available to handle this submission.") } @@ -102,27 +87,29 @@ func Submit(ctx *http.Context) http.Response { } p := storage.Submission.Create(dbSub.ID, ext) sub := prepare(dbSub.ID, p, lang, &problem) - if storage.Submission.Write(p, f) != nil { - return ctx.Bad("Could not write code to file!") + var res chan interface{} + if shouldStream { + res = judge.Worker.Subscribe(sub.ID) } - res := make(chan interface{}, 1) - s := judge.ResponseWorker.Observe(sub.ID, res) - if judge.ResponseWorker.Enqueue(sub, *dbSub.SubmittedAt) != nil { - judge.ResponseWorker.DestroyObserver(sub.ID) + if judge.Worker.Enqueue(sub, *dbSub.SubmittedAt) != nil || storage.Submission.Write(p, f) != nil { + judge.Worker.DestroySubscribers(sub.ID) rollback() - if errors.Is(e, errs.JudgeNotAvailable) { - return ctx.InternalServerError("No judge is available for this language.") + return ctx.InternalServerError("Failed to process your submission.") + } + if commit() != nil { + return ctx.InternalServerError("Could not save submission to database.") + } + if shouldStream && res != nil { + stream := ctx.StreamResponse() + go func() { + <-ctx.Request().Context().Done() + judge.Worker.Unsubscribe(sub.ID, res) + }() + for r := range res { + stream.Write(r) } - return ctx.InternalServerError("Failed to enqueue submission.") - } - commit() - stream := ctx.StreamResponse() - go func() { - <-ctx.Request().Context().Done() - judge.ResponseWorker.StopObserving(s) - }() - for r := range res { - stream.Write(r) - } - return ctx.Success() + return ctx.Success() + } else { + return ctx.Respond(dbSub.ID) + } } diff --git a/routes/submissions/cancel_submission.go b/routes/submissions/cancel_submission.go index f855a92..af02df3 100644 --- a/routes/submissions/cancel_submission.go +++ b/routes/submissions/cancel_submission.go @@ -14,5 +14,5 @@ func CancelSubmission(ctx *http.Context) http.Response { if e != nil { return ctx.Bad("Invalid ID.") } - return ctx.Respond(judge.ResponseWorker.Cancel(ctx.Request().Context(), uint32(id))) + return ctx.Respond(judge.Worker.Cancel(ctx.Request().Context(), uint32(id))) } diff --git a/routes/submissions/submission.dynamic.go b/routes/submissions/submission.dynamic.go index a2c6e7a..6076340 100644 --- a/routes/submissions/submission.dynamic.go +++ b/routes/submissions/submission.dynamic.go @@ -1,19 +1,29 @@ package submissions import ( - "github.com/ArcticOJ/blizzard/v0/cache/stores" + "github.com/ArcticOJ/blizzard/v0/db" + "github.com/ArcticOJ/blizzard/v0/db/models/contest" "github.com/ArcticOJ/blizzard/v0/server/http" - "strconv" + "github.com/uptrace/bun" ) func Submission(ctx *http.Context) http.Response { id := ctx.Param("submission") - _id, e := strconv.ParseUint(id, 10, 32) - if e != nil { - return ctx.Bad("Invalid ID.") + s := new(contest.Submission) + if db.Database.NewSelect().Model(s).Where("submission.id = ?", id).Relation("Problem", func(query *bun.SelectQuery) *bun.SelectQuery { + return query.Column("id", "title") + }).Relation("Author", func(query *bun.SelectQuery) *bun.SelectQuery { + return query.Column("handle", "id") + }).Scan(ctx.Request().Context()) != nil { + return ctx.NotFound("Submission not found.") } - if stores.Pending.IsPending(ctx.Request().Context(), uint32(_id)) { - - } - return ctx.Respond(id) + return ctx.Respond(s) + //_id, e := strconv.ParseUint(id, 10, 32) + //if e != nil { + // return ctx.Bad("Invalid ID.") + //} + //if stores.Submissions.IsPending(ctx.Request().Context(), uint32(_id)) { + // + //} + //return ctx.Respond(id) } diff --git a/routes/user/index.go b/routes/user/index.go index b0024b7..c80470e 100644 --- a/routes/user/index.go +++ b/routes/user/index.go @@ -1,6 +1,7 @@ package user import ( + "github.com/ArcticOJ/blizzard/v0/cache/stores" "github.com/ArcticOJ/blizzard/v0/server/http" ) @@ -8,7 +9,7 @@ func Index(ctx *http.Context) http.Response { if ctx.RequireAuth() { return nil } - if u := ctx.GetUser(); u != nil { + if u := stores.Users.GetMinimal(ctx.Request().Context(), ctx.GetUUID()); u != nil { return ctx.Respond(u) } return ctx.NotFound("User not found.") diff --git a/routes/user/readme.dynamic.go b/routes/user/readme.dynamic.go new file mode 100644 index 0000000..a00006b --- /dev/null +++ b/routes/user/readme.dynamic.go @@ -0,0 +1 @@ +package user diff --git a/routes/users/index.go b/routes/users/index.go index 59dd07c..6e1c2dc 100644 --- a/routes/users/index.go +++ b/routes/users/index.go @@ -1,18 +1,26 @@ package users import ( - "github.com/ArcticOJ/blizzard/v0/db" + "github.com/ArcticOJ/blizzard/v0/cache/stores" "github.com/ArcticOJ/blizzard/v0/db/models/user" "github.com/ArcticOJ/blizzard/v0/server/http" - "github.com/uptrace/bun" + "github.com/ArcticOJ/blizzard/v0/types" + "strconv" ) func Index(ctx *http.Context) http.Response { - var users []user.User - if db.Database.NewSelect().Model(&users).Column("id", "handle", "display_name", "rating").Relation("Roles", func(query *bun.SelectQuery) *bun.SelectQuery { - return query.Order("priority ASC").Column("icon", "color").Limit(1) - }).Limit(50).Order("rating DESC").Scan(ctx.Request().Context()) != nil { + page := ctx.QueryParam("page") + var p uint16 = 1 + if _p, e := strconv.ParseUint(page, 10, 16); e == nil && _p > 0 { + p = uint16(_p) + } + var users []user.MinimalUser + if users = stores.Users.GetPage(ctx.Request().Context(), p-1, ctx.QueryParam("reversed") == "true"); users == nil { return ctx.InternalServerError("Could not fetch users.") } - return ctx.Respond(users) + return ctx.Respond(types.Paginateable[user.MinimalUser]{ + CurrentPage: p, + PageSize: stores.DefaultUserPageSize, + Data: users, + }) } diff --git a/server/http/context.go b/server/http/context.go index 4673c22..a597c5b 100644 --- a/server/http/context.go +++ b/server/http/context.go @@ -1,7 +1,6 @@ package http import ( - "github.com/ArcticOJ/blizzard/v0/db/models/user" "github.com/ArcticOJ/blizzard/v0/server/session" "github.com/google/uuid" "github.com/labstack/echo/v4" @@ -82,17 +81,6 @@ func (ctx Context) GetUUID() uuid.UUID { } } -func (ctx Context) GetUser() *user.MinimalUser { - _u := ctx.Get("user") - if _u == nil { - return nil - } - if u, ok := _u.(*user.MinimalUser); ok { - return u - } - return nil -} - func (ctx Context) PutCookie(name string, value string, exp time.Time, sessionOnly bool) { cookie := new(http.Cookie) cookie.Name = name @@ -103,7 +91,7 @@ func (ctx Context) PutCookie(name string, value string, exp time.Time, sessionOn cookie.SameSite = http.SameSiteLaxMode cookie.Path = "/" cookie.HttpOnly = true - cookie.Secure = true + //cookie.Secure = true ctx.SetCookie(cookie) } diff --git a/server/middlewares/auth.go b/server/middlewares/auth.go index f22f546..25d34a1 100644 --- a/server/middlewares/auth.go +++ b/server/middlewares/auth.go @@ -2,48 +2,36 @@ package middlewares import ( "github.com/ArcticOJ/blizzard/v0/cache/stores" - "github.com/ArcticOJ/blizzard/v0/db" - "github.com/ArcticOJ/blizzard/v0/db/models/user" "github.com/ArcticOJ/blizzard/v0/server/http" "github.com/ArcticOJ/blizzard/v0/server/session" "github.com/google/uuid" "github.com/labstack/echo/v4" - "strings" ) -func invalidate(ctx *http.Context, next echo.HandlerFunc) error { - ctx.Set("user", nil) - ctx.DeleteCookie("session") - return next(ctx) -} - func Authentication() echo.MiddlewareFunc { return func(next echo.HandlerFunc) echo.HandlerFunc { return func(c echo.Context) error { - if authHeader := c.Request().Header.Get("Authorization"); strings.HasPrefix(authHeader, "Bearer") { - authToken := strings.TrimSpace(strings.TrimPrefix(authHeader, "Bearer")) - if len(authToken) > 0 { - var usr user.User - if e := db.Database.NewSelect().Model(&usr).Where("api_key = ?", authToken).Column("id").Scan(c.Request().Context()); e == nil { - c.Set("user", usr.ID) - return next(c) - } - } - } + //if authHeader := c.Request().Header.Get("Authorization"); strings.HasPrefix(authHeader, "Bearer") { + // authToken := strings.TrimSpace(strings.TrimPrefix(authHeader, "Bearer")) + // if len(authToken) > 0 { + // var usr user.User + // if e := db.Database.NewSelect().Model(&usr).Where("api_key = ?", authToken).Column("id").Scan(c.Request().Context()); e == nil { + // c.Set("user", usr.ID) + // return next(c) + // } + // } + //} ctx := &http.Context{ Context: c, } cookie, e := ctx.Cookie("session") - if e != nil || cookie.Value == "" { - return invalidate(ctx, next) + if e != nil || cookie == nil || cookie.Value == "" { + return next(c) } - if uid := session.Decrypt(cookie.Value); uid != uuid.Nil { - u := stores.Users.GetMinimal(c.Request().Context(), uid) - if u == nil { - return invalidate(ctx, next) - } + if uid := session.Decrypt(cookie.Value); uid != uuid.Nil && stores.Users.UserExists(ctx.Request().Context(), uid) { ctx.Set("id", uid) - ctx.Set("user", u) + } else { + ctx.DeleteCookie("session") } return next(c) } diff --git a/server/server.go b/server/server.go index 7f6ba22..ea5fd63 100644 --- a/server/server.go +++ b/server/server.go @@ -34,9 +34,9 @@ func Register(e *echo.Echo) { g.Use(middlewares.RateLimit()) } if config.Config.Debug { - g.Use(middleware.BodyDump(func(c echo.Context, req, res []byte) { - logger.Blizzard.Debug().Str("url", c.Request().RequestURI).Bytes("req", req).Bytes("res", res).Msg("body") - })) + //g.Use(middleware.BodyDump(func(c echo.Context, req, res []byte) { + // logger.Blizzard.Debug().Str("url", c.Request().RequestURI).Bytes("req", req).Bytes("res", res).Msg("body") + //})) g.Use(middleware.RequestLoggerWithConfig(middleware.RequestLoggerConfig{ LogURI: true, LogStatus: true, diff --git a/types/paginatable.go b/types/paginatable.go new file mode 100644 index 0000000..913c130 --- /dev/null +++ b/types/paginatable.go @@ -0,0 +1,7 @@ +package types + +type Paginateable[T any] struct { + CurrentPage uint16 `json:"currentPage"` + PageSize uint16 `json:"pageSize"` + Data []T `json:"data"` +}