Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Merge pull request #510 from BuxOrg/refactor-411-taskmanager-simplifi…
Browse files Browse the repository at this point in the history
…cation

refactor(BUX-411): taskmanager simplification & tasq with redis fixes
  • Loading branch information
mergify[bot] authored Dec 20, 2023
2 parents 75ba0e0 + 12a41f8 commit 61b5f56
Show file tree
Hide file tree
Showing 52 changed files with 674 additions and 987 deletions.
2 changes: 1 addition & 1 deletion action_transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func BenchmarkAction_Transaction_newTransaction(b *testing.B) {

func initBenchmarkData(b *testing.B) (context.Context, ClientInterface, *Xpub, *TransactionConfig, error) {
ctx, client, _ := CreateBenchmarkSQLiteClient(b, false, true,
WithCustomTaskManager(&taskManagerMockBase{}),
withTaskManagerMockup(),
WithFreeCache(),
WithIUCDisabled(),
)
Expand Down
15 changes: 9 additions & 6 deletions bux_suite_mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ type taskManagerMockBase struct{}

func (tm *taskManagerMockBase) Info(context.Context, string, ...interface{}) {}

func (tm *taskManagerMockBase) RegisterTask(*taskmanager.Task) error {
func (tm *taskManagerMockBase) RegisterTask(string, interface{}) error {
return nil
}

func (tm *taskManagerMockBase) ResetCron() {}

func (tm *taskManagerMockBase) RunTask(context.Context, *taskmanager.TaskOptions) error {
func (tm *taskManagerMockBase) RunTask(context.Context, *taskmanager.TaskRunOptions) error {
return nil
}

Expand All @@ -32,10 +32,6 @@ func (tm *taskManagerMockBase) Close(context.Context) error {

func (tm *taskManagerMockBase) Debug(bool) {}

func (tm *taskManagerMockBase) Engine() taskmanager.Engine {
return taskmanager.Empty
}

func (tm *taskManagerMockBase) Factory() taskmanager.Factory {
return taskmanager.FactoryEmpty
}
Expand All @@ -55,3 +51,10 @@ func (tm *taskManagerMockBase) IsNewRelicEnabled() bool {
func (tm *taskManagerMockBase) CronJobsInit(cronJobsMap taskmanager.CronJobs) error {
return nil
}

// Sets custom task manager only for testing
func withTaskManagerMockup() ClientOps {
return func(c *clientOptions) {
c.taskManager.TaskEngine = &taskManagerMockBase{}
}
}
13 changes: 5 additions & 8 deletions bux_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ func (ts *EmbeddedDBTestSuite) serveMySQL() {

// SetupSuite runs at the start of the suite
func (ts *EmbeddedDBTestSuite) SetupSuite() {

var err error

// Create the MySQL server
Expand Down Expand Up @@ -120,7 +119,6 @@ func (ts *EmbeddedDBTestSuite) SetupSuite() {

// TearDownSuite runs after the suite finishes
func (ts *EmbeddedDBTestSuite) TearDownSuite() {

// Stop the Mongo server
if ts.MongoServer != nil {
ts.MongoServer.Stop()
Expand Down Expand Up @@ -157,8 +155,8 @@ func (ts *EmbeddedDBTestSuite) TearDownTest() {
//
// NOTE: you need to close the client: ts.Close()
func (ts *EmbeddedDBTestSuite) createTestClient(ctx context.Context, database datastore.Engine,
tablePrefix string, mockDB, mockRedis bool, opts ...ClientOps) (*TestingClient, error) {

tablePrefix string, mockDB, mockRedis bool, opts ...ClientOps,
) (*TestingClient, error) {
var err error

// Start the suite
Expand Down Expand Up @@ -201,7 +199,6 @@ func (ts *EmbeddedDBTestSuite) createTestClient(ctx context.Context, database da
}

} else {

// Load the in-memory version of the database
if database == datastore.SQLite {
opts = append(opts, WithSQLite(&datastore.SQLiteConfig{
Expand Down Expand Up @@ -329,9 +326,9 @@ func (ts *EmbeddedDBTestSuite) genericDBClient(t *testing.T, database datastore.
WithAutoMigrate(&PaymailAddress{}),
)
if taskManagerEnabled {
opts = append(opts, WithTaskQ(taskmanager.DefaultTaskQConfig(prefix+"_queue"), taskmanager.FactoryMemory))
opts = append(opts, WithTaskqConfig(taskmanager.DefaultTaskQConfig(prefix+"_queue")))
} else {
opts = append(opts, WithCustomTaskManager(&taskManagerMockBase{}))
opts = append(opts, withTaskManagerMockup())
}

tc, err := ts.createTestClient(
Expand All @@ -358,7 +355,7 @@ func (ts *EmbeddedDBTestSuite) genericMockedDBClient(t *testing.T, database data
),
database, prefix,
true, true, WithDebugging(),
WithCustomTaskManager(&taskManagerMockBase{}),
withTaskManagerMockup(),
)
require.NoError(t, err)
require.NotNil(t, tc)
Expand Down
10 changes: 5 additions & 5 deletions bux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func DefaultClientOpts(debug, shared bool) []ClientOps {
opts := make([]ClientOps, 0)
opts = append(
opts,
WithTaskQ(tqc, taskmanager.FactoryMemory),
WithTaskqConfig(tqc),
WithSQLite(tester.SQLiteTestConfig(debug, shared)),
WithChainstateOptions(false, false, false, false),
WithMinercraft(&chainstate.MinerCraftBase{}),
Expand Down Expand Up @@ -150,8 +150,8 @@ func (a *account) Unlocker(context.Context, *bscript.Script) (bt.Unlocker, error

// CreateFakeFundingTransaction will create a valid (fake) transaction for funding
func CreateFakeFundingTransaction(t *testing.T, masterKey *bip32.ExtendedKey,
destinations []*Destination, satoshis uint64) string {

destinations []*Destination, satoshis uint64,
) string {
// Create new tx
rawTx := bt.NewTx()
txErr := rawTx.From(testTxScriptSigID, 0, testTxScriptSigOut, satoshis+354)
Expand Down Expand Up @@ -185,8 +185,8 @@ func CreateFakeFundingTransaction(t *testing.T, masterKey *bip32.ExtendedKey,

// CreateNewXPub will create a new xPub and return all the information to use the xPub
func CreateNewXPub(ctx context.Context, t *testing.T, buxClient ClientInterface,
opts ...ModelOps) (*bip32.ExtendedKey, *Xpub, string) {

opts ...ModelOps,
) (*bip32.ExtendedKey, *Xpub, string) {
// Generate a key pair
masterKey, err := bitcoin.GenerateHDKey(bitcoin.SecureSeedLength)
require.NoError(t, err)
Expand Down
21 changes: 8 additions & 13 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,10 @@ type (

// taskManagerOptions holds the configuration for taskmanager
taskManagerOptions struct {
taskmanager.ClientInterface // Client for TaskManager
cronJobs taskmanager.CronJobs // List of cron jobs
options []taskmanager.ClientOps // List of options
cronCustomPeriods map[string]time.Duration // will override the default period of cronJob
taskmanager.TaskEngine // Client for TaskManager
cronJobs taskmanager.CronJobs // List of cron jobs
options []taskmanager.TaskManagerOptions // List of options
cronCustomPeriods map[string]time.Duration // will override the default period of cronJob
}
)

Expand Down Expand Up @@ -303,7 +303,7 @@ func (c *Client) Close(ctx context.Context) error {
if err := tm.Close(ctx); err != nil {
return err
}
c.options.taskManager.ClientInterface = nil
c.options.taskManager.TaskEngine = nil
}
return nil
}
Expand Down Expand Up @@ -340,11 +340,6 @@ func (c *Client) Debug(on bool) {
if n := c.Notifications(); n != nil {
n.Debug(on)
}

// Set debugging on the Taskmanager
if tm := c.Taskmanager(); tm != nil {
tm.Debug(on)
}
}

// DefaultSyncConfig will return the default sync config from the client defaults (for chainstate)
Expand Down Expand Up @@ -445,9 +440,9 @@ func (c *Client) SetNotificationsClient(client notifications.ClientInterface) {
}

// Taskmanager will return the Taskmanager if it exists
func (c *Client) Taskmanager() taskmanager.ClientInterface {
if c.options.taskManager != nil && c.options.taskManager.ClientInterface != nil {
return c.options.taskManager.ClientInterface
func (c *Client) Taskmanager() taskmanager.TaskEngine {
if c.options.taskManager != nil && c.options.taskManager.TaskEngine != nil {
return c.options.taskManager.TaskEngine
}
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions client_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ func (c *Client) loadPaymailClient() (err error) {
// loadTaskmanager will load the TaskManager and start the TaskManager client
func (c *Client) loadTaskmanager(ctx context.Context) (err error) {
// Load if a custom interface was NOT provided
if c.options.taskManager.ClientInterface == nil {
c.options.taskManager.ClientInterface, err = taskmanager.NewClient(
if c.options.taskManager.TaskEngine == nil {
c.options.taskManager.TaskEngine, err = taskmanager.NewTaskManager(
ctx, c.options.taskManager.options...,
)
}
Expand Down
48 changes: 4 additions & 44 deletions client_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func defaultClientOptions() *clientOptions {

// Blank TaskManager config
taskManager: &taskManagerOptions{
ClientInterface: nil,
TaskEngine: nil,
cronCustomPeriods: map[string]time.Duration{},
},

Expand Down Expand Up @@ -215,7 +215,6 @@ func WithDebugging() ClientOps {
c.chainstate.options = append(c.chainstate.options, chainstate.WithDebugging())
c.dataStore.options = append(c.dataStore.options, datastore.WithDebugging())
c.notifications.options = append(c.notifications.options, notifications.WithDebugging())
c.taskManager.options = append(c.taskManager.options, taskmanager.WithDebugging())
}
}

Expand Down Expand Up @@ -532,57 +531,18 @@ func WithPaymailServerConfig(config *server.Configuration, defaultFromPaymail, d
// TASK MANAGER
// -----------------------------------------------------------------

// WithCustomTaskManager will set the taskmanager
func WithCustomTaskManager(taskManager taskmanager.ClientInterface) ClientOps {
return func(c *clientOptions) {
if taskManager != nil {
c.taskManager.ClientInterface = taskManager
}
}
}

// WithTaskQ will set the task manager to use TaskQ & in-memory
func WithTaskQ(config *taskq.QueueOptions, factory taskmanager.Factory) ClientOps {
return func(c *clientOptions) {
if config != nil {
c.taskManager.options = append(
c.taskManager.options,
taskmanager.WithTaskQ(config, factory),
)
}
}
}

// WithTaskQUsingRedis will set the task manager to use TaskQ & Redis
func WithTaskQUsingRedis(config *taskq.QueueOptions, redisOptions *redis.Options) ClientOps {
// WithTaskqConfig will set the task manager to use TaskQ & in-memory
func WithTaskqConfig(config *taskq.QueueOptions) ClientOps {
return func(c *clientOptions) {
if config != nil {

// Create a new redis client
if config.Redis == nil {

// Remove prefix if found
redisOptions.Addr = strings.Replace(redisOptions.Addr, cachestore.RedisPrefix, "", -1)
config.Redis = redis.NewClient(redisOptions)
}

c.taskManager.options = append(
c.taskManager.options,
taskmanager.WithTaskQ(config, taskmanager.FactoryRedis),
taskmanager.WithTaskqConfig(config),
)
}
}
}

// WithCronService will set the custom cron service provider
func WithCronService(cronService taskmanager.CronService) ClientOps {
return func(c *clientOptions) {
if cronService != nil && c.taskManager != nil {
c.taskManager.options = append(c.taskManager.options, taskmanager.WithCronService(cronService))
}
}
}

// WithCronCustmPeriod will set the custom cron jobs period which will override the default
func WithCronCustmPeriod(cronJobName string, period time.Duration) ClientOps {
return func(c *clientOptions) {
Expand Down
26 changes: 9 additions & 17 deletions client_options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/BuxOrg/bux/utils"
"github.com/bitcoin-sv/go-paymail"
"github.com/coocood/freecache"
"github.com/go-redis/redis/v8"
"github.com/mrz1836/go-cachestore"
"github.com/mrz1836/go-datastore"
"github.com/newrelic/go-agent/v3/newrelic"
Expand Down Expand Up @@ -212,7 +211,6 @@ func TestWithDebugging(t *testing.T) {
assert.Equal(t, true, tc.IsDebug())
assert.Equal(t, true, tc.Cachestore().IsDebug())
assert.Equal(t, true, tc.Datastore().IsDebug())
assert.Equal(t, true, tc.Taskmanager().IsDebug())
})
}

Expand Down Expand Up @@ -265,7 +263,7 @@ func TestWithRedis(t *testing.T) {

tc, err := NewClient(
tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx),
WithTaskQ(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix()), taskmanager.FactoryMemory),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix())),
WithRedis(&cachestore.RedisConfig{
URL: cachestore.RedisPrefix + "localhost:6379",
}),
Expand All @@ -288,7 +286,7 @@ func TestWithRedis(t *testing.T) {

tc, err := NewClient(
tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx),
WithTaskQ(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix()), taskmanager.FactoryMemory),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix())),
WithRedis(&cachestore.RedisConfig{
URL: "localhost:6379",
}),
Expand All @@ -315,7 +313,7 @@ func TestWithRedisConnection(t *testing.T) {
t.Run("using a nil connection", func(t *testing.T) {
tc, err := NewClient(
tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx),
WithTaskQ(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix()), taskmanager.FactoryMemory),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix())),
WithRedisConnection(nil),
WithSQLite(tester.SQLiteTestConfig(false, true)),
WithMinercraft(&chainstate.MinerCraftBase{}),
Expand All @@ -336,7 +334,7 @@ func TestWithRedisConnection(t *testing.T) {

tc, err := NewClient(
tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx),
WithTaskQ(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix()), taskmanager.FactoryMemory),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix())),
WithRedisConnection(client),
WithSQLite(tester.SQLiteTestConfig(false, true)),
WithMinercraft(&chainstate.MinerCraftBase{}),
Expand Down Expand Up @@ -364,7 +362,7 @@ func TestWithFreeCache(t *testing.T) {
tc, err := NewClient(
tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx),
WithFreeCache(),
WithTaskQ(taskmanager.DefaultTaskQConfig(testQueueName), taskmanager.FactoryMemory),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(testQueueName)),
WithSQLite(&datastore.SQLiteConfig{Shared: true}),
WithMinercraft(&chainstate.MinerCraftBase{}))
require.NoError(t, err)
Expand Down Expand Up @@ -392,7 +390,7 @@ func TestWithFreeCacheConnection(t *testing.T) {
tc, err := NewClient(
tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx),
WithFreeCacheConnection(nil),
WithTaskQ(taskmanager.DefaultTaskQConfig(testQueueName), taskmanager.FactoryMemory),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(testQueueName)),
WithSQLite(&datastore.SQLiteConfig{Shared: true}),
WithMinercraft(&chainstate.MinerCraftBase{}),
WithLogger(&logger),
Expand All @@ -413,7 +411,7 @@ func TestWithFreeCacheConnection(t *testing.T) {
tc, err := NewClient(
tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx),
WithFreeCacheConnection(fc),
WithTaskQ(taskmanager.DefaultTaskQConfig(testQueueName), taskmanager.FactoryMemory),
WithTaskqConfig(taskmanager.DefaultTaskQConfig(testQueueName)),
WithSQLite(&datastore.SQLiteConfig{Shared: true}),
WithMinercraft(&chainstate.MinerCraftBase{}),
WithLogger(&logger),
Expand Down Expand Up @@ -470,7 +468,6 @@ func TestWithTaskQ(t *testing.T) {
// todo: test cases where config is nil, or cannot load TaskQ

t.Run("using taskq using memory", func(t *testing.T) {

logger := zerolog.Nop()
tcOpts := DefaultClientOpts(true, true)
tcOpts = append(tcOpts, WithLogger(&logger))
Expand All @@ -485,7 +482,6 @@ func TestWithTaskQ(t *testing.T) {

tm := tc.Taskmanager()
require.NotNil(t, tm)
assert.Equal(t, taskmanager.TaskQ, tm.Engine())
assert.Equal(t, taskmanager.FactoryMemory, tm.Factory())
})

Expand All @@ -498,11 +494,8 @@ func TestWithTaskQ(t *testing.T) {

tc, err := NewClient(
tester.GetNewRelicCtx(t, defaultNewRelicApp, defaultNewRelicTx),
WithTaskQUsingRedis(
taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix()),
&redis.Options{
Addr: "localhost:6379",
},
WithTaskqConfig(
taskmanager.DefaultTaskQConfig(tester.RandomTablePrefix(), taskmanager.WithRedis("localhost:6379")),
),
WithRedis(&cachestore.RedisConfig{
URL: cachestore.RedisPrefix + "localhost:6379",
Expand All @@ -517,7 +510,6 @@ func TestWithTaskQ(t *testing.T) {

tm := tc.Taskmanager()
require.NotNil(t, tm)
assert.Equal(t, taskmanager.TaskQ, tm.Engine())
assert.Equal(t, taskmanager.FactoryRedis, tm.Factory())
})
}
Expand Down
1 change: 0 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ func TestClient_Debug(t *testing.T) {
assert.Equal(t, true, tc.Cachestore().IsDebug())
assert.Equal(t, true, tc.Datastore().IsDebug())
assert.Equal(t, true, tc.Notifications().IsDebug())
assert.Equal(t, true, tc.Taskmanager().IsDebug())
})
}

Expand Down
Loading

0 comments on commit 61b5f56

Please sign in to comment.