Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add In-Memory Queue Support #342

Merged
merged 44 commits into from
Jan 22, 2022
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
9489a09
add support for in-memory queue initialization
ogbanugot Jan 12, 2022
ecd0500
register workers using cfg and queue
ogbanugot Jan 12, 2022
f605fd2
add in-memory queue provider
ogbanugot Jan 12, 2022
9971ab8
add storage client
ogbanugot Jan 12, 2022
8bf3284
add storage client feature
ogbanugot Jan 12, 2022
56db8b9
use cfg to create worker
ogbanugot Jan 12, 2022
0986fde
add in-memory queue
ogbanugot Jan 12, 2022
c9769a7
add local storage
ogbanugot Jan 12, 2022
2ccb79d
update requirements
ogbanugot Jan 12, 2022
3298498
memqueue-check stop call
ogbanugot Jan 12, 2022
604ac4e
Merge branch 'main' into ogban/feature/in-memory-queue
ogbanugot Jan 13, 2022
07e1679
change eventdelivery declaration
ogbanugot Jan 13, 2022
8fa1a0a
Fix: merge conflicts
ogbanugot Jan 13, 2022
1485935
fix: merge conflicts
ogbanugot Jan 13, 2022
3234b9c
use QueueOptions to define structure
ogbanugot Jan 16, 2022
bd0f4d2
wrap queue.Type check in Queuer interface
ogbanugot Jan 16, 2022
c355a46
fix pointer to redisclient
ogbanugot Jan 16, 2022
50f73ba
restructure toggle between memqueue and redisqueue
ogbanugot Jan 16, 2022
dc5bfd0
fix CI issues
ogbanugot Jan 16, 2022
4b27cbe
generate queue mocks
ogbanugot Jan 16, 2022
f17ead0
Merge branch 'main' into ogban/feature/in-memory-queue
ogbanugot Jan 16, 2022
c69031f
Merge branch 'main' into ogban/feature/in-memory-queue
ogbanugot Jan 17, 2022
e57f7a4
check to start consumer
ogbanugot Jan 17, 2022
6047d60
add test scripts
ogbanugot Jan 17, 2022
329345d
fix CI lint
ogbanugot Jan 17, 2022
7a8cba7
fix: remove test consumer starts
ogbanugot Jan 17, 2022
1ea48d0
verbose error on newclient function
ogbanugot Jan 18, 2022
26c1c0e
add redis client step
ogbanugot Jan 18, 2022
898896a
fix typo and remove queue type check
ogbanugot Jan 18, 2022
1b5905c
set redis dsn on env, change port in testdata conovy.json
ogbanugot Jan 18, 2022
b9be791
print verbose error messages
ogbanugot Jan 18, 2022
6a33df0
fix redis version
ogbanugot Jan 18, 2022
e81f866
test: add dsn for mongo
ogbanugot Jan 18, 2022
8664589
single step for integration tests
ogbanugot Jan 18, 2022
1e6638f
use test table
ogbanugot Jan 18, 2022
109ec4f
pull from upstream
ogbanugot Jan 18, 2022
9ffd8be
Merge branch 'main' of https://github.com/frain-dev/convoy
ogbanugot Jan 19, 2022
8c79fc2
Merge branch 'main' into ogban/feature/in-memory-queue
ogbanugot Jan 21, 2022
26bea1e
Merge branch 'main' of https://github.com/frain-dev/convoy
ogbanugot Jan 21, 2022
2412337
merge main
ogbanugot Jan 21, 2022
4f1aa3c
restructure test table
ogbanugot Jan 22, 2022
e575e7a
build tags
ogbanugot Jan 22, 2022
b0ae390
add test redis dsn
ogbanugot Jan 22, 2022
2e251f4
add build tags
ogbanugot Jan 22, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ jobs:
go-version: [1.16.x, 1.17.x]
os: [ubuntu-latest, macos-latest]
mongodb-version: ["4.0", "4.2", "4.4"]
redis-version: ["6.2.6"]

runs-on: ubuntu-latest
steps:
Expand All @@ -21,6 +22,12 @@ jobs:
with:
mongodb-version: ${{ matrix.mongodb-version }}

- name: Start Redis v${{ matrix.redis-version }}
uses: supercharge/[email protected]
with:
redis-version: ${{ matrix.redis-version }}
redis-port: 6379

- name: Get the version
id: get_version
run: echo ::set-output name=tag::$(echo ${GITHUB_SHA:8})
Expand Down Expand Up @@ -49,8 +56,9 @@ jobs:
- name: Go vet
run: go vet ./...

- name: Run Mongo integration tests
- name: Run integration tests
run: go test -tags integration -v ./...
env:
TEST_BOLT_DSN: "../../test.db"
TEST_MONGO_DSN: "mongodb://localhost:27017/testdb"
TEST_REDIS_DSN: "redis://localhost:6379"
47 changes: 42 additions & 5 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@ import (
"time"
_ "time/tzdata"

convoyRedis "github.com/frain-dev/convoy/queue/redis"
memqueue "github.com/frain-dev/convoy/queue/memqueue"
redisqueue "github.com/frain-dev/convoy/queue/redis"
"github.com/frain-dev/convoy/worker/task"
"github.com/getsentry/sentry-go"
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
prefixed "github.com/x-cray/logrus-prefixed-formatter"
"go.mongodb.org/mongo-driver/bson/primitive"

"github.com/frain-dev/convoy/util"
"github.com/go-redis/redis/v8"
log "github.com/sirupsen/logrus"
"github.com/vmihailenco/taskq/v3"

Expand Down Expand Up @@ -106,12 +107,31 @@ func main() {

var qFn taskq.Factory
var rC *redis.Client
var lS queue.Storage
var opts queue.QueueOptions

if cfg.Queue.Type == config.RedisQueueProvider {
rC, qFn, err = convoyRedis.NewClient(cfg)
rC, qFn, err = redisqueue.NewClient(cfg)
if err != nil {
return err
}
opts = queue.QueueOptions{
Type: "redis",
Redis: rC,
Factory: qFn,
}
}

if cfg.Queue.Type == config.InMemoryQueueProvider {
lS, qFn, err = memqueue.NewClient(cfg)
if err != nil {
return err
}
opts = queue.QueueOptions{
Type: "in-memory",
Storage: lS,
Factory: qFn,
}
}

if util.IsStringEmpty(string(cfg.GroupConfig.Signature.Header)) {
Expand All @@ -125,8 +145,8 @@ func main() {
app.applicationRepo = db.AppRepo()
app.eventDeliveryRepo = db.EventDeliveryRepo()

app.eventQueue = convoyRedis.NewQueue(rC, qFn, "EventQueue")
app.deadLetterQueue = convoyRedis.NewQueue(rC, qFn, "DeadLetterQueue")
app.eventQueue = NewQueue(opts, "EventQueue")
app.deadLetterQueue = NewQueue(opts, "DeadLetterQueue")

err = ensureDefaultGroup(context.Background(), cfg, app)
if err != nil {
Expand Down Expand Up @@ -173,6 +193,23 @@ func main() {
}
}

func NewQueue(opts queue.QueueOptions, name string) queue.Queuer {
optsType := opts.Type
var convoyQueue queue.Queuer
switch optsType {
case "in-memory":
opts.Name = name
convoyQueue = memqueue.NewQueue(opts)

case "redis":
opts.Name = name
convoyQueue = redisqueue.NewQueue(opts)
default:
log.Errorf("Invalid queue type: %v", optsType)
}
return convoyQueue
}

func ensureDefaultGroup(ctx context.Context, cfg config.Configuration, a *app) error {
var filter *datastore.GroupFilter
var groups []*datastore.Group
Expand Down
12 changes: 6 additions & 6 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/frain-dev/convoy/worker/task"

"github.com/frain-dev/convoy/config"
convoyQueue "github.com/frain-dev/convoy/queue/redis"
"github.com/frain-dev/convoy/server"
"github.com/frain-dev/convoy/util"
"github.com/frain-dev/convoy/worker"
Expand Down Expand Up @@ -55,12 +54,13 @@ func addServerCommand(a *app) *cobra.Command {
}

// register workers.
if queue, ok := a.eventQueue.(*convoyQueue.RedisQueue); ok {
worker.NewProducer(queue).Start()
}
producer := worker.NewProducer(a.eventQueue)

cleaner := worker.NewCleaner(a.deadLetterQueue)

if queue, ok := a.deadLetterQueue.(*convoyQueue.RedisQueue); ok {
worker.NewCleaner(queue).Start()
if cfg.Queue.Type != config.InMemoryQueueProvider {
producer.Start()
cleaner.Start()
}

log.Infof("Started convoy server in %s", time.Since(start))
Expand Down
5 changes: 5 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ const (

const (
RedisQueueProvider QueueProvider = "redis"
InMemoryQueueProvider QueueProvider = "in-memory"
DefaultStrategyProvider StrategyProvider = "default"
DefaultSignatureHeader SignatureHeaderProvider = "X-Convoy-Signature"
)
Expand Down Expand Up @@ -270,6 +271,10 @@ func ensureQueueConfig(queueCfg QueueConfiguration) error {
if queueCfg.Redis.DSN == "" {
return errors.New("redis queue dsn is empty")
}

case InMemoryQueueProvider:
return nil

default:
return fmt.Errorf("unsupported queue type: %s", queueCfg.Type)
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/golang/mock v1.6.0
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.3.0
github.com/hashicorp/golang-lru v0.5.4
github.com/jarcoal/httpmock v1.0.8
github.com/kelseyhightower/envconfig v1.4.0
github.com/mattn/go-colorable v0.1.11 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/schollz/closestmatch v2.1.0+incompatible/go.mod h1:RtP1ddjLong6gTkbtmuhtR2uUrrJOpYzYRvbcPAid+g=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
Expand Down
19 changes: 17 additions & 2 deletions mocks/queue.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

75 changes: 75 additions & 0 deletions queue/memqueue/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package memqueue

import (
"context"
"errors"
"time"

"github.com/frain-dev/convoy"
"github.com/frain-dev/convoy/config"
"github.com/frain-dev/convoy/datastore"
"github.com/frain-dev/convoy/queue"
"github.com/vmihailenco/taskq/v3"
"github.com/vmihailenco/taskq/v3/memqueue"
)

type MemQueue struct {
Name string
queue *memqueue.Queue
inner taskq.Factory
closeChan chan struct{}
}

func NewClient(cfg config.Configuration) (queue.Storage, taskq.Factory, error) {
if cfg.Queue.Type != config.InMemoryQueueProvider {
return nil, nil, errors.New("please select the in-memory queue in your config")
}

qFn := memqueue.NewFactory()

storage := queue.NewLocalStorage()

return storage, qFn, nil
}

func NewQueue(opts queue.QueueOptions) queue.Queuer {
q := opts.Factory.RegisterQueue(&taskq.QueueOptions{
Name: opts.Name,
Storage: opts.Storage,
})

return &MemQueue{
Name: opts.Name,
inner: opts.Factory,
queue: q.(*memqueue.Queue),
}
}

func (q *MemQueue) Close() error {
q.closeChan <- struct{}{}
return q.inner.Close()
}

func (q *MemQueue) Write(ctx context.Context, name convoy.TaskName, e *datastore.EventDelivery, delay time.Duration) error {
job := &queue.Job{
ID: e.UID,
}

m := &taskq.Message{
Ctx: ctx,
TaskName: string(name),
Args: []interface{}{job},
Delay: delay,
}

err := q.queue.Add(m)
if err != nil {
return err
}

return nil
}

func (q *MemQueue) Consumer() taskq.QueueConsumer {
return q.queue.Consumer()
}
88 changes: 88 additions & 0 deletions queue/memqueue/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
//go:build integration
// +build integration

package memqueue

import (
"context"
"fmt"
"testing"

"github.com/frain-dev/convoy"
"github.com/frain-dev/convoy/config"
"github.com/frain-dev/convoy/datastore"
"github.com/frain-dev/convoy/queue"
"github.com/google/uuid"
"github.com/vmihailenco/taskq/v3"
)

func TestMemqueueQueue(t *testing.T) {
for scenario, fn := range map[string]func(t *testing.T){
"memqueue queue write": testWritetoQueue,
} {
t.Run(scenario, func(t *testing.T) {
fn(t)
})
}
}

func testWritetoQueue(t *testing.T) {
configfile := "../testdata/convoy_memqueue.json"

appID := uuid.NewString()
eventID := uuid.NewString()
eventDeliveryID := uuid.NewString()

eventDelivery := &datastore.EventDelivery{
UID: eventDeliveryID,
EventMetadata: &datastore.EventMetadata{
UID: eventID,
},
Status: datastore.SuccessEventStatus,
AppMetadata: &datastore.AppMetadata{
UID: appID,
},
}
taskName := convoy.TaskName(uuid.NewString())
configFile := configfile
err := config.LoadConfig(configFile, new(config.Configuration))
if err != nil {
t.Fatalf("Failed to load config file: %v", err)
}
cfg, err := config.Get()
if err != nil {
t.Fatalf("Failed to get config")

}

var qFn taskq.Factory
var lS queue.Storage
var opts queue.QueueOptions

lS, qFn, err = NewClient(cfg)
if err != nil {
t.Fatalf("Failed to load new client")
}
opts = queue.QueueOptions{
Name: uuid.NewString(),
Type: "in-memory",
Storage: lS,
Factory: qFn,
}

eventQueue := NewQueue(opts)

err = eventQueue.Write(context.TODO(), taskName, eventDelivery, 0)
if err != nil {
t.Fatalf("Failed to get queue length")
}
queueLength, err := eventQueue.Consumer().Queue().Len()

if err != nil {
t.Fatalf("Failed to get queue length")
}
if fmt.Sprint(queueLength) != "1" {
t.Fatalf("Length = %q, Want: %v", queueLength, 1)

}
}
Loading