Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements #1

Open
wants to merge 44 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
56f94a5
Small changes inside of gem requirements and Rakefile
flavio Nov 11, 2011
0216709
Fix broken unit test
flavio Nov 22, 2011
04625aa
Indent code using spaces rather then tabs.
flavio Nov 22, 2011
65989bf
Ensure pop respects sorting contraints
flavio Nov 22, 2011
32a8489
Extend peek tests
flavio Nov 22, 2011
46f46db
Better testing of timeout
flavio Dec 12, 2011
f07a6f2
Allow timeout feature to be disabled
flavio Dec 13, 2011
5d805da
Version bump
flavio Dec 13, 2011
20529d4
Remove older releases from git
flavio Dec 13, 2011
69ecc4a
Fix bug introduced by the removal of the timeout feature
flavio Dec 13, 2011
359b3e4
Version bump 0.6.2
flavio Dec 13, 2011
21b657e
Adding a function to increase the priority of an item in the queue
Feb 2, 2012
25b7090
Minor version bump: 0.6.3
flavio Feb 2, 2012
848cc2c
Adding a priority change test
Feb 2, 2012
e278e26
Add methods to alter the priority of a queued item.
flavio Feb 2, 2012
8b7a469
Fix the priority calls
Feb 6, 2012
a71c172
Oops, don't re-convert to ObjectId
Feb 6, 2012
67d7b2b
Revert "Oops, don't re-convert to ObjectId"
flavio Feb 7, 2012
e7294ef
Revert "Fix the priority calls"
flavio Feb 7, 2012
9378970
update version
jordimassaguerpla Feb 21, 2012
6885eea
Merge pull request #1 from openSUSE/review_120221_update_version_in_g…
flavio Feb 21, 2012
16f691d
remove pkg/*.gem files from gemspec
jordimassaguerpla Feb 21, 2012
18a675d
Merge pull request #2 from openSUSE/review_120221_remove_pkg_gem_file…
flavio Feb 21, 2012
df8a4c8
generate gemspec for release
jordimassaguerpla Mar 22, 2012
554fab5
Add queue.unlock to re-add items to the queue
Mar 22, 2012
c49511c
Version bump to 0.6.4
jordimassaguerpla Mar 22, 2012
9dd8669
Add indexes to the collection
flavio Mar 27, 2012
57de5c6
Version bump: 0.6.5
flavio Mar 27, 2012
c9fa1ad
gemspec update for v0.6.5 release
jordimassaguerpla Mar 27, 2012
9fdac23
update date to gemspec for creating 0.6.6 gem
jordimassaguerpla Mar 28, 2012
8c1df86
Adding lock_until function
Mar 28, 2012
5bf84c9
Merge branch 'master' of [email protected]:openSUSE/Dequeue
Mar 28, 2012
96e8437
Increase the version
Mar 28, 2012
26075ec
Version bump to 0.6.6
jordimassaguerpla Mar 28, 2012
f3a539d
fix lock_until
flavio Mar 28, 2012
777ea06
Remove spec
flavio Mar 28, 2012
d0c756e
version bump: 0.6.7
flavio Mar 28, 2012
a83fd57
peek: backport changes made to pop
flavio Mar 28, 2012
679e34f
version bump: 0.6.8
flavio Mar 28, 2012
11e98b2
Always set locked_at attribute when an item is popped
flavio Sep 26, 2012
26e71c0
Cleanup specs
flavio Sep 26, 2012
97325ee
Upgrade version
flavio Sep 26, 2012
8cb1b12
get version from VERSION file when building gem
jordimassaguerpla Oct 29, 2012
2551850
Merge pull request #3 from openSUSE/review_121029_fix_version_in_gems…
flavio Oct 29, 2012
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pkg
3 changes: 3 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,8 @@ group :development do
gem "shoulda", ">= 0"
gem "bundler", "~> 1.0.0"
gem "jeweler", "~> 1.6.2"
gem 'rdoc'
gem "rcov", ">= 0"
gem 'rspec', ">= 2"
gem 'timecop'
end
15 changes: 15 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ GEM
remote: http://rubygems.org/
specs:
bson (1.3.1)
diff-lcs (1.1.3)
git (1.2.5)
jeweler (1.6.2)
bundler (~> 1.0)
Expand All @@ -12,7 +13,18 @@ GEM
bson (>= 1.3.1)
rake (0.9.2)
rcov (0.9.9)
rdoc (3.11)
json (~> 1.4)
rspec (2.7.0)
rspec-core (~> 2.7.0)
rspec-expectations (~> 2.7.0)
rspec-mocks (~> 2.7.0)
rspec-core (2.7.1)
rspec-expectations (2.7.0)
diff-lcs (~> 1.1.2)
rspec-mocks (2.7.0)
shoulda (2.11.3)
timecop (0.3.5)

PLATFORMS
ruby
Expand All @@ -23,4 +35,7 @@ DEPENDENCIES
json
mongo
rcov
rdoc
rspec (>= 2)
shoulda
timecop
4 changes: 3 additions & 1 deletion README.rdoc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ Items popped from the queue must be confirmed complete, or they will be reissued

queue = Mongo::Dequeue.new(collection, options)

If timeout is set to nil read timeout feature will be disabled.

=== Pushing Items
Items have a body, passed as the first argument to push(). A body can be a string, number, hash, or array. These
values are preserved in the Mongo collection used to store the queue, allowing you to inspect the queue and see these values.
Expand Down Expand Up @@ -93,4 +95,4 @@ but not confirmed.
=== TODO

* Adjust auto generated duplication_key to be more accurate.
* Full examples
* Full examples
6 changes: 5 additions & 1 deletion Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Jeweler::Tasks.new do |gem|
end
Jeweler::RubygemsDotOrgTasks.new

require 'rake/rdoctask'
require 'rdoc/task'
Rake::RDocTask.new do |rdoc|
version = File.exist?('VERSION') ? File.read('VERSION') : ""

Expand All @@ -34,3 +34,7 @@ Rake::RDocTask.new do |rdoc|
rdoc.rdoc_files.include('README*')
rdoc.rdoc_files.include('lib/**/*.rb')
end

require 'rspec/core/rake_task'
RSpec::Core::RakeTask.new(:spec)
task :default => :spec
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.6.0
0.6.9
182 changes: 139 additions & 43 deletions lib/mongo-dequeue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ class Mongo::Dequeue
#
def initialize(collection, opts={})
@collection = collection
@collection.ensure_index([["locked", Mongo::ASCENDING],
["complete", Mongo::ASCENDING],
["priority", Mongo::DESCENDING],
["inserted_at", Mongo::ASCENDING]])
@collection.ensure_index([["duplicate_key", Mongo::ASCENDING],
["complete", Mongo::ASCENDING],
["locked_at", Mongo::ASCENDING]])
@config = DEFAULT_CONFIG.merge(opts)
@batch = []
end
Expand All @@ -32,7 +39,9 @@ def flush!
end

# Insert a new item into the queue.
#
# Valid options:
# - :priority: integer value, 3 by default.
# - :duplicate_key
# Example:
# queue.insert(:name => 'Billy', :email => '[email protected]', :message => 'Here is the thing you asked for')
def push(body, item_opts = {})
Expand All @@ -48,6 +57,7 @@ def push(body, item_opts = {})
:body => body,
:inserted_at => Time.now.utc,
:complete => false,
:locked => false,
:locked_till => nil,
:completed_at => nil,
:priority => item_opts[:priority] || @config[:default_priority],
Expand Down Expand Up @@ -87,6 +97,7 @@ def batchprocess()
'body': e.body,
'inserted_at': nowutc,
'complete': false,
'locked' : false,
'locked_till': null,
'completed_at': null,
'priority': e.priority,
Expand Down Expand Up @@ -118,40 +129,84 @@ def batchprocess()
# {:body=>"foo", :id=>"4e039c372b70275e345206e4"}

def pop(opts = {})
begin
timeout = opts[:timeout] || @config[:timeout]
cmd = BSON::OrderedHash.new
cmd['findandmodify'] = collection.name
cmd['update'] = {'$set' => {:locked_till => Time.now.utc+timeout}}
cmd['query'] = {:complete => false, '$or'=>[{:locked_till=> nil},{:locked_till=>{'$lt'=>Time.now.utc}}] }
cmd['sort'] = {:priority=>-1,:inserted_at=>1}
cmd['limit'] = 1
cmd['new'] = true
result = collection.db.command(cmd)
rescue Mongo::OperationFailure => of
return nil
end
return {
:body => result['value']['body'],
:id => result['value']['_id'].to_s
}
end
timeout = opts[:timeout] || @config[:timeout]
locked_at = Time.now.utc

cmd = BSON::OrderedHash.new
cmd['findandmodify'] = collection.name
if timeout
cmd['update'] = { '$set' => { :locked_at => locked_at,
:locked_till => locked_at + timeout,
:locked => true } }
cmd['query'] = {:complete => false,
'$or'=>[ {:locked => false},
{:locked_till=> nil},
{:locked_till=>{'$lt'=>Time.now.utc}}] }
else
cmd['update'] = { '$set' => { :locked => true,
:locked_at => locked_at } }
cmd['query'] = { "$and" => [{:complete => false}, {:locked => false },
{ "$or" => [{:locked_till => nil},
{:locked_till => {'$lt' => Time.now.utc}}]
}]}
end
cmd['limit'] = 1
cmd['new'] = true

sort_directive = BSON::OrderedHash.new
sort_directive[:priority] = Mongo::DESCENDING
sort_directive[:inserted_at] = Mongo::ASCENDING
cmd['sort'] = sort_directive

result = collection.db.command(cmd)

if result['value']
{ :body => result['value']['body'],
:id => result['value']['_id'].to_s }
else
nil
end
rescue Mongo::OperationFailure => of
nil
end

# "Re-add" the document to the queue
def unlock(id)
cmd = BSON::OrderedHash.new
cmd['findandmodify'] = collection.name
cmd['query'] = {:_id => BSON::ObjectId.from_string(id)}
cmd['update'] = {'$set' => {:locked => false, :locked_till => nil}}
collection.db.command(cmd)
rescue Mongo::OperationFailure => of
nil
end

def lock_until(id, timeout)
cmd = BSON::OrderedHash.new
cmd['findandmodify'] = collection.name
cmd['query'] = {:_id => BSON::ObjectId.from_string(id)}
cmd['update'] = {'$set' => {:locked => false,
:locked_till => (Time.now.utc + timeout)}}
collection.db.command(cmd)
rescue Mongo::OperationFailure => of
nil
end

# Remove the document from the queue. This should be called when the work is done and the document is no longer needed.
# You must provide the process identifier that the document was locked with to complete it.
def complete(id)
begin
cmd = BSON::OrderedHash.new
cmd['findandmodify'] = collection.name
cmd['query'] = {:_id => BSON::ObjectId.from_string(id)}
cmd['update'] = {'$set' => {:completed_at => Time.now.utc, :complete => true}, '$inc' => {:completecount => 1} }
cmd['limit'] = 1
collection.db.command(cmd)
rescue Mongo::OperationFailure => of
#opfailure happens when item has been already completed
return nil
end
end
cmd = BSON::OrderedHash.new
cmd['findandmodify'] = collection.name
cmd['query'] = {:_id => BSON::ObjectId.from_string(id)}
cmd['update'] = {'$set' => {:completed_at => Time.now.utc,
:complete => true},
'$inc' => {:completecount => 1} }
cmd['limit'] = 1
collection.db.command(cmd)
rescue Mongo::OperationFailure => of
#opfailure happens when item has been already completed
nil
end

# Removes completed job history
def cleanup()
Expand All @@ -167,10 +222,10 @@ def stats
return db.eval(
function(){
var nowutc = new Date();
var a = db.#{collection.name}.count({'complete': false, '$or':[{'locked_till':null},{'locked_till':{'$lt':nowutc}}] });
var a = db.#{collection.name}.count({'complete': false, '$or':[{'locked' : false}, {'locked_till':null},{'locked_till':{'$lt':nowutc}}] });
var c = db.#{collection.name}.count({'complete': true});
var t = db.#{collection.name}.count();
var l = db.#{collection.name}.count({'complete': false, 'locked_till': {'$gte':nowutc} });
var l = db.#{collection.name}.count({'complete': false, 'locked' : true, 'locked_till': {'$gte':nowutc} });
var rc = db.#{collection.name}.group({
'key': {},
'cond': {'complete':true},
Expand Down Expand Up @@ -236,20 +291,61 @@ def self.generate_duplicate_key(body)
return Digest::MD5.hexdigest(body.to_json) #won't ever match a duplicate. Need a better way to handle hashes and arrays.
end

def peek
firstfew = collection.find({
:complete => false,
'$or'=>[{:locked_till=> nil},{:locked_till=>{'$lt'=>Time.now.utc}}]
},
:sort => [[:priority, :descending],[:inserted_at, :ascending]],
:limit => 10)
return firstfew
end
def peek(opts = {})
timeout = opts[:timeout] || @config[:timeout]
query = {:complete => false, }

if timeout
query['$or'] = [ {:locked => false},
{:locked_till=> nil},
{:locked_till=>{'$lt'=>Time.now.utc}}]
else
query = { "$and" => [{:complete => false}, {:locked => false },
{ "$or" => [{:locked_till => nil},
{:locked_till => {'$lt' => Time.now.utc}}]
}]}

end

collection.find( query,
:sort => [[:priority, :descending],[:inserted_at, :ascending]],
:limit => 10)
end

# Set the priority of an item to a custom value
def change_item_priority obj_id, priority
cmd = BSON::OrderedHash.new
cmd['findandmodify'] = collection.name
cmd['update'] = { '$set' => { :priority => priority } }
cmd['query'] = { '_id' => obj_id }

collection.db.command(cmd)
end

# Increase the priority of an item by a given value
def increase_item_priority obj_id, step=1
cmd = BSON::OrderedHash.new
cmd['findandmodify'] = collection.name
cmd['update'] = { '$inc' => { :priority => step } }
cmd['query'] = { '_id' => obj_id }

collection.db.command(cmd)
end

# Decrease the priority of an item by a given value
def decrease_item_priority obj_id, step=1
cmd = BSON::OrderedHash.new
cmd['findandmodify'] = collection.name
cmd['update'] = { '$inc' => { :priority => - step } }
cmd['query'] = { '_id' => obj_id }

collection.db.command(cmd)
end

protected

def value_of(result) #:nodoc:
result['okay'] == 0 ? nil : result['value']
end

end
end
Loading