diff --git a/cmd/jocko/jocko b/cmd/jocko/jocko new file mode 100755 index 00000000..c54c5d7b Binary files /dev/null and b/cmd/jocko/jocko differ diff --git a/commitlog/commitlog.go b/commitlog/commitlog.go index 23696b11..d223098e 100644 --- a/commitlog/commitlog.go +++ b/commitlog/commitlog.go @@ -33,7 +33,9 @@ type CommitLog struct { } type Options struct { - Path string + Path string + // MaxSegmentBytes is the max number of bytes a segment can contain, once the limit is hit a + // new segment will be split off. MaxSegmentBytes int64 MaxLogBytes int64 } diff --git a/commitlog/commitlog_test.go b/commitlog/commitlog_test.go index 8c3eb0ea..b27e7928 100644 --- a/commitlog/commitlog_test.go +++ b/commitlog/commitlog_test.go @@ -23,8 +23,7 @@ var ( commitlog.NewMessageSet(0, msgs...), commitlog.NewMessageSet(1, msgs...), } - maxBytes = msgSets[0].Size() - path = filepath.Join(os.TempDir(), fmt.Sprintf("commitlogtest%d", rand.Int63())) + path = filepath.Join(os.TempDir(), fmt.Sprintf("commitlogtest%d", rand.Int63())) ) func TestNewCommitLog(t *testing.T) { diff --git a/commitlog/compact_cleaner.go b/commitlog/compact_cleaner.go new file mode 100644 index 00000000..baf3a1f6 --- /dev/null +++ b/commitlog/compact_cleaner.go @@ -0,0 +1,80 @@ +package commitlog + +import ( + "github.com/cespare/xxhash" +) + +type CompactCleaner struct { + // map from key hash to offset + m map[uint64]int64 +} + +func NewCompactCleaner() *CompactCleaner { + return &CompactCleaner{ + m: make(map[uint64]int64), + } +} + +func (c *CompactCleaner) Clean(segments []*Segment) (cleaned []*Segment, err error) { + if len(segments) == 0 { + return segments, nil + } + + var ss *SegmentScanner + var ms MessageSet + var offset int64 + + // build the map of keys to their latest offsets + for _, segment := range segments { + ss = NewSegmentScanner(segment) + + for ms, err = ss.Scan(); err == nil; ms, err = ss.Scan() { + offset = ms.Offset() + for _, msg := range ms.Messages() { + c.m[Hash(msg.Key())] = offset + } + } + } + + // TODO: handle joining segments when they're smaller than max segment size + for _, ds := range segments { + ss = NewSegmentScanner(ds) + + cs, err := NewSegment(ds.path, ds.BaseOffset, ds.maxBytes, cleanedSuffix) + if err != nil { + return nil, err + } + + for ms, err = ss.Scan(); err == nil; ms, err = ss.Scan() { + var retain bool + offset = ms.Offset() + for _, msg := range ms.Messages() { + if c.m[Hash(msg.Key())] <= offset { + retain = true + } + } + + if retain { + if _, err = cs.Write(ms); err != nil { + return nil, err + } + } + } + + if err = cs.Replace(ds); err != nil { + return nil, err + } + + cleaned = append(cleaned, cs) + } + + return cleaned, nil +} + +func Hash(b []byte) uint64 { + h := xxhash.New() + if _, err := h.Write(b); err != nil { + panic(err) + } + return h.Sum64() +} diff --git a/commitlog/compact_cleaner_test.go b/commitlog/compact_cleaner_test.go new file mode 100644 index 00000000..5cdb04ac --- /dev/null +++ b/commitlog/compact_cleaner_test.go @@ -0,0 +1,117 @@ +package commitlog_test + +import ( + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/travisjeffery/jocko/commitlog" + "github.com/travisjeffery/jocko/protocol" +) + +func TestCompactCleaner(t *testing.T) { + req := require.New(t) + var err error + + var msgSets []commitlog.MessageSet + msgSets = append(msgSets, newMessageSet(0, &protocol.Message{ + Key: []byte("travisjeffery"), + Value: []byte("one tj"), + MagicByte: 2, + Timestamp: time.Now(), + })) + + msgSets = append(msgSets, newMessageSet(1, &protocol.Message{ + Key: []byte("another"), + Value: []byte("one another"), + MagicByte: 2, + Timestamp: time.Now(), + })) + + msgSets = append(msgSets, newMessageSet(2, &protocol.Message{ + Key: []byte("travisjeffery"), + Value: []byte("two tj"), + MagicByte: 2, + Timestamp: time.Now(), + })) + + msgSets = append(msgSets, newMessageSet(3, &protocol.Message{ + Key: []byte("again another"), + Value: []byte("again another"), + MagicByte: 2, + Timestamp: time.Now(), + })) + + path := os.TempDir() + defer os.RemoveAll(path) + + opts := commitlog.Options{ + Path: path, + MaxSegmentBytes: int64(len(msgSets[0]) + len(msgSets[1])), + MaxLogBytes: 1000, + } + l, err := commitlog.New(opts) + require.NoError(t, err) + + for _, msgSet := range msgSets { + _, err = l.Append(msgSet) + require.NoError(t, err) + } + + segments := l.Segments() + req.Equal(2, len(l.Segments())) + segment := segments[0] + + scanner := commitlog.NewSegmentScanner(segment) + ms, err := scanner.Scan() + require.NoError(t, err) + require.Equal(t, msgSets[0], ms) + + cc := commitlog.NewCompactCleaner() + cleaned, err := cc.Clean(segments) + req.NoError(err) + req.Equal(2, len(cleaned)) + + scanner = commitlog.NewSegmentScanner(cleaned[0]) + + var count int + for { + ms, err = scanner.Scan() + if err != nil { + break + } + req.Equal(1, len(ms.Messages())) + req.Equal([]byte("another"), ms.Messages()[0].Key()) + req.Equal([]byte("one another"), ms.Messages()[0].Value()) + count++ + } + req.Equal(1, count) + + scanner = commitlog.NewSegmentScanner(cleaned[1]) + count = 0 + for { + ms, err = scanner.Scan() + if err != nil { + break + } + req.Equal(1, len(ms.Messages())) + req.Equal([]byte("travisjeffery"), ms.Messages()[0].Key()) + req.Equal([]byte("two tj"), ms.Messages()[0].Value()) + count++ + } + req.Equal(1, count) + +} + +func newMessageSet(offset uint64, pmsgs ...*protocol.Message) commitlog.MessageSet { + var cmsgs []commitlog.Message + for _, msg := range pmsgs { + b, err := protocol.Encode(msg) + if err != nil { + panic(err) + } + cmsgs = append(cmsgs, b) + } + return commitlog.NewMessageSet(offset, cmsgs...) +} diff --git a/commitlog/index_test.go b/commitlog/index_test.go index 6ec5f0df..04d39dc7 100644 --- a/commitlog/index_test.go +++ b/commitlog/index_test.go @@ -11,7 +11,7 @@ import ( ) func TestIndex(t *testing.T) { - path := filepath.Join(os.TempDir(), fmt.Sprintf(indexNameFormat, rand.Int63())) + path := filepath.Join(os.TempDir(), fmt.Sprintf(fileFormat, rand.Int63(), indexSuffix)) totalEntries := rand.Intn(10) + 10 //case for roundDown bytes := int64(totalEntries*entryWidth + 1) @@ -72,7 +72,7 @@ func TestIndex(t *testing.T) { } func TestIndexScanner(t *testing.T) { - path := filepath.Join(os.TempDir(), fmt.Sprintf(indexNameFormat, rand.Int63())) + path := filepath.Join(os.TempDir(), fmt.Sprintf(fileFormat, rand.Int63(), indexSuffix)) totalEntries := rand.Intn(10) + 10 //case for roundDown bytes := int64(totalEntries*entryWidth + 1) diff --git a/commitlog/message.go b/commitlog/message.go index e02364b6..0999ce60 100644 --- a/commitlog/message.go +++ b/commitlog/message.go @@ -43,7 +43,7 @@ func (m Message) Value() []byte { func (m Message) Size() int32 { var size int32 = 4 + 1 + 1 - if m.MagicByte() == 1 { + if m.MagicByte() > 0 { size += 8 } size += 4 @@ -71,13 +71,9 @@ func (m Message) keyOffsets() (start, end, size int32) { } func (m Message) valueOffsets() (start, end, size int32) { - keyStart, keyEnd, keySize := m.keyOffsets() - if keySize == -1 { - start = keyStart + 4 - } else { - start = keyEnd + 1 - } + _, keyEnd, _ := m.keyOffsets() + start = keyEnd size = int32(Encoding.Uint32(m[start:])) - end = start + size + end = start + 4 + size return } diff --git a/commitlog/segment.go b/commitlog/segment.go index a09afa24..c7deaa9a 100644 --- a/commitlog/segment.go +++ b/commitlog/segment.go @@ -13,8 +13,10 @@ import ( ) const ( - logNameFormat = "%020d.log" - indexNameFormat = "%020d.index" + fileFormat = "%020d%s" + logSuffix = ".log" + cleanedSuffix = ".cleaned" + indexSuffix = ".index" ) type Segment struct { @@ -26,30 +28,32 @@ type Segment struct { NextOffset int64 Position int64 maxBytes int64 + path string + suffix string sync.Mutex } -func NewSegment(path string, baseOffset int64, maxBytes int64) (*Segment, error) { - logPath := filepath.Join(path, fmt.Sprintf(logNameFormat, baseOffset)) - log, err := os.OpenFile(logPath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) - if err != nil { - return nil, errors.Wrap(err, "open file failed") +func NewSegment(path string, baseOffset, maxBytes int64, args ...interface{}) (*Segment, error) { + var suffix string + if len(args) != 0 { + suffix = args[0].(string) } - s := &Segment{ - log: log, - writer: log, - reader: log, maxBytes: maxBytes, BaseOffset: baseOffset, NextOffset: baseOffset, + path: path, + suffix: suffix, } - err = s.SetupIndex(path) - if err == io.EOF { - return s, nil + log, err := os.OpenFile(s.logPath(), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) + if err != nil { + return nil, errors.Wrap(err, "open file failed") } - + s.log = log + s.writer = log + s.reader = log + err = s.SetupIndex() return s, err } @@ -58,15 +62,18 @@ func NewSegment(path string, baseOffset int64, maxBytes int64) (*Segment, error) // - Sanity check of the loaded Index // - Truncates the Index (clears it) // - Reads the log file from the beginning and re-initializes the Index -func (s *Segment) SetupIndex(path string) (err error) { - indexPath := filepath.Join(path, fmt.Sprintf(indexNameFormat, s.BaseOffset)) +func (s *Segment) SetupIndex() (err error) { s.Index, err = NewIndex(options{ - path: indexPath, + path: s.indexPath(), baseOffset: s.BaseOffset, }) if err != nil { return err } + return s.BuildIndex() +} + +func (s *Segment) BuildIndex() (err error) { if err = s.Index.SanityCheck(); err != nil { return err } @@ -80,44 +87,55 @@ func (s *Segment) SetupIndex(path string) (err error) { } b := new(bytes.Buffer) + + nextOffset := s.BaseOffset + position := int64(0) + +loop: for { // get offset and size - _, err := io.CopyN(b, s.log, 8) + _, err = io.CopyN(b, s.log, 8) if err != nil { - return err + break loop } - s.NextOffset = int64(Encoding.Uint64(b.Bytes()[0:8])) _, err = io.CopyN(b, s.log, 4) if err != nil { - return err + break loop } size := int64(Encoding.Uint32(b.Bytes()[8:12])) _, err = io.CopyN(b, s.log, size) if err != nil { - return err + break loop } // Reset the buffer to not get an overflow b.Truncate(0) - err = s.Index.WriteEntry(Entry{ - Offset: s.NextOffset, - Position: s.Position, - }) + entry := Entry{ + Offset: nextOffset, + Position: position, + } + err = s.Index.WriteEntry(entry) if err != nil { - return err + break loop } - s.Position += size + msgSetHeaderLen - s.NextOffset++ + position += size + msgSetHeaderLen + nextOffset++ _, err = s.log.Seek(size, 1) if err != nil { - return err + break loop } } + if err == io.EOF { + s.NextOffset = nextOffset + s.Position = position + return nil + } + return err } func (s *Segment) IsFull() bool { @@ -161,6 +179,36 @@ func (s *Segment) Close() error { return s.Index.Close() } +// Cleaner creates a cleaner segment for this segment. +func (s *Segment) Cleaner() (*Segment, error) { + return NewSegment(s.path, s.BaseOffset, s.maxBytes, cleanedSuffix) +} + +// Replace replaces the given segment with the callee. +func (s *Segment) Replace(old *Segment) (err error) { + if err = old.Close(); err != nil { + return err + } + if err = s.Close(); err != nil { + return err + } + if err = os.Rename(s.logPath(), old.logPath()); err != nil { + return err + } + if err = os.Rename(s.indexPath(), old.indexPath()); err != nil { + return err + } + s.suffix = "" + log, err := os.OpenFile(s.logPath(), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) + if err != nil { + return errors.Wrap(err, "open file failed") + } + s.log = log + s.writer = log + s.reader = log + return s.SetupIndex() +} + func (s *Segment) findEntry(offset int64) (e *Entry, err error) { s.Lock() defer s.Unlock() @@ -219,3 +267,11 @@ func (s *SegmentScanner) Scan() (ms MessageSet, err error) { msgSet := append(header, payload...) return msgSet, nil } + +func (s *Segment) logPath() string { + return filepath.Join(s.path, fmt.Sprintf(fileFormat, s.BaseOffset, logSuffix+s.suffix)) +} + +func (s *Segment) indexPath() string { + return filepath.Join(s.path, fmt.Sprintf(fileFormat, s.BaseOffset, indexSuffix+s.suffix)) +} diff --git a/jocko/broker.go b/jocko/broker.go index fe02100d..971b5eea 100644 --- a/jocko/broker.go +++ b/jocko/broker.go @@ -917,28 +917,16 @@ func (b *Broker) handleOffsetFetch(ctx context.Context, header *protocol.Request resp := new(protocol.OffsetFetchResponse) resp.APIVersion = req.Version() + resp.Responses = make([]protocol.OffsetFetchTopicResponse, len(req.Topics)) + // state := b.fsm.State() - // group := protocol.Group{} // _, g, err := state.GetGroup(req.GroupID) + + // // If group doesn't exist then create it? // if err != nil { - // group.ErrorCode = protocol.ErrUnknown.Code() - // group.GroupID = id - // resp.Groups = append(resp.Groups, group) - // return resp - // } - // group.GroupID = id - // group.State = "Stable" - // group.ProtocolType = "consumer" - // group.Protocol = "consumer" - // for id, member := range g.Members { - // group.GroupMembers[id] = &protocol.GroupMember{ - // ClientID: member.ID, - // // TODO: ??? - // ClientHost: "", - // GroupMemberMetadata: member.Metadata, - // GroupMemberAssignment: member.Assignment, - // } + // // TODO: handle err + // panic(err) // } return resp diff --git a/protocol/message.go b/protocol/message.go index 41d967b2..e823254f 100644 --- a/protocol/message.go +++ b/protocol/message.go @@ -3,16 +3,18 @@ package protocol import "time" type Message struct { - Timestamp time.Time - Key []byte - Value []byte - MagicByte int8 + Crc int32 + MagicByte int8 + Attributes int8 + Timestamp time.Time + Key []byte + Value []byte } func (m *Message) Encode(e PacketEncoder) error { e.Push(&CRCField{}) e.PutInt8(m.MagicByte) - e.PutInt8(0) // attributes + e.PutInt8(m.Attributes) if m.MagicByte > 0 { e.PutInt64(m.Timestamp.UnixNano() / int64(time.Millisecond)) } @@ -34,7 +36,7 @@ func (m *Message) Decode(d PacketDecoder) error { if m.MagicByte, err = d.Int8(); err != nil { return err } - if _, err := d.Int8(); err != nil { + if m.Attributes, err = d.Int8(); err != nil { return err } if m.MagicByte > 0 {