Skip to content

Commit

Permalink
randomizer bugfixes, releases refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
MindHunter86 committed Oct 19, 2024
1 parent e895c16 commit 923695f
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 69 deletions.
49 changes: 15 additions & 34 deletions internal/anilibria/randomizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,7 @@ type Randomizer struct {
encoder *zstd.Encoder
decoder *zstd.Decoder

mu sync.RWMutex
releases *Releases

releasesold []string
rawreleases map[string][]byte
releaseBlocks map[string]*ReleaseBlockedInfo
}

func New(c context.Context) *Randomizer {
Expand Down Expand Up @@ -78,12 +73,8 @@ func New(c context.Context) *Randomizer {
encoder: enc,
decoder: dec,

releases: NewReleases(WithFetchTries(cli.Int("randomizer-random-fetch-tries"))),

releasesold: make([]string, 0),
rawreleases: make(map[string][]byte),
releaseBlocks: make(map[string]*ReleaseBlockedInfo),
releasesKey: cli.String("randomizer-releaseskey"),
releases: NewReleases(WithFetchTries(cli.Int("randomizer-random-fetch-tries"))),
releasesKey: cli.String("randomizer-releaseskey"),
}

return r
Expand All @@ -95,29 +86,28 @@ func (m *Randomizer) Bootstrap() {
}

func (m *Randomizer) Randomize(region string) (_ string, e error) {
// return m.randomRelease()
var release *Release
if release, e = m.releases.RandomRelease(""); e != nil {
if release, e = m.releases.RandomRelease(region); e != nil {
return
}

return release.Code, e
}

func (m *Randomizer) RawRelease(ident []byte) (release []byte, ok bool, e error) {
var rawrelease []byte
if rawrelease, ok = m.rawreleases[futils.UnsafeString(ident)]; !ok {
return
}
// func (m *Randomizer) RawRelease(ident []byte) (release []byte, ok bool, e error) {
// var rawrelease []byte
// if rawrelease, ok = m.rawreleases[futils.UnsafeString(ident)]; !ok {
// return
// }

// decompress chunk response from redis
if release, e = m.decompressPayload(rawrelease); e != nil {
m.log.Warn().Msg("an error occurred while decompress redis response - " + e.Error())
return
}
// // decompress chunk response from redis
// if release, e = m.decompressPayload(rawrelease); e != nil {
// m.log.Warn().Msg("an error occurred while decompress redis response - " + e.Error())
// return
// }

return
}
// return
// }

//
//
Expand Down Expand Up @@ -277,15 +267,6 @@ func (m *Randomizer) lookupReleases(releases map[string]*Release) (chunks, faile
return chunks, chunks - len(errs), banned, nil
}

func (m *Randomizer) rotateReleases(releases []string) {
m.mu.Lock()
defer m.mu.Unlock()

m.log.Debug().Msgf("update current %d releases with slice of %d releases",
len(m.releasesold), len(releases))
m.releasesold = releases
}

func (m *Randomizer) chunkFetchFromRedis(key string) (chunk []byte, e error) {
var compressed string

Expand Down
45 changes: 32 additions & 13 deletions internal/anilibria/releases.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package anilibria

import (
"errors"
"fmt"
"math/rand"
"strconv"
"sync"
Expand All @@ -26,9 +27,9 @@ func WithFetchTries(tries int) ROption {
type Releases struct {
opts *options

mu sync.RWMutex
idxid map[string]*Release
idxcode map[string]*Release
mu sync.RWMutex
releases map[string]*Release
idxcode []string

// commitedmu sync.RWMutex
// commitedid map[string]*Release
Expand All @@ -37,8 +38,7 @@ type Releases struct {

func NewReleases(opts ...ROption) *Releases {
r := &Releases{
idxid: make(map[string]*Release),
idxcode: make(map[string]*Release),
releases: make(map[string]*Release),

opts: &options{
fetchtries: 10,
Expand All @@ -53,15 +53,30 @@ func NewReleases(opts ...ROption) *Releases {
}

func (m *Releases) Commit(releases map[string]*Release) {
length, _ := actionWithRLock[int](&m.mu, func() (lenth int, _ bool) {
lenth = len(m.releases)
return lenth, lenth != 0
})

actionWithLock(&m.mu, func() {
m.idxcode = releases
m.idxid = make(map[string]*Release)
m.releases = releases
m.idxcode = make([]string, 0, len(m.releases))

for _, release := range m.idxcode {
m.idxid[strconv.Itoa(int(release.Id))] = release
for _, release := range m.releases {
if release == nil {
fmt.Println("BUG: we caught en empty release in commit stage!")
continue
}

// build index ID:RELEASE for faster searching
m.releases[strconv.Itoa(int(release.Id))] = release

// store code in slice for further Random() requests
m.idxcode = append(m.idxcode, release.Code)
}
})

fmt.Printf("COMMITING: old map %d len, new map %d len\n", length, m.Len())
}

func (m *Releases) Len() (length int) {
Expand All @@ -79,13 +94,17 @@ func (m *Releases) RandomRelease(region string) (release *Release, e error) {

for try := 1; try <= m.opts.fetchtries; try++ {
release, ok = actionWithRLock(&m.mu, func() (*Release, bool) {
max := len(m.idxid)
var max int
if max = len(m.idxcode); max == 0 {
return nil, false
}

// skipcq: GSC-G404 math/rand is enoght here
return m.idxid[strconv.Itoa(rand.Intn(max))], max != 0
id := m.idxcode[rand.Intn(max)]
return m.releases[id], true
})

if !ok {
if !ok || release == nil {
return nil, errors.New("randomizer has not ready yet or unexpected error occurred")
}

Expand All @@ -105,7 +124,7 @@ func (m *Releases) RandomRelease(region string) (release *Release, e error) {

func (m *Releases) IsExists(code string) (ok bool) {
_, ok = actionWithRLock(&m.mu, func() (_ *Release, ok bool) {
_, ok = m.idxcode[code]
_, ok = m.releases[code]
return
})

Expand Down
2 changes: 1 addition & 1 deletion internal/proxy/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (m *Proxy) HandleRandomRelease(c *fiber.Ctx) (e error) {
var release string
if release, e = m.randomizer.Randomize(country); e != nil {
return fiber.NewError(fiber.StatusServiceUnavailable,
"an error occurred in randomizer, ", e.Error())
"an error occurred in randomizer, "+e.Error())
} else if release == "" {
return fiber.NewError(fiber.StatusServiceUnavailable,
"an error occurred in randomizer, maybe it's not ready yet")
Expand Down
41 changes: 20 additions & 21 deletions internal/proxy/middlewares.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package proxy

import (
"bytes"
"fmt"

"github.com/anilibria/alice/internal/utils"
"github.com/gofiber/fiber/v2"
Expand Down Expand Up @@ -80,26 +79,26 @@ func (m *Proxy) middlewareReleaseRequest(c *fiber.Ctx, v *Validator) (e error) {
return c.Next() // bypass randomizer module
}

var ok bool
var ident []byte
if ident, ok = v.Arg([]byte("code")); !ok {
if ident, ok = v.Arg([]byte("id")); !ok {
return c.Next() // bypass to origin
}
}

var release []byte
if release, ok, e = m.randomizer.RawRelease(ident); e != nil {
return fiber.NewError(fiber.StatusInternalServerError, e.Error())
} else if !ok {
return c.Next() // bypass to origin
}

if e = utils.RespondWithRawJSON(release, c); e != nil {
return fiber.NewError(fiber.StatusInternalServerError, e.Error())
}

fmt.Println("returned cached value")
// var ok bool
// var ident []byte
// if ident, ok = v.Arg([]byte("code")); !ok {
// if ident, ok = v.Arg([]byte("id")); !ok {
// return c.Next() // bypass to origin
// }
// }

// var release []byte
// if release, ok, e = m.randomizer.RawRelease(ident); e != nil {
// return fiber.NewError(fiber.StatusInternalServerError, e.Error())
// } else if !ok {
// return c.Next() // bypass to origin
// }

// if e = utils.RespondWithRawJSON(release, c); e != nil {
// return fiber.NewError(fiber.StatusInternalServerError, e.Error())
// }

// fmt.Println("returned cached value")

c.Response().Header.Set("X-Alice-Cache", "HIT")
return respondPlainWithStatus(c, fiber.StatusOK)
Expand Down

0 comments on commit 923695f

Please sign in to comment.