Skip to content

Commit

Permalink
fix: added support for read preference specified by the URL
Browse files Browse the repository at this point in the history
  • Loading branch information
zookzook committed Sep 12, 2023
1 parent fd1607f commit a56be72
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 54 deletions.
58 changes: 30 additions & 28 deletions lib/mongo/read_preference.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@ defmodule Mongo.ReadPreference do
@moduledoc ~S"""
Determines which servers are considered suitable for read operations
A read preference consists of a mode and optional `tag_sets`, max_staleness_ms, and `hedge`.
A read preference consists of a mode and optional `tags`, max_staleness_ms, and `hedge`.
The mode prioritizes between primaries and secondaries to produce either a single suitable server or a list of candidate servers.
If tag_sets and maxStalenessSeconds are set, they determine which candidate servers are eligible for selection.
If tags and maxStalenessSeconds are set, they determine which candidate servers are eligible for selection.
If hedge is set, it configures how server hedged reads are used.
The default mode is `:primary`.
The default tag_sets is a list with an empty tag set: [{}].
The default tags is a list with an empty tag set: [{}].
The default max_staleness_ms is unset.
The default hedge is unset.
## mode
* `:primary` Only an available primary is suitable.
* `:secondary` All secondaries (and only secondaries) are candidates, but only eligible candidates (i.e. after applying tag_sets and maxStalenessSeconds) are suitable.
* `:secondary` All secondaries (and only secondaries) are candidates, but only eligible candidates (i.e. after applying tags and maxStalenessSeconds) are suitable.
* `:primary_preferred` If a primary is available, only the primary is suitable. Otherwise, all secondaries are candidates,
but only eligible secondaries are suitable.
* `:secondary_preferred` All secondaries are candidates. If there is at least one eligible secondary, only eligible secondaries are suitable.
Expand All @@ -32,14 +32,14 @@ defmodule Mongo.ReadPreference do
| :primary_preferred
| :secondary_preferred
| :nearest,
tag_sets: [%{String.t() => String.t()}],
tags: [%{String.t() => String.t()}],
max_staleness_ms: non_neg_integer,
hedge: BSON.document()
}

@primary %{
mode: :primary,
tag_sets: [],
tags: [],
max_staleness_ms: 0
}

Expand All @@ -56,35 +56,35 @@ defmodule Mongo.ReadPreference do
"""
def add_read_preference(cmd, opts) do
case Keyword.get(opts, :read_preference) do
nil -> cmd
pref -> cmd ++ ["$readPreference": pref]
nil ->
cmd

pref ->
cmd ++ ["$readPreference": pref]
end
end

@doc """
From the specs:
Use of slaveOk
There are two usages of slaveOK:
* A driver query parameter that predated read preference modes and tag set lists.
* A wire protocol flag on OP_QUERY operations
Converts the preference to the mongodb format
"""
def slave_ok(%{:mode => :primary}) do
def convert(%{:mode => :primary}) do
%{:mode => :primary}
end

def slave_ok(config) do
def convert(config) do
mode =
case config[:mode] do
:primary_preferred -> :primaryPreferred
:secondary_preferred -> :secondaryPreferred
other -> other
:primary_preferred ->
:primaryPreferred

:secondary_preferred ->
:secondaryPreferred

other ->
other
end

filter_nils(mode: mode, tag_sets: config[:tag_sets])
filter_nils(mode: mode, tags: [config[:tags]])
end

##
Expand All @@ -106,7 +106,7 @@ defmodule Mongo.ReadPreference do
end

# For mode 'secondaryPreferred', drivers MUST set the slaveOK wire protocol flag. If the read preference contains a
# non-empty tag_sets parameter, maxStalenessSeconds is a positive integer, or the hedge parameter is non-empty,
# non-empty tags parameter, maxStalenessSeconds is a positive integer, or the hedge parameter is non-empty,
# drivers MUST use $readPreference; otherwise, drivers MUST NOT use $readPreference
def mongos(%{mode: :secondary_preferred} = config) do
transform(config)
Expand All @@ -127,11 +127,13 @@ defmodule Mongo.ReadPreference do

max_staleness_seconds =
case config[:max_staleness_ms] do
i when is_integer(i) -> div(i, 1000)
nil -> nil
i when is_integer(i) ->
div(i, 1000)

nil ->
nil
end

[mode: mode, tag_sets: config[:tag_sets], maxStalenessSeconds: max_staleness_seconds, hedge: config[:hedge]]
|> filter_nils()
filter_nils(mode: mode, tags: [config[:tags]], maxStalenessSeconds: max_staleness_seconds, hedge: config[:hedge])
end
end
14 changes: 14 additions & 0 deletions lib/mongo/topology.ex
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,8 @@ defmodule Mongo.Topology do
# checkout a new session
#
def handle_call({:checkout_session, read_write_type, opts}, from, %{:topology => topology, :waiting_pids => waiting} = state) do
opts = merge_read_preferences(opts, state.opts)

case TopologyDescription.select_servers(topology, read_write_type, opts) do
:empty ->
Mongo.Events.notify(%ServerSelectionEmptyEvent{action: :checkout_session, cmd_type: read_write_type, topology: topology, opts: opts})
Expand All @@ -398,6 +400,8 @@ defmodule Mongo.Topology do
end

def handle_call({:select_server, read_write_type, opts}, from, %{:topology => topology, :waiting_pids => waiting} = state) do
opts = merge_read_preferences(opts, state.opts)

case TopologyDescription.select_servers(topology, read_write_type, opts) do
:empty ->
Mongo.Events.notify(%ServerSelectionEmptyEvent{action: :select_server, cmd_type: read_write_type, topology: topology, opts: opts})
Expand Down Expand Up @@ -579,4 +583,14 @@ defmodule Mongo.Topology do
defp fetch_arbiters(state) do
Enum.flat_map(state.topology.servers, fn {_, s} -> s.arbiters end)
end

defp merge_read_preferences(opts, url_opts) do
case Keyword.get(url_opts, :read_preference) do
nil ->
opts

read_preference ->
Keyword.put_new(opts, :read_preference, read_preference)
end
end
end
64 changes: 42 additions & 22 deletions lib/mongo/topology_description.ex
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,17 @@ defmodule Mongo.TopologyDescription do
def select_servers(topology, :write, opts) do
servers =
case topology.type do
:single -> topology.servers
:sharded -> mongos_servers(topology)
:replica_set_with_primary -> primary_servers(topology)
_ -> []
:single ->
topology.servers

:sharded ->
mongos_servers(topology)

:replica_set_with_primary ->
primary_servers(topology)

_other ->
[]
end

addr =
Expand All @@ -115,27 +122,40 @@ defmodule Mongo.TopologyDescription do

{servers, read_prefs} =
case topology.type do
:unknown -> {[], nil}
:single -> {topology.servers, nil}
:sharded -> {mongos_servers(topology), ReadPreference.mongos(read_preference)}
_ -> {select_replica_set_server(topology, read_preference.mode, read_preference), ReadPreference.slave_ok(read_preference)}
:unknown ->
{[], nil}

:single ->
{topology.servers, nil}

:sharded ->
{mongos_servers(topology), ReadPreference.mongos(read_preference)}

_other ->
{select_replica_set_server(topology, read_preference.mode, read_preference), ReadPreference.convert(read_preference)}
end

opts =
case read_prefs do
nil -> Keyword.delete(opts, :read_preference)
prefs -> Keyword.put(opts, :read_preference, prefs)
nil ->
Keyword.delete(opts, :read_preference)

prefs ->
Keyword.put(opts, :read_preference, prefs)
end

addr =
servers
|> Enum.map(fn {server, _} -> server end)
|> Enum.take_random(1)
|> Enum.map(fn {server, _} -> server end)

# check now three possible cases
case addr do
[] -> :empty
[result] -> {:ok, {result, opts}}
[] ->
:empty

[result] ->
{:ok, {result, opts}}
end
end

Expand All @@ -153,30 +173,30 @@ defmodule Mongo.TopologyDescription do

##
#
# Select the primary without without tag_sets or maxStalenessSeconds
# Select the primary without without tags or maxStalenessSeconds
#
defp select_replica_set_server(topology, :primary, _read_preference) do
primary_servers(topology)
end

##
#
# Select the secondary with without tag_sets or maxStalenessSeconds
# Select the secondary with without tags or maxStalenessSeconds
#
defp select_replica_set_server(topology, :secondary, read_preference) do
topology
|> secondary_servers()
|> filter_out_stale(topology, read_preference.max_staleness_ms)
|> select_tag_sets(read_preference.tag_sets)
|> select_tag_sets(read_preference.tags)
|> filter_latency_window(topology.local_threshold_ms)
end

##
# From the specs
#
# 'primaryPreferred' is equivalent to selecting a server with read preference mode 'primary'
# (without tag_sets or maxStalenessSeconds), or, if that fails, falling back to selecting with read preference mode
# 'secondary' (with tag_sets and maxStalenessSeconds, if provided).
# (without tags or maxStalenessSeconds), or, if that fails, falling back to selecting with read preference mode
# 'secondary' (with tags and maxStalenessSeconds, if provided).
defp select_replica_set_server(topology, :primary_preferred, read_preference) do
case primary_servers(topology) do
[] -> select_replica_set_server(topology, :secondary, read_preference)
Expand All @@ -186,8 +206,8 @@ defmodule Mongo.TopologyDescription do

##
# From the specs
# 'secondaryPreferred' is the inverse: selecting with mode 'secondary' (with tag_sets and maxStalenessSeconds) and
# falling back to selecting with mode 'primary' (without tag_sets or maxStalenessSeconds).
# 'secondaryPreferred' is the inverse: selecting with mode 'secondary' (with tags and maxStalenessSeconds) and
# falling back to selecting with mode 'primary' (without tags or maxStalenessSeconds).
#
defp select_replica_set_server(topology, :secondary_preferred, read_preference) do
case select_replica_set_server(topology, :secondary, read_preference) do
Expand All @@ -202,11 +222,11 @@ defmodule Mongo.TopologyDescription do
# The term 'nearest' is unfortunate, as it implies a choice based on geographic locality or absolute lowest latency, neither of which are true.
#
# Instead, and unlike the other read preference modes, 'nearest' does not favor either primaries or secondaries;
# instead all servers are candidates and are filtered by tag_sets and maxStalenessSeconds.
# instead all servers are candidates and are filtered by tags and maxStalenessSeconds.
defp select_replica_set_server(%{:servers => servers} = topology, :nearest, read_preference) do
servers
|> filter_out_stale(topology, read_preference.max_staleness_ms)
|> select_tag_sets(read_preference.tag_sets)
|> select_tag_sets(read_preference.tags)
|> filter_latency_window(topology.local_threshold_ms)
end

Expand Down
59 changes: 59 additions & 0 deletions lib/mongo/url_parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ defmodule Mongo.UrlParser do
"""

require Logger

@mongo_url_regex ~r/^mongodb(?<srv>\+srv)?:\/\/((?<username>[^:]+):(?<password>[^@]+)@)?(?<seeds>[^\/]+)(\/(?<database>[^?]+))?(\?(?<options>.*))?$/

# https://docs.mongodb.com/manual/reference/connection-string/#connections-connection-options
Expand Down Expand Up @@ -181,6 +183,7 @@ defmodule Mongo.UrlParser do
frags <- resolve_srv_url(frags),
opts <- parse_seeds(opts, frags),
opts <- parse_query_options(opts, frags),
opts <- process_read_preferences(opts),
# Parse fixed parameters (database, username & password) & merge them with query options
opts <- Enum.reduce(frags, opts, &add_option/2) do
opts
Expand All @@ -191,4 +194,60 @@ defmodule Mongo.UrlParser do
end

def parse_url(opts), do: opts

defp process_read_preferences(opts) do
opts =
case Keyword.get(opts, :read_preference) do
nil ->
opts

mode ->
read_preference =
%{mode: mode}
|> extend_read_preference_tags(opts)
|> extend_max_staleness_ms(opts)

Keyword.put(opts, :read_preference, read_preference)
end

Keyword.drop(opts, [:read_preference_tags, :max_staleness_seconds])
end

defp extend_read_preference_tags(read_preference, opts) do
case Keyword.get(opts, :read_preference_tags, []) |> parse_tags() do
[] ->
read_preference

tags ->
Map.put(read_preference, :tags, Keyword.new(tags))
end
end

defp extend_max_staleness_ms(read_preference, opts) do
case Keyword.get(opts, :max_staleness_seconds) do
nil ->
read_preference

max_staleness_seconds ->
Map.put(read_preference, :max_staleness_ms, max_staleness_seconds * 1_000)
end
end

defp parse_tags(tags) do
tags
|> String.split(",")
|> Enum.map(fn key_value -> to_tuple(key_value) end)
|> Enum.reject(fn key_value -> key_value == nil end)
end

defp to_tuple(key_value) do
case String.split(key_value, ":") do
[key, value] ->
{String.to_atom(key), value}

_other ->
Logger.warning("Unable to parse the read preference tags #{inspect(key_value)}")
nil
end
end
end
8 changes: 4 additions & 4 deletions test/mongo/topology_description_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -36,25 +36,25 @@ defmodule Mongo.TopologyDescriptionTest do
read_preference: ReadPreference.primary(%{mode: :secondary})
]

assert {:ok, {^sharded_server, [{:read_preference, [mode: :secondary, tag_sets: [], maxStalenessSeconds: 0]}]}} = TopologyDescription.select_servers(sharded(), :read, opts)
assert {:ok, {^sharded_server, [{:read_preference, [mode: :secondary, tags: [[]], maxStalenessSeconds: 0]}]}} = TopologyDescription.select_servers(sharded(), :read, opts)

opts = [
read_preference: ReadPreference.primary(%{mode: :primary_preferred})
]

assert {:ok, {^sharded_server, [{:read_preference, [mode: :primaryPreferred, tag_sets: [], maxStalenessSeconds: 0]}]}} = TopologyDescription.select_servers(sharded(), :read, opts)
assert {:ok, {^sharded_server, [{:read_preference, [mode: :primaryPreferred, tags: [[]], maxStalenessSeconds: 0]}]}} = TopologyDescription.select_servers(sharded(), :read, opts)

opts = [
read_preference: ReadPreference.primary(%{mode: :secondary_preferred})
]

assert {:ok, {^sharded_server, [{:read_preference, [mode: :secondaryPreferred, tag_sets: [], maxStalenessSeconds: 0]}]}} = TopologyDescription.select_servers(sharded(), :read, opts)
assert {:ok, {^sharded_server, [{:read_preference, [mode: :secondaryPreferred, tags: [[]], maxStalenessSeconds: 0]}]}} = TopologyDescription.select_servers(sharded(), :read, opts)

opts = [
read_preference: ReadPreference.primary(%{mode: :nearest})
]

assert {:ok, {^sharded_server, [{:read_preference, [mode: :nearest, tag_sets: [], maxStalenessSeconds: 0]}]}} = TopologyDescription.select_servers(sharded(), :read, opts)
assert {:ok, {^sharded_server, [{:read_preference, [mode: :nearest, tags: [[]], maxStalenessSeconds: 0]}]}} = TopologyDescription.select_servers(sharded(), :read, opts)
end

test "replica set server selection" do
Expand Down
Loading

0 comments on commit a56be72

Please sign in to comment.