Skip to content

Commit

Permalink
Merge pull request #21 from Kriechi/add-blocking-operations
Browse files Browse the repository at this point in the history
Implement blocking Dequeue and Peek
  • Loading branch information
joncrlsn authored Mar 1, 2020
2 parents e30a4da + b50dd0a commit 74fd78b
Show file tree
Hide file tree
Showing 5 changed files with 260 additions and 23 deletions.
17 changes: 11 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# dque - a fast embedded durable queue for Go

[![Go Report Card](https://goreportcard.com/badge/github.com/joncrlsn/dque)](https://goreportcard.com/report/github.com/joncrlsn/dque)
[![GoDoc](https://godoc.org/github.com/joncrlsn/dque?status.svg)](https://godoc.org/github.com/joncrlsn/dque)
[![GoDoc](https://godoc.org/github.com/joncrlsn/dque?status.svg)](https://godoc.org/github.com/joncrlsn/dque)


dque is:
Expand All @@ -12,7 +12,7 @@ dque is:
* embedded -- compiled into your Golang program
* synchronized -- safe for concurrent usage
* fast or safe, you choose -- turbo mode lets the OS decide when to write to disk
* has a liberal license -- allows any use, commercial or personal
* has a liberal license -- allows any use, commercial or personal

I love tools that do one thing well. Hopefully this fits that category.

Expand All @@ -22,13 +22,13 @@ I am indebted to Gabor Cselle who, years ago, inspired me with an example of an
There are two performance modes: safe and turbo
##### safe mode
* safe mode is the default
* forces an fsync to disk every time you enqueue or dequeue an item.
* forces an fsync to disk every time you enqueue or dequeue an item.
* while this is the safest way to use dque with little risk of data loss, it is also the slowest.
##### turbo mode
##### turbo mode
* can be enabled/disabled with a call to [DQue.TurboOn()](https://godoc.org/github.com/joncrlsn/dque#DQue.TurboOn) or [DQue.TurboOff()](https://godoc.org/github.com/joncrlsn/dque#DQue.TurboOff)
* lets the OS batch up your changes to disk, which makes it a lot faster.
* also allows you to flush changes to disk at opportune times. See [DQue.TurboSync()](https://godoc.org/github.com/joncrlsn/dque#DQue.TurboSync)
* comes with a risk that a power failure could lose changes. By turning on Turbo mode you accept that risk.
* comes with a risk that a power failure could lose changes. By turning on Turbo mode you accept that risk.
* run the benchmark to see the difference on your hardware.

### implementation
Expand All @@ -40,7 +40,7 @@ There are two performance modes: safe and turbo
* Because the encoding/gob package is used to store the struct to disk:
* Only structs can be stored in the queue.
* Only one type of struct can be stored in each queue.
* Only public fields in a struct will be stored.
* Only public fields in a struct will be stored.
* A function is required that returns a pointer to a new struct of the type stored in the queue. This function is used when loading segments into memory from disk. I'd love to find a way to avoid this function.
* Queue segment implementation:
* For nice visuals, see [Gabor Cselle's documentation here](http://www.gaborcselle.com/open_source/java/persistent_queue.html). Note that Gabor's implementation kept the entire queue in memory as well as disk. dque keeps only the head and tail segments in memory.
Expand Down Expand Up @@ -117,6 +117,11 @@ func ExampleDQue_main() {
}
}

// Dequeue the next item in the queue and block until one is available
if iface, err = q.DequeueBlock(); err != nil {
log.Fatal("Error dequeuing item ", err)
}

// Assert type of the response to an Item pointer so we can work with it
item, ok := iface.(*Item)
if !ok {
Expand Down
13 changes: 12 additions & 1 deletion example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ package dque_test

import (
"fmt"
"github.com/joncrlsn/dque"
"log"

"github.com/joncrlsn/dque"
)

// Item is what we'll be storing in the queue. It can be any struct
Expand Down Expand Up @@ -67,6 +68,16 @@ func ExampleDQue() {
log.Println("Dequeued an interface:", iface)
log.Println("Size should be zero:", q.Size())

go func() {
err := q.Enqueue(&Item{"Joe", 1})
log.Println("Enqueued from goroutine", err == nil)
}()

// Dequeue the next item in the queue and block until one is available
if iface, err = q.DequeueBlock(); err != nil {
log.Fatal("Error dequeuing item ", err)
}

// Assert type of the response to an Item pointer so we can work with it
item, ok := iface.(*Item)
if !ok {
Expand Down
88 changes: 76 additions & 12 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ type DQue struct {
builder func() interface{} // builds a structure to load via gob

mutex sync.Mutex

emptyCond *sync.Cond
mutexEmptyCond sync.Mutex

turbo bool
}

Expand Down Expand Up @@ -87,6 +91,7 @@ func New(name string, dirPath string, itemsPerSegment int, builder func() interf
q.fullPath = fullPath
q.config.ItemsPerSegment = itemsPerSegment
q.builder = builder
q.emptyCond = sync.NewCond(&q.mutexEmptyCond)

if err := q.lock(); err != nil {
return nil, err
Expand Down Expand Up @@ -121,6 +126,7 @@ func Open(name string, dirPath string, itemsPerSegment int, builder func() inter
q.fullPath = fullPath
q.config.ItemsPerSegment = itemsPerSegment
q.builder = builder
q.emptyCond = sync.NewCond(&q.mutexEmptyCond)

if err := q.lock(); err != nil {
return nil, err
Expand Down Expand Up @@ -157,6 +163,10 @@ func NewOrOpen(name string, dirPath string, itemsPerSegment int, builder func()
// Close releases the lock on the queue rendering it unusable for further usage by this instance.
// Close will return an error if it has already been called.
func (q *DQue) Close() error {
// only allow Close while no other function is active
q.mutex.Lock()
defer q.mutex.Unlock()

if q.fileLock == nil {
return ErrQueueClosed
}
Expand All @@ -169,6 +179,9 @@ func (q *DQue) Close() error {
// Finally mark this instance as closed to prevent any further access
q.fileLock = nil

// Wake-up any waiting goroutines for blocking queue access - they should get a ErrQueueClosed
q.emptyCond.Broadcast()

// Safe-guard ourself from accidentally using segments after closing the queue
q.firstSegment = nil
q.lastSegment = nil
Expand All @@ -178,14 +191,14 @@ func (q *DQue) Close() error {

// Enqueue adds an item to the end of the queue
func (q *DQue) Enqueue(obj interface{}) error {
if q.fileLock == nil {
return ErrQueueClosed
}

// This is heavy-handed but its safe
q.mutex.Lock()
defer q.mutex.Unlock()

if q.fileLock == nil {
return ErrQueueClosed
}

// If this segment is full then create a new one
if q.lastSegment.sizeOnDisk() >= q.config.ItemsPerSegment {

Expand Down Expand Up @@ -214,20 +227,23 @@ func (q *DQue) Enqueue(obj interface{}) error {
return errors.Wrap(err, "error adding item to the last segment")
}

// Wakeup any goroutine that is currently waiting for an item to be enqueued
q.emptyCond.Broadcast()

return nil
}

// Dequeue removes and returns the first item in the queue.
// When the queue is empty, nil and dque.ErrEmpty are returned.
func (q *DQue) Dequeue() (interface{}, error) {
if q.fileLock == nil {
return nil, ErrQueueClosed
}

// This is heavy-handed but its safe
q.mutex.Lock()
defer q.mutex.Unlock()

if q.fileLock == nil {
return nil, ErrQueueClosed
}

// Remove the first object from the first segment
obj, err := q.firstSegment.remove()
if err == errEmptySegment {
Expand Down Expand Up @@ -284,14 +300,14 @@ func (q *DQue) Dequeue() (interface{}, error) {
// When the queue is empty, nil and dque.ErrEmpty are returned.
// Do not use this method with multiple dequeueing threads or you may regret it.
func (q *DQue) Peek() (interface{}, error) {
if q.fileLock == nil {
return nil, ErrQueueClosed
}

// This is heavy-handed but it is safe
q.mutex.Lock()
defer q.mutex.Unlock()

if q.fileLock == nil {
return nil, ErrQueueClosed
}

// Return the first object from the first segment
obj, err := q.firstSegment.peek()
if err == errEmptySegment {
Expand All @@ -305,6 +321,42 @@ func (q *DQue) Peek() (interface{}, error) {
return obj, nil
}

// DequeueBlock behaves similar to Dequeue, but is a blocking call until an item is available.
func (q *DQue) DequeueBlock() (interface{}, error) {
q.mutexEmptyCond.Lock()
defer q.mutexEmptyCond.Unlock()
for {
obj, err := q.Dequeue()
if err == ErrEmpty {
q.emptyCond.Wait()
// Wait() atomically unlocks mutexEmptyCond and suspends execution of the calling goroutine.
// Receiving the signal does not guarantee an item is available, let's loop and check again.
continue
} else if err != nil {
return nil, err
}
return obj, nil
}
}

// PeekBlock behaves similar to Peek, but is a blocking call until an item is available.
func (q *DQue) PeekBlock() (interface{}, error) {
q.mutexEmptyCond.Lock()
defer q.mutexEmptyCond.Unlock()
for {
obj, err := q.Peek()
if err == ErrEmpty {
q.emptyCond.Wait()
// Wait() atomically unlocks mutexEmptyCond and suspends execution of the calling goroutine.
// Receiving the signal does not guarantee an item is available, let's loop and check again.
continue
} else if err != nil {
return nil, err
}
return obj, nil
}
}

// Size locks things up while calculating so you are guaranteed an accurate
// size... unless you have changed the itemsPerSegment value since the queue
// was last empty. Then it could be wildly inaccurate.
Expand Down Expand Up @@ -358,6 +410,10 @@ func (q *DQue) Turbo() bool {
// risk of losing data if a power-loss occurs.
// If turbo is already on an error is returned
func (q *DQue) TurboOn() error {
// This is heavy-handed but it is safe
q.mutex.Lock()
defer q.mutex.Unlock()

if q.fileLock == nil {
return ErrQueueClosed
}
Expand All @@ -375,6 +431,10 @@ func (q *DQue) TurboOn() error {
// they happen.
// If turbo is already off an error is returned
func (q *DQue) TurboOff() error {
// This is heavy-handed but it is safe
q.mutex.Lock()
defer q.mutex.Unlock()

if q.fileLock == nil {
return ErrQueueClosed
}
Expand All @@ -395,6 +455,10 @@ func (q *DQue) TurboOff() error {
// TurboSync allows you to fsync changes to disk, but only if turbo is on.
// If turbo is off an error is returned
func (q *DQue) TurboSync() error {
// This is heavy-handed but it is safe
q.mutex.Lock()
defer q.mutex.Unlock()

if q.fileLock == nil {
return ErrQueueClosed
}
Expand Down
Loading

0 comments on commit 74fd78b

Please sign in to comment.