-
-
Notifications
You must be signed in to change notification settings - Fork 363
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
commitlog: add compaction cleaner (#120)
- Loading branch information
1 parent
6031a75
commit e2a8d10
Showing
10 changed files
with
308 additions
and
68 deletions.
There are no files selected for viewing
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
package commitlog | ||
|
||
import ( | ||
"github.com/cespare/xxhash" | ||
) | ||
|
||
type CompactCleaner struct { | ||
// map from key hash to offset | ||
m map[uint64]int64 | ||
} | ||
|
||
func NewCompactCleaner() *CompactCleaner { | ||
return &CompactCleaner{ | ||
m: make(map[uint64]int64), | ||
} | ||
} | ||
|
||
func (c *CompactCleaner) Clean(segments []*Segment) (cleaned []*Segment, err error) { | ||
if len(segments) == 0 { | ||
return segments, nil | ||
} | ||
|
||
var ss *SegmentScanner | ||
var ms MessageSet | ||
var offset int64 | ||
|
||
// build the map of keys to their latest offsets | ||
for _, segment := range segments { | ||
ss = NewSegmentScanner(segment) | ||
|
||
for ms, err = ss.Scan(); err == nil; ms, err = ss.Scan() { | ||
offset = ms.Offset() | ||
for _, msg := range ms.Messages() { | ||
c.m[Hash(msg.Key())] = offset | ||
} | ||
} | ||
} | ||
|
||
// TODO: handle joining segments when they're smaller than max segment size | ||
for _, ds := range segments { | ||
ss = NewSegmentScanner(ds) | ||
|
||
cs, err := NewSegment(ds.path, ds.BaseOffset, ds.maxBytes, cleanedSuffix) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
for ms, err = ss.Scan(); err == nil; ms, err = ss.Scan() { | ||
var retain bool | ||
offset = ms.Offset() | ||
for _, msg := range ms.Messages() { | ||
if c.m[Hash(msg.Key())] <= offset { | ||
retain = true | ||
} | ||
} | ||
|
||
if retain { | ||
if _, err = cs.Write(ms); err != nil { | ||
return nil, err | ||
} | ||
} | ||
} | ||
|
||
if err = cs.Replace(ds); err != nil { | ||
return nil, err | ||
} | ||
|
||
cleaned = append(cleaned, cs) | ||
} | ||
|
||
return cleaned, nil | ||
} | ||
|
||
func Hash(b []byte) uint64 { | ||
h := xxhash.New() | ||
if _, err := h.Write(b); err != nil { | ||
panic(err) | ||
} | ||
return h.Sum64() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
package commitlog_test | ||
|
||
import ( | ||
"os" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/require" | ||
"github.com/travisjeffery/jocko/commitlog" | ||
"github.com/travisjeffery/jocko/protocol" | ||
) | ||
|
||
func TestCompactCleaner(t *testing.T) { | ||
req := require.New(t) | ||
var err error | ||
|
||
var msgSets []commitlog.MessageSet | ||
msgSets = append(msgSets, newMessageSet(0, &protocol.Message{ | ||
Key: []byte("travisjeffery"), | ||
Value: []byte("one tj"), | ||
MagicByte: 2, | ||
Timestamp: time.Now(), | ||
})) | ||
|
||
msgSets = append(msgSets, newMessageSet(1, &protocol.Message{ | ||
Key: []byte("another"), | ||
Value: []byte("one another"), | ||
MagicByte: 2, | ||
Timestamp: time.Now(), | ||
})) | ||
|
||
msgSets = append(msgSets, newMessageSet(2, &protocol.Message{ | ||
Key: []byte("travisjeffery"), | ||
Value: []byte("two tj"), | ||
MagicByte: 2, | ||
Timestamp: time.Now(), | ||
})) | ||
|
||
msgSets = append(msgSets, newMessageSet(3, &protocol.Message{ | ||
Key: []byte("again another"), | ||
Value: []byte("again another"), | ||
MagicByte: 2, | ||
Timestamp: time.Now(), | ||
})) | ||
|
||
path := os.TempDir() | ||
defer os.RemoveAll(path) | ||
|
||
opts := commitlog.Options{ | ||
Path: path, | ||
MaxSegmentBytes: int64(len(msgSets[0]) + len(msgSets[1])), | ||
MaxLogBytes: 1000, | ||
} | ||
l, err := commitlog.New(opts) | ||
require.NoError(t, err) | ||
|
||
for _, msgSet := range msgSets { | ||
_, err = l.Append(msgSet) | ||
require.NoError(t, err) | ||
} | ||
|
||
segments := l.Segments() | ||
req.Equal(2, len(l.Segments())) | ||
segment := segments[0] | ||
|
||
scanner := commitlog.NewSegmentScanner(segment) | ||
ms, err := scanner.Scan() | ||
require.NoError(t, err) | ||
require.Equal(t, msgSets[0], ms) | ||
|
||
cc := commitlog.NewCompactCleaner() | ||
cleaned, err := cc.Clean(segments) | ||
req.NoError(err) | ||
req.Equal(2, len(cleaned)) | ||
|
||
scanner = commitlog.NewSegmentScanner(cleaned[0]) | ||
|
||
var count int | ||
for { | ||
ms, err = scanner.Scan() | ||
if err != nil { | ||
break | ||
} | ||
req.Equal(1, len(ms.Messages())) | ||
req.Equal([]byte("another"), ms.Messages()[0].Key()) | ||
req.Equal([]byte("one another"), ms.Messages()[0].Value()) | ||
count++ | ||
} | ||
req.Equal(1, count) | ||
|
||
scanner = commitlog.NewSegmentScanner(cleaned[1]) | ||
count = 0 | ||
for { | ||
ms, err = scanner.Scan() | ||
if err != nil { | ||
break | ||
} | ||
req.Equal(1, len(ms.Messages())) | ||
req.Equal([]byte("travisjeffery"), ms.Messages()[0].Key()) | ||
req.Equal([]byte("two tj"), ms.Messages()[0].Value()) | ||
count++ | ||
} | ||
req.Equal(1, count) | ||
|
||
} | ||
|
||
func newMessageSet(offset uint64, pmsgs ...*protocol.Message) commitlog.MessageSet { | ||
var cmsgs []commitlog.Message | ||
for _, msg := range pmsgs { | ||
b, err := protocol.Encode(msg) | ||
if err != nil { | ||
panic(err) | ||
} | ||
cmsgs = append(cmsgs, b) | ||
} | ||
return commitlog.NewMessageSet(offset, cmsgs...) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.