Skip to content
This repository has been archived by the owner on Oct 17, 2023. It is now read-only.

Commit

Permalink
handle offset commit better when messages are skipped (#396)
Browse files Browse the repository at this point in the history
it was reported that resume was not working as expected due to how message offsets were not getting committed because of namespace filters, this change will ensure the offset of all messages eventually get committed
  • Loading branch information
jipperinbham authored Aug 15, 2017
1 parent dac78e0 commit b1d776b
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 16 deletions.
4 changes: 2 additions & 2 deletions pipe/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (p *Pipe) Listen(fn func(message.Msg, offset.Offset) (message.Msg, error))
select {
case <-p.chStop:
if len(p.In) > 0 {
log.With("buffer_length", len(p.In)).Infoln("received stop, message buffer not empty, continuing...")
log.With("path", p.path).With("buffer_length", len(p.In)).Infoln("received stop, message buffer not empty, continuing...")
continue
}
log.Infoln("received stop, message buffer is empty, closing...")
Expand Down Expand Up @@ -133,7 +133,7 @@ func (p *Pipe) Stop() {
for {
select {
case <-timeout:
log.Errorln("timeout reached waiting for Out channels to clear")
log.With("path", p.path).Errorln("timeout reached waiting for Out channels to clear")
return
default:
}
Expand Down
37 changes: 27 additions & 10 deletions pipeline/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func NewNodeWithOptions(name, kind, ns string, options ...OptionFunc) (*Node, er
path: name,
depth: 1,
nsFilter: compiledNs,
pipe: pipe.NewPipe(nil, ""),
pipe: pipe.NewPipe(nil, name),
children: make([]*Node, 0),
transforms: make([]*Transform, 0),
done: make(chan struct{}),
Expand Down Expand Up @@ -156,11 +156,10 @@ func WithWriter(a adaptor.Adaptor) OptionFunc {
func WithParent(parent *Node) OptionFunc {
return func(n *Node) error {
n.parent = parent
// TODO: remove path param
n.pipe = pipe.NewPipe(parent.pipe, "")
parent.children = append(parent.children, n)
n.path = parent.path + "/" + n.Name
n.depth = parent.depth + 1
n.pipe = pipe.NewPipe(parent.pipe, n.path)
return nil
}
}
Expand Down Expand Up @@ -515,21 +514,39 @@ type writeResult struct {
}

func (n *Node) write(msg message.Msg, off offset.Offset) (message.Msg, error) {
if n.om != nil {
n.offsetLock.Lock()
msg = message.WithConfirms(n.confirms, msg)
n.offsetLock.Unlock()
}
if !n.nsFilter.MatchString(msg.Namespace()) {
n.l.With("ns", msg.Namespace()).Debugln("message skipped by namespace filter")
if msg.Confirms() != nil {
n.offsetLock.Lock()
if len(n.pendingOffsets) == 0 {
n.om.CommitOffset(off, false)
}
n.offsetLock.Unlock()
}
return msg, nil
}
msg, err := n.applyTransforms(msg)
if err != nil || msg == nil {
if err != nil {
return nil, err
} else if msg == nil {
if n.om != nil {
n.offsetLock.Lock()
if len(n.pendingOffsets) == 0 {
n.om.CommitOffset(off, false)
}
n.offsetLock.Unlock()
}
return nil, err
}
if n.om != nil {
n.offsetLock.Lock()
msg = message.WithConfirms(n.confirms, msg)
n.pendingOffsets = append(n.pendingOffsets, off)
n.offsetLock.Unlock()
}

n.offsetLock.Lock()
n.pendingOffsets = append(n.pendingOffsets, off)
n.offsetLock.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), n.writeTimeout)
defer cancel()
c := make(chan writeResult)
Expand Down
7 changes: 3 additions & 4 deletions pipeline/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ var (
func() (*Node, *StopWriter, func()) {
a := &StopWriter{}
n, _ := NewNodeWithOptions(
"starter", "stopWriter", defaultNsString,
"ns_filter_starter", "stopWriter", defaultNsString,
WithClient(a),
WithReader(a),
WithCommitLog([]commitlog.OptionFunc{
Expand All @@ -622,16 +622,15 @@ var (
}...),
)
NewNodeWithOptions(
"stopper", "stopWriter", "/blah/",
"ns_filter_stopper", "stopWriter", "/blah/",
WithClient(a),
WithWriter(a),
WithParent(n),
WithResumeTimeout(5*time.Second),
WithOffsetManager(&offset.MockManager{MemoryMap: map[string]uint64{}}),
)
return n, a, func() {}
},
0, 0, ErrResumeTimedOut,
0, 0, nil,
},
{
"with_offset_commit_error",
Expand Down

0 comments on commit b1d776b

Please sign in to comment.