Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Remove unnecessary code from proto/varlogpb/log_entry.go #927

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/storagenode/client/log_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type SubscribeResult struct {
}

var InvalidSubscribeResult = SubscribeResult{
LogEntry: varlogpb.InvalidLogEntry(),
LogEntry: varlogpb.LogEntry{},
Error: errors.New("invalid subscribe result"),
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/varlog/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ func (p *dispatcher) dispatch(_ context.Context) {
sentErr = sentErr || res.Error != nil
}
if !sentErr {
p.onNextFunc(varlogpb.InvalidLogEntry(), io.EOF)
p.onNextFunc(varlogpb.LogEntry{}, io.EOF)
}
}

Expand All @@ -532,7 +532,7 @@ type invalidSubscriber struct {
}

func (s invalidSubscriber) Next() (varlogpb.LogEntry, error) {
return varlogpb.InvalidLogEntry(), s.err
return varlogpb.LogEntry{}, s.err
}

func (s invalidSubscriber) Close() error {
Expand Down
7 changes: 3 additions & 4 deletions pkg/varlogtest/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (c *testAdmin) ListStorageNodes(ctx context.Context, opts ...varlog.AdminCa

return ret, nil
}

func (c *testAdmin) GetStorageNodes(ctx context.Context, opts ...varlog.AdminCallOption) (map[types.StorageNodeID]admpb.StorageNodeMetadata, error) {
snms, err := c.ListStorageNodes(ctx)
if err != nil {
Expand Down Expand Up @@ -182,8 +183,7 @@ func (c *testAdmin) AddTopic(ctx context.Context, opts ...varlog.AdminCallOption
c.vt.topics[topicID] = topicDesc
c.vt.trimGLSNs[topicID] = types.InvalidGLSN

invalidLogEntry := varlogpb.InvalidLogEntry()
c.vt.globalLogEntries[topicID] = []*varlogpb.LogEntry{&invalidLogEntry}
c.vt.globalLogEntries[topicID] = []*varlogpb.LogEntry{{}}

return proto.Clone(&topicDesc).(*varlogpb.TopicDescriptor), nil
}
Expand Down Expand Up @@ -294,8 +294,7 @@ func (c *testAdmin) AddLogStream(_ context.Context, topicID types.TopicID, logSt

c.vt.logStreams[logStreamID] = lsd

invalidLogEntry := varlogpb.InvalidLogEntry()
c.vt.localLogEntries[logStreamID] = []*varlogpb.LogEntry{&invalidLogEntry}
c.vt.localLogEntries[logStreamID] = []*varlogpb.LogEntry{{}}
hungryjang marked this conversation as resolved.
Show resolved Hide resolved

topicDesc.LogStreams = append(topicDesc.LogStreams, logStreamID)
c.vt.topics[topicID] = topicDesc
Expand Down
7 changes: 3 additions & 4 deletions pkg/varlogtest/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (c *testLog) Subscribe(ctx context.Context, topicID types.TopicID, begin ty
for _, logEntry := range copiedLogEntries {
onNextFunc(logEntry, nil)
}
onNextFunc(varlogpb.InvalidLogEntry(), io.EOF)
onNextFunc(varlogpb.LogEntry{}, io.EOF)
}()

return func() {
Expand Down Expand Up @@ -319,7 +319,6 @@ func (c *testLog) PeekLogStream(ctx context.Context, tpid types.TopicID, lsid ty
GLSN: tail.GLSN,
}
return first, last, nil

}

func (c *testLog) AppendableLogStreams(tpid types.TopicID) map[types.LogStreamID]struct{} {
Expand Down Expand Up @@ -463,7 +462,7 @@ func newErrSubscriber(err error) *errSubscriber {
}

func (s errSubscriber) Next() (varlogpb.LogEntry, error) {
return varlogpb.InvalidLogEntry(), s.err
return varlogpb.LogEntry{}, s.err
}

func (s errSubscriber) Close() error {
Expand Down Expand Up @@ -496,7 +495,7 @@ func (s *subscriberImpl) Next() (varlogpb.LogEntry, error) {
logEntry, err := s.next()
if err != nil {
s.setErr(err)
return varlogpb.InvalidLogEntry(), err
return varlogpb.LogEntry{}, err
}
if s.cursor == s.end {
s.setErr(io.EOF)
Expand Down
14 changes: 14 additions & 0 deletions proto/snpb/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"github.com/kakao/varlog/proto/varlogpb"
)

// ToStorageNodeDescriptor converts a StorageNodeMetadataDescriptor to a
// varlogpb.StorageNodeDescriptor. It returns nil if the
// StorageNodeMetadataDescriptor is nil.
func (snmd *StorageNodeMetadataDescriptor) ToStorageNodeDescriptor() *varlogpb.StorageNodeDescriptor {
if snmd == nil {
return nil
Expand All @@ -19,6 +22,9 @@ func (snmd *StorageNodeMetadataDescriptor) ToStorageNodeDescriptor() *varlogpb.S
return snd
}

// GetLogStream retrieves a LogStreamReplicaMetadataDescriptor by its
// LogStreamID. It returns the LogStreamReplicaMetadataDescriptor and true if
// found, otherwise an empty descriptor and false.
func (snmd *StorageNodeMetadataDescriptor) GetLogStream(logStreamID types.LogStreamID) (LogStreamReplicaMetadataDescriptor, bool) {
logStreams := snmd.GetLogStreamReplicas()
for i := range logStreams {
Expand All @@ -29,6 +35,10 @@ func (snmd *StorageNodeMetadataDescriptor) GetLogStream(logStreamID types.LogStr
return LogStreamReplicaMetadataDescriptor{}, false
}

// Head returns the varlogpb.LogEntryMeta corresponding to the local low
// watermark of the LogStreamReplicaMetadataDescriptor. The "head" represents
// the earliest log entry in the log stream replica. It returns an empty
// varlogpb.LogEntryMeta if the LogStreamReplicaMetadataDescriptor is nil.
func (lsrmd *LogStreamReplicaMetadataDescriptor) Head() varlogpb.LogEntryMeta {
if lsrmd == nil {
return varlogpb.LogEntryMeta{}
Expand All @@ -41,6 +51,10 @@ func (lsrmd *LogStreamReplicaMetadataDescriptor) Head() varlogpb.LogEntryMeta {
}
}

// Tail returns the varlogpb.LogEntryMeta corresponding to the local high
// watermark of the LogStreamReplicaMetadataDescriptor. The "tail" represents
// the latest log entry in the log stream replica. It returns an empty
// varlogpb.LogEntryMeta if the LogStreamReplicaMetadataDescriptor is nil.
func (lsrmd *LogStreamReplicaMetadataDescriptor) Tail() varlogpb.LogEntryMeta {
if lsrmd == nil {
return varlogpb.LogEntryMeta{}
Expand Down
191 changes: 191 additions & 0 deletions proto/snpb/metadata_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
package snpb_test

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/proto/snpb"
"github.com/kakao/varlog/proto/varlogpb"
)

func TestStorageNodeMetadataDescriptor_ToStorageNodeDescriptor(t *testing.T) {
tcs := []struct {
snmd *snpb.StorageNodeMetadataDescriptor
want *varlogpb.StorageNodeDescriptor
name string
}{
{
name: "Nil",
snmd: nil,
want: nil,
},
{
name: "NonNil",
snmd: &snpb.StorageNodeMetadataDescriptor{
StorageNode: varlogpb.StorageNode{
StorageNodeID: types.MinStorageNodeID,
Address: "node1",
},
Storages: []varlogpb.StorageDescriptor{
{Path: "/path1"},
{Path: "/path2"},
},
},
want: &varlogpb.StorageNodeDescriptor{
StorageNode: varlogpb.StorageNode{
StorageNodeID: types.MinStorageNodeID,
Address: "node1",
},
Paths: []string{"/path1", "/path2"},
},
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
got := tc.snmd.ToStorageNodeDescriptor()
require.Equal(t, tc.want, got)
})
}
}

func TestStorageNodeMetadataDescriptor_GetLogStream(t *testing.T) {
tcs := []struct {
name string
logStreamID types.LogStreamID
want snpb.LogStreamReplicaMetadataDescriptor
wantFound bool
}{
{
name: "Found",
logStreamID: types.LogStreamID(1),
want: snpb.LogStreamReplicaMetadataDescriptor{
LogStreamReplica: varlogpb.LogStreamReplica{
TopicLogStream: varlogpb.TopicLogStream{
LogStreamID: types.LogStreamID(1),
},
},
},
wantFound: true,
},
{
name: "NotFound",
logStreamID: types.LogStreamID(3),
want: snpb.LogStreamReplicaMetadataDescriptor{},
wantFound: false,
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
snmd := &snpb.StorageNodeMetadataDescriptor{
LogStreamReplicas: []snpb.LogStreamReplicaMetadataDescriptor{
{
LogStreamReplica: varlogpb.LogStreamReplica{
TopicLogStream: varlogpb.TopicLogStream{
LogStreamID: types.LogStreamID(1),
},
},
},
{
LogStreamReplica: varlogpb.LogStreamReplica{
TopicLogStream: varlogpb.TopicLogStream{
LogStreamID: types.LogStreamID(2),
},
},
},
},
}

got, found := snmd.GetLogStream(tc.logStreamID)
require.Equal(t, tc.wantFound, found)
require.Equal(t, tc.want, got)
})
}
}

func TestLogStreamReplicaMetadataDescriptor_Head(t *testing.T) {
tcs := []struct {
lsrmd *snpb.LogStreamReplicaMetadataDescriptor
name string
want varlogpb.LogEntryMeta
}{
{
name: "Nil",
lsrmd: nil,
want: varlogpb.LogEntryMeta{},
},
{
name: "NonNil",
lsrmd: &snpb.LogStreamReplicaMetadataDescriptor{
LogStreamReplica: varlogpb.LogStreamReplica{
TopicLogStream: varlogpb.TopicLogStream{
TopicID: types.TopicID(1),
LogStreamID: types.LogStreamID(2),
},
},
LocalLowWatermark: varlogpb.LogSequenceNumber{
LLSN: types.LLSN(3),
GLSN: types.GLSN(4),
},
},
want: varlogpb.LogEntryMeta{
TopicID: 1,
LogStreamID: 2,
LLSN: 3,
GLSN: 4,
},
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
got := tc.lsrmd.Head()
require.Equal(t, tc.want, got)
})
}
}

func TestLogStreamReplicaMetadataDescriptor_Tail(t *testing.T) {
tcs := []struct {
lsrmd *snpb.LogStreamReplicaMetadataDescriptor
name string
want varlogpb.LogEntryMeta
}{
{
name: "Nil",
lsrmd: nil,
want: varlogpb.LogEntryMeta{},
},
{
name: "NonNil",
lsrmd: &snpb.LogStreamReplicaMetadataDescriptor{
LogStreamReplica: varlogpb.LogStreamReplica{
TopicLogStream: varlogpb.TopicLogStream{
TopicID: types.TopicID(1),
LogStreamID: types.LogStreamID(2),
},
},
LocalHighWatermark: varlogpb.LogSequenceNumber{
LLSN: types.LLSN(5),
GLSN: types.GLSN(6),
},
},
want: varlogpb.LogEntryMeta{
TopicID: 1,
LogStreamID: 2,
LLSN: 5,
GLSN: 6,
},
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
got := tc.lsrmd.Tail()
require.Equal(t, tc.want, got)
})
}
}
16 changes: 16 additions & 0 deletions proto/snpb/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,42 @@ import (
"github.com/kakao/varlog/pkg/types"
)

// InvalidSyncPosition returns a SyncPosition with both LLSN and GLSN set to
// types.InvalidGLSN.
func InvalidSyncPosition() SyncPosition {
return SyncPosition{LLSN: types.InvalidLLSN, GLSN: types.InvalidGLSN}
}

// Invalid checks if the SyncPosition is invalid. A SyncPosition is considered
// invalid if either LLSN or GLSN is invalid.
func (sp SyncPosition) Invalid() bool {
return sp.LLSN.Invalid() || sp.GLSN.Invalid()
}

// LessThan checks if the current SyncPosition "sp" is less than another
// SyncPosition "other". It returns true if both LLSN and GLSN of the current
// SyncPosition are less than those of the other SyncPosition.
func (sp SyncPosition) LessThan(other SyncPosition) bool {
return sp.LLSN < other.LLSN && sp.GLSN < other.GLSN
}

// InvalidSyncRange returns a SyncRange with both FirstLLSN and LastLLSN set to
// types.InvalidLLSN.
func InvalidSyncRange() SyncRange {
return SyncRange{FirstLLSN: types.InvalidLLSN, LastLLSN: types.InvalidLLSN}
}

// Invalid determines if the SyncRange is invalid. A SyncRange is considered
// invalid if either FirstLLSN or LastLLSN is invalid, or if FirstLLSN is
// greater than LastLLSN.
func (sr SyncRange) Invalid() bool {
return sr.FirstLLSN.Invalid() || sr.LastLLSN.Invalid() || sr.FirstLLSN > sr.LastLLSN
}

// Validate checks the validity of the SyncRange. It returns an error if
// FirstLLSN is greater than LastLLSN, or if FirstLLSN is invalid while
// LastLLSN is valid. If both FirstLLSN and LastLLSN are invalid, it returns
// nil, indicating that the entire log range is considered trimmed.
func (sr SyncRange) Validate() error {
if sr.FirstLLSN > sr.LastLLSN {
return fmt.Errorf("invalid sync range: first %d, last %d", sr.FirstLLSN, sr.LastLLSN)
Expand Down
Loading
Loading