Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(client): add PeekLogStream to the client #240

Merged
merged 1 commit into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion pkg/varlog/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,22 @@ type Log interface {
// It returns an error if the log stream does not exist or fails to
// fetch metadata from all replicas.
//
// Deprecated: Use LogStreamReplicaMetdata.
// Deprecated: Use PeekLogStream
LogStreamMetadata(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (varlogpb.LogStreamDescriptor, error)

// LogStreamReplicaMetadata returns metadata of log stream replica
// specified by the arguments tpid and lsid. It returns the first
// successful result among all replicas.
//
// Deprecated: Use PeekLogStream
LogStreamReplicaMetadata(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (snpb.LogStreamReplicaMetadataDescriptor, error)

// PeekLogStream returns the log sequence numbers at the first and the
// last. It fetches the metadata for each replica of a log stream lsid
// concurrently and takes a result from either appendable or sealed
// replica. If none of the replicas' statuses is either appendable or
// sealed, it returns an error.
PeekLogStream(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (first varlogpb.LogSequenceNumber, last varlogpb.LogSequenceNumber, err error)
}

type AppendResult struct {
Expand Down Expand Up @@ -190,6 +199,10 @@ func (v *logImpl) LogStreamReplicaMetadata(ctx context.Context, tpid types.Topic
return v.logStreamReplicaMetadata(ctx, tpid, lsid)
}

func (v *logImpl) PeekLogStream(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (first varlogpb.LogSequenceNumber, last varlogpb.LogSequenceNumber, err error) {
return v.peekLogStream(ctx, tpid, lsid)
}

func (v *logImpl) Close() (err error) {
if v.closed.Load() {
return
Expand Down
59 changes: 59 additions & 0 deletions pkg/varlog/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"sync"

"github.com/pkg/errors"
"go.uber.org/multierr"
Expand Down Expand Up @@ -147,3 +148,61 @@ func (v *logImpl) logStreamReplicaMetadata(ctx context.Context, tpID types.Topic
}
return snpb.LogStreamReplicaMetadataDescriptor{}, err
}

func (v *logImpl) peekLogStream(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (first varlogpb.LogSequenceNumber, last varlogpb.LogSequenceNumber, err error) {
replicas, ok := v.replicasRetriever.Retrieve(tpid, lsid)
if !ok {
err = errNoLogStream
return
}

var (
errs = make([]error, len(replicas))
wg sync.WaitGroup
mu sync.Mutex
found bool
)
for idx := range replicas {
wg.Add(1)
go func(idx int) {
defer wg.Done()

ctx, cancel := context.WithCancel(ctx)
defer cancel()

client, erri := v.logCLManager.GetOrConnect(ctx, replicas[idx].StorageNodeID, replicas[idx].Address)
if erri != nil {
errs[idx] = erri
return
}
lsrmd, erri := client.LogStreamReplicaMetadata(ctx, tpid, lsid)
if erri != nil {
errs[idx] = erri
return
}
switch lsrmd.Status {
case varlogpb.LogStreamStatusRunning, varlogpb.LogStreamStatusSealed:
mu.Lock()
defer mu.Unlock()
if first.LLSN < lsrmd.LocalLowWatermark.LLSN {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it means highest LocalLowWatermark?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is. Since the trim operation removes from the front of log entries, PeekLogStream takes the largest one.

first = lsrmd.LocalLowWatermark
}
if last.LLSN < lsrmd.LocalHighWatermark.LLSN {
last = lsrmd.LocalHighWatermark
}
found = true
default:
errs[idx] = fmt.Errorf("logstream replica snid=%v: invalid status: %s",
replicas[idx].StorageNodeID, lsrmd.Status,
)
}
}(idx)
}
wg.Wait()

if found {
return first, last, nil
}
err = multierr.Combine(errs...)
return first, last, err
}
28 changes: 28 additions & 0 deletions pkg/varlogtest/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,34 @@ func (c *testLog) LogStreamReplicaMetadata(_ context.Context, tpid types.TopicID
}, nil
}

func (c *testLog) PeekLogStream(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (first varlogpb.LogSequenceNumber, last varlogpb.LogSequenceNumber, err error) {
if err = c.lock(); err != nil {
return first, last, err
}
defer c.unlock()

topicDesc, ok := c.vt.topics[tpid]
if !ok {
return first, last, errors.New("no such topic")
}

if !topicDesc.HasLogStream(lsid) {
return first, last, errors.New("no such log stream")
}

head, tail := c.vt.peek(tpid, lsid)
first = varlogpb.LogSequenceNumber{
LLSN: head.LLSN,
GLSN: head.GLSN,
}
last = varlogpb.LogSequenceNumber{
LLSN: tail.LLSN,
GLSN: tail.GLSN,
}
return first, last, nil

}

type errSubscriber struct {
err error
}
Expand Down
91 changes: 26 additions & 65 deletions pkg/varlogtest/varlogtest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,10 @@ func TestVarlogTest(t *testing.T) {
LogStreamID: logStreamID,
}, logStreamDesc.Tail)

lsrmd, err := vlg.LogStreamReplicaMetadata(context.Background(), topicID, logStreamID)
first, last, err := vlg.PeekLogStream(context.Background(), topicID, logStreamID)
require.NoError(t, err)
require.Equal(t, varlogpb.LogEntryMeta{
TopicID: topicID,
LogStreamID: logStreamID,
}, lsrmd.Head())
require.Equal(t, varlogpb.LogEntryMeta{
TopicID: topicID,
LogStreamID: logStreamID,
}, lsrmd.Tail())
require.True(t, first.Invalid())
require.True(t, last.Invalid())
}
for i := 0; i < numLogStreams-numTopics; i++ {
tpID := topicIDs[rng.Intn(numTopics)]
Expand Down Expand Up @@ -336,15 +330,10 @@ func TestVarlogTest(t *testing.T) {
}

for _, lsID := range topicLogStreamsMap[tpID] {
lsDesc, err := vlg.LogStreamMetadata(context.Background(), tpID, lsID) //nolint:staticcheck
require.NoError(t, err)
require.GreaterOrEqual(t, lsDesc.Tail.LLSN, lsDesc.Head.LLSN) //nolint:staticcheck
subscribeTo(tpID, lsID, lsDesc.Head.LLSN, lsDesc.Tail.LLSN+1) //nolint:staticcheck

lsrmd, err := vlg.LogStreamReplicaMetadata(context.Background(), tpID, lsID)
first, last, err := vlg.PeekLogStream(context.Background(), tpID, lsID)
require.NoError(t, err)
require.GreaterOrEqual(t, lsrmd.Tail().LLSN, lsrmd.Head().LLSN)
subscribeTo(tpID, lsID, lsrmd.Head().LLSN, lsrmd.Tail().LLSN+1)
require.GreaterOrEqual(t, last.LLSN, first.LLSN)
subscribeTo(tpID, lsID, first.LLSN, last.LLSN+1)

lsd, err := adm.Unseal(context.Background(), tpID, lsID)
require.NoError(t, err)
Expand All @@ -360,45 +349,35 @@ func TestVarlogTest(t *testing.T) {
// Metadata
for tpID, lsIDs := range topicLogStreamsMap {
for _, lsID := range lsIDs {
lsDesc, err := vlg.LogStreamMetadata(context.Background(), tpID, lsID) //nolint:staticcheck
first, last, err := vlg.PeekLogStream(context.Background(), tpID, lsID)
require.NoError(t, err)
require.GreaterOrEqual(t, lsDesc.Tail.LLSN, lsDesc.Head.LLSN) //nolint:staticcheck
subscribeTo(tpID, lsID, lsDesc.Head.LLSN, lsDesc.Tail.LLSN+1) //nolint:staticcheck

lsrmd, err := vlg.LogStreamReplicaMetadata(context.Background(), tpID, lsID)
require.NoError(t, err)
require.GreaterOrEqual(t, lsrmd.Tail().LLSN, lsrmd.Head().LLSN)
subscribeTo(tpID, lsID, lsrmd.Head().LLSN, lsrmd.Tail().LLSN+1)
require.GreaterOrEqual(t, last.LLSN, first.LLSN)
subscribeTo(tpID, lsID, first.LLSN, last.LLSN+1)
}
}

// Stop subscriber
tpID := topicIDs[0]
lsID := topicLogStreamsMap[tpID][0]

lsDesc, err := vlg.LogStreamMetadata(context.Background(), tpID, lsID) //nolint:staticcheck
require.NoError(t, err)
require.Equal(t, types.MinLLSN, lsDesc.Head.LLSN) //nolint:staticcheck
require.GreaterOrEqual(t, lsDesc.Tail.LLSN, types.LLSN(minLogsPerTopic))

lsrmd, err := vlg.LogStreamReplicaMetadata(context.Background(), tpID, lsID)
first, last, err := vlg.PeekLogStream(context.Background(), tpID, lsID)
require.NoError(t, err)
require.Equal(t, types.MinLLSN, lsrmd.Head().LLSN)
require.GreaterOrEqual(t, lsrmd.Tail().LLSN, types.LLSN(minLogsPerTopic))
require.Equal(t, types.MinLLSN, first.LLSN)
require.GreaterOrEqual(t, last.LLSN, types.LLSN(minLogsPerTopic))

ctx1, cancel1 := context.WithCancel(context.Background())
defer cancel1()
sub1 := vlg.SubscribeTo(ctx1, tpID, lsID, types.MinLLSN, lsDesc.Tail.LLSN+1)
sub1 := vlg.SubscribeTo(ctx1, tpID, lsID, types.MinLLSN, last.LLSN+1)

ctx2, cancel2 := context.WithCancel(context.Background())
defer cancel2()
sub2 := vlg.SubscribeTo(ctx2, tpID, lsID, types.MinLLSN, lsDesc.Tail.LLSN+4)
sub2 := vlg.SubscribeTo(ctx2, tpID, lsID, types.MinLLSN, last.LLSN+4)

ctx3, cancel3 := context.WithCancel(context.Background())
defer cancel3()
sub3 := vlg.SubscribeTo(ctx3, tpID, lsID, types.MinLLSN, lsDesc.Tail.LLSN+4)
sub3 := vlg.SubscribeTo(ctx3, tpID, lsID, types.MinLLSN, last.LLSN+4)

for llsn := types.MinLLSN; llsn <= lsDesc.Tail.LLSN; llsn++ {
for llsn := types.MinLLSN; llsn <= last.LLSN; llsn++ {
le, err := sub1.Next()
require.NoError(t, err)
require.Equal(t, llsn, le.LLSN)
Expand All @@ -418,7 +397,7 @@ func TestVarlogTest(t *testing.T) {
// Already closed
require.Error(t, sub1.Close())

for llsn := types.MinLLSN; llsn <= lsDesc.Tail.LLSN; llsn++ {
for llsn := types.MinLLSN; llsn <= last.LLSN; llsn++ {
le, err := sub2.Next()
require.NoError(t, err)
require.Equal(t, llsn, le.LLSN)
Expand All @@ -428,7 +407,7 @@ func TestVarlogTest(t *testing.T) {
appendToLog(tpID, lsID, 1)
le, err := sub2.Next()
require.NoError(t, err)
require.Equal(t, lsDesc.Tail.LLSN+1, le.LLSN)
require.Equal(t, last.LLSN+1, le.LLSN)

var wg sync.WaitGroup

Expand All @@ -438,7 +417,7 @@ func TestVarlogTest(t *testing.T) {
defer wg.Done()
le, err := sub2.Next()
require.NoError(t, err)
require.Equal(t, lsDesc.Tail.LLSN+2, le.LLSN)
require.Equal(t, last.LLSN+2, le.LLSN)
}()
time.Sleep(5 * time.Millisecond)
appendToLog(tpID, lsID, 1)
Expand All @@ -460,7 +439,7 @@ func TestVarlogTest(t *testing.T) {
require.ErrorIs(t, err, context.Canceled)
require.ErrorIs(t, sub2.Close(), context.Canceled)

for llsn := types.MinLLSN; llsn <= lsDesc.Tail.LLSN+2; llsn++ {
for llsn := types.MinLLSN; llsn <= last.LLSN+2; llsn++ {
le, err := sub3.Next()
require.NoError(t, err)
require.Equal(t, llsn, le.LLSN)
Expand Down Expand Up @@ -548,33 +527,15 @@ func TestVarlogTest_Trim(t *testing.T) {
assert.Error(t, subscriber.Close())
}

lsd, err := vlg.LogStreamMetadata(context.Background(), td.TopicID, lsds[0].LogStreamID) //nolint:staticcheck
assert.NoError(t, err)
assert.Equal(t, types.LLSN(3), lsd.Head.LLSN) //nolint:staticcheck
assert.Equal(t, types.GLSN(5), lsd.Head.GLSN) //nolint:staticcheck
assert.Equal(t, types.LLSN(5), lsd.Tail.LLSN)
assert.Equal(t, types.GLSN(9), lsd.Tail.GLSN)

lsrmd, err := vlg.LogStreamReplicaMetadata(context.Background(), td.TopicID, lsds[0].LogStreamID)
assert.NoError(t, err)
assert.Equal(t, types.LLSN(3), lsrmd.Head().LLSN)
assert.Equal(t, types.GLSN(5), lsrmd.Head().GLSN)
assert.Equal(t, types.LLSN(5), lsrmd.Tail().LLSN)
assert.Equal(t, types.GLSN(9), lsrmd.Tail().GLSN)

lsd, err = vlg.LogStreamMetadata(context.Background(), td.TopicID, lsds[1].LogStreamID) //nolint:staticcheck
first, last, err := vlg.PeekLogStream(context.Background(), td.TopicID, lsds[0].LogStreamID)
assert.NoError(t, err)
assert.Equal(t, types.LLSN(2), lsd.Head.LLSN) //nolint:staticcheck
assert.Equal(t, types.GLSN(4), lsd.Head.GLSN) //nolint:staticcheck
assert.Equal(t, types.LLSN(5), lsd.Tail.LLSN)
assert.Equal(t, types.GLSN(10), lsd.Tail.GLSN)
assert.Equal(t, varlogpb.LogSequenceNumber{LLSN: 3, GLSN: 5}, first)
assert.Equal(t, varlogpb.LogSequenceNumber{LLSN: 5, GLSN: 9}, last)

lsrmd, err = vlg.LogStreamReplicaMetadata(context.Background(), td.TopicID, lsds[1].LogStreamID)
first, last, err = vlg.PeekLogStream(context.Background(), td.TopicID, lsds[1].LogStreamID)
assert.NoError(t, err)
assert.Equal(t, types.LLSN(2), lsrmd.Head().LLSN)
assert.Equal(t, types.GLSN(4), lsrmd.Head().GLSN)
assert.Equal(t, types.LLSN(5), lsrmd.Tail().LLSN)
assert.Equal(t, types.GLSN(10), lsrmd.Tail().GLSN)
assert.Equal(t, varlogpb.LogSequenceNumber{LLSN: 2, GLSN: 4}, first)
assert.Equal(t, varlogpb.LogSequenceNumber{LLSN: 5, GLSN: 10}, last)

subscriber := vlg.SubscribeTo(context.Background(), td.TopicID, lsds[0].LogStreamID, types.LLSN(3), types.LLSN(6))
expectedLLSN := types.LLSN(3)
Expand Down
69 changes: 56 additions & 13 deletions tests/it/cluster/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,20 +217,12 @@ func TestClientAppend(t *testing.T) {
require.Equal(t, topicID, lsd.Tail.TopicID)
require.Equal(t, logStreamID, lsd.Tail.LogStreamID)

lsrmd, err := client.LogStreamReplicaMetadata(context.Background(), topicID, logStreamID)
first, last, err := client.PeekLogStream(context.Background(), topicID, logStreamID)
require.NoError(t, err)
require.Equal(t, topicID, lsrmd.TopicID)
require.Equal(t, logStreamID, lsrmd.LogStreamID)

require.Equal(t, types.MinLLSN, lsrmd.Head().LLSN)
require.GreaterOrEqual(t, lsrmd.Head().GLSN, types.MinGLSN)
require.Equal(t, topicID, lsrmd.Head().TopicID)
require.Equal(t, logStreamID, lsrmd.Head().LogStreamID)

require.GreaterOrEqual(t, lsrmd.Tail().LLSN, types.MinLLSN)
require.GreaterOrEqual(t, lsrmd.Tail().GLSN, types.MinGLSN)
require.Equal(t, topicID, lsrmd.Tail().TopicID)
require.Equal(t, logStreamID, lsrmd.Tail().LogStreamID)
require.Equal(t, types.MinLLSN, first.LLSN)
require.GreaterOrEqual(t, first.GLSN, types.MinGLSN)
require.GreaterOrEqual(t, last.LLSN, types.MinLLSN)
require.GreaterOrEqual(t, last.GLSN, types.MinGLSN)
}
}

Expand Down Expand Up @@ -614,3 +606,54 @@ func TestVarlogSubscribeWithUpdateLS(t *testing.T) {
})
}))
}

func TestClientPeekLogStream(t *testing.T) {
clus := it.NewVarlogCluster(t,
it.WithNumberOfStorageNodes(2),
it.WithReplicationFactor(2),
it.WithNumberOfTopics(1),
it.WithNumberOfLogStreams(1),
it.WithNumberOfClients(1),
it.WithVMSOptions(it.NewTestVMSOptions()...),
)
defer clus.Close(t)

tpid := clus.TopicIDs()[0]
lsid := clus.LogStreamIDs(tpid)[0]
client := clus.ClientAtIndex(t, 0)

first, last, err := client.PeekLogStream(context.Background(), tpid, lsid)
require.NoError(t, err)
require.True(t, first.Invalid())
require.True(t, last.Invalid())

res := client.Append(context.Background(), tpid, [][]byte{nil})
require.NoError(t, res.Err)

first, last, err = client.PeekLogStream(context.Background(), tpid, lsid)
require.NoError(t, err)
require.Equal(t, varlogpb.LogSequenceNumber{
LLSN: 1, GLSN: 1,
}, first)
require.Equal(t, varlogpb.LogSequenceNumber{
LLSN: 1, GLSN: 1,
}, last)

idx := int(time.Now().UnixNano() % 2)
clus.CloseSN(t, clus.StorageNodeIDAtIndex(t, idx))

first, last, err = client.PeekLogStream(context.Background(), tpid, lsid)
require.NoError(t, err)
require.Equal(t, varlogpb.LogSequenceNumber{
LLSN: 1, GLSN: 1,
}, first)
require.Equal(t, varlogpb.LogSequenceNumber{
LLSN: 1, GLSN: 1,
}, last)

idx = (idx + 1) % 2
clus.CloseSN(t, clus.StorageNodeIDAtIndex(t, idx))

_, _, err = client.PeekLogStream(context.Background(), tpid, lsid)
require.Error(t, err)
}