Skip to content

Commit

Permalink
feat(client): add PeekLogStream to the client
Browse files Browse the repository at this point in the history
This patch adds a new API called PeekLogStream to the client. The PeekLogStream returns sequence
numbers at the first and last log entries. Users using LogStreamMetadata or LogStreamReplicaMetadata
to fetch the sequence numbers should use PeekLogStream.

PeekLogStream has the benefits over the ogStreamMetadata or LogStreamReplicaMetadata.

- It does not send users useless and complex information about log stream replicas.
- It prevents users from getting too detailed information about log stream replicas.
- It takes accounts for the status of log stream replicas; thus, it returns more accurate first and
  last log sequence numbers of the log stream.

Resolves kakao#239
  • Loading branch information
ijsong committed Nov 24, 2022
1 parent 613aa57 commit ab1bb44
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 79 deletions.
15 changes: 14 additions & 1 deletion pkg/varlog/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,22 @@ type Log interface {
// It returns an error if the log stream does not exist or fails to
// fetch metadata from all replicas.
//
// Deprecated: Use LogStreamReplicaMetdata.
// Deprecated: Use PeekLogStream
LogStreamMetadata(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (varlogpb.LogStreamDescriptor, error)

// LogStreamReplicaMetadata returns metadata of log stream replica
// specified by the arguments tpid and lsid. It returns the first
// successful result among all replicas.
//
// Deprecated: Use PeekLogStream
LogStreamReplicaMetadata(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (snpb.LogStreamReplicaMetadataDescriptor, error)

// PeekLogStream returns the log sequence numbers at the first and the
// last. It fetches the metadata for each replica of a log stream lsid
// concurrently and takes a result from either appendable or sealed
// replica. If none of the replicas' statuses is either appendable or
// sealed, it returns an error.
PeekLogStream(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (first varlogpb.LogSequenceNumber, last varlogpb.LogSequenceNumber, err error)
}

type AppendResult struct {
Expand Down Expand Up @@ -190,6 +199,10 @@ func (v *logImpl) LogStreamReplicaMetadata(ctx context.Context, tpid types.Topic
return v.logStreamReplicaMetadata(ctx, tpid, lsid)
}

func (v *logImpl) PeekLogStream(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (first varlogpb.LogSequenceNumber, last varlogpb.LogSequenceNumber, err error) {
return v.peekLogStream(ctx, tpid, lsid)
}

func (v *logImpl) Close() (err error) {
if v.closed.Load() {
return
Expand Down
55 changes: 55 additions & 0 deletions pkg/varlog/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"sync"

"github.com/pkg/errors"
"go.uber.org/multierr"
Expand Down Expand Up @@ -147,3 +148,57 @@ func (v *logImpl) logStreamReplicaMetadata(ctx context.Context, tpID types.Topic
}
return snpb.LogStreamReplicaMetadataDescriptor{}, err
}

func (v *logImpl) peekLogStream(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (first varlogpb.LogSequenceNumber, last varlogpb.LogSequenceNumber, err error) {
replicas, ok := v.replicasRetriever.Retrieve(tpid, lsid)
if !ok {
err = errNoLogStream
return
}

var (
errs = make([]error, len(replicas))
wg sync.WaitGroup
mu sync.Mutex
found bool
)
for idx := range replicas {
wg.Add(1)
go func(idx int) {
defer wg.Done()
client, erri := v.logCLManager.GetOrConnect(ctx, replicas[idx].StorageNodeID, replicas[idx].Address)
if erri != nil {
errs[idx] = erri
return
}
lsrmd, erri := client.LogStreamReplicaMetadata(ctx, tpid, lsid)
if erri != nil {
errs[idx] = erri
return
}
switch lsrmd.Status {
case varlogpb.LogStreamStatusRunning, varlogpb.LogStreamStatusSealed:
mu.Lock()
defer mu.Unlock()
if first.LLSN < lsrmd.LocalLowWatermark.LLSN {
first = lsrmd.LocalLowWatermark
}
if last.LLSN < lsrmd.LocalHighWatermark.LLSN {
last = lsrmd.LocalHighWatermark
}
found = true
default:
errs[idx] = fmt.Errorf("logstream replica snid=%v: invalid status: %s",
replicas[idx].StorageNodeID, lsrmd.Status,
)
}
}(idx)
}
wg.Wait()

if found {
return first, last, nil
}
err = multierr.Combine(errs...)
return first, last, err
}
28 changes: 28 additions & 0 deletions pkg/varlogtest/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,34 @@ func (c *testLog) LogStreamReplicaMetadata(_ context.Context, tpid types.TopicID
}, nil
}

func (c *testLog) PeekLogStream(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (first varlogpb.LogSequenceNumber, last varlogpb.LogSequenceNumber, err error) {
if err = c.lock(); err != nil {
return first, last, err
}
defer c.unlock()

topicDesc, ok := c.vt.topics[tpid]
if !ok {
return first, last, errors.New("no such topic")
}

if !topicDesc.HasLogStream(lsid) {
return first, last, errors.New("no such log stream")
}

head, tail := c.vt.peek(tpid, lsid)
first = varlogpb.LogSequenceNumber{
LLSN: head.LLSN,
GLSN: head.GLSN,
}
last = varlogpb.LogSequenceNumber{
LLSN: tail.LLSN,
GLSN: tail.GLSN,
}
return first, last, nil

}

type errSubscriber struct {
err error
}
Expand Down
91 changes: 26 additions & 65 deletions pkg/varlogtest/varlogtest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,10 @@ func TestVarlogTest(t *testing.T) {
LogStreamID: logStreamID,
}, logStreamDesc.Tail)

lsrmd, err := vlg.LogStreamReplicaMetadata(context.Background(), topicID, logStreamID)
first, last, err := vlg.PeekLogStream(context.Background(), topicID, logStreamID)
require.NoError(t, err)
require.Equal(t, varlogpb.LogEntryMeta{
TopicID: topicID,
LogStreamID: logStreamID,
}, lsrmd.Head())
require.Equal(t, varlogpb.LogEntryMeta{
TopicID: topicID,
LogStreamID: logStreamID,
}, lsrmd.Tail())
require.True(t, first.Invalid())
require.True(t, last.Invalid())
}
for i := 0; i < numLogStreams-numTopics; i++ {
tpID := topicIDs[rng.Intn(numTopics)]
Expand Down Expand Up @@ -336,15 +330,10 @@ func TestVarlogTest(t *testing.T) {
}

for _, lsID := range topicLogStreamsMap[tpID] {
lsDesc, err := vlg.LogStreamMetadata(context.Background(), tpID, lsID) //nolint:staticcheck
require.NoError(t, err)
require.GreaterOrEqual(t, lsDesc.Tail.LLSN, lsDesc.Head.LLSN) //nolint:staticcheck
subscribeTo(tpID, lsID, lsDesc.Head.LLSN, lsDesc.Tail.LLSN+1) //nolint:staticcheck

lsrmd, err := vlg.LogStreamReplicaMetadata(context.Background(), tpID, lsID)
first, last, err := vlg.PeekLogStream(context.Background(), tpID, lsID)
require.NoError(t, err)
require.GreaterOrEqual(t, lsrmd.Tail().LLSN, lsrmd.Head().LLSN)
subscribeTo(tpID, lsID, lsrmd.Head().LLSN, lsrmd.Tail().LLSN+1)
require.GreaterOrEqual(t, last.LLSN, first.LLSN)
subscribeTo(tpID, lsID, first.LLSN, last.LLSN+1)

lsd, err := adm.Unseal(context.Background(), tpID, lsID)
require.NoError(t, err)
Expand All @@ -360,45 +349,35 @@ func TestVarlogTest(t *testing.T) {
// Metadata
for tpID, lsIDs := range topicLogStreamsMap {
for _, lsID := range lsIDs {
lsDesc, err := vlg.LogStreamMetadata(context.Background(), tpID, lsID) //nolint:staticcheck
first, last, err := vlg.PeekLogStream(context.Background(), tpID, lsID)
require.NoError(t, err)
require.GreaterOrEqual(t, lsDesc.Tail.LLSN, lsDesc.Head.LLSN) //nolint:staticcheck
subscribeTo(tpID, lsID, lsDesc.Head.LLSN, lsDesc.Tail.LLSN+1) //nolint:staticcheck

lsrmd, err := vlg.LogStreamReplicaMetadata(context.Background(), tpID, lsID)
require.NoError(t, err)
require.GreaterOrEqual(t, lsrmd.Tail().LLSN, lsrmd.Head().LLSN)
subscribeTo(tpID, lsID, lsrmd.Head().LLSN, lsrmd.Tail().LLSN+1)
require.GreaterOrEqual(t, last.LLSN, first.LLSN)
subscribeTo(tpID, lsID, first.LLSN, last.LLSN+1)
}
}

// Stop subscriber
tpID := topicIDs[0]
lsID := topicLogStreamsMap[tpID][0]

lsDesc, err := vlg.LogStreamMetadata(context.Background(), tpID, lsID) //nolint:staticcheck
require.NoError(t, err)
require.Equal(t, types.MinLLSN, lsDesc.Head.LLSN) //nolint:staticcheck
require.GreaterOrEqual(t, lsDesc.Tail.LLSN, types.LLSN(minLogsPerTopic))

lsrmd, err := vlg.LogStreamReplicaMetadata(context.Background(), tpID, lsID)
first, last, err := vlg.PeekLogStream(context.Background(), tpID, lsID)
require.NoError(t, err)
require.Equal(t, types.MinLLSN, lsrmd.Head().LLSN)
require.GreaterOrEqual(t, lsrmd.Tail().LLSN, types.LLSN(minLogsPerTopic))
require.Equal(t, types.MinLLSN, first.LLSN)
require.GreaterOrEqual(t, last.LLSN, types.LLSN(minLogsPerTopic))

ctx1, cancel1 := context.WithCancel(context.Background())
defer cancel1()
sub1 := vlg.SubscribeTo(ctx1, tpID, lsID, types.MinLLSN, lsDesc.Tail.LLSN+1)
sub1 := vlg.SubscribeTo(ctx1, tpID, lsID, types.MinLLSN, last.LLSN+1)

ctx2, cancel2 := context.WithCancel(context.Background())
defer cancel2()
sub2 := vlg.SubscribeTo(ctx2, tpID, lsID, types.MinLLSN, lsDesc.Tail.LLSN+4)
sub2 := vlg.SubscribeTo(ctx2, tpID, lsID, types.MinLLSN, last.LLSN+4)

ctx3, cancel3 := context.WithCancel(context.Background())
defer cancel3()
sub3 := vlg.SubscribeTo(ctx3, tpID, lsID, types.MinLLSN, lsDesc.Tail.LLSN+4)
sub3 := vlg.SubscribeTo(ctx3, tpID, lsID, types.MinLLSN, last.LLSN+4)

for llsn := types.MinLLSN; llsn <= lsDesc.Tail.LLSN; llsn++ {
for llsn := types.MinLLSN; llsn <= last.LLSN; llsn++ {
le, err := sub1.Next()
require.NoError(t, err)
require.Equal(t, llsn, le.LLSN)
Expand All @@ -418,7 +397,7 @@ func TestVarlogTest(t *testing.T) {
// Already closed
require.Error(t, sub1.Close())

for llsn := types.MinLLSN; llsn <= lsDesc.Tail.LLSN; llsn++ {
for llsn := types.MinLLSN; llsn <= last.LLSN; llsn++ {
le, err := sub2.Next()
require.NoError(t, err)
require.Equal(t, llsn, le.LLSN)
Expand All @@ -428,7 +407,7 @@ func TestVarlogTest(t *testing.T) {
appendToLog(tpID, lsID, 1)
le, err := sub2.Next()
require.NoError(t, err)
require.Equal(t, lsDesc.Tail.LLSN+1, le.LLSN)
require.Equal(t, last.LLSN+1, le.LLSN)

var wg sync.WaitGroup

Expand All @@ -438,7 +417,7 @@ func TestVarlogTest(t *testing.T) {
defer wg.Done()
le, err := sub2.Next()
require.NoError(t, err)
require.Equal(t, lsDesc.Tail.LLSN+2, le.LLSN)
require.Equal(t, last.LLSN+2, le.LLSN)
}()
time.Sleep(5 * time.Millisecond)
appendToLog(tpID, lsID, 1)
Expand All @@ -460,7 +439,7 @@ func TestVarlogTest(t *testing.T) {
require.ErrorIs(t, err, context.Canceled)
require.ErrorIs(t, sub2.Close(), context.Canceled)

for llsn := types.MinLLSN; llsn <= lsDesc.Tail.LLSN+2; llsn++ {
for llsn := types.MinLLSN; llsn <= last.LLSN+2; llsn++ {
le, err := sub3.Next()
require.NoError(t, err)
require.Equal(t, llsn, le.LLSN)
Expand Down Expand Up @@ -548,33 +527,15 @@ func TestVarlogTest_Trim(t *testing.T) {
assert.Error(t, subscriber.Close())
}

lsd, err := vlg.LogStreamMetadata(context.Background(), td.TopicID, lsds[0].LogStreamID) //nolint:staticcheck
assert.NoError(t, err)
assert.Equal(t, types.LLSN(3), lsd.Head.LLSN) //nolint:staticcheck
assert.Equal(t, types.GLSN(5), lsd.Head.GLSN) //nolint:staticcheck
assert.Equal(t, types.LLSN(5), lsd.Tail.LLSN)
assert.Equal(t, types.GLSN(9), lsd.Tail.GLSN)

lsrmd, err := vlg.LogStreamReplicaMetadata(context.Background(), td.TopicID, lsds[0].LogStreamID)
assert.NoError(t, err)
assert.Equal(t, types.LLSN(3), lsrmd.Head().LLSN)
assert.Equal(t, types.GLSN(5), lsrmd.Head().GLSN)
assert.Equal(t, types.LLSN(5), lsrmd.Tail().LLSN)
assert.Equal(t, types.GLSN(9), lsrmd.Tail().GLSN)

lsd, err = vlg.LogStreamMetadata(context.Background(), td.TopicID, lsds[1].LogStreamID) //nolint:staticcheck
first, last, err := vlg.PeekLogStream(context.Background(), td.TopicID, lsds[0].LogStreamID)
assert.NoError(t, err)
assert.Equal(t, types.LLSN(2), lsd.Head.LLSN) //nolint:staticcheck
assert.Equal(t, types.GLSN(4), lsd.Head.GLSN) //nolint:staticcheck
assert.Equal(t, types.LLSN(5), lsd.Tail.LLSN)
assert.Equal(t, types.GLSN(10), lsd.Tail.GLSN)
assert.Equal(t, varlogpb.LogSequenceNumber{LLSN: 3, GLSN: 5}, first)
assert.Equal(t, varlogpb.LogSequenceNumber{LLSN: 5, GLSN: 9}, last)

lsrmd, err = vlg.LogStreamReplicaMetadata(context.Background(), td.TopicID, lsds[1].LogStreamID)
first, last, err = vlg.PeekLogStream(context.Background(), td.TopicID, lsds[1].LogStreamID)
assert.NoError(t, err)
assert.Equal(t, types.LLSN(2), lsrmd.Head().LLSN)
assert.Equal(t, types.GLSN(4), lsrmd.Head().GLSN)
assert.Equal(t, types.LLSN(5), lsrmd.Tail().LLSN)
assert.Equal(t, types.GLSN(10), lsrmd.Tail().GLSN)
assert.Equal(t, varlogpb.LogSequenceNumber{LLSN: 2, GLSN: 4}, first)
assert.Equal(t, varlogpb.LogSequenceNumber{LLSN: 5, GLSN: 10}, last)

subscriber := vlg.SubscribeTo(context.Background(), td.TopicID, lsds[0].LogStreamID, types.LLSN(3), types.LLSN(6))
expectedLLSN := types.LLSN(3)
Expand Down
69 changes: 56 additions & 13 deletions tests/it/cluster/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,20 +217,12 @@ func TestClientAppend(t *testing.T) {
require.Equal(t, topicID, lsd.Tail.TopicID)
require.Equal(t, logStreamID, lsd.Tail.LogStreamID)

lsrmd, err := client.LogStreamReplicaMetadata(context.Background(), topicID, logStreamID)
first, last, err := client.PeekLogStream(context.Background(), topicID, logStreamID)
require.NoError(t, err)
require.Equal(t, topicID, lsrmd.TopicID)
require.Equal(t, logStreamID, lsrmd.LogStreamID)

require.Equal(t, types.MinLLSN, lsrmd.Head().LLSN)
require.GreaterOrEqual(t, lsrmd.Head().GLSN, types.MinGLSN)
require.Equal(t, topicID, lsrmd.Head().TopicID)
require.Equal(t, logStreamID, lsrmd.Head().LogStreamID)

require.GreaterOrEqual(t, lsrmd.Tail().LLSN, types.MinLLSN)
require.GreaterOrEqual(t, lsrmd.Tail().GLSN, types.MinGLSN)
require.Equal(t, topicID, lsrmd.Tail().TopicID)
require.Equal(t, logStreamID, lsrmd.Tail().LogStreamID)
require.Equal(t, types.MinLLSN, first.LLSN)
require.GreaterOrEqual(t, first.GLSN, types.MinGLSN)
require.GreaterOrEqual(t, last.LLSN, types.MinLLSN)
require.GreaterOrEqual(t, last.GLSN, types.MinGLSN)
}
}

Expand Down Expand Up @@ -614,3 +606,54 @@ func TestVarlogSubscribeWithUpdateLS(t *testing.T) {
})
}))
}

func TestClientPeekLogStream(t *testing.T) {
clus := it.NewVarlogCluster(t,
it.WithNumberOfStorageNodes(2),
it.WithReplicationFactor(2),
it.WithNumberOfTopics(1),
it.WithNumberOfLogStreams(1),
it.WithNumberOfClients(1),
it.WithVMSOptions(it.NewTestVMSOptions()...),
)
defer clus.Close(t)

tpid := clus.TopicIDs()[0]
lsid := clus.LogStreamIDs(tpid)[0]
client := clus.ClientAtIndex(t, 0)

first, last, err := client.PeekLogStream(context.Background(), tpid, lsid)
require.NoError(t, err)
require.True(t, first.Invalid())
require.True(t, last.Invalid())

res := client.Append(context.Background(), tpid, [][]byte{nil})
require.NoError(t, res.Err)

first, last, err = client.PeekLogStream(context.Background(), tpid, lsid)
require.NoError(t, err)
require.Equal(t, varlogpb.LogSequenceNumber{
LLSN: 1, GLSN: 1,
}, first)
require.Equal(t, varlogpb.LogSequenceNumber{
LLSN: 1, GLSN: 1,
}, last)

idx := int(time.Now().UnixNano() % 2)
clus.CloseSN(t, clus.StorageNodeIDAtIndex(t, idx))

first, last, err = client.PeekLogStream(context.Background(), tpid, lsid)
require.NoError(t, err)
require.Equal(t, varlogpb.LogSequenceNumber{
LLSN: 1, GLSN: 1,
}, first)
require.Equal(t, varlogpb.LogSequenceNumber{
LLSN: 1, GLSN: 1,
}, last)

idx = (idx + 1) % 2
clus.CloseSN(t, clus.StorageNodeIDAtIndex(t, idx))

_, _, err = client.PeekLogStream(context.Background(), tpid, lsid)
require.Error(t, err)
}

0 comments on commit ab1bb44

Please sign in to comment.