diff --git a/README.md b/README.md index ffb81abf..d8b8dafb 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,5 @@ # An Alternative Elixir Driver for MongoDB [![Build Status](https://travis-ci.org/zookzook/elixir-mongodb-driver.svg?branch=master)](https://travis-ci.org/zookzook/elixir-mongodb-driver) -[![Coverage Status](https://coveralls.io/repos/github/zookzook/elixir-mongodb-driver/badge.svg?branch=master)](https://coveralls.io/github/zookzook/elixir-mongodb-driver?branch=master) [![Hex.pm](https://img.shields.io/hexpm/v/mongodb_driver.svg)](https://hex.pm/packages/mongodb_driver) [![Hex.pm](https://img.shields.io/hexpm/dt/mongodb_driver.svg)](https://hex.pm/packages/mongodb_driver) [![Hex.pm](https://img.shields.io/hexpm/dw/mongodb_driver.svg)](https://hex.pm/packages/mongodb_driver) @@ -8,7 +7,7 @@ ## Features - * Supports MongoDB versions 3.2, 3.4, 3.6, 4.0, 4.2, 4.4 + * Supports MongoDB versions 3.2, 3.4, 3.6, 4.x, 5.x * Connection pooling ([through DBConnection 2.x](https://github.com/elixir-ecto/db_connection)) * Streaming cursors * Performant ObjectID generation diff --git a/examples/reader.ex b/examples/reader.ex new file mode 100644 index 00000000..2d3585a8 --- /dev/null +++ b/examples/reader.ex @@ -0,0 +1,48 @@ +defmodule Reader do + + require Logger + + ## + # see https://github.com/zookzook/elixir-mongodb-driver/issues/63 for more information + # + # 1. start a replica set and call the Reader.test() + # 2. go to the primary db and call db.adminCommand({replSetStepDown: 30}) + # 3. check the log to see the error message only one time + ## + def start_link(conn) do + Logger.info("starting reader") + + Task.start_link(fn -> read(conn, false) end) + end + + defp read(conn, error) do + + if error do + Logger.info("Called with error") + end + + # Gets an enumerable cursor for the results + cursor = Mongo.find(conn, "data", %{}) + + error = case cursor do + {:error, error} -> + Logger.info("Error: #{inspect error}") + true + + _ -> + cursor + |> Enum.to_list() + |> Enum.count() + false + end + + read(conn, error) + end + + def test() do + {:ok, conn} = Mongo.start_link(url: "mongodb://localhost:27017,localhost:27018,localhost:27019/load?replicaSet=rs_1") + + Enum.map(1..10_000, fn counter -> Mongo.insert_one(conn, "data", %{counter: counter}) end) + Reader.start_link(conn) + end +end \ No newline at end of file diff --git a/lib/mongo.ex b/lib/mongo.ex index 250ba512..33b00bb8 100644 --- a/lib/mongo.ex +++ b/lib/mongo.ex @@ -52,6 +52,8 @@ defmodule Mongo do import Mongo.Utils import Mongo.WriteConcern + require Logger + use Bitwise use Mongo.Messages @@ -413,16 +415,20 @@ defmodule Mongo do :ok <- Session.end_implict_session(topology_pid, session) do case result do {:error, error} -> - case Error.should_retry_read(error, cmd, opts) do - true -> issue_command(topology_pid, cmd, :read, Keyword.put(opts, :read_counter, 2)) - false -> {:error, error} + + cond do + Error.not_writable_primary_or_recovering?(error, opts) -> + ## in case of explicity + issue_command(topology_pid, cmd, :read, Keyword.put(opts, :retry_counter, 2)) + + Error.should_retry_read(error, cmd, opts) -> + issue_command(topology_pid, cmd, :read, Keyword.put(opts, :read_counter, 2)) + + true -> + {:error, error} end _other -> result end - else - {:new_connection, _server} -> - :timer.sleep(1000) - issue_command(topology_pid, cmd, :read, opts) end end def issue_command(topology_pid, cmd, :write, opts) do @@ -433,11 +439,25 @@ defmodule Mongo do with {:ok, session} <- Session.start_implicit_session(topology_pid, :write, opts), result <- exec_command_session(session, cmd, opts), :ok <- Session.end_implict_session(topology_pid, session) do - result - else - {:new_connection, _server} -> - :timer.sleep(1000) - issue_command(topology_pid, cmd, :write, opts) + + case result do + {:error, error} -> + cond do + Error.not_writable_primary_or_recovering?(error, opts) -> + ## in case of explicity + issue_command(topology_pid, cmd, :read, Keyword.put(opts, :retry_counter, 2)) + + Error.should_retry_write(error, cmd, opts) -> + issue_command(topology_pid, cmd, :write, Keyword.put(opts, :write_counter, 2)) + + true -> + {:error, error} + end + + result -> + result + end + end end @@ -731,16 +751,16 @@ defmodule Mongo do doc <- Session.update_session(session, doc, opts), {:ok, doc} <- check_for_error(doc, event) do {:ok, doc} - else + else {:error, error} -> - ## todo update Topology - case Error.should_retry_write(error, cmd, opts) do - true -> - with :ok <- Session.select_server(session, opts) do - exec_command_session(session, cmd, Keyword.put(opts, :write_counter, 2)) - end - false -> {:error, error} + case Error.not_writable_primary_or_recovering?(error, opts) do + true -> + Session.mark_server_unknown(session) + {:error, error} + false -> + {:error, error} end + end end diff --git a/lib/mongo/bulk_write.ex b/lib/mongo/bulk_write.ex index acd85335..e7c9533f 100644 --- a/lib/mongo/bulk_write.ex +++ b/lib/mongo/bulk_write.ex @@ -187,15 +187,11 @@ defmodule Mongo.BulkWrite do result = one_bulk_write(topology_pid, session, bulk, opts), :ok <- Session.end_implict_session(topology_pid, session) do result - else - {:new_connection, _server} -> - :timer.sleep(1000) - write(topology_pid, bulk, opts) end end - def write(topology_pid, %OrderedBulk{coll: coll, ops: ops} = bulk, opts) do + def write(topology_pid, %OrderedBulk{coll: coll, ops: ops}, opts) do write_concern = write_concern(opts) @@ -210,10 +206,6 @@ defmodule Mongo.BulkWrite do |> BulkWriteResult.reduce(empty) do result - else - {:new_connection, _server} -> - :timer.sleep(1000) - write(topology_pid, bulk, opts) end end diff --git a/lib/mongo/change_stream.ex b/lib/mongo/change_stream.ex index 1f9abc8b..b05bca02 100644 --- a/lib/mongo/change_stream.ex +++ b/lib/mongo/change_stream.ex @@ -88,8 +88,7 @@ defmodule Mongo.ChangeStream do "cursor" => %{ "id" => cursor_id, "ns" => coll, - "firstBatch" => docs} = response} <- doc, - {:ok, wire_version} <- Mongo.wire_version(topology_pid) do + "firstBatch" => docs} = response} <- doc do [%{"$changeStream" => stream_opts} | _pipeline] = Keyword.get(cmd, :pipeline) # extract the change stream options @@ -101,7 +100,7 @@ defmodule Mongo.ChangeStream do # The initial aggregate response did not include a postBatchResumeToken. has_values = stream_opts["startAtOperationTime"] || stream_opts["startAfter"] || stream_opts["resumeAfter"] - op_time = update_operation_time(op_time, has_values, docs, response["postBatchResumeToken"], wire_version) + op_time = update_operation_time(op_time, has_values, docs, response["postBatchResumeToken"], Session.wire_version(session)) # When the ChangeStream is started: # If startAfter is set, cache it. @@ -145,7 +144,7 @@ defmodule Mongo.ChangeStream do case token_changes(old_token, new_token) do true -> fun.(new_token) - false -> nil + false -> :noop end {:ok, %{cursor_id: new_cursor_id, docs: docs, change_stream: change_stream}} @@ -154,22 +153,20 @@ defmodule Mongo.ChangeStream do {:error, %Mongo.Error{resumable: false} = not_resumable} -> {:error, not_resumable} {:error, _error} -> - with {:ok, wire_version} <- Mongo.wire_version(topology_pid) do + [%{"$changeStream" => stream_opts} | pipeline] = Keyword.get(aggregate_cmd, :pipeline) # extract the change stream options - [%{"$changeStream" => stream_opts} | pipeline] = Keyword.get(aggregate_cmd, :pipeline) # extract the change stream options + stream_opts = update_stream_options(stream_opts, resume_token, op_time, Session.wire_version(session)) + aggregate_cmd = Keyword.update!(aggregate_cmd, :pipeline, fn _ -> [%{"$changeStream" => stream_opts} | pipeline] end) - stream_opts = update_stream_options(stream_opts, resume_token, op_time, wire_version) - aggregate_cmd = Keyword.update!(aggregate_cmd, :pipeline, fn _ -> [%{"$changeStream" => stream_opts} | pipeline] end) + # kill the cursor + kill_cursors(session, coll, [cursor_id], opts) - # kill the cursor - kill_cursors(session, coll, [cursor_id], opts) - - # Start aggregation again... - with {:ok, state} <- aggregate(topology_pid, aggregate_cmd, fun, opts) do - {:resume, state} - end + # Start aggregation again... + with {:ok, state} <- aggregate(topology_pid, aggregate_cmd, fun, opts) do + {:resume, state} end - + reason -> + {:error, reason} end end diff --git a/lib/mongo/error.ex b/lib/mongo/error.ex index 5022d7e3..d37cf673 100644 --- a/lib/mongo/error.ex +++ b/lib/mongo/error.ex @@ -2,38 +2,85 @@ defmodule Mongo.Error do alias Mongo.Events - defexception [:message, :code, :host, :error_labels, :resumable, :retryable_reads, :retryable_writes] + defexception [:message, :code, :host, :error_labels, :resumable, :retryable_reads, :retryable_writes, :not_writable_primary_or_recovering] - @host_unreachable 6 + @exceeded_time_limit 262 + @failed_to_satisfy_read_preference 133 @host_not_found 7 + @host_unreachable 6 + @interrupted_at_shutdown 11600 + @interrupted_due_to_repl_state_change 11602 @network_timeout 89 - @shutdown_in_progress 91 + @not_primary_no_secondary_ok 13435 + @not_primary_or_secondary 13436 + @not_writable_primary 10107 @primary_stepped_down 189 - @exceeded_time_limit 262 + @retry_change_stream 234 + @shutdown_in_progress 91 @socket_exception 9001 - @not_master 10107 - @interrupted_at_shutdown 11600 - @interrupted_due_to_repl_state_change 11602 - @not_master_no_slaveok 13435 - @not_master_or_secondary 13436 - @stale_shard_version 63 - @stale_epoch 150 @stale_config 13388 - @retry_change_stream 234 - @failed_to_satisfy_read_preference 133 - - @retryable_writes [@interrupted_at_shutdown, @interrupted_due_to_repl_state_change, @not_master, @not_master_no_slaveok, - @not_master_or_secondary, @primary_stepped_down, @shutdown_in_progress, @host_not_found, - @host_unreachable, @network_timeout, @socket_exception, @exceeded_time_limit ] - - @retryable_reads [@interrupted_at_shutdown, @interrupted_due_to_repl_state_change, @not_master, - @not_master_no_slaveok, @not_master_or_secondary, @primary_stepped_down, - @host_not_found, @host_unreachable , @network_timeout, @socket_exception] + @stale_epoch 150 + @stale_shard_version 63 - @resumable [@host_unreachable, @host_not_found, @network_timeout, @shutdown_in_progress, @primary_stepped_down, - @exceeded_time_limit, @socket_exception, @not_master, @interrupted_at_shutdown, @interrupted_at_shutdown, - @interrupted_due_to_repl_state_change, @not_master_no_slaveok, @not_master_or_secondary, @stale_shard_version, - @stale_epoch, @stale_config, @retry_change_stream, @failed_to_satisfy_read_preference] + @retryable_writes [ + @exceeded_time_limit, + @host_not_found, + @host_unreachable, + @interrupted_at_shutdown, + @interrupted_due_to_repl_state_change, + @network_timeout, + @not_primary_no_secondary_ok, + @not_primary_or_secondary, + @not_writable_primary, + @primary_stepped_down, + @shutdown_in_progress, + @socket_exception + ] + + @retryable_reads [ + @host_not_found, + @host_unreachable, + @interrupted_due_to_repl_state_change, + @network_timeout, + @not_primary_no_secondary_ok, + @not_primary_or_secondary, + @not_writable_primary, + @primary_stepped_down, + @socket_exception, + @interrupted_at_shutdown + ] + + @resumable [ + @exceeded_time_limit, + @interrupted_due_to_repl_state_change, + @stale_epoch, + @failed_to_satisfy_read_preference, + @host_not_found, + @host_unreachable, + @interrupted_at_shutdown, + @interrupted_at_shutdown, + @network_timeout, + @not_primary_no_secondary_ok, + @not_primary_or_secondary, + @not_writable_primary, + @primary_stepped_down, + @retry_change_stream, + @shutdown_in_progress, + @socket_exception, + @stale_config, + @stale_shard_version + ] + + # https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#not-writable-primary-and-node-is-recovering + @not_writable_primary_or_recovering [ + @interrupted_at_shutdown, + @interrupted_due_to_repl_state_change, + @not_primary_no_secondary_ok, + @not_primary_or_secondary, + @not_writable_primary, + @primary_stepped_down, + @shutdown_in_progress + ] @type t :: %__MODULE__{ message: String.t, @@ -42,7 +89,8 @@ defmodule Mongo.Error do error_labels: [String.t] | nil, resumable: boolean, retryable_reads: boolean, - retryable_writes: boolean + retryable_writes: boolean, + not_writable_primary_or_recovering: boolean } def message(e) do @@ -65,8 +113,17 @@ defmodule Mongo.Error do resumable = Enum.any?(@resumable, &(&1 == code)) || Enum.any?(errorLabels, &(&1 == "ResumableChangeStreamError")) retryable_reads = Enum.any?(@retryable_reads, &(&1 == code)) || Enum.any?(errorLabels, &(&1 == "RetryableReadError")) retryable_writes = Enum.any?(@retryable_writes, &(&1 == code)) || Enum.any?(errorLabels, &(&1 == "RetryableWriteError")) - %Mongo.Error{message: msg, code: code, error_labels: errorLabels, resumable: resumable, retryable_reads: retryable_reads, retryable_writes: retryable_writes} + not_writable_primary_or_recovering = Enum.any?(@not_writable_primary_or_recovering, &(&1 == code)) + + %Mongo.Error{message: msg, + code: code, + error_labels: errorLabels, + resumable: resumable, + retryable_reads: retryable_reads, + retryable_writes: retryable_writes, + not_writable_primary_or_recovering: not_writable_primary_or_recovering} end + def exception(message: message, code: code) do %Mongo.Error{message: message, code: code, resumable: Enum.any?(@resumable, &(&1 == code))} end @@ -75,6 +132,7 @@ defmodule Mongo.Error do %Mongo.Error{message: message, resumable: false} end + @doc """ Return true if the error is retryable for read operations. """ @@ -115,6 +173,24 @@ defmodule Mongo.Error do def has_label(_other, _label) do false end + + def not_writable_primary?(%Mongo.Error{code: code}) do + code == @not_writable_primary + end + def not_primary_no_secondary_ok?(%Mongo.Error{code: code}) do + code == @not_primary_no_secondary_ok + end + def not_primary_or_secondary?(%Mongo.Error{code: code}) do + code == @not_primary_or_secondary + end + + @doc """ + Return true if the error == not writable primary or in recovering mode. + """ + def not_writable_primary_or_recovering?(%Mongo.Error{not_writable_primary_or_recovering: result}, opts) do + ## no explicit session, no retry counter but not_writable_primary_or_recovering + Keyword.get(opts, :session, nil) == nil && Keyword.get(opts, :retry_counter, nil) == nil && result + end end defmodule Mongo.WriteError do diff --git a/lib/mongo/monitor.ex b/lib/mongo/monitor.ex index 815c43a6..c20f8cef 100644 --- a/lib/mongo/monitor.ex +++ b/lib/mongo/monitor.ex @@ -12,6 +12,7 @@ defmodule Mongo.Monitor do The result of the `isMaster` command is mapped the `ServerDescription` structure and sent to the topology process, which updates it internal data structure. """ + require Logger use GenServer @@ -48,6 +49,9 @@ defmodule Mongo.Monitor do GenServer.cast(pid, :update) end + def set_heartbeat_frequency_ms(pid, heartbeat_frequency_ms) do + GenServer.cast(pid, {:update, heartbeat_frequency_ms}) + end @doc """ Initialize the monitor process @@ -105,6 +109,14 @@ defmodule Mongo.Monitor do {:noreply, new_state, new_state.heartbeat_frequency_ms} end + def handle_cast({:update, heartbeat_frequency_ms}, state) do + new_state = state + |> update_server_description() + |> Map.put(:heartbeat_frequency_ms, heartbeat_frequency_ms) + # we return with heartbeat_frequency_ms, so after heartbeat_frequency_ms handle_info(:timeout...) gets called. + {:noreply, new_state, new_state.heartbeat_frequency_ms} + end + def handle_cast(:stop, state) do exit(:normal) {:noreply, state} @@ -116,6 +128,8 @@ defmodule Mongo.Monitor do """ def handle_info(:timeout, state) do new_state = update_server_description(state) + + ## todo Logger.info("XXX: heartbeat_frequency_ms: #{inspect new_state.heartbeat_frequency_ms}") {:noreply, new_state, new_state.heartbeat_frequency_ms} end def handle_info(:update, state) do diff --git a/lib/mongo/session.ex b/lib/mongo/session.ex index e36f4df6..0971fcfe 100644 --- a/lib/mongo/session.ex +++ b/lib/mongo/session.ex @@ -122,6 +122,7 @@ defmodule Mongo.Session do defstruct [ topology: nil, conn: nil, + address: nil, recovery_token: nil, server_session: nil, causal_consistency: false, @@ -136,8 +137,8 @@ defmodule Mongo.Session do Start the generic state machine. """ # @spec start_link(GenServer.server, ServerSession.t, atom, integer, keyword()) :: {:ok, Session.t} | :ignore | {:error, term()} - def start_link(topology, conn, server_session, type, wire_version, opts) do - {:ok, spawn_link(__MODULE__, :init, [topology, conn, server_session, type, wire_version, opts])} + def start_link(topology, conn, address, server_session, type, wire_version, opts) do + {:ok, spawn_link(__MODULE__, :init, [topology, conn, address, server_session, type, wire_version, opts])} end @doc """ @@ -153,8 +154,7 @@ defmodule Mongo.Session do with {:ok, session} <- Topology.checkout_session(topology_pid, type, :explicit, opts) do {:ok, session} else - {:new_connection, _server} -> - :timer.sleep(1000) + :new_connection -> start_session(topology_pid, type, opts) end end @@ -170,8 +170,7 @@ defmodule Mongo.Session do with {:ok, session} <- Topology.checkout_session(topology_pid, type, :implicit, opts) do {:ok, session} else - {:new_connection, _server} -> - :timer.sleep(1000) + :new_connection -> start_implicit_session(topology_pid, type, opts) {:error, error} -> {:error, error} other -> {:error, Mongo.Error.exception("Unknow result #{inspect other} while calling Topology.checkout_session/4")} @@ -180,6 +179,9 @@ defmodule Mongo.Session do end end + def mark_server_unknown(pid) do + call(pid, :mark_server_unknown) + end def select_server(pid, opts) do call(pid, {:select_server, opts}) end @@ -357,6 +359,14 @@ defmodule Mongo.Session do end + @doc """ + Return the wire_version used in the session. + """ + @spec connection(Session.t) :: pid + def wire_version(pid) do + call(pid, :wire_version) + end + @doc """ Return the connection used in the session. """ @@ -393,7 +403,7 @@ defmodule Mongo.Session do send(pid, {:cast, arguments}) end - def init(topology, conn, server_session, type, wire_version, opts) do + def init(topology, conn, address, server_session, type, wire_version, opts) do server_session = case opts[:retryable_write] do ## in case of `:retryable_write` we need to inc the transaction id true -> ServerSession.next_txn_num(server_session) @@ -403,6 +413,7 @@ defmodule Mongo.Session do data = %Session{ topology: topology, conn: conn, + address: address, server_session: server_session, implicit: (type == :implicit), wire_version: wire_version, @@ -524,6 +535,9 @@ defmodule Mongo.Session do def handle_call_event(:abort_transaction, _state, _data) do {:keep_state_and_data, :ok} end + def handle_call_event(:wire_version, _state, %{wire_version: wire_version}) do + {:keep_state_and_data, wire_version} + end def handle_call_event(:connection, _state, %{conn: conn}) do {:keep_state_and_data, conn} end @@ -546,6 +560,10 @@ defmodule Mongo.Session do _ -> {:keep_state_and_data, :noop} end end + def handle_call_event(:mark_server_unknown, _state, %Session{topology: topology, address: address}) do + Topology.mark_server_unknown(topology, address) + {:keep_state_and_data, :ok} + end def handle_cast_event({:update_recovery_token, recovery_token}, _state, %Session{} = data) do %Session{data | recovery_token: recovery_token} end diff --git a/lib/mongo/stream.ex b/lib/mongo/stream.ex index 0e56bdca..cc4aac0c 100644 --- a/lib/mongo/stream.ex +++ b/lib/mongo/stream.ex @@ -7,8 +7,6 @@ defmodule Mongo.Stream do defstruct [:topology_pid, :session, :cursor, :coll, :docs, :cmd, :opts] - alias Mongo.Session - def new(topology_pid, cmd, opts) do ## check, if retryable reads are enabled diff --git a/lib/mongo/topology.ex b/lib/mongo/topology.ex index 8bc24c76..b91afb66 100644 --- a/lib/mongo/topology.ex +++ b/lib/mongo/topology.ex @@ -1,22 +1,34 @@ defmodule Mongo.Topology do @moduledoc false - require Logger - use GenServer - alias Mongo.Events.{ServerDescriptionChangedEvent, ServerOpeningEvent, ServerClosedEvent, - TopologyDescriptionChangedEvent, TopologyOpeningEvent, TopologyClosedEvent, - ServerSelectionEmptyEvent} - alias Mongo.TopologyDescription - alias Mongo.ServerDescription + + alias Mongo.Events.ServerDescriptionChangedEvent + alias Mongo.Events.ServerSelectionEmptyEvent + alias Mongo.Events.TopologyClosedEvent + alias Mongo.Events.TopologyDescriptionChangedEvent + alias Mongo.Events.TopologyOpeningEvent + alias Mongo.Events.ServerClosedEvent + alias Mongo.Events.ServerOpeningEvent alias Mongo.Monitor - alias Mongo.Session.SessionPool + alias Mongo.ServerDescription alias Mongo.Session + alias Mongo.Session.SessionPool + alias Mongo.TopologyDescription - @limits [:compression, :logical_session_timeout, :max_bson_object_size, :max_message_size_bytes, :max_wire_version, :max_write_batch_size, :read_only] + @limits [ + :compression, + :logical_session_timeout, + :max_bson_object_size, + :max_message_size_bytes, + :max_wire_version, + :max_write_batch_size, + :read_only, + ] # https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#heartbeatfrequencyms-defaults-to-10-seconds-or-60-seconds - @heartbeat_frequency_ms 10_000 + @max_heartbeat_frequency_ms 10_000 + @min_heartbeat_frequency_ms 500 @default_checkout_timeout 60_000 @@ -58,6 +70,11 @@ defmodule Mongo.Topology do GenServer.call(pid, {:select_server, type, opts}, timeout) end + def mark_server_unknown(pid, address) do + server_description = ServerDescription.from_is_master_error(address, "not writable primary or recovering") + update_server_description(pid, server_description) + end + def limits(pid) do GenServer.call(pid, :limits) end @@ -103,7 +120,7 @@ defmodule Mongo.Topology do set_name: set_name, servers: servers, local_threshold_ms: local_threshold_ms, - heartbeat_frequency_ms: @heartbeat_frequency_ms + heartbeat_frequency_ms: @min_heartbeat_frequency_ms }), seeds: seeds, opts: opts, @@ -131,9 +148,7 @@ defmodule Mongo.Topology do end Enum.each(state.connection_pools, fn {_address, pid} -> GenServer.stop(pid) end) Enum.each(state.monitors, fn {_address, pid} -> GenServer.stop(pid) end) - :ok = Mongo.Events.notify(%TopologyClosedEvent{ - topology_pid: self() - }) + Mongo.Events.notify(%TopologyClosedEvent{topology_pid: self()}) end # see https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#updating-the-topologydescription @@ -180,7 +195,7 @@ defmodule Mongo.Topology do {:ok, pool} = DBConnection.start_link(Mongo.MongoDBConnection, conn_opts) connection_pools = Map.put(state.connection_pools, host, pool) - Process.send_after(self(), {:new_connection, state.waiting_pids, host}, 1000) + Process.send_after(self(), {:new_connection, state.waiting_pids}, 10) %{state | connection_pools: connection_pools, waiting_pids: []} end @@ -207,8 +222,8 @@ defmodule Mongo.Topology do {:noreply, %{state | session_pool: SessionPool.checkin(pool, server_session)}} end - def handle_info({:new_connection, waiting_pids, host}, state) do - Enum.each(waiting_pids, fn from -> GenServer.reply(from, {:new_connection, host}) end) + def handle_info({:new_connection, waiting_pids}, state) do + Enum.each(waiting_pids, fn from -> GenServer.reply(from, :new_connection) end) {:noreply, state} end @@ -219,6 +234,7 @@ defmodule Mongo.Topology do state |> get_and_update_in([:topology], &TopologyDescription.update(&1, server_description, length(state.seeds))) |> process_events() + |> update_heartbeat_frequency() |> update_monitor() |> update_session_pool(logical_session_timeout) end @@ -226,9 +242,35 @@ defmodule Mongo.Topology do state |> get_and_update_in([:topology], &TopologyDescription.update(&1, server_description, length(state.seeds))) |> process_events() + |> update_heartbeat_frequency() |> update_monitor() end + defp update_heartbeat_frequency(%{:topology => %{heartbeat_frequency_ms: current} = topology, monitors: monitors} = state) do + case TopologyDescription.select_servers(topology, :write, []) do + :empty -> + case current == @min_heartbeat_frequency_ms do + true -> + state + false -> + Enum.each(monitors, fn {_address, pid} -> Monitor.set_heartbeat_frequency_ms(pid, @min_heartbeat_frequency_ms) end) + put_in(state[:topology][:heartbeat_frequency_ms], @min_heartbeat_frequency_ms) + end + _host -> + case current == @max_heartbeat_frequency_ms do + true -> + state + false -> + + ## filter own pid + Enum.each(monitors, fn {_address, pid} -> Monitor.set_heartbeat_frequency_ms(pid, @max_heartbeat_frequency_ms) end) + Process.send_after(self(), {:new_connection, state.waiting_pids}, 10) + state = put_in(state[:topology][:heartbeat_frequency_ms], @max_heartbeat_frequency_ms) + %{state | waiting_pids: []} + end + end + end + defp process_events({events, state}) do Enum.each(events, fn {:force_check, _} = message -> :ok = GenServer.cast(self(), message) @@ -270,7 +312,7 @@ defmodule Mongo.Topology do with {:ok, connection} <- get_connection(address, state), wire_version <- wire_version(address, topology), {server_session, new_state} <- checkout_server_session(state), - {:ok, session} <- Session.start_link(self(), connection, server_session, type, wire_version, opts) do + {:ok, session} <- Session.start_link(self(), connection, address, server_session, type, wire_version, opts) do {:reply, {:ok, session}, new_state} else error -> {:reply, error, state} ## in case of an error, just return the error @@ -301,7 +343,7 @@ defmodule Mongo.Topology do case TopologyDescription.select_servers(topology, :write, []) do :empty -> Mongo.Events.notify(%ServerSelectionEmptyEvent{action: :limits, cmd_type: :write, topology: topology}) - {:reply, nil, state} + {:reply, {:error, :empty}, state} {:ok, {address, _opts}} -> with {:ok, limits} <- get_limits(address, topology) do {:reply, {:ok, limits}, state} @@ -314,8 +356,8 @@ defmodule Mongo.Topology do def handle_call(:wire_version, _from, %{:topology => topology} = state) do case TopologyDescription.select_servers(topology, :write, []) do :empty -> - Mongo.Events.notify(%ServerSelectionEmptyEvent{action: :wire_version, cmd_type: :write, topology: topology}) - {:reply, nil, state} + Mongo.Events.notify(%ServerSelectionEmptyEvent{action: :wire_version, cmd_type: :read, topology: topology}) + {:reply, {:error, :empty}, state} {:ok, {address, _opts}} -> {:reply, {:ok, wire_version(address, topology)}, state} @@ -360,7 +402,7 @@ defmodule Mongo.Topology do ## # update the monitor process. For new servers the function creates new monitor processes. # - defp update_monitor(state) do + defp update_monitor(%{topology: %{heartbeat_frequency_ms: heartbeat_frequency_ms}} = state) do arbiters = fetch_arbiters(state) old_addrs = Map.keys(state.monitors) # remove arbiters from connection pool as descriptions are recieved @@ -376,7 +418,7 @@ defmodule Mongo.Topology do Mongo.Events.notify(%ServerOpeningEvent{address: address, topology_pid: self()}) - args = [server_description.address, self(), @heartbeat_frequency_ms, Keyword.put(connopts, :pool, DBConnection.ConnectionPool)] + args = [server_description.address, self(), heartbeat_frequency_ms, Keyword.put(connopts, :pool, DBConnection.ConnectionPool)] {:ok, pid} = Monitor.start_link(args) %{ state | monitors: Map.put(state.monitors, address, pid) } diff --git a/test/mongo/cursor_test.exs b/test/mongo/cursor_test.exs index e30dba35..d329f394 100644 --- a/test/mongo/cursor_test.exs +++ b/test/mongo/cursor_test.exs @@ -39,7 +39,7 @@ defmodule Mongo.CursorTest do # issue #35: Crash executing find function without enough permission test "matching errors in the next function of the stream api", c do - assert {:error, %Mongo.Error{__exception__: true, code: 2, error_labels: '', host: nil, message: "unknown operator: $gth", resumable: false, retryable_reads: false, retryable_writes: false}} == Mongo.find(c.pid, "test", [_id: ["$gth": 1]]) + assert {:error, %Mongo.Error{__exception__: true, code: 2, error_labels: '', host: nil, message: "unknown operator: $gth", resumable: false, retryable_reads: false, retryable_writes: false, not_writable_primary_or_recovering: false}} == Mongo.find(c.pid, "test", [_id: ["$gth": 1]]) end end diff --git a/test/mongo/not_writable_primary_test.exs b/test/mongo/not_writable_primary_test.exs new file mode 100644 index 00000000..9a69484a --- /dev/null +++ b/test/mongo/not_writable_primary_test.exs @@ -0,0 +1,25 @@ +defmodule Mongo.NotWritablePrimaryTest do + use ExUnit.Case, async: false + + setup_all do + assert {:ok, top} = Mongo.TestConnection.connect + Mongo.drop_database(top) + %{pid: top} + end + + test "not writable primary", c do + + top = c.pid + + cmd = [ + configureFailPoint: "failCommand", + mode: [times: 1], + data: [errorCode: 10107, failCommands: ["insert"], closeConnection: false] + ] + + assert {:ok, %Mongo.InsertOneResult{}} = Mongo.insert_one(top, "users", %{name: "Greta1"}) + Mongo.admin_command(top, cmd) + assert {:ok, %Mongo.InsertOneResult{}} = Mongo.insert_one(top, "users", %{name: "Greta2"}) + end + +end