From c14f0e1ae5c0c486b8413de78d525f22ea85b14c Mon Sep 17 00:00:00 2001 From: Wenyi Hu Date: Mon, 29 Jul 2024 23:40:17 -0400 Subject: [PATCH 1/2] kvserver/rangefeed: change p.Register to use registration interface Previously, we introduced a registration interface to abstract the implementation details of buffered and unbuffered registration. This patch updates the p.Register function to use this interface as well. Part of: #126560 Release note: none --- pkg/kv/kvserver/rangefeed/scheduled_processor.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/scheduled_processor.go b/pkg/kv/kvserver/rangefeed/scheduled_processor.go index 9f484c6f30a2..b6c59b2be7fd 100644 --- a/pkg/kv/kvserver/rangefeed/scheduled_processor.go +++ b/pkg/kv/kvserver/rangefeed/scheduled_processor.go @@ -325,7 +325,7 @@ func (p *ScheduledProcessor) Register( if p.stopping { return nil } - if !p.Span.AsRawSpanWithNoLocals().Contains(r.span) { + if !p.Span.AsRawSpanWithNoLocals().Contains(r.getSpan()) { log.Fatalf(ctx, "registration %s not in Processor's key range %v", r, p.Span) } @@ -348,8 +348,8 @@ func (p *ScheduledProcessor) Register( if p.unregisterClient(r) { // unreg callback is set by replica to tear down processors that have // zero registrations left and to update event filters. - if r.unreg != nil { - r.unreg() + if f := r.getUnreg(); f != nil { + f() } } } From 2bb3625512cd57eb23d1b1ffce4b988dc5b91c6e Mon Sep 17 00:00:00 2001 From: Wenyi Hu Date: Fri, 16 Aug 2024 20:52:58 -0400 Subject: [PATCH 2/2] kvserver/rangefeed: move perRangeEventSink to rangefeed pacakge Previously, perRangeEventSink was defined in pkg/server/node.go. This patch relocates it to the rangefeed package to facilitate future commits testing within the rangefeed package itself. Part of: #126560 Release note: none --- pkg/kv/kvserver/rangefeed/BUILD.bazel | 1 + pkg/kv/kvserver/rangefeed/registry.go | 10 ---- pkg/kv/kvserver/rangefeed/stream.go | 78 +++++++++++++++++++++++++++ pkg/server/node.go | 45 +--------------- 4 files changed, 80 insertions(+), 54 deletions(-) create mode 100644 pkg/kv/kvserver/rangefeed/stream.go diff --git a/pkg/kv/kvserver/rangefeed/BUILD.bazel b/pkg/kv/kvserver/rangefeed/BUILD.bazel index 3ebb4b0538cc..52909da7fdd8 100644 --- a/pkg/kv/kvserver/rangefeed/BUILD.bazel +++ b/pkg/kv/kvserver/rangefeed/BUILD.bazel @@ -14,6 +14,7 @@ go_library( "resolved_timestamp.go", "scheduled_processor.go", "scheduler.go", + "stream.go", "stream_muxer.go", "stream_muxer_test_helper.go", "task.go", diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go index 1337b2acf268..1529657b0a12 100644 --- a/pkg/kv/kvserver/rangefeed/registry.go +++ b/pkg/kv/kvserver/rangefeed/registry.go @@ -22,16 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" ) -// Stream is a object capable of transmitting RangeFeedEvents. -type Stream interface { - kvpb.RangeFeedEventSink - // Disconnect disconnects the stream with the provided error. Note that this - // function can be called by the processor worker while holding raftMu, so it - // is important that this function doesn't block IO or try acquiring locks - // that could lead to deadlocks. - Disconnect(err *kvpb.Error) -} - // registration defines an interface for registration that can be added to a // processor registry. Implemented by bufferedRegistration. type registration interface { diff --git a/pkg/kv/kvserver/rangefeed/stream.go b/pkg/kv/kvserver/rangefeed/stream.go new file mode 100644 index 000000000000..0ed86a344cab --- /dev/null +++ b/pkg/kv/kvserver/rangefeed/stream.go @@ -0,0 +1,78 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" +) + +// Stream is an object capable of transmitting RangeFeedEvents from a server +// rangefeed to a client. +type Stream interface { + kvpb.RangeFeedEventSink + // Disconnect disconnects the stream with the provided error. Note that this + // function can be called by the processor worker while holding raftMu, so it + // is important that this function doesn't block IO or try acquiring locks + // that could lead to deadlocks. + Disconnect(err *kvpb.Error) +} + +// PerRangeEventSink is an implementation of Stream which annotates each +// response with rangeID and streamID. It is used by MuxRangeFeed. +type PerRangeEventSink struct { + ctx context.Context + rangeID roachpb.RangeID + streamID int64 + wrapped *StreamMuxer +} + +func NewPerRangeEventSink( + ctx context.Context, rangeID roachpb.RangeID, streamID int64, wrapped *StreamMuxer, +) *PerRangeEventSink { + return &PerRangeEventSink{ + ctx: ctx, + rangeID: rangeID, + streamID: streamID, + wrapped: wrapped, + } +} + +var _ kvpb.RangeFeedEventSink = (*PerRangeEventSink)(nil) +var _ Stream = (*PerRangeEventSink)(nil) + +func (s *PerRangeEventSink) Context() context.Context { + return s.ctx +} + +// SendIsThreadSafe is a no-op declaration method. It is a contract that the +// Send method is thread-safe. Note that Send wraps StreamMuxer which declares +// its Send method to be thread-safe. +func (s *PerRangeEventSink) SendIsThreadSafe() {} + +func (s *PerRangeEventSink) Send(event *kvpb.RangeFeedEvent) error { + response := &kvpb.MuxRangeFeedEvent{ + RangeFeedEvent: *event, + RangeID: s.rangeID, + StreamID: s.streamID, + } + return s.wrapped.Send(response) +} + +// Disconnect implements the Stream interface. It requests the StreamMuxer to +// detach the stream. The StreamMuxer is then responsible for handling the +// actual disconnection and additional cleanup. Note that Caller should not rely +// on immediate disconnection as cleanup takes place async. +func (s *PerRangeEventSink) Disconnect(err *kvpb.Error) { + s.wrapped.DisconnectStreamWithError(s.streamID, s.rangeID, err) +} diff --git a/pkg/server/node.go b/pkg/server/node.go index 9e0f8b53791e..c9c2ef8bcd85 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -1925,44 +1925,6 @@ func (n *Node) RangeLookup( return resp, nil } -// perRangeEventSink is an implementation of rangefeed.Stream which annotates -// each response with rangeID and streamID. It is used by MuxRangeFeed. -type perRangeEventSink struct { - ctx context.Context - rangeID roachpb.RangeID - streamID int64 - wrapped *rangefeed.StreamMuxer -} - -var _ kvpb.RangeFeedEventSink = (*perRangeEventSink)(nil) -var _ rangefeed.Stream = (*perRangeEventSink)(nil) - -func (s *perRangeEventSink) Context() context.Context { - return s.ctx -} - -// SendIsThreadSafe is a no-op declaration method. It is a contract that the -// Send method is thread-safe. Note that Send wraps rangefeed.StreamMuxer which -// declares its Send method to be thread-safe. -func (s *perRangeEventSink) SendIsThreadSafe() {} - -func (s *perRangeEventSink) Send(event *kvpb.RangeFeedEvent) error { - response := &kvpb.MuxRangeFeedEvent{ - RangeFeedEvent: *event, - RangeID: s.rangeID, - StreamID: s.streamID, - } - return s.wrapped.Send(response) -} - -// Disconnect implements the rangefeed.Stream interface. It requests the -// StreamMuxer to detach the stream. The StreamMuxer is then responsible for -// handling the actual disconnection and additional cleanup. Note that Caller -// should not rely on immediate disconnection as cleanup takes place async. -func (s *perRangeEventSink) Disconnect(err *kvpb.Error) { - s.wrapped.DisconnectStreamWithError(s.streamID, s.rangeID, err) -} - // lockedMuxStream provides support for concurrent calls to Send. The underlying // MuxRangeFeedServer (default grpc.Stream) is not safe for concurrent calls to // Send. @@ -2021,12 +1983,7 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error { streamCtx = logtags.AddTag(streamCtx, "s", req.Replica.StoreID) streamCtx = logtags.AddTag(streamCtx, "sid", req.StreamID) - streamSink := &perRangeEventSink{ - ctx: streamCtx, - rangeID: req.RangeID, - streamID: req.StreamID, - wrapped: streamMuxer, - } + streamSink := rangefeed.NewPerRangeEventSink(streamCtx, req.RangeID, req.StreamID, streamMuxer) streamMuxer.AddStream(req.StreamID, req.RangeID, cancel) // Rangefeed attempts to register rangefeed a request over the specified