diff --git a/app.env b/app.env index 7c5a699..b1b93b5 100644 --- a/app.env +++ b/app.env @@ -5,4 +5,5 @@ HTTP_SERVER_ADDRESS=0.0.0.0:8080 GRPC_SERVER_ADDRESS=0.0.0.0:9090 TOKEN_SYMMETRIC_KEY=12345678901234567890123456789012 ACCESS_TOKEN_DURATION=15m -REFRESH_TOKEN_DURATION=36h \ No newline at end of file +REFRESH_TOKEN_DURATION=36h +REDIS_ADDRESS=0.0.0.0:6379 \ No newline at end of file diff --git a/gapi/rpc_create_user.go b/gapi/rpc_create_user.go index f45e711..e8ed395 100644 --- a/gapi/rpc_create_user.go +++ b/gapi/rpc_create_user.go @@ -2,11 +2,14 @@ package gapi import ( "context" + "time" db "github.com/arya2004/xyfin/db/sqlc" "github.com/arya2004/xyfin/pb" "github.com/arya2004/xyfin/utils" "github.com/arya2004/xyfin/validators" + "github.com/arya2004/xyfin/worker" + "github.com/hibiken/asynq" "github.com/lib/pq" "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc/codes" @@ -46,6 +49,23 @@ func (server *Server) CreateUser(ctx context.Context, req *pb.CreateUserRequest) } + taskPayload := &worker.PayloadSendVerifyEmail{ + Username: user.Username, + } + + opts := []asynq.Option{ + asynq.MaxRetry(10), + asynq.ProcessIn(10 * time.Second), + asynq.Queue(worker.QueueCritical), + } + + err = server.taskDistributor.SendVerifyEmail(ctx, taskPayload, opts...) + if err != nil { + return nil, status.Errorf(codes.Internal, "failled to distribute task: %v", err) + + } + + resp := &pb.CreateUserResponse{ User: ConvertUser(user), } diff --git a/gapi/server.go b/gapi/server.go index 71c93bd..6accdc0 100644 --- a/gapi/server.go +++ b/gapi/server.go @@ -7,6 +7,7 @@ import ( "github.com/arya2004/xyfin/pb" "github.com/arya2004/xyfin/token" "github.com/arya2004/xyfin/utils" + "github.com/arya2004/xyfin/worker" ) @@ -15,10 +16,11 @@ type Server struct { config utils.Configuration store db.Store tokenMaker token.Maker + taskDistributor worker.TaskDistributor } -func NewServer(config utils.Configuration, store db.Store) (*Server, error) { +func NewServer(config utils.Configuration, store db.Store, taskDistributor worker.TaskDistributor) (*Server, error) { tokenMaker, err := token.NewPasetoMaker(config.TokenSymmetricKey) if err != nil { return nil, fmt.Errorf("cannot create token maker: %w", err) @@ -27,6 +29,7 @@ func NewServer(config utils.Configuration, store db.Store) (*Server, error) { config: config, store: store, tokenMaker: tokenMaker, + taskDistributor: taskDistributor, } diff --git a/main.go b/main.go index bf81dbb..a8cd48b 100644 --- a/main.go +++ b/main.go @@ -12,10 +12,12 @@ import ( "github.com/arya2004/xyfin/gapi" "github.com/arya2004/xyfin/pb" "github.com/arya2004/xyfin/utils" + "github.com/arya2004/xyfin/worker" "github.com/golang-migrate/migrate/v4" _ "github.com/golang-migrate/migrate/v4/database/postgres" _ "github.com/golang-migrate/migrate/v4/source/file" "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" + "github.com/hibiken/asynq" _ "github.com/lib/pq" "google.golang.org/grpc" "google.golang.org/grpc/reflection" @@ -48,8 +50,16 @@ func main() { runMigrations(config.MigrationURL,config.DbSource) store := db.NewStore(conn) - go runGatewayServer(config, store) - runGrpcServer(config, store) + + redisOpt := asynq.RedisClientOpt{ + Addr: config.RedisAddress, + } + + taskDistributor := worker.NewRedisTaskDistributor(redisOpt) + + go runTaskProcessor(redisOpt, store) + go runGatewayServer(config, store,taskDistributor) + runGrpcServer(config, store, taskDistributor) } @@ -65,9 +75,19 @@ func runMigrations(migrationUrl string,dbSource string ) { log.Println("migration completed") } -func runGrpcServer(config utils.Configuration, store db.Store) { + +func runTaskProcessor(redisopt asynq.RedisClientOpt, store db.Store) { + processor := worker.NewRedisTaskProcessor(redisopt, store) + err := processor.Start() + if err != nil { + log.Fatal("cannot run task processor", err) + } +} + + +func runGrpcServer(config utils.Configuration, store db.Store, taskDist worker.TaskDistributor) { - server, err := gapi.NewServer(config, store) + server, err := gapi.NewServer(config, store,taskDist) if err != nil { log.Fatal("cannot create server", err) } @@ -91,9 +111,9 @@ func runGrpcServer(config utils.Configuration, store db.Store) { } } -func runGatewayServer(config utils.Configuration, store db.Store) { +func runGatewayServer(config utils.Configuration, store db.Store, taskDist worker.TaskDistributor) { - server, err := gapi.NewServer(config, store) + server, err := gapi.NewServer(config, store, taskDist) if err != nil { log.Fatal("cannot create server", err) } diff --git a/utils/config.go b/utils/config.go index adf80d7..590d54a 100644 --- a/utils/config.go +++ b/utils/config.go @@ -11,6 +11,7 @@ type Configuration struct { DbDriver string `mapstructure:"DB_DRIVER"` DbSource string `mapstructure:"DB_SOURCE"` MigrationURL string `mapstructure:"MIGRATION_URL"` + RedisAddress string `mapstructure:"REDIS_ADDRESS"` HTTPSServerAddress string `mapstructure:"HTTP_SERVER_ADDRESS"` GRPCServerAddress string `mapstructure:"GRPC_SERVER_ADDRESS"` TokenSymmetricKey string `mapstructure:"TOKEN_SYMMETRIC_KEY"` diff --git a/worker/processor.go b/worker/processor.go index ce342c6..0d8ebf6 100644 --- a/worker/processor.go +++ b/worker/processor.go @@ -7,6 +7,11 @@ import ( "github.com/hibiken/asynq" ) +const ( + QueueCritical = "critical" + DefaultQueue = "default" +) + type TaskProcessor interface { Start() error ProcessVerifyEmail(ctx context.Context, task *asynq.Task) error @@ -20,7 +25,12 @@ type RedisTaskProcessor struct { func NewRedisTaskProcessor(redisOpt asynq.RedisClientOpt, store db.Store) TaskProcessor{ server := asynq.NewServer( redisOpt, - asynq.Config{}, + asynq.Config{ + Queues: map[string]int{ + QueueCritical: 10, + DefaultQueue: 5, + }, + }, ) return &RedisTaskProcessor{server: server, store: store} }