diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index dcac09d37e..8fbbaa9a9e 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -13,6 +13,7 @@ jobs: go-version: [1.16.x, 1.17.x] os: [ubuntu-latest, macos-latest] mongodb-version: ["4.0", "4.2", "4.4"] + redis-version: ["6.2.6"] runs-on: ubuntu-latest steps: @@ -21,6 +22,12 @@ jobs: with: mongodb-version: ${{ matrix.mongodb-version }} + - name: Start Redis v${{ matrix.redis-version }} + uses: supercharge/redis-github-action@1.4.0 + with: + redis-version: ${{ matrix.redis-version }} + redis-port: 6379 + - name: Get the version id: get_version run: echo ::set-output name=tag::$(echo ${GITHUB_SHA:8}) @@ -49,8 +56,9 @@ jobs: - name: Go vet run: go vet ./... - - name: Run Mongo integration tests + - name: Run integration tests run: go test -tags integration -v ./... env: TEST_BOLT_DSN: "../../test.db" TEST_MONGO_DSN: "mongodb://localhost:27017/testdb" + TEST_REDIS_DSN: "redis://localhost:6379" diff --git a/cmd/main.go b/cmd/main.go index 1109bc935b..f27cfe5c94 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -8,15 +8,16 @@ import ( "time" _ "time/tzdata" - convoyRedis "github.com/frain-dev/convoy/queue/redis" + memqueue "github.com/frain-dev/convoy/queue/memqueue" + redisqueue "github.com/frain-dev/convoy/queue/redis" "github.com/frain-dev/convoy/worker/task" "github.com/getsentry/sentry-go" + "github.com/go-redis/redis/v8" "github.com/google/uuid" prefixed "github.com/x-cray/logrus-prefixed-formatter" "go.mongodb.org/mongo-driver/bson/primitive" "github.com/frain-dev/convoy/util" - "github.com/go-redis/redis/v8" log "github.com/sirupsen/logrus" "github.com/vmihailenco/taskq/v3" @@ -106,12 +107,31 @@ func main() { var qFn taskq.Factory var rC *redis.Client + var lS queue.Storage + var opts queue.QueueOptions if cfg.Queue.Type == config.RedisQueueProvider { - rC, qFn, err = convoyRedis.NewClient(cfg) + rC, qFn, err = redisqueue.NewClient(cfg) if err != nil { return err } + opts = queue.QueueOptions{ + Type: "redis", + Redis: rC, + Factory: qFn, + } + } + + if cfg.Queue.Type == config.InMemoryQueueProvider { + lS, qFn, err = memqueue.NewClient(cfg) + if err != nil { + return err + } + opts = queue.QueueOptions{ + Type: "in-memory", + Storage: lS, + Factory: qFn, + } } if util.IsStringEmpty(string(cfg.GroupConfig.Signature.Header)) { @@ -125,8 +145,8 @@ func main() { app.applicationRepo = db.AppRepo() app.eventDeliveryRepo = db.EventDeliveryRepo() - app.eventQueue = convoyRedis.NewQueue(rC, qFn, "EventQueue") - app.deadLetterQueue = convoyRedis.NewQueue(rC, qFn, "DeadLetterQueue") + app.eventQueue = NewQueue(opts, "EventQueue") + app.deadLetterQueue = NewQueue(opts, "DeadLetterQueue") err = ensureDefaultGroup(context.Background(), cfg, app) if err != nil { @@ -173,6 +193,23 @@ func main() { } } +func NewQueue(opts queue.QueueOptions, name string) queue.Queuer { + optsType := opts.Type + var convoyQueue queue.Queuer + switch optsType { + case "in-memory": + opts.Name = name + convoyQueue = memqueue.NewQueue(opts) + + case "redis": + opts.Name = name + convoyQueue = redisqueue.NewQueue(opts) + default: + log.Errorf("Invalid queue type: %v", optsType) + } + return convoyQueue +} + func ensureDefaultGroup(ctx context.Context, cfg config.Configuration, a *app) error { var filter *datastore.GroupFilter var groups []*datastore.Group diff --git a/cmd/server.go b/cmd/server.go index 7ff85d3467..025fed8684 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -8,7 +8,6 @@ import ( "github.com/frain-dev/convoy/worker/task" "github.com/frain-dev/convoy/config" - convoyQueue "github.com/frain-dev/convoy/queue/redis" "github.com/frain-dev/convoy/server" "github.com/frain-dev/convoy/util" "github.com/frain-dev/convoy/worker" @@ -55,12 +54,13 @@ func addServerCommand(a *app) *cobra.Command { } // register workers. - if queue, ok := a.eventQueue.(*convoyQueue.RedisQueue); ok { - worker.NewProducer(queue).Start() - } + producer := worker.NewProducer(a.eventQueue) + + cleaner := worker.NewCleaner(a.deadLetterQueue) - if queue, ok := a.deadLetterQueue.(*convoyQueue.RedisQueue); ok { - worker.NewCleaner(queue).Start() + if cfg.Queue.Type != config.InMemoryQueueProvider { + producer.Start() + cleaner.Start() } log.Infof("Started convoy server in %s", time.Since(start)) diff --git a/config/config.go b/config/config.go index f411603f04..80b12021f3 100644 --- a/config/config.go +++ b/config/config.go @@ -92,6 +92,7 @@ const ( const ( RedisQueueProvider QueueProvider = "redis" + InMemoryQueueProvider QueueProvider = "in-memory" DefaultStrategyProvider StrategyProvider = "default" DefaultSignatureHeader SignatureHeaderProvider = "X-Convoy-Signature" ) @@ -270,6 +271,10 @@ func ensureQueueConfig(queueCfg QueueConfiguration) error { if queueCfg.Redis.DSN == "" { return errors.New("redis queue dsn is empty") } + + case InMemoryQueueProvider: + return nil + default: return fmt.Errorf("unsupported queue type: %s", queueCfg.Type) } diff --git a/go.mod b/go.mod index b1a139fe1e..fed657dcb7 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,8 @@ module github.com/frain-dev/convoy go 1.16 require ( - github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect - github.com/dchest/uniuri v0.0.0-20200228104902-7aecb25e1fe5 // indirect + github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d + github.com/dchest/uniuri v0.0.0-20200228104902-7aecb25e1fe5 github.com/felixge/httpsnoop v1.0.2 github.com/fsnotify/fsnotify v1.5.1 // indirect github.com/getkin/kin-openapi v0.78.0 @@ -18,6 +18,7 @@ require ( github.com/golang/mock v1.6.0 github.com/golang/snappy v0.0.4 // indirect github.com/google/uuid v1.3.0 + github.com/hashicorp/golang-lru v0.5.4 github.com/jarcoal/httpmock v1.0.8 github.com/kelseyhightower/envconfig v1.4.0 github.com/mattn/go-colorable v0.1.11 // indirect @@ -35,7 +36,7 @@ require ( github.com/vmihailenco/taskq/v3 v3.2.7 github.com/x-cray/logrus-prefixed-formatter v0.5.2 github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect - go.etcd.io/bbolt v1.3.6 // indirect + go.etcd.io/bbolt v1.3.6 go.mongodb.org/mongo-driver v1.7.1 golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3 golang.org/x/mod v0.5.0 // indirect diff --git a/go.sum b/go.sum index 9d429b8c01..b69032d6b8 100644 --- a/go.sum +++ b/go.sum @@ -474,6 +474,7 @@ github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= +github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/schollz/closestmatch v2.1.0+incompatible/go.mod h1:RtP1ddjLong6gTkbtmuhtR2uUrrJOpYzYRvbcPAid+g= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= @@ -595,8 +596,6 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20191227163750-53104e6ec876/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg= -golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3 h1:0es+/5331RGQPcXlMfP+WrnIIS6dNnNRe0WB02W0F4M= golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -684,8 +683,6 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20210913180222-943fd674d43e h1:+b/22bPvDYt4NPDcy4xAGCmON713ONAWFeY3Z7I3tR8= -golang.org/x/net v0.0.0-20210913180222-943fd674d43e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= diff --git a/mocks/queue.go b/mocks/queue.go index ba9feb9ad0..589f83a6e7 100644 --- a/mocks/queue.go +++ b/mocks/queue.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: queue/queue.go +// Source: ./queue/queue.go -// Package mocks is a generated GoMock package. +// Package mock_queue is a generated GoMock package. package mocks import ( @@ -12,6 +12,7 @@ import ( convoy "github.com/frain-dev/convoy" datastore "github.com/frain-dev/convoy/datastore" gomock "github.com/golang/mock/gomock" + taskq "github.com/vmihailenco/taskq/v3" ) // MockQueuer is a mock of Queuer interface. @@ -51,6 +52,20 @@ func (mr *MockQueuerMockRecorder) Close() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockQueuer)(nil).Close)) } +// Consumer mocks base method. +func (m *MockQueuer) Consumer() taskq.QueueConsumer { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Consumer") + ret0, _ := ret[0].(taskq.QueueConsumer) + return ret0 +} + +// Consumer indicates an expected call of Consumer. +func (mr *MockQueuerMockRecorder) Consumer() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Consumer", reflect.TypeOf((*MockQueuer)(nil).Consumer)) +} + // Write mocks base method. func (m *MockQueuer) Write(arg0 context.Context, arg1 convoy.TaskName, arg2 *datastore.EventDelivery, arg3 time.Duration) error { m.ctrl.T.Helper() diff --git a/queue/memqueue/client.go b/queue/memqueue/client.go new file mode 100644 index 0000000000..c9a93a3de6 --- /dev/null +++ b/queue/memqueue/client.go @@ -0,0 +1,75 @@ +package memqueue + +import ( + "context" + "errors" + "time" + + "github.com/frain-dev/convoy" + "github.com/frain-dev/convoy/config" + "github.com/frain-dev/convoy/datastore" + "github.com/frain-dev/convoy/queue" + "github.com/vmihailenco/taskq/v3" + "github.com/vmihailenco/taskq/v3/memqueue" +) + +type MemQueue struct { + Name string + queue *memqueue.Queue + inner taskq.Factory + closeChan chan struct{} +} + +func NewClient(cfg config.Configuration) (queue.Storage, taskq.Factory, error) { + if cfg.Queue.Type != config.InMemoryQueueProvider { + return nil, nil, errors.New("please select the in-memory queue in your config") + } + + qFn := memqueue.NewFactory() + + storage := queue.NewLocalStorage() + + return storage, qFn, nil +} + +func NewQueue(opts queue.QueueOptions) queue.Queuer { + q := opts.Factory.RegisterQueue(&taskq.QueueOptions{ + Name: opts.Name, + Storage: opts.Storage, + }) + + return &MemQueue{ + Name: opts.Name, + inner: opts.Factory, + queue: q.(*memqueue.Queue), + } +} + +func (q *MemQueue) Close() error { + q.closeChan <- struct{}{} + return q.inner.Close() +} + +func (q *MemQueue) Write(ctx context.Context, name convoy.TaskName, e *datastore.EventDelivery, delay time.Duration) error { + job := &queue.Job{ + ID: e.UID, + } + + m := &taskq.Message{ + Ctx: ctx, + TaskName: string(name), + Args: []interface{}{job}, + Delay: delay, + } + + err := q.queue.Add(m) + if err != nil { + return err + } + + return nil +} + +func (q *MemQueue) Consumer() taskq.QueueConsumer { + return q.queue.Consumer() +} diff --git a/queue/memqueue/client_test.go b/queue/memqueue/client_test.go new file mode 100644 index 0000000000..6a7cf36067 --- /dev/null +++ b/queue/memqueue/client_test.go @@ -0,0 +1,149 @@ +//go:build integration +// +build integration + +package memqueue + +import ( + "context" + "testing" + + "github.com/frain-dev/convoy" + "github.com/frain-dev/convoy/config" + "github.com/frain-dev/convoy/datastore" + "github.com/frain-dev/convoy/queue" + "github.com/google/uuid" + "github.com/vmihailenco/taskq/v3" +) + +func TestWrite(t *testing.T) { + tests := []struct { + name string + appID string + configFile string + eventID string + eventDeliveryID string + eventDelivery *datastore.EventDelivery + queueLen int + }{ + { + name: "Write a single event to queue", + appID: uuid.NewString(), + configFile: "../testdata/convoy_memqueue.json", + eventID: uuid.NewString(), + eventDeliveryID: uuid.NewString(), + queueLen: 1, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + eventDelivery := &datastore.EventDelivery{ + UID: tc.eventDeliveryID, + EventMetadata: &datastore.EventMetadata{ + UID: tc.eventID, + }, + Status: datastore.SuccessEventStatus, + AppMetadata: &datastore.AppMetadata{ + UID: tc.appID, + }, + } + taskName := convoy.TaskName(uuid.NewString()) + configFile := tc.configFile + + err := config.LoadConfig(configFile, new(config.Configuration)) + if err != nil { + t.Fatalf("Failed to load config file: %v", err) + } + cfg, err := config.Get() + if err != nil { + t.Fatalf("Failed to get config: %v", err) + + } + + var qFn taskq.Factory + var lS queue.Storage + var opts queue.QueueOptions + + lS, qFn, err = NewClient(cfg) + if err != nil { + t.Fatalf("Failed to load new client: %v", err) + } + opts = queue.QueueOptions{ + Name: uuid.NewString(), + Type: "in-memory", + Storage: lS, + Factory: qFn, + } + + eventQueue := NewQueue(opts) + err = eventQueue.Write(context.TODO(), taskName, eventDelivery, 0) + if err != nil { + t.Fatalf("Failed to write to queue: %v", err) + } + queueLength, err := eventQueue.Consumer().Queue().Len() + + if err != nil { + t.Fatalf("Failed to get queue length: %v", err) + } + if queueLength != tc.queueLen { + t.Fatalf("Length = %q, Want: %v", queueLength, tc.queueLen) + + } + + }) + } + +} + +func TestConsumer(t *testing.T) { + tests := []struct { + name string + configFile string + err string + }{ + { + name: "Consumer already started", + configFile: "../testdata/convoy_memqueue.json", + err: "taskq: Consumer is already started", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + configFile := tc.configFile + + err := config.LoadConfig(configFile, new(config.Configuration)) + if err != nil { + t.Fatalf("Failed to load config file: %v", err) + } + cfg, err := config.Get() + if err != nil { + t.Fatalf("Failed to get config: %v", err) + + } + + var qFn taskq.Factory + var lS queue.Storage + var opts queue.QueueOptions + + lS, qFn, err = NewClient(cfg) + if err != nil { + t.Fatalf("Failed to load new client: %v", err) + } + opts = queue.QueueOptions{ + Name: uuid.NewString(), + Type: "in-memory", + Storage: lS, + Factory: qFn, + } + + eventQueue := NewQueue(opts) + err = eventQueue.Consumer().Start(context.TODO()) + if err != nil { + if err.Error() != tc.err { + t.Fatalf("Expected: %v, got: %s", tc.err, err) + } + } + }) + } +} diff --git a/queue/queue.go b/queue/queue.go index 9466af22b5..129350d9c4 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -7,14 +7,29 @@ import ( "github.com/frain-dev/convoy" "github.com/frain-dev/convoy/datastore" + "github.com/go-redis/redis/v8" + "github.com/vmihailenco/taskq/v3" ) type Queuer interface { io.Closer Write(context.Context, convoy.TaskName, *datastore.EventDelivery, time.Duration) error + Consumer() taskq.QueueConsumer } type Job struct { Err error `json:"err"` ID string `json:"id"` } + +type QueueOptions struct { + Name string + + Type string + + Redis *redis.Client + + Factory taskq.Factory + + Storage Storage +} diff --git a/queue/redis/client.go b/queue/redis/client.go index 21d34dbd56..b82cb0acc8 100644 --- a/queue/redis/client.go +++ b/queue/redis/client.go @@ -49,16 +49,16 @@ func NewClient(cfg config.Configuration) (*redis.Client, taskq.Factory, error) { return c, qFn, nil } -func NewQueue(c *redis.Client, factory taskq.Factory, name string) queue.Queuer { +func NewQueue(opts queue.QueueOptions) queue.Queuer { - q := factory.RegisterQueue(&taskq.QueueOptions{ - Name: name, - Redis: c, + q := opts.Factory.RegisterQueue(&taskq.QueueOptions{ + Name: opts.Name, + Redis: opts.Redis, }) return &RedisQueue{ - Name: name, - inner: c, + Name: opts.Name, + inner: opts.Redis, queue: q.(*redisq.Queue), } } diff --git a/queue/redis/client_test.go b/queue/redis/client_test.go new file mode 100644 index 0000000000..ccc8364730 --- /dev/null +++ b/queue/redis/client_test.go @@ -0,0 +1,146 @@ +//go:build integration +// +build integration + +package redis + +import ( + "context" + "testing" + + "github.com/frain-dev/convoy" + "github.com/frain-dev/convoy/config" + "github.com/frain-dev/convoy/datastore" + "github.com/frain-dev/convoy/queue" + "github.com/go-redis/redis/v8" + "github.com/google/uuid" + "github.com/vmihailenco/taskq/v3" +) + +func TestWrite(t *testing.T) { + tests := []struct { + name string + appID string + configFile string + eventID string + eventDeliveryID string + eventDelivery *datastore.EventDelivery + queueLen int + }{ + { + name: "Write a single event to queue", + appID: uuid.NewString(), + configFile: "../testdata/convoy_redis.json", + eventID: uuid.NewString(), + eventDeliveryID: uuid.NewString(), + queueLen: 1, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + eventDelivery := &datastore.EventDelivery{ + UID: tc.eventDeliveryID, + EventMetadata: &datastore.EventMetadata{ + UID: tc.eventID, + }, + Status: datastore.SuccessEventStatus, + AppMetadata: &datastore.AppMetadata{ + UID: tc.appID, + }, + } + taskName := convoy.TaskName(uuid.NewString()) + configFile := tc.configFile + + err := config.LoadConfig(configFile, new(config.Configuration)) + if err != nil { + t.Fatalf("Failed to load config file: %v", err) + } + cfg, err := config.Get() + if err != nil { + t.Fatalf("Failed to get config: %v", err) + + } + + var qFn taskq.Factory + var rC *redis.Client + var opts queue.QueueOptions + + rC, qFn, err = NewClient(cfg) + if err != nil { + t.Fatalf("Failed to load new client: %v", err) + } + opts = queue.QueueOptions{ + Name: uuid.NewString(), + Type: "redis", + Redis: rC, + Factory: qFn, + } + + eventQueue := NewQueue(opts) + err = eventQueue.Write(context.TODO(), taskName, eventDelivery, 0) + if err != nil { + t.Fatalf("Failed to write to queue: %v", err) + } + queueLength, err := eventQueue.Consumer().Queue().Len() + + if err != nil { + t.Fatalf("Failed to get queue length: %v", err) + } + if queueLength != tc.queueLen { + t.Fatalf("Length = %q, Want: %v", queueLength, tc.queueLen) + + } + + }) + } + +} + +func TestConsumer(t *testing.T) { + tests := []struct { + name string + configFile string + }{ + { + name: "Start consumer", + configFile: "../testdata/convoy_redis.json", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + configFile := tc.configFile + + err := config.LoadConfig(configFile, new(config.Configuration)) + if err != nil { + t.Fatalf("Failed to load config file: %v", err) + } + cfg, err := config.Get() + if err != nil { + t.Fatalf("Failed to get config: %v", err) + + } + + var qFn taskq.Factory + var rC *redis.Client + var opts queue.QueueOptions + + rC, qFn, err = NewClient(cfg) + if err != nil { + t.Fatalf("Failed to load new client: %v", err) + } + opts = queue.QueueOptions{ + Name: uuid.NewString(), + Type: "redis", + Redis: rC, + Factory: qFn, + } + + eventQueue := NewQueue(opts) + err = eventQueue.Consumer().Start(context.TODO()) + if err != nil { + t.Fatalf("Failed to start consumer: %v", err) + } + }) + } +} diff --git a/queue/storage.go b/queue/storage.go new file mode 100644 index 0000000000..784adf41e2 --- /dev/null +++ b/queue/storage.go @@ -0,0 +1,46 @@ +package queue + +import ( + "context" + "sync" + + "github.com/hashicorp/golang-lru/simplelru" +) + +type Storage interface { + Exists(ctx context.Context, key string) bool +} + +var _ Storage = (*localStorage)(nil) + +// LOCAL + +type localStorage struct { + mu sync.Mutex + cache *simplelru.LRU +} + +func NewLocalStorage() Storage { + return &localStorage{} +} + +func (s *localStorage) Exists(_ context.Context, key string) bool { + s.mu.Lock() + defer s.mu.Unlock() + + if s.cache == nil { + var err error + s.cache, err = simplelru.NewLRU(128000, nil) + if err != nil { + panic(err) + } + } + + _, ok := s.cache.Get(key) + if ok { + return true + } + + s.cache.Add(key, nil) + return false +} diff --git a/queue/testdata/convoy_memqueue.json b/queue/testdata/convoy_memqueue.json new file mode 100644 index 0000000000..ea377e8870 --- /dev/null +++ b/queue/testdata/convoy_memqueue.json @@ -0,0 +1,25 @@ +{ + "database": { + "dsn": "mongodb://inside-config-file" + }, + "queue": { + "type": "in-memory" + }, + "server": { + "http": { + "port": 80 + } + }, + "group": { + "strategy": { + "type": "default", + "default": { + "intervalSeconds": 125, + "retryLimit": 15 + } + }, + "signature": { + "hash": "SHA256" + } + } +} diff --git a/queue/testdata/convoy_redis.json b/queue/testdata/convoy_redis.json new file mode 100644 index 0000000000..b6ceea1b10 --- /dev/null +++ b/queue/testdata/convoy_redis.json @@ -0,0 +1,28 @@ +{ + "database": { + "dsn": "mongodb://inside-config-file" + }, + "queue": { + "type": "redis", + "redis": { + "dsn": "redis://localhost:6379" + } + }, + "server": { + "http": { + "port": 80 + } + }, + "group": { + "strategy": { + "type": "default", + "default": { + "intervalSeconds": 125, + "retryLimit": 15 + } + }, + "signature": { + "hash": "SHA256" + } + } +} diff --git a/worker/cleaner.go b/worker/cleaner.go index 79fd9ee5aa..510ca4a925 100644 --- a/worker/cleaner.go +++ b/worker/cleaner.go @@ -2,7 +2,6 @@ package worker import ( "github.com/frain-dev/convoy/queue" - convoy_redis "github.com/frain-dev/convoy/queue/redis" log "github.com/sirupsen/logrus" "github.com/vmihailenco/taskq/v3" ) @@ -13,7 +12,8 @@ type Cleaner struct { quit chan chan error } -func NewCleaner(queue *convoy_redis.RedisQueue) *Cleaner { +func NewCleaner(queue queue.Queuer) *Cleaner { + consumer := queue.Consumer() return &Cleaner{ diff --git a/worker/producer.go b/worker/producer.go index 96eae88226..ecbf3e8d8b 100644 --- a/worker/producer.go +++ b/worker/producer.go @@ -4,7 +4,6 @@ import ( "context" "github.com/frain-dev/convoy/queue" - convoyRedis "github.com/frain-dev/convoy/queue/redis" log "github.com/sirupsen/logrus" "github.com/vmihailenco/taskq/v3" ) @@ -15,7 +14,7 @@ type Producer struct { quit chan chan error } -func NewProducer(queue *convoyRedis.RedisQueue) *Producer { +func NewProducer(queue queue.Queuer) *Producer { consumer := queue.Consumer() return &Producer{