Skip to content

Commit

Permalink
✨ (caching): pretty "stable" caching mechanism for users
Browse files Browse the repository at this point in the history
  • Loading branch information
AlphaNecron committed Oct 18, 2023
1 parent 82f6521 commit 5a8d933
Show file tree
Hide file tree
Showing 32 changed files with 641 additions and 486 deletions.
4 changes: 2 additions & 2 deletions blizzard.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
)

func Init(ctx context.Context) {
judge.ResponseWorker = judge.NewWorker(ctx)
judge.Worker = judge.NewWorker(ctx)
cron.Start(ctx)
go judge.ResponseWorker.Work()
go judge.Worker.Work()
}

func Destroy() {
Expand Down
9 changes: 1 addition & 8 deletions cache/store.go
Original file line number Diff line number Diff line change
@@ -1,38 +1,31 @@
package cache

import (
"context"
"fmt"
"github.com/ArcticOJ/blizzard/v0/config"
"github.com/ArcticOJ/blizzard/v0/logger"
"github.com/redis/go-redis/v9"
"net"
"time"
)

type (
DB uint8
)

const (
Result DB = iota + 1
User
User DB = iota + 1
Bucket
Submission
Judge
Problem
)

func CreateClient(db DB, name string) (c redis.UniversalClient) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
c = redis.NewClient(&redis.Options{
Addr: net.JoinHostPort(config.Config.Dragonfly.Host, fmt.Sprint(config.Config.Dragonfly.Port)),
DB: int(db),
})
if config.Config.Debug {
c.AddHook(DebugHook{Name: name})
}
logger.Panic(c.Ping(ctx).Err(), "failed to initialize redis client for %s cache", name)
return
}
8 changes: 4 additions & 4 deletions cache/stores/buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,19 @@ import (
"math"
)

var RateLimiter *RateLimitStore
var RateLimiter *rateLimitStore

const defaultBucketKey = "blizzard::bucket[%s]"

type RateLimitStore struct {
type rateLimitStore struct {
c redis.UniversalClient
}

func init() {
RateLimiter = &RateLimitStore{cache.CreateClient(cache.Bucket, "buckets")}
RateLimiter = &rateLimitStore{cache.CreateClient(cache.Bucket, "buckets")}
}

func (s *RateLimitStore) Limit(ctx context.Context, ip string) (allowed bool, totalLimit, remaining, retryAfter, nextReset int64) {
func (s *rateLimitStore) Limit(ctx context.Context, ip string) (allowed bool, totalLimit, remaining, retryAfter, nextReset int64) {
v, e := s.c.Do(ctx, "CL.THROTTLE", fmt.Sprintf(defaultBucketKey, ip), uint16(math.Max(math.Ceil(float64(config.Config.RateLimit)/2), 1)), config.Config.RateLimit, 30, 1).Int64Slice()
if e != nil || len(v) != 5 {
logger.Blizzard.Err(e).Msgf("failed to process rate limit for '%s'", ip)
Expand Down
17 changes: 7 additions & 10 deletions cache/stores/judge.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"time"
)

var Judge *JudgeStore
var Judge *judgeStatusStore

type (
JudgeStatus struct {
Expand All @@ -19,17 +19,14 @@ type (
Runtimes []JudgeRuntime `json:"runtimes"`
}

JudgeInfo struct {
}

JudgeRuntime struct {
ID string `json:"id"`
Compiler string `json:"compiler"`
Arguments string `json:"arguments"`
Version string `json:"version"`
}

JudgeStore struct {
judgeStatusStore struct {
c redis.UniversalClient
}
)
Expand All @@ -41,10 +38,10 @@ const (
)

func init() {
Judge = &JudgeStore{c: cache.CreateClient(cache.Judge, "judge")}
Judge = &judgeStatusStore{c: cache.CreateClient(cache.Judge, "judge")}
}

func (s *JudgeStore) UpdateJudgeStatus(ctx context.Context, judgeList []interface{}, status string, allowedRuntimes []interface{}) {
func (s *judgeStatusStore) UpdateJudgeStatus(ctx context.Context, judgeList []interface{}, status string, allowedRuntimes []interface{}) {
s.c.TxPipelined(ctx, func(p redis.Pipeliner) error {
s.c.Set(ctx, defaultJudgeStatusKey, status, time.Hour*24)
p.Del(ctx, defaultAllowedRuntimesKey).Err()
Expand All @@ -54,20 +51,20 @@ func (s *JudgeStore) UpdateJudgeStatus(ctx context.Context, judgeList []interfac
})
}

func (s *JudgeStore) IsRuntimeAllowed(ctx context.Context, runtime string) bool {
func (s *judgeStatusStore) IsRuntimeAllowed(ctx context.Context, runtime string) bool {
v, e := s.c.SIsMember(ctx, defaultAllowedRuntimesKey, runtime).Result()
return v && e == nil
}

func (s *JudgeStore) GetJudgeList(ctx context.Context) []string {
func (s *judgeStatusStore) GetJudgeList(ctx context.Context) []string {
v, e := s.c.SMembers(ctx, defaultJudgeListKey).Result()
if e != nil {
return nil
}
return v
}

func (s *JudgeStore) GetJudgeStatus(ctx context.Context) []byte {
func (s *judgeStatusStore) GetJudgeStatus(ctx context.Context) []byte {
status, e := s.c.Get(ctx, defaultJudgeStatusKey).Result()
if e != nil {
return []byte("null")
Expand Down
52 changes: 0 additions & 52 deletions cache/stores/pending.go

This file was deleted.

33 changes: 29 additions & 4 deletions cache/stores/problems.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package stores

import (
"context"
"fmt"
"github.com/ArcticOJ/blizzard/v0/cache"
"github.com/ArcticOJ/blizzard/v0/db"
"github.com/ArcticOJ/blizzard/v0/db/models/contest"
"github.com/ArcticOJ/blizzard/v0/rejson"
"time"
)

var Problems *ProblemStore
var Problems *problemStore

type ProblemStore struct {
type problemStore struct {
j *rejson.ReJSON
}

Expand All @@ -16,9 +21,29 @@ const (
)

func init() {
Problems = &ProblemStore{j: &rejson.ReJSON{Client: cache.CreateClient(cache.Problem, "problems")}}
Problems = &problemStore{j: &rejson.ReJSON{Client: cache.CreateClient(cache.Problem, "problems")}}
}

func (*ProblemStore) Get() {
func (s *problemStore) fallback(ctx context.Context, id string) *contest.Problem {
prob := new(contest.Problem)
if db.Database.NewSelect().Model(prob).Where("id = ?", id).Scan(ctx) != nil {
return nil
}
// TODO: use global context to avoid interrupted operations
s.j.JTxPipelined(ctx, func(r *rejson.ReJSON) error {
k := fmt.Sprintf(defaultProblemKey, id)
if e := r.JSONSet(ctx, k, "$", prob); e != nil {
return e
}
return r.Expire(ctx, k, time.Hour*12).Err()
})
return prob
}

func (s *problemStore) Get(ctx context.Context, id string) *contest.Problem {
p := s.j.JSONGet(ctx, fmt.Sprintf(defaultProblemKey, id))
//if p == nil {
// return s.fallback(ctx, id)
//}
return rejson.Unmarshal[contest.Problem](p)
}
76 changes: 76 additions & 0 deletions cache/stores/submissions.go
Original file line number Diff line number Diff line change
@@ -1 +1,77 @@
package stores

import (
"context"
"fmt"
"github.com/ArcticOJ/blizzard/v0/cache"
"github.com/ArcticOJ/blizzard/v0/db/models/contest"
"github.com/ArcticOJ/blizzard/v0/rejson"
"github.com/ArcticOJ/blizzard/v0/utils"
"sync"
"time"
)

var Submissions *submissionStore

type submissionStore struct {
j *rejson.ReJSON
l sync.RWMutex
}

const (
defaultPendingSubmissionKey = "blizzard::pending_submission[%d]"
// 30 minutes
defaultExtraTtl = 30 * 60
)

func init() {
Submissions = &submissionStore{j: &rejson.ReJSON{Client: cache.CreateClient(cache.Submission, "submissions")}}
}

func (s *submissionStore) SetPending(ctx context.Context, id uint32, tag uint64, count, ttl uint16) error {
s.l.Lock()
defer s.l.Unlock()
return s.j.JTxPipelined(ctx, func(r *rejson.ReJSON) error {
k := fmt.Sprintf(defaultPendingSubmissionKey, id)
if e := r.JSONSet(ctx, k, "$", map[string]interface{}{
"tag": tag,
"cases": utils.ArrayFill[interface{}](nil, int(count)),
}); e != nil {
return e
}
return r.Expire(ctx, k, time.Duration(defaultExtraTtl+count*(ttl+2))*time.Second).Err()
})
}

func (s *submissionStore) UpdatePending(ctx context.Context, id uint32, result contest.CaseResult) error {
return s.j.JSONSet(ctx, fmt.Sprintf(defaultPendingSubmissionKey, id), fmt.Sprintf("$..cases[%d]", result.ID-1), result)
}

func (s *submissionStore) IsPending(ctx context.Context, id uint32) bool {
s.l.RLock()
defer s.l.RUnlock()
ok, e := s.j.Exists(ctx, fmt.Sprintf(defaultPendingSubmissionKey, id)).Result()
return ok == 1 && e == nil
}

func (s *submissionStore) GetPendingResults(ctx context.Context, id uint32) []contest.CaseResult {
r := s.j.JSONGet(ctx, fmt.Sprintf(defaultPendingSubmissionKey, id), "$.cases")
if _r := rejson.Unmarshal[[][]contest.CaseResult](r); _r != nil && len(*_r) > 0 {
return (*_r)[0]
}
return nil
}

func (s *submissionStore) GetPendingTag(ctx context.Context, id uint32) (uint64, bool) {
r := s.j.JSONGet(ctx, fmt.Sprintf(defaultPendingSubmissionKey, id), "$")
if t := rejson.Unmarshal[[]uint64](r); t != nil && len(*t) > 0 {
return (*t)[0], true
}
return 0, false
}

func (s *submissionStore) DeletePending(ctx context.Context, id uint32) {
s.l.Lock()
defer s.l.Unlock()
s.j.Del(ctx, fmt.Sprintf(defaultPendingSubmissionKey, id))
}
Loading

0 comments on commit 5a8d933

Please sign in to comment.