From edf909f395b027b418431654bd6bf4cd5281ea84 Mon Sep 17 00:00:00 2001 From: Oleg Pudeyev Date: Wed, 5 Aug 2020 21:41:52 -0400 Subject: [PATCH 1/5] not master wip --- lib/mongo.ex | 30 +++++++++++++++++++----------- lib/mongo/error.ex | 26 ++++++++++++++++++++------ lib/mongo/session.ex | 5 ++++- 3 files changed, 43 insertions(+), 18 deletions(-) diff --git a/lib/mongo.ex b/lib/mongo.ex index dc20d913..3bd994ec 100644 --- a/lib/mongo.ex +++ b/lib/mongo.ex @@ -407,7 +407,7 @@ defmodule Mongo do opts = Mongo.retryable_reads(opts) with {:ok, session} <- Session.start_implicit_session(topology_pid, :read, opts), - result <- exec_command_session(session, cmd, opts), + result <- exec_command_session(topology_pid, session, cmd, opts), :ok <- Session.end_implict_session(topology_pid, session) do case result do {:error, error} -> @@ -429,7 +429,7 @@ defmodule Mongo do opts = Mongo.retryable_writes(opts, acknowledged?(cmd[:writeConcerns])) with {:ok, session} <- Session.start_implicit_session(topology_pid, :write, opts), - result <- exec_command_session(session, cmd, opts), + result <- exec_command_session(topology_pid, session, cmd, opts), :ok <- Session.end_implict_session(topology_pid, session) do result else @@ -723,20 +723,28 @@ defmodule Mongo do end @doc false - @spec exec_command_session(GenServer.server, BSON.document, Keyword.t) :: {:ok, BSON.document | nil} | {:error, Mongo.Error.t} - def exec_command_session(session, cmd, opts) do - with {:ok, conn, new_cmd} <- Session.bind_session(session, cmd), + @spec exec_command_session(GenServer.server, GenServer.server, BSON.document, Keyword.t) :: {:ok, BSON.document | nil} | {:error, Mongo.Error.t} + def exec_command_session(topology_pid, session, cmd, opts) do + with {:ok, conn, new_cmd, address} <- Session.bind_session(session, cmd), {:ok, _cmd, {doc, event}} <- DBConnection.execute(conn, %Query{action: :command}, [new_cmd], defaults(opts)), doc <- Session.update_session(session, doc, opts), - {:ok, doc} <- check_for_error(doc, event) do + {:ok, doc} <- check_for_error(doc, event, address) do {:ok, doc} else {:error, error} -> ## todo update Topology + IO.inspect(error) + if error.not_master_or_recovering do + IO.inspect("ok") + server_description = Mongo.ServerDescription.from_is_master_error(error.address, error) + GenServer.cast(topology_pid, {:server_description, server_description}) + end + #require IEx; IEx.pry + #:debugger.start() case Error.should_retry_write(error, cmd, opts) do true -> with :ok <- Session.select_server(session, opts) do - exec_command_session(session, cmd, Keyword.put(opts, :write_counter, 2)) + exec_command_session(topology_pid, session, cmd, Keyword.put(opts, :write_counter, 2)) end false -> {:error, error} end @@ -748,13 +756,13 @@ defmodule Mongo do @spec exec_command(GenServer.server, BSON.document, Keyword.t) :: {:ok, BSON.document | nil} | {:error, Mongo.Error.t} def exec_command(conn, cmd, opts) do with {:ok, _cmd, {doc, event}} <- DBConnection.execute(conn, %Query{action: :command}, [cmd], defaults(opts)), - {:ok, doc} <- check_for_error(doc, event) do + {:ok, doc} <- check_for_error(doc, event, conn) do {:ok, doc} end end - defp check_for_error(%{"ok" => ok} = response, {event, duration}) when ok == 1 do + defp check_for_error(%{"ok" => ok} = response, {event, duration}, conn) when ok == 1 do Events.notify(%CommandSucceededEvent{ reply: response, duration: duration, @@ -765,9 +773,9 @@ defmodule Mongo do }, :commands) {:ok, response} end - defp check_for_error(doc, {event, duration}) do + defp check_for_error(doc, {event, duration}, address) do - error = Mongo.Error.exception(doc) + error = Mongo.Error.exception(doc, address) Events.notify(%CommandFailedEvent{ failure: error, diff --git a/lib/mongo/error.ex b/lib/mongo/error.ex index 40965023..239a04d1 100644 --- a/lib/mongo/error.ex +++ b/lib/mongo/error.ex @@ -2,7 +2,7 @@ defmodule Mongo.Error do alias Mongo.Events - defexception [:message, :code, :host, :error_labels, :resumable, :retryable_reads, :retryable_writes] + defexception [:message, :code, :host, :error_labels, :resumable, :retryable_reads, :retryable_writes, :not_master_or_recovering, :address] @host_unreachable 6 @host_not_found 7 @@ -21,6 +21,18 @@ defmodule Mongo.Error do @stale_config 13388 @retry_change_stream 234 @failed_to_satisfy_read_preference 133 + + # https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#not-master-and-node-is-recovering + @not_master_or_recovering [ + @interrupted_at_shutdown, + @interrupted_due_to_repl_state_change, + @not_master_or_secondary, + @primary_stepped_down, + @shutdown_in_progress, + + @not_master, + @not_master_no_slaveok, + ] @retryable_writes [@interrupted_at_shutdown, @interrupted_due_to_repl_state_change, @not_master, @not_master_no_slaveok, @not_master_or_secondary, @primary_stepped_down, @shutdown_in_progress, @host_not_found, @@ -42,7 +54,8 @@ defmodule Mongo.Error do error_labels: [String.t] | nil, resumable: boolean, retryable_reads: boolean, - retryable_writes: boolean + retryable_writes: boolean, + not_master_or_recovering: boolean, } def message(e) do @@ -60,12 +73,13 @@ defmodule Mongo.Error do %Mongo.Error{message: "#{host} ssl #{action}: #{formatted_reason} - #{inspect(reason)}", host: host, resumable: false} end - def exception(%{"code" => code, "errmsg" => msg} = doc) do + def exception(%{"code" => code, "errmsg" => msg} = doc, address) do errorLabels = doc["errorLabels"] || [] resumable = Enum.any?(@resumable, &(&1 == code)) || Enum.any?(errorLabels, &(&1 == "ResumableChangeStreamError")) retryable_reads = Enum.any?(@retryable_reads, &(&1 == code)) || Enum.any?(errorLabels, &(&1 == "RetryableReadError")) retryable_writes = Enum.any?(@retryable_writes, &(&1 == code)) || Enum.any?(errorLabels, &(&1 == "RetryableWriteError")) - %Mongo.Error{message: msg, code: code, error_labels: errorLabels, resumable: resumable, retryable_reads: retryable_reads, retryable_writes: retryable_writes} + not_master_or_recovering = Enum.any?(@not_master_or_recovering, &(&1 == code)) + %Mongo.Error{message: msg, code: code, error_labels: errorLabels, resumable: resumable, retryable_reads: retryable_reads, retryable_writes: retryable_writes, not_master_or_recovering: not_master_or_recovering, address: address} end def exception(message: message, code: code) do %Mongo.Error{message: message, code: code, resumable: Enum.any?(@resumable, &(&1 == code))} @@ -76,7 +90,7 @@ defmodule Mongo.Error do end @doc """ - Return true if the error is retryalble for read operations. + Return true if the error is retryable for read operations. """ def should_retry_read(%Mongo.Error{retryable_reads: true}, cmd, opts) do [{command_name,_}|_] = cmd @@ -93,7 +107,7 @@ defmodule Mongo.Error do end @doc """ - Return true if the error is retryalble for writes operations. + Return true if the error is retryable for writes operations. """ def should_retry_write(%Mongo.Error{retryable_writes: true}, cmd, opts) do [{command_name,_}|_] = cmd diff --git a/lib/mongo/session.ex b/lib/mongo/session.ex index e36f4df6..f1710eb7 100644 --- a/lib/mongo/session.ex +++ b/lib/mongo/session.ex @@ -473,7 +473,10 @@ defmodule Mongo.Session do |> ReadPreference.add_read_preference(opts) |> filter_nils() - {:keep_state_and_data, {:ok, conn, cmd}} + {host, port} = Mongo.MongoDBConnection.Utils.hostname_port(opts) + address = "#{host}:#{port}" + + {:keep_state_and_data, {:ok, conn, cmd, address}} end def handle_call_event({:bind_session, cmd}, :starting_transaction, %Session{conn: conn, From bbf159cef597c11aec711ea7e0b5dfa580ac099c Mon Sep 17 00:00:00 2001 From: Michael Maier Date: Sat, 28 Aug 2021 11:40:25 +0200 Subject: [PATCH 2/5] first approach for #63 --- lib/mongo.ex | 88 ++++++++------- lib/mongo/bulk_write.ex | 8 -- lib/mongo/change_stream.ex | 14 ++- lib/mongo/error.ex | 130 +++++++++++++++++------ lib/mongo/monitor.ex | 14 +++ lib/mongo/session.ex | 31 ++++-- lib/mongo/stream.ex | 2 - lib/mongo/topology.ex | 93 ++++++++++++---- test/mongo/cursor_test.exs | 2 +- test/mongo/not_writable_primary_test.exs | 25 +++++ 10 files changed, 290 insertions(+), 117 deletions(-) create mode 100644 test/mongo/not_writable_primary_test.exs diff --git a/lib/mongo.ex b/lib/mongo.ex index f90ce877..33b00bb8 100644 --- a/lib/mongo.ex +++ b/lib/mongo.ex @@ -52,6 +52,8 @@ defmodule Mongo do import Mongo.Utils import Mongo.WriteConcern + require Logger + use Bitwise use Mongo.Messages @@ -409,20 +411,24 @@ defmodule Mongo do opts = Mongo.retryable_reads(opts) with {:ok, session} <- Session.start_implicit_session(topology_pid, :read, opts), - result <- exec_command_session(topology_pid, session, cmd, opts), + result <- exec_command_session(session, cmd, opts), :ok <- Session.end_implict_session(topology_pid, session) do case result do {:error, error} -> - case Error.should_retry_read(error, cmd, opts) do - true -> issue_command(topology_pid, cmd, :read, Keyword.put(opts, :read_counter, 2)) - false -> {:error, error} + + cond do + Error.not_writable_primary_or_recovering?(error, opts) -> + ## in case of explicity + issue_command(topology_pid, cmd, :read, Keyword.put(opts, :retry_counter, 2)) + + Error.should_retry_read(error, cmd, opts) -> + issue_command(topology_pid, cmd, :read, Keyword.put(opts, :read_counter, 2)) + + true -> + {:error, error} end _other -> result end - else - {:new_connection, _server} -> - :timer.sleep(1000) - issue_command(topology_pid, cmd, :read, opts) end end def issue_command(topology_pid, cmd, :write, opts) do @@ -431,13 +437,27 @@ defmodule Mongo do opts = Mongo.retryable_writes(opts, acknowledged?(cmd[:writeConcerns])) with {:ok, session} <- Session.start_implicit_session(topology_pid, :write, opts), - result <- exec_command_session(topology_pid, session, cmd, opts), + result <- exec_command_session(session, cmd, opts), :ok <- Session.end_implict_session(topology_pid, session) do - result - else - {:new_connection, _server} -> - :timer.sleep(1000) - issue_command(topology_pid, cmd, :write, opts) + + case result do + {:error, error} -> + cond do + Error.not_writable_primary_or_recovering?(error, opts) -> + ## in case of explicity + issue_command(topology_pid, cmd, :read, Keyword.put(opts, :retry_counter, 2)) + + Error.should_retry_write(error, cmd, opts) -> + issue_command(topology_pid, cmd, :write, Keyword.put(opts, :write_counter, 2)) + + true -> + {:error, error} + end + + result -> + result + end + end end @@ -724,31 +744,23 @@ defmodule Mongo do end @doc false - @spec exec_command_session(GenServer.server, GenServer.server, BSON.document, Keyword.t) :: {:ok, BSON.document | nil} | {:error, Mongo.Error.t} - def exec_command_session(topology_pid, session, cmd, opts) do - with {:ok, conn, new_cmd, address} <- Session.bind_session(session, cmd), + @spec exec_command_session(GenServer.server, BSON.document, Keyword.t) :: {:ok, BSON.document | nil} | {:error, Mongo.Error.t} + def exec_command_session(session, cmd, opts) do + with {:ok, conn, new_cmd} <- Session.bind_session(session, cmd), {:ok, _cmd, {doc, event}} <- DBConnection.execute(conn, %Query{action: :command}, [new_cmd], defaults(opts)), doc <- Session.update_session(session, doc, opts), - {:ok, doc} <- check_for_error(doc, event, address) do + {:ok, doc} <- check_for_error(doc, event) do {:ok, doc} - else + else {:error, error} -> - ## todo update Topology - IO.inspect(error) - if error.not_master_or_recovering do - IO.inspect("ok") - server_description = Mongo.ServerDescription.from_is_master_error(error.address, error) - GenServer.cast(topology_pid, {:server_description, server_description}) - end - #require IEx; IEx.pry - #:debugger.start() - case Error.should_retry_write(error, cmd, opts) do - true -> - with :ok <- Session.select_server(session, opts) do - exec_command_session(topology_pid, session, cmd, Keyword.put(opts, :write_counter, 2)) - end - false -> {:error, error} + case Error.not_writable_primary_or_recovering?(error, opts) do + true -> + Session.mark_server_unknown(session) + {:error, error} + false -> + {:error, error} end + end end @@ -757,13 +769,13 @@ defmodule Mongo do @spec exec_command(GenServer.server, BSON.document, Keyword.t) :: {:ok, BSON.document | nil} | {:error, Mongo.Error.t} def exec_command(conn, cmd, opts) do with {:ok, _cmd, {doc, event}} <- DBConnection.execute(conn, %Query{action: :command}, [cmd], defaults(opts)), - {:ok, doc} <- check_for_error(doc, event, conn) do + {:ok, doc} <- check_for_error(doc, event) do {:ok, doc} end end - defp check_for_error(%{"ok" => ok} = response, {event, duration}, conn) when ok == 1 do + defp check_for_error(%{"ok" => ok} = response, {event, duration}) when ok == 1 do Events.notify(%CommandSucceededEvent{ reply: response, duration: duration, @@ -774,9 +786,9 @@ defmodule Mongo do }, :commands) {:ok, response} end - defp check_for_error(doc, {event, duration}, address) do + defp check_for_error(doc, {event, duration}) do - error = Mongo.Error.exception(doc, address) + error = Mongo.Error.exception(doc) Events.notify(%CommandFailedEvent{ failure: error, diff --git a/lib/mongo/bulk_write.ex b/lib/mongo/bulk_write.ex index acd85335..e86f6b5e 100644 --- a/lib/mongo/bulk_write.ex +++ b/lib/mongo/bulk_write.ex @@ -187,10 +187,6 @@ defmodule Mongo.BulkWrite do result = one_bulk_write(topology_pid, session, bulk, opts), :ok <- Session.end_implict_session(topology_pid, session) do result - else - {:new_connection, _server} -> - :timer.sleep(1000) - write(topology_pid, bulk, opts) end end @@ -210,10 +206,6 @@ defmodule Mongo.BulkWrite do |> BulkWriteResult.reduce(empty) do result - else - {:new_connection, _server} -> - :timer.sleep(1000) - write(topology_pid, bulk, opts) end end diff --git a/lib/mongo/change_stream.ex b/lib/mongo/change_stream.ex index 1f9abc8b..bc975c68 100644 --- a/lib/mongo/change_stream.ex +++ b/lib/mongo/change_stream.ex @@ -3,6 +3,8 @@ defmodule Mongo.ChangeStream do alias Mongo.Session alias Mongo.Error + require Logger + import Record, only: [defrecordp: 2] defstruct [:topology_pid, :session, :doc, :cmd, :on_resume_token, :opts] @@ -69,6 +71,7 @@ defmodule Mongo.ChangeStream do def aggregate(topology_pid, cmd, fun, opts) do + Logger.info("start_implicit_session ") with {:ok, session} <- Session.start_implicit_session(topology_pid, :read, opts), {:ok, %{"ok" => ok} = doc} when ok == 1 <- Mongo.exec_command_session(session, cmd, opts) do @@ -84,6 +87,7 @@ defmodule Mongo.ChangeStream do def aggregate(topology_pid, session, doc, cmd, fun) do + Logger.info("aggregate wire version: #{inspect Mongo.wire_version(topology_pid)}") with %{"operationTime" => op_time, "cursor" => %{ "id" => cursor_id, @@ -128,6 +132,7 @@ defmodule Mongo.ChangeStream do change_stream(resume_token: resume_token, op_time: op_time, cmd: aggregate_cmd, on_resume_token: fun) = change_stream, opts) do + Logger.info("Get more") get_more = [ getMore: %BSON.LongNumber{value: cursor_id}, collection: coll, @@ -145,7 +150,7 @@ defmodule Mongo.ChangeStream do case token_changes(old_token, new_token) do true -> fun.(new_token) - false -> nil + false -> :noop end {:ok, %{cursor_id: new_cursor_id, docs: docs, change_stream: change_stream}} @@ -154,6 +159,7 @@ defmodule Mongo.ChangeStream do {:error, %Mongo.Error{resumable: false} = not_resumable} -> {:error, not_resumable} {:error, _error} -> + Logger.info("Resuming....: #{inspect Mongo.wire_version(topology_pid) }") with {:ok, wire_version} <- Mongo.wire_version(topology_pid) do [%{"$changeStream" => stream_opts} | pipeline] = Keyword.get(aggregate_cmd, :pipeline) # extract the change stream options @@ -165,11 +171,14 @@ defmodule Mongo.ChangeStream do kill_cursors(session, coll, [cursor_id], opts) # Start aggregation again... + Logger.info("Calling aggregate again") with {:ok, state} <- aggregate(topology_pid, aggregate_cmd, fun, opts) do {:resume, state} end end - + reason -> + Logger.info("Error: #{inspect reason}") + {:error, nil} end end @@ -251,6 +260,7 @@ defmodule Mongo.ChangeStream do """ def kill_cursors(session, coll, cursor_ids, opts) do + ## todo Logger.info("Kill-Cursor") cmd = [ killCursors: coll, cursors: cursor_ids |> Enum.map(fn id -> %BSON.LongNumber{value: id} end) diff --git a/lib/mongo/error.ex b/lib/mongo/error.ex index 239a04d1..d37cf673 100644 --- a/lib/mongo/error.ex +++ b/lib/mongo/error.ex @@ -2,50 +2,85 @@ defmodule Mongo.Error do alias Mongo.Events - defexception [:message, :code, :host, :error_labels, :resumable, :retryable_reads, :retryable_writes, :not_master_or_recovering, :address] + defexception [:message, :code, :host, :error_labels, :resumable, :retryable_reads, :retryable_writes, :not_writable_primary_or_recovering] - @host_unreachable 6 + @exceeded_time_limit 262 + @failed_to_satisfy_read_preference 133 @host_not_found 7 + @host_unreachable 6 + @interrupted_at_shutdown 11600 + @interrupted_due_to_repl_state_change 11602 @network_timeout 89 - @shutdown_in_progress 91 + @not_primary_no_secondary_ok 13435 + @not_primary_or_secondary 13436 + @not_writable_primary 10107 @primary_stepped_down 189 - @exceeded_time_limit 262 + @retry_change_stream 234 + @shutdown_in_progress 91 @socket_exception 9001 - @not_master 10107 - @interrupted_at_shutdown 11600 - @interrupted_due_to_repl_state_change 11602 - @not_master_no_slaveok 13435 - @not_master_or_secondary 13436 - @stale_shard_version 63 - @stale_epoch 150 @stale_config 13388 - @retry_change_stream 234 - @failed_to_satisfy_read_preference 133 - - # https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#not-master-and-node-is-recovering - @not_master_or_recovering [ + @stale_epoch 150 + @stale_shard_version 63 + + @retryable_writes [ + @exceeded_time_limit, + @host_not_found, + @host_unreachable, @interrupted_at_shutdown, @interrupted_due_to_repl_state_change, - @not_master_or_secondary, + @network_timeout, + @not_primary_no_secondary_ok, + @not_primary_or_secondary, + @not_writable_primary, @primary_stepped_down, @shutdown_in_progress, - - @not_master, - @not_master_no_slaveok, + @socket_exception ] - @retryable_writes [@interrupted_at_shutdown, @interrupted_due_to_repl_state_change, @not_master, @not_master_no_slaveok, - @not_master_or_secondary, @primary_stepped_down, @shutdown_in_progress, @host_not_found, - @host_unreachable, @network_timeout, @socket_exception, @exceeded_time_limit ] + @retryable_reads [ + @host_not_found, + @host_unreachable, + @interrupted_due_to_repl_state_change, + @network_timeout, + @not_primary_no_secondary_ok, + @not_primary_or_secondary, + @not_writable_primary, + @primary_stepped_down, + @socket_exception, + @interrupted_at_shutdown + ] - @retryable_reads [@interrupted_at_shutdown, @interrupted_due_to_repl_state_change, @not_master, - @not_master_no_slaveok, @not_master_or_secondary, @primary_stepped_down, - @host_not_found, @host_unreachable , @network_timeout, @socket_exception] + @resumable [ + @exceeded_time_limit, + @interrupted_due_to_repl_state_change, + @stale_epoch, + @failed_to_satisfy_read_preference, + @host_not_found, + @host_unreachable, + @interrupted_at_shutdown, + @interrupted_at_shutdown, + @network_timeout, + @not_primary_no_secondary_ok, + @not_primary_or_secondary, + @not_writable_primary, + @primary_stepped_down, + @retry_change_stream, + @shutdown_in_progress, + @socket_exception, + @stale_config, + @stale_shard_version + ] - @resumable [@host_unreachable, @host_not_found, @network_timeout, @shutdown_in_progress, @primary_stepped_down, - @exceeded_time_limit, @socket_exception, @not_master, @interrupted_at_shutdown, @interrupted_at_shutdown, - @interrupted_due_to_repl_state_change, @not_master_no_slaveok, @not_master_or_secondary, @stale_shard_version, - @stale_epoch, @stale_config, @retry_change_stream, @failed_to_satisfy_read_preference] + # https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#not-writable-primary-and-node-is-recovering + @not_writable_primary_or_recovering [ + @interrupted_at_shutdown, + @interrupted_due_to_repl_state_change, + @not_primary_no_secondary_ok, + @not_primary_or_secondary, + @not_writable_primary, + @primary_stepped_down, + @shutdown_in_progress + ] @type t :: %__MODULE__{ message: String.t, @@ -55,7 +90,7 @@ defmodule Mongo.Error do resumable: boolean, retryable_reads: boolean, retryable_writes: boolean, - not_master_or_recovering: boolean, + not_writable_primary_or_recovering: boolean } def message(e) do @@ -73,14 +108,22 @@ defmodule Mongo.Error do %Mongo.Error{message: "#{host} ssl #{action}: #{formatted_reason} - #{inspect(reason)}", host: host, resumable: false} end - def exception(%{"code" => code, "errmsg" => msg} = doc, address) do + def exception(%{"code" => code, "errmsg" => msg} = doc) do errorLabels = doc["errorLabels"] || [] resumable = Enum.any?(@resumable, &(&1 == code)) || Enum.any?(errorLabels, &(&1 == "ResumableChangeStreamError")) retryable_reads = Enum.any?(@retryable_reads, &(&1 == code)) || Enum.any?(errorLabels, &(&1 == "RetryableReadError")) retryable_writes = Enum.any?(@retryable_writes, &(&1 == code)) || Enum.any?(errorLabels, &(&1 == "RetryableWriteError")) - not_master_or_recovering = Enum.any?(@not_master_or_recovering, &(&1 == code)) - %Mongo.Error{message: msg, code: code, error_labels: errorLabels, resumable: resumable, retryable_reads: retryable_reads, retryable_writes: retryable_writes, not_master_or_recovering: not_master_or_recovering, address: address} + not_writable_primary_or_recovering = Enum.any?(@not_writable_primary_or_recovering, &(&1 == code)) + + %Mongo.Error{message: msg, + code: code, + error_labels: errorLabels, + resumable: resumable, + retryable_reads: retryable_reads, + retryable_writes: retryable_writes, + not_writable_primary_or_recovering: not_writable_primary_or_recovering} end + def exception(message: message, code: code) do %Mongo.Error{message: message, code: code, resumable: Enum.any?(@resumable, &(&1 == code))} end @@ -89,6 +132,7 @@ defmodule Mongo.Error do %Mongo.Error{message: message, resumable: false} end + @doc """ Return true if the error is retryable for read operations. """ @@ -129,6 +173,24 @@ defmodule Mongo.Error do def has_label(_other, _label) do false end + + def not_writable_primary?(%Mongo.Error{code: code}) do + code == @not_writable_primary + end + def not_primary_no_secondary_ok?(%Mongo.Error{code: code}) do + code == @not_primary_no_secondary_ok + end + def not_primary_or_secondary?(%Mongo.Error{code: code}) do + code == @not_primary_or_secondary + end + + @doc """ + Return true if the error == not writable primary or in recovering mode. + """ + def not_writable_primary_or_recovering?(%Mongo.Error{not_writable_primary_or_recovering: result}, opts) do + ## no explicit session, no retry counter but not_writable_primary_or_recovering + Keyword.get(opts, :session, nil) == nil && Keyword.get(opts, :retry_counter, nil) == nil && result + end end defmodule Mongo.WriteError do diff --git a/lib/mongo/monitor.ex b/lib/mongo/monitor.ex index 815c43a6..c20f8cef 100644 --- a/lib/mongo/monitor.ex +++ b/lib/mongo/monitor.ex @@ -12,6 +12,7 @@ defmodule Mongo.Monitor do The result of the `isMaster` command is mapped the `ServerDescription` structure and sent to the topology process, which updates it internal data structure. """ + require Logger use GenServer @@ -48,6 +49,9 @@ defmodule Mongo.Monitor do GenServer.cast(pid, :update) end + def set_heartbeat_frequency_ms(pid, heartbeat_frequency_ms) do + GenServer.cast(pid, {:update, heartbeat_frequency_ms}) + end @doc """ Initialize the monitor process @@ -105,6 +109,14 @@ defmodule Mongo.Monitor do {:noreply, new_state, new_state.heartbeat_frequency_ms} end + def handle_cast({:update, heartbeat_frequency_ms}, state) do + new_state = state + |> update_server_description() + |> Map.put(:heartbeat_frequency_ms, heartbeat_frequency_ms) + # we return with heartbeat_frequency_ms, so after heartbeat_frequency_ms handle_info(:timeout...) gets called. + {:noreply, new_state, new_state.heartbeat_frequency_ms} + end + def handle_cast(:stop, state) do exit(:normal) {:noreply, state} @@ -116,6 +128,8 @@ defmodule Mongo.Monitor do """ def handle_info(:timeout, state) do new_state = update_server_description(state) + + ## todo Logger.info("XXX: heartbeat_frequency_ms: #{inspect new_state.heartbeat_frequency_ms}") {:noreply, new_state, new_state.heartbeat_frequency_ms} end def handle_info(:update, state) do diff --git a/lib/mongo/session.ex b/lib/mongo/session.ex index f1710eb7..f0f4417d 100644 --- a/lib/mongo/session.ex +++ b/lib/mongo/session.ex @@ -99,6 +99,8 @@ defmodule Mongo.Session do import Keywords import Mongo.WriteConcern + require Logger + alias BSON.Timestamp alias Mongo.Error alias Mongo.ReadPreference @@ -122,6 +124,7 @@ defmodule Mongo.Session do defstruct [ topology: nil, conn: nil, + address: nil, recovery_token: nil, server_session: nil, causal_consistency: false, @@ -136,8 +139,8 @@ defmodule Mongo.Session do Start the generic state machine. """ # @spec start_link(GenServer.server, ServerSession.t, atom, integer, keyword()) :: {:ok, Session.t} | :ignore | {:error, term()} - def start_link(topology, conn, server_session, type, wire_version, opts) do - {:ok, spawn_link(__MODULE__, :init, [topology, conn, server_session, type, wire_version, opts])} + def start_link(topology, conn, address, server_session, type, wire_version, opts) do + {:ok, spawn_link(__MODULE__, :init, [topology, conn, address, server_session, type, wire_version, opts])} end @doc """ @@ -153,8 +156,7 @@ defmodule Mongo.Session do with {:ok, session} <- Topology.checkout_session(topology_pid, type, :explicit, opts) do {:ok, session} else - {:new_connection, _server} -> - :timer.sleep(1000) + :new_connection -> start_session(topology_pid, type, opts) end end @@ -170,8 +172,9 @@ defmodule Mongo.Session do with {:ok, session} <- Topology.checkout_session(topology_pid, type, :implicit, opts) do {:ok, session} else - {:new_connection, _server} -> - :timer.sleep(1000) + :new_connection -> + ## todo Logger.info("New connection while checkout session") + ## :timer.sleep(1000) start_implicit_session(topology_pid, type, opts) {:error, error} -> {:error, error} other -> {:error, Mongo.Error.exception("Unknow result #{inspect other} while calling Topology.checkout_session/4")} @@ -180,7 +183,11 @@ defmodule Mongo.Session do end end + def mark_server_unknown(pid) do + call(pid, :mark_server_unknown) + end def select_server(pid, opts) do + Logger.info("Select server") call(pid, {:select_server, opts}) end @@ -393,7 +400,7 @@ defmodule Mongo.Session do send(pid, {:cast, arguments}) end - def init(topology, conn, server_session, type, wire_version, opts) do + def init(topology, conn, address, server_session, type, wire_version, opts) do server_session = case opts[:retryable_write] do ## in case of `:retryable_write` we need to inc the transaction id true -> ServerSession.next_txn_num(server_session) @@ -403,6 +410,7 @@ defmodule Mongo.Session do data = %Session{ topology: topology, conn: conn, + address: address, server_session: server_session, implicit: (type == :implicit), wire_version: wire_version, @@ -473,10 +481,7 @@ defmodule Mongo.Session do |> ReadPreference.add_read_preference(opts) |> filter_nils() - {host, port} = Mongo.MongoDBConnection.Utils.hostname_port(opts) - address = "#{host}:#{port}" - - {:keep_state_and_data, {:ok, conn, cmd, address}} + {:keep_state_and_data, {:ok, conn, cmd}} end def handle_call_event({:bind_session, cmd}, :starting_transaction, %Session{conn: conn, @@ -549,6 +554,10 @@ defmodule Mongo.Session do _ -> {:keep_state_and_data, :noop} end end + def handle_call_event(:mark_server_unknown, _state, %Session{topology: topology, address: address}) do + Topology.mark_server_unknown(topology, address) + {:keep_state_and_data, :ok} + end def handle_cast_event({:update_recovery_token, recovery_token}, _state, %Session{} = data) do %Session{data | recovery_token: recovery_token} end diff --git a/lib/mongo/stream.ex b/lib/mongo/stream.ex index 0e56bdca..cc4aac0c 100644 --- a/lib/mongo/stream.ex +++ b/lib/mongo/stream.ex @@ -7,8 +7,6 @@ defmodule Mongo.Stream do defstruct [:topology_pid, :session, :cursor, :coll, :docs, :cmd, :opts] - alias Mongo.Session - def new(topology_pid, cmd, opts) do ## check, if retryable reads are enabled diff --git a/lib/mongo/topology.ex b/lib/mongo/topology.ex index 8bc24c76..1e16cf5d 100644 --- a/lib/mongo/topology.ex +++ b/lib/mongo/topology.ex @@ -4,19 +4,33 @@ defmodule Mongo.Topology do require Logger use GenServer - alias Mongo.Events.{ServerDescriptionChangedEvent, ServerOpeningEvent, ServerClosedEvent, - TopologyDescriptionChangedEvent, TopologyOpeningEvent, TopologyClosedEvent, - ServerSelectionEmptyEvent} - alias Mongo.TopologyDescription - alias Mongo.ServerDescription + + alias Mongo.Events.ServerDescriptionChangedEvent + alias Mongo.Events.ServerSelectionEmptyEvent + alias Mongo.Events.TopologyClosedEvent + alias Mongo.Events.TopologyDescriptionChangedEvent + alias Mongo.Events.TopologyOpeningEvent + alias Mongo.Events.ServerClosedEvent + alias Mongo.Events.ServerOpeningEvent alias Mongo.Monitor - alias Mongo.Session.SessionPool + alias Mongo.ServerDescription alias Mongo.Session + alias Mongo.Session.SessionPool + alias Mongo.TopologyDescription - @limits [:compression, :logical_session_timeout, :max_bson_object_size, :max_message_size_bytes, :max_wire_version, :max_write_batch_size, :read_only] + @limits [ + :compression, + :logical_session_timeout, + :max_bson_object_size, + :max_message_size_bytes, + :max_wire_version, + :max_write_batch_size, + :read_only, + ] # https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#heartbeatfrequencyms-defaults-to-10-seconds-or-60-seconds - @heartbeat_frequency_ms 10_000 + @max_heartbeat_frequency_ms 10_000 + @min_heartbeat_frequency_ms 500 @default_checkout_timeout 60_000 @@ -58,6 +72,12 @@ defmodule Mongo.Topology do GenServer.call(pid, {:select_server, type, opts}, timeout) end + def mark_server_unknown(pid, address) do + ## todo Logger.info("mark_server_unknown #{inspect address} ") + server_description = ServerDescription.from_is_master_error(address, "not writable primary or recovering") + update_server_description(pid, server_description) + end + def limits(pid) do GenServer.call(pid, :limits) end @@ -103,7 +123,7 @@ defmodule Mongo.Topology do set_name: set_name, servers: servers, local_threshold_ms: local_threshold_ms, - heartbeat_frequency_ms: @heartbeat_frequency_ms + heartbeat_frequency_ms: @min_heartbeat_frequency_ms }), seeds: seeds, opts: opts, @@ -131,9 +151,7 @@ defmodule Mongo.Topology do end Enum.each(state.connection_pools, fn {_address, pid} -> GenServer.stop(pid) end) Enum.each(state.monitors, fn {_address, pid} -> GenServer.stop(pid) end) - :ok = Mongo.Events.notify(%TopologyClosedEvent{ - topology_pid: self() - }) + Mongo.Events.notify(%TopologyClosedEvent{topology_pid: self()}) end # see https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#updating-the-topologydescription @@ -180,7 +198,7 @@ defmodule Mongo.Topology do {:ok, pool} = DBConnection.start_link(Mongo.MongoDBConnection, conn_opts) connection_pools = Map.put(state.connection_pools, host, pool) - Process.send_after(self(), {:new_connection, state.waiting_pids, host}, 1000) + Process.send_after(self(), {:new_connection, state.waiting_pids}, 10) %{state | connection_pools: connection_pools, waiting_pids: []} end @@ -207,8 +225,9 @@ defmodule Mongo.Topology do {:noreply, %{state | session_pool: SessionPool.checkin(pool, server_session)}} end - def handle_info({:new_connection, waiting_pids, host}, state) do - Enum.each(waiting_pids, fn from -> GenServer.reply(from, {:new_connection, host}) end) + def handle_info({:new_connection, waiting_pids}, state) do + ## todo Logger.info("Notify waitings pid with new_connection #{inspect waiting_pids}") + Enum.each(waiting_pids, fn from -> GenServer.reply(from, :new_connection) end) {:noreply, state} end @@ -219,6 +238,7 @@ defmodule Mongo.Topology do state |> get_and_update_in([:topology], &TopologyDescription.update(&1, server_description, length(state.seeds))) |> process_events() + |> update_heartbeat_frequency() |> update_monitor() |> update_session_pool(logical_session_timeout) end @@ -226,9 +246,39 @@ defmodule Mongo.Topology do state |> get_and_update_in([:topology], &TopologyDescription.update(&1, server_description, length(state.seeds))) |> process_events() + |> update_heartbeat_frequency() |> update_monitor() end + defp update_heartbeat_frequency(%{:topology => %{heartbeat_frequency_ms: current} = topology, monitors: monitors} = state) do + case TopologyDescription.select_servers(topology, :write, []) do + :empty -> + case current == @min_heartbeat_frequency_ms do + true -> + ## todo Logger.info("Topology: no primary found, searching") + state + false -> + ## todo Logger.info("Topology: no primary found, start searching") + Enum.each(monitors, fn {_address, pid} -> Monitor.set_heartbeat_frequency_ms(pid, @min_heartbeat_frequency_ms) end) + put_in(state[:topology][:heartbeat_frequency_ms], @min_heartbeat_frequency_ms) + end + host -> + case current == @max_heartbeat_frequency_ms do + true -> + ## todo Logger.info("Topology: primary exist") + state + false -> + ## todo Logger.info("Topology: primary found #{inspect host}, stop searching") + + ## filter own pid + Enum.each(monitors, fn {_address, pid} -> Monitor.set_heartbeat_frequency_ms(pid, @max_heartbeat_frequency_ms) end) + Process.send_after(self(), {:new_connection, state.waiting_pids}, 10) + state = put_in(state[:topology][:heartbeat_frequency_ms], @max_heartbeat_frequency_ms) + %{state | waiting_pids: []} + end + end + end + defp process_events({events, state}) do Enum.each(events, fn {:force_check, _} = message -> :ok = GenServer.cast(self(), message) @@ -270,7 +320,7 @@ defmodule Mongo.Topology do with {:ok, connection} <- get_connection(address, state), wire_version <- wire_version(address, topology), {server_session, new_state} <- checkout_server_session(state), - {:ok, session} <- Session.start_link(self(), connection, server_session, type, wire_version, opts) do + {:ok, session} <- Session.start_link(self(), connection, address, server_session, type, wire_version, opts) do {:reply, {:ok, session}, new_state} else error -> {:reply, error, state} ## in case of an error, just return the error @@ -312,10 +362,11 @@ defmodule Mongo.Topology do end def handle_call(:wire_version, _from, %{:topology => topology} = state) do - case TopologyDescription.select_servers(topology, :write, []) do + case TopologyDescription.select_servers(topology, :read, []) do :empty -> - Mongo.Events.notify(%ServerSelectionEmptyEvent{action: :wire_version, cmd_type: :write, topology: topology}) - {:reply, nil, state} + Mongo.Events.notify(%ServerSelectionEmptyEvent{action: :wire_version, cmd_type: :read, topology: topology}) + ## todo fix me + {:reply, {:ok, 9}, state} {:ok, {address, _opts}} -> {:reply, {:ok, wire_version(address, topology)}, state} @@ -360,7 +411,7 @@ defmodule Mongo.Topology do ## # update the monitor process. For new servers the function creates new monitor processes. # - defp update_monitor(state) do + defp update_monitor(%{topology: %{heartbeat_frequency_ms: heartbeat_frequency_ms}} = state) do arbiters = fetch_arbiters(state) old_addrs = Map.keys(state.monitors) # remove arbiters from connection pool as descriptions are recieved @@ -376,7 +427,7 @@ defmodule Mongo.Topology do Mongo.Events.notify(%ServerOpeningEvent{address: address, topology_pid: self()}) - args = [server_description.address, self(), @heartbeat_frequency_ms, Keyword.put(connopts, :pool, DBConnection.ConnectionPool)] + args = [server_description.address, self(), heartbeat_frequency_ms, Keyword.put(connopts, :pool, DBConnection.ConnectionPool)] {:ok, pid} = Monitor.start_link(args) %{ state | monitors: Map.put(state.monitors, address, pid) } diff --git a/test/mongo/cursor_test.exs b/test/mongo/cursor_test.exs index e30dba35..d329f394 100644 --- a/test/mongo/cursor_test.exs +++ b/test/mongo/cursor_test.exs @@ -39,7 +39,7 @@ defmodule Mongo.CursorTest do # issue #35: Crash executing find function without enough permission test "matching errors in the next function of the stream api", c do - assert {:error, %Mongo.Error{__exception__: true, code: 2, error_labels: '', host: nil, message: "unknown operator: $gth", resumable: false, retryable_reads: false, retryable_writes: false}} == Mongo.find(c.pid, "test", [_id: ["$gth": 1]]) + assert {:error, %Mongo.Error{__exception__: true, code: 2, error_labels: '', host: nil, message: "unknown operator: $gth", resumable: false, retryable_reads: false, retryable_writes: false, not_writable_primary_or_recovering: false}} == Mongo.find(c.pid, "test", [_id: ["$gth": 1]]) end end diff --git a/test/mongo/not_writable_primary_test.exs b/test/mongo/not_writable_primary_test.exs new file mode 100644 index 00000000..9a69484a --- /dev/null +++ b/test/mongo/not_writable_primary_test.exs @@ -0,0 +1,25 @@ +defmodule Mongo.NotWritablePrimaryTest do + use ExUnit.Case, async: false + + setup_all do + assert {:ok, top} = Mongo.TestConnection.connect + Mongo.drop_database(top) + %{pid: top} + end + + test "not writable primary", c do + + top = c.pid + + cmd = [ + configureFailPoint: "failCommand", + mode: [times: 1], + data: [errorCode: 10107, failCommands: ["insert"], closeConnection: false] + ] + + assert {:ok, %Mongo.InsertOneResult{}} = Mongo.insert_one(top, "users", %{name: "Greta1"}) + Mongo.admin_command(top, cmd) + assert {:ok, %Mongo.InsertOneResult{}} = Mongo.insert_one(top, "users", %{name: "Greta2"}) + end + +end From ce83c507ff5d60edb31e0ccc3692fb7ea76e9ce7 Mon Sep 17 00:00:00 2001 From: Michael Maier Date: Sat, 28 Aug 2021 13:56:55 +0200 Subject: [PATCH 3/5] #63: fixed wire version in change streams, removed some compiler warnings --- lib/mongo/bulk_write.ex | 2 +- lib/mongo/change_stream.ex | 35 +++++++++++------------------------ lib/mongo/session.ex | 11 +++++++++++ lib/mongo/topology.ex | 17 ++++------------- 4 files changed, 27 insertions(+), 38 deletions(-) diff --git a/lib/mongo/bulk_write.ex b/lib/mongo/bulk_write.ex index e86f6b5e..e7c9533f 100644 --- a/lib/mongo/bulk_write.ex +++ b/lib/mongo/bulk_write.ex @@ -191,7 +191,7 @@ defmodule Mongo.BulkWrite do end - def write(topology_pid, %OrderedBulk{coll: coll, ops: ops} = bulk, opts) do + def write(topology_pid, %OrderedBulk{coll: coll, ops: ops}, opts) do write_concern = write_concern(opts) diff --git a/lib/mongo/change_stream.ex b/lib/mongo/change_stream.ex index bc975c68..b05bca02 100644 --- a/lib/mongo/change_stream.ex +++ b/lib/mongo/change_stream.ex @@ -3,8 +3,6 @@ defmodule Mongo.ChangeStream do alias Mongo.Session alias Mongo.Error - require Logger - import Record, only: [defrecordp: 2] defstruct [:topology_pid, :session, :doc, :cmd, :on_resume_token, :opts] @@ -71,7 +69,6 @@ defmodule Mongo.ChangeStream do def aggregate(topology_pid, cmd, fun, opts) do - Logger.info("start_implicit_session ") with {:ok, session} <- Session.start_implicit_session(topology_pid, :read, opts), {:ok, %{"ok" => ok} = doc} when ok == 1 <- Mongo.exec_command_session(session, cmd, opts) do @@ -87,13 +84,11 @@ defmodule Mongo.ChangeStream do def aggregate(topology_pid, session, doc, cmd, fun) do - Logger.info("aggregate wire version: #{inspect Mongo.wire_version(topology_pid)}") with %{"operationTime" => op_time, "cursor" => %{ "id" => cursor_id, "ns" => coll, - "firstBatch" => docs} = response} <- doc, - {:ok, wire_version} <- Mongo.wire_version(topology_pid) do + "firstBatch" => docs} = response} <- doc do [%{"$changeStream" => stream_opts} | _pipeline] = Keyword.get(cmd, :pipeline) # extract the change stream options @@ -105,7 +100,7 @@ defmodule Mongo.ChangeStream do # The initial aggregate response did not include a postBatchResumeToken. has_values = stream_opts["startAtOperationTime"] || stream_opts["startAfter"] || stream_opts["resumeAfter"] - op_time = update_operation_time(op_time, has_values, docs, response["postBatchResumeToken"], wire_version) + op_time = update_operation_time(op_time, has_values, docs, response["postBatchResumeToken"], Session.wire_version(session)) # When the ChangeStream is started: # If startAfter is set, cache it. @@ -132,7 +127,6 @@ defmodule Mongo.ChangeStream do change_stream(resume_token: resume_token, op_time: op_time, cmd: aggregate_cmd, on_resume_token: fun) = change_stream, opts) do - Logger.info("Get more") get_more = [ getMore: %BSON.LongNumber{value: cursor_id}, collection: coll, @@ -159,26 +153,20 @@ defmodule Mongo.ChangeStream do {:error, %Mongo.Error{resumable: false} = not_resumable} -> {:error, not_resumable} {:error, _error} -> - Logger.info("Resuming....: #{inspect Mongo.wire_version(topology_pid) }") - with {:ok, wire_version} <- Mongo.wire_version(topology_pid) do - - [%{"$changeStream" => stream_opts} | pipeline] = Keyword.get(aggregate_cmd, :pipeline) # extract the change stream options + [%{"$changeStream" => stream_opts} | pipeline] = Keyword.get(aggregate_cmd, :pipeline) # extract the change stream options - stream_opts = update_stream_options(stream_opts, resume_token, op_time, wire_version) - aggregate_cmd = Keyword.update!(aggregate_cmd, :pipeline, fn _ -> [%{"$changeStream" => stream_opts} | pipeline] end) + stream_opts = update_stream_options(stream_opts, resume_token, op_time, Session.wire_version(session)) + aggregate_cmd = Keyword.update!(aggregate_cmd, :pipeline, fn _ -> [%{"$changeStream" => stream_opts} | pipeline] end) - # kill the cursor - kill_cursors(session, coll, [cursor_id], opts) + # kill the cursor + kill_cursors(session, coll, [cursor_id], opts) - # Start aggregation again... - Logger.info("Calling aggregate again") - with {:ok, state} <- aggregate(topology_pid, aggregate_cmd, fun, opts) do - {:resume, state} - end + # Start aggregation again... + with {:ok, state} <- aggregate(topology_pid, aggregate_cmd, fun, opts) do + {:resume, state} end reason -> - Logger.info("Error: #{inspect reason}") - {:error, nil} + {:error, reason} end end @@ -260,7 +248,6 @@ defmodule Mongo.ChangeStream do """ def kill_cursors(session, coll, cursor_ids, opts) do - ## todo Logger.info("Kill-Cursor") cmd = [ killCursors: coll, cursors: cursor_ids |> Enum.map(fn id -> %BSON.LongNumber{value: id} end) diff --git a/lib/mongo/session.ex b/lib/mongo/session.ex index f0f4417d..a2116162 100644 --- a/lib/mongo/session.ex +++ b/lib/mongo/session.ex @@ -364,6 +364,14 @@ defmodule Mongo.Session do end + @doc """ + Return the wire_version used in the session. + """ + @spec connection(Session.t) :: pid + def wire_version(pid) do + call(pid, :wire_version) + end + @doc """ Return the connection used in the session. """ @@ -532,6 +540,9 @@ defmodule Mongo.Session do def handle_call_event(:abort_transaction, _state, _data) do {:keep_state_and_data, :ok} end + def handle_call_event(:wire_version, _state, %{wire_version: wire_version}) do + {:keep_state_and_data, wire_version} + end def handle_call_event(:connection, _state, %{conn: conn}) do {:keep_state_and_data, conn} end diff --git a/lib/mongo/topology.ex b/lib/mongo/topology.ex index 1e16cf5d..b91afb66 100644 --- a/lib/mongo/topology.ex +++ b/lib/mongo/topology.ex @@ -1,8 +1,6 @@ defmodule Mongo.Topology do @moduledoc false - require Logger - use GenServer alias Mongo.Events.ServerDescriptionChangedEvent @@ -73,7 +71,6 @@ defmodule Mongo.Topology do end def mark_server_unknown(pid, address) do - ## todo Logger.info("mark_server_unknown #{inspect address} ") server_description = ServerDescription.from_is_master_error(address, "not writable primary or recovering") update_server_description(pid, server_description) end @@ -226,7 +223,6 @@ defmodule Mongo.Topology do end def handle_info({:new_connection, waiting_pids}, state) do - ## todo Logger.info("Notify waitings pid with new_connection #{inspect waiting_pids}") Enum.each(waiting_pids, fn from -> GenServer.reply(from, :new_connection) end) {:noreply, state} end @@ -255,20 +251,16 @@ defmodule Mongo.Topology do :empty -> case current == @min_heartbeat_frequency_ms do true -> - ## todo Logger.info("Topology: no primary found, searching") state false -> - ## todo Logger.info("Topology: no primary found, start searching") Enum.each(monitors, fn {_address, pid} -> Monitor.set_heartbeat_frequency_ms(pid, @min_heartbeat_frequency_ms) end) put_in(state[:topology][:heartbeat_frequency_ms], @min_heartbeat_frequency_ms) end - host -> + _host -> case current == @max_heartbeat_frequency_ms do true -> - ## todo Logger.info("Topology: primary exist") state false -> - ## todo Logger.info("Topology: primary found #{inspect host}, stop searching") ## filter own pid Enum.each(monitors, fn {_address, pid} -> Monitor.set_heartbeat_frequency_ms(pid, @max_heartbeat_frequency_ms) end) @@ -351,7 +343,7 @@ defmodule Mongo.Topology do case TopologyDescription.select_servers(topology, :write, []) do :empty -> Mongo.Events.notify(%ServerSelectionEmptyEvent{action: :limits, cmd_type: :write, topology: topology}) - {:reply, nil, state} + {:reply, {:error, :empty}, state} {:ok, {address, _opts}} -> with {:ok, limits} <- get_limits(address, topology) do {:reply, {:ok, limits}, state} @@ -362,11 +354,10 @@ defmodule Mongo.Topology do end def handle_call(:wire_version, _from, %{:topology => topology} = state) do - case TopologyDescription.select_servers(topology, :read, []) do + case TopologyDescription.select_servers(topology, :write, []) do :empty -> Mongo.Events.notify(%ServerSelectionEmptyEvent{action: :wire_version, cmd_type: :read, topology: topology}) - ## todo fix me - {:reply, {:ok, 9}, state} + {:reply, {:error, :empty}, state} {:ok, {address, _opts}} -> {:reply, {:ok, wire_version(address, topology)}, state} From 7809a34bece525e878c6dd069c4c91750e9c8c2e Mon Sep 17 00:00:00 2001 From: Michael Maier Date: Sun, 29 Aug 2021 12:14:26 +0200 Subject: [PATCH 4/5] #63: removed Logger.info code --- lib/mongo/session.ex | 5 ----- 1 file changed, 5 deletions(-) diff --git a/lib/mongo/session.ex b/lib/mongo/session.ex index a2116162..0971fcfe 100644 --- a/lib/mongo/session.ex +++ b/lib/mongo/session.ex @@ -99,8 +99,6 @@ defmodule Mongo.Session do import Keywords import Mongo.WriteConcern - require Logger - alias BSON.Timestamp alias Mongo.Error alias Mongo.ReadPreference @@ -173,8 +171,6 @@ defmodule Mongo.Session do {:ok, session} else :new_connection -> - ## todo Logger.info("New connection while checkout session") - ## :timer.sleep(1000) start_implicit_session(topology_pid, type, opts) {:error, error} -> {:error, error} other -> {:error, Mongo.Error.exception("Unknow result #{inspect other} while calling Topology.checkout_session/4")} @@ -187,7 +183,6 @@ defmodule Mongo.Session do call(pid, :mark_server_unknown) end def select_server(pid, opts) do - Logger.info("Select server") call(pid, {:select_server, opts}) end From 44f08971e265532dbfb79e1fd0e3f4cb9128b74c Mon Sep 17 00:00:00 2001 From: Michael Maier Date: Sat, 18 Sep 2021 11:45:45 +0200 Subject: [PATCH 5/5] #63: added the reader example --- README.md | 3 +-- examples/reader.ex | 48 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 2 deletions(-) create mode 100644 examples/reader.ex diff --git a/README.md b/README.md index ffb81abf..d8b8dafb 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,5 @@ # An Alternative Elixir Driver for MongoDB [![Build Status](https://travis-ci.org/zookzook/elixir-mongodb-driver.svg?branch=master)](https://travis-ci.org/zookzook/elixir-mongodb-driver) -[![Coverage Status](https://coveralls.io/repos/github/zookzook/elixir-mongodb-driver/badge.svg?branch=master)](https://coveralls.io/github/zookzook/elixir-mongodb-driver?branch=master) [![Hex.pm](https://img.shields.io/hexpm/v/mongodb_driver.svg)](https://hex.pm/packages/mongodb_driver) [![Hex.pm](https://img.shields.io/hexpm/dt/mongodb_driver.svg)](https://hex.pm/packages/mongodb_driver) [![Hex.pm](https://img.shields.io/hexpm/dw/mongodb_driver.svg)](https://hex.pm/packages/mongodb_driver) @@ -8,7 +7,7 @@ ## Features - * Supports MongoDB versions 3.2, 3.4, 3.6, 4.0, 4.2, 4.4 + * Supports MongoDB versions 3.2, 3.4, 3.6, 4.x, 5.x * Connection pooling ([through DBConnection 2.x](https://github.com/elixir-ecto/db_connection)) * Streaming cursors * Performant ObjectID generation diff --git a/examples/reader.ex b/examples/reader.ex new file mode 100644 index 00000000..2d3585a8 --- /dev/null +++ b/examples/reader.ex @@ -0,0 +1,48 @@ +defmodule Reader do + + require Logger + + ## + # see https://github.com/zookzook/elixir-mongodb-driver/issues/63 for more information + # + # 1. start a replica set and call the Reader.test() + # 2. go to the primary db and call db.adminCommand({replSetStepDown: 30}) + # 3. check the log to see the error message only one time + ## + def start_link(conn) do + Logger.info("starting reader") + + Task.start_link(fn -> read(conn, false) end) + end + + defp read(conn, error) do + + if error do + Logger.info("Called with error") + end + + # Gets an enumerable cursor for the results + cursor = Mongo.find(conn, "data", %{}) + + error = case cursor do + {:error, error} -> + Logger.info("Error: #{inspect error}") + true + + _ -> + cursor + |> Enum.to_list() + |> Enum.count() + false + end + + read(conn, error) + end + + def test() do + {:ok, conn} = Mongo.start_link(url: "mongodb://localhost:27017,localhost:27018,localhost:27019/load?replicaSet=rs_1") + + Enum.map(1..10_000, fn counter -> Mongo.insert_one(conn, "data", %{counter: counter}) end) + Reader.start_link(conn) + end +end \ No newline at end of file