Skip to content

Commit

Permalink
Add In-Memory Queue Support (#342)
Browse files Browse the repository at this point in the history
* add support for in-memory queue initialization

* register workers using cfg and queue

* add in-memory queue provider

* add storage client

* add storage client feature

* use cfg to create worker

* add in-memory queue

* add local storage

* update requirements

* memqueue-check stop call

* change eventdelivery declaration

* Fix: merge conflicts

* fix: merge conflicts

* use QueueOptions to define structure

* wrap queue.Type check in Queuer interface

* fix pointer to redisclient

* restructure toggle between memqueue and redisqueue

* fix CI issues

* generate queue mocks

* check to start consumer

* add test scripts

* fix CI lint

* fix: remove test consumer starts

* verbose error on newclient function

* add redis client step

* fix typo and remove queue type check

* set redis dsn on env, change port in testdata conovy.json

* print verbose error messages

* fix redis version

* test: add dsn for mongo

* single step for integration tests

* use test table

* pull from upstream

* restructure test table

* build tags

* add build tags
  • Loading branch information
ogbanugot authored Jan 22, 2022
1 parent 9fa4c94 commit 186a2b7
Show file tree
Hide file tree
Showing 17 changed files with 577 additions and 31 deletions.
10 changes: 9 additions & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ jobs:
go-version: [1.16.x, 1.17.x]
os: [ubuntu-latest, macos-latest]
mongodb-version: ["4.0", "4.2", "4.4"]
redis-version: ["6.2.6"]

runs-on: ubuntu-latest
steps:
Expand All @@ -21,6 +22,12 @@ jobs:
with:
mongodb-version: ${{ matrix.mongodb-version }}

- name: Start Redis v${{ matrix.redis-version }}
uses: supercharge/[email protected]
with:
redis-version: ${{ matrix.redis-version }}
redis-port: 6379

- name: Get the version
id: get_version
run: echo ::set-output name=tag::$(echo ${GITHUB_SHA:8})
Expand Down Expand Up @@ -49,8 +56,9 @@ jobs:
- name: Go vet
run: go vet ./...

- name: Run Mongo integration tests
- name: Run integration tests
run: go test -tags integration -v ./...
env:
TEST_BOLT_DSN: "../../test.db"
TEST_MONGO_DSN: "mongodb://localhost:27017/testdb"
TEST_REDIS_DSN: "redis://localhost:6379"
47 changes: 42 additions & 5 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@ import (
"time"
_ "time/tzdata"

convoyRedis "github.com/frain-dev/convoy/queue/redis"
memqueue "github.com/frain-dev/convoy/queue/memqueue"
redisqueue "github.com/frain-dev/convoy/queue/redis"
"github.com/frain-dev/convoy/worker/task"
"github.com/getsentry/sentry-go"
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
prefixed "github.com/x-cray/logrus-prefixed-formatter"
"go.mongodb.org/mongo-driver/bson/primitive"

"github.com/frain-dev/convoy/util"
"github.com/go-redis/redis/v8"
log "github.com/sirupsen/logrus"
"github.com/vmihailenco/taskq/v3"

Expand Down Expand Up @@ -106,12 +107,31 @@ func main() {

var qFn taskq.Factory
var rC *redis.Client
var lS queue.Storage
var opts queue.QueueOptions

if cfg.Queue.Type == config.RedisQueueProvider {
rC, qFn, err = convoyRedis.NewClient(cfg)
rC, qFn, err = redisqueue.NewClient(cfg)
if err != nil {
return err
}
opts = queue.QueueOptions{
Type: "redis",
Redis: rC,
Factory: qFn,
}
}

if cfg.Queue.Type == config.InMemoryQueueProvider {
lS, qFn, err = memqueue.NewClient(cfg)
if err != nil {
return err
}
opts = queue.QueueOptions{
Type: "in-memory",
Storage: lS,
Factory: qFn,
}
}

if util.IsStringEmpty(string(cfg.GroupConfig.Signature.Header)) {
Expand All @@ -125,8 +145,8 @@ func main() {
app.applicationRepo = db.AppRepo()
app.eventDeliveryRepo = db.EventDeliveryRepo()

app.eventQueue = convoyRedis.NewQueue(rC, qFn, "EventQueue")
app.deadLetterQueue = convoyRedis.NewQueue(rC, qFn, "DeadLetterQueue")
app.eventQueue = NewQueue(opts, "EventQueue")
app.deadLetterQueue = NewQueue(opts, "DeadLetterQueue")

err = ensureDefaultGroup(context.Background(), cfg, app)
if err != nil {
Expand Down Expand Up @@ -173,6 +193,23 @@ func main() {
}
}

func NewQueue(opts queue.QueueOptions, name string) queue.Queuer {
optsType := opts.Type
var convoyQueue queue.Queuer
switch optsType {
case "in-memory":
opts.Name = name
convoyQueue = memqueue.NewQueue(opts)

case "redis":
opts.Name = name
convoyQueue = redisqueue.NewQueue(opts)
default:
log.Errorf("Invalid queue type: %v", optsType)
}
return convoyQueue
}

func ensureDefaultGroup(ctx context.Context, cfg config.Configuration, a *app) error {
var filter *datastore.GroupFilter
var groups []*datastore.Group
Expand Down
12 changes: 6 additions & 6 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/frain-dev/convoy/worker/task"

"github.com/frain-dev/convoy/config"
convoyQueue "github.com/frain-dev/convoy/queue/redis"
"github.com/frain-dev/convoy/server"
"github.com/frain-dev/convoy/util"
"github.com/frain-dev/convoy/worker"
Expand Down Expand Up @@ -55,12 +54,13 @@ func addServerCommand(a *app) *cobra.Command {
}

// register workers.
if queue, ok := a.eventQueue.(*convoyQueue.RedisQueue); ok {
worker.NewProducer(queue).Start()
}
producer := worker.NewProducer(a.eventQueue)

cleaner := worker.NewCleaner(a.deadLetterQueue)

if queue, ok := a.deadLetterQueue.(*convoyQueue.RedisQueue); ok {
worker.NewCleaner(queue).Start()
if cfg.Queue.Type != config.InMemoryQueueProvider {
producer.Start()
cleaner.Start()
}

log.Infof("Started convoy server in %s", time.Since(start))
Expand Down
5 changes: 5 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ const (

const (
RedisQueueProvider QueueProvider = "redis"
InMemoryQueueProvider QueueProvider = "in-memory"
DefaultStrategyProvider StrategyProvider = "default"
DefaultSignatureHeader SignatureHeaderProvider = "X-Convoy-Signature"
)
Expand Down Expand Up @@ -270,6 +271,10 @@ func ensureQueueConfig(queueCfg QueueConfiguration) error {
if queueCfg.Redis.DSN == "" {
return errors.New("redis queue dsn is empty")
}

case InMemoryQueueProvider:
return nil

default:
return fmt.Errorf("unsupported queue type: %s", queueCfg.Type)
}
Expand Down
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ module github.com/frain-dev/convoy
go 1.16

require (
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect
github.com/dchest/uniuri v0.0.0-20200228104902-7aecb25e1fe5 // indirect
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d
github.com/dchest/uniuri v0.0.0-20200228104902-7aecb25e1fe5
github.com/felixge/httpsnoop v1.0.2
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/getkin/kin-openapi v0.78.0
Expand All @@ -18,6 +18,7 @@ require (
github.com/golang/mock v1.6.0
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.3.0
github.com/hashicorp/golang-lru v0.5.4
github.com/jarcoal/httpmock v1.0.8
github.com/kelseyhightower/envconfig v1.4.0
github.com/mattn/go-colorable v0.1.11 // indirect
Expand All @@ -35,7 +36,7 @@ require (
github.com/vmihailenco/taskq/v3 v3.2.7
github.com/x-cray/logrus-prefixed-formatter v0.5.2
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect
go.etcd.io/bbolt v1.3.6 // indirect
go.etcd.io/bbolt v1.3.6
go.mongodb.org/mongo-driver v1.7.1
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3
golang.org/x/mod v0.5.0 // indirect
Expand Down
5 changes: 1 addition & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/schollz/closestmatch v2.1.0+incompatible/go.mod h1:RtP1ddjLong6gTkbtmuhtR2uUrrJOpYzYRvbcPAid+g=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
Expand Down Expand Up @@ -595,8 +596,6 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20191227163750-53104e6ec876/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3 h1:0es+/5331RGQPcXlMfP+WrnIIS6dNnNRe0WB02W0F4M=
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand Down Expand Up @@ -684,8 +683,6 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210913180222-943fd674d43e h1:+b/22bPvDYt4NPDcy4xAGCmON713ONAWFeY3Z7I3tR8=
golang.org/x/net v0.0.0-20210913180222-943fd674d43e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
Expand Down
19 changes: 17 additions & 2 deletions mocks/queue.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

75 changes: 75 additions & 0 deletions queue/memqueue/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package memqueue

import (
"context"
"errors"
"time"

"github.com/frain-dev/convoy"
"github.com/frain-dev/convoy/config"
"github.com/frain-dev/convoy/datastore"
"github.com/frain-dev/convoy/queue"
"github.com/vmihailenco/taskq/v3"
"github.com/vmihailenco/taskq/v3/memqueue"
)

type MemQueue struct {
Name string
queue *memqueue.Queue
inner taskq.Factory
closeChan chan struct{}
}

func NewClient(cfg config.Configuration) (queue.Storage, taskq.Factory, error) {
if cfg.Queue.Type != config.InMemoryQueueProvider {
return nil, nil, errors.New("please select the in-memory queue in your config")
}

qFn := memqueue.NewFactory()

storage := queue.NewLocalStorage()

return storage, qFn, nil
}

func NewQueue(opts queue.QueueOptions) queue.Queuer {
q := opts.Factory.RegisterQueue(&taskq.QueueOptions{
Name: opts.Name,
Storage: opts.Storage,
})

return &MemQueue{
Name: opts.Name,
inner: opts.Factory,
queue: q.(*memqueue.Queue),
}
}

func (q *MemQueue) Close() error {
q.closeChan <- struct{}{}
return q.inner.Close()
}

func (q *MemQueue) Write(ctx context.Context, name convoy.TaskName, e *datastore.EventDelivery, delay time.Duration) error {
job := &queue.Job{
ID: e.UID,
}

m := &taskq.Message{
Ctx: ctx,
TaskName: string(name),
Args: []interface{}{job},
Delay: delay,
}

err := q.queue.Add(m)
if err != nil {
return err
}

return nil
}

func (q *MemQueue) Consumer() taskq.QueueConsumer {
return q.queue.Consumer()
}
Loading

0 comments on commit 186a2b7

Please sign in to comment.