Skip to content

Commit

Permalink
uses sarama's offset manager to mark offsets.
Browse files Browse the repository at this point in the history
  • Loading branch information
fgeller committed Dec 8, 2016
1 parent 85d32c3 commit 8b58ebe
Showing 1 changed file with 9 additions and 84 deletions.
93 changes: 9 additions & 84 deletions offset.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ func (cmd *offsetCmd) run(as []string, q chan struct{}) {
}

cmd.connect()
defer logClose("offset manager", cmd.offsetManager)
defer logClose("client", cmd.client)

if cmd.out == nil {
Expand Down Expand Up @@ -229,107 +230,31 @@ func (cmd *offsetCmd) fetchGroupOffset(top string, prt int32, po int64) int64 {
if err != nil {
failf("failed to read consumer offsets for group=%s topic=%s partition=%d err=%v", cmd.group, top, prt, err)
}
co, _ := pom.NextOffset()
pom.Close()
defer logClose(fmt.Sprintf("partition %v offset manager", prt), pom)

co, _ := pom.NextOffset()
return co
}

func (cmd *offsetCmd) setConsumerOffsets(top string, prt int32) {
var (
err error
po = cmd.newOffsets
pom, err = cmd.offsetManager.ManagePartition(top, prt)
po = cmd.newOffsets
)
defer logClose(fmt.Sprintf("partition %v offset manager", prt), pom)

memberID, generationID := cmd.join(top)
if cmd.newOffsets == sarama.OffsetNewest || cmd.newOffsets == sarama.OffsetOldest {
if po, err = cmd.client.GetOffset(top, prt, cmd.newOffsets); err != nil {
failf("failed to read offsets for topic=%s partition=%d err=%v", top, prt, err)
}
}

cmd.commit(top, prt, po, generationID, memberID)
}

func (cmd *offsetCmd) join(top string) (string, int32) {
var (
err error
res *sarama.JoinGroupResponse
)

joinGroupReq := &sarama.JoinGroupRequest{
GroupId: cmd.group,
SessionTimeout: 30 * 1000,
ProtocolType: "consumer",
}

meta := &sarama.ConsumerGroupMemberMetadata{
Version: 1,
Topics: []string{top},
}

if err := joinGroupReq.AddGroupProtocolMetadata("range", meta); err != nil {
failf("failed to add meta data err=%v", err)
}

if err = joinGroupReq.AddGroupProtocolMetadata("roundrobin", meta); err != nil {
failf("failed to add meta data err=%v", err)
}

if res, err = cmd.broker.JoinGroup(joinGroupReq); err != nil {
failf("failed to join consumer group err=%v", err)
}

if res.Err != sarama.ErrNoError {
failf("failed to join consumer group responseErr=%v", res.Err)
}

return res.MemberId, res.GenerationId
}

func (cmd *offsetCmd) commit(top string, prt int32, offset int64, generationID int32, memberID string) {
var (
ocr *sarama.OffsetCommitResponse
err error
)

v := int16(0)
if cmd.version.IsAtLeast(sarama.V0_8_2_0) {
v = 1
}
if cmd.version.IsAtLeast(sarama.V0_9_0_0) {
v = 2
}

req := &sarama.OffsetCommitRequest{
Version: v,
ConsumerGroup: cmd.group,
ConsumerGroupGeneration: generationID,
ConsumerID: memberID,
RetentionTime: -1,
}
req.AddBlock(top, prt, offset, 0, "")

if ocr, err = cmd.broker.CommitOffset(req); err != nil {
failf("failed to commit offsets err=%v", err)
}

for top, perrs := range ocr.Errors {
for prt, kerr := range perrs {
if kerr != sarama.ErrNoError {
failf("failed to commit offsets topic=%s, partition=%v err=%v kerr=%v", top, prt, err, kerr)
}
}
}
pom.MarkOffset(po, "")
}

func (cmd *offsetCmd) printOffset(o offsets) {
var (
err error
buf []byte
)

if buf, err = json.Marshal(o); err != nil {
buf, err := json.Marshal(o)
if err != nil {
failf("failed to marshal JSON for consumer group %#v err=%v", o, err)
}

Expand Down

0 comments on commit 8b58ebe

Please sign in to comment.