Skip to content


improved the reconnection workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
zookzook committed Apr 26, 2024
1 parent 1218418 commit f12f4e5
Show file tree
Hide file tree
Showing 10 changed files with 124 additions and 43 deletions.
21 changes: 19 additions & 2 deletions lib/mongo/mongo_db_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ defmodule Mongo.MongoDBConnection do
wire_version: 0,
auth_mechanism: opts[:auth_mechanism] || nil,
connection_type: Keyword.fetch!(opts, :connection_type),
server_pid: Keyword.get(opts, :server_pid),
topology_pid: Keyword.fetch!(opts, :topology_pid),
stable_api: Keyword.get(opts, :stable_api),
use_op_msg: Keyword.get(opts, :stable_api) != nil,
Expand All @@ -48,8 +49,24 @@ defmodule Mongo.MongoDBConnection do

@impl true
def disconnect(_error, %{connection: {mod, socket}, connection_type: type, topology_pid: pid, host: host}) do
GenServer.cast(pid, {:disconnect, type, host})
## the stream monitor disconnects, we change the mode of the parent monitor
def disconnect(_error, %{connection: {mod, socket}, connection_type: :stream_monitor, parent_pid: parent_pid}) do
## Logger.debug("MongoDB-Connection: disconnected stream monitor: #{inspect(error)}")
GenServer.cast(parent_pid, :stop_streaming_mode)

def disconnect(_error, %{connection: {mod, socket}, connection_type: :monitor, topology_pid: topology_pid, host: host, server_pid: server_pid}) do
## Logger.debug("MongoDB-Connection: disconnected: #{inspect(error)}, #{inspect(server_pid)}, #{inspect(host)}, cast disconnect :monitor")
GenServer.cast(server_pid, :stop_streaming_mode)
GenServer.cast(topology_pid, {:disconnect, :monitor, host, server_pid})

def disconnect(_error, %{connection: {mod, socket}}) do
## Logger.debug("MongoDB-Connection: disconnected: #{inspect error}, #{inspect type}, #{inspect host} #{inspect server_pid}")
Expand Down
28 changes: 21 additions & 7 deletions lib/mongo/monitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ defmodule Mongo.Monitor do
GenServer.cast(pid, :update)

def stop_streaming_mode(pid) do
GenServer.cast(pid, :stop_streaming_mode)

def set_heartbeat_frequency_ms(pid, heartbeat_frequency_ms) do
GenServer.cast(pid, {:update, heartbeat_frequency_ms})
Expand All @@ -59,7 +63,7 @@ defmodule Mongo.Monitor do
Initialize the monitor process
def init([address, topology_pid, heartbeat_frequency_ms, connection_opts]) do
## debug info("Starting monitor process with pid #{inspect self()}, #{inspect address}")
##"Starting monitor process with pid #{inspect(self())}, #{inspect(address)}")

# monitors don't authenticate and use the "admin" database
opts =
Expand All @@ -73,6 +77,7 @@ defmodule Mongo.Monitor do
|> Keyword.put(:topology_pid, topology_pid)
|> Keyword.put(:pool_size, 1)
|> Keyword.put(:idle_interval, 5_000)
|> Keyword.put(:server_pid, self())

with {:ok, pid} <- DBConnection.start_link(Mongo.MongoDBConnection, opts) do
Expand All @@ -97,18 +102,17 @@ defmodule Mongo.Monitor do

@doc """
In case of terminating we stop the our linked processes as well:
In case of terminating we stop our linked processes as well:
* connection
* streaming process
def terminate(reason, %{connection_pid: connection_pid, streaming_pid: nil}) do
## debug info("Terminating monitor for reason #{inspect reason}")
## Logger.debug("Terminating monitor #{inspect(self())} for reason #{inspect(reason)}")
GenServer.stop(connection_pid, reason)

def terminate(reason, %{connection_pid: connection_pid, streaming_pid: streaming_pid}) do
## debug info("Terminating monitor for reason #{inspect reason}, #{inspect self()}, #{inspect streaming_pid}")

## Logger.debug("Terminating monitor #{inspect(self())} for reason #{inspect(reason)}, #{inspect(streaming_pid)}")
GenServer.stop(connection_pid, reason)
GenServer.stop(streaming_pid, reason)
Expand All @@ -117,6 +121,7 @@ defmodule Mongo.Monitor do
Report the connection event, so the topology process can now create the connection pool.
def connected(_connection, me, topology_pid) do
##"Monitor #{inspect(me)} connected to server! ")
Topology.monitor_connected(topology_pid, me)
GenServer.cast(me, :update)
Expand All @@ -125,6 +130,15 @@ defmodule Mongo.Monitor do
{:reply, Map.put(state, :pid, self()), state}

def handle_cast(:stop_streaming_mode, %{streaming_pid: streaming_pid} = state) when streaming_pid != nil do
spawn(fn -> GenServer.stop(streaming_pid) end)
{:noreply, %{state | mode: :polling_mode, streaming_pid: nil}}

def handle_cast(:stop_streaming_mode, state) do
{:noreply, %{state | mode: :polling_mode}}

# Update the server description or the rrt value
Expand Down Expand Up @@ -207,11 +221,11 @@ defmodule Mongo.Monitor do
# Starts the streaming mode
defp start_streaming_mode(%{address: address, topology_pid: topology_pid, opts: opts} = state, _server_description) do
args = [topology_pid, address, opts]
args = [self(), topology_pid, address, opts]

case StreamingHelloMonitor.start_link(args) do
{:ok, pid} ->
## debug info("Starting streaming mode: #{inspect self()}")
## Logger.debug("Starting streaming mode: #{inspect(pid)}")
%{state | mode: :streaming_mode, streaming_pid: pid, heartbeat_frequency_ms: 10_000}

error ->
Expand Down
2 changes: 1 addition & 1 deletion lib/mongo/password_safe.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ defmodule Mongo.PasswordSafe do

use GenServer

def new() do
def start_link() do
GenServer.start_link(@me, [])

Expand Down
4 changes: 2 additions & 2 deletions lib/mongo/repo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ defmodule Mongo.Repo do
@callback update(doc :: Mongo.Collection.t(), opts :: Keyword.t()) :: {:ok, Mongo.Collection.t()} | {:error, any()}

@doc """
Same as `c:update/1` but raises an error.
Same as `c:update/2` but raises an error.
@callback update!(doc :: Mongo.Collection.t(), opts :: Keyword.t()) :: Mongo.Collection.t()

Expand All @@ -471,7 +471,7 @@ defmodule Mongo.Repo do
@callback insert_or_update(doc :: Mongo.Collection.t(), opts :: Keyword.t()) :: {:ok, Mongo.Collection.t()} | {:error, any()}

@doc """
Same as `c:insert_or_update/1` but raises an error.
Same as `c:insert_or_update/2` but raises an error.
@callback insert_or_update!(doc :: Mongo.Collection.t(), opts :: Keyword.t()) :: Mongo.Collection.t()

Expand Down
8 changes: 5 additions & 3 deletions lib/mongo/streaming_hello_monitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,16 @@ defmodule Mongo.StreamingHelloMonitor do
@doc """
Initialize the monitor process
def init([topology_pid, address, opts]) do
def init([monitor_pid, topology_pid, address, opts]) do
heartbeat_frequency_ms = 10_000

opts =
|> Keyword.drop([:after_connect])
|> Keyword.put(:after_connect, {StreamingHelloMonitor, :connected, [self()]})
|> Keyword.put(:connection_type, :stream_monitor)
|> Keyword.put(:server_pid, self())
|> Keyword.put(:monitor_pid, monitor_pid)

## debug info("Starting stream hello monitor with options #{inspect(opts, pretty: true)}")

Expand Down Expand Up @@ -65,7 +67,7 @@ defmodule Mongo.StreamingHelloMonitor do
In this case we stop the DBConnection.
def terminate(reason, %{connection_pid: connection_pid}) do
## debug info("Terminating streaming hello monitor for reason #{inspect reason}")
## Logger.debug("Terminating streaming hello monitor for reason #{inspect(reason)}")
GenServer.stop(connection_pid, reason)

Expand All @@ -84,7 +86,7 @@ defmodule Mongo.StreamingHelloMonitor do

def handle_info({:EXIT, _pid, reason}, state) do
## debug Logger.warn("Stopped with reason #{inspect reason}")
Logger.warning("Stopped with reason #{inspect(reason)}")
{:stop, reason, state}

Expand Down
96 changes: 72 additions & 24 deletions lib/mongo/topology.ex
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ defmodule Mongo.Topology do, :get_state)

# 97
def select_server(pid, type, opts \\ []) do
timeout = Keyword.get(opts, :checkout_timeout, @default_checkout_timeout), {:select_server, type, opts}, timeout)
Expand Down Expand Up @@ -109,7 +108,7 @@ defmodule Mongo.Topology do

def stop(pid) do
GenServer.stop(pid, :stop)

## GenServer Callbacks
Expand Down Expand Up @@ -163,14 +162,7 @@ defmodule Mongo.Topology do

def terminate(_reason, state) do
case state.opts[:pw_safe] do
nil -> nil
pid -> GenServer.stop(pid)

Enum.each(state.connection_pools, fn {_address, pid} -> GenServer.stop(pid) end)
Enum.each(state.monitors, fn {_address, pid} -> GenServer.stop(pid) end)
def terminate(_reason, _state) do
Mongo.Events.notify(%TopologyClosedEvent{topology_pid: self()})

Expand Down Expand Up @@ -204,18 +196,25 @@ defmodule Mongo.Topology do
# In case of :monitor or :stream_monitor we mark the server description of the address as unknown
def handle_cast({:disconnect, kind, address}, state) when kind in [:monitor, :stream_monitor] do
server_description = ServerDescription.parse_hello_response(address, "#{inspect(kind)} disconnected")
def handle_cast({:disconnect, :monitor, address, pid}, state) do
server_description = ServerDescription.parse_hello_response(address, "monitor disconnected")
## Logger.debug("Disconnect monitor with #{inspect(pid)}")

new_state =
|> remove_address(state)
|> close_connection_pool(pid, state)
|> maybe_reinit()

handle_cast({:server_description, server_description}, new_state)

def handle_cast({:disconnect, _kind, _host}, state) do
def handle_cast({:disconnect, :stream_monitor, _host, _pid}, state) do
## IO.inspect("ignored: kind stream_monitor with #{inspect pid}")
{:noreply, state}

def handle_cast({:disconnect, _kind, _host, _pid}, state) do
## IO.inspect("ignored: kind #{inspect kind}")
{:noreply, state}

Expand All @@ -233,6 +232,7 @@ defmodule Mongo.Topology do

{host, ^monitor_pid} ->
arbiters = fetch_arbiters(state)
Mongo.Events.notify(%ServerOpeningEvent{address: host, topology_pid: self()})

if host in arbiters do
Expand All @@ -243,8 +243,9 @@ defmodule Mongo.Topology do
|> Keyword.put(:topology_pid, self())
|> connect_opts_from_address(host)

## Logger.debug("Starting connection pool for #{inspect(host)}")
{:ok, pool} = DBConnection.start_link(Mongo.MongoDBConnection, conn_opts)
connection_pools = Map.put(state.connection_pools, host, pool)
connection_pools = replace_pool(state.connection_pools, host, pool)

Process.send_after(self(), {:new_connection, state.waiting_pids}, 10)

Expand Down Expand Up @@ -279,6 +280,49 @@ defmodule Mongo.Topology do
{:noreply, state}

## remove the address only if the pid is the same
defp close_connection_pool(address, pid, state) do
## Logger.debug("Closing connection pool by pid: #{inspect(state.monitors[address] == pid)}, #{inspect(pid)}, #{inspect(state.monitors[address])}")

case state.monitors[address] == pid do
true ->
Mongo.Events.notify(%ServerClosedEvent{address: address, topology_pid: self()})
## stopping the connection pool
case state.connection_pools[address] do
nil ->

pid ->
if Process.alive?(pid) do
## Logger.debug("Stopping the connection pool #{inspect(pid)} für #{inspect(address)}")

%{state | connection_pools: Map.delete(state.connection_pools, address)}

false ->

## replaces a pool for the host address
defp replace_pool(connection_pools, host, pool) do
## if we found an existing pool, we will stop it first
case Map.get(connection_pools, host) do
nil ->

pid ->
if Process.alive?(pid) do
## Logger.debug("Stopping the connection pool #{inspect(pid)}")

Map.put(connection_pools, host, pool)

# Update server description: in case of logical session the function creates a session pool for the `deployment`.
Expand Down Expand Up @@ -510,9 +554,6 @@ defmodule Mongo.Topology do
Enum.reduce(added, state, fn address, state ->
server_description = state.topology.servers[address]
connopts = connect_opts_from_address(state.opts, address)

Mongo.Events.notify(%ServerOpeningEvent{address: address, topology_pid: self()})

args = [server_description.address, self(), heartbeat_frequency_ms, Keyword.put(connopts, :pool, DBConnection.ConnectionPool)]
{:ok, pid} = Monitor.start_link(args)

Expand Down Expand Up @@ -549,16 +590,23 @@ defmodule Mongo.Topology do

defp remove_address(address, state) do
Mongo.Events.notify(%ServerClosedEvent{address: address, topology_pid: self()})

case state.monitors[address] do
nil -> :ok
pid -> GenServer.stop(pid)
nil ->

pid ->
Mongo.Events.notify(%ServerClosedEvent{address: address, topology_pid: self()})
## Logger.debug("Stopping: #{inspect(pid)} for #{inspect(address)}")

case state.connection_pools[address] do
nil -> :ok
pid -> GenServer.stop(pid)
nil ->

pid ->
## Logger.debug("Connection pool: #{inspect(address)}")

%{state | monitors: Map.delete(state.monitors, address), connection_pools: Map.delete(state.connection_pools, address)}
Expand Down
2 changes: 1 addition & 1 deletion lib/mongo/url_parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ defmodule Mongo.UrlParser do

value ->
## start GenServer and put id
with {:ok, pid} <-,
with {:ok, pid} <- Mongo.PasswordSafe.start_link(),
:ok <- Mongo.PasswordSafe.set_password(pid, value) do
|> Keyword.put(:password, "*****")
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ defmodule Mongodb.Mixfile do
{:patch, "~> 0.12.0", only: [:dev, :test]},
{:jason, "~> 1.3", only: [:dev, :test]},
{:credo, "~> 1.7.0", only: [:dev, :test], runtime: false},
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false}
{:ex_doc, "== 0.24.1", only: :dev, runtime: false}

Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"db_connection": {:hex, :db_connection, "2.6.0", "77d835c472b5b67fc4f29556dee74bf511bbafecdcaf98c27d27fa5918152086", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c2f992d15725e721ec7fbc1189d4ecdb8afef76648c746a8e1cad35e3b8a35f3"},
"decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"},
"earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"},
"ex_doc": {:hex, :ex_doc, "0.31.1", "8a2355ac42b1cc7b2379da9e40243f2670143721dd50748bf6c3b1184dae2089", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "3178c3a407c557d8343479e1ff117a96fd31bafe52a039079593fb0524ef61b0"},
"ex_doc": {:hex, :ex_doc, "0.24.1", "15673de99154f93ca7f05900e4e4155ced1ee0cd34e0caeee567900a616871a4", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "07972f17bdf7dc7b5bd76ec97b556b26178ed3f056e7ec9288eb7cea7f91cce2"},
"file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
"makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"},
Expand Down
2 changes: 1 addition & 1 deletion test/mongo/password_safe_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule Mongo.PasswordSafeTest do

test "encrypted password" do
pw = "my-secret-password"
{:ok, pid} =
{:ok, pid} = PasswordSafe.start_link()
PasswordSafe.set_password(pid, pw)
%{key: _key, pw: enc_pw} = :sys.get_state(pid)
assert enc_pw != pw
Expand Down

0 comments on commit f12f4e5

Please sign in to comment.