Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

👌 feat(message:deduplication) implementing the feature #33 #229

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions gbus/abstractions.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ const (
REPLY Semantics = "reply"
)

type DeduplicationPolicy int

const (
DeduplicationPolicyNone DeduplicationPolicy = iota
DeduplicationPolicyReject
DeduplicationPolicyAck
)

//Bus interface provides the majority of functionality to Send, Reply and Publish messages to the Bus
type Bus interface {
HandlerRegister
Expand Down Expand Up @@ -213,6 +221,8 @@ type Builder interface {

//WithLogger set custom logger instance
WithLogger(logger logrus.FieldLogger) Builder

WithDeduplicationPolicy(method DeduplicationPolicy, age time.Duration) Builder
}

//Invocation context for a specific processed message
Expand Down
61 changes: 40 additions & 21 deletions gbus/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,35 @@ import (
"sync"
"time"

"emperror.dev/errors"
"github.com/sirupsen/logrus"

"github.com/wework/grabbit/gbus"
"github.com/wework/grabbit/gbus/deduplicator/implementation"
"github.com/wework/grabbit/gbus/saga"
"github.com/wework/grabbit/gbus/serialization"
"github.com/wework/grabbit/gbus/tx/mysql"
)

type defaultBuilder struct {
PrefetchCount uint
connStr string
purgeOnStartup bool
sagaStoreConnStr string
txnl bool
txConnStr string
txnlProvider string
workerNum uint
serializer gbus.Serializer
dlx string
defaultPolicies []gbus.MessagePolicy
confirm bool
dbPingTimeout time.Duration
usingPingTimeout bool
logger logrus.FieldLogger
busCfg gbus.BusConfiguration
PrefetchCount uint
connStr string
purgeOnStartup bool
sagaStoreConnStr string
txnl bool
txConnStr string
txnlProvider string
workerNum uint
serializer gbus.Serializer
dlx string
defaultPolicies []gbus.MessagePolicy
confirm bool
dbPingTimeout time.Duration
usingPingTimeout bool
logger logrus.FieldLogger
busCfg gbus.BusConfiguration
deduplicationPolicy gbus.DeduplicationPolicy
deduplicationRetentionAge time.Duration
}

func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
Expand All @@ -53,6 +57,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
DefaultPolicies: builder.defaultPolicies,
DbPingTimeout: 3,
Confirm: builder.confirm,
DeduplicationPolicy: builder.deduplicationPolicy,
}

var finalLogger logrus.FieldLogger
Expand Down Expand Up @@ -107,6 +112,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
if builder.usingPingTimeout {
gb.DbPingTimeout = builder.dbPingTimeout
}
gb.Deduplicator = implementation.NewDeduplicator(svcName, builder.deduplicationPolicy, gb.TxProvider, builder.deduplicationRetentionAge, gb.Log())
vladshub marked this conversation as resolved.
Show resolved Hide resolved
vladshub marked this conversation as resolved.
Show resolved Hide resolved

//TODO move this into the NewSagaStore factory methods
if builder.purgeOnStartup {
Expand All @@ -115,6 +121,11 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
errMsg := fmt.Errorf("grabbit: saga store faild to purge. error: %v", err)
panic(errMsg)
}
err = gb.Deduplicator.Purge()
if err != nil {
errMsg := errors.NewWithDetails("duplicator failed to purge", "component", "grabbit", "feature", "deduplicator")
panic(errMsg)
}
}
glue := saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider, gb.Log, timeoutManager)
glue.SetLogger(gb.Log())
Expand Down Expand Up @@ -206,6 +217,12 @@ func (builder *defaultBuilder) WithLogger(logger logrus.FieldLogger) gbus.Builde
return builder
}

func (builder *defaultBuilder) WithDeduplicationPolicy(policy gbus.DeduplicationPolicy, age time.Duration) gbus.Builder {
builder.deduplicationPolicy = policy
builder.deduplicationRetentionAge = age
return builder
}

//New :)
func New() Nu {
return Nu{}
Expand All @@ -218,9 +235,11 @@ type Nu struct {
//Bus inits a new BusBuilder
func (Nu) Bus(brokerConnStr string) gbus.Builder {
return &defaultBuilder{
busCfg: gbus.BusConfiguration{},
PrefetchCount: 1,
connStr: brokerConnStr,
serializer: serialization.NewGobSerializer(),
defaultPolicies: make([]gbus.MessagePolicy, 0)}
busCfg: gbus.BusConfiguration{},
PrefetchCount: 1,
connStr: brokerConnStr,
serializer: serialization.NewGobSerializer(),
defaultPolicies: make([]gbus.MessagePolicy, 0),
deduplicationPolicy: gbus.DeduplicationPolicyNone,
}
}
28 changes: 18 additions & 10 deletions gbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"
"time"

"github.com/wework/grabbit/gbus/deduplicator"
"github.com/wework/grabbit/gbus/metrics"

"github.com/opentracing-contrib/go-amqp/amqptracer"
Expand Down Expand Up @@ -56,15 +57,17 @@ type DefaultBus struct {
Glue SagaGlue
TxProvider TxProvider

WorkerNum uint
Serializer Serializer
DLX string
DefaultPolicies []MessagePolicy
Confirm bool
healthChan chan error
backpressure bool
DbPingTimeout time.Duration
amqpConnected bool
WorkerNum uint
Serializer Serializer
DLX string
DeduplicationPolicy DeduplicationPolicy
Deduplicator deduplicator.Store
DefaultPolicies []MessagePolicy
Confirm bool
healthChan chan error
backpressure bool
DbPingTimeout time.Duration
amqpConnected bool
}

var (
Expand Down Expand Up @@ -222,6 +225,8 @@ func (b *DefaultBus) Start() error {
return startErr
}

b.Deduplicator.Start()

//declare queue
var q amqp.Queue
if q, e = b.createServiceQueue(); e != nil {
Expand Down Expand Up @@ -294,7 +299,10 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) {
registrations: b.Registrations,
serializer: b.Serializer,
b: b,
amqpErrors: b.amqpErrors}
amqpErrors: b.amqpErrors,
delicatePolicy: b.DeduplicationPolicy,
duplicateStore: b.Deduplicator,
}

err := w.Start()
if err != nil {
Expand Down
138 changes: 138 additions & 0 deletions gbus/deduplicator/implementation/tx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package implementation

import (
"database/sql"
"time"

"emperror.dev/errors"
"github.com/sirupsen/logrus"

"github.com/wework/grabbit/gbus"
"github.com/wework/grabbit/gbus/deduplicator"
"github.com/wework/grabbit/gbus/tx"
)

var _ deduplicator.Store = &deduper{}

type deduper struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpicking, the struct is named deduper which is the main struct here yet the file name is tx.go

*gbus.Glogged
svcName string
policy gbus.DeduplicationPolicy
txProvider gbus.TxProvider
age time.Duration
ticker *time.Ticker
done chan bool
tableName string
}

func (d *deduper) Purge() (err error) {
truncateSQL := "TRUNCATE TABLE " + d.tableName
rhinof marked this conversation as resolved.
Show resolved Hide resolved
txp, err := d.txProvider.New()
if err != nil {
return err
}
defer func() {
if err != nil {
serr := txp.Rollback()
err = errors.Append(err, serr)
}
err = txp.Commit()
}()
_, err = txp.Exec(truncateSQL)
if err != nil {
return err
}
return nil
}

func (d *deduper) Start() {
d.ticker = time.NewTicker(time.Minute)
d.done = make(chan bool)
deleteQuery := "DELETE FROM " + d.tableName + " WHERE `created_at` < ?"
go func() {
for {
select {
case <-d.done:
return
case <-d.ticker.C:
oldest := time.Now().Add(-1 * d.age)
tx, err := d.txProvider.New()
if err != nil {
d.Log().WithError(err).Error("failed to acquire a tx")
continue
}
result, err := tx.Exec(deleteQuery, oldest)
if err != nil && err != sql.ErrNoRows {
d.Log().WithError(err).Error("failed executing delete query")
vladshub marked this conversation as resolved.
Show resolved Hide resolved
}
n, err := result.RowsAffected()
if err != nil {
d.Log().WithError(err).Error("failed to get count of affected rows")
} else {
d.Log().WithField("table_name", d.tableName).WithField("rows_deleted", n).
Info("successfully cleanup duplicates table")
}
}
}
}()
}

func (d *deduper) Stop() {
d.Log().Info("shutting down deduplicator")
d.ticker.Stop()
close(d.done)
}

//
func (d *deduper) StoreMessageID(tx *sql.Tx, id string) error {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you consider to store the message in another storage mechanism? like redis etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want to introduce another infrastructure dependency but we can have different storages that would implement the same interface.

insertSQL := "INSERT INTO " + d.tableName + " (id) values (?)"
_, err := tx.Exec(insertSQL, id)
if err != nil {
d.Log().WithError(err).Error("failed to insert the id of the message into the dedup table")
return err
}
return nil
}

// MessageExists checks if a message id is in the deduplication table and returns an error if it fails
func (d *deduper) MessageExists(id string) (bool, error) {
if d.policy == gbus.DeduplicationPolicyNone {
return false, nil
}
tx, err := d.txProvider.New()
if err != nil {
return true, err
}
defer func() {
err = tx.Rollback()
if err != nil {
d.Log().WithError(err).Error("could not commit tx for query MessageExists")
}
}()
selectSQL := "SELECT EXISTS (SELECT id FROM " + d.tableName + " WHERE id = ? limit 1)"

var exists bool
err = tx.QueryRow(selectSQL, id).Scan(&exists)
if err != nil && err == sql.ErrNoRows {
return false, nil
}

if err != nil {
return true, err
}

return exists, nil
}

func NewDeduplicator(svcName string, policy gbus.DeduplicationPolicy, txProvider gbus.TxProvider, age time.Duration, logger logrus.FieldLogger) deduplicator.Store {
d := &deduper{
svcName: svcName,
policy: policy,
txProvider: txProvider,
age: age,
tableName: tx.GrabbitTableNameTemplate(svcName, "duplicates"),
}
l := logger.WithField("grabbit", "deduplicator")
d.SetLogger(l)
return d
}
14 changes: 14 additions & 0 deletions gbus/deduplicator/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package deduplicator

import (
"database/sql"
)

// Store abstracts the way deduplicateor manages the
type Store interface {
vladshub marked this conversation as resolved.
Show resolved Hide resolved
StoreMessageID(tx *sql.Tx, id string) error
MessageExists(id string) (bool, error)
Purge() error
Start()
Stop()
}
Loading