diff --git a/internal/storagenode/client/log_client.go b/internal/storagenode/client/log_client.go index 730b7d5ec..8c2a3123a 100644 --- a/internal/storagenode/client/log_client.go +++ b/internal/storagenode/client/log_client.go @@ -18,7 +18,7 @@ type SubscribeResult struct { } var InvalidSubscribeResult = SubscribeResult{ - LogEntry: varlogpb.InvalidLogEntry(), + LogEntry: varlogpb.LogEntry{}, Error: errors.New("invalid subscribe result"), } diff --git a/pkg/varlog/subscribe.go b/pkg/varlog/subscribe.go index 533981018..481ef784e 100644 --- a/pkg/varlog/subscribe.go +++ b/pkg/varlog/subscribe.go @@ -518,7 +518,7 @@ func (p *dispatcher) dispatch(_ context.Context) { sentErr = sentErr || res.Error != nil } if !sentErr { - p.onNextFunc(varlogpb.InvalidLogEntry(), io.EOF) + p.onNextFunc(varlogpb.LogEntry{}, io.EOF) } } @@ -532,7 +532,7 @@ type invalidSubscriber struct { } func (s invalidSubscriber) Next() (varlogpb.LogEntry, error) { - return varlogpb.InvalidLogEntry(), s.err + return varlogpb.LogEntry{}, s.err } func (s invalidSubscriber) Close() error { diff --git a/pkg/varlogtest/admin.go b/pkg/varlogtest/admin.go index 04d55d5ff..33bdb7883 100644 --- a/pkg/varlogtest/admin.go +++ b/pkg/varlogtest/admin.go @@ -82,6 +82,7 @@ func (c *testAdmin) ListStorageNodes(ctx context.Context, opts ...varlog.AdminCa return ret, nil } + func (c *testAdmin) GetStorageNodes(ctx context.Context, opts ...varlog.AdminCallOption) (map[types.StorageNodeID]admpb.StorageNodeMetadata, error) { snms, err := c.ListStorageNodes(ctx) if err != nil { @@ -182,8 +183,7 @@ func (c *testAdmin) AddTopic(ctx context.Context, opts ...varlog.AdminCallOption c.vt.topics[topicID] = topicDesc c.vt.trimGLSNs[topicID] = types.InvalidGLSN - invalidLogEntry := varlogpb.InvalidLogEntry() - c.vt.globalLogEntries[topicID] = []*varlogpb.LogEntry{&invalidLogEntry} + c.vt.globalLogEntries[topicID] = []*varlogpb.LogEntry{{}} return proto.Clone(&topicDesc).(*varlogpb.TopicDescriptor), nil } @@ -294,8 +294,7 @@ func (c *testAdmin) AddLogStream(_ context.Context, topicID types.TopicID, logSt c.vt.logStreams[logStreamID] = lsd - invalidLogEntry := varlogpb.InvalidLogEntry() - c.vt.localLogEntries[logStreamID] = []*varlogpb.LogEntry{&invalidLogEntry} + c.vt.localLogEntries[logStreamID] = []*varlogpb.LogEntry{{}} topicDesc.LogStreams = append(topicDesc.LogStreams, logStreamID) c.vt.topics[topicID] = topicDesc diff --git a/pkg/varlogtest/log.go b/pkg/varlogtest/log.go index f7cc50b0f..2e6810c2f 100644 --- a/pkg/varlogtest/log.go +++ b/pkg/varlogtest/log.go @@ -215,7 +215,7 @@ func (c *testLog) Subscribe(ctx context.Context, topicID types.TopicID, begin ty for _, logEntry := range copiedLogEntries { onNextFunc(logEntry, nil) } - onNextFunc(varlogpb.InvalidLogEntry(), io.EOF) + onNextFunc(varlogpb.LogEntry{}, io.EOF) }() return func() { @@ -319,7 +319,6 @@ func (c *testLog) PeekLogStream(ctx context.Context, tpid types.TopicID, lsid ty GLSN: tail.GLSN, } return first, last, nil - } func (c *testLog) AppendableLogStreams(tpid types.TopicID) map[types.LogStreamID]struct{} { @@ -463,7 +462,7 @@ func newErrSubscriber(err error) *errSubscriber { } func (s errSubscriber) Next() (varlogpb.LogEntry, error) { - return varlogpb.InvalidLogEntry(), s.err + return varlogpb.LogEntry{}, s.err } func (s errSubscriber) Close() error { @@ -496,7 +495,7 @@ func (s *subscriberImpl) Next() (varlogpb.LogEntry, error) { logEntry, err := s.next() if err != nil { s.setErr(err) - return varlogpb.InvalidLogEntry(), err + return varlogpb.LogEntry{}, err } if s.cursor == s.end { s.setErr(io.EOF) diff --git a/proto/snpb/metadata.go b/proto/snpb/metadata.go index 3e89df940..cd8710708 100644 --- a/proto/snpb/metadata.go +++ b/proto/snpb/metadata.go @@ -5,6 +5,9 @@ import ( "github.com/kakao/varlog/proto/varlogpb" ) +// ToStorageNodeDescriptor converts a StorageNodeMetadataDescriptor to a +// varlogpb.StorageNodeDescriptor. It returns nil if the +// StorageNodeMetadataDescriptor is nil. func (snmd *StorageNodeMetadataDescriptor) ToStorageNodeDescriptor() *varlogpb.StorageNodeDescriptor { if snmd == nil { return nil @@ -19,6 +22,9 @@ func (snmd *StorageNodeMetadataDescriptor) ToStorageNodeDescriptor() *varlogpb.S return snd } +// GetLogStream retrieves a LogStreamReplicaMetadataDescriptor by its +// LogStreamID. It returns the LogStreamReplicaMetadataDescriptor and true if +// found, otherwise an empty descriptor and false. func (snmd *StorageNodeMetadataDescriptor) GetLogStream(logStreamID types.LogStreamID) (LogStreamReplicaMetadataDescriptor, bool) { logStreams := snmd.GetLogStreamReplicas() for i := range logStreams { @@ -29,6 +35,10 @@ func (snmd *StorageNodeMetadataDescriptor) GetLogStream(logStreamID types.LogStr return LogStreamReplicaMetadataDescriptor{}, false } +// Head returns the varlogpb.LogEntryMeta corresponding to the local low +// watermark of the LogStreamReplicaMetadataDescriptor. The "head" represents +// the earliest log entry in the log stream replica. It returns an empty +// varlogpb.LogEntryMeta if the LogStreamReplicaMetadataDescriptor is nil. func (lsrmd *LogStreamReplicaMetadataDescriptor) Head() varlogpb.LogEntryMeta { if lsrmd == nil { return varlogpb.LogEntryMeta{} @@ -41,6 +51,10 @@ func (lsrmd *LogStreamReplicaMetadataDescriptor) Head() varlogpb.LogEntryMeta { } } +// Tail returns the varlogpb.LogEntryMeta corresponding to the local high +// watermark of the LogStreamReplicaMetadataDescriptor. The "tail" represents +// the latest log entry in the log stream replica. It returns an empty +// varlogpb.LogEntryMeta if the LogStreamReplicaMetadataDescriptor is nil. func (lsrmd *LogStreamReplicaMetadataDescriptor) Tail() varlogpb.LogEntryMeta { if lsrmd == nil { return varlogpb.LogEntryMeta{} diff --git a/proto/snpb/metadata_test.go b/proto/snpb/metadata_test.go new file mode 100644 index 000000000..293af33d6 --- /dev/null +++ b/proto/snpb/metadata_test.go @@ -0,0 +1,191 @@ +package snpb_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/kakao/varlog/pkg/types" + "github.com/kakao/varlog/proto/snpb" + "github.com/kakao/varlog/proto/varlogpb" +) + +func TestStorageNodeMetadataDescriptor_ToStorageNodeDescriptor(t *testing.T) { + tcs := []struct { + snmd *snpb.StorageNodeMetadataDescriptor + want *varlogpb.StorageNodeDescriptor + name string + }{ + { + name: "Nil", + snmd: nil, + want: nil, + }, + { + name: "NonNil", + snmd: &snpb.StorageNodeMetadataDescriptor{ + StorageNode: varlogpb.StorageNode{ + StorageNodeID: types.MinStorageNodeID, + Address: "node1", + }, + Storages: []varlogpb.StorageDescriptor{ + {Path: "/path1"}, + {Path: "/path2"}, + }, + }, + want: &varlogpb.StorageNodeDescriptor{ + StorageNode: varlogpb.StorageNode{ + StorageNodeID: types.MinStorageNodeID, + Address: "node1", + }, + Paths: []string{"/path1", "/path2"}, + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + got := tc.snmd.ToStorageNodeDescriptor() + require.Equal(t, tc.want, got) + }) + } +} + +func TestStorageNodeMetadataDescriptor_GetLogStream(t *testing.T) { + tcs := []struct { + name string + logStreamID types.LogStreamID + want snpb.LogStreamReplicaMetadataDescriptor + wantFound bool + }{ + { + name: "Found", + logStreamID: types.LogStreamID(1), + want: snpb.LogStreamReplicaMetadataDescriptor{ + LogStreamReplica: varlogpb.LogStreamReplica{ + TopicLogStream: varlogpb.TopicLogStream{ + LogStreamID: types.LogStreamID(1), + }, + }, + }, + wantFound: true, + }, + { + name: "NotFound", + logStreamID: types.LogStreamID(3), + want: snpb.LogStreamReplicaMetadataDescriptor{}, + wantFound: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + snmd := &snpb.StorageNodeMetadataDescriptor{ + LogStreamReplicas: []snpb.LogStreamReplicaMetadataDescriptor{ + { + LogStreamReplica: varlogpb.LogStreamReplica{ + TopicLogStream: varlogpb.TopicLogStream{ + LogStreamID: types.LogStreamID(1), + }, + }, + }, + { + LogStreamReplica: varlogpb.LogStreamReplica{ + TopicLogStream: varlogpb.TopicLogStream{ + LogStreamID: types.LogStreamID(2), + }, + }, + }, + }, + } + + got, found := snmd.GetLogStream(tc.logStreamID) + require.Equal(t, tc.wantFound, found) + require.Equal(t, tc.want, got) + }) + } +} + +func TestLogStreamReplicaMetadataDescriptor_Head(t *testing.T) { + tcs := []struct { + lsrmd *snpb.LogStreamReplicaMetadataDescriptor + name string + want varlogpb.LogEntryMeta + }{ + { + name: "Nil", + lsrmd: nil, + want: varlogpb.LogEntryMeta{}, + }, + { + name: "NonNil", + lsrmd: &snpb.LogStreamReplicaMetadataDescriptor{ + LogStreamReplica: varlogpb.LogStreamReplica{ + TopicLogStream: varlogpb.TopicLogStream{ + TopicID: types.TopicID(1), + LogStreamID: types.LogStreamID(2), + }, + }, + LocalLowWatermark: varlogpb.LogSequenceNumber{ + LLSN: types.LLSN(3), + GLSN: types.GLSN(4), + }, + }, + want: varlogpb.LogEntryMeta{ + TopicID: 1, + LogStreamID: 2, + LLSN: 3, + GLSN: 4, + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + got := tc.lsrmd.Head() + require.Equal(t, tc.want, got) + }) + } +} + +func TestLogStreamReplicaMetadataDescriptor_Tail(t *testing.T) { + tcs := []struct { + lsrmd *snpb.LogStreamReplicaMetadataDescriptor + name string + want varlogpb.LogEntryMeta + }{ + { + name: "Nil", + lsrmd: nil, + want: varlogpb.LogEntryMeta{}, + }, + { + name: "NonNil", + lsrmd: &snpb.LogStreamReplicaMetadataDescriptor{ + LogStreamReplica: varlogpb.LogStreamReplica{ + TopicLogStream: varlogpb.TopicLogStream{ + TopicID: types.TopicID(1), + LogStreamID: types.LogStreamID(2), + }, + }, + LocalHighWatermark: varlogpb.LogSequenceNumber{ + LLSN: types.LLSN(5), + GLSN: types.GLSN(6), + }, + }, + want: varlogpb.LogEntryMeta{ + TopicID: 1, + LogStreamID: 2, + LLSN: 5, + GLSN: 6, + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + got := tc.lsrmd.Tail() + require.Equal(t, tc.want, got) + }) + } +} diff --git a/proto/snpb/replicator.go b/proto/snpb/replicator.go index 6d9800cf8..f146355f0 100644 --- a/proto/snpb/replicator.go +++ b/proto/snpb/replicator.go @@ -6,26 +6,42 @@ import ( "github.com/kakao/varlog/pkg/types" ) +// InvalidSyncPosition returns a SyncPosition with both LLSN and GLSN set to +// types.InvalidGLSN. func InvalidSyncPosition() SyncPosition { return SyncPosition{LLSN: types.InvalidLLSN, GLSN: types.InvalidGLSN} } +// Invalid checks if the SyncPosition is invalid. A SyncPosition is considered +// invalid if either LLSN or GLSN is invalid. func (sp SyncPosition) Invalid() bool { return sp.LLSN.Invalid() || sp.GLSN.Invalid() } +// LessThan checks if the current SyncPosition "sp" is less than another +// SyncPosition "other". It returns true if both LLSN and GLSN of the current +// SyncPosition are less than those of the other SyncPosition. func (sp SyncPosition) LessThan(other SyncPosition) bool { return sp.LLSN < other.LLSN && sp.GLSN < other.GLSN } +// InvalidSyncRange returns a SyncRange with both FirstLLSN and LastLLSN set to +// types.InvalidLLSN. func InvalidSyncRange() SyncRange { return SyncRange{FirstLLSN: types.InvalidLLSN, LastLLSN: types.InvalidLLSN} } +// Invalid determines if the SyncRange is invalid. A SyncRange is considered +// invalid if either FirstLLSN or LastLLSN is invalid, or if FirstLLSN is +// greater than LastLLSN. func (sr SyncRange) Invalid() bool { return sr.FirstLLSN.Invalid() || sr.LastLLSN.Invalid() || sr.FirstLLSN > sr.LastLLSN } +// Validate checks the validity of the SyncRange. It returns an error if +// FirstLLSN is greater than LastLLSN, or if FirstLLSN is invalid while +// LastLLSN is valid. If both FirstLLSN and LastLLSN are invalid, it returns +// nil, indicating that the entire log range is considered trimmed. func (sr SyncRange) Validate() error { if sr.FirstLLSN > sr.LastLLSN { return fmt.Errorf("invalid sync range: first %d, last %d", sr.FirstLLSN, sr.LastLLSN) diff --git a/proto/snpb/replicator_test.go b/proto/snpb/replicator_test.go index 437c06d5d..5915e3876 100644 --- a/proto/snpb/replicator_test.go +++ b/proto/snpb/replicator_test.go @@ -4,6 +4,8 @@ import ( "testing" "github.com/stretchr/testify/require" + + "github.com/kakao/varlog/pkg/types" ) func TestSyncPositionInvalid(t *testing.T) { @@ -80,3 +82,73 @@ func TestSyncPositionLessThan(t *testing.T) { }) } } + +func TestInvalidSyncRange(t *testing.T) { + sr := InvalidSyncRange() + require.True(t, sr.Invalid()) +} + +func TestSyncRangeInvalid(t *testing.T) { + tcs := []struct { + sr SyncRange + expected bool + }{ + { + sr: SyncRange{FirstLLSN: types.InvalidLLSN, LastLLSN: types.InvalidLLSN}, + expected: true, + }, + { + sr: SyncRange{FirstLLSN: types.LLSN(1), LastLLSN: types.InvalidLLSN}, + expected: true, + }, + { + sr: SyncRange{FirstLLSN: types.InvalidLLSN, LastLLSN: types.LLSN(1)}, + expected: true, + }, + { + sr: SyncRange{FirstLLSN: types.LLSN(2), LastLLSN: types.LLSN(1)}, + expected: true, + }, + { + sr: SyncRange{FirstLLSN: types.LLSN(1), LastLLSN: types.LLSN(2)}, + expected: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.sr.String(), func(t *testing.T) { + require.Equal(t, tc.expected, tc.sr.Invalid()) + }) + } +} + +func TestSyncRangeValidate(t *testing.T) { + tcs := []struct { + sr SyncRange + wantErr bool + }{ + { + sr: SyncRange{FirstLLSN: types.LLSN(2), LastLLSN: types.LLSN(1)}, + wantErr: true, + }, + { + sr: SyncRange{FirstLLSN: types.InvalidLLSN, LastLLSN: types.LLSN(1)}, + wantErr: true, + }, + { + sr: SyncRange{FirstLLSN: types.LLSN(1), LastLLSN: types.LLSN(2)}, + wantErr: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.sr.String(), func(t *testing.T) { + err := tc.sr.Validate() + if tc.wantErr { + require.Error(t, err) + return + } + require.NoError(t, err) + }) + } +} diff --git a/proto/varlogpb/log_entry.go b/proto/varlogpb/log_entry.go deleted file mode 100644 index ee73f76ff..000000000 --- a/proto/varlogpb/log_entry.go +++ /dev/null @@ -1,13 +0,0 @@ -package varlogpb - -func InvalidLogEntryMeta() LogEntryMeta { - return LogEntryMeta{} -} - -func InvalidLogEntry() LogEntry { - return LogEntry{} -} - -func (le LogEntry) Invalid() bool { - return le.GLSN.Invalid() && le.LLSN.Invalid() && len(le.Data) == 0 -}