Skip to content

Commit

Permalink
node discoverer
Browse files Browse the repository at this point in the history
  • Loading branch information
bbrodriges committed Nov 11, 2024
1 parent 0d2588f commit 983cd40
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 59 deletions.
40 changes: 28 additions & 12 deletions checked_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,23 @@ package hasql
import (
"context"
"errors"
"fmt"
"slices"

Check failure on line 23 in checked_node.go

View workflow job for this annotation

GitHub Actions / test (1.19.x, ubuntu-latest)

package slices is not in GOROOT (/opt/hostedtoolcache/go/1.19.13/x64/src/slices)

Check failure on line 23 in checked_node.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, ubuntu-latest)

package slices is not in GOROOT (/opt/hostedtoolcache/go/1.20.14/x64/src/slices)
"sync"
)

// CheckedNodes holds references to any cluster node which state has been checked
// CheckedNodes holds references to all available cluster nodes
type CheckedNodes[T Querier] struct {
alive []CheckedNode[T]
primaries []CheckedNode[T]
standbys []CheckedNode[T]
err error
discovered []*Node[T]
alive []CheckedNode[T]
primaries []CheckedNode[T]
standbys []CheckedNode[T]
err error
}

// Discovered returns a list of nodes discovered in cluster
func (c CheckedNodes[T]) Discovered() []*Node[T] {
return c.discovered
}

// Alive returns a list of all successfully checked nodes irregarding their cluster role
Expand Down Expand Up @@ -58,14 +65,22 @@ type CheckedNode[T Querier] struct {
}

// checkNodes takes slice of nodes, checks them in parallel and returns the alive ones
func checkNodes[T Querier](ctx context.Context, nodes []*Node[T], checkFn NodeChecker, compareFn func(a, b CheckedNode[T]) int, tracer Tracer[T]) CheckedNodes[T] {
func checkNodes[T Querier](ctx context.Context, discoverer NodeDiscoverer[T], checkFn NodeChecker, compareFn func(a, b CheckedNode[T]) int, tracer Tracer[T]) CheckedNodes[T] {
discoveredNodes, err := discoverer.DiscoverNodes(ctx)
if err != nil {
// error discovering nodes
return CheckedNodes[T]{
err: fmt.Errorf("cannot discover cluster nodes: %w", err),
}
}

var mu sync.Mutex
checked := make([]CheckedNode[T], 0, len(nodes))
checked := make([]CheckedNode[T], 0, len(discoveredNodes))
var errs NodeCheckErrors[T]

var wg sync.WaitGroup
wg.Add(len(nodes))
for _, node := range nodes {
wg.Add(len(discoveredNodes))
for _, node := range discoveredNodes {
go func(node *Node[T]) {
defer wg.Done()

Expand Down Expand Up @@ -129,9 +144,10 @@ func checkNodes[T Querier](ctx context.Context, nodes []*Node[T], checkFn NodeCh
}

res := CheckedNodes[T]{
alive: checked,
primaries: make([]CheckedNode[T], 0, 1),
standbys: make([]CheckedNode[T], 0, len(checked)),
discovered: discoveredNodes,
alive: checked,
primaries: make([]CheckedNode[T], 0, 1),
standbys: make([]CheckedNode[T], 0, len(checked)),
err: func() error {
if len(errs) != 0 {
return errs
Expand Down
37 changes: 12 additions & 25 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package hasql
import (
"context"
"errors"
"fmt"
"io"
"sync"
"sync/atomic"
Expand All @@ -31,36 +30,24 @@ type Cluster[T Querier] struct {
// configuration
updateInterval time.Duration
updateTimeout time.Duration
discoverer NodeDiscoverer[T]
checker NodeChecker
picker NodePicker[T]
tracer Tracer[T]

// status
registeredNodes []*Node[T]
checkedNodes atomic.Value
stop context.CancelFunc
checkedNodes atomic.Value
stop context.CancelFunc

// broadcast
subscribersMu sync.Mutex
subscribers []updateSubscriber[T]
}

// NewCluster returns object representing a single 'cluster' of SQL databases
func NewCluster[T Querier](nodes []*Node[T], checker NodeChecker, opts ...ClusterOpt[T]) (*Cluster[T], error) {
// validate nodes
if len(nodes) == 0 {
return nil, errors.New("no nodes provided")
}

var zero T
for i, node := range nodes {
nodeName := node.String()
if nodeName == "" {
return nil, fmt.Errorf("node %d has no name", i)
}
if any(node.DB()) == any(zero) {
return nil, fmt.Errorf("node %d (%s) has inoperable SQL client", i, nodeName)
}
func NewCluster[T Querier](discoverer NodeDiscoverer[T], checker NodeChecker, opts ...ClusterOpt[T]) (*Cluster[T], error) {
if discoverer == nil {
return nil, errors.New("node discoverer required")
}

// prepare internal 'stop' context
Expand All @@ -69,12 +56,12 @@ func NewCluster[T Querier](nodes []*Node[T], checker NodeChecker, opts ...Cluste
cl := &Cluster[T]{
updateInterval: 5 * time.Second,
updateTimeout: time.Second,
discoverer: discoverer,
checker: checker,
picker: new(RandomNodePicker[T]),
tracer: BaseTracer[T]{},

stop: stopFn,
registeredNodes: nodes,
stop: stopFn,
}

// apply options
Expand All @@ -93,12 +80,12 @@ func NewCluster[T Querier](nodes []*Node[T], checker NodeChecker, opts ...Cluste
// Close stops node updates.
// Close function must be called when cluster is not needed anymore.
// It returns combined error if multiple nodes returned errors
func (cl *Cluster[T]) Close() error {
func (cl *Cluster[T]) Close() (err error) {
cl.stop()

// close all nodes underlying connection pools
var err error
for _, node := range cl.registeredNodes {
discovered := cl.checkedNodes.Load().(CheckedNodes[T]).discovered
for _, node := range discovered {
if closer, ok := any(node.DB()).(io.Closer); ok {
err = errors.Join(err, closer.Close())
}
Expand Down Expand Up @@ -193,7 +180,7 @@ func (cl *Cluster[T]) updateNodes(ctx context.Context) {
ctx, cancel := context.WithTimeout(ctx, cl.updateTimeout)
defer cancel()

checked := checkNodes(ctx, cl.registeredNodes, cl.checker, cl.picker.CompareNodes, cl.tracer)
checked := checkNodes(ctx, cl.discoverer, cl.checker, cl.picker.CompareNodes, cl.tracer)
cl.checkedNodes.Store(checked)

cl.tracer.UpdatedNodes(checked)
Expand Down
34 changes: 12 additions & 22 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,7 @@ func TestNewCluster(t *testing.T) {
t.Run("no_nodes", func(t *testing.T) {
cl, err := NewCluster[*sql.DB](nil, PostgreSQLChecker)
assert.Nil(t, cl)
assert.EqualError(t, err, "no nodes provided")
})

t.Run("unnamed_node", func(t *testing.T) {
nodes := []*Node[*sql.DB]{
NewNode("", (*sql.DB)(nil)),
}
cl, err := NewCluster(nodes, PostgreSQLChecker)
assert.Nil(t, cl)
assert.EqualError(t, err, "node 0 has no name")
})

t.Run("no_conn_node", func(t *testing.T) {
nodes := []*Node[*sql.DB]{
NewNode("shimba", (*sql.DB)(nil)),
}
cl, err := NewCluster(nodes, PostgreSQLChecker)
assert.Nil(t, cl)
assert.EqualError(t, err, "node 0 (shimba) has inoperable SQL client")
assert.EqualError(t, err, "node discoverer required")
})

t.Run("success", func(t *testing.T) {
Expand All @@ -63,7 +45,7 @@ func TestNewCluster(t *testing.T) {
NewNode("shimba", db),
}

cl, err := NewCluster(nodes, PostgreSQLChecker)
cl, err := NewCluster(NewStaticNodeDiscoverer(nodes), PostgreSQLChecker)
defer func() { require.NoError(t, cl.Close()) }()

assert.NoError(t, err)
Expand All @@ -90,9 +72,13 @@ func TestClusterClose(t *testing.T) {
NewNode("boomba", db2),
}

cl, err := NewCluster(nodes, PostgreSQLChecker)
cl, err := NewCluster(NewStaticNodeDiscoverer(nodes), PostgreSQLChecker)
require.NoError(t, err)

cl.checkedNodes.Store(CheckedNodes[*sql.DB]{
discovered: nodes,
})

assert.NoError(t, cl.Close())
})

Expand All @@ -114,9 +100,13 @@ func TestClusterClose(t *testing.T) {
NewNode("boomba", db2),
}

cl, err := NewCluster(nodes, PostgreSQLChecker)
cl, err := NewCluster(NewStaticNodeDiscoverer(nodes), PostgreSQLChecker)
require.NoError(t, err)

cl.checkedNodes.Store(CheckedNodes[*sql.DB]{
discovered: nodes,
})

err = cl.Close()
assert.ErrorIs(t, err, io.EOF)
assert.ErrorIs(t, err, sql.ErrTxDone)
Expand Down
45 changes: 45 additions & 0 deletions node_discoverer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
Copyright 2020 YANDEX LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package hasql

import (
"context"
"database/sql"
)

// NodeDiscoverer represents a provider of cluster nodes list.
// NodeDiscoverer must node check nodes liveness or role, just return all nodes registered in cluster
type NodeDiscoverer[T Querier] interface {
// DiscoverNodes returns list of nodes registered in cluster
DiscoverNodes(context.Context) ([]*Node[T], error)
}

var _ NodeDiscoverer[*sql.DB] = (*staticNodeDiscoverer[*sql.DB])(nil)

// staticNodeDiscoverer returns always returns list of provided nodes
type staticNodeDiscoverer[T Querier] struct {
nodes []*Node[T]
}

// NewStaticNodeDiscoverer returns new staticNodeDiscoverer instance
func NewStaticNodeDiscoverer[T Querier](nodes []*Node[T]) staticNodeDiscoverer[T] {
return staticNodeDiscoverer[T]{nodes: nodes}
}

func (s staticNodeDiscoverer[T]) DiscoverNodes(_ context.Context) ([]*Node[T], error) {
return s.nodes, nil
}

0 comments on commit 983cd40

Please sign in to comment.