Skip to content

Commit

Permalink
feat: Add RedisTaskDistributor and RedisTaskProcessor for task distri…
Browse files Browse the repository at this point in the history
…bution and processing
  • Loading branch information
arya2004 committed Aug 13, 2024
1 parent 40e047e commit 177d0eb
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 9 deletions.
3 changes: 2 additions & 1 deletion app.env
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ HTTP_SERVER_ADDRESS=0.0.0.0:8080
GRPC_SERVER_ADDRESS=0.0.0.0:9090
TOKEN_SYMMETRIC_KEY=12345678901234567890123456789012
ACCESS_TOKEN_DURATION=15m
REFRESH_TOKEN_DURATION=36h
REFRESH_TOKEN_DURATION=36h
REDIS_ADDRESS=0.0.0.0:6379
20 changes: 20 additions & 0 deletions gapi/rpc_create_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package gapi

import (
"context"
"time"

db "github.com/arya2004/xyfin/db/sqlc"
"github.com/arya2004/xyfin/pb"
"github.com/arya2004/xyfin/utils"
"github.com/arya2004/xyfin/validators"
"github.com/arya2004/xyfin/worker"
"github.com/hibiken/asynq"
"github.com/lib/pq"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -46,6 +49,23 @@ func (server *Server) CreateUser(ctx context.Context, req *pb.CreateUserRequest)

}

taskPayload := &worker.PayloadSendVerifyEmail{
Username: user.Username,
}

opts := []asynq.Option{
asynq.MaxRetry(10),
asynq.ProcessIn(10 * time.Second),
asynq.Queue(worker.QueueCritical),
}

err = server.taskDistributor.SendVerifyEmail(ctx, taskPayload, opts...)
if err != nil {
return nil, status.Errorf(codes.Internal, "failled to distribute task: %v", err)

}


resp := &pb.CreateUserResponse{
User: ConvertUser(user),
}
Expand Down
5 changes: 4 additions & 1 deletion gapi/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/arya2004/xyfin/pb"
"github.com/arya2004/xyfin/token"
"github.com/arya2004/xyfin/utils"
"github.com/arya2004/xyfin/worker"
)


Expand All @@ -15,10 +16,11 @@ type Server struct {
config utils.Configuration
store db.Store
tokenMaker token.Maker
taskDistributor worker.TaskDistributor

}

func NewServer(config utils.Configuration, store db.Store) (*Server, error) {
func NewServer(config utils.Configuration, store db.Store, taskDistributor worker.TaskDistributor) (*Server, error) {
tokenMaker, err := token.NewPasetoMaker(config.TokenSymmetricKey)
if err != nil {
return nil, fmt.Errorf("cannot create token maker: %w", err)
Expand All @@ -27,6 +29,7 @@ func NewServer(config utils.Configuration, store db.Store) (*Server, error) {
config: config,
store: store,
tokenMaker: tokenMaker,
taskDistributor: taskDistributor,
}


Expand Down
32 changes: 26 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import (
"github.com/arya2004/xyfin/gapi"
"github.com/arya2004/xyfin/pb"
"github.com/arya2004/xyfin/utils"
"github.com/arya2004/xyfin/worker"
"github.com/golang-migrate/migrate/v4"
_ "github.com/golang-migrate/migrate/v4/database/postgres"
_ "github.com/golang-migrate/migrate/v4/source/file"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/hibiken/asynq"
_ "github.com/lib/pq"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
Expand Down Expand Up @@ -48,8 +50,16 @@ func main() {
runMigrations(config.MigrationURL,config.DbSource)

store := db.NewStore(conn)
go runGatewayServer(config, store)
runGrpcServer(config, store)

redisOpt := asynq.RedisClientOpt{
Addr: config.RedisAddress,
}

taskDistributor := worker.NewRedisTaskDistributor(redisOpt)

go runTaskProcessor(redisOpt, store)
go runGatewayServer(config, store,taskDistributor)
runGrpcServer(config, store, taskDistributor)

}

Expand All @@ -65,9 +75,19 @@ func runMigrations(migrationUrl string,dbSource string ) {
log.Println("migration completed")
}

func runGrpcServer(config utils.Configuration, store db.Store) {

func runTaskProcessor(redisopt asynq.RedisClientOpt, store db.Store) {
processor := worker.NewRedisTaskProcessor(redisopt, store)
err := processor.Start()
if err != nil {
log.Fatal("cannot run task processor", err)
}
}


func runGrpcServer(config utils.Configuration, store db.Store, taskDist worker.TaskDistributor) {

server, err := gapi.NewServer(config, store)
server, err := gapi.NewServer(config, store,taskDist)
if err != nil {
log.Fatal("cannot create server", err)
}
Expand All @@ -91,9 +111,9 @@ func runGrpcServer(config utils.Configuration, store db.Store) {
}
}

func runGatewayServer(config utils.Configuration, store db.Store) {
func runGatewayServer(config utils.Configuration, store db.Store, taskDist worker.TaskDistributor) {

server, err := gapi.NewServer(config, store)
server, err := gapi.NewServer(config, store, taskDist)
if err != nil {
log.Fatal("cannot create server", err)
}
Expand Down
1 change: 1 addition & 0 deletions utils/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type Configuration struct {
DbDriver string `mapstructure:"DB_DRIVER"`
DbSource string `mapstructure:"DB_SOURCE"`
MigrationURL string `mapstructure:"MIGRATION_URL"`
RedisAddress string `mapstructure:"REDIS_ADDRESS"`
HTTPSServerAddress string `mapstructure:"HTTP_SERVER_ADDRESS"`
GRPCServerAddress string `mapstructure:"GRPC_SERVER_ADDRESS"`
TokenSymmetricKey string `mapstructure:"TOKEN_SYMMETRIC_KEY"`
Expand Down
12 changes: 11 additions & 1 deletion worker/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import (
"github.com/hibiken/asynq"
)

const (
QueueCritical = "critical"
DefaultQueue = "default"
)

type TaskProcessor interface {
Start() error
ProcessVerifyEmail(ctx context.Context, task *asynq.Task) error
Expand All @@ -20,7 +25,12 @@ type RedisTaskProcessor struct {
func NewRedisTaskProcessor(redisOpt asynq.RedisClientOpt, store db.Store) TaskProcessor{
server := asynq.NewServer(
redisOpt,
asynq.Config{},
asynq.Config{
Queues: map[string]int{
QueueCritical: 10,
DefaultQueue: 5,
},
},
)
return &RedisTaskProcessor{server: server, store: store}
}
Expand Down

0 comments on commit 177d0eb

Please sign in to comment.