Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove context from mvcc public interface #18678

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions server/etcdserver/api/v3rpc/validationfuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package v3rpc

import (
"context"
"testing"

"go.uber.org/zap/zaptest"
Expand Down Expand Up @@ -167,15 +166,11 @@ func execTransaction(t *testing.T, req *pb.RequestOp) {
s := mvcc.NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, mvcc.StoreConfig{})
defer s.Close()

// setup cancelled context
ctx, cancel := context.WithCancel(context.TODO())
cancel()

request := &pb.TxnRequest{
Success: []*pb.RequestOp{req},
}

_, _, err := txn.Txn(ctx, zaptest.NewLogger(t), request, false, s, &lease.FakeLessor{})
_, _, err := txn.Txn(zaptest.NewLogger(t), request, false, s, &lease.FakeLessor{})
if err != nil {
t.Skipf("Application erroring. %s", err.Error())
}
Expand Down
8 changes: 4 additions & 4 deletions server/etcdserver/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,19 +151,19 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, applyFunc applyFunc)
}

func (a *applierV3backend) Put(p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
return mvcctxn.Put(context.TODO(), a.lg, a.lessor, a.kv, p)
return mvcctxn.Put(a.lg, a.lessor, a.kv, p)
}

func (a *applierV3backend) DeleteRange(dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error) {
return mvcctxn.DeleteRange(context.TODO(), a.lg, a.kv, dr)
return mvcctxn.DeleteRange(a.lg, a.kv, dr)
}

func (a *applierV3backend) Range(r *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error) {
return mvcctxn.Range(context.TODO(), a.lg, a.kv, r)
return mvcctxn.Range(a.lg, a.kv, r)
}

func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
return mvcctxn.Txn(context.TODO(), a.lg, rt, a.txnModeWriteWithSharedBuffer, a.kv, a.lessor)
return mvcctxn.Txn(a.lg, rt, a.txnModeWriteWithSharedBuffer, a.kv, a.lessor)
}

func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) {
Expand Down
12 changes: 8 additions & 4 deletions server/etcdserver/txn/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import (
"go.etcd.io/etcd/server/v3/storage/mvcc"
)

func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
func Put(lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
ctx := context.Background()
trace = traceutil.Get(ctx)
// create put tracing if the trace in context is empty
if trace.IsEmpty() {
Expand Down Expand Up @@ -94,7 +95,8 @@ func put(ctx context.Context, txnWrite mvcc.TxnWrite, p *pb.PutRequest) (resp *p
return resp, nil
}

func DeleteRange(ctx context.Context, lg *zap.Logger, kv mvcc.KV, dr *pb.DeleteRangeRequest) (resp *pb.DeleteRangeResponse, trace *traceutil.Trace, err error) {
func DeleteRange(lg *zap.Logger, kv mvcc.KV, dr *pb.DeleteRangeRequest) (resp *pb.DeleteRangeResponse, trace *traceutil.Trace, err error) {
ctx := context.Background()
trace = traceutil.Get(ctx)
// create delete tracing if the trace in context is empty
if trace.IsEmpty() {
Expand Down Expand Up @@ -133,7 +135,8 @@ func deleteRange(ctx context.Context, txnWrite mvcc.TxnWrite, dr *pb.DeleteRange
return resp, nil
}

func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, r *pb.RangeRequest) (resp *pb.RangeResponse, trace *traceutil.Trace, err error) {
func Range(lg *zap.Logger, kv mvcc.KV, r *pb.RangeRequest) (resp *pb.RangeResponse, trace *traceutil.Trace, err error) {
ctx := context.Background()
trace = traceutil.Get(ctx)
if trace.IsEmpty() {
trace = traceutil.New("range", lg)
Expand Down Expand Up @@ -249,7 +252,8 @@ func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *
return resp, nil
}

func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWithSharedBuffer bool, kv mvcc.KV, lessor lease.Lessor) (*pb.TxnResponse, *traceutil.Trace, error) {
func Txn(lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWithSharedBuffer bool, kv mvcc.KV, lessor lease.Lessor) (*pb.TxnResponse, *traceutil.Trace, error) {
ctx := context.Background()
trace := traceutil.Get(ctx)
if trace.IsEmpty() {
trace = traceutil.New("transaction", lg)
Expand Down
80 changes: 3 additions & 77 deletions server/etcdserver/txn/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package txn

import (
"context"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -223,9 +221,7 @@ func TestCheckTxn(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
s, lessor := setup(t, tc.setup)

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
_, _, err := Txn(ctx, zaptest.NewLogger(t), tc.txn, false, s, lessor)
_, _, err := Txn(zaptest.NewLogger(t), tc.txn, false, s, lessor)

gotErr := ""
if err != nil {
Expand All @@ -243,9 +239,7 @@ func TestCheckPut(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
s, lessor := setup(t, tc.setup)

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
_, _, err := Put(ctx, zaptest.NewLogger(t), lessor, s, tc.op.GetRequestPut())
_, _, err := Put(zaptest.NewLogger(t), lessor, s, tc.op.GetRequestPut())

gotErr := ""
if err != nil {
Expand All @@ -263,9 +257,7 @@ func TestCheckRange(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
s, _ := setup(t, tc.setup)

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
_, _, err := Range(ctx, zaptest.NewLogger(t), s, tc.op.GetRequestRange())
_, _, err := Range(zaptest.NewLogger(t), s, tc.op.GetRequestRange())

gotErr := ""
if err != nil {
Expand Down Expand Up @@ -304,72 +296,6 @@ func setup(t *testing.T, setup testSetup) (mvcc.KV, lease.Lessor) {
return s, lessor
}

func TestReadonlyTxnError(t *testing.T) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to double check the original motivation for the test and if we need to rewrite it. Marking the PR as draft until then.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I think that enough context removal. Based on #14149 this Test validates that Readonly TXN pass context and don't panic on it. This is the exact place where we want to allow canceling Read transactions as it's safe and a feature to allow clients abort requests.

At some point would be good to design how context should be handled in etcd codebase. Non-cancel-able metadata for apply loop that executes writes, that can also be cancel-able for Read request. Both paths meet in the transaction code layer. Would be nice to be able to statically validate at compilation time whether transaction is read only.

cc @ahrtr @shyamjvs

b, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, b)
s := mvcc.NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, mvcc.StoreConfig{})
defer s.Close()

// setup cancelled context
ctx, cancel := context.WithCancel(context.TODO())
cancel()

// put some data to prevent early termination in rangeKeys
// we are expecting failure on cancelled context check
s.Put([]byte("foo"), []byte("bar"), lease.NoLease)

txn := &pb.TxnRequest{
Success: []*pb.RequestOp{
{
Request: &pb.RequestOp_RequestRange{
RequestRange: &pb.RangeRequest{
Key: []byte("foo"),
},
},
},
},
}

_, _, err := Txn(ctx, zaptest.NewLogger(t), txn, false, s, &lease.FakeLessor{})
if err == nil || !strings.Contains(err.Error(), "applyTxn: failed Range: rangeKeys: context cancelled: context canceled") {
t.Fatalf("Expected context canceled error, got %v", err)
}
}

func TestWriteTxnPanic(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, b)
s := mvcc.NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, mvcc.StoreConfig{})
defer s.Close()

// setup cancelled context
ctx, cancel := context.WithCancel(context.TODO())
cancel()

// write txn that puts some data and then fails in range due to cancelled context
txn := &pb.TxnRequest{
Success: []*pb.RequestOp{
{
Request: &pb.RequestOp_RequestPut{
RequestPut: &pb.PutRequest{
Key: []byte("foo"),
Value: []byte("bar"),
},
},
},
{
Request: &pb.RequestOp_RequestRange{
RequestRange: &pb.RangeRequest{
Key: []byte("foo"),
},
},
},
},
}

assert.Panics(t, func() { Txn(ctx, zaptest.NewLogger(t), txn, false, s, &lease.FakeLessor{}) }, "Expected panic in Txn with writes")
}

func TestCheckTxnAuth(t *testing.T) {
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)
Expand Down
4 changes: 2 additions & 2 deletions server/etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
}

get := func() { resp, _, err = txn.Range(ctx, s.Logger(), s.KV(), r) }
get := func() { resp, _, err = txn.Range(s.Logger(), s.KV(), r) }
if serr := s.doSerialize(ctx, chk, get); serr != nil {
err = serr
return nil, err
Expand Down Expand Up @@ -183,7 +183,7 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
}(time.Now())

get := func() {
resp, _, err = txn.Txn(ctx, s.Logger(), r, s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer, s.KV(), s.lessor)
resp, _, err = txn.Txn(s.Logger(), r, s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer, s.KV(), s.lessor)
}
if serr := s.doSerialize(ctx, chk, get); serr != nil {
return nil, serr
Expand Down
Loading