Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of a gRPC transport #538

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,19 @@ cov:
INTEG_TESTS=yes gocov test github.com/hashicorp/raft | gocov-html > /tmp/coverage.html
open /tmp/coverage.html

.PHONY: proto
proto: proto-tools proto-no-tools

.PHONY: proto-no-tools
proto-no-tools:
buf generate
mog -source './proto/transport/v1/*.pb.go'

.PHONY: proto-tools
proto-tools:
go install github.com/bufbuild/buf/cmd/[email protected]
go install google.golang.org/protobuf/cmd/protoc-gen-go@$(shell grep google.golang.org/protobuf go.mod | awk '{print $$2}')
go install google.golang.org/grpc/cmd/[email protected]
go install github.com/hashicorp/[email protected]

.PHONY: test cov integ deps dep-linter lint
20 changes: 10 additions & 10 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ func (r *Raft) ReloadableConfig() ReloadableConfig {
// servers to the cluster.
func (r *Raft) BootstrapCluster(configuration Configuration) Future {
bootstrapReq := &bootstrapFuture{}
bootstrapReq.init()
bootstrapReq.Init()
bootstrapReq.configuration = configuration
select {
case <-r.shutdownCh:
Expand Down Expand Up @@ -807,7 +807,7 @@ func (r *Raft) ApplyLog(log Log, timeout time.Duration) ApplyFuture {
Extensions: log.Extensions,
},
}
logFuture.init()
logFuture.Init()

select {
case <-timer:
Expand All @@ -833,7 +833,7 @@ func (r *Raft) Barrier(timeout time.Duration) Future {

// Create a log future, no index or term yet
logFuture := &logFuture{log: Log{Type: LogBarrier}}
logFuture.init()
logFuture.Init()

select {
case <-timer:
Expand All @@ -851,7 +851,7 @@ func (r *Raft) Barrier(timeout time.Duration) Future {
func (r *Raft) VerifyLeader() Future {
metrics.IncrCounter([]string{"raft", "verify_leader"}, 1)
verifyFuture := &verifyFuture{}
verifyFuture.init()
verifyFuture.Init()
select {
case <-r.shutdownCh:
return errorFuture{ErrRaftShutdown}
Expand All @@ -864,9 +864,9 @@ func (r *Raft) VerifyLeader() Future {
// committed. The main loop can access this directly.
func (r *Raft) GetConfiguration() ConfigurationFuture {
configReq := &configurationsFuture{}
configReq.init()
configReq.Init()
configReq.configurations = configurations{latest: r.getLatestConfiguration()}
configReq.respond(nil)
configReq.Respond(nil)
return configReq
}

Expand Down Expand Up @@ -997,12 +997,12 @@ func (r *Raft) Shutdown() Future {
// can be used to open the snapshot.
func (r *Raft) Snapshot() SnapshotFuture {
future := &userSnapshotFuture{}
future.init()
future.Init()
select {
case r.userSnapshotCh <- future:
return future
case <-r.shutdownCh:
future.respond(ErrRaftShutdown)
future.Respond(ErrRaftShutdown)
return future
}
}
Expand Down Expand Up @@ -1033,7 +1033,7 @@ func (r *Raft) Restore(meta *SnapshotMeta, reader io.Reader, timeout time.Durati
meta: meta,
reader: reader,
}
restore.init()
restore.Init()
select {
case <-timer:
return ErrEnqueueTimeout
Expand All @@ -1055,7 +1055,7 @@ func (r *Raft) Restore(meta *SnapshotMeta, reader io.Reader, timeout time.Durati
Type: LogNoop,
},
}
noop.init()
noop.Init()
select {
case <-timer:
return ErrEnqueueTimeout
Expand Down
14 changes: 14 additions & 0 deletions buf.gen.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
version: v1
managed:
enabled: true
go_package_prefix:
default: github.com/hashicorp/raft
plugins:
- name: go
out: .
opt: paths=source_relative
- name: go-grpc
out: .
opt:
- paths=source_relative
- require_unimplemented_servers=false
17 changes: 17 additions & 0 deletions buf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
version: v1
lint:
use:
- DEFAULT
except:
# we want to enable our Go packages to have a pb prefix to make goimports more
# intelligently handle fixing up imports and hopefully getting it right.
- PACKAGE_DIRECTORY_MATCH

# if we ever need a v2 we can have a second version with the .v2 version in the package name
- PACKAGE_VERSION_SUFFIX

service_suffix: Service
allow_comment_ignores: true
breaking:
use:
- FILE
14 changes: 7 additions & 7 deletions fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (r *Raft) runFSM() {
// Invoke the future if given
if req.future != nil {
req.future.response = resp
req.future.respond(nil)
req.future.Respond(nil)
}
}()

Expand Down Expand Up @@ -171,7 +171,7 @@ func (r *Raft) runFSM() {

if req.future != nil {
req.future.response = resp
req.future.respond(nil)
req.future.Respond(nil)
}
}
}
Expand All @@ -180,7 +180,7 @@ func (r *Raft) runFSM() {
// Open the snapshot
meta, source, err := r.snapshots.Open(req.ID)
if err != nil {
req.respond(fmt.Errorf("failed to open snapshot %v: %v", req.ID, err))
req.Respond(fmt.Errorf("failed to open snapshot %v: %v", req.ID, err))
return
}
defer source.Close()
Expand All @@ -194,20 +194,20 @@ func (r *Raft) runFSM() {

// Attempt to restore
if err := fsmRestoreAndMeasure(snapLogger, r.fsm, source, meta.Size); err != nil {
req.respond(fmt.Errorf("failed to restore snapshot %v: %v", req.ID, err))
req.Respond(fmt.Errorf("failed to restore snapshot %v: %v", req.ID, err))
return
}

// Update the last index and term
lastIndex = meta.Index
lastTerm = meta.Term
req.respond(nil)
req.Respond(nil)
}

snapshot := func(req *reqSnapshotFuture) {
// Is there something to snapshot?
if lastIndex == 0 {
req.respond(ErrNothingNewToSnapshot)
req.Respond(ErrNothingNewToSnapshot)
return
}

Expand All @@ -220,7 +220,7 @@ func (r *Raft) runFSM() {
req.index = lastIndex
req.term = lastTerm
req.snapshot = snap
req.respond(err)
req.Respond(err)
}

saturation := newSaturationMetric([]string{"raft", "thread", "fsm", "saturation"}, 1*time.Second)
Expand Down
30 changes: 15 additions & 15 deletions future.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,20 @@ func (e errorFuture) Index() uint64 {
return 0
}

// deferError can be embedded to allow a future
// DeferError can be embedded to allow a future
// to provide an error in the future.
type deferError struct {
type DeferError struct {
err error
errCh chan error
responded bool
ShutdownCh chan struct{}
}

func (d *deferError) init() {
func (d *DeferError) Init() {
d.errCh = make(chan error, 1)
}

func (d *deferError) Error() error {
func (d *DeferError) Error() error {
if d.err != nil {
// Note that when we've received a nil error, this
// won't trigger, but the channel is closed after
Expand All @@ -115,7 +115,7 @@ func (d *deferError) Error() error {
return d.err
}

func (d *deferError) respond(err error) {
func (d *DeferError) Respond(err error) {
if d.errCh == nil {
return
}
Expand All @@ -138,7 +138,7 @@ type configurationChangeFuture struct {
// bootstrapFuture is used to attempt a live bootstrap of the cluster. See the
// Raft object's BootstrapCluster member function for more details.
type bootstrapFuture struct {
deferError
DeferError

// configuration is the proposed bootstrap configuration to apply.
configuration Configuration
Expand All @@ -147,7 +147,7 @@ type bootstrapFuture struct {
// logFuture is used to apply a log entry and waits until
// the log is considered committed.
type logFuture struct {
deferError
DeferError
log Log
response interface{}
dispatch time.Time
Expand Down Expand Up @@ -179,7 +179,7 @@ func (s *shutdownFuture) Error() error {
// userSnapshotFuture is used for waiting on a user-triggered snapshot to
// complete.
type userSnapshotFuture struct {
deferError
DeferError

// opener is a function used to open the snapshot. This is filled in
// once the future returns with no error.
Expand All @@ -203,7 +203,7 @@ func (u *userSnapshotFuture) Open() (*SnapshotMeta, io.ReadCloser, error) {
// userRestoreFuture is used for waiting on a user-triggered restore of an
// external snapshot to complete.
type userRestoreFuture struct {
deferError
DeferError

// meta is the metadata that belongs with the snapshot.
meta *SnapshotMeta
Expand All @@ -215,7 +215,7 @@ type userRestoreFuture struct {
// reqSnapshotFuture is used for requesting a snapshot start.
// It is only used internally.
type reqSnapshotFuture struct {
deferError
DeferError

// snapshot details provided by the FSM runner before responding
index uint64
Expand All @@ -226,14 +226,14 @@ type reqSnapshotFuture struct {
// restoreFuture is used for requesting an FSM to perform a
// snapshot restore. Used internally only.
type restoreFuture struct {
deferError
DeferError
ID string
}

// verifyFuture is used to verify the current node is still
// the leader. This is to prevent a stale read.
type verifyFuture struct {
deferError
DeferError
notifyCh chan *verifyFuture
quorumSize int
votes int
Expand All @@ -243,7 +243,7 @@ type verifyFuture struct {
// leadershipTransferFuture is used to track the progress of a leadership
// transfer internally.
type leadershipTransferFuture struct {
deferError
DeferError

ID *ServerID
Address *ServerAddress
Expand All @@ -252,7 +252,7 @@ type leadershipTransferFuture struct {
// configurationsFuture is used to retrieve the current configurations. This is
// used to allow safe access to this information outside of the main thread.
type configurationsFuture struct {
deferError
DeferError
configurations configurations
}

Expand Down Expand Up @@ -292,7 +292,7 @@ func (v *verifyFuture) vote(leader bool) {
// appendFuture is used for waiting on a pipelined append
// entries RPC.
type appendFuture struct {
deferError
DeferError
start time.Time
args *AppendEntriesRequest
resp *AppendEntriesResponse
Expand Down
18 changes: 9 additions & 9 deletions future_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
)

func TestDeferFutureSuccess(t *testing.T) {
var f deferError
f.init()
f.respond(nil)
var f DeferError
f.Init()
f.Respond(nil)
if err := f.Error(); err != nil {
t.Fatalf("unexpected error result; got %#v want nil", err)
}
Expand All @@ -19,9 +19,9 @@ func TestDeferFutureSuccess(t *testing.T) {

func TestDeferFutureError(t *testing.T) {
want := errors.New("x")
var f deferError
f.init()
f.respond(want)
var f DeferError
f.Init()
f.Respond(want)
if got := f.Error(); got != want {
t.Fatalf("unexpected error result; got %#v want %#v", got, want)
}
Expand All @@ -33,9 +33,9 @@ func TestDeferFutureError(t *testing.T) {
func TestDeferFutureConcurrent(t *testing.T) {
// Food for the race detector.
want := errors.New("x")
var f deferError
f.init()
go f.respond(want)
var f DeferError
f.Init()
go f.Respond(want)
if got := f.Error(); got != want {
t.Errorf("unexpected error result; got %#v want %#v", got, want)
}
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,7 @@ require (
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878
github.com/hashicorp/go-hclog v0.9.1
github.com/hashicorp/go-msgpack v0.5.5
github.com/stretchr/testify v1.3.0
github.com/stretchr/testify v1.7.0
google.golang.org/grpc v1.51.0
google.golang.org/protobuf v1.28.1
)
Loading