Skip to content

Commit

Permalink
Merge pull request #6 from globocom/handle-buffer-closed
Browse files Browse the repository at this point in the history
Prevent panics from happening when the buffer is closed
  • Loading branch information
rafaeleyng authored Sep 8, 2020
2 parents 0127683 + 115ecd8 commit 3093ea2
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 12 deletions.
45 changes: 39 additions & 6 deletions buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
var (
// ErrTimeout indicates an operation has timed out.
ErrTimeout = errors.New("operation timed-out")
// ErrClosed indicates the buffer is closed and can no longer be used.
ErrClosed = errors.New("buffer is closed")
)

type (
Expand All @@ -23,9 +25,15 @@ type (
}
)

// Push appends an item to the end of the buffer. It times out if it cannot be
// performed in a timely fashion.
// Push appends an item to the end of the buffer.
//
// It returns an ErrTimeout if if cannot be performed in a timely fashion, and
// an ErrClosed if the buffer has been closed.
func (buffer *Buffer) Push(item interface{}) error {
if buffer.closed() {
return ErrClosed
}

select {
case buffer.dataCh <- item:
return nil
Expand All @@ -34,9 +42,15 @@ func (buffer *Buffer) Push(item interface{}) error {
}
}

// Flush outputs the buffer to a permanent destination. It times out if it cannot be
// performed in a timely fashion.
// Flush outputs the buffer to a permanent destination.
//
// It returns an ErrTimeout if if cannot be performed in a timely fashion, and
// an ErrClosed if the buffer has been closed.
func (buffer *Buffer) Flush() error {
if buffer.closed() {
return ErrClosed
}

select {
case buffer.flushCh <- struct{}{}:
return nil
Expand All @@ -45,9 +59,19 @@ func (buffer *Buffer) Flush() error {
}
}

// Close flushes the buffer and prevents it from being further used. If it succeeds,
// the buffer cannot be used after it has been closed as all further operations will panic.
// Close flushes the buffer and prevents it from being further used.
//
// It returns an ErrTimeout if if cannot be performed in a timely fashion, and
// an ErrClosed if the buffer has already been closed.
//
// An ErrTimeout can either mean that a flush could not be triggered, or it can
// mean that a flush was triggered but it has not finished yet. In any case it is
// safe to call Close again.
func (buffer *Buffer) Close() error {
if buffer.closed() {
return ErrClosed
}

select {
case buffer.closeCh <- struct{}{}:
// noop
Expand All @@ -66,6 +90,15 @@ func (buffer *Buffer) Close() error {
}
}

func (buffer Buffer) closed() bool {
select {
case <-buffer.doneCh:
return true
default:
return false
}
}

func (buffer *Buffer) consume() {
count := 0
items := make([]interface{}, buffer.options.Size)
Expand Down
54 changes: 48 additions & 6 deletions buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,21 @@ var _ = Describe("Buffer", func() {
Expect(err2).To(Succeed())
Expect(err3).To(MatchError(buffer.ErrTimeout))
})

It("fails when the buffer is closed", func() {
// arrange
sut := buffer.New(
buffer.WithSize(2),
buffer.WithFlusher(flusher),
)
_ = sut.Close()

// act
err := sut.Push(1)

// assert
Expect(err).To(MatchError(buffer.ErrClosed))
})
})

Context("Flushing", func() {
Expand Down Expand Up @@ -204,6 +219,21 @@ var _ = Describe("Buffer", func() {
// assert
Expect(err).To(MatchError(buffer.ErrTimeout))
})

It("fails when the buffer is closed", func() {
// arrange
sut := buffer.New(
buffer.WithSize(2),
buffer.WithFlusher(flusher),
)
_ = sut.Close()

// act
err := sut.Flush()

// assert
Expect(err).To(MatchError(buffer.ErrClosed))
})
})

Context("Closing", func() {
Expand Down Expand Up @@ -244,7 +274,7 @@ var _ = Describe("Buffer", func() {
Expect(err).To(MatchError(buffer.ErrTimeout))
})

It("allow Close to be called again if it fails", func() {
It("fails when the buffer is closed", func() {
// arrange
flusher.Func = func() { time.Sleep(2 * time.Second) }

Expand All @@ -253,22 +283,34 @@ var _ = Describe("Buffer", func() {
buffer.WithFlusher(flusher),
buffer.WithCloseTimeout(time.Second),
)
_ = sut.Push(1)
_ = sut.Close()

// act
err := sut.Close()

// assert
Expect(err).To(MatchError(buffer.ErrTimeout))
Expect(err).To(MatchError(buffer.ErrClosed))
})

It("allows Close to be called again if it fails", func() {
// arrange
time.Sleep(time.Second)
flusher.Func = func() { time.Sleep(2 * time.Second) }

sut := buffer.New(
buffer.WithSize(1),
buffer.WithFlusher(flusher),
buffer.WithCloseTimeout(time.Second),
)
_ = sut.Push(1)

// act
err = sut.Close()
err1 := sut.Close()
time.Sleep(time.Second)
err2 := sut.Close()

// assert
Expect(err).To(BeNil())
Expect(err1).To(MatchError(buffer.ErrTimeout))
Expect(err2).To(Succeed())
})
})
})
Expand Down

0 comments on commit 3093ea2

Please sign in to comment.