Skip to content

Commit

Permalink
feat: piped redis broker and results
Browse files Browse the repository at this point in the history
kalbhor committed Mar 14, 2024
1 parent 5ba7a6b commit fe77e86
Showing 3 changed files with 387 additions and 9 deletions.
18 changes: 9 additions & 9 deletions brokers/redis/broker.go
Original file line number Diff line number Diff line change
@@ -29,18 +29,19 @@ type Options struct {
}

type Broker struct {
log *slog.Logger
conn redis.UniversalClient
pollPeriod time.Duration
log *slog.Logger
opts Options

conn redis.UniversalClient
}

func New(o Options, lo *slog.Logger) *Broker {
pollPeriod := o.PollPeriod
if o.PollPeriod == 0 {
pollPeriod = DefaultPollPeriod
o.PollPeriod = DefaultPollPeriod
}
return &Broker{
log: lo,
opts: o,
log: lo,
conn: redis.NewUniversalClient(&redis.UniversalOptions{
Addrs: o.Addrs,
DB: o.DB,
@@ -51,7 +52,6 @@ func New(o Options, lo *slog.Logger) *Broker {
MinIdleConns: o.MinIdleConns,
IdleTimeout: o.IdleTimeout,
}),
pollPeriod: pollPeriod,
}
}

@@ -87,7 +87,7 @@ func (b *Broker) Consume(ctx context.Context, work chan []byte, queue string) {
return
default:
b.log.Debug("receiving from consumer..")
res, err := b.conn.BLPop(ctx, b.pollPeriod, queue).Result()
res, err := b.conn.BLPop(ctx, b.opts.PollPeriod, queue).Result()
if err != nil && err.Error() != "redis: nil" {
b.log.Error("error consuming from redis queue", "error", err)
} else if errors.Is(err, redis.Nil) {
@@ -105,7 +105,7 @@ func (b *Broker) Consume(ctx context.Context, work chan []byte, queue string) {
}

func (b *Broker) consumeScheduled(ctx context.Context, queue string) {
poll := time.NewTicker(b.pollPeriod)
poll := time.NewTicker(b.opts.PollPeriod)

for {
select {
176 changes: 176 additions & 0 deletions brokers/redis/broker_piped.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package redis

import (
"context"
"errors"
"fmt"
"log/slog"
"strconv"
"time"

"github.com/go-redis/redis/v8"
)

const (
DefaultPipePeriod = 200 * time.Millisecond
)

type PipeBroker struct {
log *slog.Logger
opts PipedOptions

conn redis.UniversalClient
pipe redis.Pipeliner
}

type PipedOptions struct {
Addrs []string
Password string
DB int
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
IdleTimeout time.Duration
MinIdleConns int

PollPeriod time.Duration
PipePeriod time.Duration
}

func NewPiped(o PipedOptions, lo *slog.Logger) *PipeBroker {
if o.PollPeriod == 0 {
o.PollPeriod = DefaultPollPeriod
}
if o.PipePeriod == 0 {
o.PipePeriod = DefaultPipePeriod
}

conn := redis.NewUniversalClient(&redis.UniversalOptions{
Addrs: o.Addrs,
DB: o.DB,
Password: o.Password,
DialTimeout: o.DialTimeout,
ReadTimeout: o.ReadTimeout,
WriteTimeout: o.WriteTimeout,
MinIdleConns: o.MinIdleConns,
IdleTimeout: o.IdleTimeout,
})

p := &PipeBroker{
log: lo,
conn: conn,
pipe: conn.Pipeline(),
opts: o,
}

go p.pushPipe(context.TODO())

return p
}

func (r *PipeBroker) pushPipe(ctx context.Context) {
tk := time.NewTicker(r.opts.PipePeriod)
for {
select {
case <-ctx.Done():
return
case <-tk.C:
r.log.Debug("submitting redis pipe")
if r.pipe.Len() == 0 {
continue
}
if _, err := r.pipe.Exec(ctx); err != nil {
r.log.Error("error executing redis pipe: %v", err)
}
}
}
}

func (r *PipeBroker) GetPending(ctx context.Context, queue string) ([]string, error) {
rs, err := r.conn.LRange(ctx, queue, 0, -1).Result()
if err == redis.Nil {
return []string{}, nil
} else if err != nil {
return []string{}, err
}

return rs, nil
}

func (b *PipeBroker) Enqueue(ctx context.Context, msg []byte, queue string) error {
return b.pipe.LPush(ctx, queue, msg).Err()
}

func (b *PipeBroker) EnqueueScheduled(ctx context.Context, msg []byte, queue string, ts time.Time) error {
return b.pipe.ZAdd(ctx, fmt.Sprintf(sortedSetKey, queue), &redis.Z{
Score: float64(ts.UnixNano()),
Member: msg,
}).Err()
}

func (b *PipeBroker) Consume(ctx context.Context, work chan []byte, queue string) {
go b.consumeScheduled(ctx, queue)

for {
select {
case <-ctx.Done():
b.log.Debug("shutting down consumer..")
return
default:
b.log.Debug("receiving from consumer..")
res, err := b.conn.BLPop(ctx, b.opts.PollPeriod, queue).Result()
if err != nil && err.Error() != "redis: nil" {
b.log.Error("error consuming from redis queue", "error", err)
} else if errors.Is(err, redis.Nil) {
b.log.Debug("no tasks to consume..", "queue", queue)
} else {
msg, err := blpopResult(res)
if err != nil {
b.log.Error("error parsing response from redis", "error", err)
return
}
work <- []byte(msg)
}
}
}
}

func (b *PipeBroker) consumeScheduled(ctx context.Context, queue string) {
poll := time.NewTicker(b.opts.PollPeriod)

for {
select {
case <-ctx.Done():
b.log.Debug("shutting down scheduled consumer..")
return
case <-poll.C:
b.conn.Watch(ctx, func(tx *redis.Tx) error {
// Fetch the tasks with score less than current time. These tasks have been scheduled
// to be queued.
tasks, err := tx.ZRevRangeByScore(ctx, fmt.Sprintf(sortedSetKey, queue), &redis.ZRangeBy{
Min: "0",
Max: strconv.FormatInt(time.Now().UnixNano(), 10),
Offset: 0,
Count: 1,
}).Result()
if err != nil {
return err
}

for _, task := range tasks {
if err := b.Enqueue(ctx, []byte(task), queue); err != nil {
return err
}
}

// Remove the tasks
if err := tx.ZRem(ctx, fmt.Sprintf(sortedSetKey, queue), tasks).Err(); err != nil {
return err
}

return nil
})
}

}
}
202 changes: 202 additions & 0 deletions results/redis/results_piped.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package redis

import (
"context"
"log/slog"
"strconv"
"time"

"github.com/go-redis/redis/v8"
)

const DefaultPipePeriod = 200 * time.Millisecond

type PipedResults struct {
lo *slog.Logger
opt PipedOptions

conn redis.UniversalClient
pipe redis.Pipeliner
}

type PipedOptions struct {
Addrs []string
Password string
DB int
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
IdleTimeout time.Duration
Expiry time.Duration
MetaExpiry time.Duration
MinIdleConns int

PipePeriod time.Duration
}

func NewPiped(o PipedOptions, lo *slog.Logger) *PipedResults {
if o.PipePeriod == 0 {
o.PipePeriod = DefaultPipePeriod
}

conn := redis.NewUniversalClient(&redis.UniversalOptions{
Addrs: o.Addrs,
DB: o.DB,
Password: o.Password,
DialTimeout: o.DialTimeout,
ReadTimeout: o.ReadTimeout,
WriteTimeout: o.WriteTimeout,
MinIdleConns: o.MinIdleConns,
IdleTimeout: o.IdleTimeout,
})

p := &PipedResults{
lo: lo,
conn: conn,
pipe: conn.Pipeline(),
opt: o,
}

// TODO: pass ctx here somehow
if o.MetaExpiry != 0 {
go p.expireMeta(context.TODO(), o.MetaExpiry)
}

go p.execPipe(context.TODO())

return p
}

func (r *PipedResults) execPipe(ctx context.Context) {
tk := time.NewTicker(r.opt.PipePeriod)
for {
select {
case <-ctx.Done():
r.lo.Info("context closed, draining redis pipe", "length", r.pipe.Len())
if _, err := r.pipe.Exec(ctx); err != nil {
r.lo.Error("error executing redis pipe: %v", err)
}
return
case <-tk.C:
plen := r.pipe.Len()
if plen == 0 {
continue
}

r.lo.Info("submitting redis pipe", "length", plen)
if _, err := r.pipe.Exec(ctx); err != nil {
r.lo.Error("error executing redis pipe: %v", err)
}
}
}
}

func (r *PipedResults) DeleteJob(ctx context.Context, id string) error {
r.lo.Debug("deleting job")
if err := r.conn.ZRem(ctx, resultPrefix+success, 1, id).Err(); err != nil {
return err
}

if err := r.conn.ZRem(ctx, resultPrefix+failed, 1, id).Err(); err != nil {
return err
}

if err := r.conn.Del(ctx, resultPrefix+id).Err(); err != nil {
return err
}

return nil
}

func (r *PipedResults) GetSuccess(ctx context.Context) ([]string, error) {
// Fetch the failed tasks with score less than current time
r.lo.Debug("getting successful jobs")
rs, err := r.conn.ZRevRangeByScore(ctx, resultPrefix+success, &redis.ZRangeBy{
Min: "0",
Max: strconv.FormatInt(time.Now().UnixNano(), 10),
}).Result()
if err != nil {
return nil, err
}

return rs, nil
}

func (r *PipedResults) GetFailed(ctx context.Context) ([]string, error) {
// Fetch the failed tasks with score less than current time
r.lo.Debug("getting failed jobs")
rs, err := r.conn.ZRevRangeByScore(ctx, resultPrefix+failed, &redis.ZRangeBy{
Min: "0",
Max: strconv.FormatInt(time.Now().UnixNano(), 10),
}).Result()
if err != nil {
return nil, err
}

return rs, nil
}

func (r *PipedResults) SetSuccess(ctx context.Context, id string) error {
r.lo.Debug("setting job as successful", "id", id)
return r.pipe.ZAdd(ctx, resultPrefix+success, &redis.Z{
Score: float64(time.Now().UnixNano()),
Member: id,
}).Err()
}

func (r *PipedResults) SetFailed(ctx context.Context, id string) error {
r.lo.Debug("setting job as failed", "id", id)
return r.pipe.ZAdd(ctx, resultPrefix+failed, &redis.Z{
Score: float64(time.Now().UnixNano()),
Member: id,
}).Err()
}

func (r *PipedResults) Set(ctx context.Context, id string, b []byte) error {
r.lo.Debug("setting result for job", "id", id)
return r.pipe.Set(ctx, resultPrefix+id, b, r.opt.Expiry).Err()
}

func (r *PipedResults) Get(ctx context.Context, id string) ([]byte, error) {
r.lo.Debug("getting result for job", "id", id)
rs, err := r.conn.Get(ctx, resultPrefix+id).Bytes()
if err != nil {
return nil, err
}

return rs, nil
}

// TODO: accpet a ctx here and shutdown gracefully
func (r *PipedResults) expireMeta(ctx context.Context, ttl time.Duration) {
r.lo.Info("starting results meta purger", "ttl", ttl)

var (
tk = time.NewTicker(ttl)
)

for {
select {
case <-ctx.Done():
r.lo.Info("shutting down meta purger", "ttl", ttl)
return
case <-tk.C:
now := time.Now().UnixNano() - int64(ttl)
score := strconv.FormatInt(now, 10)

r.lo.Debug("purging failed results metadata", "score", score)
if err := r.pipe.ZRemRangeByScore(context.Background(), resultPrefix+failed, "0", score).Err(); err != nil {
r.lo.Error("could not expire success/failed metadata", "err", err)
}

r.lo.Debug("purging success results metadata", "score", score)
if err := r.pipe.ZRemRangeByScore(context.Background(), resultPrefix+success, "0", score).Err(); err != nil {
r.lo.Error("could not expire success/failed metadata", "err", err)
}
}
}
}

func (r *PipedResults) NilError() error {
return redis.Nil
}

0 comments on commit fe77e86

Please sign in to comment.