Merge pull request #105 from zookzook/feature/replica-set-connection
Feature/replica set connection
zookzook authored Sep 18, 2021
2 parents b7aa63c + 44f0897 commit af4d10e
Showing 12 changed files with 336 additions and 107 deletions.
3 changes: 1 addition & 2 deletions README.md
@@ -1,14 +1,13 @@
# An Alternative Elixir Driver for MongoDB
[![Build Status](https://travis-ci.org/zookzook/elixir-mongodb-driver.svg?branch=master)](https://travis-ci.org/zookzook/elixir-mongodb-driver)
[![Coverage Status](https://coveralls.io/repos/github/zookzook/elixir-mongodb-driver/badge.svg?branch=master)](https://coveralls.io/github/zookzook/elixir-mongodb-driver?branch=master)
[![Hex.pm](https://img.shields.io/hexpm/v/mongodb_driver.svg)](https://hex.pm/packages/mongodb_driver)
[![Hex.pm](https://img.shields.io/hexpm/dt/mongodb_driver.svg)](https://hex.pm/packages/mongodb_driver)
[![Hex.pm](https://img.shields.io/hexpm/dw/mongodb_driver.svg)](https://hex.pm/packages/mongodb_driver)
[![Hex.pm](https://img.shields.io/hexpm/dd/mongodb_driver.svg)](https://hex.pm/packages/mongodb_driver)

## Features

* Supports MongoDB versions 3.2, 3.4, 3.6, 4.0, 4.2, 4.4
* Supports MongoDB versions 3.2, 3.4, 3.6, 4.x, 5.x
* Connection pooling ([through DBConnection 2.x](https://github.com/elixir-ecto/db_connection))
* Streaming cursors
* Performant ObjectID generation
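For context, a minimal sketch (not part of the diff) of connecting to a replica set with this driver, reusing the connection URL hard-coded in examples/reader.ex below; the host list, the "load" database and the "rs_1" replica-set name are just the values from that example:

{:ok, conn} =
  Mongo.start_link(url: "mongodb://localhost:27017,localhost:27018,localhost:27019/load?replicaSet=rs_1")

{:ok, _result} = Mongo.insert_one(conn, "data", %{counter: 1})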
48 changes: 48 additions & 0 deletions examples/reader.ex
@@ -0,0 +1,48 @@
defmodule Reader do

require Logger

##
# see https://github.com/zookzook/elixir-mongodb-driver/issues/63 for more information
#
# 1. start a replica set and call the Reader.test()
# 2. go to the primary db and call db.adminCommand({replSetStepDown: 30})
# 3. check the log to see the error message only one time
##
def start_link(conn) do
Logger.info("starting reader")

Task.start_link(fn -> read(conn, false) end)
end

defp read(conn, error) do

if error do
Logger.info("Called with error")
end

# Gets an enumerable cursor for the results
cursor = Mongo.find(conn, "data", %{})

error = case cursor do
{:error, error} ->
Logger.info("Error: #{inspect error}")
true

_ ->
cursor
|> Enum.to_list()
|> Enum.count()
false
end

read(conn, error)
end

def test() do
{:ok, conn} = Mongo.start_link(url: "mongodb://localhost:27017,localhost:27018,localhost:27019/load?replicaSet=rs_1")

Enum.map(1..10_000, fn counter -> Mongo.insert_one(conn, "data", %{counter: counter}) end)
Reader.start_link(conn)
end
end
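Reproducing issue #63 with the module above is a manual test; a sketch following the steps in the file's comments (the replica set and URL are the ones hard-coded in Reader.test/0):

iex> Reader.test()
# while the reader loops, connect a mongo shell to the current primary and run
#   db.adminCommand({replSetStepDown: 30})
# the reader should log the "Error: ..." message exactly once and then keep reading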
60 changes: 40 additions & 20 deletions lib/mongo.ex
@@ -52,6 +52,8 @@ defmodule Mongo do
import Mongo.Utils
import Mongo.WriteConcern

require Logger

use Bitwise
use Mongo.Messages

@@ -413,16 +415,20 @@ defmodule Mongo do
:ok <- Session.end_implict_session(topology_pid, session) do
case result do
{:error, error} ->
case Error.should_retry_read(error, cmd, opts) do
true -> issue_command(topology_pid, cmd, :read, Keyword.put(opts, :read_counter, 2))
false -> {:error, error}

cond do
Error.not_writable_primary_or_recovering?(error, opts) ->
## in case of explicity
issue_command(topology_pid, cmd, :read, Keyword.put(opts, :retry_counter, 2))

Error.should_retry_read(error, cmd, opts) ->
issue_command(topology_pid, cmd, :read, Keyword.put(opts, :read_counter, 2))

true ->
{:error, error}
end
_other -> result
end
else
{:new_connection, _server} ->
:timer.sleep(1000)
issue_command(topology_pid, cmd, :read, opts)
end
end
def issue_command(topology_pid, cmd, :write, opts) do
@@ -433,11 +439,25 @@ defmodule Mongo do
with {:ok, session} <- Session.start_implicit_session(topology_pid, :write, opts),
result <- exec_command_session(session, cmd, opts),
:ok <- Session.end_implict_session(topology_pid, session) do
result
else
{:new_connection, _server} ->
:timer.sleep(1000)
issue_command(topology_pid, cmd, :write, opts)

case result do
{:error, error} ->
cond do
Error.not_writable_primary_or_recovering?(error, opts) ->
## in case of explicity
issue_command(topology_pid, cmd, :read, Keyword.put(opts, :retry_counter, 2))

Error.should_retry_write(error, cmd, opts) ->
issue_command(topology_pid, cmd, :write, Keyword.put(opts, :write_counter, 2))

true ->
{:error, error}
end

result ->
result
end

end
end

@@ -731,16 +751,16 @@ defmodule Mongo do
doc <- Session.update_session(session, doc, opts),
{:ok, doc} <- check_for_error(doc, event) do
{:ok, doc}
else
else
{:error, error} ->
## todo update Topology
case Error.should_retry_write(error, cmd, opts) do
true ->
with :ok <- Session.select_server(session, opts) do
exec_command_session(session, cmd, Keyword.put(opts, :write_counter, 2))
end
false -> {:error, error}
case Error.not_writable_primary_or_recovering?(error, opts) do
true ->
Session.mark_server_unknown(session)
{:error, error}
false ->
{:error, error}
end

end

end
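Taken together, the new read path in lib/mongo.ex decides between a topology retry and a plain retryable read. A standalone sketch (not the committed code; retry_read/4 is a hypothetical helper name, while the Error predicates and issue_command/4 are the ones used in the diff):

defp retry_read(topology_pid, cmd, error, opts) do
  cond do
    # the selected server is no longer a writable primary or is recovering:
    # run the command once more so server selection can find the new primary
    Mongo.Error.not_writable_primary_or_recovering?(error, opts) ->
      Mongo.issue_command(topology_pid, cmd, :read, Keyword.put(opts, :retry_counter, 2))

    # the error counts as retryable for this read command: retry once
    Mongo.Error.should_retry_read(error, cmd, opts) ->
      Mongo.issue_command(topology_pid, cmd, :read, Keyword.put(opts, :read_counter, 2))

    # anything else is handed back to the caller unchanged
    true ->
      {:error, error}
  end
end

The write path mirrors this with should_retry_write/3 and a write_counter, and exec_command_session/3 now marks the server unknown instead of retrying itself.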
10 changes: 1 addition & 9 deletions lib/mongo/bulk_write.ex
@@ -187,15 +187,11 @@ defmodule Mongo.BulkWrite do
result = one_bulk_write(topology_pid, session, bulk, opts),
:ok <- Session.end_implict_session(topology_pid, session) do
result
else
{:new_connection, _server} ->
:timer.sleep(1000)
write(topology_pid, bulk, opts)
end

end

def write(topology_pid, %OrderedBulk{coll: coll, ops: ops} = bulk, opts) do
def write(topology_pid, %OrderedBulk{coll: coll, ops: ops}, opts) do

write_concern = write_concern(opts)

@@ -210,10 +206,6 @@ defmodule Mongo.BulkWrite do
|> BulkWriteResult.reduce(empty) do

result
else
{:new_connection, _server} ->
:timer.sleep(1000)
write(topology_pid, bulk, opts)
end

end
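For context (not part of the diff): write/3 above is what runs when an ordered bulk is submitted. A small usage sketch, assuming the Mongo.OrderedBulk helpers documented in the driver's README:

alias Mongo.OrderedBulk

bulk =
  "data"
  |> OrderedBulk.new()
  |> OrderedBulk.insert_one(%{counter: 1})
  |> OrderedBulk.update_one(%{counter: 1}, %{"$set": %{counter: 2}})
  |> OrderedBulk.delete_one(%{counter: 2})

result = Mongo.BulkWrite.write(conn, bulk, w: 1)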
29 changes: 13 additions & 16 deletions lib/mongo/change_stream.ex
@@ -88,8 +88,7 @@ defmodule Mongo.ChangeStream do
"cursor" => %{
"id" => cursor_id,
"ns" => coll,
"firstBatch" => docs} = response} <- doc,
{:ok, wire_version} <- Mongo.wire_version(topology_pid) do
"firstBatch" => docs} = response} <- doc do

[%{"$changeStream" => stream_opts} | _pipeline] = Keyword.get(cmd, :pipeline) # extract the change stream options

@@ -101,7 +100,7 @@ defmodule Mongo.ChangeStream do
# The initial aggregate response did not include a postBatchResumeToken.

has_values = stream_opts["startAtOperationTime"] || stream_opts["startAfter"] || stream_opts["resumeAfter"]
op_time = update_operation_time(op_time, has_values, docs, response["postBatchResumeToken"], wire_version)
op_time = update_operation_time(op_time, has_values, docs, response["postBatchResumeToken"], Session.wire_version(session))

# When the ChangeStream is started:
# If startAfter is set, cache it.
@@ -145,7 +144,7 @@ defmodule Mongo.ChangeStream do

case token_changes(old_token, new_token) do
true -> fun.(new_token)
false -> nil
false -> :noop
end

{:ok, %{cursor_id: new_cursor_id, docs: docs, change_stream: change_stream}}
@@ -154,22 +153,20 @@ defmodule Mongo.ChangeStream do
{:error, %Mongo.Error{resumable: false} = not_resumable} -> {:error, not_resumable}
{:error, _error} ->

with {:ok, wire_version} <- Mongo.wire_version(topology_pid) do
[%{"$changeStream" => stream_opts} | pipeline] = Keyword.get(aggregate_cmd, :pipeline) # extract the change stream options

[%{"$changeStream" => stream_opts} | pipeline] = Keyword.get(aggregate_cmd, :pipeline) # extract the change stream options
stream_opts = update_stream_options(stream_opts, resume_token, op_time, Session.wire_version(session))
aggregate_cmd = Keyword.update!(aggregate_cmd, :pipeline, fn _ -> [%{"$changeStream" => stream_opts} | pipeline] end)

stream_opts = update_stream_options(stream_opts, resume_token, op_time, wire_version)
aggregate_cmd = Keyword.update!(aggregate_cmd, :pipeline, fn _ -> [%{"$changeStream" => stream_opts} | pipeline] end)
# kill the cursor
kill_cursors(session, coll, [cursor_id], opts)

# kill the cursor
kill_cursors(session, coll, [cursor_id], opts)

# Start aggregation again...
with {:ok, state} <- aggregate(topology_pid, aggregate_cmd, fun, opts) do
{:resume, state}
end
# Start aggregation again...
with {:ok, state} <- aggregate(topology_pid, aggregate_cmd, fun, opts) do
{:resume, state}
end

reason ->
{:error, reason}
end
end

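For context (not part of the diff): the fun.(new_token) call above is the callback a change-stream consumer passes in to persist resume tokens. A sketch of consuming a change stream on the same "data" collection, assuming Mongo.watch_collection/5 and its token callback as documented in the driver's README:

{:ok, conn} =
  Mongo.start_link(url: "mongodb://localhost:27017,localhost:27018,localhost:27019/load?replicaSet=rs_1")

conn
|> Mongo.watch_collection("data", [], fn token -> IO.puts("new resume token: #{inspect token}") end, max_time: 2_000)
|> Enum.each(fn doc -> IO.puts("change: #{inspect doc}") end)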
