diff --git a/cmd/alice/flags.go b/cmd/alice/flags.go index ac4c190..b4dab5e 100644 --- a/cmd/alice/flags.go +++ b/cmd/alice/flags.go @@ -319,6 +319,11 @@ func flagsInitialization(expertMode bool) []cli.Flag { Category: "Release randomizer", Value: 0, }, + &cli.BoolFlag{ + Name: "randomizer-redis-zstd-enable", + Category: "Release randomizer", + Usage: "enable redis payload decompression with zstd algo", + }, &cli.StringFlag{ Name: "randomizer-releaseskey", Category: "Release randomizer", diff --git a/internal/anilibria/randomizer.go b/internal/anilibria/randomizer.go index 331e74a..fc52759 100644 --- a/internal/anilibria/randomizer.go +++ b/internal/anilibria/randomizer.go @@ -12,6 +12,7 @@ import ( "github.com/anilibria/alice/internal/utils" futils "github.com/gofiber/fiber/v2/utils" + "github.com/klauspost/compress/zstd" "github.com/redis/go-redis/v9" "github.com/rs/zerolog" "github.com/urfave/cli/v2" @@ -30,6 +31,8 @@ type Randomizer struct { relUpdFreqErr time.Duration relUpdFreqBoot time.Duration + decoder *zstd.Decoder + mu sync.RWMutex releases []string } @@ -37,6 +40,11 @@ type Randomizer struct { func New(c context.Context) *Randomizer { cli := c.Value(utils.CKCliCtx).(*cli.Context) + var dec *zstd.Decoder + if cli.Bool("randomizer-redis-zstd-enable") { + dec, _ = zstd.NewReader(nil, zstd.WithDecoderConcurrency(0)) + } + r := &Randomizer{ done: c.Done, log: c.Value(utils.CKLogger).(*zerolog.Logger), @@ -60,6 +68,8 @@ func New(c context.Context) *Randomizer { relUpdFreqErr: cli.Duration("randomizer-update-frequency-onerror"), relUpdFreqBoot: cli.Duration("randomizer-update-frequency-bootstrap"), + decoder: dec, + releases: make([]string, 0), releasesKey: cli.String("randomizer-releaseskey"), } @@ -125,10 +135,16 @@ func (m *Randomizer) peekReleaseKeyChunks() (_ int, e error) { return } - return strconv.Atoi(res) + var dres []byte + if dres, e = m.decompressPayload(futils.UnsafeBytes(res)); e != nil { + m.log.Warn().Msg("an error occurred while decompress response redis response - " + e.Error()) + return + } + + return strconv.Atoi(futils.UnsafeString(dres)) } -func (m *Randomizer) lookupReleases() (_ []string, e error) { +func (m *Randomizer) lookupReleases() (_ []string, e error) { // skipcq: GO-R1005 needed to be kept as it is var chunks int if chunks, e = m.peekReleaseKeyChunks(); e != nil { return @@ -156,6 +172,7 @@ func (m *Randomizer) lookupReleases() (_ []string, e error) { m.log.Trace().Msgf("parsing chunk %d/%d...", i, chunks) } + // get compressed chunk response from redis if res, e = m.rclient.Get(m.rctx, m.releasesKey+strconv.Itoa(i)).Result(); e == redis.Nil { e = fmt.Errorf("given chunk number %d is not exists", i) m.log.Warn().Msg(e.Error()) @@ -167,13 +184,23 @@ func (m *Randomizer) lookupReleases() (_ []string, e error) { continue } + // decompress chunk response from redis + var dres []byte + if dres, e = m.decompressPayload(futils.UnsafeBytes(res)); e != nil { + m.log.Warn().Msg("an error occurred while decompress redis response - " + e.Error()) + errs = append(errs, e.Error()) + continue + } + + // get json formated response from decompressed response var releasesChunk Releases - if e = json.Unmarshal(futils.UnsafeBytes(res), &releasesChunk); e != nil { + if e = json.Unmarshal(dres, &releasesChunk); e != nil { m.log.Warn().Msg("an error occurred while unmarshal release chunk - " + e.Error()) errs = append(errs, e.Error()) continue } + // parse json chunk response for _, release := range releasesChunk { if release.BlockedInfo != nil && release.BlockedInfo.IsBlockedByCopyrights { m.log.Debug().Msgf("release %d (%s) worldwide banned, skip it...", release.Id, release.Code) @@ -229,3 +256,11 @@ func (m *Randomizer) randomRelease() (_ string) { r := rand.Intn(len(m.releases)) // skipcq: GSC-G404 math/rand is enoght here return m.releases[r] } + +func (m *Randomizer) decompressPayload(payload []byte) ([]byte, error) { + if m.decoder == nil { + return payload, nil + } + + return m.decoder.DecodeAll(payload, nil) +}