diff --git a/pkg/varlog/log.go b/pkg/varlog/log.go index d80f56977..2fc096d5d 100644 --- a/pkg/varlog/log.go +++ b/pkg/varlog/log.go @@ -44,13 +44,22 @@ type Log interface { // It returns an error if the log stream does not exist or fails to // fetch metadata from all replicas. // - // Deprecated: Use LogStreamReplicaMetdata. + // Deprecated: Use PeekLogStream LogStreamMetadata(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (varlogpb.LogStreamDescriptor, error) // LogStreamReplicaMetadata returns metadata of log stream replica // specified by the arguments tpid and lsid. It returns the first // successful result among all replicas. + // + // Deprecated: Use PeekLogStream LogStreamReplicaMetadata(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (snpb.LogStreamReplicaMetadataDescriptor, error) + + // PeekLogStream returns the log sequence numbers at the first and the + // last. It fetches the metadata for each replica of a log stream lsid + // concurrently and takes a result from either appendable or sealed + // replica. If none of the replicas' statuses is either appendable or + // sealed, it returns an error. + PeekLogStream(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (first varlogpb.LogSequenceNumber, last varlogpb.LogSequenceNumber, err error) } type AppendResult struct { @@ -190,6 +199,10 @@ func (v *logImpl) LogStreamReplicaMetadata(ctx context.Context, tpid types.Topic return v.logStreamReplicaMetadata(ctx, tpid, lsid) } +func (v *logImpl) PeekLogStream(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (first varlogpb.LogSequenceNumber, last varlogpb.LogSequenceNumber, err error) { + return v.peekLogStream(ctx, tpid, lsid) +} + func (v *logImpl) Close() (err error) { if v.closed.Load() { return diff --git a/pkg/varlog/operations.go b/pkg/varlog/operations.go index 9b8164b51..55eb9bc54 100644 --- a/pkg/varlog/operations.go +++ b/pkg/varlog/operations.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "sync" "github.com/pkg/errors" "go.uber.org/multierr" @@ -147,3 +148,57 @@ func (v *logImpl) logStreamReplicaMetadata(ctx context.Context, tpID types.Topic } return snpb.LogStreamReplicaMetadataDescriptor{}, err } + +func (v *logImpl) peekLogStream(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (first varlogpb.LogSequenceNumber, last varlogpb.LogSequenceNumber, err error) { + replicas, ok := v.replicasRetriever.Retrieve(tpid, lsid) + if !ok { + err = errNoLogStream + return + } + + var ( + errs = make([]error, len(replicas)) + wg sync.WaitGroup + mu sync.Mutex + found bool + ) + for idx := range replicas { + wg.Add(1) + go func(idx int) { + defer wg.Done() + client, erri := v.logCLManager.GetOrConnect(ctx, replicas[idx].StorageNodeID, replicas[idx].Address) + if erri != nil { + errs[idx] = erri + return + } + lsrmd, erri := client.LogStreamReplicaMetadata(ctx, tpid, lsid) + if erri != nil { + errs[idx] = erri + return + } + switch lsrmd.Status { + case varlogpb.LogStreamStatusRunning, varlogpb.LogStreamStatusSealed: + mu.Lock() + defer mu.Unlock() + if first.LLSN < lsrmd.LocalLowWatermark.LLSN { + first = lsrmd.LocalLowWatermark + } + if last.LLSN < lsrmd.LocalHighWatermark.LLSN { + last = lsrmd.LocalHighWatermark + } + found = true + default: + errs[idx] = fmt.Errorf("logstream replica snid=%v: invalid status: %s", + replicas[idx].StorageNodeID, lsrmd.Status, + ) + } + }(idx) + } + wg.Wait() + + if found { + return first, last, nil + } + err = multierr.Combine(errs...) + return first, last, err +} diff --git a/pkg/varlogtest/log.go b/pkg/varlogtest/log.go index 1b57e9cea..70b4de447 100644 --- a/pkg/varlogtest/log.go +++ b/pkg/varlogtest/log.go @@ -357,6 +357,34 @@ func (c *testLog) LogStreamReplicaMetadata(_ context.Context, tpid types.TopicID }, nil } +func (c *testLog) PeekLogStream(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (first varlogpb.LogSequenceNumber, last varlogpb.LogSequenceNumber, err error) { + if err = c.lock(); err != nil { + return first, last, err + } + defer c.unlock() + + topicDesc, ok := c.vt.topics[tpid] + if !ok { + return first, last, errors.New("no such topic") + } + + if !topicDesc.HasLogStream(lsid) { + return first, last, errors.New("no such log stream") + } + + head, tail := c.vt.peek(tpid, lsid) + first = varlogpb.LogSequenceNumber{ + LLSN: head.LLSN, + GLSN: head.GLSN, + } + last = varlogpb.LogSequenceNumber{ + LLSN: tail.LLSN, + GLSN: tail.GLSN, + } + return first, last, nil + +} + type errSubscriber struct { err error } diff --git a/pkg/varlogtest/varlogtest_test.go b/pkg/varlogtest/varlogtest_test.go index 488e76d91..f3d15ed8b 100644 --- a/pkg/varlogtest/varlogtest_test.go +++ b/pkg/varlogtest/varlogtest_test.go @@ -152,16 +152,10 @@ func TestVarlogTest(t *testing.T) { LogStreamID: logStreamID, }, logStreamDesc.Tail) - lsrmd, err := vlg.LogStreamReplicaMetadata(context.Background(), topicID, logStreamID) + first, last, err := vlg.PeekLogStream(context.Background(), topicID, logStreamID) require.NoError(t, err) - require.Equal(t, varlogpb.LogEntryMeta{ - TopicID: topicID, - LogStreamID: logStreamID, - }, lsrmd.Head()) - require.Equal(t, varlogpb.LogEntryMeta{ - TopicID: topicID, - LogStreamID: logStreamID, - }, lsrmd.Tail()) + require.True(t, first.Invalid()) + require.True(t, last.Invalid()) } for i := 0; i < numLogStreams-numTopics; i++ { tpID := topicIDs[rng.Intn(numTopics)] @@ -336,15 +330,10 @@ func TestVarlogTest(t *testing.T) { } for _, lsID := range topicLogStreamsMap[tpID] { - lsDesc, err := vlg.LogStreamMetadata(context.Background(), tpID, lsID) //nolint:staticcheck - require.NoError(t, err) - require.GreaterOrEqual(t, lsDesc.Tail.LLSN, lsDesc.Head.LLSN) //nolint:staticcheck - subscribeTo(tpID, lsID, lsDesc.Head.LLSN, lsDesc.Tail.LLSN+1) //nolint:staticcheck - - lsrmd, err := vlg.LogStreamReplicaMetadata(context.Background(), tpID, lsID) + first, last, err := vlg.PeekLogStream(context.Background(), tpID, lsID) require.NoError(t, err) - require.GreaterOrEqual(t, lsrmd.Tail().LLSN, lsrmd.Head().LLSN) - subscribeTo(tpID, lsID, lsrmd.Head().LLSN, lsrmd.Tail().LLSN+1) + require.GreaterOrEqual(t, last.LLSN, first.LLSN) + subscribeTo(tpID, lsID, first.LLSN, last.LLSN+1) lsd, err := adm.Unseal(context.Background(), tpID, lsID) require.NoError(t, err) @@ -360,15 +349,10 @@ func TestVarlogTest(t *testing.T) { // Metadata for tpID, lsIDs := range topicLogStreamsMap { for _, lsID := range lsIDs { - lsDesc, err := vlg.LogStreamMetadata(context.Background(), tpID, lsID) //nolint:staticcheck + first, last, err := vlg.PeekLogStream(context.Background(), tpID, lsID) require.NoError(t, err) - require.GreaterOrEqual(t, lsDesc.Tail.LLSN, lsDesc.Head.LLSN) //nolint:staticcheck - subscribeTo(tpID, lsID, lsDesc.Head.LLSN, lsDesc.Tail.LLSN+1) //nolint:staticcheck - - lsrmd, err := vlg.LogStreamReplicaMetadata(context.Background(), tpID, lsID) - require.NoError(t, err) - require.GreaterOrEqual(t, lsrmd.Tail().LLSN, lsrmd.Head().LLSN) - subscribeTo(tpID, lsID, lsrmd.Head().LLSN, lsrmd.Tail().LLSN+1) + require.GreaterOrEqual(t, last.LLSN, first.LLSN) + subscribeTo(tpID, lsID, first.LLSN, last.LLSN+1) } } @@ -376,29 +360,24 @@ func TestVarlogTest(t *testing.T) { tpID := topicIDs[0] lsID := topicLogStreamsMap[tpID][0] - lsDesc, err := vlg.LogStreamMetadata(context.Background(), tpID, lsID) //nolint:staticcheck - require.NoError(t, err) - require.Equal(t, types.MinLLSN, lsDesc.Head.LLSN) //nolint:staticcheck - require.GreaterOrEqual(t, lsDesc.Tail.LLSN, types.LLSN(minLogsPerTopic)) - - lsrmd, err := vlg.LogStreamReplicaMetadata(context.Background(), tpID, lsID) + first, last, err := vlg.PeekLogStream(context.Background(), tpID, lsID) require.NoError(t, err) - require.Equal(t, types.MinLLSN, lsrmd.Head().LLSN) - require.GreaterOrEqual(t, lsrmd.Tail().LLSN, types.LLSN(minLogsPerTopic)) + require.Equal(t, types.MinLLSN, first.LLSN) + require.GreaterOrEqual(t, last.LLSN, types.LLSN(minLogsPerTopic)) ctx1, cancel1 := context.WithCancel(context.Background()) defer cancel1() - sub1 := vlg.SubscribeTo(ctx1, tpID, lsID, types.MinLLSN, lsDesc.Tail.LLSN+1) + sub1 := vlg.SubscribeTo(ctx1, tpID, lsID, types.MinLLSN, last.LLSN+1) ctx2, cancel2 := context.WithCancel(context.Background()) defer cancel2() - sub2 := vlg.SubscribeTo(ctx2, tpID, lsID, types.MinLLSN, lsDesc.Tail.LLSN+4) + sub2 := vlg.SubscribeTo(ctx2, tpID, lsID, types.MinLLSN, last.LLSN+4) ctx3, cancel3 := context.WithCancel(context.Background()) defer cancel3() - sub3 := vlg.SubscribeTo(ctx3, tpID, lsID, types.MinLLSN, lsDesc.Tail.LLSN+4) + sub3 := vlg.SubscribeTo(ctx3, tpID, lsID, types.MinLLSN, last.LLSN+4) - for llsn := types.MinLLSN; llsn <= lsDesc.Tail.LLSN; llsn++ { + for llsn := types.MinLLSN; llsn <= last.LLSN; llsn++ { le, err := sub1.Next() require.NoError(t, err) require.Equal(t, llsn, le.LLSN) @@ -418,7 +397,7 @@ func TestVarlogTest(t *testing.T) { // Already closed require.Error(t, sub1.Close()) - for llsn := types.MinLLSN; llsn <= lsDesc.Tail.LLSN; llsn++ { + for llsn := types.MinLLSN; llsn <= last.LLSN; llsn++ { le, err := sub2.Next() require.NoError(t, err) require.Equal(t, llsn, le.LLSN) @@ -428,7 +407,7 @@ func TestVarlogTest(t *testing.T) { appendToLog(tpID, lsID, 1) le, err := sub2.Next() require.NoError(t, err) - require.Equal(t, lsDesc.Tail.LLSN+1, le.LLSN) + require.Equal(t, last.LLSN+1, le.LLSN) var wg sync.WaitGroup @@ -438,7 +417,7 @@ func TestVarlogTest(t *testing.T) { defer wg.Done() le, err := sub2.Next() require.NoError(t, err) - require.Equal(t, lsDesc.Tail.LLSN+2, le.LLSN) + require.Equal(t, last.LLSN+2, le.LLSN) }() time.Sleep(5 * time.Millisecond) appendToLog(tpID, lsID, 1) @@ -460,7 +439,7 @@ func TestVarlogTest(t *testing.T) { require.ErrorIs(t, err, context.Canceled) require.ErrorIs(t, sub2.Close(), context.Canceled) - for llsn := types.MinLLSN; llsn <= lsDesc.Tail.LLSN+2; llsn++ { + for llsn := types.MinLLSN; llsn <= last.LLSN+2; llsn++ { le, err := sub3.Next() require.NoError(t, err) require.Equal(t, llsn, le.LLSN) @@ -548,33 +527,15 @@ func TestVarlogTest_Trim(t *testing.T) { assert.Error(t, subscriber.Close()) } - lsd, err := vlg.LogStreamMetadata(context.Background(), td.TopicID, lsds[0].LogStreamID) //nolint:staticcheck - assert.NoError(t, err) - assert.Equal(t, types.LLSN(3), lsd.Head.LLSN) //nolint:staticcheck - assert.Equal(t, types.GLSN(5), lsd.Head.GLSN) //nolint:staticcheck - assert.Equal(t, types.LLSN(5), lsd.Tail.LLSN) - assert.Equal(t, types.GLSN(9), lsd.Tail.GLSN) - - lsrmd, err := vlg.LogStreamReplicaMetadata(context.Background(), td.TopicID, lsds[0].LogStreamID) - assert.NoError(t, err) - assert.Equal(t, types.LLSN(3), lsrmd.Head().LLSN) - assert.Equal(t, types.GLSN(5), lsrmd.Head().GLSN) - assert.Equal(t, types.LLSN(5), lsrmd.Tail().LLSN) - assert.Equal(t, types.GLSN(9), lsrmd.Tail().GLSN) - - lsd, err = vlg.LogStreamMetadata(context.Background(), td.TopicID, lsds[1].LogStreamID) //nolint:staticcheck + first, last, err := vlg.PeekLogStream(context.Background(), td.TopicID, lsds[0].LogStreamID) assert.NoError(t, err) - assert.Equal(t, types.LLSN(2), lsd.Head.LLSN) //nolint:staticcheck - assert.Equal(t, types.GLSN(4), lsd.Head.GLSN) //nolint:staticcheck - assert.Equal(t, types.LLSN(5), lsd.Tail.LLSN) - assert.Equal(t, types.GLSN(10), lsd.Tail.GLSN) + assert.Equal(t, varlogpb.LogSequenceNumber{LLSN: 3, GLSN: 5}, first) + assert.Equal(t, varlogpb.LogSequenceNumber{LLSN: 5, GLSN: 9}, last) - lsrmd, err = vlg.LogStreamReplicaMetadata(context.Background(), td.TopicID, lsds[1].LogStreamID) + first, last, err = vlg.PeekLogStream(context.Background(), td.TopicID, lsds[1].LogStreamID) assert.NoError(t, err) - assert.Equal(t, types.LLSN(2), lsrmd.Head().LLSN) - assert.Equal(t, types.GLSN(4), lsrmd.Head().GLSN) - assert.Equal(t, types.LLSN(5), lsrmd.Tail().LLSN) - assert.Equal(t, types.GLSN(10), lsrmd.Tail().GLSN) + assert.Equal(t, varlogpb.LogSequenceNumber{LLSN: 2, GLSN: 4}, first) + assert.Equal(t, varlogpb.LogSequenceNumber{LLSN: 5, GLSN: 10}, last) subscriber := vlg.SubscribeTo(context.Background(), td.TopicID, lsds[0].LogStreamID, types.LLSN(3), types.LLSN(6)) expectedLLSN := types.LLSN(3) diff --git a/tests/it/cluster/client_test.go b/tests/it/cluster/client_test.go index c0df797dc..229e2c814 100644 --- a/tests/it/cluster/client_test.go +++ b/tests/it/cluster/client_test.go @@ -217,20 +217,12 @@ func TestClientAppend(t *testing.T) { require.Equal(t, topicID, lsd.Tail.TopicID) require.Equal(t, logStreamID, lsd.Tail.LogStreamID) - lsrmd, err := client.LogStreamReplicaMetadata(context.Background(), topicID, logStreamID) + first, last, err := client.PeekLogStream(context.Background(), topicID, logStreamID) require.NoError(t, err) - require.Equal(t, topicID, lsrmd.TopicID) - require.Equal(t, logStreamID, lsrmd.LogStreamID) - - require.Equal(t, types.MinLLSN, lsrmd.Head().LLSN) - require.GreaterOrEqual(t, lsrmd.Head().GLSN, types.MinGLSN) - require.Equal(t, topicID, lsrmd.Head().TopicID) - require.Equal(t, logStreamID, lsrmd.Head().LogStreamID) - - require.GreaterOrEqual(t, lsrmd.Tail().LLSN, types.MinLLSN) - require.GreaterOrEqual(t, lsrmd.Tail().GLSN, types.MinGLSN) - require.Equal(t, topicID, lsrmd.Tail().TopicID) - require.Equal(t, logStreamID, lsrmd.Tail().LogStreamID) + require.Equal(t, types.MinLLSN, first.LLSN) + require.GreaterOrEqual(t, first.GLSN, types.MinGLSN) + require.GreaterOrEqual(t, last.LLSN, types.MinLLSN) + require.GreaterOrEqual(t, last.GLSN, types.MinGLSN) } } @@ -614,3 +606,54 @@ func TestVarlogSubscribeWithUpdateLS(t *testing.T) { }) })) } + +func TestClientPeekLogStream(t *testing.T) { + clus := it.NewVarlogCluster(t, + it.WithNumberOfStorageNodes(2), + it.WithReplicationFactor(2), + it.WithNumberOfTopics(1), + it.WithNumberOfLogStreams(1), + it.WithNumberOfClients(1), + it.WithVMSOptions(it.NewTestVMSOptions()...), + ) + defer clus.Close(t) + + tpid := clus.TopicIDs()[0] + lsid := clus.LogStreamIDs(tpid)[0] + client := clus.ClientAtIndex(t, 0) + + first, last, err := client.PeekLogStream(context.Background(), tpid, lsid) + require.NoError(t, err) + require.True(t, first.Invalid()) + require.True(t, last.Invalid()) + + res := client.Append(context.Background(), tpid, [][]byte{nil}) + require.NoError(t, res.Err) + + first, last, err = client.PeekLogStream(context.Background(), tpid, lsid) + require.NoError(t, err) + require.Equal(t, varlogpb.LogSequenceNumber{ + LLSN: 1, GLSN: 1, + }, first) + require.Equal(t, varlogpb.LogSequenceNumber{ + LLSN: 1, GLSN: 1, + }, last) + + idx := int(time.Now().UnixNano() % 2) + clus.CloseSN(t, clus.StorageNodeIDAtIndex(t, idx)) + + first, last, err = client.PeekLogStream(context.Background(), tpid, lsid) + require.NoError(t, err) + require.Equal(t, varlogpb.LogSequenceNumber{ + LLSN: 1, GLSN: 1, + }, first) + require.Equal(t, varlogpb.LogSequenceNumber{ + LLSN: 1, GLSN: 1, + }, last) + + idx = (idx + 1) % 2 + clus.CloseSN(t, clus.StorageNodeIDAtIndex(t, idx)) + + _, _, err = client.PeekLogStream(context.Background(), tpid, lsid) + require.Error(t, err) +}