Skip to content

Commit

Permalink
kvserver/rangefeed: move perRangeEventSink to rangefeed pacakge
Browse files Browse the repository at this point in the history
Previously, perRangeEventSink was defined in pkg/server/node.go. This patch
relocates it to the rangefeed package to facilitate future commits testing
within the rangefeed package itself.

Part of: cockroachdb#126560
Release note: none
  • Loading branch information
wenyihu6 committed Aug 19, 2024
1 parent c14f0e1 commit 2bb3625
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 54 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"resolved_timestamp.go",
"scheduled_processor.go",
"scheduler.go",
"stream.go",
"stream_muxer.go",
"stream_muxer_test_helper.go",
"task.go",
Expand Down
10 changes: 0 additions & 10 deletions pkg/kv/kvserver/rangefeed/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
)

// Stream is a object capable of transmitting RangeFeedEvents.
type Stream interface {
kvpb.RangeFeedEventSink
// Disconnect disconnects the stream with the provided error. Note that this
// function can be called by the processor worker while holding raftMu, so it
// is important that this function doesn't block IO or try acquiring locks
// that could lead to deadlocks.
Disconnect(err *kvpb.Error)
}

// registration defines an interface for registration that can be added to a
// processor registry. Implemented by bufferedRegistration.
type registration interface {
Expand Down
78 changes: 78 additions & 0 deletions pkg/kv/kvserver/rangefeed/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rangefeed

import (
"context"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// Stream is an object capable of transmitting RangeFeedEvents from a server
// rangefeed to a client.
type Stream interface {
kvpb.RangeFeedEventSink
// Disconnect disconnects the stream with the provided error. Note that this
// function can be called by the processor worker while holding raftMu, so it
// is important that this function doesn't block IO or try acquiring locks
// that could lead to deadlocks.
Disconnect(err *kvpb.Error)
}

// PerRangeEventSink is an implementation of Stream which annotates each
// response with rangeID and streamID. It is used by MuxRangeFeed.
type PerRangeEventSink struct {
ctx context.Context
rangeID roachpb.RangeID
streamID int64
wrapped *StreamMuxer
}

func NewPerRangeEventSink(
ctx context.Context, rangeID roachpb.RangeID, streamID int64, wrapped *StreamMuxer,
) *PerRangeEventSink {
return &PerRangeEventSink{
ctx: ctx,
rangeID: rangeID,
streamID: streamID,
wrapped: wrapped,
}
}

var _ kvpb.RangeFeedEventSink = (*PerRangeEventSink)(nil)
var _ Stream = (*PerRangeEventSink)(nil)

func (s *PerRangeEventSink) Context() context.Context {
return s.ctx
}

// SendIsThreadSafe is a no-op declaration method. It is a contract that the
// Send method is thread-safe. Note that Send wraps StreamMuxer which declares
// its Send method to be thread-safe.
func (s *PerRangeEventSink) SendIsThreadSafe() {}

func (s *PerRangeEventSink) Send(event *kvpb.RangeFeedEvent) error {
response := &kvpb.MuxRangeFeedEvent{
RangeFeedEvent: *event,
RangeID: s.rangeID,
StreamID: s.streamID,
}
return s.wrapped.Send(response)
}

// Disconnect implements the Stream interface. It requests the StreamMuxer to
// detach the stream. The StreamMuxer is then responsible for handling the
// actual disconnection and additional cleanup. Note that Caller should not rely
// on immediate disconnection as cleanup takes place async.
func (s *PerRangeEventSink) Disconnect(err *kvpb.Error) {
s.wrapped.DisconnectStreamWithError(s.streamID, s.rangeID, err)
}
45 changes: 1 addition & 44 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1925,44 +1925,6 @@ func (n *Node) RangeLookup(
return resp, nil
}

// perRangeEventSink is an implementation of rangefeed.Stream which annotates
// each response with rangeID and streamID. It is used by MuxRangeFeed.
type perRangeEventSink struct {
ctx context.Context
rangeID roachpb.RangeID
streamID int64
wrapped *rangefeed.StreamMuxer
}

var _ kvpb.RangeFeedEventSink = (*perRangeEventSink)(nil)
var _ rangefeed.Stream = (*perRangeEventSink)(nil)

func (s *perRangeEventSink) Context() context.Context {
return s.ctx
}

// SendIsThreadSafe is a no-op declaration method. It is a contract that the
// Send method is thread-safe. Note that Send wraps rangefeed.StreamMuxer which
// declares its Send method to be thread-safe.
func (s *perRangeEventSink) SendIsThreadSafe() {}

func (s *perRangeEventSink) Send(event *kvpb.RangeFeedEvent) error {
response := &kvpb.MuxRangeFeedEvent{
RangeFeedEvent: *event,
RangeID: s.rangeID,
StreamID: s.streamID,
}
return s.wrapped.Send(response)
}

// Disconnect implements the rangefeed.Stream interface. It requests the
// StreamMuxer to detach the stream. The StreamMuxer is then responsible for
// handling the actual disconnection and additional cleanup. Note that Caller
// should not rely on immediate disconnection as cleanup takes place async.
func (s *perRangeEventSink) Disconnect(err *kvpb.Error) {
s.wrapped.DisconnectStreamWithError(s.streamID, s.rangeID, err)
}

// lockedMuxStream provides support for concurrent calls to Send. The underlying
// MuxRangeFeedServer (default grpc.Stream) is not safe for concurrent calls to
// Send.
Expand Down Expand Up @@ -2021,12 +1983,7 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error {
streamCtx = logtags.AddTag(streamCtx, "s", req.Replica.StoreID)
streamCtx = logtags.AddTag(streamCtx, "sid", req.StreamID)

streamSink := &perRangeEventSink{
ctx: streamCtx,
rangeID: req.RangeID,
streamID: req.StreamID,
wrapped: streamMuxer,
}
streamSink := rangefeed.NewPerRangeEventSink(streamCtx, req.RangeID, req.StreamID, streamMuxer)
streamMuxer.AddStream(req.StreamID, req.RangeID, cancel)

// Rangefeed attempts to register rangefeed a request over the specified
Expand Down

0 comments on commit 2bb3625

Please sign in to comment.