Skip to content
This repository has been archived by the owner on Jul 30, 2021. It is now read-only.

Mongo driver 2 support + createIndexes fix #23

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ this to resume interrupted tailers so that no information is lost.

## Version history

### 0.5

Move from the Moped driver to the native Mongo 2.0 driver.

### 0.4

Add support for [tokumx](http://www.tokutek.com/products/tokumx-for-mongodb/). Backwards incompatible changes to persistent tailers to accomodate that. See [UPGRADING.md](UPGRADING.md).
17 changes: 7 additions & 10 deletions lib/mongoriver/persistent_tailer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,21 @@ def initialize(upstream, type, service, opts={})
db = opts[:db] || "_mongoriver"
collection = opts[:collection] || 'oplog-tailers'
@service = service
@state_collection = @upstream_conn.db(db).collection(collection)
@state_collection = @upstream_conn.use(db)[collection]
end

def read_state
row = @state_collection.find_one(:service => @service)
return nil unless row

# Try to do seamless upgrades from old mongoriver versions
case row['v']
when nil
row = @state_collection.find(:service => @service).first
return nil unless row && row.is_a?(Array)
if row[0] == 'state'
return row[1]
else
log.warn("Old style timestamp found in database. Converting!")
ts = Time.at(row['timestamp'].seconds)
ts = Time.at(row[1].seconds)
return {
'position' => most_recent_position(ts),
'time' => ts
}
when 1
return row['state']
end
end

Expand Down
65 changes: 34 additions & 31 deletions lib/mongoriver/stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -101,38 +101,10 @@ def handle_insert(db_name, collection_name, data)
end
end

def handle_create_index(spec)
db_name, collection_name = parse_ns(spec['ns'])
index_key = spec['key'].map do |field, dir|
if dir.is_a?(Numeric)
[field, dir.round]
else
[field, dir]
end
end
options = {}

spec.each do |key, value|
case key
when 'v'
unless value == 1
raise NotImplementedError.new("Only v=1 indexes are supported, " \
"not v=#{value.inspect}")
end
when 'ns', 'key', '_id' # do nothing
else
options[key.to_sym] = value
end
end

assert(options.include?(:name),
"No name defined for index spec #{spec.inspect}")

trigger(:create_index, db_name, collection_name, index_key, options)
end

def handle_cmd(db_name, collection_name, data)
if deleted_from_collection = data['deleteIndexes']
if data['createIndexes']
handle_create_index(data)
elsif deleted_from_collection = data['deleteIndexes']
index_name = data['index']
trigger(:drop_index, db_name, deleted_from_collection, index_name)
elsif created_collection = data['create']
Expand All @@ -159,5 +131,36 @@ def handle_create_collection(db_name, collection_name, data)

trigger(:create_collection, db_name, collection_name, options)
end

def handle_create_index(spec)
db_name, collection_name = parse_ns(spec['createIndexes'] || spec['ns'])

index_key = spec['key'].map do |field, dir|
if dir.is_a?(Numeric)
[field, dir.round]
else
[field, dir]
end
end
options = {}

spec.each do |key, value|
case key
when 'v'
unless value <= 2
raise NotImplementedError.new("Only v=1 and 2 indexes are supported, " \
"not v=#{value.inspect}")
end
when 'createIndexes', 'ns', 'key', '_id' # do nothing
else
options[key.to_sym] = value
end
end

assert(options.include?(:name),
"No name defined for index spec #{spec.inspect}")

trigger(:create_index, db_name, collection_name, index_key, options)
end
end
end
61 changes: 24 additions & 37 deletions lib/mongoriver/tailer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ def latest_oplog_entry(before_time=nil)

case database_type
when :mongo
record = oplog_collection.find_one(query, :sort => [['$natural', -1]])
record = oplog_collection.find(query).sort( {'$natural' => -1 } ).limit(1).first
when :toku
record = oplog_collection.find_one(query, :sort => [['_id', -1]])
record = oplog_collection.find(query).sort( {'_id' => -1} ).limit(1).first
end
record
end
Expand All @@ -84,20 +84,19 @@ def connect_upstream
@upstream_conn = Mongo::ReplSetConnection.new(@upstreams, opts)
when :slave, :direct
opts = @conn_opts.merge(:slave_ok => true)
host, port = parse_direct_upstream
@upstream_conn = Mongo::Connection.new(host, port, opts)
@upstream_conn = Mongo::Client.new(@upstreams[0], opts)
raise "Server at #{@upstream_conn.host}:#{@upstream_conn.port} is the primary -- if you're ok with that, check why your wrapper is passing :direct rather than :slave" if @type == :slave && @upstream_conn.primary?
ensure_upstream_replset!
when :existing
raise "Must pass in a single existing Mongo::Connection with :existing" unless @upstreams.length == 1 && @upstreams[0].respond_to?(:db)
raise "Must pass in a single existing Mongo::Connection with :existing" unless @upstreams.length == 1 && @upstreams[0].respond_to?(:use)
@upstream_conn = @upstreams[0]
else
raise "Invalid connection type: #{@type.inspect}"
end
end

def connection_config
@upstream_conn.db('admin').command(:ismaster => 1)
@upstream_conn.use('admin').command(:ismaster => 1).documents.first
end

def ensure_upstream_replset!
Expand All @@ -108,46 +107,27 @@ def ensure_upstream_replset!
end
end

def parse_direct_upstream
raise "When connecting directly to a mongo instance, must provide a single upstream" unless @upstreams.length == 1
upstream = @upstreams[0]
parse_host_spec(upstream)
end

def parse_host_spec(host_spec)
host, port = host_spec.split(':')
host = '127.0.0.1' if host.to_s.length == 0
port = '27017' if port.to_s.length == 0
[host, port.to_i]
end

def oplog_collection
@upstream_conn.db('local').collection(oplog)
@upstream_conn.use('local')[oplog]
end

# Start tailing the oplog.
#
#
# @param [Hash]
# @option opts [BSON::Timestamp, BSON::Binary] :from Placeholder indicating
# @option opts [BSON::Timestamp, BSON::Binary] :from Placeholder indicating
# where to start the query from. Binary value is used for tokumx.
# The timestamp is non-inclusive.
# @option opts [Hash] :filter Extra filters for the query.
# @option opts [Bool] :dont_wait(false)
# @option opts [Bool] :dont_wait(false)
def tail(opts = {})
raise "Already tailing the oplog!" if @cursor

query = build_tail_query(opts)

mongo_opts = {:timeout => false}.merge(opts[:mongo_opts] || {})

oplog_collection.find(query, mongo_opts) do |oplog|
oplog.add_option(Mongo::Constants::OP_QUERY_TAILABLE)
oplog.add_option(Mongo::Constants::OP_QUERY_OPLOG_REPLAY) if query['ts']
oplog.add_option(Mongo::Constants::OP_QUERY_AWAIT_DATA) unless opts[:dont_wait]

log.debug("Starting oplog stream from #{opts[:from] || 'start'}")
@cursor = oplog
end
cursor_type = opts[:dont_wait] ? :tailable : :tailable_await
oplog_replay = query['ts'] ? true : false
mongo_opts = {:timeout => false, :cursor_type => cursor_type, :oplog_replay => oplog_replay}.merge(opts[:mongo_opts] || {})
@cursor = oplog_collection.find(query, mongo_opts).to_enum.lazy
end

# Deprecated: use #tail(:from => ts, ...) instead
Expand All @@ -161,10 +141,9 @@ def tailing
end

def stream(limit=nil, &blk)
count = 0
@streaming = true
state = TailerStreamState.new(limit)
while !@stop && !state.break? && @cursor.has_next?
while !@stop && !state.break? && cursor_has_more?
state.increment

record = @cursor.next
Expand All @@ -180,8 +159,7 @@ def stream(limit=nil, &blk)
end
end
@streaming = false

return @cursor.has_next?
return cursor_has_more?
end

def stop
Expand Down Expand Up @@ -212,5 +190,14 @@ def build_tail_query(opts = {})

query
end

def cursor_has_more?
begin
@cursor.peek
true
rescue StopIteration
false
end
end
end
end
13 changes: 7 additions & 6 deletions lib/mongoriver/toku.rb
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
module Mongoriver
# This module deals with converting TokuMX oplog records into mongodb oplogs.
# This module deals with converting TokuMX oplog records into mongodb oplogs.
class Toku
# @returns true if conn is a TokuMX database and the oplog records need to
# be converted
# @returns true if conn is a TokuMX database and the oplog records need to
# be converted
def self.conversion_needed?(conn)
conn.server_info.has_key? "tokumxVersion"
server_info = conn.command({:buildinfo => 1}).documents.first
server_info.has_key? "tokumxVersion"
end

def self.operations_for(record, conn=nil)
Expand All @@ -19,7 +20,7 @@ def self.operations_for(record, conn=nil)
end

# Convert hash representing a tokumx oplog record to mongodb oplog records.
#
#
# Things to note:
# 1) Unlike mongo oplog, the timestamps will not be monotonically
# increasing
Expand Down Expand Up @@ -71,7 +72,7 @@ def self.insert_record(operation, full_record)
"op" => "i",
# namespace being updated. in form of database-name.collection.name
"ns" => operation["ns"],
# operation being done.
# operation being done.
# e.g. {"_id"=>BSON::ObjectId('53fb8f6b9e126b2106000003')}
"o" => operation["o"]
}
Expand Down
2 changes: 1 addition & 1 deletion lib/mongoriver/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Mongoriver
VERSION = "0.4.5"
VERSION = "0.5.0"
end
4 changes: 2 additions & 2 deletions mongoriver.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ Gem::Specification.new do |gem|
gem.require_paths = ["lib"]
gem.version = Mongoriver::VERSION

gem.add_runtime_dependency('mongo', ['>= 1.12.5', '< 2.0'])
gem.add_runtime_dependency('bson_ext', ['>= 1.12.5', '< 2.0'])
gem.add_runtime_dependency('mongo', '>= 2.0')
gem.add_runtime_dependency('bson_ext')
gem.add_runtime_dependency('log4r')

gem.add_development_dependency('rake')
Expand Down
8 changes: 8 additions & 0 deletions test/cursor_stub.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
require 'mongo'

class CursorStub
include Enumerable

def initialize
@events = []
@index = 0
Expand Down Expand Up @@ -30,4 +32,10 @@ def next
@index += 1
@events[@index - 1]
end

def each(&block)
@events.each do |event|
block.call(event)
end
end
end
6 changes: 4 additions & 2 deletions test/test_mongoriver.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ def create_op(op)
end

before do
conn = stub(:db => nil, :server_info => {})
conn = stub(:use => nil)
buildinfo_command = stub(:documents => [{}])
conn.expects(:command).with(:buildinfo => 1).returns(buildinfo_command)
@tailer = Mongoriver::Tailer.new([conn], :existing)
@outlet = Mongoriver::AbstractOutlet.new
@stream = Mongoriver::Stream.new(@tailer, @outlet)
Expand Down Expand Up @@ -62,4 +64,4 @@ def create_op(op)
@outlet.expects(:drop_database).once.with('foo')
@stream.send(:handle_op, create_op({'op'=>'c', 'ns'=>'foo.$cmd', 'o'=>{'dropDatabase'=>1.0}}))
end
end
end
19 changes: 9 additions & 10 deletions test/test_mongoriver_connected.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@

def connect
begin
host, port = MONGO_SERVER.split(':', 2)
Mongo::Connection.new(host, port)
rescue Mongo::ConnectionFailure
Mongo::Client.new(MONGO_SERVER)
rescue Mongo::Error
nil
end
end
Expand Down Expand Up @@ -60,16 +59,16 @@ def run_stream(stream, start)
@outlet.expects(:drop_collection).once.with(db, collection+'_foo').in_sequence(op_sequence)
@outlet.expects(:drop_database).once.with(db) { @stream.stop }.in_sequence(op_sequence)

coll = @mongo[db][collection]
coll = @mongo.use(db)[collection]
coll.insert(doc)
coll.update({'_id' => 'foo'}, doc.merge('bar' => 'qux'))
coll.remove({'_id' => 'foo'})

name = coll.ensure_index(index_keys)
coll.drop_index(name)

@mongo[db].rename_collection(collection, collection+'_foo')
@mongo[db].drop_collection(collection+'_foo')
@mongo.rename_collection(collection, collection+'_foo')
@mongo.drop_collection(collection+'_foo')
@mongo.drop_database(db)

run_stream(@stream, @tail_from)
Expand All @@ -88,7 +87,7 @@ def run_stream(stream, start)
it 'ignores everything before the operation passed in' do
name = '_test_mongoriver'

@mongo[name][name].insert(:a => 5)
@mongo.use(name)[name].insert(:a => 5)

@outlet.expects(:insert).never
@outlet.expects(:drop_database).with(anything) { @stream.stop }
Expand All @@ -110,9 +109,9 @@ def run_stream(stream, start)
db_name != name || update['record'] == 'newvalue'
end

@mongo[name][name].insert('record' => 'value')
@mongo[name][name].update({'record' => 'value'}, {'record' => 'newvalue'})
@mongo.use(name)[name].insert('record' => 'value')
@mongo.use(name)[name].update({'record' => 'value'}, {'record' => 'newvalue'})
run_stream(@stream, Time.now-3)
end
end
end
end
Loading